Replaced SocketChannel with Socket.

This commit is contained in:
MobiusDevelopment
2023-06-19 21:02:28 +03:00
parent 670b7e6982
commit 4708f17e41
340 changed files with 5470 additions and 6377 deletions

View File

@@ -63,10 +63,6 @@ PacketFloodLogged = True
# Default: True (disabled)
TcpNoDelay = True
# Connection timeout in milliseconds.
# Default 800
ConnectionTimeout = 800
# Packet encryption.
# By default packets sent or received are encrypted using the Blowfish algorithm.
# Disabling this reduces the resources needed to process any packets transfered,

View File

@@ -774,7 +774,6 @@ public class Config
public static boolean PACKET_FLOOD_DROP;
public static boolean PACKET_FLOOD_LOGGED;
public static boolean TCP_NO_DELAY;
public static int CONNECTION_TIMEOUT;
public static boolean PACKET_ENCRYPTION;
public static boolean FAILED_DECRYPTION_LOGGED;
public static String DATABASE_DRIVER;
@@ -1309,7 +1308,6 @@ public class Config
PACKET_FLOOD_DROP = serverConfig.getBoolean("PacketFloodDrop", false);
PACKET_FLOOD_LOGGED = serverConfig.getBoolean("PacketFloodLogged", true);
TCP_NO_DELAY = serverConfig.getBoolean("TcpNoDelay", true);
CONNECTION_TIMEOUT = serverConfig.getInt("ConnectionTimeout", 800);
PACKET_ENCRYPTION = serverConfig.getBoolean("PacketEncryption", false);
FAILED_DECRYPTION_LOGGED = serverConfig.getBoolean("FailedDecryptionLogged", true);
REQUEST_ID = serverConfig.getInt("RequestServerID", 0);

View File

