Addition of SendThread class and other related adjustments.

This commit is contained in:
MobiusDevelopment
2023-06-25 19:37:46 +03:00
parent 4708f17e41
commit 4bd9210c69
468 changed files with 8349 additions and 2223 deletions

View File

@@ -36,6 +36,11 @@ GameserverPort = 7777
# Default: 100
ClientReadPoolSize = 100
# Client pool size for sending server packets.
# Each pool is executed on a separate thread.
# Default: 25
ClientSendPoolSize = 25
# Client pool size for executing client packets.
# Each pool is executed on a separate thread.
# Default: 50

View File

@@ -766,6 +766,7 @@ public class Config
public static int LOGIN_BLOCK_AFTER_BAN;
public static String GAMESERVER_HOSTNAME;
public static int CLIENT_READ_POOL_SIZE;
public static int CLIENT_SEND_POOL_SIZE;
public static int CLIENT_EXECUTE_POOL_SIZE;
public static int PACKET_QUEUE_LIMIT;
public static boolean PACKET_FLOOD_DISCONNECT;
@@ -1373,6 +1374,7 @@ public class Config
GAME_SERVER_LOGIN_PORT = serverConfig.getInt("LoginPort", 9014);
GAME_SERVER_LOGIN_HOST = serverConfig.getString("LoginHost", "127.0.0.1");
CLIENT_READ_POOL_SIZE = serverConfig.getInt("ClientReadPoolSize", 100);
CLIENT_SEND_POOL_SIZE = serverConfig.getInt("ClientSendPoolSize", 25);
CLIENT_EXECUTE_POOL_SIZE = serverConfig.getInt("ClientExecutePoolSize", 50);
PACKET_QUEUE_LIMIT = serverConfig.getInt("PacketQueueLimit", 80);
PACKET_FLOOD_DISCONNECT = serverConfig.getBoolean("PacketFloodDisconnect", false);

View File

@@ -16,6 +16,7 @@ public class ExecuteThread<E extends NetClient> implements Runnable
private final Set<E> _pool;
private final PacketHandlerInterface<E> _packetHandler;
private boolean _idle;
public ExecuteThread(Set<E> pool, PacketHandlerInterface<E> packetHandler)
{
@@ -31,6 +32,8 @@ public class ExecuteThread<E extends NetClient> implements Runnable
// No need to iterate when pool is empty.
if (!_pool.isEmpty())
{
_idle = true;
// Iterate client pool.
ITERATE: for (E client : _pool)
{
@@ -40,7 +43,7 @@ public class ExecuteThread<E extends NetClient> implements Runnable
continue ITERATE;
}
final byte[] data = client.getPacketData().poll();
final byte[] data = client.getReceivedData().poll();
if (data == null)
{
continue ITERATE;
@@ -64,16 +67,29 @@ public class ExecuteThread<E extends NetClient> implements Runnable
}
}
_packetHandler.handle(client, new ReadablePacket(data));
_idle = false;
}
// Prevent high CPU caused by repeatedly looping.
try
{
Thread.sleep(_idle ? 10 : 1);
}
catch (Exception ignored)
{
}
}
// Prevent high CPU caused by repeatedly looping.
try
{
Thread.sleep(1);
}
catch (Exception ignored)
else // Remain idle for 1 second.
{
// Prevent high CPU caused by repeatedly looping.
try
{
Thread.sleep(1000);
}
catch (Exception ignored)
{
}
}
}
}

View File

