christova  

threads

https://stackify.com/java-thread-pools/#:~:text=Thread%20pool%20is%20a%20core,be%20used%20to%20execute%20tasks.

Finally Getting the Most out of the Java Thread Pool

``` import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.concurrent.locks.ReentrantLock;

public class CustomThreadPool implements ExecutorService {

// --- Core State ---

private final BlockingQueue<Runnable> taskQueue;

private final List<WorkerThread> workers;

private final ReentrantLock poolLock = new ReentrantLock();

private volatile boolean isShutdown = false;

private volatile boolean isTerminated = false;

private final int corePoolSize;

private final AtomicInteger activeTaskCount = new AtomicInteger(0);

// Latch used by awaitTermination()

private final CountDownLatch terminationLatch;

// --- Constructor ---

public CustomThreadPool(int corePoolSize, int queueCapacity) {

    if (corePoolSize <= 0) throw new IllegalArgumentException("Pool size must be > 0");

    this.corePoolSize = corePoolSize;

    this.taskQueue = new LinkedBlockingQueue<>(queueCapacity);

    this.workers = new ArrayList<>(corePoolSize);

    this.terminationLatch = new CountDownLatch(corePoolSize);

    for (int i = 0; i < corePoolSize; i++) {

        WorkerThread worker = new WorkerThread("pool-worker-" + i);

        workers.add(worker);

        worker.start();

    }

}

// --- Worker Thread ---

private class WorkerThread extends Thread {

    WorkerThread(String name) {

        super(name);

        setDaemon(false); // Keep JVM alive until workers finish

    }

    @Override

    public void run() {

        System.out.println(getName() + " started.");

        try {

            while (true) {

                Runnable task = null;

                if (isShutdown) {

                    // Drain remaining tasks, then exit

                    task = taskQueue.poll();

                    if (task == null) break;

                } else {

                    // Block waiting for a task (with timeout to re-check shutdown flag)

                    task = taskQueue.poll(500, TimeUnit.MILLISECONDS);

                    if (task == null) continue; // re-check loop condition

                }

                activeTaskCount.incrementAndGet();

                try {

                    task.run();

                } catch (Exception e) {

                    System.err.println(getName() + " caught exception: " + e.getMessage());

                } finally {

                    activeTaskCount.decrementAndGet();

                }

            }

        } catch (InterruptedException e) {

            Thread.currentThread().interrupt();

            System.out.println(getName() + " interrupted.");

        } finally {

            System.out.println(getName() + " exiting.");

            terminationLatch.countDown(); // Signal this worker is done

        }

    }

}

// --- ExecutorService Core Methods ---

@Override

public void execute(Runnable command) {

    if (command == null) throw new NullPointerException("Task cannot be null");

    poolLock.lock();

    try {

        if (isShutdown) {

            throw new RejectedExecutionException("Pool is shut down, task rejected.");

        }

        // Offer is non-blocking; throws if queue is full

        boolean accepted = taskQueue.offer(command);

        if (!accepted) {

            throw new RejectedExecutionException("Task queue is full, task rejected.");

        }

    } finally {

        poolLock.unlock();

    }

}

@Override

public <T> Future<T> submit(Callable<T> callable) {

    if (callable == null) throw new NullPointerException();

    FutureTask<T> future = new FutureTask<>(callable);

    execute(future);

    return future;

}

@Override

public Future<?> submit(Runnable task) {

    FutureTask<Void> future = new FutureTask<>(task, null);

    execute(future);

    return future;

}

@Override

public <T> Future<T> submit(Runnable task, T result) {

    FutureTask<T> future = new FutureTask<>(task, result);

    execute(future);

    return future;

}

// --- Shutdown ---

@Override

public void shutdown() {

    poolLock.lock();

    try {

        isShutdown = true;

        System.out.println("Pool shutdown initiated. Finishing queued tasks...");

    } finally {

        poolLock.unlock();

    }

}

@Override

public List<Runnable> shutdownNow() {

    poolLock.lock();

    try {

        isShutdown = true;

        workers.forEach(Thread::interrupt); // Interrupt all workers immediately

        List<Runnable> unfinished = new ArrayList<>();

        taskQueue.drainTo(unfinished); // Return unstarted tasks

        System.out.println("Immediate shutdown. Unstarted tasks returned: " + unfinished.size());

        return unfinished;

    } finally {

        poolLock.unlock();

    }

}

@Override

public boolean isShutdown() {

    return isShutdown;

}

@Override

public boolean isTerminated() {

    return isTerminated;

}

@Override

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {

    boolean allDone = terminationLatch.await(timeout, unit);

    if (allDone) isTerminated = true;

    return allDone;

}

// --- Bulk submission (simplified) ---

@Override

public <T> List<Future<T>> invokeAll(java.util.Collection<? extends Callable<T>> tasks)

        throws InterruptedException {

    List<Future<T>> futures = new ArrayList<>();

    for (Callable<T> task : tasks) {

        futures.add(submit(task));

    }

    // Wait for all to complete

    for (Future<T> f : futures) {

        try { f.get(); } catch (ExecutionException ignored) {}

    }

    return futures;

}

@Override

public <T> T invokeAny(java.util.Collection<? extends Callable<T>> tasks)

        throws InterruptedException, ExecutionException {

    throw new UnsupportedOperationException("invokeAny not implemented in this example");

}

@Override

public <T> T invokeAny(java.util.Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException {

    throw new UnsupportedOperationException("invokeAny not implemented in this example");

}

@Override

public <T> List<Future<T>> invokeAll(java.util.Collection<? extends Callable<T>> tasks,

                                      long timeout, TimeUnit unit) throws InterruptedException {

    throw new UnsupportedOperationException("invokeAll with timeout not implemented in this example");

}

// --- Demo ---

public static void main(String\[\] args) throws InterruptedException {

    CustomThreadPool pool = new CustomThreadPool(3, 20);

    // Submit 10 tasks

    for (int i = 0; i < 10; i++) {

        final int taskId = i;

        pool.submit(() -> {

            System.out.println(Thread.currentThread().getName() + " executing task " + taskId);

            try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }

        });

    }

    // Submit a Callable and get a result

    Future<String> result = pool.submit(() -> {

        Thread.sleep(100);

        return "Hello from Callable!";

    });

    try {

        System.out.println("Callable result: " + result.get());

    } catch (ExecutionException e) {

        e.printStackTrace();

    }

    // Graceful shutdown

    pool.shutdown();

    boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);

    System.out.println("Pool terminated cleanly: " + terminated);

}

} ```

