Changeset - r28691:95aae09d5d97
[Not reviewed]
master
0 6 0
Rubidium - 3 months ago 2024-02-04 16:02:08
rubidium@openttd.org
Codechange: use std::vector for the incoming command queue
6 files changed with 37 insertions and 114 deletions:
0 comments (0 inline, 0 general)
src/network/core/tcp_game.h
Show inline comments
 
@@ -131,24 +131,12 @@ enum PacketGameType : uint8_t {
 
/** Packet that wraps a command */
 
struct CommandPacket;
 

	
 
/** A queue of CommandPackets. */
 
class CommandQueue {
 
	CommandPacket *first; ///< The first packet in the queue.
 
	CommandPacket *last;  ///< The last packet in the queue; only valid when first != nullptr.
 
	uint count;           ///< The number of items in the queue.
 

	
 
public:
 
	/** Initialise the command queue. */
 
	CommandQueue() : first(nullptr), last(nullptr), count(0) {}
 
	/** Clear the command queue. */
 
	~CommandQueue() { this->Free(); }
 
	void Append(CommandPacket *p);
 
	CommandPacket *Pop(bool ignore_paused = false);
 
	CommandPacket *Peek(bool ignore_paused = false);
 
	void Free();
 
	/** Get the number of items in the queue. */
 
	uint Count() const { return this->count; }
 
};
 
/**
 
 * A "queue" of CommandPackets.
 
 * Not a std::queue because, when paused, some commands remain on the queue.
 
 * In other words, you do not always pop the first element from this queue.
 
 */
 
using CommandQueue = std::vector<CommandPacket>;
 

	
 
/** Base socket handler for all TCP sockets */
 
class NetworkGameSocketHandler : public NetworkTCPSocketHandler {
src/network/network_client.cpp
Show inline comments
 
@@ -972,7 +972,7 @@ NetworkRecvStatus ClientNetworkGameSocke
 
		return NETWORK_RECV_STATUS_MALFORMED_PACKET;
 
	}
 

	
 
	this->incoming_queue.Append(&cp);
 
	this->incoming_queue.push_back(cp);
 

	
 
	return NETWORK_RECV_STATUS_OKAY;
 
}
src/network/network_command.cpp
Show inline comments
 
@@ -165,76 +165,6 @@ static constexpr auto _cmd_dispatch = Ma
 
#	pragma GCC diagnostic pop
 
#endif
 

	
 

	
 
/**
 
 * Append a CommandPacket at the end of the queue.
 
 * @param p The packet to append to the queue.
 
 * @note A new instance of the CommandPacket will be made.
 
 */
 
void CommandQueue::Append(CommandPacket *p)
 
{
 
	CommandPacket *add = new CommandPacket();
 
	*add = *p;
 
	add->next = nullptr;
 
	if (this->first == nullptr) {
 
		this->first = add;
 
	} else {
 
		this->last->next = add;
 
	}
 
	this->last = add;
 
	this->count++;
 
}
 

	
 
/**
 
 * Return the first item in the queue and remove it from the queue.
 
 * @param ignore_paused Whether to ignore commands that may not be executed while paused.
 
 * @return the first item in the queue.
 
 */
 
CommandPacket *CommandQueue::Pop(bool ignore_paused)
 
{
 
	CommandPacket **prev = &this->first;
 
	CommandPacket *ret = this->first;
 
	CommandPacket *prev_item = nullptr;
 
	if (ignore_paused && _pause_mode != PM_UNPAUSED) {
 
		while (ret != nullptr && !IsCommandAllowedWhilePaused(ret->cmd)) {
 
			prev_item = ret;
 
			prev = &ret->next;
 
			ret = ret->next;
 
		}
 
	}
 
	if (ret != nullptr) {
 
		if (ret == this->last) this->last = prev_item;
 
		*prev = ret->next;
 
		this->count--;
 
	}
 
	return ret;
 
}
 

	
 
/**
 
 * Return the first item in the queue, but don't remove it.
 
 * @param ignore_paused Whether to ignore commands that may not be executed while paused.
 
 * @return the first item in the queue.
 
 */
 
CommandPacket *CommandQueue::Peek(bool ignore_paused)
 
