Improved pending data read logic.

This commit is contained in:
MobiusDevelopment
2022-12-24 22:15:56 +00:00
parent 0ab71457ca
commit f90a7324e4
62 changed files with 4247 additions and 372 deletions

View File

@ -1,5 +1,6 @@
package org.l2jmobius.commons.network;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -17,6 +18,8 @@ public class NetClient
private SocketChannel _channel;
private NetConfig _netConfig;
private Queue<byte[]> _pendingPacketData;
private ByteBuffer _pendingByteBuffer;
private int _pendingPacketSize;
/**
* Initialize the client.
@ -77,6 +80,11 @@ public class NetClient
{
_pendingPacketData.clear();
}
if (_pendingByteBuffer != null)
{
_pendingByteBuffer = null;
}
}
/**
@ -124,6 +132,40 @@ public class NetClient
return _pendingPacketData;
}
/**
* @return the pending read ByteBuffer.
*/
public ByteBuffer getPendingByteBuffer()
{
return _pendingByteBuffer;
}
/**
* Set the pending read ByteBuffer.
* @param pendingByteBuffer the pending read ByteBuffer.
*/
public void setPendingByteBuffer(ByteBuffer pendingByteBuffer)
{
_pendingByteBuffer = pendingByteBuffer;
}
/**
* @return the expected pending packet size.
*/
public int getPendingPacketSize()
{
return _pendingPacketSize;
}
/**
* Set the expected pending packet size.
* @param pendingPacketSize the expected packet size.
*/
public void setPendingPacketSize(int pendingPacketSize)
{
_pendingPacketSize = pendingPacketSize;
}
/**
* @return the Encryption of this client.
*/

View File

@ -13,6 +13,7 @@ import java.util.Set;
public class ReadThread<E extends NetClient> implements Runnable
{
private final ByteBuffer _sizeBuffer = ByteBuffer.allocate(2); // Reusable size buffer.
private final ByteBuffer _pendingSizeBuffer = ByteBuffer.allocate(1); // Reusable pending size buffer.
private final Set<E> _pool;
public ReadThread(Set<E> pool)
@ -45,6 +46,45 @@ public class ReadThread<E extends NetClient> implements Runnable
continue ITERATE;
}
// Continue read if there is a pending ByteBuffer.
final ByteBuffer pendingByteBuffer = client.getPendingByteBuffer();
if (pendingByteBuffer != null)
{
// Allocate an additional ByteBuffer based on pending packet size.
final int pendingPacketSize = client.getPendingPacketSize();
final ByteBuffer additionalData = ByteBuffer.allocate(pendingPacketSize - pendingByteBuffer.position());
switch (channel.read(additionalData))
{
// Disconnected.
case -1:
{
onDisconnection(client);
continue ITERATE;
}
// Nothing read.
case 0:
{
continue ITERATE;
}
// Data was read.
default:
{
// Merge additional read data.
pendingByteBuffer.put(pendingByteBuffer.position(), additionalData, 0, additionalData.position());
pendingByteBuffer.position(pendingByteBuffer.position() + additionalData.position());
// Read was complete.
if (pendingByteBuffer.position() >= pendingPacketSize)
{
// Add packet data to client.
client.addPacketData(pendingByteBuffer.array());
client.setPendingByteBuffer(null);
}
}
}
continue ITERATE;
}
// Read incoming packet size (short).
_sizeBuffer.clear();
switch (channel.read(_sizeBuffer))
@ -60,6 +100,54 @@ public class ReadThread<E extends NetClient> implements Runnable
{
continue ITERATE;
}
// Need to read two bytes to calculate size.
case 1:
{
int attempt = 0; // Keep it under 10 attempts (100ms).
COMPLETE_SIZE_READ: while ((attempt++ < 10) && (_sizeBuffer.position() < 2))
{
// Wait for pending data.
try
{
Thread.sleep(10);
}
catch (Exception ignored)
{
}
// Try to read the missing extra byte.
_pendingSizeBuffer.clear();
switch (channel.read(_pendingSizeBuffer))
{
// Disconnected.
case -1:
{
onDisconnection(client);
continue ITERATE;
}
// Nothing read.
case 0:
{
continue COMPLETE_SIZE_READ;
}
// Merge additional read byte.
default:
{
_sizeBuffer.put(1, _pendingSizeBuffer, 0, 1);
_sizeBuffer.position(2);
}
}
}
// Read failed.
if (_sizeBuffer.position() < 2)
{
onDisconnection(client);
continue ITERATE;
}
// Fallthrough.
}
// Read actual packet bytes.
default:
{
@ -82,21 +170,16 @@ public class ReadThread<E extends NetClient> implements Runnable
// Send data read to the client packet queue.
default:
{
// Continue read if data length is less than expected.
// Read was not complete.
if (packetByteBuffer.position() < packetSize)
{
int attempt = 0; // Keep it under 10 attempts.
while ((attempt++ < 10) && (packetByteBuffer.position() < packetSize))
{
final ByteBuffer additionalData = ByteBuffer.allocate(packetSize - packetByteBuffer.position());
channel.read(additionalData);
packetByteBuffer.put(packetByteBuffer.position(), additionalData, 0, additionalData.position());
packetByteBuffer.position(packetByteBuffer.position() + additionalData.position());
}
client.setPendingByteBuffer(packetByteBuffer);
client.setPendingPacketSize(packetSize);
}
else // Add packet data to client.
{
client.addPacketData(packetByteBuffer.array());
}
// Add packet data to client.
client.addPacketData(packetByteBuffer.array());
}
}
}