This is a summary of section 5.5 ("Synchronizers") of the wonderful book Java Concurrency in Practice. Do read the book for the full details.
Latches
Latches are like gates. A latch stays closed until a certain condition is met, and once it opens it stays open for good. Latches are ideal for cases like these:
- Some initialization must be completed before other threads can proceed
- A service must not start until all the other services it depends on have started
CountDownLatch is one implementation of a latch. Let's see some code that uses it. Say we want to do some initialization, and we want our worker threads to start processing only after the initialization is completed.
import java.util.concurrent.CountDownLatch;

void foo() {
    CountDownLatch latch = new CountDownLatch(1);
    Thread initThread = new Thread(new Init(latch));
    initThread.start(); // start the initialization thread
    int numberOfWorkers = 10;
    for (int i = 0; i < numberOfWorkers; i++) {
        Thread worker = new Thread(new Worker(latch));
        worker.start(); // start the worker thread
    }
}

class Init implements Runnable {
    private final CountDownLatch latch;

    Init(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        doInitialization();
        latch.countDown(); // open the gate
    }
}

class Worker implements Runnable {
    private final CountDownLatch latch;

    Worker(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await(); // wait for the gate to open
            doProcessing();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status and give up
        }
    }
}
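The second use case from the list above, a service that waits for its dependencies, works the same way with a count larger than one. Here is a minimal sketch; the class name ServiceStartup, the method startAll and the event strings are made up for illustration, and each "dependency" is just a thread that records that it has started.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class ServiceStartup {
    // Starts the given number of (hypothetical) dependencies, waits until
    // all of them have counted down, and only then records the main service.
    static List<String> startAll(int dependencies) throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch ready = new CountDownLatch(dependencies);
        for (int i = 0; i < dependencies; i++) {
            final int id = i;
            new Thread(() -> {
                events.add("dependency-" + id + " started");
                ready.countDown(); // one fewer dependency to wait for
            }).start();
        }
        ready.await(); // the gate opens when the count reaches zero
        events.add("main service started");
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        startAll(3).forEach(System.out::println);
    }
}
```

The order of the dependency lines can differ from run to run, but "main service started" is always printed last, because await() only returns after every countDown() call.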
FutureTask
FutureTask is one of the implementations of the Future interface. The idea of a Future, aka Promise, is simple. You submit a job whose result you do not need immediately but somewhere in the future ( get it now :) ). Once you have submitted the job, you can go and do other important stuff, and when the time comes to use the result, you ask the future object that you created. At that point there are two possibilities:
- The job is completed and you get the result immediately.
- The job is not completed yet, or has not even started. In this case you have two options:
  - If the job is in the middle of processing, you can wait for it (blocking) or cancel it
  - If the job has not started yet, you can run it yourself (a FutureTask is also a Runnable) and wait for the result (blocking), or cancel it
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

void process() throws InterruptedException, ExecutionException {
    FutureTask<Integer> f = new FutureTask<Integer>(new ComplicatedProcessing(1, 2));
    Thread t = new Thread(f);
    t.start();
    doOtherProcessing();
    if (f.isDone()) { // job is done, so get() returns immediately
        System.out.println(f.get());
    } else {
        try {
            // wait for up to 1 minute and see whether it gets done
            System.out.println(f.get(1, TimeUnit.MINUTES));
        } catch (TimeoutException e) {
            // cancel the job, interrupting it even if it is in the middle of processing
            f.cancel(true);
        }
    }
}

class ComplicatedProcessing implements Callable<Integer> {
    private final int a;
    private final int b;

    public ComplicatedProcessing(int val1, int val2) {
        this.a = val1;
        this.b = val2;
    }

    public Integer call() {
        return a * b;
    }
}
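The case where the job has not started yet is easy to try out, because a FutureTask does nothing until some thread calls its run() method. The sketch below is only an illustration (the class FutureStates and its method names are made up): one task is run directly in the calling thread, the other is cancelled before anyone runs it.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureStates {
    // Nobody has started the task, so we run it in the calling thread.
    static int runInline() throws InterruptedException, ExecutionException {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 6 * 7);
        task.run();        // executes the job right here
        return task.get(); // the job is done, so this returns immediately
    }

    // Cancels a task that has not started; it will then never run.
    static boolean cancelBeforeStart() throws InterruptedException, ExecutionException {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 6 * 7);
        boolean cancelled = task.cancel(false);
        task.run(); // does nothing, the task is already cancelled
        try {
            task.get();
            return false; // not reached for a cancelled task
        } catch (CancellationException e) {
            return cancelled && task.isCancelled();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runInline());         // prints 42
        System.out.println(cancelBeforeStart()); // prints true
    }
}
```

Note that calling get() on a cancelled task does not block; it throws CancellationException, which is an unchecked exception.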
Semaphores
A semaphore allows us to give out permits (permissions) to access a certain resource. You can think of a semaphore as a licensing scheme. Assume I have some resource and at any given time I can only allow four people to access it. In this case I will give out four licenses. Once a thread has a license, it can access the resource; otherwise it needs to wait. Once a thread is done with the resource, it returns the license to the semaphore.
Semaphores are useful for implementing resource pools such as a database connection pool. Say we want to allow at most four connections to the database at any given time. We can easily do that with a semaphore. Let's see how in some simple code. Here we will use the Java class Semaphore.
import java.util.concurrent.Semaphore;

class DatabasePool {
    private final Semaphore sem;

    public DatabasePool(int maximumConnection) {
        this.sem = new Semaphore(maximumConnection);
        // do other necessary stuff to set up the database connections and create the pool
    }

    public Connection getConnection() throws InterruptedException {
        // Try to get a license. If the maximum is reached, this blocks until one is returned.
        // There are other options: tryAcquire() does not block, and it can also take a timeout.
        sem.acquire();
        return connectionFromPool();
    }

    // once the connection is no longer needed, it can be returned to the pool
    public void done(Connection connection) {
        returnConnectionToPool(connection); // put the connection back first,
        sem.release(); // then release the license back to the semaphore
    }
}
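The non-blocking option mentioned in the comments above can be sketched as follows. This is a small illustration rather than a real pool: the class LicenseDesk and its methods are made-up names, and the "resource" is nothing more than the permit itself.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class LicenseDesk {
    private final Semaphore permits;

    LicenseDesk(int maximum) {
        this.permits = new Semaphore(maximum);
    }

    // Waits at most timeoutMillis for a license; returns false if none became free.
    boolean tryEnter(long timeoutMillis) throws InterruptedException {
        return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Hands the license back to the semaphore.
    void leave() {
        permits.release();
    }

    public static void main(String[] args) throws InterruptedException {
        LicenseDesk desk = new LicenseDesk(2);
        System.out.println(desk.tryEnter(50)); // true, first license
        System.out.println(desk.tryEnter(50)); // true, second license
        System.out.println(desk.tryEnter(50)); // false, nothing left after 50 ms
        desk.leave();
        System.out.println(desk.tryEnter(50)); // true, a license was returned
    }
}
```

With a timeout the caller can decide what to do when the pool is exhausted (retry, fail fast, report an error) instead of blocking indefinitely as acquire() would.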
Barriers
Barriers are similar to latches in that they block a group of threads until some event occurs. A good analogy from the book is "Everyone meet at McDonald's at 6:00; once you get there, stay there until everyone shows up, and then we'll figure out what we're doing next." A latch, on the other hand, is like "Everyone go to McDonald's, and go in and have your meal if McDonald's is open". With a latch, once the gate is open each thread does its own processing in its own time, and there is no waiting for other threads. With a barrier, every thread waits at the barrier point, and the barrier opens only when all the threads have arrived.
So what scenarios is a barrier good for? It is good for cases where there is some dependency among threads. Assume I have a task that I break down into smaller tasks run by different threads. Since different parts of the task might take different amounts of time, some threads will be slower and others faster. A barrier makes each thread wait for the others before any of them executes the next step.
Let's see some code that uses a barrier. Java's CyclicBarrier implements this concept. Say I want to add up all the numbers in a 2D array. Each worker thread processes one row and stores the row's sum in a shared list. Once every worker has reached the barrier, the barrier action adds up the row sums to get the total.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

private List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());

public void process() {
    int[][] numbers =
        new int[][] { { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 } };
    CyclicBarrier barrier = new CyclicBarrier(numbers.length, new Runnable() {
        // executed once all the threads have reached the barrier
        @Override
        public void run() {
            int total = 0;
            for (int n : list) {
                total += n;
            }
            System.out.println("The total is :" + total);
        }
    });
    for (int i = 0; i < numbers.length; i++) {
        Thread t = new Thread(new Add(barrier, numbers[i]));
        t.start();
    }
}

private class Add implements Runnable {
    private final int[] array;
    private final CyclicBarrier barrier;

    public Add(CyclicBarrier barrier, int[] array) {
        this.barrier = barrier;
        this.array = array;
    }

    public void run() {
        try {
            int sum = 0;
            for (int i = 0; i < array.length; i++) {
                sum += array[i];
            }
            list.add(sum); // publish this row's sum
            this.barrier.await(); // wait for the other threads
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
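The example above passes the barrier only once, but unlike a latch a CyclicBarrier can be reused: after all threads have arrived it resets itself for the next round. A rough sketch of that, with made-up names (PhasedWork, runPhases) and with the real work of each phase left out:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class PhasedWork {
    // Runs `workers` threads through `phases` rounds and returns how many
    // times the barrier action ran (once per completed round).
    static int runPhases(int workers, int phases) throws InterruptedException {
        AtomicInteger completedPhases = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(workers, completedPhases::incrementAndGet);
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                try {
                    for (int p = 0; p < phases; p++) {
                        // a real worker would do its share of phase p here
                        barrier.await(); // nobody starts phase p + 1 early
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            t.join(); // wait for all workers to finish every phase
        }
        return completedPhases.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPhases(3, 4)); // prints 4
    }
}
```

The barrier action runs once each time the barrier trips, so with four phases it runs four times regardless of the number of workers.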
That's about it. I hope that you have learned something from this blog post.