{
 
	if (!ignore_paused || _pause_mode == PM_UNPAUSED) return this->first;
 

	
 
	for (CommandPacket *p = this->first; p != nullptr; p = p->next) {
 
		if (IsCommandAllowedWhilePaused(p->cmd)) return p;
 
	}
 
	return nullptr;
 
}
 

	
 
/** Free everything that is in the queue. */
 
void CommandQueue::Free()
 
{
 
	CommandPacket *cp;
 
	while ((cp = this->Pop()) != nullptr) {
 
		delete cp;
 
	}
 
	assert(this->count == 0);
 
}
 

	
 
/** Local queue of packets waiting for handling. */
 
static CommandQueue _local_wait_queue;
 
/** Local queue of packets waiting for execution. */
 
@@ -282,7 +212,7 @@ void NetworkSendCommand(Commands cmd, St
 
		c.frame = _frame_counter_max + 1;
 
		c.my_cmd = true;
 

	
 
		_local_wait_queue.Append(&c);
 
		_local_wait_queue.push_back(c);
 
		return;
 
	}
 

	
 
@@ -303,8 +233,8 @@ void NetworkSendCommand(Commands cmd, St
 
 */
 
void NetworkSyncCommandQueue(NetworkClientSocket *cs)
 
{
 
	for (CommandPacket *p = _local_execution_queue.Peek(); p != nullptr; p = p->next) {
 
		CommandPacket &c = cs->outgoing_queue.emplace_back(*p);
 
	for (auto &p : _local_execution_queue) {
 
		CommandPacket &c = cs->outgoing_queue.emplace_back(p);
 
		c.callback = nullptr;
 
	}
 
}
 
@@ -318,8 +248,8 @@ void NetworkExecuteLocalCommandQueue()
 

	
 
	CommandQueue &queue = (_network_server ? _local_execution_queue : ClientNetworkGameSocketHandler::my_client->incoming_queue);
 

	
 
	CommandPacket *cp;
 
	while ((cp = queue.Peek()) != nullptr) {
 
	auto cp = queue.begin();
 
	for (; cp != queue.end(); cp++) {
 
		/* The queue is always in order, which means
 
		 * that the first element will be executed first. */
 
		if (_frame_counter < cp->frame) break;
 
@@ -335,11 +265,9 @@ void NetworkExecuteLocalCommandQueue()
 
		size_t cb_index = FindCallbackIndex(cp->callback);
 
		assert(cb_index < _callback_tuple_size);
 
		assert(_cmd_dispatch[cp->cmd].Unpack[cb_index] != nullptr);
 
		_cmd_dispatch[cp->cmd].Unpack[cb_index](cp);
 

	
 
		queue.Pop();
 
		delete cp;
 
		_cmd_dispatch[cp->cmd].Unpack[cb_index](&*cp);
 
	}
 
	queue.erase(queue.begin(), cp);
 

	
 
	/* Local company may have changed, so we should not restore the old value */
 
	_current_company = _local_company;
 
@@ -350,8 +278,8 @@ void NetworkExecuteLocalCommandQueue()
 
 */
 
void NetworkFreeLocalCommandQueue()
 
{
 
	_local_wait_queue.Free();
 
	_local_execution_queue.Free();
 
	_local_wait_queue.clear();
 
	_local_execution_queue.clear();
 
}
 

	
 
/**
 
@@ -376,7 +304,7 @@ static void DistributeCommandPacket(Comm
 

	
 
	cp.callback = (nullptr != owner) ? nullptr : callback;
 
	cp.my_cmd = (nullptr == owner);
 
	_local_execution_queue.Append(&cp);
 
	_local_execution_queue.push_back(cp);
 
}
 

	
 
/**
 
@@ -384,7 +312,7 @@ static void DistributeCommandPacket(Comm
 
 * @param queue The queue of commands that has to be distributed.
 
 * @param owner The client that owns the commands,
 
 */
 
static void DistributeQueue(CommandQueue *queue, const NetworkClientSocket *owner)
 
static void DistributeQueue(CommandQueue &queue, const NetworkClientSocket *owner)
 
{
 
#ifdef DEBUG_DUMP_COMMANDS
 
	/* When replaying we do not want this limitation. */
 
@@ -397,11 +325,20 @@ static void DistributeQueue(CommandQueue
 
	}
 
#endif
 

	
 
	CommandPacket *cp;
 
	while (--to_go >= 0 && (cp = queue->Pop(true)) != nullptr) {
 
	/* Not technically the most performant way, but consider clients rarely click more than once per tick. */
 
	for (auto cp = queue.begin(); cp != queue.end(); /* removing some items */) {
 
		/* Limit the number of commands per client per tick. */
 
		if (--to_go < 0) break;
 

	
 
		/* Do not distribute commands when paused and the command is not allowed while paused. */
 
		if (_pause_mode != PM_UNPAUSED && !IsCommandAllowedWhilePaused(cp->cmd)) {
 
			++cp;
 
			continue;
 
		}
 

	
 
		DistributeCommandPacket(*cp, owner);
 
		NetworkAdminCmdLogging(owner, cp);
 
		delete cp;
 
		NetworkAdminCmdLogging(owner, &*cp);
 
		cp = queue.erase(cp);
 
	}
 
}
 

	
 
@@ -409,11 +346,11 @@ static void DistributeQueue(CommandQueue
 
void NetworkDistributeCommands()
 
{
 
	/* First send the server's commands. */
 
	DistributeQueue(&_local_wait_queue, nullptr);
 
	DistributeQueue(_local_wait_queue, nullptr);
 

	
 
	/* Then send the queues of the others. */
 
	for (NetworkClientSocket *cs : NetworkClientSocket::Iterate()) {
 
		DistributeQueue(&cs->incoming_queue, cs);
 
		DistributeQueue(cs->incoming_queue, cs);
 
	}
 
}
 

	
src/network/network_internal.h
Show inline comments
 
@@ -107,9 +107,7 @@ void UpdateNetworkGameWindow();
 
 * Everything we need to know about a command to be able to execute it.
 
 */
 
struct CommandPacket {
 
	/** Make sure the pointer is nullptr. */
 
	CommandPacket() : next(nullptr), company(INVALID_COMPANY), frame(0), my_cmd(false) {}
 
	CommandPacket *next; ///< the next command packet (if in queue)
 
	CommandPacket() : company(INVALID_COMPANY), frame(0), my_cmd(false) {}
 
	CompanyID company;   ///< company that is executing the command
 
	uint32_t frame;        ///< the frame in which this packet is executed
 
	bool my_cmd;         ///< did the command originate from "me"
src/network/network_server.cpp
Show inline comments
 
@@ -1060,7 +1060,7 @@ NetworkRecvStatus ServerNetworkGameSocke
 
		return this->SendError(NETWORK_ERROR_NOT_EXPECTED);
 
	}
 

	
 
	if (this->incoming_queue.Count() >= _settings_client.network.max_commands_in_queue) {
 
	if (this->incoming_queue.size() >= _settings_client.network.max_commands_in_queue) {
 
		return this->SendError(NETWORK_ERROR_TOO_MANY_COMMANDS);
 
	}
 

	
 
@@ -1115,7 +1115,7 @@ NetworkRecvStatus ServerNetworkGameSocke
 

	
 
	if (GetCommandFlags(cp.cmd) & CMD_CLIENT_ID) NetworkReplaceCommandClientId(cp, this->client_id);
 

	
 
	this->incoming_queue.Append(&cp);
 
	this->incoming_queue.push_back(cp);
 
	return NETWORK_RECV_STATUS_OKAY;
 
}
 

	
src/network/network_server.h
Show inline comments
 
@@ -66,7 +66,7 @@ public:
 
	byte last_token;             ///< The last random token we did send to verify the client is listening
 
	uint32_t last_token_frame;     ///< The last frame we received the right token
 
	ClientStatus status;         ///< Status of this client
 
	std::vector<CommandPacket> outgoing_queue; ///< The command-queue awaiting delivery; conceptually more a bucket to gather commands in, after which the whole bucket is sent to the client.
 
	CommandQueue outgoing_queue; ///< The command-queue awaiting delivery; conceptually more a bucket to gather commands in, after which the whole bucket is sent to the client.
 
	size_t receive_limit;        ///< Amount of bytes that we can receive at this moment
 

	
 
	std::shared_ptr<struct PacketWriter> savegame; ///< Writer used to write the savegame.
0 comments (0 inline, 0 general)