Java Thread Pool

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A small fixed-size thread pool backed by a bounded FIFO queue.
 *
 * <p>All worker threads are created and started in the constructor and live until the pool is
 * shut down. Tasks are rejected with {@link RejectedExecutionException} when the queue is full or
 * the pool has been shut down.
 *
 * <p>{@code invokeAny} and the timed {@code invokeAll} are not implemented and throw
 * {@link UnsupportedOperationException}.
 *
 * <p>Diagnostic messages are written to {@code System.out} / {@code System.err}.
 */
public class CustomThreadPool implements ExecutorService {

    /** How long an idle worker blocks on the queue before re-checking the shutdown flags. */
    private static final long IDLE_POLL_MILLIS = 500;

    // --- Core State ---
    private final BlockingQueue<Runnable> taskQueue;
    private final List<WorkerThread> workers;
    /** Serializes task admission against shutdown so no task is accepted after the flag is set. */
    private final ReentrantLock poolLock = new ReentrantLock();
    /** Set by shutdown() and shutdownNow(): no new tasks are accepted. */
    private volatile boolean isShutdown = false;
    /** Set only by shutdownNow(): workers must not start any further queued task. */
    private volatile boolean stopNow = false;
    private final int corePoolSize;
    private final AtomicInteger activeTaskCount = new AtomicInteger(0);
    /** Counted down once by every worker as it exits; reaches zero when all workers are gone. */
    private final CountDownLatch terminationLatch;

    // --- Constructor ---

    /**
     * Creates the pool and immediately starts {@code corePoolSize} worker threads.
     *
     * @param corePoolSize  number of worker threads; must be greater than zero
     * @param queueCapacity maximum number of tasks waiting to run; must be greater than zero
     * @throws IllegalArgumentException if either argument is not positive
     */
    public CustomThreadPool(int corePoolSize, int queueCapacity) {
        if (corePoolSize <= 0) {
            throw new IllegalArgumentException("Pool size must be > 0");
        }
        if (queueCapacity <= 0) {
            throw new IllegalArgumentException("Queue capacity must be > 0");
        }
        this.corePoolSize = corePoolSize;
        this.taskQueue = new LinkedBlockingQueue<>(queueCapacity);
        this.workers = new ArrayList<>(corePoolSize);
        this.terminationLatch = new CountDownLatch(corePoolSize);
        // Every field is assigned before the first worker starts, so workers see a complete pool.
        for (int i = 0; i < corePoolSize; i++) {
            WorkerThread worker = new WorkerThread("pool-worker-" + i);
            workers.add(worker);
            worker.start();
        }
    }

    /** Returns the number of worker threads this pool was created with. */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /** Returns the number of tasks that are being executed by workers right now. */
    public int getActiveTaskCount() {
        return activeTaskCount.get();
    }

    // --- Worker Thread ---

    /** A pool worker: repeatedly pulls tasks from the queue until the pool shuts down. */
    private final class WorkerThread extends Thread {

        WorkerThread(String name) {
            super(name);
            setDaemon(false); // Keep JVM alive until workers finish
        }

        @Override
        public void run() {
            System.out.println(getName() + " started.");
            try {
                while (!stopNow) {
                    Runnable task;
                    if (isShutdown) {
                        // Graceful shutdown: drain remaining tasks, then exit.
                        task = taskQueue.poll();
                        if (task == null) {
                            break;
                        }
                    } else {
                        try {
                            // Timed wait so the shutdown flags are re-checked periodically.
                            task = taskQueue.poll(IDLE_POLL_MILLIS, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                            // Either shutdownNow() interrupted us, or a previous task left its
                            // interrupt flag set. Only the former may end the worker; the loop
                            // condition decides, so a stale flag cannot kill a live pool.
                            if (stopNow) {
                                System.out.println(getName() + " interrupted.");
                            }
                            continue;
                        }
                        if (task == null) {
                            continue; // timed out; re-check loop condition
                        }
                    }
                    runTask(task);
                }
            } finally {
                System.out.println(getName() + " exiting.");
                terminationLatch.countDown(); // Signal this worker is done
            }
        }

        /** Runs one task, keeping the worker alive whatever the task throws. */
        private void runTask(Runnable task) {
            // Give the task a clean interrupt status unless the pool is stopping, in which case
            // the interrupt is a real cancellation request and must stay visible to the task.
            if (Thread.interrupted() && stopNow) {
                Thread.currentThread().interrupt();
            }
            activeTaskCount.incrementAndGet();
            try {
                task.run();
            } catch (Throwable t) {
                // Catch Throwable, not just Exception: workers are never replaced, so letting an
                // Error escape would permanently shrink the pool.
                System.err.println(getName() + " caught exception: " + t);
            } finally {
                activeTaskCount.decrementAndGet();
            }
        }
    }

    // --- ExecutorService Core Methods ---

