ThreadPool class improvements.

This commit is contained in:
MobiusDevelopment 2019-03-31 17:46:01 +00:00
parent 0c3a512bd3
commit 36457c0bec
39 changed files with 442 additions and 715 deletions

View File

@ -1272,8 +1272,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1279,8 +1279,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1287,8 +1287,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1280,8 +1280,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1275,8 +1275,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1275,8 +1275,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1282,8 +1282,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -2330,8 +2330,16 @@ public final class Config
LOG_ITEMS = Boolean.valueOf(devSettings.getProperty("LogItems", "false")); LOG_ITEMS = Boolean.valueOf(devSettings.getProperty("LogItems", "false"));
SCHEDULED_THREAD_POOL_COUNT = Integer.parseInt(devSettings.getProperty("ScheduledThreadPoolCount", "-1")); SCHEDULED_THREAD_POOL_COUNT = Integer.parseInt(devSettings.getProperty("ScheduledThreadPoolCount", "-1"));
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = Integer.parseInt(devSettings.getProperty("ThreadsPerScheduledThreadPool", "4")); THREADS_PER_SCHEDULED_THREAD_POOL = Integer.parseInt(devSettings.getProperty("ThreadsPerScheduledThreadPool", "4"));
INSTANT_THREAD_POOL_COUNT = Integer.parseInt(devSettings.getProperty("InstantThreadPoolCount", "-1")); INSTANT_THREAD_POOL_COUNT = Integer.parseInt(devSettings.getProperty("InstantThreadPoolCount", "-1"));
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = Integer.parseInt(devSettings.getProperty("ThreadsPerInstantThreadPool", "2")); THREADS_PER_INSTANT_THREAD_POOL = Integer.parseInt(devSettings.getProperty("ThreadsPerInstantThreadPool", "2"));
LAZY_CACHE = Boolean.valueOf(devSettings.getProperty("LazyCache", "false")); LAZY_CACHE = Boolean.valueOf(devSettings.getProperty("LazyCache", "false"));

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1441,8 +1441,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1219,8 +1219,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1223,8 +1223,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1223,8 +1223,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();

View File

@ -1223,8 +1223,16 @@ public final class Config
SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false); SERVER_LIST_BRACKET = serverSettings.getBoolean("ServerListBrackets", false);
SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1); SCHEDULED_THREAD_POOL_COUNT = serverSettings.getInt("ScheduledThreadPoolCount", -1);
if (SCHEDULED_THREAD_POOL_COUNT == -1)
{
SCHEDULED_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4); THREADS_PER_SCHEDULED_THREAD_POOL = serverSettings.getInt("ThreadsPerScheduledThreadPool", 4);
INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1); INSTANT_THREAD_POOL_COUNT = serverSettings.getInt("InstantThreadPoolCount", -1);
if (INSTANT_THREAD_POOL_COUNT == -1)
{
INSTANT_THREAD_POOL_COUNT = Runtime.getRuntime().availableProcessors();
}
THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2); THREADS_PER_INSTANT_THREAD_POOL = serverSettings.getInt("ThreadsPerInstantThreadPool", 2);
IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2); IO_PACKET_THREAD_CORE_SIZE = serverSettings.getInt("UrgentPacketThreadCoreSize", 2);

View File

