Addition of ExecuteThread executor for running packets.

This commit is contained in:
MobiusDevelopment
2023-07-12 00:45:47 +03:00
parent b994d54621
commit 96a579ee4a
277 changed files with 3248 additions and 4504 deletions

View File

@ -470,7 +470,6 @@ public class Config
public static Set<String> ALT_DEV_EXCLUDED_PACKETS;
public static int SCHEDULED_THREAD_POOL_SIZE;
public static int INSTANT_THREAD_POOL_SIZE;
public static boolean THREADS_FOR_CLIENT_PACKETS;
public static boolean THREADS_FOR_LOADING;
public static boolean DEADLOCK_DETECTOR;
public static int DEADLOCK_CHECK_INTERVAL;
@ -1456,7 +1455,6 @@ public class Config
{
INSTANT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
}
THREADS_FOR_CLIENT_PACKETS = serverConfig.getBoolean("ThreadsForClientPackets", true);
THREADS_FOR_LOADING = serverConfig.getBoolean("ThreadsForLoading", false);
DEADLOCK_DETECTOR = serverConfig.getBoolean("DeadLockDetector", true);
DEADLOCK_CHECK_INTERVAL = serverConfig.getInt("DeadLockCheckInterval", 20);
@ -1482,8 +1480,8 @@ public class Config
final PropertiesParser networkConfig = new PropertiesParser(NETWORK_CONFIG_FILE);
CLIENT_READ_POOL_SIZE = networkConfig.getInt("ClientReadPoolSize", 100);
CLIENT_SEND_POOL_SIZE = networkConfig.getInt("ClientSendPoolSize", 50);
CLIENT_EXECUTE_POOL_SIZE = networkConfig.getInt("ClientExecutePoolSize", 50);
CLIENT_SEND_POOL_SIZE = networkConfig.getInt("ClientSendPoolSize", 100);
CLIENT_EXECUTE_POOL_SIZE = networkConfig.getInt("ClientExecutePoolSize", 100);
PACKET_QUEUE_LIMIT = networkConfig.getInt("PacketQueueLimit", 80);
PACKET_FLOOD_DISCONNECT = networkConfig.getBoolean("PacketFloodDisconnect", false);
PACKET_FLOOD_DROP = networkConfig.getBoolean("PacketFloodDrop", false);
@ -3632,7 +3630,6 @@ public class Config
{
INSTANT_THREAD_POOL_SIZE = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
}
THREADS_FOR_CLIENT_PACKETS = loginConfig.getBoolean("ThreadsForClientPackets", true);
SHOW_LICENCE = loginConfig.getBoolean("ShowLicence", true);
SHOW_PI_AGREEMENT = loginConfig.getBoolean("ShowPIAgreement", false);
AUTO_CREATE_ACCOUNTS = loginConfig.getBoolean("AutoCreateAccounts", true);

View File

@ -1,8 +1,12 @@
package org.l2jmobius.commons.network;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.l2jmobius.commons.threads.ThreadProvider;
import org.l2jmobius.commons.util.CommonUtil;
/**
@ -12,10 +16,16 @@ import org.l2jmobius.commons.util.CommonUtil;
*/
public class ExecuteThread<E extends NetClient> implements Runnable
{
private static final Logger LOGGER = Logger.getLogger(ExecuteThread.class.getName());
protected static final Logger LOGGER = Logger.getLogger(ExecuteThread.class.getName());
private final Set<E> _pool;
private final PacketHandlerInterface<E> _packetHandler;
// The core pool size for the ThreadPoolExecutor.
private static final int EXECUTOR_POOL_SIZE = 2;
// ThreadPoolExecutor used to execute tasks concurrently, avoiding delays caused by waiting for a single client.
private final ThreadPoolExecutor _executor = new ThreadPoolExecutor(EXECUTOR_POOL_SIZE, Integer.MAX_VALUE, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadProvider("ExecuteThread Executor", Thread.MAX_PRIORITY));
protected final Set<E> _pool;
protected final PacketHandlerInterface<E> _packetHandler;
private boolean _idle;
public ExecuteThread(Set<E> pool, PacketHandlerInterface<E> packetHandler)
@ -43,30 +53,19 @@ public class ExecuteThread<E extends NetClient> implements Runnable
continue ITERATE;
}
if (client.isRunning())
{
continue ITERATE;
}
final byte[] data = client.getReceivedData().poll();
if (data == null)
{
continue ITERATE;
}
if (client.getEncryption() != null)
{
try
{
client.getEncryption().decrypt(data, 0, data.length);
}
catch (Exception e)
{
if (client.getNetConfig().isFailedDecryptionLogged())
{
LOGGER.warning("ExecuteThread: Problem with " + client + " data decryption.");
LOGGER.warning(CommonUtil.getStackTrace(e));
}
client.disconnect();
continue ITERATE;
}
}
_packetHandler.handle(client, new ReadablePacket(data));
client.setRunning(true);
_executor.execute(new ExecuteTask(client, data));
_idle = false;
}
@ -93,4 +92,43 @@ public class ExecuteThread<E extends NetClient> implements Runnable
}
}
}
private class ExecuteTask implements Runnable
{
private final E _client;
private final byte[] _data;
public ExecuteTask(E client, byte[] data)
{
_client = client;
_data = data;
}
@Override
public void run()
{
if (_client.getEncryption() != null)
{
try
{
_client.getEncryption().decrypt(_data, 0, _data.length);
}
catch (Exception e)
{
if (_client.getNetConfig().isFailedDecryptionLogged())
{
LOGGER.warning("ExecuteThread: Problem with " + _client + " data decryption.");
LOGGER.warning(CommonUtil.getStackTrace(e));
}
_pool.remove(_client);
_client.disconnect();
return;
}
}
_packetHandler.handle(_client, new ReadablePacket(_data));
_client.setRunning(false);
}
}
}