#Java #Threads #ThreadPool

Multithreading enables a single program or process to execute multiple tasks concurrently. Each task is a thread. Think of threads as lightweight units of execution that share the resources of the process such as memory space.

However, multithreading also introduces complexities like synchronization, communication, and potential race conditions. This is where patterns help.

Producer-Consumer Pattern

This pattern involves two types of threads: producers generating data and consumers processing that data. A blocking queue acts as a buffer between the two.

Thread Pool Pattern

In this pattern, there is a pool of worker threads that can be reused for executing tasks. Using a pool removes the overhead of creating and destroying threads. Great for executing a large number of short-lived tasks.

Futures and Promises Pattern

In this pattern, the promise is an object that holds the eventual results and the future provides a way to access the result. This is great for executing long-running operations concurrently without blocking the main thread.

Monitor Object Pattern

Ensures that only one thread can access or modify a shared resource within an object at a time. This helps prevent race conditions. The pattern is required when you need to protect shared data or resources from concurrent access.

Barrier Pattern

Synchronizes a group of threads. Each thread executes until it reaches a barrier point in the code and blocks until all threads have reached the same barrier. Ideal for parallel tasks that need to reach a specific stage before starting the next stage.

Read-Write Lock Pattern

It allows multiple threads to read from a shared resource but only allows one thread to write to it at a time. Ideal for managing shared resources where reads are more frequent than writes.

#Threads #Multithreading #DesignPatterns