@@ -15,12 +15,15 @@ public class NetClient
{
protected static final Logger LOGGER = Logger.getLogger(NetClient.class.getName());
private final Queue<byte[]> _receivedData = new ConcurrentLinkedQueue<>();
private final Queue<WritablePacket> _sendPacketQueue = new ConcurrentLinkedQueue<>();
private String _ip;
private Socket _socket;
private InputStream _inputStream;
private OutputStream _outputStream;
private NetConfig _netConfig;
private Queue<byte[]> _packetData;
private byte[] _pendingData;
private int _pendingPacketSize;
@@ -33,7 +36,6 @@ public class NetClient
{
_socket = socket;
_netConfig = netConfig;
_packetData = new ConcurrentLinkedQueue<>();
_ip = socket.getInetAddress().toString().substring(1); // Trim out /127.0.0.1
try
@@ -82,10 +84,8 @@ public class NetClient
}
}
if (_packetData != null)
{
_packetData.clear();
}
_receivedData.clear();
_sendPacketQueue.clear();
if (_pendingData != null)
{
@@ -97,10 +97,10 @@ public class NetClient
* Add packet data to the queue.
* @param data
*/
public void addPacketData(byte[] data)
public void addReceivedData(byte[] data)
{
// Check packet flooding.
final int size = _packetData.size();
final int size = _receivedData.size();
if (size >= _netConfig.getPacketQueueLimit())
{
if (_netConfig.isPacketFloodDisconnect())
@@ -127,15 +127,15 @@ public class NetClient
}
// Add to queue.
_packetData.add(data);
_receivedData.add(data);
}
/**
* @return the pending packet data.
* @return the pending received data.
*/
public Queue<byte[]> getPacketData()
public Queue<byte[]> getReceivedData()
{
return _packetData;
return _receivedData;
}
/**
@@ -172,34 +172,21 @@ public class NetClient
_pendingPacketSize = pendingPacketSize;
}
/**
* @return the writable packet queue waiting to be sent.
*/
public Queue<WritablePacket> getSendPacketQueue()
{
return _sendPacketQueue;
}
/**
* Sends a packet over the network using the default encryption.
* @param packet The packet to send.
*/
public void sendPacket(WritablePacket packet)
{
if ((_socket == null) || !_socket.isConnected())
{
return;
}
final byte[] sendableBytes = packet.getSendableBytes(getEncryption());
if (sendableBytes == null)
{
return;
}
try
{
synchronized (this)
{
_outputStream.write(sendableBytes);
_outputStream.flush();
}
}
catch (Exception ignored)
{
}
_sendPacketQueue.add(packet);
}
/**

View File

@@ -7,6 +7,7 @@ package org.l2jmobius.commons.network;
public class NetConfig
{
private int _readPoolSize = 100;
private int _sendPoolSize = 25;
private int _executePoolSize = 50;
private int _packetQueueLimit = 80;
private boolean _packetFloodDisconnect = false;
@@ -32,6 +33,23 @@ public class NetConfig
_readPoolSize = clientPoolSize;
}
/**
* @return the NetClient pool size for sending server packets.
*/
public int getSendPoolSize()
{
return _sendPoolSize;
}
/**
* Sets the NetClient pool size for sending server packets.
* @param clientPoolSize
*/
public void setSendPoolSize(int clientPoolSize)
{
_sendPoolSize = clientPoolSize;
}
/**
* @return the NetClient pool size for executing client packets.
*/

View File

@@ -20,6 +20,7 @@ public class NetServer<E extends NetClient>
protected static final Logger LOGGER = Logger.getLogger(NetServer.class.getName());
protected final List<Set<E>> _clientReadPools = new LinkedList<>();
protected final List<Set<E>> _clientSendPools = new LinkedList<>();
protected final List<Set<E>> _clientExecutePools = new LinkedList<>();
protected final NetConfig _netConfig = new NetConfig();
protected final String _hostname;
@@ -155,6 +156,35 @@ public class NetServer<E extends NetClient>
_clientReadPools.add(newReadPool);
}
// Add to send pool.
// Find a pool that is not full.
boolean sendPoolFound = false;
SEND_POOLS: for (Set<E> pool : _clientSendPools)
{
if (pool.size() < _netConfig.getSendPoolSize())
{
pool.add(client);
sendPoolFound = true;
break SEND_POOLS;
}
}
// All pools are full.
if (!sendPoolFound)
{
// Create a new client pool.
final Set<E> newSendPool = ConcurrentHashMap.newKeySet(_netConfig.getSendPoolSize());
newSendPool.add(client);
// Create a new task for the new pool.
final Thread sendThread = new Thread(new SendThread<>(newSendPool), _name + ": Packet send thread " + _clientSendPools.size());
sendThread.setPriority(Thread.MAX_PRIORITY);
sendThread.setDaemon(true);
sendThread.start();
// Add the new pool to the pool list.
_clientSendPools.add(newSendPool);
}
// Add to execute pool.
// Find a pool that is not full.

