From eef3b64cf9874dcac43ea4e178af2669aae355be Mon Sep 17 00:00:00 2001 From: MobiusDev <8391001+MobiusDevelopment@users.noreply.github.com> Date: Sun, 15 Apr 2018 20:27:27 +0000 Subject: [PATCH] Replaced ThreadPool with an improved old version. --- .../commons/concurrent/ThreadPool.java | 354 ++++++++---------- .../commons/concurrent/ThreadPool.java | 354 ++++++++---------- .../commons/concurrent/ThreadPool.java | 354 ++++++++---------- .../commons/concurrent/ThreadPool.java | 354 ++++++++---------- .../commons/concurrent/ThreadPool.java | 354 ++++++++---------- .../commons/concurrent/ThreadPool.java | 354 ++++++++---------- .../commons/concurrent/ThreadPool.java | 354 ++++++++---------- .../commons/concurrent/ThreadPool.java | 354 ++++++++---------- 8 files changed, 1272 insertions(+), 1560 deletions(-) diff --git a/L2J_Mobius_1.0_Ertheia/java/com/l2jmobius/commons/concurrent/ThreadPool.java b/L2J_Mobius_1.0_Ertheia/java/com/l2jmobius/commons/concurrent/ThreadPool.java index 2c1643ae66..d157412b2e 100644 --- a/L2J_Mobius_1.0_Ertheia/java/com/l2jmobius/commons/concurrent/ThreadPool.java +++ b/L2J_Mobius_1.0_Ertheia/java/com/l2jmobius/commons/concurrent/ThreadPool.java @@ -16,255 +16,219 @@ */ package com.l2jmobius.commons.concurrent; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import java.util.stream.Stream; import com.l2jmobius.Config; /** - * @author _dev_ (savormix) - * @author NB4L1 + * This class handles thread pooling system. It relies on two ThreadPoolExecutor arrays, which poolers number is generated using config. + *

+ * Those arrays hold following pools: + *

+ * */ public final class ThreadPool { - private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); + protected static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); - private static ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR; - private static ThreadPoolExecutor THREAD_POOL_EXECUTOR; + protected static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; + protected static ThreadPoolExecutor[] INSTANT_POOLS; + private static int THREAD_POOL_RANDOMIZER; - public static void init() throws Exception + /** + * Init the different pools, based on Config. It is launched only once, on Gameserver instance. + */ + public static void init() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR != null) || (THREAD_POOL_EXECUTOR != null)) + // Feed scheduled pool. + int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; + if (scheduledPoolCount == -1) { - throw new Exception("The thread pool has been already initialized!"); + scheduledPoolCount = Runtime.getRuntime().availableProcessors(); } - SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(Config.SCHEDULED_THREAD_POOL_COUNT != -1 ? Config.SCHEDULED_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_SCHEDULED_THREAD_POOL, new PoolThreadFactory("L2JM-S-", Thread.NORM_PRIORITY)); - final int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT != -1 ? Config.INSTANT_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_INSTANT_THREAD_POOL; - THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(instantPoolCount, instantPoolCount, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new PoolThreadFactory("L2JM-I-", Thread.NORM_PRIORITY)); - - getThreadPools().forEach(tp -> + SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount]; + for (int i = 0; i < scheduledPoolCount; i++) { - tp.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); - tp.prestartAllCoreThreads(); - }); + SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); + } - scheduleAtFixedRate(ThreadPool::purge, 60000, 60000); // Repeats every minute. + // Feed instant pool. + int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; + if (instantPoolCount == -1) + { + instantPoolCount = Runtime.getRuntime().availableProcessors(); + } - LOGGER.info("ThreadPool: Initialized with"); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + "/" + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + " scheduled thread(s)."); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getPoolSize() + "/" + THREAD_POOL_EXECUTOR.getMaximumPoolSize() + " thread(s)."); + INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount]; + for (int i = 0; i < instantPoolCount; i++) + { + INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(100000)); + } + + // Prestart core threads. + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + // Launch purge task. + scheduleAtFixedRate(() -> + { + purge(); + }, 600000, 600000); + + LOGGER.info("ThreadPool: Initialized"); + LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); + LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); + } + + public static void purge() + { + for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) + { + threadPool1.purge(); + } + for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) + { + threadPool2.purge(); + } } /** - * Gets the scheduled thread pool executor. - * @return the scheduled thread pool executor + * Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. */ - public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() + public static ScheduledFuture schedule(Runnable r, long delay) { - return SCHEDULED_THREAD_POOL_EXECUTOR; + try + { + return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** - * Gets the thread pool executor. - * @return the thread pool executor + * Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @param period : the period between successive executions. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. */ - public static ThreadPoolExecutor getThreadPoolExecutor() + public static ScheduledFuture scheduleAtFixedRate(Runnable r, long delay, long period) { - return THREAD_POOL_EXECUTOR; - } - - /** - * Gets a stream of all the thread pools. - * @return the stream of all the thread pools - */ - public static Stream getThreadPools() - { - return Stream.of(SCHEDULED_THREAD_POOL_EXECUTOR, THREAD_POOL_EXECUTOR); - } - - /** - * Schedules a task to be executed after the given delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture schedule(Runnable task, long delay) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.schedule(new RunnableWrapper(task), delay, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay at fixed rate in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleAtFixedRate(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay with fixed delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleWithFixedDelay(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); + try + { + return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** * Executes the given task sometime in the future. - * @param task the task to execute + * @param r : the task to execute. */ - public static void execute(Runnable task) + public static void execute(Runnable r) { - THREAD_POOL_EXECUTOR.execute(new RunnableWrapper(task)); + try + { + getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); + } + catch (Exception e) + { + } } /** - * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion. - * @param task the task to submit - * @return a Future representing pending completion of the task + * @param : The pool type. + * @param threadPools : The pool array to check. + * @return the less fed pool. */ - public static Future submit(Runnable task) + private static T getPool(T[] threadPools) { - return THREAD_POOL_EXECUTOR.submit(new RunnableWrapper(task)); + return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length]; + } + + public static String[] getStats() + { + final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; + int pos = 0; + for (int i = 0; i < SCHEDULED_POOLS.length; i++) + { + final ScheduledThreadPoolExecutor threadPool = SCHEDULED_POOLS[i]; + stats[pos++] = "Scheduled pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + for (int i = 0; i < INSTANT_POOLS.length; i++) + { + final ThreadPoolExecutor threadPool = INSTANT_POOLS[i]; + stats[pos++] = "Instant pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + return stats; } /** - * Purges all thread pools. - */ - public static void purge() - { - getThreadPools().forEach(ThreadPoolExecutor::purge); - } - - /** - * Gets the thread pools stats. - * @return the stats - */ - public static List getStats() - { - final List list = new ArrayList<>(23); - list.add(""); - list.add("Scheduled pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + SCHEDULED_THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - list.add("getCompletedTaskCount: " + SCHEDULED_THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + SCHEDULED_THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - list.add("Thread pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + THREAD_POOL_EXECUTOR.getMaximumPoolSize()); - list.add("getCompletedTaskCount: " + THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - return list; - } - - /** - * Shutdowns the thread pools waiting for tasks to finish. + * Shutdown thread pooling system correctly. Send different informations. */ public static void shutdown() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR == null) && (THREAD_POOL_EXECUTOR == null)) + try { - return; - } - - final long startTime = System.currentTimeMillis(); - - LOGGER.info("ThreadPool: Shutting down."); - LOGGER.info("...executing " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks."); - LOGGER.info("...executing " + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks."); - - getThreadPools().forEach(tp -> - { - try + LOGGER.info("ThreadPool: Shutting down."); + + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) { - tp.shutdown(); + threadPool.shutdownNow(); } - catch (Throwable t) + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) { - LOGGER.warning("" + t); - } - }); - - getThreadPools().forEach(t -> - { - try - { - t.awaitTermination(15, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - LOGGER.warning("" + e); - } - }); - - if (!SCHEDULED_THREAD_POOL_EXECUTOR.isTerminated()) - { - SCHEDULED_THREAD_POOL_EXECUTOR.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - SCHEDULED_THREAD_POOL_EXECUTOR.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - try - { - SCHEDULED_THREAD_POOL_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS); - } - catch (Throwable t) - { - LOGGER.warning("" + t); + threadPool.shutdownNow(); } } - - LOGGER.info("...success: " + getThreadPools().allMatch(ThreadPoolExecutor::isTerminated) + " in " + (System.currentTimeMillis() - startTime) + " ms."); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks left."); - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks left."); - } - - private static final class PoolThreadFactory implements ThreadFactory - { - private final String _prefix; - private final int _priority; - private final AtomicInteger _threadId = new AtomicInteger(); - - public PoolThreadFactory(String prefix, int priority) + catch (Throwable t) { - _prefix = prefix; - _priority = priority; - } - - @Override - public Thread newThread(Runnable r) - { - final Thread thread = new Thread(r, _prefix + _threadId.incrementAndGet()); - thread.setPriority(_priority); - return thread; + t.printStackTrace(); } } -} +} \ No newline at end of file diff --git a/L2J_Mobius_2.5_Underground/java/com/l2jmobius/commons/concurrent/ThreadPool.java b/L2J_Mobius_2.5_Underground/java/com/l2jmobius/commons/concurrent/ThreadPool.java index 2c1643ae66..d157412b2e 100644 --- a/L2J_Mobius_2.5_Underground/java/com/l2jmobius/commons/concurrent/ThreadPool.java +++ b/L2J_Mobius_2.5_Underground/java/com/l2jmobius/commons/concurrent/ThreadPool.java @@ -16,255 +16,219 @@ */ package com.l2jmobius.commons.concurrent; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import java.util.stream.Stream; import com.l2jmobius.Config; /** - * @author _dev_ (savormix) - * @author NB4L1 + * This class handles thread pooling system. It relies on two ThreadPoolExecutor arrays, which poolers number is generated using config. + *