    /**
     * Queues {@code command} for execution by a worker thread.
     *
     * @throws NullPointerException       if {@code command} is null
     * @throws RejectedExecutionException if the pool is shut down or the queue is full
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("Task cannot be null");
        }
        poolLock.lock();
        try {
            if (isShutdown) {
                throw new RejectedExecutionException("Pool is shut down, task rejected.");
            }
            // offer() is non-blocking and returns false (it does not throw) when the queue is full.
            if (!taskQueue.offer(command)) {
                throw new RejectedExecutionException("Task queue is full, task rejected.");
            }
        } finally {
            poolLock.unlock();
        }
    }

    /**
     * Submits a value-returning task.
     *
     * @throws NullPointerException       if {@code callable} is null
     * @throws RejectedExecutionException if the task cannot be accepted
     */
    @Override
    public <T> Future<T> submit(Callable<T> callable) {
        if (callable == null) {
            throw new NullPointerException();
        }
        FutureTask<T> future = new FutureTask<>(callable);
        execute(future);
        return future;
    }

    /**
     * Submits a task whose future yields {@code null} on completion.
     *
     * @throws NullPointerException       if {@code task} is null
     * @throws RejectedExecutionException if the task cannot be accepted
     */
    @Override
    public Future<?> submit(Runnable task) {
        if (task == null) {
            throw new NullPointerException();
        }
        FutureTask<Void> future = new FutureTask<>(task, null);
        execute(future);
        return future;
    }

    /**
     * Submits a task whose future yields {@code result} on completion.
     *
     * @throws NullPointerException       if {@code task} is null
     * @throws RejectedExecutionException if the task cannot be accepted
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) {
            throw new NullPointerException();
        }
        FutureTask<T> future = new FutureTask<>(task, result);
        execute(future);
        return future;
    }

    // --- Shutdown ---

    /**
     * Starts a graceful shutdown: new tasks are rejected, already queued tasks still run.
     * Idle workers notice the flag the next time their timed queue wait expires.
     */
    @Override
    public void shutdown() {
        poolLock.lock();
        try {
            isShutdown = true;
            System.out.println("Pool shutdown initiated. Finishing queued tasks...");
        } finally {
            poolLock.unlock();
        }
    }

    /**
     * Stops the pool immediately: rejects new tasks, removes all queued tasks and interrupts
     * every worker.
     *
     * @return the tasks that were queued but never started
     */
    @Override
    public List<Runnable> shutdownNow() {
        poolLock.lock();
        try {
            isShutdown = true;
            stopNow = true;
            // Drain before interrupting so a woken worker finds nothing left to start.
            List<Runnable> unfinished = new ArrayList<>();
            taskQueue.drainTo(unfinished); // Return unstarted tasks
            workers.forEach(Thread::interrupt); // Interrupt all workers immediately
            System.out.println("Immediate shutdown. Unstarted tasks returned: " + unfinished.size());
            return unfinished;
        } finally {
            poolLock.unlock();
        }
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }

    /**
     * Returns true once the pool has been shut down and every worker thread has exited.
     * Unlike a cached flag, this is accurate even if {@link #awaitTermination} was never called.
     */
    @Override
    public boolean isTerminated() {
        return isShutdown && terminationLatch.getCount() == 0;
    }

    /**
     * Blocks until every worker has exited or the timeout elapses.
     *
     * @return true if all workers exited within the timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return terminationLatch.await(timeout, unit);
    }

    // --- Bulk submission (simplified) ---

    /**
     * Submits every task and waits until all of them have completed.
     *
     * <p>If submission fails part-way, or the caller is interrupted while waiting, the tasks
     * submitted so far are cancelled before the exception propagates.
     *
     * @return one completed future per task, in iteration order of {@code tasks}
     * @throws NullPointerException       if {@code tasks} or any element is null
     * @throws RejectedExecutionException if any task cannot be accepted
     * @throws InterruptedException       if interrupted while waiting
     */
    @Override
    public <T> List<Future<T>> invokeAll(java.util.Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        boolean completed = false;
        try {
            for (Callable<T> task : tasks) {
                futures.add(submit(task));
            }
            // Wait for all to complete
            for (Future<T> f : futures) {
                try {
                    f.get();
                } catch (ExecutionException | CancellationException ignored) {
                    // The outcome stays available through the returned future; here we only
                    // need to know the task is finished.
                }
            }
            completed = true;
            return futures;
        } finally {
            if (!completed) {
                for (Future<T> f : futures) {
                    f.cancel(true);
                }
            }
        }
    }

    /** Not supported by this example pool. */
    @Override
    public <T> T invokeAny(java.util.Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        throw new UnsupportedOperationException("invokeAny not implemented in this example");
    }

    /** Not supported by this example pool. */
    @Override
    public <T> T invokeAny(java.util.Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        throw new UnsupportedOperationException("invokeAny not implemented in this example");
    }

    /** Not supported by this example pool. */
    @Override
    public <T> List<Future<T>> invokeAll(java.util.Collection<? extends Callable<T>> tasks,
            long timeout, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("invokeAll with timeout not implemented in this example");
    }

    // --- Demo ---

    /** Demonstrates submitting tasks, reading a Callable result and shutting down gracefully. */
    public static void main(String[] args) throws InterruptedException {
        CustomThreadPool pool = new CustomThreadPool(3, 20);
        // Submit 10 tasks
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            pool.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " executing task " + taskId);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Submit a Callable and get a result
        Future<String> result = pool.submit(() -> {
            Thread.sleep(100);
            return "Hello from Callable!";
        });
        try {
            System.out.println("Callable result: " + result.get());
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // Graceful shutdown
        pool.shutdown();
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("Pool terminated cleanly: " + terminated);
    }
}
```
