christova  

ThreadPool

https://stackify.com/java-thread-pools/#:~:text=Thread%20pool%20is%20a%20core,be%20used%20to%20execute%20tasks.

Finally Getting the Most out of the Java Thread Pool

```java
import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.concurrent.locks.ReentrantLock;

public class CustomThreadPool implements ExecutorService {

// --- Core State ---

// Submitted tasks waiting for a worker; filled by execute(), emptied by the workers
// and by shutdownNow().
private final BlockingQueue<Runnable> taskQueue;

// Every worker thread created by the constructor; shutdownNow() interrupts each of them.
private final List<WorkerThread> workers;

// Held by execute(), shutdown() and shutdownNow() while they read or change the shutdown state.
private final ReentrantLock poolLock = new ReentrantLock();

// Set to true by shutdown()/shutdownNow(); execute() rejects tasks and workers drain the
// queue and exit once they observe it.
private volatile boolean isShutdown = false;

// Set to true only by awaitTermination(), when the termination latch has reached zero.
private volatile boolean isTerminated = false;

// Number of worker threads started by the constructor.
private final int corePoolSize;

// Incremented just before and decremented just after each task.run() call in a worker.
private final AtomicInteger activeTaskCount = new AtomicInteger(0);

// Starts at corePoolSize; each worker counts it down once when its run() method ends.
// awaitTermination() blocks on it.

private final CountDownLatch terminationLatch;

// --- Constructor ---

public CustomThreadPool(int corePoolSize, int queueCapacity) {

    if (corePoolSize <= 0) throw new IllegalArgumentException("Pool size must be > 0");

    this.corePoolSize = corePoolSize;

    this.taskQueue = new LinkedBlockingQueue<>(queueCapacity);

    this.workers = new ArrayList<>(corePoolSize);

    this.terminationLatch = new CountDownLatch(corePoolSize);

    for (int i = 0; i < corePoolSize; i++) {

        WorkerThread worker = new WorkerThread("pool-worker-" + i);

        workers.add(worker);

        worker.start();

    }

}

// --- Worker Thread ---

/**
 * Pool worker: repeatedly takes tasks from {@code taskQueue} and runs them until the pool
 * is shut down and the queue is empty, then counts down {@code terminationLatch}.
 */
private class WorkerThread extends Thread {

    WorkerThread(String name) {
        super(name);
        setDaemon(false); // Keep JVM alive until workers finish
    }

    @Override
    public void run() {
        System.out.println(getName() + " started.");
        try {
            while (true) {
                Runnable task;
                if (isShutdown) {
                    // Drain remaining tasks without blocking, then exit.
                    task = taskQueue.poll();
                    if (task == null) {
                        break;
                    }
                } else {
                    try {
                        // Timed wait so the shutdown flag is re-checked at least every 500 ms.
                        task = taskQueue.poll(500, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // An interrupt only ends the worker when the pool is shutting down.
                        // A stray interrupt (for example a flag left set by the previous
                        // task) must not permanently remove a worker from a running pool.
                        // Looping again sends a shut-down pool through the drain branch above.
                        if (isShutdown) {
                            System.out.println(getName() + " interrupted.");
                        }
                        continue;
                    }
                    if (task == null) {
                        continue; // timed out; re-check the shutdown flag
                    }
                }

                // Do not hand a stale interrupt to the next task: clear the flag, and put
                // it back only if the pool is shut down (shutdownNow() sets the flag before
                // interrupting, so its interrupt is never lost here).
                if (Thread.interrupted() && isShutdown) {
                    interrupt();
                }

                activeTaskCount.incrementAndGet();
                try {
                    task.run();
                } catch (Exception e) {
                    // Print the exception itself: getMessage() alone may be null and
                    // hides the exception type.
                    System.err.println(getName() + " caught exception: " + e);
                } finally {
                    activeTaskCount.decrementAndGet();
                }
            }
        } finally {
            System.out.println(getName() + " exiting.");
            terminationLatch.countDown(); // Signal this worker is done
        }
    }
}

// --- ExecutorService Core Methods ---

/**
 * Places {@code command} on the task queue for a worker thread to run.
 *
 * @param command the task to run
 * @throws NullPointerException       if {@code command} is null
 * @throws RejectedExecutionException if the pool has been shut down or the queue is full
 */
@Override
public void execute(Runnable command) {
    if (command == null) {
        throw new NullPointerException("Task cannot be null");
    }

    poolLock.lock();
    try {
        if (isShutdown) {
            throw new RejectedExecutionException("Pool is shut down, task rejected.");
        }
        // offer() never blocks; it returns false when the bounded queue has no room.
        if (!taskQueue.offer(command)) {
            throw new RejectedExecutionException("Task queue is full, task rejected.");
        }
    } finally {
        poolLock.unlock();
    }
}

/**
 * Wraps {@code callable} in a {@link FutureTask}, queues it via {@code execute}, and
 * returns the future.
 *
 * @throws NullPointerException if {@code callable} is null
 */
@Override
public <T> Future<T> submit(Callable<T> callable) {
    if (callable == null) {
        throw new NullPointerException();
    }
    RunnableFuture<T> pending = new FutureTask<>(callable);
    execute(pending);
    return pending;
}

/**
 * Wraps {@code task} in a {@link FutureTask} whose result is {@code null}, queues it via
 * {@code execute}, and returns the future.
 */
@Override
public Future<?> submit(Runnable task) {
    RunnableFuture<Void> pending = new FutureTask<>(task, null);
    execute(pending);
    return pending;
}

/**
 * Wraps {@code task} in a {@link FutureTask} that yields {@code result} on completion,
 * queues it via {@code execute}, and returns the future.
 */
@Override
public <T> Future<T> submit(Runnable task, T result) {
    RunnableFuture<T> pending = new FutureTask<>(task, result);
    execute(pending);
    return pending;
}

// --- Shutdown ---

/**
 * Marks the pool as shut down. After this, {@code execute} rejects new tasks and workers
 * finish whatever is still queued before exiting.
 */
@Override
public void shutdown() {
    final ReentrantLock guard = poolLock;
    guard.lock();
    try {
        isShutdown = true;
        System.out.println("Pool shutdown initiated. Finishing queued tasks...");
    } finally {
        guard.unlock();
    }
}

/**
 * Marks the pool as shut down, interrupts every worker thread, and removes all tasks
 * still waiting in the queue.
 *
 * @return the tasks that were queued but never started
 */
@Override
public List<Runnable> shutdownNow() {
    final List<Runnable> neverStarted = new ArrayList<>();
    poolLock.lock();
    try {
        // Set the flag before interrupting so a woken worker sees the shutdown state.
        isShutdown = true;
        for (WorkerThread worker : workers) {
            worker.interrupt();
        }
        taskQueue.drainTo(neverStarted);
        System.out.println("Immediate shutdown. Unstarted tasks returned: " + neverStarted.size());
    } finally {
        poolLock.unlock();
    }
    return neverStarted;
}

/** Returns whether {@code shutdown()} or {@code shutdownNow()} has been called. */
@Override
public boolean isShutdown() {
    return this.isShutdown;
}

/**
 * Returns whether the pool has been shut down and every worker thread has exited.
 *
 * <p>The cached {@code isTerminated} flag is only written by {@code awaitTermination},
 * so relying on it alone would report {@code false} forever for callers that never
 * await, or whose await timed out. The termination latch is therefore consulted
 * directly: it reaches zero once every worker has left its run loop.
 */
@Override
public boolean isTerminated() {
    return isTerminated || (isShutdown && terminationLatch.getCount() == 0);
}

/**
 * Waits up to the given timeout for every worker thread to exit.
 *
 * @return {@code true} if all workers exited within the timeout, {@code false} otherwise
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    if (!terminationLatch.await(timeout, unit)) {
        return false;
    }
    // Every worker has counted down; remember that the pool is fully terminated.
    isTerminated = true;
    return true;
}

// --- Bulk submission (simplified) ---

/**
 * Submits every task and blocks until each returned future is done.
 *
 * <p>Task failures and cancellations are not rethrown here; they surface when the caller
 * inspects the individual futures. If this method exits abnormally (the caller is
 * interrupted while waiting, or a submission is rejected because the pool is shut down
 * or the queue is full), every task submitted so far is cancelled so none is left
 * running unobserved.
 *
 * @param tasks the tasks to run
 * @return one future per task, in iteration order, each already done
 * @throws InterruptedException       if interrupted while waiting for completion
 * @throws NullPointerException       if {@code tasks} or any element is null
 * @throws RejectedExecutionException if a task cannot be queued
 */
@Override
public <T> List<Future<T>> invokeAll(java.util.Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    List<Future<T>> futures = new ArrayList<>(tasks.size());
    boolean finished = false;
    try {
        for (Callable<T> task : tasks) {
            futures.add(submit(task));
        }
        // Wait for all to complete
        for (Future<T> f : futures) {
            if (f.isDone()) {
                continue;
            }
            try {
                f.get();
            } catch (CancellationException | ExecutionException ignored) {
                // The outcome is recorded in the future itself; the caller reads it there.
            }
        }
        finished = true;
        return futures;
    } finally {
        if (!finished) {
            for (Future<T> f : futures) {
                f.cancel(true);
            }
        }
    }
}

/**
 * Not supported by this example pool.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public <T> T invokeAny(java.util.Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    final String reason = "invokeAny not implemented in this example";
    throw new UnsupportedOperationException(reason);
}

/**
 * Not supported by this example pool.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public <T> T invokeAny(
        java.util.Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    final String reason = "invokeAny not implemented in this example";
    throw new UnsupportedOperationException(reason);
}

/**
 * Not supported by this example pool.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public <T> List<Future<T>> invokeAll(
        java.util.Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException {
    final String reason = "invokeAll with timeout not implemented in this example";
    throw new UnsupportedOperationException(reason);
}

// --- Demo ---

public static void main(String\[\] args) throws InterruptedException {

    CustomThreadPool pool = new CustomThreadPool(3, 20);

    // Submit 10 tasks

    for (int i = 0; i < 10; i++) {

        final int taskId = i;

        pool.submit(() -> {

            System.out.println(Thread.currentThread().getName() + " executing task " + taskId);

            try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }

        });

    }

    // Submit a Callable and get a result

    Future<String> result = pool.submit(() -> {

        Thread.sleep(100);

        return "Hello from Callable!";

    });

    try {

        System.out.println("Callable result: " + result.get());

    } catch (ExecutionException e) {

        e.printStackTrace();

    }

    // Graceful shutdown

    pool.shutdown();

    boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);

    System.out.println("Pool terminated cleanly: " + terminated);

}

}
```

#Java #Threads #ThreadPool