Changeset - r28681:3e90fe92bfc5
[Not reviewed]
master
0 5 0
Rubidium - 4 months ago 2024-02-03 18:35:36
rubidium@openttd.org
Codechange: use std::deque of std::unique_ptr to queue packets
5 files changed with 32 insertions and 99 deletions:
0 comments (0 inline, 0 general)
src/network/core/packet.cpp
Show inline comments
 
@@ -25,13 +25,13 @@
 
 *                          packet's size. That default is the wanted for streams such as TCP
 
 *                          as you do not want to read data of the next packet yet. For UDP
 
 *                          you need to read the whole packet at once otherwise you might
 
 *                          loose some the data of the packet, so there you pass the maximum
 
 *                          size for the packet you expect from the network.
 
 */
 
Packet::Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size) : next(nullptr), pos(0), limit(limit)
 
Packet::Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size) : pos(0), limit(limit)
 
{
 
	assert(cs != nullptr);
 

	
 
	this->cs = cs;
 
	this->buffer.resize(initial_read_size);
 
}
 
@@ -41,51 +41,26 @@ Packet::Packet(NetworkSocketHandler *cs,
 
 * @param type  The type of the packet to send
 
 * @param limit The maximum number of bytes the packet may have. Default is COMPAT_MTU.
 
 *              Be careful of compatibility with older clients/servers when changing
 
 *              the limit as it might break things if the other side is not expecting
 
 *              much larger packets than what they support.
 
 */
 
Packet::Packet(PacketType type, size_t limit) : next(nullptr), pos(0), limit(limit), cs(nullptr)
 
Packet::Packet(PacketType type, size_t limit) : pos(0), limit(limit), cs(nullptr)
 
{
 
	/* Allocate space for the the size so we can write that in just before sending the packet. */
 
	this->Send_uint16(0);
 
	this->Send_uint8(type);
 
}
 

	
 
/**
 
 * Add the given Packet to the end of the queue of packets.
 
 * @param queue  The pointer to the begin of the queue.
 
 * @param packet The packet to append to the queue.
 
 */
 
/* static */ void Packet::AddToQueue(Packet **queue, Packet *packet)
 
{
 
	while (*queue != nullptr) queue = &(*queue)->next;
 
	*queue = packet;
 
}
 

	
 
/**
 
 * Pop the packet from the begin of the queue and set the
 
 * begin of the queue to the second element in the queue.
 
 * @param queue  The pointer to the begin of the queue.
 
 * @return The Packet that used to be a the begin of the queue.
 
 */
 
/* static */ Packet *Packet::PopFromQueue(Packet **queue)
 
{
 
	Packet *p = *queue;
 
	*queue = p->next;
 
	p->next = nullptr;
 
	return p;
 
}
 

	
 

	
 
/**
 
 * Writes the packet size from the raw packet from packet->size
 
 */
 
void Packet::PrepareToSend()
 
{
 
	assert(this->cs == nullptr && this->next == nullptr);
 
	assert(this->cs == nullptr);
 

	
 
	this->buffer[0] = GB(this->Size(), 0, 8);
 
	this->buffer[1] = GB(this->Size(), 8, 8);
 

	
 
	this->pos  = 0; // We start reading from here
 
	this->buffer.shrink_to_fit();
 
@@ -265,13 +240,13 @@ size_t Packet::Size() const
 
/**
 
 * Reads the packet size from the raw packet and stores it in the packet->size
 
 * @return True iff the packet size seems plausible.
 
 */
 
bool Packet::ParsePacketSize()
 
{
 
	assert(this->cs != nullptr && this->next == nullptr);
 
	assert(this->cs != nullptr);
 
	size_t size = (size_t)this->buffer[0];
 
	size       += (size_t)this->buffer[1] << 8;
 

	
 
	/* If the size of the packet is less than the bytes required for the size and type of
 
	 * the packet, or more than the allowed limit, then something is wrong with the packet.
 
	 * In those cases the packet can generally be regarded as containing garbage data. */
src/network/core/packet.h
Show inline comments
 
@@ -38,14 +38,12 @@ typedef uint8_t  PacketType; ///< Identi
 
 *      Thus, the length of the strings is not sent.
 
 *  - years that are leap years in the 'days since X' to 'date' calculations:
 
 *     (year % 4 == 0) and ((year % 100 != 0) or (year % 400 == 0))
 
 */
 
struct Packet {
 
private:
 
	/** The next packet. Used for queueing packets before sending. */
 
	Packet *next;
 
	/** The current read/write position in the packet */
 
	PacketSize pos;
 
	/** The buffer of this packet. */
 
	std::vector<byte> buffer;
 
	/** The limit for the packet size. */
 
	size_t limit;
 
@@ -54,15 +52,12 @@ private:
 
	NetworkSocketHandler *cs;
 

	
 
public:
 
	Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size = sizeof(PacketSize));
 
	Packet(PacketType type, size_t limit = COMPAT_MTU);
 

	
 
	static void AddToQueue(Packet **queue, Packet *packet);
 
	static Packet *PopFromQueue(Packet **queue);
 

	
 
	/* Sending/writing of packets */
 
	void PrepareToSend();
 

	
 
	bool   CanWriteToPacket(size_t bytes_to_write);
 
	void   Send_bool  (bool   data);
 
	void   Send_uint8 (uint8_t  data);
src/network/core/tcp.cpp
Show inline comments
 
@@ -19,35 +19,22 @@
 
/**
 
 * Construct a socket handler for a TCP connection.
 
 * @param s The just opened TCP connection.
 
 */
 
NetworkTCPSocketHandler::NetworkTCPSocketHandler(SOCKET s) :
 
		NetworkSocketHandler(),
 
		packet_queue(nullptr), packet_recv(nullptr),
 
		sock(s), writable(false)
 
{
 
}
 

	
 
NetworkTCPSocketHandler::~NetworkTCPSocketHandler()
 
{
 
	this->EmptyPacketQueue();
 
	this->CloseSocket();
 
}
 

	
 
/**
 
 * Free all pending and partially received packets.
 
 */
 
void NetworkTCPSocketHandler::EmptyPacketQueue()
 
{
 
	while (this->packet_queue != nullptr) {
 
		delete Packet::PopFromQueue(&this->packet_queue);
 
	}
 
	this->packet_recv = nullptr;
 
}
 

	
 
/**
 
 * Close the actual socket of the connection.
 
 * Please make sure CloseConnection is called before CloseSocket, as
 
 * otherwise not all resources might be released.
 
 */
 
void NetworkTCPSocketHandler::CloseSocket()
 
{
 
@@ -63,13 +50,14 @@ void NetworkTCPSocketHandler::CloseSocke
 
 */
 
NetworkRecvStatus NetworkTCPSocketHandler::CloseConnection([[maybe_unused]] bool error)
 
{
 
	this->MarkClosed();
 
	this->writable = false;
 

	
 
	this->EmptyPacketQueue();
 
	this->packet_queue.clear();
 
	this->packet_recv = nullptr;
 

	
 
	return NETWORK_RECV_STATUS_OKAY;
 
}
 

	
 
/**
 
 * This function puts the packet in the send-queue and it is send as
 
@@ -79,13 +67,13 @@ NetworkRecvStatus NetworkTCPSocketHandle
 
 */
 
void NetworkTCPSocketHandler::SendPacket(Packet *packet)
 
{
 
	assert(packet != nullptr);
 

	
 
	packet->PrepareToSend();
 
	Packet::AddToQueue(&this->packet_queue, packet);
 
	this->packet_queue.push_back(std::unique_ptr<Packet>(packet));
 
}
 

	
 
/**
 
 * Sends all the buffered packets out for this client. It stops when:
 
 *   1) all packets are send (queue is empty)
 
 *   2) the OS reports back that it can not send any more
 
@@ -94,21 +82,19 @@ void NetworkTCPSocketHandler::SendPacket
 
 * @param closing_down Whether we are closing down the connection.
 
 * @return \c true if a (part of a) packet could be sent and
 
 *         the connection is not closed yet.
 
 */
 
SendPacketsState NetworkTCPSocketHandler::SendPackets(bool closing_down)
 
{
 
	ssize_t res;
 
	Packet *p;
 

	
 
	/* We can not write to this socket!! */
 
	if (!this->writable) return SPS_NONE_SENT;
 
	if (!this->IsConnected()) return SPS_CLOSED;
 

	
 
	while ((p = this->packet_queue) != nullptr) {
 
		res = p->TransferOut<int>(send, this->sock, 0);
 
	while (!this->packet_queue.empty()) {
 
		Packet *p = this->packet_queue.front().get();
 
		ssize_t res = p->TransferOut<int>(send, this->sock, 0);
 
		if (res == -1) {
 
			NetworkError err = NetworkError::GetLast();
 
			if (!err.WouldBlock()) {
 
				/* Something went wrong.. close client! */
 
				if (!closing_down) {
 
					Debug(net, 0, "Send failed: {}", err.AsString());
 
@@ -124,13 +110,13 @@ SendPacketsState NetworkTCPSocketHandler
 
			return SPS_CLOSED;
 
		}
 

	
 
		/* Is this packet sent? */
 
		if (p->RemainingBytesToTransfer() == 0) {
 
			/* Go to the next packet */
 
			delete Packet::PopFromQueue(&this->packet_queue);
 
			this->packet_queue.pop_front();
 
		} else {
 
			return SPS_PARTLY_SENT;
 
		}
 
	}
 

	
 
	return SPS_ALL_SENT;
src/network/core/tcp.h
Show inline comments
 
@@ -27,13 +27,13 @@ enum SendPacketsState {
 
	SPS_ALL_SENT,    ///< All packets in the queue are sent.
 
};
 

	
 
/** Base socket handler for all TCP sockets */
 
class NetworkTCPSocketHandler : public NetworkSocketHandler {
 
private:
 
	Packet *packet_queue;     ///< Packets that are awaiting delivery
 
	std::deque<std::unique_ptr<Packet>> packet_queue; ///< Packets that are awaiting delivery. Cannot be std::queue as that does not have a clear() function.
 
	std::unique_ptr<Packet> packet_recv; ///< Partially received packet
 

	
 
	void EmptyPacketQueue();
 
public:
 
	SOCKET sock;              ///< The socket currently connected to
 
	bool writable;            ///< Can we write to this socket?
 
@@ -55,13 +55,13 @@ public:
 
	bool CanSendReceive();
 

	
 
	/**
 
	 * Whether there is something pending in the send queue.
 
	 * @return true when something is pending in the send queue.
 
	 */
 
	bool HasSendQueue() { return this->packet_queue != nullptr; }
 
	bool HasSendQueue() { return !this->packet_queue.empty(); }
 

	
 
	NetworkTCPSocketHandler(SOCKET s = INVALID_SOCKET);
 
	~NetworkTCPSocketHandler();
 
};
 

	
 
/**
src/network/network_server.cpp
Show inline comments
 
@@ -58,40 +58,38 @@ INSTANTIATE_POOL_METHODS(NetworkClientSo
 
/** Instantiate the listen sockets. */
 
template SocketList TCPListenHandler<ServerNetworkGameSocketHandler, PACKET_SERVER_FULL, PACKET_SERVER_BANNED>::sockets;
 

	
 
/** Writing a savegame directly to a number of packets. */
 
struct PacketWriter : SaveFilter {
 
	ServerNetworkGameSocketHandler *cs; ///< Socket we are associated with.
 
	Packet *current;                    ///< The packet we're currently writing to.
 
	std::unique_ptr<Packet> current; ///< The packet we're currently writing to.
 
	size_t total_size;                  ///< Total size of the compressed savegame.
 
	Packet *packets;                    ///< Packet queue of the savegame; send these "slowly" to the client.
 
	std::deque<std::unique_ptr<Packet>> packets; ///< Packet queue of the savegame; send these "slowly" to the client. Cannot be a std::queue as we want to push the map size packet in front of the data packets.
 
	std::mutex mutex;                   ///< Mutex for making threaded saving safe.
 
	std::condition_variable exit_sig;   ///< Signal for threaded destruction of this packet writer.
 

	
 
	/**
 
	 * Create the packet writer.
 
	 * @param cs The socket handler we're making the packets for.
 
	 */
 
	PacketWriter(ServerNetworkGameSocketHandler *cs) : SaveFilter(nullptr), cs(cs), current(nullptr), total_size(0), packets(nullptr)
 
	PacketWriter(ServerNetworkGameSocketHandler *cs) : SaveFilter(nullptr), cs(cs), total_size(0)
 
	{
 
	}
 

	
 
	/** Make sure everything is cleaned up. */
 
	~PacketWriter()
 
	{
 
		std::unique_lock<std::mutex> lock(this->mutex);
 

	
 
		if (this->cs != nullptr) this->exit_sig.wait(lock);
 

	
 
		/* This must all wait until the Destroy function is called. */
 

	
 
		while (this->packets != nullptr) {
 
			delete Packet::PopFromQueue(&this->packets);
 
		}
 

	
 
		delete this->current;
 
		Debug(net, 0, "Destruct!");
 
		this->packets.clear();
 
		this->current = nullptr;
 
	}
 

	
 
	/**
 
	 * Begin the destruction of this packet writer. It can happen in two ways:
 
	 * in the first case the client disconnected while saving the map. In this
 
	 * case the saving has not finished and killed this PacketWriter. In that
 
@@ -122,64 +120,44 @@ struct PacketWriter : SaveFilter {
 
	 * @param socket The network socket to write to.
 
	 * @return True iff the last packet of the map has been sent.
 
	 */
 
	bool TransferToNetworkQueue(ServerNetworkGameSocketHandler *socket)
 
	{
 
		/* Unsafe check for the queue being empty or not. */
 
		if (this->packets == nullptr) return false;
 
		if (this->packets.empty()) return false;
 

	
 
		std::lock_guard<std::mutex> lock(this->mutex);
 

	
 
		while (this->packets != nullptr) {
 
			Packet *p = Packet::PopFromQueue(&this->packets);
 
			bool last_packet = p->GetPacketType() == PACKET_SERVER_MAP_DONE;
 
			socket->SendPacket(p);
 
		while (!this->packets.empty()) {
 
			bool last_packet = this->packets.front()->GetPacketType() == PACKET_SERVER_MAP_DONE;
 
			socket->SendPacket(this->packets.front().release());
 
			this->packets.pop_front();
 

	
 
			if (last_packet) return true;
 
		}
 

	
 
		return false;
 
	}
 

	
 
	/** Append the current packet to the queue. */
 
	void AppendQueue()
 
	{
 
		if (this->current == nullptr) return;
 

	
 
		Packet::AddToQueue(&this->packets, this->current);
 
		this->current = nullptr;
 
	}
 

	
 
	/** Prepend the current packet to the queue. */
 
	void PrependQueue()
 
	{
 
		if (this->current == nullptr) return;
 

	
 
		/* Reversed from AppendQueue so the queue gets added to the current one. */
 
		Packet::AddToQueue(&this->current, this->packets);
 
		this->packets = this->current;
 
		this->current = nullptr;
 
	}
 

	
 
	void Write(byte *buf, size_t size) override
 
	{
 
		/* We want to abort the saving when the socket is closed. */
 
		if (this->cs == nullptr) SlError(STR_NETWORK_ERROR_LOSTCONNECTION);
 

	
 
		if (this->current == nullptr) this->current = new Packet(PACKET_SERVER_MAP_DATA, TCP_MTU);
 
		if (this->current == nullptr) this->current = std::make_unique<Packet>(PACKET_SERVER_MAP_DATA, TCP_MTU);
 

	
 
		std::lock_guard<std::mutex> lock(this->mutex);
 

	
 
		byte *bufe = buf + size;
 
		while (buf != bufe) {
 
			size_t written = this->current->Send_bytes(buf, bufe);
 
			buf += written;
 

	
 
			if (!this->current->CanWriteToPacket(1)) {
 
				this->AppendQueue();
 
				if (buf != bufe) this->current = new Packet(PACKET_SERVER_MAP_DATA, TCP_MTU);
 
				this->packets.push_back(std::move(this->current));
 
				if (buf != bufe) this->current = std::make_unique<Packet>(PACKET_SERVER_MAP_DATA, TCP_MTU);
 
			}
 
		}
 

	
 
		this->total_size += size;
 
	}
 

	
 
@@ -188,22 +166,21 @@ struct PacketWriter : SaveFilter {
 
		/* We want to abort the saving when the socket is closed. */
 
		if (this->cs == nullptr) SlError(STR_NETWORK_ERROR_LOSTCONNECTION);
 

	
 
		std::lock_guard<std::mutex> lock(this->mutex);
 

	
 
		/* Make sure the last packet is flushed. */
 
		this->AppendQueue();
 
		if (this->current != nullptr) this->packets.push_back(std::move(this->current));
 

	
 
		/* Add a packet stating that this is the end to the queue. */
 
		this->current = new Packet(PACKET_SERVER_MAP_DONE);
 
		this->AppendQueue();
 
		this->packets.push_back(std::make_unique<Packet>(PACKET_SERVER_MAP_DONE));
 

	
 
		/* Fast-track the size to the client. */
 
		this->current = new Packet(PACKET_SERVER_MAP_SIZE);
 
		this->current->Send_uint32((uint32_t)this->total_size);
 
		this->PrependQueue();
 
		auto p = std::make_unique<Packet>(PACKET_SERVER_MAP_SIZE);
 
		p->Send_uint32((uint32_t)this->total_size);
 
		this->packets.push_front(std::move(p));
 
	}
 
};
 

	
 

	
 
/**
 
 * Create a new socket for the server side of the game connection.
0 comments (0 inline, 0 general)