+ * Those arrays hold following pools: + *

+ *
    + *
  • Scheduled pool keeps a track about incoming, future events.
  • + *
  • Instant pool handles short-life events.
  • + *
*/ public final class ThreadPool { - private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); + protected static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); - private static ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR; - private static ThreadPoolExecutor THREAD_POOL_EXECUTOR; + protected static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; + protected static ThreadPoolExecutor[] INSTANT_POOLS; + private static int THREAD_POOL_RANDOMIZER; - public static void init() throws Exception + /** + * Init the different pools, based on Config. It is launched only once, on Gameserver instance. + */ + public static void init() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR != null) || (THREAD_POOL_EXECUTOR != null)) + // Feed scheduled pool. + int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; + if (scheduledPoolCount == -1) { - throw new Exception("The thread pool has been already initialized!"); + scheduledPoolCount = Runtime.getRuntime().availableProcessors(); } - SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(Config.SCHEDULED_THREAD_POOL_COUNT != -1 ? Config.SCHEDULED_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_SCHEDULED_THREAD_POOL, new PoolThreadFactory("L2JM-S-", Thread.NORM_PRIORITY)); - final int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT != -1 ? Config.INSTANT_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_INSTANT_THREAD_POOL; - THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(instantPoolCount, instantPoolCount, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new PoolThreadFactory("L2JM-I-", Thread.NORM_PRIORITY)); - - getThreadPools().forEach(tp -> + SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount]; + for (int i = 0; i < scheduledPoolCount; i++) { - tp.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); - tp.prestartAllCoreThreads(); - }); + SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); + } - scheduleAtFixedRate(ThreadPool::purge, 60000, 60000); // Repeats every minute. + // Feed instant pool. + int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; + if (instantPoolCount == -1) + { + instantPoolCount = Runtime.getRuntime().availableProcessors(); + } - LOGGER.info("ThreadPool: Initialized with"); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + "/" + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + " scheduled thread(s)."); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getPoolSize() + "/" + THREAD_POOL_EXECUTOR.getMaximumPoolSize() + " thread(s)."); + INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount]; + for (int i = 0; i < instantPoolCount; i++) + { + INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(100000)); + } + + // Prestart core threads. + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + // Launch purge task. + scheduleAtFixedRate(() -> + { + purge(); + }, 600000, 600000); + + LOGGER.info("ThreadPool: Initialized"); + LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); + LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); + } + + public static void purge() + { + for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) + { + threadPool1.purge(); + } + for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) + { + threadPool2.purge(); + } } /** - * Gets the scheduled thread pool executor. - * @return the scheduled thread pool executor + * Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. */ - public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() + public static ScheduledFuture schedule(Runnable r, long delay) { - return SCHEDULED_THREAD_POOL_EXECUTOR; + try + { + return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** - * Gets the thread pool executor. - * @return the thread pool executor + * Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @param period : the period between successive executions. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. */ - public static ThreadPoolExecutor getThreadPoolExecutor() + public static ScheduledFuture scheduleAtFixedRate(Runnable r, long delay, long period) { - return THREAD_POOL_EXECUTOR; - } - - /** - * Gets a stream of all the thread pools. - * @return the stream of all the thread pools - */ - public static Stream getThreadPools() - { - return Stream.of(SCHEDULED_THREAD_POOL_EXECUTOR, THREAD_POOL_EXECUTOR); - } - - /** - * Schedules a task to be executed after the given delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture schedule(Runnable task, long delay) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.schedule(new RunnableWrapper(task), delay, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay at fixed rate in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleAtFixedRate(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay with fixed delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleWithFixedDelay(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); + try + { + return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** * Executes the given task sometime in the future. - * @param task the task to execute + * @param r : the task to execute. */ - public static void execute(Runnable task) + public static void execute(Runnable r) { - THREAD_POOL_EXECUTOR.execute(new RunnableWrapper(task)); + try + { + getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); + } + catch (Exception e) + { + } } /** - * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion. - * @param task the task to submit - * @return a Future representing pending completion of the task + * @param : The pool type. + * @param threadPools : The pool array to check. + * @return the less fed pool. */ - public static Future submit(Runnable task) + private static T getPool(T[] threadPools) { - return THREAD_POOL_EXECUTOR.submit(new RunnableWrapper(task)); + return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length]; + } + + public static String[] getStats() + { + final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; + int pos = 0; + for (int i = 0; i < SCHEDULED_POOLS.length; i++) + { + final ScheduledThreadPoolExecutor threadPool = SCHEDULED_POOLS[i]; + stats[pos++] = "Scheduled pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + for (int i = 0; i < INSTANT_POOLS.length; i++) + { + final ThreadPoolExecutor threadPool = INSTANT_POOLS[i]; + stats[pos++] = "Instant pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + return stats; } /** - * Purges all thread pools. - */ - public static void purge() - { - getThreadPools().forEach(ThreadPoolExecutor::purge); - } - - /** - * Gets the thread pools stats. - * @return the stats - */ - public static List getStats() - { - final List list = new ArrayList<>(23); - list.add(""); - list.add("Scheduled pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + SCHEDULED_THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - list.add("getCompletedTaskCount: " + SCHEDULED_THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + SCHEDULED_THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - list.add("Thread pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + THREAD_POOL_EXECUTOR.getMaximumPoolSize()); - list.add("getCompletedTaskCount: " + THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - return list; - } - - /** - * Shutdowns the thread pools waiting for tasks to finish. + * Shutdown thread pooling system correctly. Send different informations. */ public static void shutdown() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR == null) && (THREAD_POOL_EXECUTOR == null)) + try { - return; - } - - final long startTime = System.currentTimeMillis(); - - LOGGER.info("ThreadPool: Shutting down."); - LOGGER.info("...executing " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks."); - LOGGER.info("...executing " + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks."); - - getThreadPools().forEach(tp -> - { - try + LOGGER.info("ThreadPool: Shutting down."); + + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) { - tp.shutdown(); + threadPool.shutdownNow(); } - catch (Throwable t) + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) { - LOGGER.warning("" + t); - } - }); - - getThreadPools().forEach(t -> - { - try - { - t.awaitTermination(15, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - LOGGER.warning("" + e); - } - }); - - if (!SCHEDULED_THREAD_POOL_EXECUTOR.isTerminated()) - { - SCHEDULED_THREAD_POOL_EXECUTOR.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - SCHEDULED_THREAD_POOL_EXECUTOR.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - try - { - SCHEDULED_THREAD_POOL_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS); - } - catch (Throwable t) - { - LOGGER.warning("" + t); + threadPool.shutdownNow(); } } - - LOGGER.info("...success: " + getThreadPools().allMatch(ThreadPoolExecutor::isTerminated) + " in " + (System.currentTimeMillis() - startTime) + " ms."); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks left."); - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks left."); - } - - private static final class PoolThreadFactory implements ThreadFactory - { - private final String _prefix; - private final int _priority; - private final AtomicInteger _threadId = new AtomicInteger(); - - public PoolThreadFactory(String prefix, int priority) + catch (Throwable t) { - _prefix = prefix; - _priority = priority; - } - - @Override - public Thread newThread(Runnable r) - { - final Thread thread = new Thread(r, _prefix + _threadId.incrementAndGet()); - thread.setPriority(_priority); - return thread; + t.printStackTrace(); } } -} +} \ No newline at end of file diff --git a/L2J_Mobius_3.0_Helios/java/com/l2jmobius/commons/concurrent/ThreadPool.java b/L2J_Mobius_3.0_Helios/java/com/l2jmobius/commons/concurrent/ThreadPool.java index 2c1643ae66..d157412b2e 100644 --- a/L2J_Mobius_3.0_Helios/java/com/l2jmobius/commons/concurrent/ThreadPool.java +++ b/L2J_Mobius_3.0_Helios/java/com/l2jmobius/commons/concurrent/ThreadPool.java @@ -16,255 +16,219 @@ */ package com.l2jmobius.commons.concurrent; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import java.util.stream.Stream; import com.l2jmobius.Config; /** - * @author _dev_ (savormix) - * @author NB4L1 + * This class handles thread pooling system. It relies on two ThreadPoolExecutor arrays, which poolers number is generated using config. + *