View File

@ -19,6 +19,7 @@ public class NetClient
private final Queue<byte[]> _receivedData = new ConcurrentLinkedQueue<>();
private final Queue<WritablePacket> _sendPacketQueue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean _isSending = new AtomicBoolean();
private final AtomicBoolean _isRunning = new AtomicBoolean();
private String _ip;
private Socket _socket;
@ -65,6 +66,7 @@ public class NetClient
*/
public void onDisconnection()
{
disconnect();
}
/**
@ -272,6 +274,24 @@ public class NetClient
_isSending.set(value);
}
/**
* Checks if the client is currently in the process of running a client packet.
* @return {@code true} if the client is running a client packet, {@code false} otherwise.
*/
public boolean isRunning()
{
return _isRunning.get();
}
/**
* Sets the running state of the client.
* @param value the sending state to set. {@code true} if the client is running a client packet, {@code false} otherwise.
*/
public void setRunning(boolean value)
{
_isRunning.set(value);
}
/**
* @return the network configurations of this client.
*/

View File

@ -7,6 +7,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.l2jmobius.commons.threads.ThreadProvider;
/**
* @author Pantelis Andrianakis
* @since June 22nd 2023
@ -14,14 +16,11 @@ import java.util.concurrent.TimeUnit;
*/
public class SendThread<E extends NetClient> implements Runnable
{
// Throttle packets sent per cycle to limit flooding from waiting one client.
private static final int MAX_PACKETS_SENT_PER_CYCLE = 2000;
// The core pool size for the ThreadPoolExecutor.
private static final int EXECUTOR_POOL_SIZE = 2;
private static final int EXECUTOR_POOL_SIZE = 5;
// ThreadPoolExecutor used to execute tasks concurrently, avoiding delays caused by waiting for a single client.
private final ThreadPoolExecutor _executor = new ThreadPoolExecutor(EXECUTOR_POOL_SIZE, Integer.MAX_VALUE, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
private final ThreadPoolExecutor _executor = new ThreadPoolExecutor(EXECUTOR_POOL_SIZE, Integer.MAX_VALUE, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadProvider("SendThread Executor", Thread.MAX_PRIORITY));
protected final Set<E> _pool;
private boolean _idle;
@ -104,15 +103,10 @@ public class SendThread<E extends NetClient> implements Runnable
@Override
public void run()
{
WritablePacket writablePacket;
final OutputStream outputStream = _client.getOutputStream();
for (int count = 0; count < MAX_PACKETS_SENT_PER_CYCLE; count++)
while ((writablePacket = _packetQueue.poll()) != null)
{
final WritablePacket writablePacket = _packetQueue.poll();
if (writablePacket == null)
{
break;
}
final byte[] sendableBytes = writablePacket.getSendableBytes(_client.getEncryption());
if (sendableBytes == null)
{

View File

@ -18,10 +18,8 @@ package org.l2jmobius.gameserver.network;
import java.util.logging.Logger;
import org.l2jmobius.Config;
import org.l2jmobius.commons.network.PacketHandlerInterface;
import org.l2jmobius.commons.network.ReadablePacket;
import org.l2jmobius.commons.threads.ThreadPool;
import org.l2jmobius.commons.util.CommonUtil;
import org.l2jmobius.gameserver.network.clientpackets.ClientPacket;
@ -75,54 +73,16 @@ public class PacketHandler implements PacketHandlerInterface<GameClient>
return;
}
// Continue on another thread.
if (Config.THREADS_FOR_CLIENT_PACKETS)
// Packet read and run.
try
{
ThreadPool.execute(new ExecuteTask(client, packet, newPacket, packetId));
newPacket.read(packet);
newPacket.run(client);
}
else // Wait for execution.
catch (Exception e)
{
try
{
newPacket.read(packet);
newPacket.run(client);
}
catch (Exception e)
{
LOGGER.warning("PacketHandler: Problem with " + client + " [Packet: 0x" + Integer.toHexString(packetId).toUpperCase() + "]");
LOGGER.warning(CommonUtil.getStackTrace(e));
}
}
}
private class ExecuteTask implements Runnable
{
private final GameClient _client;
private final ReadablePacket _packet;
private final ClientPacket _newPacket;
private final int _packetId;
public ExecuteTask(GameClient client, ReadablePacket packet, ClientPacket newPacket, int packetId)
{
_client = client;
_packet = packet;
_newPacket = newPacket;
_packetId = packetId;
}
@Override
public void run()
{
try
{
_newPacket.read(_packet);
_newPacket.run(_client);
}
catch (Exception e)
{
LOGGER.warning("PacketHandler->ExecuteTask: Problem with " + _client + " [Packet: 0x" + Integer.toHexString(_packetId).toUpperCase() + "]");
LOGGER.warning(CommonUtil.getStackTrace(e));
}
LOGGER.warning("PacketHandler: Problem with " + client + " [Packet: 0x" + Integer.toHexString(packetId).toUpperCase() + "]");
LOGGER.warning(CommonUtil.getStackTrace(e));
}
}
}

View File

@ -18,10 +18,8 @@ package org.l2jmobius.loginserver.network;
import java.util.logging.Logger;
import org.l2jmobius.Config;
import org.l2jmobius.commons.network.PacketHandlerInterface;
import org.l2jmobius.commons.network.ReadablePacket;
import org.l2jmobius.commons.threads.ThreadPool;
import org.l2jmobius.commons.util.CommonUtil;
import org.l2jmobius.loginserver.network.clientpackets.LoginClientPacket;
@ -75,54 +73,16 @@ public class LoginPacketHandler implements PacketHandlerInterface<LoginClient>
return;
}
// Continue on another thread.
if (Config.THREADS_FOR_CLIENT_PACKETS)
// Packet read and run.
try
{
ThreadPool.execute(new ExecuteTask(client, packet, newPacket, packetId));
newPacket.read(packet);
newPacket.run(client);
}
else // Wait for execution.
catch (Exception e)
{
try
{
newPacket.read(packet);
newPacket.run(client);
}
catch (Exception e)
{
LOGGER.warning("LoginPacketHandler: Problem with " + client + " [Packet: 0x" + Integer.toHexString(packetId).toUpperCase() + "]");
LOGGER.warning(CommonUtil.getStackTrace(e));
}
}
}
private class ExecuteTask implements Runnable
{
private final LoginClient _client;
private final ReadablePacket _packet;
private final LoginClientPacket _newPacket;
private final int _packetId;
public ExecuteTask(LoginClient client, ReadablePacket packet, LoginClientPacket newPacket, int packetId)
{
_client = client;
_packet = packet;
_newPacket = newPacket;
_packetId = packetId;
}
@Override
public void run()
{
try
{
_newPacket.read(_packet);
_newPacket.run(_client);
}
catch (Exception e)
{
LOGGER.warning("LoginPacketHandler->ExecuteTask: Problem with " + _client + " [Packet: 0x" + Integer.toHexString(_packetId).toUpperCase() + "]");
LOGGER.warning(CommonUtil.getStackTrace(e));
}
LOGGER.warning("LoginPacketHandler: Problem with " + client + " [Packet: 0x" + Integer.toHexString(packetId).toUpperCase() + "]");
LOGGER.warning(CommonUtil.getStackTrace(e));
}
}
}