Changeset - r28681:3e90fe92bfc5
[Not reviewed]
master
0 5 0
Rubidium - 10 months ago 2024-02-03 18:35:36
rubidium@openttd.org
Codechange: use std::deque of std::unique_ptr to queue packets
5 files changed with 32 insertions and 99 deletions:
0 comments (0 inline, 0 general)
src/network/core/packet.cpp
Show inline comments
 
@@ -28,7 +28,7 @@
 
 *                          loose some the data of the packet, so there you pass the maximum
 
 *                          size for the packet you expect from the network.
 
 */
 
Packet::Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size) : next(nullptr), pos(0), limit(limit)
 
Packet::Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size) : pos(0), limit(limit)
 
{
 
	assert(cs != nullptr);
 

	
 
@@ -44,45 +44,20 @@ Packet::Packet(NetworkSocketHandler *cs,
 
 *              the limit as it might break things if the other side is not expecting
 
 *              much larger packets than what they support.
 
 */
 
Packet::Packet(PacketType type, size_t limit) : next(nullptr), pos(0), limit(limit), cs(nullptr)
 
Packet::Packet(PacketType type, size_t limit) : pos(0), limit(limit), cs(nullptr)
 
{
 
	/* Allocate space for the the size so we can write that in just before sending the packet. */
 
	this->Send_uint16(0);
 
	this->Send_uint8(type);
 
}
 

	
 
/**
 
 * Add the given Packet to the end of the queue of packets.
 
 * @param queue  The pointer to the begin of the queue.
 
 * @param packet The packet to append to the queue.
 
 */
 
/* static */ void Packet::AddToQueue(Packet **queue, Packet *packet)
 
{
 
	while (*queue != nullptr) queue = &(*queue)->next;
 
	*queue = packet;
 
}
 

	
 
/**
 
 * Pop the packet from the begin of the queue and set the
 
 * begin of the queue to the second element in the queue.
 
 * @param queue  The pointer to the begin of the queue.
 
 * @return The Packet that used to be a the begin of the queue.
 
 */
 
/* static */ Packet *Packet::PopFromQueue(Packet **queue)
 
{
 
	Packet *p = *queue;
 
	*queue = p->next;
 
	p->next = nullptr;
 
	return p;
 
}
 

	
 

	
 
/**
 
 * Writes the packet size from the raw packet from packet->size
 
 */
 
void Packet::PrepareToSend()
 
{
 
	assert(this->cs == nullptr && this->next == nullptr);
 
	assert(this->cs == nullptr);
 

	
 
	this->buffer[0] = GB(this->Size(), 0, 8);
 
	this->buffer[1] = GB(this->Size(), 8, 8);
 
@@ -268,7 +243,7 @@ size_t Packet::Size() const
 
 */
 
bool Packet::ParsePacketSize()
 
{
 
	assert(this->cs != nullptr && this->next == nullptr);
 
	assert(this->cs != nullptr);
 
	size_t size = (size_t)this->buffer[0];
 
	size       += (size_t)this->buffer[1] << 8;
 

	
src/network/core/packet.h
Show inline comments
 
@@ -41,8 +41,6 @@ typedef uint8_t  PacketType; ///< Identi
 
 */
 
struct Packet {
 
private:
 
	/** The next packet. Used for queueing packets before sending. */
 
	Packet *next;
 
	/** The current read/write position in the packet */
 
	PacketSize pos;
 
	/** The buffer of this packet. */
 
@@ -57,9 +55,6 @@ public:
 
	Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size = sizeof(PacketSize));
 
	Packet(PacketType type, size_t limit = COMPAT_MTU);
 

	
 
	static void AddToQueue(Packet **queue, Packet *packet);
 
	static Packet *PopFromQueue(Packet **queue);
 

	
 
	/* Sending/writing of packets */
 
	void PrepareToSend();
 

	
src/network/core/tcp.cpp
Show inline comments
 
@@ -22,29 +22,16 @@
 
 */
 
NetworkTCPSocketHandler::NetworkTCPSocketHandler(SOCKET s) :
 
		NetworkSocketHandler(),
 
		packet_queue(nullptr), packet_recv(nullptr),
 
		sock(s), writable(false)
 
{
 
}
 

	
 
NetworkTCPSocketHandler::~NetworkTCPSocketHandler()
 
{
 
	this->EmptyPacketQueue();
 
	this->CloseSocket();
 
}
 

	
 
/**
 
 * Free all pending and partially received packets.
 
 */
 
void NetworkTCPSocketHandler::EmptyPacketQueue()
 
{
 
	while (this->packet_queue != nullptr) {
 
		delete Packet::PopFromQueue(&this->packet_queue);
 
	}
 
	this->packet_recv = nullptr;
 
}
 

	
 
/**
 
 * Close the actual socket of the connection.
 
 * Please make sure CloseConnection is called before CloseSocket, as
 
 * otherwise not all resources might be released.
 
@@ -66,7 +53,8 @@ NetworkRecvStatus NetworkTCPSocketHandle
 
	this->MarkClosed();
 
	this->writable = false;
 

	
 
	this->EmptyPacketQueue();
 
	this->packet_queue.clear();
 
	this->packet_recv = nullptr;
 

	
 
	return NETWORK_RECV_STATUS_OKAY;
 
}
 
@@ -82,7 +70,7 @@ void NetworkTCPSocketHandler::SendPacket
 
	assert(packet != nullptr);
 

	
 
	packet->PrepareToSend();
 
	Packet::AddToQueue(&this->packet_queue, packet);
 
	this->packet_queue.push_back(std::unique_ptr<Packet>(packet));
 
}
 

	
 
/**
 
@@ -97,15 +85,13 @@ void NetworkTCPSocketHandler::SendPacket
 
 */
 
SendPacketsState NetworkTCPSocketHandler::SendPackets(bool closing_down)
 
{
 
	ssize_t res;
 
	Packet *p;
 

	
 
	/* We can not write to this socket!! */
 
	if (!this->writable) return SPS_NONE_SENT;
 
	if (!this->IsConnected()) return SPS_CLOSED;
 

	
 
	while ((p = this->packet_queue) != nullptr) {
 
		res = p->TransferOut<int>(send, this->sock, 0);
 
	while (!this->packet_queue.empty()) {
 
		Packet *p = this->packet_queue.front().get();
 
		ssize_t res = p->TransferOut<int>(send, this->sock, 0);
 
		if (res == -1) {
 
			NetworkError err = NetworkError::GetLast();
 
			if (!err.WouldBlock()) {
 
@@ -127,7 +113,7 @@ SendPacketsState NetworkTCPSocketHandler
 
		/* Is this packet sent? */
 
		if (p->RemainingBytesToTransfer() == 0) {
 
			/* Go to the next packet */
 
			delete Packet::PopFromQueue(&this->packet_queue);
 
			this->packet_queue.pop_front();
 
		} else {
 
			return SPS_PARTLY_SENT;
 
		}
src/network/core/tcp.h
Show inline comments
 
@@ -30,7 +30,7 @@ enum SendPacketsState {
 
/** Base socket handler for all TCP sockets */
 
class NetworkTCPSocketHandler : public NetworkSocketHandler {
 
private:
 
	Packet *packet_queue;     ///< Packets that are awaiting delivery
 
	std::deque<std::unique_ptr<Packet>> packet_queue; ///< Packets that are awaiting delivery. Cannot be std::queue as that does not have a clear() function.
 
	std::unique_ptr<Packet> packet_recv; ///< Partially received packet
 

	
 
	void EmptyPacketQueue();
 
@@ -58,7 +58,7 @@ public:
 
	 * Whether there is something pending in the send queue.
 
	 * @return true when something is pending in the send queue.
 
	 */
 
	bool HasSendQueue() { return this->packet_queue != nullptr; }
 
	bool HasSendQueue() { return !this->packet_queue.empty(); }
 

	
 
	NetworkTCPSocketHandler(SOCKET s = INVALID_SOCKET);
 
	~NetworkTCPSocketHandler();
src/network/network_server.cpp
Show inline comments
 
@@ -61,9 +61,9 @@ template SocketList TCPListenHandler<Ser
 
/** Writing a savegame directly to a number of packets. */
 
struct PacketWriter : SaveFilter {
 
	ServerNetworkGameSocketHandler *cs; ///< Socket we are associated with.
 
	Packet *current;                    ///< The packet we're currently writing to.
 
	std::unique_ptr<Packet> current; ///< The packet we're currently writing to.
 
	size_t total_size;                  ///< Total size of the compressed savegame.
 
	Packet *packets;                    ///< Packet queue of the savegame; send these "slowly" to the client.
 
	std::deque<std::unique_ptr<Packet>> packets; ///< Packet queue of the savegame; send these "slowly" to the client. Cannot be a std::queue as we want to push the map size packet in front of the data packets.
 
	std::mutex mutex;                   ///< Mutex for making threaded saving safe.
 
	std::condition_variable exit_sig;   ///< Signal for threaded destruction of this packet writer.
 

	
 
@@ -71,7 +71,7 @@ struct PacketWriter : SaveFilter {
 
	 * Create the packet writer.
 
	 * @param cs The socket handler we're making the packets for.
 
	 */
 
	PacketWriter(ServerNetworkGameSocketHandler *cs) : SaveFilter(nullptr), cs(cs), current(nullptr), total_size(0), packets(nullptr)
 
	PacketWriter(ServerNetworkGameSocketHandler *cs) : SaveFilter(nullptr), cs(cs), total_size(0)
 
	{
 
	}
 

	
 
@@ -84,11 +84,9 @@ struct PacketWriter : SaveFilter {
 

	
 
		/* This must all wait until the Destroy function is called. */
 

	
 
		while (this->packets != nullptr) {
 
			delete Packet::PopFromQueue(&this->packets);
 
		}
 

	
 
		delete this->current;
 
		Debug(net, 0, "Destruct!");
 
		this->packets.clear();
 
		this->current = nullptr;
 
	}
 

	
 
	/**
 
@@ -125,14 +123,14 @@ struct PacketWriter : SaveFilter {
 
	bool TransferToNetworkQueue(ServerNetworkGameSocketHandler *socket)
 
	{
 
		/* Unsafe check for the queue being empty or not. */
 
		if (this->packets == nullptr) return false;
 
		if (this->packets.empty()) return false;
 

	
 
		std::lock_guard<std::mutex> lock(this->mutex);
 

	
 
		while (this->packets != nullptr) {
 
			Packet *p = Packet::PopFromQueue(&this->packets);
 
			bool last_packet = p->GetPacketType() == PACKET_SERVER_MAP_DONE;
 
			socket->SendPacket(p);
 
		while (!this->packets.empty()) {
 
			bool last_packet = this->packets.front()->GetPacketType() == PACKET_SERVER_MAP_DONE;
 
			socket->SendPacket(this->packets.front().release());
 
			this->packets.pop_front();
 

	
 
			if (last_packet) return true;
 
		}
 
@@ -140,32 +138,12 @@ struct PacketWriter : SaveFilter {
 
		return false;
 
	}
 

	
 
	/** Append the current packet to the queue. */
 
	void AppendQueue()
 
	{
 
		if (this->current == nullptr) return;
 

	
 
		Packet::AddToQueue(&this->packets, this->current);
 
		this->current = nullptr;
 
	}
 

	
 
	/** Prepend the current packet to the queue. */
 
	void PrependQueue()
 
	{
 
		if (this->current == nullptr) return;
 

	
 
		/* Reversed from AppendQueue so the queue gets added to the current one. */
 
		Packet::AddToQueue(&this->current, this->packets);
 
		this->packets = this->current;
 
		this->current = nullptr;
 
	}
 

	
 
	void Write(byte *buf, size_t size) override
 
	{
 
		/* We want to abort the saving when the socket is closed. */
 
		if (this->cs == nullptr) SlError(STR_NETWORK_ERROR_LOSTCONNECTION);
 

	
 
		if (this->current == nullptr) this->current = new Packet(PACKET_SERVER_MAP_DATA, TCP_MTU);
 
		if (this->current == nullptr) this->current = std::make_unique<Packet>(PACKET_SERVER_MAP_DATA, TCP_MTU);
 

	
 
		std::lock_guard<std::mutex> lock(this->mutex);
 

	
 
@@ -175,8 +153,8 @@ struct PacketWriter : SaveFilter {
 
			buf += written;
 

	
 
			if (!this->current->CanWriteToPacket(1)) {
 
				this->AppendQueue();
 
				if (buf != bufe) this->current = new Packet(PACKET_SERVER_MAP_DATA, TCP_MTU);
 
				this->packets.push_back(std::move(this->current));
 
				if (buf != bufe) this->current = std::make_unique<Packet>(PACKET_SERVER_MAP_DATA, TCP_MTU);
 
			}
 
		}
 

	
 
@@ -191,16 +169,15 @@ struct PacketWriter : SaveFilter {
 
		std::lock_guard<std::mutex> lock(this->mutex);
 

	
 
		/* Make sure the last packet is flushed. */
 
		this->AppendQueue();
 
		if (this->current != nullptr) this->packets.push_back(std::move(this->current));
 

	
 
		/* Add a packet stating that this is the end to the queue. */
 
		this->current = new Packet(PACKET_SERVER_MAP_DONE);
 
		this->AppendQueue();
 
		this->packets.push_back(std::make_unique<Packet>(PACKET_SERVER_MAP_DONE));
 

	
 
		/* Fast-track the size to the client. */
 
		this->current = new Packet(PACKET_SERVER_MAP_SIZE);
 
		this->current->Send_uint32((uint32_t)this->total_size);
 
		this->PrependQueue();
 
		auto p = std::make_unique<Packet>(PACKET_SERVER_MAP_SIZE);
 
		p->Send_uint32((uint32_t)this->total_size);
 
		this->packets.push_front(std::move(p));
 
	}
 
};
 

	
0 comments (0 inline, 0 general)