+ * Those arrays hold following pools: + *

+ *
    + *
  • Scheduled pool keeps a track about incoming, future events.
  • + *
  • Instant pool handles short-life events.
  • + *
*/ public final class ThreadPool { - private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); + protected static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); - private static ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR; - private static ThreadPoolExecutor THREAD_POOL_EXECUTOR; + protected static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; + protected static ThreadPoolExecutor[] INSTANT_POOLS; + private static int THREAD_POOL_RANDOMIZER; - public static void init() throws Exception + /** + * Init the different pools, based on Config. It is launched only once, on Gameserver instance. + */ + public static void init() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR != null) || (THREAD_POOL_EXECUTOR != null)) + // Feed scheduled pool. + int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; + if (scheduledPoolCount == -1) { - throw new Exception("The thread pool has been already initialized!"); + scheduledPoolCount = Runtime.getRuntime().availableProcessors(); } - SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(Config.SCHEDULED_THREAD_POOL_COUNT != -1 ? Config.SCHEDULED_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_SCHEDULED_THREAD_POOL, new PoolThreadFactory("L2JM-S-", Thread.NORM_PRIORITY)); - final int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT != -1 ? Config.INSTANT_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_INSTANT_THREAD_POOL; - THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(instantPoolCount, instantPoolCount, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new PoolThreadFactory("L2JM-I-", Thread.NORM_PRIORITY)); - - getThreadPools().forEach(tp -> + SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount]; + for (int i = 0; i < scheduledPoolCount; i++) { - tp.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); - tp.prestartAllCoreThreads(); - }); + SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); + } - scheduleAtFixedRate(ThreadPool::purge, 60000, 60000); // Repeats every minute. + // Feed instant pool. + int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; + if (instantPoolCount == -1) + { + instantPoolCount = Runtime.getRuntime().availableProcessors(); + } - LOGGER.info("ThreadPool: Initialized with"); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + "/" + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + " scheduled thread(s)."); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getPoolSize() + "/" + THREAD_POOL_EXECUTOR.getMaximumPoolSize() + " thread(s)."); + INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount]; + for (int i = 0; i < instantPoolCount; i++) + { + INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(100000)); + } + + // Prestart core threads. + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + // Launch purge task. + scheduleAtFixedRate(() -> + { + purge(); + }, 600000, 600000); + + LOGGER.info("ThreadPool: Initialized"); + LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); + LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); + } + + public static void purge() + { + for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) + { + threadPool1.purge(); + } + for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) + { + threadPool2.purge(); + } } /** - * Gets the scheduled thread pool executor. - * @return the scheduled thread pool executor + * Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. */ - public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() + public static ScheduledFuture schedule(Runnable r, long delay) { - return SCHEDULED_THREAD_POOL_EXECUTOR; + try + { + return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** - * Gets the thread pool executor. - * @return the thread pool executor + * Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @param period : the period between successive executions. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. */ - public static ThreadPoolExecutor getThreadPoolExecutor() + public static ScheduledFuture scheduleAtFixedRate(Runnable r, long delay, long period) { - return THREAD_POOL_EXECUTOR; - } - - /** - * Gets a stream of all the thread pools. - * @return the stream of all the thread pools - */ - public static Stream getThreadPools() - { - return Stream.of(SCHEDULED_THREAD_POOL_EXECUTOR, THREAD_POOL_EXECUTOR); - } - - /** - * Schedules a task to be executed after the given delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture schedule(Runnable task, long delay) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.schedule(new RunnableWrapper(task), delay, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay at fixed rate in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleAtFixedRate(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay with fixed delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleWithFixedDelay(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); + try + { + return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** * Executes the given task sometime in the future. - * @param task the task to execute + * @param r : the task to execute. */ - public static void execute(Runnable task) + public static void execute(Runnable r) { - THREAD_POOL_EXECUTOR.execute(new RunnableWrapper(task)); + try + { + getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); + } + catch (Exception e) + { + } } /** - * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion. - * @param task the task to submit - * @return a Future representing pending completion of the task + * @param : The pool type. + * @param threadPools : The pool array to check. + * @return the less fed pool. */ - public static Future submit(Runnable task) + private static T getPool(T[] threadPools) { - return THREAD_POOL_EXECUTOR.submit(new RunnableWrapper(task)); + return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length]; + } + + public static String[] getStats() + { + final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; + int pos = 0; + for (int i = 0; i < SCHEDULED_POOLS.length; i++) + { + final ScheduledThreadPoolExecutor threadPool = SCHEDULED_POOLS[i]; + stats[pos++] = "Scheduled pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + for (int i = 0; i < INSTANT_POOLS.length; i++) + { + final ThreadPoolExecutor threadPool = INSTANT_POOLS[i]; + stats[pos++] = "Instant pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + return stats; } /** - * Purges all thread pools. - */ - public static void purge() - { - getThreadPools().forEach(ThreadPoolExecutor::purge); - } - - /** - * Gets the thread pools stats. - * @return the stats - */ - public static List getStats() - { - final List list = new ArrayList<>(23); - list.add(""); - list.add("Scheduled pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + SCHEDULED_THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - list.add("getCompletedTaskCount: " + SCHEDULED_THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + SCHEDULED_THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - list.add("Thread pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + THREAD_POOL_EXECUTOR.getMaximumPoolSize()); - list.add("getCompletedTaskCount: " + THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - return list; - } - - /** - * Shutdowns the thread pools waiting for tasks to finish. + * Shutdown thread pooling system correctly. Send different informations. */ public static void shutdown() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR == null) && (THREAD_POOL_EXECUTOR == null)) + try { - return; - } - - final long startTime = System.currentTimeMillis(); - - LOGGER.info("ThreadPool: Shutting down."); - LOGGER.info("...executing " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks."); - LOGGER.info("...executing " + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks."); - - getThreadPools().forEach(tp -> - { - try + LOGGER.info("ThreadPool: Shutting down."); + + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) { - tp.shutdown(); + threadPool.shutdownNow(); } - catch (Throwable t) + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) { - LOGGER.warning("" + t); - } - }); - - getThreadPools().forEach(t -> - { - try - { - t.awaitTermination(15, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - LOGGER.warning("" + e); - } - }); - - if (!SCHEDULED_THREAD_POOL_EXECUTOR.isTerminated()) - { - SCHEDULED_THREAD_POOL_EXECUTOR.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - SCHEDULED_THREAD_POOL_EXECUTOR.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - try - { - SCHEDULED_THREAD_POOL_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS); - } - catch (Throwable t) - { - LOGGER.warning("" + t); + threadPool.shutdownNow(); } } - - LOGGER.info("...success: " + getThreadPools().allMatch(ThreadPoolExecutor::isTerminated) + " in " + (System.currentTimeMillis() - startTime) + " ms."); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks left."); - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks left."); - } - - private static final class PoolThreadFactory implements ThreadFactory - { - private final String _prefix; - private final int _priority; - private final AtomicInteger _threadId = new AtomicInteger(); - - public PoolThreadFactory(String prefix, int priority) + catch (Throwable t) { - _prefix = prefix; - _priority = priority; - } - - @Override - public Thread newThread(Runnable r) - { - final Thread thread = new Thread(r, _prefix + _threadId.incrementAndGet()); - thread.setPriority(_priority); - return thread; + t.printStackTrace(); } } -} +} \ No newline at end of file diff --git a/L2J_Mobius_4.0_GrandCrusade/java/com/l2jmobius/commons/concurrent/ThreadPool.java b/L2J_Mobius_4.0_GrandCrusade/java/com/l2jmobius/commons/concurrent/ThreadPool.java index 2c1643ae66..d157412b2e 100644 --- a/L2J_Mobius_4.0_GrandCrusade/java/com/l2jmobius/commons/concurrent/ThreadPool.java +++ b/L2J_Mobius_4.0_GrandCrusade/java/com/l2jmobius/commons/concurrent/ThreadPool.java @@ -16,255 +16,219 @@ */ package com.l2jmobius.commons.concurrent; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import java.util.stream.Stream; import com.l2jmobius.Config; /** - * @author _dev_ (savormix) - * @author NB4L1 + * This class handles thread pooling system. It relies on two ThreadPoolExecutor arrays, which poolers number is generated using config. + *

+ * Those arrays hold following pools: + *

+ *
    + *
  • Scheduled pool keeps a track about incoming, future events.
  • + *
  • Instant pool handles short-life events.
  • + *
