Addition of SendThread executor for sending packets.
This commit is contained in:
@ -1509,7 +1509,7 @@ public class Config
|
||||
|
||||
final PropertiesParser networkConfig = new PropertiesParser(NETWORK_CONFIG_FILE);
|
||||
CLIENT_READ_POOL_SIZE = networkConfig.getInt("ClientReadPoolSize", 100);
|
||||
CLIENT_SEND_POOL_SIZE = networkConfig.getInt("ClientSendPoolSize", 25);
|
||||
CLIENT_SEND_POOL_SIZE = networkConfig.getInt("ClientSendPoolSize", 50);
|
||||
CLIENT_EXECUTE_POOL_SIZE = networkConfig.getInt("ClientExecutePoolSize", 50);
|
||||
PACKET_QUEUE_LIMIT = networkConfig.getInt("PacketQueueLimit", 80);
|
||||
PACKET_FLOOD_DISCONNECT = networkConfig.getBoolean("PacketFloodDisconnect", false);
|
||||
|
@ -5,6 +5,7 @@ import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
@ -17,6 +18,7 @@ public class NetClient
|
||||
|
||||
private final Queue<byte[]> _receivedData = new ConcurrentLinkedQueue<>();
|
||||
private final Queue<WritablePacket> _sendPacketQueue = new ConcurrentLinkedQueue<>();
|
||||
private final AtomicBoolean _isSending = new AtomicBoolean();
|
||||
|
||||
private String _ip;
|
||||
private Socket _socket;
|
||||
@ -252,6 +254,24 @@ public class NetClient
|
||||
return _outputStream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the client is currently in the process of sending data.
|
||||
* @return {@code true} if the client is sending data, {@code false} otherwise.
|
||||
*/
|
||||
public boolean isSending()
|
||||
{
|
||||
return _isSending.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the sending state of the client.
|
||||
* @param value the sending state to set. {@code true} if the client is sending data, {@code false} otherwise.
|
||||
*/
|
||||
public void setSending(boolean value)
|
||||
{
|
||||
_isSending.set(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the network configurations of this client.
|
||||
*/
|
||||
|
@ -3,6 +3,9 @@ package org.l2jmobius.commons.network;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author Pantelis Andrianakis
|
||||
@ -14,7 +17,13 @@ public class SendThread<E extends NetClient> implements Runnable
|
||||
// Throttle packets sent per cycle to limit flooding from waiting one client.
|
||||
private static final int MAX_PACKETS_SENT_PER_CYCLE = 2000;
|
||||
|
||||
private final Set<E> _pool;
|
||||
// The core pool size for the ThreadPoolExecutor.
|
||||
private static final int EXECUTOR_POOL_SIZE = 2;
|
||||
|
||||
// ThreadPoolExecutor used to execute tasks concurrently, avoiding delays caused by waiting for a single client.
|
||||
private final ThreadPoolExecutor _executor = new ThreadPoolExecutor(EXECUTOR_POOL_SIZE, Integer.MAX_VALUE, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
|
||||
|
||||
protected final Set<E> _pool;
|
||||
private boolean _idle;
|
||||
|
||||
public SendThread(Set<E> pool)
|
||||
@ -41,44 +50,21 @@ public class SendThread<E extends NetClient> implements Runnable
|
||||
continue ITERATE;
|
||||
}
|
||||
|
||||
try
|
||||
if (client.isSending())
|
||||
{
|
||||
final Queue<WritablePacket> packetQueue = client.getSendPacketQueue();
|
||||
if (packetQueue.isEmpty())
|
||||
{
|
||||
continue ITERATE;
|
||||
}
|
||||
|
||||
final OutputStream outputStream = client.getOutputStream();
|
||||
SEND_CYCLE: for (int count = 0; count < MAX_PACKETS_SENT_PER_CYCLE; count++)
|
||||
{
|
||||
final WritablePacket writablePacket = packetQueue.poll();
|
||||
if (writablePacket == null)
|
||||
{
|
||||
continue ITERATE;
|
||||
}
|
||||
|
||||
final byte[] sendableBytes = writablePacket.getSendableBytes(client.getEncryption());
|
||||
if (sendableBytes == null)
|
||||
{
|
||||
continue SEND_CYCLE;
|
||||
}
|
||||
|
||||
// Send the packet data.
|
||||
outputStream.write(sendableBytes);
|
||||
outputStream.flush();
|
||||
|
||||
// Run packet implementation.
|
||||
writablePacket.run();
|
||||
|
||||
_idle = false;
|
||||
}
|
||||
continue ITERATE;
|
||||
}
|
||||
catch (Exception ignored)
|
||||
|
||||
final Queue<WritablePacket> packetQueue = client.getSendPacketQueue();
|
||||
if (packetQueue.isEmpty())
|
||||
{
|
||||
_pool.remove(client);
|
||||
client.onDisconnection();
|
||||
continue ITERATE;
|
||||
}
|
||||
|
||||
client.setSending(true);
|
||||
_executor.execute(new SendTask(client, packetQueue));
|
||||
|
||||
_idle = false;
|
||||
}
|
||||
|
||||
// Prevent high CPU caused by repeatedly looping.
|
||||
@ -103,4 +89,54 @@ public class SendThread<E extends NetClient> implements Runnable
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class SendTask implements Runnable
|
||||
{
|
||||
private final E _client;
|
||||
private final Queue<WritablePacket> _packetQueue;
|
||||
|
||||
public SendTask(E client, Queue<WritablePacket> packetQueue)
|
||||
{
|
||||
_client = client;
|
||||
_packetQueue = packetQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
final OutputStream outputStream = _client.getOutputStream();
|
||||
for (int count = 0; count < MAX_PACKETS_SENT_PER_CYCLE; count++)
|
||||
{
|
||||
final WritablePacket writablePacket = _packetQueue.poll();
|
||||
if (writablePacket == null)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
final byte[] sendableBytes = writablePacket.getSendableBytes(_client.getEncryption());
|
||||
if (sendableBytes == null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// Send the packet data.
|
||||
outputStream.write(sendableBytes);
|
||||
outputStream.flush();
|
||||
|
||||
// Run packet implementation.
|
||||
writablePacket.run();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_pool.remove(_client);
|
||||
_client.onDisconnection();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_client.setSending(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user