View File

@@ -14,6 +14,7 @@ public class ReadThread<E extends NetClient> implements Runnable
private final Set<E> _pool;
private final byte[] _sizeBuffer = new byte[2]; // Reusable size buffer.
private final byte[] _pendingSizeBuffer = new byte[1]; // Reusable pending size buffer.
private boolean _idle;
public ReadThread(Set<E> pool)
{
@@ -28,6 +29,8 @@ public class ReadThread<E extends NetClient> implements Runnable
// No need to iterate when pool is empty.
if (!_pool.isEmpty())
{
_idle = true;
// Iterate client pool.
ITERATE: for (E client : _pool)
{
@@ -80,14 +83,16 @@ public class ReadThread<E extends NetClient> implements Runnable
// Read was complete.
if (currentSize >= pendingPacketSize)
{
// Add packet data to client.
client.addPacketData(mergedData);
// Add received data to client.
client.addReceivedData(mergedData);
client.setPendingData(null);
}
else
{
client.setPendingData(mergedData);
}
_idle = false;
}
}
continue ITERATE;
@@ -188,10 +193,12 @@ public class ReadThread<E extends NetClient> implements Runnable
client.setPendingData(pendindBytes);
client.setPendingPacketSize(packetSize);
}
else // Add packet data to client.
else // Add received data to client.
{
client.addPacketData(packetData);
client.addReceivedData(packetData);
}
_idle = false;
}
}
}
@@ -206,15 +213,26 @@ public class ReadThread<E extends NetClient> implements Runnable
onDisconnection(client);
}
}
// Prevent high CPU caused by repeatedly looping.
try
{
Thread.sleep(_idle ? 10 : 1);
}
catch (Exception ignored)
{
}
}
// Prevent high CPU caused by repeatedly looping.
try
{
Thread.sleep(1);
}
catch (Exception ignored)
else // Remain idle for 1 second.
{
// Prevent high CPU caused by repeatedly looping.
try
{
Thread.sleep(1000);
}
catch (Exception ignored)
{
}
}
}
}

View File

@@ -0,0 +1,106 @@
package org.l2jmobius.commons.network;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Queue;
import java.util.Set;
/**
* @author Pantelis Andrianakis
* @since June 22nd 2023
* @param <E> extends NetClient
*/
public class SendThread<E extends NetClient> implements Runnable
{
// Throttle packets sent per cycle to limit flooding from waiting one client.
private static final int MAX_PACKETS_SENT_PER_CYCLE = 2000;
private final Set<E> _pool;
private boolean _idle;
public SendThread(Set<E> pool)
{
_pool = pool;
}
@Override
public void run()
{
while (true)
{
// No need to iterate when pool is empty.
if (!_pool.isEmpty())
{
_idle = true;
// Iterate client pool.
ITERATE: for (E client : _pool)
{
final Socket socket = client.getSocket();
if (socket == null)
{
_pool.remove(client);
continue ITERATE;
}
try
{
final Queue<WritablePacket> packetQueue = client.getSendPacketQueue();
if (packetQueue.isEmpty())
{
continue ITERATE;
}
final OutputStream outputStream = client.getOutputStream();
SEND_CYCLE: for (int count = 0; count < MAX_PACKETS_SENT_PER_CYCLE; count++)
{
final WritablePacket writablePacket = packetQueue.poll();
if (writablePacket == null)
{
continue ITERATE;
}
final byte[] sendableBytes = writablePacket.getSendableBytes(client.getEncryption());
if (sendableBytes == null)
{
continue SEND_CYCLE;
}
// Send the packet data.
outputStream.write(sendableBytes);
outputStream.flush();
// Run packet implementation.
writablePacket.run();
_idle = false;
}
}
catch (Exception ignored)
{
}
}
// Prevent high CPU caused by repeatedly looping.
try
{
Thread.sleep(_idle ? 10 : 1);
}
catch (Exception ignored)
{
}
}
else // Remain idle for 1 second.
{
// Prevent high CPU caused by repeatedly looping.
try
{
Thread.sleep(1000);
}
catch (Exception ignored)
{
}
}
}
}
}