*/ public final class ThreadPool { - private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); + protected static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); - private static ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR; - private static ThreadPoolExecutor THREAD_POOL_EXECUTOR; + protected static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; + protected static ThreadPoolExecutor[] INSTANT_POOLS; + private static int THREAD_POOL_RANDOMIZER; - public static void init() throws Exception + /** + * Init the different pools, based on Config. It is launched only once, on Gameserver instance. + */ + public static void init() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR != null) || (THREAD_POOL_EXECUTOR != null)) + // Feed scheduled pool. + int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; + if (scheduledPoolCount == -1) { - throw new Exception("The thread pool has been already initialized!"); + scheduledPoolCount = Runtime.getRuntime().availableProcessors(); } - SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(Config.SCHEDULED_THREAD_POOL_COUNT != -1 ? Config.SCHEDULED_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_SCHEDULED_THREAD_POOL, new PoolThreadFactory("L2JM-S-", Thread.NORM_PRIORITY)); - final int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT != -1 ? Config.INSTANT_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_INSTANT_THREAD_POOL; - THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(instantPoolCount, instantPoolCount, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new PoolThreadFactory("L2JM-I-", Thread.NORM_PRIORITY)); - - getThreadPools().forEach(tp -> + SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount]; + for (int i = 0; i < scheduledPoolCount; i++) { - tp.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); - tp.prestartAllCoreThreads(); - }); + SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); + } - scheduleAtFixedRate(ThreadPool::purge, 60000, 60000); // Repeats every minute. + // Feed instant pool. + int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; + if (instantPoolCount == -1) + { + instantPoolCount = Runtime.getRuntime().availableProcessors(); + } - LOGGER.info("ThreadPool: Initialized with"); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + "/" + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + " scheduled thread(s)."); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getPoolSize() + "/" + THREAD_POOL_EXECUTOR.getMaximumPoolSize() + " thread(s)."); + INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount]; + for (int i = 0; i < instantPoolCount; i++) + { + INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(100000)); + } + + // Prestart core threads. + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + // Launch purge task. + scheduleAtFixedRate(() -> + { + purge(); + }, 600000, 600000); + + LOGGER.info("ThreadPool: Initialized"); + LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); + LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); + } + + public static void purge() + { + for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) + { + threadPool1.purge(); + } + for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) + { + threadPool2.purge(); + } } /** - * Gets the scheduled thread pool executor. - * @return the scheduled thread pool executor + * Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. */ - public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() + public static ScheduledFuture schedule(Runnable r, long delay) { - return SCHEDULED_THREAD_POOL_EXECUTOR; + try + { + return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** - * Gets the thread pool executor. - * @return the thread pool executor + * Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @param period : the period between successive executions. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. */ - public static ThreadPoolExecutor getThreadPoolExecutor() + public static ScheduledFuture scheduleAtFixedRate(Runnable r, long delay, long period) { - return THREAD_POOL_EXECUTOR; - } - - /** - * Gets a stream of all the thread pools. - * @return the stream of all the thread pools - */ - public static Stream getThreadPools() - { - return Stream.of(SCHEDULED_THREAD_POOL_EXECUTOR, THREAD_POOL_EXECUTOR); - } - - /** - * Schedules a task to be executed after the given delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture schedule(Runnable task, long delay) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.schedule(new RunnableWrapper(task), delay, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay at fixed rate in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleAtFixedRate(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay with fixed delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleWithFixedDelay(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); + try + { + return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** * Executes the given task sometime in the future. - * @param task the task to execute + * @param r : the task to execute. */ - public static void execute(Runnable task) + public static void execute(Runnable r) { - THREAD_POOL_EXECUTOR.execute(new RunnableWrapper(task)); + try + { + getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); + } + catch (Exception e) + { + } } /** - * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion. - * @param task the task to submit - * @return a Future representing pending completion of the task + * @param : The pool type. + * @param threadPools : The pool array to check. + * @return the less fed pool. */ - public static Future submit(Runnable task) + private static T getPool(T[] threadPools) { - return THREAD_POOL_EXECUTOR.submit(new RunnableWrapper(task)); + return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length]; + } + + public static String[] getStats() + { + final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; + int pos = 0; + for (int i = 0; i < SCHEDULED_POOLS.length; i++) + { + final ScheduledThreadPoolExecutor threadPool = SCHEDULED_POOLS[i]; + stats[pos++] = "Scheduled pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + for (int i = 0; i < INSTANT_POOLS.length; i++) + { + final ThreadPoolExecutor threadPool = INSTANT_POOLS[i]; + stats[pos++] = "Instant pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + return stats; } /** - * Purges all thread pools. - */ - public static void purge() - { - getThreadPools().forEach(ThreadPoolExecutor::purge); - } - - /** - * Gets the thread pools stats. - * @return the stats - */ - public static List getStats() - { - final List list = new ArrayList<>(23); - list.add(""); - list.add("Scheduled pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + SCHEDULED_THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - list.add("getCompletedTaskCount: " + SCHEDULED_THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + SCHEDULED_THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - list.add("Thread pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + THREAD_POOL_EXECUTOR.getMaximumPoolSize()); - list.add("getCompletedTaskCount: " + THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - return list; - } - - /** - * Shutdowns the thread pools waiting for tasks to finish. + * Shutdown thread pooling system correctly. Send different informations. */ public static void shutdown() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR == null) && (THREAD_POOL_EXECUTOR == null)) + try { - return; - } - - final long startTime = System.currentTimeMillis(); - - LOGGER.info("ThreadPool: Shutting down."); - LOGGER.info("...executing " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks."); - LOGGER.info("...executing " + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks."); - - getThreadPools().forEach(tp -> - { - try + LOGGER.info("ThreadPool: Shutting down."); + + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) { - tp.shutdown(); + threadPool.shutdownNow(); } - catch (Throwable t) + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) { - LOGGER.warning("" + t); - } - }); - - getThreadPools().forEach(t -> - { - try - { - t.awaitTermination(15, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - LOGGER.warning("" + e); - } - }); - - if (!SCHEDULED_THREAD_POOL_EXECUTOR.isTerminated()) - { - SCHEDULED_THREAD_POOL_EXECUTOR.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - SCHEDULED_THREAD_POOL_EXECUTOR.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - try - { - SCHEDULED_THREAD_POOL_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS); - } - catch (Throwable t) - { - LOGGER.warning("" + t); + threadPool.shutdownNow(); } } - - LOGGER.info("...success: " + getThreadPools().allMatch(ThreadPoolExecutor::isTerminated) + " in " + (System.currentTimeMillis() - startTime) + " ms."); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks left."); - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks left."); - } - - private static final class PoolThreadFactory implements ThreadFactory - { - private final String _prefix; - private final int _priority; - private final AtomicInteger _threadId = new AtomicInteger(); - - public PoolThreadFactory(String prefix, int priority) + catch (Throwable t) { - _prefix = prefix; - _priority = priority; - } - - @Override - public Thread newThread(Runnable r) - { - final Thread thread = new Thread(r, _prefix + _threadId.incrementAndGet()); - thread.setPriority(_priority); - return thread; + t.printStackTrace(); } } -} +} \ No newline at end of file diff --git a/L2J_Mobius_C6_Interlude/java/com/l2jmobius/commons/concurrent/ThreadPool.java b/L2J_Mobius_C6_Interlude/java/com/l2jmobius/commons/concurrent/ThreadPool.java index 2c1643ae66..d157412b2e 100644 --- a/L2J_Mobius_C6_Interlude/java/com/l2jmobius/commons/concurrent/ThreadPool.java +++ b/L2J_Mobius_C6_Interlude/java/com/l2jmobius/commons/concurrent/ThreadPool.java @@ -16,255 +16,219 @@ */ package com.l2jmobius.commons.concurrent; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import java.util.stream.Stream; import com.l2jmobius.Config; /** - * @author _dev_ (savormix) - * @author NB4L1 + * This class handles thread pooling system. It relies on two ThreadPoolExecutor arrays, which poolers number is generated using config. + *

+ * Those arrays hold following pools: + *

+ *
    + *
  • Scheduled pool keeps a track about incoming, future events.
  • + *
  • Instant pool handles short-life events.
  • + *