@ -37,7 +37,7 @@ public final class RunnableWrapper implements Runnable
{ {
_runnable.run(); _runnable.run();
} }
catch (final Throwable e) catch (Throwable e)
{ {
final Thread t = Thread.currentThread(); final Thread t = Thread.currentThread();
final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler(); final UncaughtExceptionHandler h = t.getUncaughtExceptionHandler();

View File

@ -39,37 +39,21 @@ public final class ThreadPool
{ {
private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; private static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[Config.SCHEDULED_THREAD_POOL_COUNT];
private static ThreadPoolExecutor[] INSTANT_POOLS; private static ThreadPoolExecutor[] INSTANT_POOLS = new ThreadPoolExecutor[Config.INSTANT_THREAD_POOL_COUNT];
private static int THREAD_POOL_RANDOMIZER; private static volatile int SCHEDULED_THREAD_RANDOMIZER = 0;
private static volatile int INSTANT_THREAD_RANDOMIZER = 0;
/**
* Init the different pools, based on Config. It is launched only once, on Gameserver instance.
*/
public static void init() public static void init()
{ {
// Feed scheduled pool. // Feed scheduled pool.
int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; for (int i = 0; i < Config.SCHEDULED_THREAD_POOL_COUNT; i++)
if (scheduledPoolCount == -1)
{
scheduledPoolCount = Runtime.getRuntime().availableProcessors();
}
SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount];
for (int i = 0; i < scheduledPoolCount; i++)
{ {
SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL);
} }
// Feed instant pool. // Feed instant pool.
int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; for (int i = 0; i < Config.INSTANT_THREAD_POOL_COUNT; i++)
if (instantPoolCount == -1)
{
instantPoolCount = Runtime.getRuntime().availableProcessors();
}
INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount];
for (int i = 0; i < instantPoolCount; i++)
{ {
INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000)); INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000));
} }
@ -80,7 +64,6 @@ public final class ThreadPool
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
threadPool.prestartAllCoreThreads(); threadPool.prestartAllCoreThreads();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
@ -94,33 +77,33 @@ public final class ThreadPool
}, 600000, 600000); }, 600000, 600000);
LOGGER.info("ThreadPool: Initialized"); LOGGER.info("ThreadPool: Initialized");
LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.SCHEDULED_THREAD_POOL_COUNT + " scheduled pool executors with " + (Config.SCHEDULED_THREAD_POOL_COUNT * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads.");
LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); LOGGER.info("..." + Config.INSTANT_THREAD_POOL_COUNT + " instant pool executors with " + (Config.INSTANT_THREAD_POOL_COUNT * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads.");
} }
public static void purge() public static void purge()
{ {
for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool1.purge(); threadPool.purge();
} }
for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool2.purge(); threadPool.purge();
} }
} }
/** /**
* Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a one-shot action that becomes enabled after the given delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param delay : the time from now to delay execution.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion.
*/ */
public static ScheduledFuture<?> schedule(Runnable r, long delay) public static ScheduledFuture<?> schedule(Runnable runnable, long delay)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].schedule(new RunnableWrapper(runnable), delay, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -129,17 +112,17 @@ public final class ThreadPool
} }
/** /**
* Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. * Creates and executes a periodic action that becomes enabled first after the given initial delay.
* @param r : the task to execute. * @param runnable : the task to execute.
* @param delay : the time from now to delay execution. * @param initialDelay : the time to delay first execution.
* @param period : the period between successive executions. * @param period : the period between successive executions.
* @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation.
*/ */
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long delay, long period) public static ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period)
{ {
try try
{ {
return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); return SCHEDULED_POOLS[SCHEDULED_THREAD_RANDOMIZER++ % Config.SCHEDULED_THREAD_POOL_COUNT].scheduleAtFixedRate(new RunnableWrapper(runnable), initialDelay, period, TimeUnit.MILLISECONDS);
} }
catch (Exception e) catch (Exception e)
{ {
@ -149,29 +132,19 @@ public final class ThreadPool
/** /**
* Executes the given task sometime in the future. * Executes the given task sometime in the future.
* @param r : the task to execute. * @param runnable : the task to execute.
*/ */
public static void execute(Runnable r) public static void execute(Runnable runnable)
{ {
try try
{ {
getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); INSTANT_POOLS[INSTANT_THREAD_RANDOMIZER++ % Config.INSTANT_THREAD_POOL_COUNT].execute(new RunnableWrapper(runnable));
} }
catch (Exception e) catch (Exception e)
{ {
} }
} }
/**
* @param <T> : The pool type.
* @param threadPools : The pool array to check.
* @return the less fed pool.
*/
private static <T> T getPool(T[] threadPools)
{
return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length];
}
public static String[] getStats() public static String[] getStats()
{ {
final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10];
@ -215,12 +188,10 @@ public final class ThreadPool
try try
{ {
LOGGER.info("ThreadPool: Shutting down."); LOGGER.info("ThreadPool: Shutting down.");
for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
for (ThreadPoolExecutor threadPool : INSTANT_POOLS) for (ThreadPoolExecutor threadPool : INSTANT_POOLS)
{ {
threadPool.shutdownNow(); threadPool.shutdownNow();