View File

@@ -223,6 +223,14 @@ public abstract class WritablePacket
// Overridden by server implementation.
}
/**
* Method that runs after packet is sent.
*/
public void run()
{
// Overridden by server implementation.
}
/**
* @return <b>byte[]</b> of the sendable packet data, including a size header.
*/

View File

@@ -468,6 +468,7 @@ public class GameServer
final NetServer<GameClient> server = new NetServer<>(Config.GAMESERVER_HOSTNAME, Config.PORT_GAME, new PacketHandler(), GameClient::new);
server.setName(getClass().getSimpleName());
server.getNetConfig().setReadPoolSize(Config.CLIENT_READ_POOL_SIZE);
server.getNetConfig().setSendPoolSize(Config.CLIENT_SEND_POOL_SIZE);
server.getNetConfig().setExecutePoolSize(Config.CLIENT_EXECUTE_POOL_SIZE);
server.getNetConfig().setPacketQueueLimit(Config.PACKET_QUEUE_LIMIT);
server.getNetConfig().setPacketFloodDisconnect(Config.PACKET_FLOOD_DISCONNECT);

View File

@@ -20,8 +20,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -66,7 +64,6 @@ public class GameClient extends NetClient
{
protected static final Logger LOGGER_ACCOUNTING = Logger.getLogger("accounting");
private final Queue<ServerPacket> _pendingPackets = new ConcurrentLinkedQueue<>();
private final FloodProtectors _floodProtectors = new FloodProtectors(this);
private final ReentrantLock _playerLock = new ReentrantLock();
private ConnectionState _connectionState = ConnectionState.CONNECTED;
@@ -218,16 +215,11 @@ public class GameClient extends NetClient
}
}
// Keep the order of packets if sent by multiple threads.
_pendingPackets.add(packet);
synchronized (_pendingPackets)
{
// Send the packet data.
super.sendPacket(packet);
// Run packet implementation.
packet.run(_player);
}
// Used by packet run() method.
packet.setPlayer(_player);
// Send the packet data.
super.sendPacket(packet);
}
public void sendPacket(SystemMessageId systemMessageId)

View File

@@ -131,12 +131,14 @@ public abstract class AbstractHtmlPacket extends ServerPacket
}
@Override
public void run(Player player)
public void run()
{
final Player player = getPlayer();
if (player != null)
{
player.clearHtmlActions(getScope());
}
if (_disabledValidation)
{
return;

View File

@@ -162,8 +162,9 @@ public class CreatureSay extends ServerPacket
}
@Override
public void run(Player player)
public void run()
{
final Player player = getPlayer();
if (player != null)
{
player.broadcastSnoop(_chatType, _senderName, _text);

View File

@@ -117,12 +117,22 @@ public abstract class ServerPacket extends WritablePacket
super(initialSize);
}
private Player _player;
/**
* Method that runs after packet is sent.
* @param player
* @return the Player
*/
public void run(Player player)
public Player getPlayer()
{
return _player;
}
/**
* @param player the Player to set.
*/
public void setPlayer(Player player)
{
_player = player;
}
protected void writeOptionalInt(int value)

View File

@@ -136,6 +136,7 @@ public class LoginServer
final NetServer<LoginClient> server = new NetServer<>(Config.LOGIN_BIND_ADDRESS, Config.PORT_LOGIN, new LoginPacketHandler(), LoginClient::new);
server.setName(getClass().getSimpleName());
server.getNetConfig().setReadPoolSize(2000);
server.getNetConfig().setSendPoolSize(2000);
server.getNetConfig().setExecutePoolSize(2000);
server.getNetConfig().setPacketQueueLimit(10);
server.getNetConfig().setPacketFloodDisconnect(true);