*/ public final class ThreadPool { - private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); + protected static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); - private static ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR; - private static ThreadPoolExecutor THREAD_POOL_EXECUTOR; + protected static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; + protected static ThreadPoolExecutor[] INSTANT_POOLS; + private static int THREAD_POOL_RANDOMIZER; - public static void init() throws Exception + /** + * Init the different pools, based on Config. It is launched only once, on Gameserver instance. + */ + public static void init() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR != null) || (THREAD_POOL_EXECUTOR != null)) + // Feed scheduled pool. + int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; + if (scheduledPoolCount == -1) { - throw new Exception("The thread pool has been already initialized!"); + scheduledPoolCount = Runtime.getRuntime().availableProcessors(); } - SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(Config.SCHEDULED_THREAD_POOL_COUNT != -1 ? Config.SCHEDULED_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_SCHEDULED_THREAD_POOL, new PoolThreadFactory("L2JM-S-", Thread.NORM_PRIORITY)); - final int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT != -1 ? Config.INSTANT_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_INSTANT_THREAD_POOL; - THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(instantPoolCount, instantPoolCount, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new PoolThreadFactory("L2JM-I-", Thread.NORM_PRIORITY)); - - getThreadPools().forEach(tp -> + SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount]; + for (int i = 0; i < scheduledPoolCount; i++) { - tp.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); - tp.prestartAllCoreThreads(); - }); + SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); + } - scheduleAtFixedRate(ThreadPool::purge, 60000, 60000); // Repeats every minute. + // Feed instant pool. + int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; + if (instantPoolCount == -1) + { + instantPoolCount = Runtime.getRuntime().availableProcessors(); + } - LOGGER.info("ThreadPool: Initialized with"); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + "/" + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + " scheduled thread(s)."); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getPoolSize() + "/" + THREAD_POOL_EXECUTOR.getMaximumPoolSize() + " thread(s)."); + INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount]; + for (int i = 0; i < instantPoolCount; i++) + { + INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(100000)); + } + + // Prestart core threads. + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + // Launch purge task. + scheduleAtFixedRate(() -> + { + purge(); + }, 600000, 600000); + + LOGGER.info("ThreadPool: Initialized"); + LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); + LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); + } + + public static void purge() + { + for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) + { + threadPool1.purge(); + } + for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) + { + threadPool2.purge(); + } } /** - * Gets the scheduled thread pool executor. - * @return the scheduled thread pool executor + * Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. */ - public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() + public static ScheduledFuture schedule(Runnable r, long delay) { - return SCHEDULED_THREAD_POOL_EXECUTOR; + try + { + return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** - * Gets the thread pool executor. - * @return the thread pool executor + * Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @param period : the period between successive executions. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. */ - public static ThreadPoolExecutor getThreadPoolExecutor() + public static ScheduledFuture scheduleAtFixedRate(Runnable r, long delay, long period) { - return THREAD_POOL_EXECUTOR; - } - - /** - * Gets a stream of all the thread pools. - * @return the stream of all the thread pools - */ - public static Stream getThreadPools() - { - return Stream.of(SCHEDULED_THREAD_POOL_EXECUTOR, THREAD_POOL_EXECUTOR); - } - - /** - * Schedules a task to be executed after the given delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture schedule(Runnable task, long delay) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.schedule(new RunnableWrapper(task), delay, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay at fixed rate in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleAtFixedRate(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay with fixed delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleWithFixedDelay(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); + try + { + return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** * Executes the given task sometime in the future. - * @param task the task to execute + * @param r : the task to execute. */ - public static void execute(Runnable task) + public static void execute(Runnable r) { - THREAD_POOL_EXECUTOR.execute(new RunnableWrapper(task)); + try + { + getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); + } + catch (Exception e) + { + } } /** - * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion. - * @param task the task to submit - * @return a Future representing pending completion of the task + * @param : The pool type. + * @param threadPools : The pool array to check. + * @return the less fed pool. */ - public static Future submit(Runnable task) + private static T getPool(T[] threadPools) { - return THREAD_POOL_EXECUTOR.submit(new RunnableWrapper(task)); + return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length]; + } + + public static String[] getStats() + { + final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; + int pos = 0; + for (int i = 0; i < SCHEDULED_POOLS.length; i++) + { + final ScheduledThreadPoolExecutor threadPool = SCHEDULED_POOLS[i]; + stats[pos++] = "Scheduled pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + for (int i = 0; i < INSTANT_POOLS.length; i++) + { + final ThreadPoolExecutor threadPool = INSTANT_POOLS[i]; + stats[pos++] = "Instant pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + return stats; } /** - * Purges all thread pools. - */ - public static void purge() - { - getThreadPools().forEach(ThreadPoolExecutor::purge); - } - - /** - * Gets the thread pools stats. - * @return the stats - */ - public static List getStats() - { - final List list = new ArrayList<>(23); - list.add(""); - list.add("Scheduled pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + SCHEDULED_THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - list.add("getCompletedTaskCount: " + SCHEDULED_THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + SCHEDULED_THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - list.add("Thread pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + THREAD_POOL_EXECUTOR.getMaximumPoolSize()); - list.add("getCompletedTaskCount: " + THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - return list; - } - - /** - * Shutdowns the thread pools waiting for tasks to finish. + * Shutdown thread pooling system correctly. Send different informations. */ public static void shutdown() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR == null) && (THREAD_POOL_EXECUTOR == null)) + try { - return; - } - - final long startTime = System.currentTimeMillis(); - - LOGGER.info("ThreadPool: Shutting down."); - LOGGER.info("...executing " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks."); - LOGGER.info("...executing " + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks."); - - getThreadPools().forEach(tp -> - { - try + LOGGER.info("ThreadPool: Shutting down."); + + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) { - tp.shutdown(); + threadPool.shutdownNow(); } - catch (Throwable t) + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) { - LOGGER.warning("" + t); - } - }); - - getThreadPools().forEach(t -> - { - try - { - t.awaitTermination(15, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - LOGGER.warning("" + e); - } - }); - - if (!SCHEDULED_THREAD_POOL_EXECUTOR.isTerminated()) - { - SCHEDULED_THREAD_POOL_EXECUTOR.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - SCHEDULED_THREAD_POOL_EXECUTOR.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - try - { - SCHEDULED_THREAD_POOL_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS); - } - catch (Throwable t) - { - LOGGER.warning("" + t); + threadPool.shutdownNow(); } } - - LOGGER.info("...success: " + getThreadPools().allMatch(ThreadPoolExecutor::isTerminated) + " in " + (System.currentTimeMillis() - startTime) + " ms."); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks left."); - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks left."); - } - - private static final class PoolThreadFactory implements ThreadFactory - { - private final String _prefix; - private final int _priority; - private final AtomicInteger _threadId = new AtomicInteger(); - - public PoolThreadFactory(String prefix, int priority) + catch (Throwable t) { - _prefix = prefix; - _priority = priority; - } - - @Override - public Thread newThread(Runnable r) - { - final Thread thread = new Thread(r, _prefix + _threadId.incrementAndGet()); - thread.setPriority(_priority); - return thread; + t.printStackTrace(); } } -} +} \ No newline at end of file diff --git a/L2J_Mobius_CT_2.6_HighFive/java/com/l2jmobius/commons/concurrent/ThreadPool.java b/L2J_Mobius_CT_2.6_HighFive/java/com/l2jmobius/commons/concurrent/ThreadPool.java index 79af5cb3e5..7cf193d83a 100644 --- a/L2J_Mobius_CT_2.6_HighFive/java/com/l2jmobius/commons/concurrent/ThreadPool.java +++ b/L2J_Mobius_CT_2.6_HighFive/java/com/l2jmobius/commons/concurrent/ThreadPool.java @@ -16,255 +16,219 @@ */ package com.l2jmobius.commons.concurrent; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import java.util.stream.Stream; import com.l2jmobius.Config; /** - * @author _dev_ (savormix) - * @author NB4L1 + * This class handles thread pooling system. It relies on two ThreadPoolExecutor arrays, which poolers number is generated using config. + *

+ * Those arrays hold following pools: + *

+ *
    + *
  • Scheduled pool keeps a track about incoming, future events.
  • + *
  • Instant pool handles short-life events.
  • + *