@@ -26,19 +26,15 @@ public class ExecuteThread<E extends NetClient> implements Runnable
@Override
public void run()
{
long executionStart;
long currentTime;
while (true)
{
executionStart = System.currentTimeMillis();
// No need to iterate when pool is empty.
if (!_pool.isEmpty())
{
// Iterate client pool.
ITERATE: for (E client : _pool)
{
if (client.getChannel() == null)
if (client.getSocket() == null)
{
_pool.remove(client);
continue ITERATE;
@@ -72,16 +68,12 @@ public class ExecuteThread<E extends NetClient> implements Runnable
}
// Prevent high CPU caused by repeatedly looping.
currentTime = System.currentTimeMillis();
if ((currentTime - executionStart) < 1)
try
{
Thread.sleep(1);
}
catch (Exception ignored)
{
try
{
Thread.sleep(1);
}
catch (Exception ignored)
{
}
}
}
}

View File

@@ -1,7 +1,8 @@
package org.l2jmobius.commons.network;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Logger;
@@ -15,27 +16,30 @@ public class NetClient
protected static final Logger LOGGER = Logger.getLogger(NetClient.class.getName());
private String _ip;
private SocketChannel _channel;
private Socket _socket;
private InputStream _inputStream;
private OutputStream _outputStream;
private NetConfig _netConfig;
private Queue<byte[]> _pendingPacketData;
private ByteBuffer _pendingByteBuffer;
private Queue<byte[]> _packetData;
private byte[] _pendingData;
private int _pendingPacketSize;
/**
* Initialize the client.
* @param channel
* @param socket
* @param netConfig
*/
public void init(SocketChannel channel, NetConfig netConfig)
public void init(Socket socket, NetConfig netConfig)
{
_channel = channel;
_socket = socket;
_netConfig = netConfig;
_pendingPacketData = new ConcurrentLinkedQueue<>();
_packetData = new ConcurrentLinkedQueue<>();
_ip = socket.getInetAddress().toString().substring(1); // Trim out /127.0.0.1
try
{
_ip = _channel.getRemoteAddress().toString();
_ip = _ip.substring(1, _ip.lastIndexOf(':')); // Trim out /127.0.0.1:12345
_inputStream = _socket.getInputStream();
_outputStream = _socket.getOutputStream();
}
catch (Exception ignored)
{
@@ -64,26 +68,28 @@ public class NetClient
*/
public void disconnect()
{
if (_channel != null)
if (_socket != null)
{
try
{
_channel.close();
_channel = null;
_socket.close();
_socket = null;
_inputStream = null;
_outputStream = null;
}
catch (Exception ignored)
{
}
}
if (_pendingPacketData != null)
if (_packetData != null)
{
_pendingPacketData.clear();
_packetData.clear();
}
if (_pendingByteBuffer != null)
if (_pendingData != null)
{
_pendingByteBuffer = null;
_pendingData = null;
}
}
@@ -94,7 +100,7 @@ public class NetClient
public void addPacketData(byte[] data)
{
// Check packet flooding.
final int size = _pendingPacketData.size();
final int size = _packetData.size();
if (size >= _netConfig.getPacketQueueLimit())
{
if (_netConfig.isPacketFloodDisconnect())
@@ -121,7 +127,7 @@ public class NetClient
}
// Add to queue.
_pendingPacketData.add(data);
_packetData.add(data);
}
/**
@@ -129,24 +135,24 @@ public class NetClient
*/
public Queue<byte[]> getPacketData()
{
return _pendingPacketData;
return _packetData;
}
/**
* @return the pending read ByteBuffer.
* @return the pending read <b>byte[]</b>.
*/
public ByteBuffer getPendingByteBuffer()
public byte[] getPendingData()
{
return _pendingByteBuffer;
return _pendingData;
}
/**
* Set the pending read ByteBuffer.
* @param pendingByteBuffer the pending read ByteBuffer.
* Set the pending read <b>byte[]</b>.
* @param pendingData the pending read <b>byte[]</b>.
*/
public void setPendingByteBuffer(ByteBuffer pendingByteBuffer)
public void setPendingData(byte[] pendingData)
{
_pendingByteBuffer = pendingByteBuffer;
_pendingData = pendingData;
}
/**
@@ -166,6 +172,36 @@ public class NetClient
_pendingPacketSize = pendingPacketSize;
}
/**
* Sends a packet over the network using the default encryption.
* @param packet The packet to send.
*/
public void sendPacket(WritablePacket packet)
{
if ((_socket == null) || !_socket.isConnected())
{
return;
}
final byte[] sendableBytes = packet.getSendableBytes(getEncryption());
if (sendableBytes == null)
{
return;
}
try
{
synchronized (this)
{
_outputStream.write(sendableBytes);
_outputStream.flush();
}
}
catch (Exception ignored)
{
}
}
/**
* @return the Encryption of this client.
*/
@@ -175,11 +211,27 @@ public class NetClient
}
/**
* @return the SocketChannel of this client.
* @return the Socket of this client.
*/
public SocketChannel getChannel()
public Socket getSocket()
{
return _channel;
return _socket;
}
/**
* @return the InputStream of this client.
*/
public InputStream getInputStream()
{
return _inputStream;
}
/**
* @return the OutputStream of this client.
*/
public OutputStream getOutputStream()
{
return _outputStream;
}
/**

View File

@@ -8,7 +8,6 @@ public class NetConfig
{
private int _readPoolSize = 100;
private int _executePoolSize = 50;
private int _connectionTimeout = 800;
private int _packetQueueLimit = 80;
private boolean _packetFloodDisconnect = false;
private boolean _packetFloodDrop = false;
@@ -50,23 +49,6 @@ public class NetConfig
_executePoolSize = executePoolSize;
}
/**
* @return the timeout until a connection is established.
*/
public int getConnectionTimeout()
{
return _connectionTimeout;
}
/**
* Sets the timeout until a connection is established.
* @param connectionTimeout
*/
public void setConnectionTimeout(int connectionTimeout)
{
_connectionTimeout = connectionTimeout;
}
/**
* @return the packet queue limit of receivable packets.
*/

View File

@@ -1,8 +1,8 @@
package org.l2jmobius.commons.network;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -105,30 +105,26 @@ public class NetServer<E extends NetClient>
public void run()
{
// Create server and bind port.
try (ServerSocketChannel server = ServerSocketChannel.open())
try (ServerSocket server = new ServerSocket())
{
server.setReuseAddress(true);
server.bind(new InetSocketAddress(_hostname, _port));
server.configureBlocking(false); // Non-blocking I/O.
server.setSoTimeout(0); // Non-blocking I/O.
// Listen for new connections.
LOGGER.info(_name + ": Listening on port " + _port + " for incoming connections.");
long executionStart;
long currentTime;
while (true)
{
executionStart = System.currentTimeMillis();
final SocketChannel channel = server.accept();
if (channel != null)
final Socket socket = server.accept();
if (socket != null)
{
// Configure channel.
channel.socket().setTcpNoDelay(_netConfig.isTcpNoDelay());
channel.socket().setSoTimeout(_netConfig.getConnectionTimeout());
channel.configureBlocking(false); // Non-blocking I/O.
socket.setTcpNoDelay(_netConfig.isTcpNoDelay());
socket.setSoTimeout(0); // Non-blocking I/O.
// Create client.
final E client = _clientSupplier.get();
client.init(channel, _netConfig);
client.init(socket, _netConfig);
// Add to read pool.
@@ -190,11 +186,7 @@ public class NetServer<E extends NetClient>
}
// Prevent high CPU caused by repeatedly polling the channel.
currentTime = System.currentTimeMillis();
if ((currentTime - executionStart) < 1)
{
Thread.sleep(1);
}
Thread.sleep(1);
}
}
catch (Exception e)

View File

@@ -1,8 +1,7 @@
package org.l2jmobius.commons.network;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Set;
/**
@@ -12,9 +11,9 @@ import java.util.Set;
*/
public class ReadThread<E extends NetClient> implements Runnable
{
private final ByteBuffer _sizeBuffer = ByteBuffer.allocate(2); // Reusable size buffer.
private final ByteBuffer _pendingSizeBuffer = ByteBuffer.allocate(1); // Reusable pending size buffer.
private final Set<E> _pool;
private final byte[] _sizeBuffer = new byte[2]; // Reusable size buffer.
private final byte[] _pendingSizeBuffer = new byte[1]; // Reusable pending size buffer.
public ReadThread(Set<E> pool)
{
@@ -24,12 +23,8 @@ public class ReadThread<E extends NetClient> implements Runnable
@Override
public void run()
{
long executionStart;
long currentTime;
while (true)
{
executionStart = System.currentTimeMillis();
// No need to iterate when pool is empty.
if (!_pool.isEmpty())
{
@@ -38,22 +33,29 @@ public class ReadThread<E extends NetClient> implements Runnable
{
try
{
final SocketChannel channel = client.getChannel();
if (channel == null) // Unexpected disconnection?
final InputStream inputStream = client.getInputStream();
if (inputStream == null) // Unexpected disconnection?
{
// Null SocketChannel: client
// Null InputStream: client
onDisconnection(client);
continue ITERATE;
}
// Continue read if there is a pending ByteBuffer.
final ByteBuffer pendingByteBuffer = client.getPendingByteBuffer();
if (pendingByteBuffer != null)
// Continue when there are no bytes that can be read.
if (inputStream.available() < 1)
{
// Allocate an additional ByteBuffer based on pending packet size.
continue ITERATE;
}
// Continue read if there is a pending byte array.
final byte[] pendingData = client.getPendingData();
if (pendingData != null)
{
// Allocate an additional byte array based on pending packet size.
final int pendingPacketSize = client.getPendingPacketSize();
final ByteBuffer additionalData = ByteBuffer.allocate(pendingPacketSize - pendingByteBuffer.position());
switch (channel.read(additionalData))
final byte[] additionalData = new byte[pendingPacketSize - pendingData.length];
final int bytesRead = inputStream.read(additionalData);
switch (bytesRead)
{
// Disconnected.
case -1:
@@ -70,15 +72,21 @@ public class ReadThread<E extends NetClient> implements Runnable
default:
{
// Merge additional read data.
pendingByteBuffer.put(pendingByteBuffer.position(), additionalData, 0, additionalData.position());
pendingByteBuffer.position(pendingByteBuffer.position() + additionalData.position());
final int currentSize = pendingData.length + bytesRead;
final byte[] mergedData = new byte[currentSize];
System.arraycopy(pendingData, 0, mergedData, 0, pendingData.length);
System.arraycopy(additionalData, 0, mergedData, pendingData.length, bytesRead);
// Read was complete.
if (pendingByteBuffer.position() >= pendingPacketSize)
if (currentSize >= pendingPacketSize)
{
// Add packet data to client.
client.addPacketData(pendingByteBuffer.array());
client.setPendingByteBuffer(null);
client.addPacketData(mergedData);
client.setPendingData(null);
}
else
{
client.setPendingData(mergedData);
}
}
}
@@ -86,8 +94,8 @@ public class ReadThread<E extends NetClient> implements Runnable
}
// Read incoming packet size (short).
_sizeBuffer.clear();
switch (channel.read(_sizeBuffer))
boolean sizeRead = false;
switch (inputStream.read(_sizeBuffer))
{
// Disconnected.
case -1:
@@ -103,8 +111,8 @@ public class ReadThread<E extends NetClient> implements Runnable
// Need to read two bytes to calculate size.
case 1:
{
int attempt = 0; // Keep it under 10 attempts (100ms).
COMPLETE_SIZE_READ: while ((attempt++ < 10) && (_sizeBuffer.position() < 2))
// Keep it under 10 attempts (100ms).
COMPLETE_SIZE_READ: for (int attempt = 0; attempt < 10; attempt++)
{
// Wait for pending data.
try
@@ -116,8 +124,8 @@ public class ReadThread<E extends NetClient> implements Runnable
}
// Try to read the missing extra byte.
_pendingSizeBuffer.clear();
switch (channel.read(_pendingSizeBuffer))
_pendingSizeBuffer[0] = 0;
switch (inputStream.read(_pendingSizeBuffer))
{
// Disconnected.
case -1:
@@ -133,14 +141,15 @@ public class ReadThread<E extends NetClient> implements Runnable
// Merge additional read byte.
default:
{
_sizeBuffer.put(1, _pendingSizeBuffer, 0, 1);
_sizeBuffer.position(2);
_sizeBuffer[1] = _pendingSizeBuffer[0];
sizeRead = true;
break COMPLETE_SIZE_READ;
}
}
}
// Read failed.
if (_sizeBuffer.position() < 2)
if (!sizeRead)
{
onDisconnection(client);
continue ITERATE;
@@ -151,10 +160,11 @@ public class ReadThread<E extends NetClient> implements Runnable
// Read actual packet bytes.
default:
{
// Allocate a new ByteBuffer based on packet size read.
// Allocate a new byte array based on packet size read.
final int packetSize = calculatePacketSize();
final ByteBuffer packetByteBuffer = ByteBuffer.allocate(packetSize);
switch (channel.read(packetByteBuffer))
final byte[] packetData = new byte[packetSize];
final int bytesRead = inputStream.read(packetData);
switch (bytesRead)
{
// Disconnected.
case -1:
@@ -171,14 +181,16 @@ public class ReadThread<E extends NetClient> implements Runnable
default:
{
// Read was not complete.
if (packetByteBuffer.position() < packetSize)
if (bytesRead < packetSize)
{
client.setPendingByteBuffer(packetByteBuffer);
final byte[] pendindBytes = new byte[bytesRead];
System.arraycopy(packetData, 0, pendindBytes, 0, bytesRead);
client.setPendingData(pendindBytes);
client.setPendingPacketSize(packetSize);
}
else // Add packet data to client.
{
client.addPacketData(packetByteBuffer.array());
client.addPacketData(packetData);
}
}
}
@@ -197,24 +209,19 @@ public class ReadThread<E extends NetClient> implements Runnable
}
// Prevent high CPU caused by repeatedly looping.
currentTime = System.currentTimeMillis();
if ((currentTime - executionStart) < 1)
try
{
Thread.sleep(1);
}
catch (Exception ignored)
{
try
{
Thread.sleep(1);
}
catch (Exception ignored)
{
}
}
}
}
private int calculatePacketSize()
{
_sizeBuffer.rewind();
return ((_sizeBuffer.get() & 0xff) | ((_sizeBuffer.get() << 8) & 0xffff)) - 2;
return ((_sizeBuffer[0] & 0xff) | ((_sizeBuffer[1] << 8) & 0xffff)) - 2;
}
private void onDisconnection(E client)

View File

@@ -1,6 +1,5 @@
package org.l2jmobius.commons.network;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -13,7 +12,6 @@ public abstract class WritablePacket
{
private byte[] _data;
private byte[] _sendableBytes;
private ByteBuffer _byteBuffer;
private int _position = 2; // Allocate space for size (max length 65535 - size header).
/**
@@ -270,38 +268,6 @@ public abstract class WritablePacket
return _sendableBytes;
}
/**
* @return ByteBuffer of the sendable packet data, including a size header.
*/
public ByteBuffer getSendableByteBuffer()
{
return getSendableByteBuffer(null);
}
/**
* @param encryption if EncryptionInterface is used.
* @return ByteBuffer of the sendable packet data, including a size header.
*/
public synchronized ByteBuffer getSendableByteBuffer(EncryptionInterface encryption)
{
// Generate sendable ByteBuffer.
if ((_byteBuffer == null /* Not processed */) || (encryption != null /* Encryption can change */))
{
final byte[] bytes = getSendableBytes(encryption);
if (bytes != null) // Data was actually written.
{
_byteBuffer = ByteBuffer.wrap(bytes);
}
}
else // Rewind the buffer.
{
_byteBuffer.rewind();
}
// Return the buffer.
return _byteBuffer;
}
/**
* Take in consideration that data must be written first.
* @return The length of the data (includes size header).

View File

@@ -457,7 +457,6 @@ public class GameServer
server.getNetConfig().setPacketFloodDrop(Config.PACKET_FLOOD_DROP);
server.getNetConfig().setPacketFloodLogged(Config.PACKET_FLOOD_LOGGED);
server.getNetConfig().setTcpNoDelay(Config.TCP_NO_DELAY);
server.getNetConfig().setConnectionTimeout(Config.CONNECTION_TIMEOUT);
server.getNetConfig().setFailedDecryptionLogged(Config.FAILED_DECRYPTION_LOGGED);
server.start();

View File

@@ -16,8 +16,6 @@
*/
package org.l2jmobius.gameserver.network;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -224,30 +222,11 @@ public class GameClient extends NetClient
_pendingPackets.add(packet);
synchronized (_pendingPackets)
{
final SocketChannel channel = getChannel();
if ((channel != null) && channel.isConnected())
{
final ServerPacket sendPacket = _pendingPackets.poll();
final ByteBuffer byteBuffer = sendPacket.getSendableByteBuffer(_encryption);
if (byteBuffer != null)
{
// Send the packet data.
try
{
// Loop while there are remaining bytes in the buffer.
while (byteBuffer.hasRemaining())
{
channel.write(byteBuffer);
}
}
catch (Exception ignored)
{
}
// Run packet implementation.
sendPacket.run(_player);
}
}
// Send the packet data.
super.sendPacket(packet);
// Run packet implementation.
packet.run(_player);
}
}

View File

@@ -16,6 +16,8 @@
*/
package org.l2jmobius.loginserver.network;
import java.io.OutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
@@ -145,20 +147,26 @@ public class LoginClient extends NetClient
return _connectionStartTime;
}
@Override
public void sendPacket(WritablePacket packet)
{
if ((packet == null))
{
return;
}
// Write into the channel.
if ((getChannel() != null) && getChannel().isConnected())
final Socket socket = getSocket();
if ((socket != null) && socket.isConnected())
{
final byte[] sendableBytes = packet.getSendableBytes();
if (sendableBytes == null)
{
return;
}
try
{
// Send the packet data.
getChannel().write(packet.getSendableByteBuffer());
final OutputStream outputStream = getOutputStream();
synchronized (this)
{
outputStream.write(sendableBytes);
outputStream.flush();
}
}
catch (Exception ignored)
{