*/ public final class ThreadPool { - private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); + protected static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); - private static ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR; - private static ThreadPoolExecutor THREAD_POOL_EXECUTOR; + protected static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; + protected static ThreadPoolExecutor[] INSTANT_POOLS; + private static int THREAD_POOL_RANDOMIZER; - public static void init() throws Exception + /** + * Init the different pools, based on Config. It is launched only once, on Gameserver instance. + */ + public static void init() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR != null) || (THREAD_POOL_EXECUTOR != null)) + // Feed scheduled pool. + int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; + if (scheduledPoolCount == -1) { - throw new Exception("The thread pool has been already initialized!"); + scheduledPoolCount = Runtime.getRuntime().availableProcessors(); } - SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(Config.SCHEDULED_THREAD_POOL_COUNT != -1 ? Config.SCHEDULED_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_SCHEDULED_THREAD_POOL, new PoolThreadFactory("L2JM-S-", Thread.NORM_PRIORITY)); - final int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT != -1 ? Config.INSTANT_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_INSTANT_THREAD_POOL; - THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(instantPoolCount, instantPoolCount, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new PoolThreadFactory("L2JM-I-", Thread.NORM_PRIORITY)); - - getThreadPools().forEach(tp -> + SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount]; + for (int i = 0; i < scheduledPoolCount; i++) { - tp.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); - tp.prestartAllCoreThreads(); - }); + SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); + } - scheduleAtFixedRate(ThreadPool::purge, 60000, 60000); // Repeats every minute. + // Feed instant pool. + int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; + if (instantPoolCount == -1) + { + instantPoolCount = Runtime.getRuntime().availableProcessors(); + } - LOGGER.info("ThreadPool: Initialized with"); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + "/" + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + " scheduled thread(s)."); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getPoolSize() + "/" + THREAD_POOL_EXECUTOR.getMaximumPoolSize() + " thread(s)."); + INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount]; + for (int i = 0; i < instantPoolCount; i++) + { + INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(100000)); + } + + // Prestart core threads. + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + // Launch purge task. + scheduleAtFixedRate(() -> + { + purge(); + }, 600000, 600000); + + LOGGER.info("ThreadPool: Initialized"); + LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); + LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); + } + + public static void purge() + { + for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) + { + threadPool1.purge(); + } + for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) + { + threadPool2.purge(); + } } /** - * Gets the scheduled thread pool executor. - * @return the scheduled thread pool executor + * Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. */ - public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() + public static ScheduledFuture schedule(Runnable r, long delay) { - return SCHEDULED_THREAD_POOL_EXECUTOR; + try + { + return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** - * Gets the thread pool executor. - * @return the thread pool executor + * Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @param period : the period between successive executions. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. */ - public static ThreadPoolExecutor getThreadPoolExecutor() + public static ScheduledFuture scheduleAtFixedRate(Runnable r, long delay, long period) { - return THREAD_POOL_EXECUTOR; - } - - /** - * Gets a stream of all the thread pools. - * @return the stream of all the thread pools - */ - public static Stream getThreadPools() - { - return Stream.of(SCHEDULED_THREAD_POOL_EXECUTOR, THREAD_POOL_EXECUTOR); - } - - /** - * Schedules a task to be executed after the given delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture schedule(Runnable task, long delay) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.schedule(new RunnableWrapper(task), delay, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay at fixed rate in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleAtFixedRate(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay with fixed delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleWithFixedDelay(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); + try + { + return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** * Executes the given task sometime in the future. - * @param task the task to execute + * @param r : the task to execute. */ - public static void execute(Runnable task) + public static void execute(Runnable r) { - THREAD_POOL_EXECUTOR.execute(new RunnableWrapper(task)); + try + { + getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); + } + catch (Exception e) + { + } } /** - * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion. - * @param task the task to submit - * @return a Future representing pending completion of the task + * @param : The pool type. + * @param threadPools : The pool array to check. + * @return the less fed pool. */ - public static Future submit(Runnable task) + private static T getPool(T[] threadPools) { - return THREAD_POOL_EXECUTOR.submit(new RunnableWrapper(task)); + return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length]; + } + + public static String[] getStats() + { + final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; + int pos = 0; + for (int i = 0; i < SCHEDULED_POOLS.length; i++) + { + final ScheduledThreadPoolExecutor threadPool = SCHEDULED_POOLS[i]; + stats[pos++] = "Scheduled pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + for (int i = 0; i < INSTANT_POOLS.length; i++) + { + final ThreadPoolExecutor threadPool = INSTANT_POOLS[i]; + stats[pos++] = "Instant pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + return stats; } /** - * Purges all thread pools. - */ - public static void purge() - { - getThreadPools().forEach(ThreadPoolExecutor::purge); - } - - /** - * Gets the thread pools stats. - * @return the stats - */ - public static List getStats() - { - final List list = new ArrayList<>(23); - list.add(""); - list.add("Scheduled pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + SCHEDULED_THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - list.add("getCompletedTaskCount: " + SCHEDULED_THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + SCHEDULED_THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - list.add("Thread pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + THREAD_POOL_EXECUTOR.getMaximumPoolSize()); - list.add("getCompletedTaskCount: " + THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - return list; - } - - /** - * Shutdowns the thread pools waiting for tasks to finish. + * Shutdown thread pooling system correctly. Send different informations. */ public static void shutdown() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR == null) && (THREAD_POOL_EXECUTOR == null)) + try { - return; - } - - final long startTime = System.currentTimeMillis(); - - LOGGER.info("ThreadPool: Shutting down."); - LOGGER.info("...executing " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks."); - LOGGER.info("...executing " + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks."); - - getThreadPools().forEach(tp -> - { - try + LOGGER.info("ThreadPool: Shutting down."); + + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) { - tp.shutdown(); + threadPool.shutdownNow(); } - catch (Throwable t) + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) { - LOGGER.warning("" + t); - } - }); - - getThreadPools().forEach(t -> - { - try - { - t.awaitTermination(15, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - LOGGER.warning("" + e); - } - }); - - if (!SCHEDULED_THREAD_POOL_EXECUTOR.isTerminated()) - { - SCHEDULED_THREAD_POOL_EXECUTOR.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - SCHEDULED_THREAD_POOL_EXECUTOR.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - try - { - SCHEDULED_THREAD_POOL_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS); - } - catch (Throwable t) - { - LOGGER.warning("" + t); + threadPool.shutdownNow(); } } - - LOGGER.info("...success: " + getThreadPools().allMatch(ThreadPoolExecutor::isTerminated) + " in " + (System.currentTimeMillis() - startTime) + " ms."); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks left."); - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks left."); - } - - private static final class PoolThreadFactory implements ThreadFactory - { - private final String _prefix; - private final int _priority; - private final AtomicInteger _threadId = new AtomicInteger(); - - public PoolThreadFactory(String prefix, int priority) + catch (Throwable t) { - _prefix = prefix; - _priority = priority; - } - - @Override - public Thread newThread(Runnable r) - { - final Thread thread = new Thread(r, _prefix + _threadId.incrementAndGet()); - thread.setPriority(_priority); - return thread; + t.printStackTrace(); } } -} +} \ No newline at end of file diff --git a/L2J_Mobius_Classic_2.0_Saviors/java/com/l2jmobius/commons/concurrent/ThreadPool.java b/L2J_Mobius_Classic_2.0_Saviors/java/com/l2jmobius/commons/concurrent/ThreadPool.java index 2c1643ae66..d157412b2e 100644 --- a/L2J_Mobius_Classic_2.0_Saviors/java/com/l2jmobius/commons/concurrent/ThreadPool.java +++ b/L2J_Mobius_Classic_2.0_Saviors/java/com/l2jmobius/commons/concurrent/ThreadPool.java @@ -16,255 +16,219 @@ */ package com.l2jmobius.commons.concurrent; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import java.util.stream.Stream; import com.l2jmobius.Config; /** - * @author _dev_ (savormix) - * @author NB4L1 + * This class handles thread pooling system. It relies on two ThreadPoolExecutor arrays, which poolers number is generated using config. + *

+ * Those arrays hold following pools: + *

+ *
    + *
  • Scheduled pool keeps a track about incoming, future events.
  • + *
  • Instant pool handles short-life events.
  • + *
*/ public final class ThreadPool { - private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); + protected static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); - private static ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR; - private static ThreadPoolExecutor THREAD_POOL_EXECUTOR; + protected static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; + protected static ThreadPoolExecutor[] INSTANT_POOLS; + private static int THREAD_POOL_RANDOMIZER; - public static void init() throws Exception + /** + * Init the different pools, based on Config. It is launched only once, on Gameserver instance. + */ + public static void init() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR != null) || (THREAD_POOL_EXECUTOR != null)) + // Feed scheduled pool. + int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; + if (scheduledPoolCount == -1) { - throw new Exception("The thread pool has been already initialized!"); + scheduledPoolCount = Runtime.getRuntime().availableProcessors(); } - SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(Config.SCHEDULED_THREAD_POOL_COUNT != -1 ? Config.SCHEDULED_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_SCHEDULED_THREAD_POOL, new PoolThreadFactory("L2JM-S-", Thread.NORM_PRIORITY)); - final int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT != -1 ? Config.INSTANT_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_INSTANT_THREAD_POOL; - THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(instantPoolCount, instantPoolCount, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new PoolThreadFactory("L2JM-I-", Thread.NORM_PRIORITY)); - - getThreadPools().forEach(tp -> + SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount]; + for (int i = 0; i < scheduledPoolCount; i++) { - tp.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); - tp.prestartAllCoreThreads(); - }); + SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); + } - scheduleAtFixedRate(ThreadPool::purge, 60000, 60000); // Repeats every minute. + // Feed instant pool. + int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; + if (instantPoolCount == -1) + { + instantPoolCount = Runtime.getRuntime().availableProcessors(); + } - LOGGER.info("ThreadPool: Initialized with"); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + "/" + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + " scheduled thread(s)."); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getPoolSize() + "/" + THREAD_POOL_EXECUTOR.getMaximumPoolSize() + " thread(s)."); + INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount]; + for (int i = 0; i < instantPoolCount; i++) + { + INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(100000)); + } + + // Prestart core threads. + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + // Launch purge task. + scheduleAtFixedRate(() -> + { + purge(); + }, 600000, 600000); + + LOGGER.info("ThreadPool: Initialized"); + LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); + LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); + } + + public static void purge() + { + for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) + { + threadPool1.purge(); + } + for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) + { + threadPool2.purge(); + } } /** - * Gets the scheduled thread pool executor. - * @return the scheduled thread pool executor + * Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. */ - public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() + public static ScheduledFuture schedule(Runnable r, long delay) { - return SCHEDULED_THREAD_POOL_EXECUTOR; + try + { + return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** - * Gets the thread pool executor. - * @return the thread pool executor + * Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @param period : the period between successive executions. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. */ - public static ThreadPoolExecutor getThreadPoolExecutor() + public static ScheduledFuture scheduleAtFixedRate(Runnable r, long delay, long period) { - return THREAD_POOL_EXECUTOR; - } - - /** - * Gets a stream of all the thread pools. - * @return the stream of all the thread pools - */ - public static Stream getThreadPools() - { - return Stream.of(SCHEDULED_THREAD_POOL_EXECUTOR, THREAD_POOL_EXECUTOR); - } - - /** - * Schedules a task to be executed after the given delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture schedule(Runnable task, long delay) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.schedule(new RunnableWrapper(task), delay, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay at fixed rate in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleAtFixedRate(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay with fixed delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleWithFixedDelay(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); + try + { + return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** * Executes the given task sometime in the future. - * @param task the task to execute + * @param r : the task to execute. */ - public static void execute(Runnable task) + public static void execute(Runnable r) { - THREAD_POOL_EXECUTOR.execute(new RunnableWrapper(task)); + try + { + getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); + } + catch (Exception e) + { + } } /** - * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion. - * @param task the task to submit - * @return a Future representing pending completion of the task + * @param : The pool type. + * @param threadPools : The pool array to check. + * @return the less fed pool. */ - public static Future submit(Runnable task) + private static T getPool(T[] threadPools) { - return THREAD_POOL_EXECUTOR.submit(new RunnableWrapper(task)); + return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length]; + } + + public static String[] getStats() + { + final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; + int pos = 0; + for (int i = 0; i < SCHEDULED_POOLS.length; i++) + { + final ScheduledThreadPoolExecutor threadPool = SCHEDULED_POOLS[i]; + stats[pos++] = "Scheduled pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + for (int i = 0; i < INSTANT_POOLS.length; i++) + { + final ThreadPoolExecutor threadPool = INSTANT_POOLS[i]; + stats[pos++] = "Instant pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + return stats; } /** - * Purges all thread pools. - */ - public static void purge() - { - getThreadPools().forEach(ThreadPoolExecutor::purge); - } - - /** - * Gets the thread pools stats. - * @return the stats - */ - public static List getStats() - { - final List list = new ArrayList<>(23); - list.add(""); - list.add("Scheduled pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + SCHEDULED_THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - list.add("getCompletedTaskCount: " + SCHEDULED_THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + SCHEDULED_THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - list.add("Thread pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + THREAD_POOL_EXECUTOR.getMaximumPoolSize()); - list.add("getCompletedTaskCount: " + THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - return list; - } - - /** - * Shutdowns the thread pools waiting for tasks to finish. + * Shutdown thread pooling system correctly. Send different informations. */ public static void shutdown() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR == null) && (THREAD_POOL_EXECUTOR == null)) + try { - return; - } - - final long startTime = System.currentTimeMillis(); - - LOGGER.info("ThreadPool: Shutting down."); - LOGGER.info("...executing " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks."); - LOGGER.info("...executing " + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks."); - - getThreadPools().forEach(tp -> - { - try + LOGGER.info("ThreadPool: Shutting down."); + + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) { - tp.shutdown(); + threadPool.shutdownNow(); } - catch (Throwable t) + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) { - LOGGER.warning("" + t); - } - }); - - getThreadPools().forEach(t -> - { - try - { - t.awaitTermination(15, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - LOGGER.warning("" + e); - } - }); - - if (!SCHEDULED_THREAD_POOL_EXECUTOR.isTerminated()) - { - SCHEDULED_THREAD_POOL_EXECUTOR.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - SCHEDULED_THREAD_POOL_EXECUTOR.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - try - { - SCHEDULED_THREAD_POOL_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS); - } - catch (Throwable t) - { - LOGGER.warning("" + t); + threadPool.shutdownNow(); } } - - LOGGER.info("...success: " + getThreadPools().allMatch(ThreadPoolExecutor::isTerminated) + " in " + (System.currentTimeMillis() - startTime) + " ms."); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks left."); - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks left."); - } - - private static final class PoolThreadFactory implements ThreadFactory - { - private final String _prefix; - private final int _priority; - private final AtomicInteger _threadId = new AtomicInteger(); - - public PoolThreadFactory(String prefix, int priority) + catch (Throwable t) { - _prefix = prefix; - _priority = priority; - } - - @Override - public Thread newThread(Runnable r) - { - final Thread thread = new Thread(r, _prefix + _threadId.incrementAndGet()); - thread.setPriority(_priority); - return thread; + t.printStackTrace(); } } -} +} \ No newline at end of file diff --git a/L2J_Mobius_Classic_2.0_Zaken/java/com/l2jmobius/commons/concurrent/ThreadPool.java b/L2J_Mobius_Classic_2.0_Zaken/java/com/l2jmobius/commons/concurrent/ThreadPool.java index 79af5cb3e5..7cf193d83a 100644 --- a/L2J_Mobius_Classic_2.0_Zaken/java/com/l2jmobius/commons/concurrent/ThreadPool.java +++ b/L2J_Mobius_Classic_2.0_Zaken/java/com/l2jmobius/commons/concurrent/ThreadPool.java @@ -16,255 +16,219 @@ */ package com.l2jmobius.commons.concurrent; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import java.util.stream.Stream; import com.l2jmobius.Config; /** - * @author _dev_ (savormix) - * @author NB4L1 + * This class handles thread pooling system. It relies on two ThreadPoolExecutor arrays, which poolers number is generated using config. + *

+ * Those arrays hold following pools: + *

+ *
    + *
  • Scheduled pool keeps a track about incoming, future events.
  • + *
  • Instant pool handles short-life events.
  • + *
*/ public final class ThreadPool { - private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); + protected static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName()); - private static ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR; - private static ThreadPoolExecutor THREAD_POOL_EXECUTOR; + protected static ScheduledThreadPoolExecutor[] SCHEDULED_POOLS; + protected static ThreadPoolExecutor[] INSTANT_POOLS; + private static int THREAD_POOL_RANDOMIZER; - public static void init() throws Exception + /** + * Init the different pools, based on Config. It is launched only once, on Gameserver instance. + */ + public static void init() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR != null) || (THREAD_POOL_EXECUTOR != null)) + // Feed scheduled pool. + int scheduledPoolCount = Config.SCHEDULED_THREAD_POOL_COUNT; + if (scheduledPoolCount == -1) { - throw new Exception("The thread pool has been already initialized!"); + scheduledPoolCount = Runtime.getRuntime().availableProcessors(); } - SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(Config.SCHEDULED_THREAD_POOL_COUNT != -1 ? Config.SCHEDULED_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_SCHEDULED_THREAD_POOL, new PoolThreadFactory("L2JM-S-", Thread.NORM_PRIORITY)); - final int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT != -1 ? Config.INSTANT_THREAD_POOL_COUNT : Runtime.getRuntime().availableProcessors() * Config.THREADS_PER_INSTANT_THREAD_POOL; - THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(instantPoolCount, instantPoolCount, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new PoolThreadFactory("L2JM-I-", Thread.NORM_PRIORITY)); - - getThreadPools().forEach(tp -> + SCHEDULED_POOLS = new ScheduledThreadPoolExecutor[scheduledPoolCount]; + for (int i = 0; i < scheduledPoolCount; i++) { - tp.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); - tp.prestartAllCoreThreads(); - }); + SCHEDULED_POOLS[i] = new ScheduledThreadPoolExecutor(Config.THREADS_PER_SCHEDULED_THREAD_POOL); + } - scheduleAtFixedRate(ThreadPool::purge, 60000, 60000); // Repeats every minute. + // Feed instant pool. + int instantPoolCount = Config.INSTANT_THREAD_POOL_COUNT; + if (instantPoolCount == -1) + { + instantPoolCount = Runtime.getRuntime().availableProcessors(); + } - LOGGER.info("ThreadPool: Initialized with"); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + "/" + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize() + " scheduled thread(s)."); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getPoolSize() + "/" + THREAD_POOL_EXECUTOR.getMaximumPoolSize() + " thread(s)."); + INSTANT_POOLS = new ThreadPoolExecutor[instantPoolCount]; + for (int i = 0; i < instantPoolCount; i++) + { + INSTANT_POOLS[i] = new ThreadPoolExecutor(Config.THREADS_PER_INSTANT_THREAD_POOL, Config.THREADS_PER_INSTANT_THREAD_POOL, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(100000)); + } + + // Prestart core threads. + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) + { + threadPool.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl()); + threadPool.prestartAllCoreThreads(); + } + + // Launch purge task. + scheduleAtFixedRate(() -> + { + purge(); + }, 600000, 600000); + + LOGGER.info("ThreadPool: Initialized"); + LOGGER.info("..." + scheduledPoolCount + " scheduled pool executors with " + (scheduledPoolCount * Config.THREADS_PER_SCHEDULED_THREAD_POOL) + " total threads."); + LOGGER.info("..." + instantPoolCount + " instant pool executors with " + (instantPoolCount * Config.THREADS_PER_INSTANT_THREAD_POOL) + " total threads."); + } + + public static void purge() + { + for (ScheduledThreadPoolExecutor threadPool1 : SCHEDULED_POOLS) + { + threadPool1.purge(); + } + for (ThreadPoolExecutor threadPool2 : INSTANT_POOLS) + { + threadPool2.purge(); + } } /** - * Gets the scheduled thread pool executor. - * @return the scheduled thread pool executor + * Schedules a one-shot action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion. */ - public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() + public static ScheduledFuture schedule(Runnable r, long delay) { - return SCHEDULED_THREAD_POOL_EXECUTOR; + try + { + return getPool(SCHEDULED_POOLS).schedule(new RunnableWrapper(r), delay, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** - * Gets the thread pool executor. - * @return the thread pool executor + * Schedules a periodic action that becomes enabled after a delay. The pool is chosen based on pools activity. + * @param r : the task to execute. + * @param delay : the time from now to delay execution. + * @param period : the period between successive executions. + * @return a ScheduledFuture representing pending completion of the task and whose get() method will throw an exception upon cancellation. */ - public static ThreadPoolExecutor getThreadPoolExecutor() + public static ScheduledFuture scheduleAtFixedRate(Runnable r, long delay, long period) { - return THREAD_POOL_EXECUTOR; - } - - /** - * Gets a stream of all the thread pools. - * @return the stream of all the thread pools - */ - public static Stream getThreadPools() - { - return Stream.of(SCHEDULED_THREAD_POOL_EXECUTOR, THREAD_POOL_EXECUTOR); - } - - /** - * Schedules a task to be executed after the given delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture schedule(Runnable task, long delay) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.schedule(new RunnableWrapper(task), delay, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay at fixed rate in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleAtFixedRate(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); - } - - /** - * Schedules a task to be executed after the given delay with fixed delay in milliseconds. - * @param task the task to execute - * @param delay the delay in the given time unit - * @param period the period in the given time unit - * @return a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation - */ - public static ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay, long period) - { - return SCHEDULED_THREAD_POOL_EXECUTOR.scheduleWithFixedDelay(new RunnableWrapper(task), delay, period, TimeUnit.MILLISECONDS); + try + { + return getPool(SCHEDULED_POOLS).scheduleAtFixedRate(new RunnableWrapper(r), delay, period, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return null; + } } /** * Executes the given task sometime in the future. - * @param task the task to execute + * @param r : the task to execute. */ - public static void execute(Runnable task) + public static void execute(Runnable r) { - THREAD_POOL_EXECUTOR.execute(new RunnableWrapper(task)); + try + { + getPool(INSTANT_POOLS).execute(new RunnableWrapper(r)); + } + catch (Exception e) + { + } } /** - * Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion. - * @param task the task to submit - * @return a Future representing pending completion of the task + * @param : The pool type. + * @param threadPools : The pool array to check. + * @return the less fed pool. */ - public static Future submit(Runnable task) + private static T getPool(T[] threadPools) { - return THREAD_POOL_EXECUTOR.submit(new RunnableWrapper(task)); + return threadPools[THREAD_POOL_RANDOMIZER++ % threadPools.length]; + } + + public static String[] getStats() + { + final String[] stats = new String[(SCHEDULED_POOLS.length + INSTANT_POOLS.length) * 10]; + int pos = 0; + for (int i = 0; i < SCHEDULED_POOLS.length; i++) + { + final ScheduledThreadPoolExecutor threadPool = SCHEDULED_POOLS[i]; + stats[pos++] = "Scheduled pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + for (int i = 0; i < INSTANT_POOLS.length; i++) + { + final ThreadPoolExecutor threadPool = INSTANT_POOLS[i]; + stats[pos++] = "Instant pool #" + i + ":"; + stats[pos++] = " |- ActiveCount: ...... " + threadPool.getActiveCount(); + stats[pos++] = " |- CorePoolSize: ..... " + threadPool.getCorePoolSize(); + stats[pos++] = " |- PoolSize: ......... " + threadPool.getPoolSize(); + stats[pos++] = " |- LargestPoolSize: .. " + threadPool.getLargestPoolSize(); + stats[pos++] = " |- MaximumPoolSize: .. " + threadPool.getMaximumPoolSize(); + stats[pos++] = " |- CompletedTaskCount: " + threadPool.getCompletedTaskCount(); + stats[pos++] = " |- QueuedTaskCount: .. " + threadPool.getQueue().size(); + stats[pos++] = " |- TaskCount: ........ " + threadPool.getTaskCount(); + stats[pos++] = " | -------"; + } + return stats; } /** - * Purges all thread pools. - */ - public static void purge() - { - getThreadPools().forEach(ThreadPoolExecutor::purge); - } - - /** - * Gets the thread pools stats. - * @return the stats - */ - public static List getStats() - { - final List list = new ArrayList<>(23); - list.add(""); - list.add("Scheduled pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + SCHEDULED_THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + SCHEDULED_THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getCorePoolSize()); // ScheduledThreadPoolExecutor has a fixed number of threads and maximumPoolSize has no effect - list.add("getCompletedTaskCount: " + SCHEDULED_THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + SCHEDULED_THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - list.add("Thread pool:"); - list.add("================================================="); - list.add("getActiveCount: ...... " + THREAD_POOL_EXECUTOR.getActiveCount()); - list.add("getCorePoolSize: ..... " + THREAD_POOL_EXECUTOR.getCorePoolSize()); - list.add("getPoolSize: ......... " + THREAD_POOL_EXECUTOR.getPoolSize()); - list.add("getLargestPoolSize: .. " + THREAD_POOL_EXECUTOR.getLargestPoolSize()); - list.add("getMaximumPoolSize: .. " + THREAD_POOL_EXECUTOR.getMaximumPoolSize()); - list.add("getCompletedTaskCount: " + THREAD_POOL_EXECUTOR.getCompletedTaskCount()); - list.add("getQueuedTaskCount: .. " + THREAD_POOL_EXECUTOR.getQueue().size()); - list.add("getTaskCount: ........ " + THREAD_POOL_EXECUTOR.getTaskCount()); - list.add(""); - return list; - } - - /** - * Shutdowns the thread pools waiting for tasks to finish. + * Shutdown thread pooling system correctly. Send different informations. */ public static void shutdown() { - if ((SCHEDULED_THREAD_POOL_EXECUTOR == null) && (THREAD_POOL_EXECUTOR == null)) + try { - return; - } - - final long startTime = System.currentTimeMillis(); - - LOGGER.info("ThreadPool: Shutting down."); - LOGGER.info("...executing " + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks."); - LOGGER.info("...executing " + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks."); - - getThreadPools().forEach(tp -> - { - try + LOGGER.info("ThreadPool: Shutting down."); + + for (ScheduledThreadPoolExecutor threadPool : SCHEDULED_POOLS) { - tp.shutdown(); + threadPool.shutdownNow(); } - catch (Throwable t) + + for (ThreadPoolExecutor threadPool : INSTANT_POOLS) { - LOGGER.warning("" + t); - } - }); - - getThreadPools().forEach(t -> - { - try - { - t.awaitTermination(15, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - LOGGER.warning("" + e); - } - }); - - if (!SCHEDULED_THREAD_POOL_EXECUTOR.isTerminated()) - { - SCHEDULED_THREAD_POOL_EXECUTOR.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - SCHEDULED_THREAD_POOL_EXECUTOR.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - try - { - SCHEDULED_THREAD_POOL_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS); - } - catch (Throwable t) - { - LOGGER.warning("" + t); + threadPool.shutdownNow(); } } - - LOGGER.info("...success: " + getThreadPools().allMatch(ThreadPoolExecutor::isTerminated) + " in " + (System.currentTimeMillis() - startTime) + " ms."); - LOGGER.info("..." + SCHEDULED_THREAD_POOL_EXECUTOR.getQueue().size() + " scheduled tasks left."); - LOGGER.info("..." + THREAD_POOL_EXECUTOR.getQueue().size() + " tasks left."); - } - - private static final class PoolThreadFactory implements ThreadFactory - { - private final String _prefix; - private final int _priority; - private final AtomicInteger _threadId = new AtomicInteger(); - - public PoolThreadFactory(String prefix, int priority) + catch (Throwable t) { - _prefix = prefix; - _priority = priority; - } - - @Override - public Thread newThread(Runnable r) - { - final Thread thread = new Thread(r, _prefix + _threadId.incrementAndGet()); - thread.setPriority(_priority); - return thread; + t.printStackTrace(); } } -} +} \ No newline at end of file