"Java concurrency and parallelism: barrier synchronizers (CountDownLatch, CyclicBarrier, Phaser)" article and source code.

Java concurrency and parallelism: barrier synchronizers (CountDownLatch, CyclicBarrier, Phaser)

Introduction

Barrier synchronizers (barriers) are synchronizers that make each thread stop at a certain point and wait there until all other participating threads have reached that point.

By purpose, barriers can be grouped into the following categories:

  • entry barriers, which prevent threads from starting processing
  • exit barriers, which wait for all threads to finish processing

Barriers can also be grouped by the number of iterations (one-time or cyclic) and by the number of parties/threads (fixed or variable).

Since Java 7 there are three predefined barrier classes: CountDownLatch and CyclicBarrier (both available since Java 5) and Phaser (added in Java 7).

The CountDownLatch class

The CountDownLatch class is a one-time barrier that allows threads to wait until the given count of operations is performed in other threads.

A latch is initialized with a given count. The await methods (waiting and timed waiting) wait until the current count reaches 0 due to calls of the countDown() method. After that, all waiting threads are released, and any subsequent calls of the await methods return immediately.

Threads registration

The CountDownLatch(int count) constructor creates a latch with the given count. The current count cannot be reset; to count down again, a new latch object has to be created.

Threads waiting

The void await() method causes the current thread to wait until one of the following events occurs:

  • the latch has counted down to 0 due to calls of the countDown() method
  • the thread is interrupted

If the current count is 0 then this method returns immediately.

The boolean await(long timeout, TimeUnit unit) method causes the current thread to wait until one of the following events occurs:

  • the given timeout elapses
  • the latch has counted down to 0 due to calls of the countDown() method
  • the thread is interrupted

The method returns true if the current count reached 0 and false if the timeout elapsed before the current count reached 0. If the current count is 0 then this method returns true immediately.
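The two return values of the timed await method can be observed with a minimal sketch like the one below. The class name TimedAwaitDemo and the 50 ms timeout are illustrative assumptions and are not part of the repository code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not taken from the repository.
class TimedAwaitDemo {

    // Returns {result before countDown(), result after countDown()}.
    static boolean[] run() {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            // the count is still 1, so the timeout elapses and await returns false
            boolean before = latch.await(50, TimeUnit.MILLISECONDS);

            latch.countDown();

            // the count is 0 now, so await returns true without waiting
            boolean after = latch.await(50, TimeUnit.MILLISECONDS);

            return new boolean[] {before, after};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = run();
        System.out.println(results[0] + " " + results[1]); // prints "false true"
    }
}
```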

Threads arrival

The countDown() method decrements the current count, releasing all waiting threads if the count reaches 0. If the current count equals 0 then nothing happens.

Latch monitoring

The long getCount() method returns the current count of the latch.
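As a small illustration of the countDown() and getCount() methods, the following sketch counts a latch down past zero in a single thread. The class name LatchCountDemo is an assumption made for this example.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not taken from the repository.
class LatchCountDemo {

    // Returns the counts observed initially, after one countDown(), and after two more.
    static long[] run() {
        CountDownLatch latch = new CountDownLatch(2);
        long initial = latch.getCount();

        latch.countDown();
        long afterOne = latch.getCount();

        latch.countDown(); // the count reaches 0 here
        latch.countDown(); // extra call: the count is already 0, nothing happens
        long afterExtra = latch.getCount();

        return new long[] {initial, afterOne, afterExtra};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints "[2, 1, 0]"
    }
}
```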

Example

The example uses two latches: the first as a one-time entry barrier, the second as a one-time exit barrier.

private static final int PARTIES = 3;

public static void main(String[] args) throws InterruptedException {
   CountDownLatch entryBarrier = new CountDownLatch(1);
   CountDownLatch exitBarrier = new CountDownLatch(PARTIES);

   for (int p = 0; p < PARTIES; p++) {
       int delay = p + 1;
       Runnable task = new Worker(delay, entryBarrier, exitBarrier);
       new Thread(task).start();
   }

   logger.info("all threads waiting to start");
   sleep(1);

   entryBarrier.countDown();
   logger.info("all threads started");

   exitBarrier.await();
   logger.info("all threads finished");
}

private static class Worker implements Runnable {

   private final int delay;
   private final CountDownLatch entryBarrier;
   private final CountDownLatch exitBarrier;

   Worker(int delay, CountDownLatch entryBarrier, CountDownLatch exitBarrier) {
       this.delay = delay;
       this.entryBarrier = entryBarrier;
       this.exitBarrier = exitBarrier;
   }

   @Override
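   public void run() {
       try {
           entryBarrier.await();
           work();
       } catch (InterruptedException e) {
           // restore the interrupt flag before propagating the failure
           Thread.currentThread().interrupt();
           throw new RuntimeException(e);
       } finally {
           // always count down, so the main thread is not blocked forever if this worker fails
           exitBarrier.countDown();
       }
   }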

   private void work() {
       logger.info("work {} started", delay);
       sleep(delay);
       logger.info("work {} finished", delay);
   }
}

The CyclicBarrier class

The CyclicBarrier class is a reusable synchronization barrier that allows threads to wait for each other at a certain point.

A barrier is initialized with a given number of threads (parties). The await methods (waiting and timed waiting) wait until all threads reach the barrier. When the last thread arrives, the barrier is tripped: all waiting threads are released, and the barrier is automatically reset for the next cycle.

Threads registration

The CyclicBarrier(int parties) constructor creates a new barrier that will trip when the given number of threads are waiting upon it.

The CyclicBarrier(int parties, Runnable barrierAction) constructor creates a new barrier that will trip when the given number of threads are waiting upon it. When the barrier is tripped, the given barrier action will be performed by the last thread entering the barrier.
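The sketch below illustrates that the barrier action runs once per trip, no matter how many threads are waiting. The class name BarrierActionDemo and the constants PARTIES and CYCLES are assumptions chosen for this example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not taken from the repository.
class BarrierActionDemo {

    static final int PARTIES = 2;
    static final int CYCLES = 2;

    // Returns how many times the barrier action was executed.
    static int run() {
        AtomicInteger trips = new AtomicInteger();
        // the barrier action only counts the trips
        CyclicBarrier barrier = new CyclicBarrier(PARTIES, trips::incrementAndGet);

        Runnable task = () -> {
            try {
                for (int c = 0; c < CYCLES; c++) {
                    barrier.await(); // the same barrier is reused in every cycle
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };

        Thread[] threads = new Thread[PARTIES];
        for (int p = 0; p < PARTIES; p++) {
            threads[p] = new Thread(task);
            threads[p].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "2": one action per cycle
    }
}
```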

Threads arrival and waiting

The int await() method causes the current thread to wait until one of the following events occurs:

  • the last thread arrives at the barrier
  • the barrier is broken (for the reasons described below)

If the barrier is broken, an exception is thrown depending on the reason: an InterruptedException in the thread that was interrupted while waiting, and a BrokenBarrierException in the other waiting threads.

The int await(long timeout, TimeUnit unit) method causes the current thread to wait until one of the following events occurs:

  • the given timeout elapses
  • the last thread arrives at the barrier
  • the barrier is broken (for the reasons described below)

If the specified timeout elapses, a TimeoutException is thrown. If the barrier is broken, an InterruptedException or a BrokenBarrierException is thrown, depending on the reason.

The await methods return the arrival index of the current thread: getParties() - 1 for the first thread to arrive and 0 for the last one.
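To make the arrival index more concrete, the following sketch collects the values returned by await() in three threads. Which thread gets which index depends on scheduling, but the set of indexes is always the same. The class name ArrivalIndexDemo is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, not taken from the repository.
class ArrivalIndexDemo {

    static final int PARTIES = 3;

    // Returns the sorted set of arrival indexes seen by all threads.
    static SortedSet<Integer> run() {
        CyclicBarrier barrier = new CyclicBarrier(PARTIES);
        SortedSet<Integer> indexes = new ConcurrentSkipListSet<>();

        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < PARTIES; p++) {
            Thread thread = new Thread(() -> {
                try {
                    // PARTIES - 1 for the first thread to arrive, 0 for the last one
                    indexes.add(barrier.await());
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads.add(thread);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return indexes;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[0, 1, 2]"
    }
}
```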

Barrier reset

The void reset() method resets the barrier to its initial state. If any threads are waiting at the barrier on the await methods, the methods will throw a BrokenBarrierException.

Barrier monitoring

The int getParties() method returns the number of parties required to trip the barrier.

The int getNumberWaiting() method returns the number of parties currently waiting at the barrier.

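The boolean isBroken() method returns true if one or more parties have broken out of this barrier since its construction or the last reset for one of the following reasons:

  • interruption
  • timeout elapsing
  • the barrier action failure due to an exception

Calling the reset() method breaks the barrier only for the threads currently waiting on it (they receive a BrokenBarrierException); after the reset, the barrier is back in its initial, unbroken state.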
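The broken state can be observed in a single thread, as in the minimal sketch below: a timed await on a barrier that nobody else reaches times out and breaks the barrier, and a later reset() returns it to the unbroken initial state. The class name BrokenBarrierDemo and the 50 ms timeout are assumptions made for this example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not taken from the repository.
class BrokenBarrierDemo {

    // Returns {timed out, broken after the timeout, broken after reset()}.
    static boolean[] run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        boolean timedOut = false;
        try {
            // the second party never arrives, so the timeout elapses
            barrier.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            timedOut = true;
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
        boolean brokenAfterTimeout = barrier.isBroken();

        barrier.reset(); // back to the initial state
        boolean brokenAfterReset = barrier.isBroken();

        return new boolean[] {timedOut, brokenAfterTimeout, brokenAfterReset};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true false"
    }
}
```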

Example

The example uses two barriers: the first as a cyclic entry barrier, the second as a cyclic exit barrier.

private static final int PARTIES = 3;
private static final int ITERATIONS = 3;

public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
   CyclicBarrier entryBarrier = new CyclicBarrier(PARTIES + 1, () -> logger.info("iteration started"));
   CyclicBarrier exitBarrier = new CyclicBarrier(PARTIES + 1, () -> logger.info("iteration finished"));

   for (int i = 0; i < ITERATIONS; i++) {
       for (int p = 0; p < PARTIES; p++) {
           int delay = p + 1;
           Runnable task = new Worker(delay, entryBarrier, exitBarrier);
           new Thread(task).start();
       }

       logger.info("all threads waiting to start: iteration {}", i);
       sleep(1);

       entryBarrier.await();
       logger.info("all threads started: iteration {}", i);

       exitBarrier.await();
       logger.info("all threads finished: iteration {}", i);
   }
}

private static class Worker implements Runnable {

   private final int delay;
   private final CyclicBarrier entryBarrier;
   private final CyclicBarrier exitBarrier;

   Worker(int delay, CyclicBarrier entryBarrier, CyclicBarrier exitBarrier) {
       this.delay = delay;
       this.entryBarrier = entryBarrier;
       this.exitBarrier = exitBarrier;
   }

   @Override
   public void run() {
       try {
           entryBarrier.await();
           work();
           exitBarrier.await();
       } catch (InterruptedException | BrokenBarrierException e) {
           throw new RuntimeException(e);
       }
   }

   private void work() {
       logger.info("work {} started", delay);
       sleep(delay);
       logger.info("work {} finished", delay);
   }
}

The Phaser class

The Phaser class is a reusable barrier that allows a variable number of parties/threads. Because of this, it is more flexible but also much more complicated.

To support a variable number of parties, a phaser keeps track of the numbers of registered, arrived, and unarrived parties. The number of registered parties always equals the sum of the numbers of arrived and unarrived parties (registered==arrived+unarrived). To support cyclic iterations, a phaser also keeps the number of the current phase.

Parties registration

The Phaser() constructor creates a phaser with initial phase number 0 and no registered parties (phase=0, registered=0). The Phaser(int parties) constructor creates a phaser with initial phase number 0 and the given number of registered parties (phase=0, registered=parties).

The int register() method adds an unarrived party to the phaser (registered++). The int bulkRegister(int parties) method adds the given number of unarrived parties to the phaser (registered+=parties). These methods return the arrival phase number to which this registration is applied.
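The registration counters can be checked in a single thread. The sketch below is only an illustration; the class name PhaserRegistrationDemo is an assumption made for this example.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical demo class, not taken from the repository.
class PhaserRegistrationDemo {

    // Returns {phase returned by register(), phase returned by bulkRegister(),
    //          registered parties, unarrived parties}.
    static int[] run() {
        Phaser phaser = new Phaser();                // phase=0, registered=0

        int phaseOnRegister = phaser.register();     // registered=1
        int phaseOnBulk = phaser.bulkRegister(2);    // registered=3

        return new int[] {
            phaseOnRegister,
            phaseOnBulk,
            phaser.getRegisteredParties(),
            phaser.getUnarrivedParties()             // nobody has arrived yet
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints "[0, 0, 3, 3]"
    }
}
```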

Parties synchronization

The int arrive() method marks a party arriving at the phaser, without waiting for other parties to arrive (arrived++, unarrived--).

The int awaitAdvance(int phase) method waits for the phaser to advance from the given phase number. The method returns immediately if the current phase number is not equal to the given phase number or if the phaser is terminated.

The int arriveAndAwaitAdvance() method marks a party arriving at the phaser and waits for the other parties to arrive (arrived++, unarrived--).

The int arriveAndDeregister() method marks a party arriving at the phaser and deregisters from it without waiting for other parties to arrive (registered--, arrived++, unarrived--).

The arrive, arriveAndAwaitAdvance, arriveAndDeregister methods return the arrival phase number. The awaitAdvance method returns the next arrival phase number.
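The bookkeeping described above can be traced without extra threads, because arrive(), arriveAndDeregister(), and an awaitAdvance call for an already finished phase never block. The following is a minimal sketch under that assumption; the class name PhaserArrivalDemo is hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical demo class, not taken from the repository.
class PhaserArrivalDemo {

    static int[] run() {
        Phaser phaser = new Phaser(2);                   // phase=0, registered=2

        int arrivalPhase = phaser.arrive();              // arrived=1, unarrived=1
        int arrived = phaser.getArrivedParties();

        // the last unarrived party arrives and leaves: phase 0 is finished,
        // phase 1 starts with the single remaining registered party
        int deregisterPhase = phaser.arriveAndDeregister();

        // the current phase is already 1, so this call returns immediately
        int nextPhase = phaser.awaitAdvance(0);

        return new int[] {
            arrivalPhase,
            arrived,
            deregisterPhase,
            nextPhase,
            phaser.getRegisteredParties()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints "[0, 1, 0, 1, 1]"
    }
}
```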

Phases iterations

The current phase is finished when all registered parties arrive (registered==arrived, unarrived==0). The protected boolean onAdvance(int phase, int registeredParties) method is used to decide whether to start the next phase or to terminate the phaser.

If the onAdvance method returns true, then the phaser is terminated (phase<0, terminated=true). If the onAdvance method returns false, then the phaser starts a new phase (phase++, arrived=0, unarrived=registered). The onAdvance method can also be used to perform a barrier action.

By default, the onAdvance method returns true when the number of registered parties has become 0 as a result of calls to the arriveAndDeregister method:

protected boolean onAdvance(int phase, int registeredParties) {
   return registeredParties == 0;
}

The overridden onAdvance method for a one-time process:

@Override
protected boolean onAdvance(int phase, int registeredParties) {
   return true;
}

The overridden onAdvance method for infinite iterations:

@Override
protected boolean onAdvance(int phase, int registeredParties) {
   return false;
}

The overridden onAdvance method for maxPhase iterations:

@Override
protected boolean onAdvance(int phase, int registeredParties) {
   return (phase >= maxPhase - 1) || (registeredParties == 0);
}
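The effect of the maxPhase variant can be checked with a single registered party, for which every arriveAndAwaitAdvance() call completes a phase immediately. The sketch below counts the completed phases; the class name MaxPhaseDemo and the run(int maxPhase) helper are assumptions made for this example.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class, not taken from the repository.
class MaxPhaseDemo {

    // Returns the number of phases completed before the phaser terminates.
    static int run(int maxPhase) {
        Phaser phaser = new Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // terminate after the phase with number maxPhase - 1
                return (phase >= maxPhase - 1) || (registeredParties == 0);
            }
        };

        int iterations = 0;
        while (!phaser.isTerminated()) {
            // the only registered party arrives, so the phase is finished at once
            phaser.arriveAndAwaitAdvance();
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints "3"
    }
}
```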

Phaser termination

A phaser is terminated automatically when the onAdvance method returns true. It can also be terminated manually by calling the forceTermination() method.

The arrive, awaitAdvance, arriveAndAwaitAdvance, arriveAndDeregister methods return negative values if the phaser has already terminated.
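A short single-threaded sketch of manual termination follows. It only checks the signs of the returned values, and the class name PhaserTerminationDemo is an assumption made for this example.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class, not taken from the repository.
class PhaserTerminationDemo {

    // Returns {isTerminated(), arrive() < 0, arriveAndAwaitAdvance() < 0}.
    static boolean[] run() {
        Phaser phaser = new Phaser(1);

        phaser.forceTermination();
        boolean terminated = phaser.isTerminated();

        // on a terminated phaser these calls do not block and return negative values
        boolean arriveNegative = phaser.arrive() < 0;
        boolean awaitNegative = phaser.arriveAndAwaitAdvance() < 0;

        return new boolean[] {terminated, arriveNegative, awaitNegative};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true true"
    }
}
```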

Phaser monitoring

The methods to monitor the numbers of parties:

  • int getRegisteredParties() - returns the number of parties registered at the phaser
  • int getArrivedParties() - returns the number of registered parties that have arrived at the current phase of the phaser
  • int getUnarrivedParties() - returns the number of registered parties that have not yet arrived at the current phase of the phaser

The int getPhase() method returns the current phase number.

The boolean isTerminated() method returns true if this phaser has been terminated.

Examples

The example uses the basic phaser methods.

public static void main(String[] args) {
   Phaser phaser = new Phaser(3) {
       @Override
       protected boolean onAdvance(int phase, int registeredParties) {
           log("inside onAdvance()", this);
           return true;
       }
   };
   log("after constructor", phaser);

   phaser.register();
   log("after register()", phaser);

   phaser.arrive();
   log("after arrive()", phaser);

   Thread thread = new Thread() {
       @Override
       public void run() {
           log("before arriveAndAwaitAdvance()", phaser);
           phaser.arriveAndAwaitAdvance();
           log("after arriveAndAwaitAdvance()", phaser);
       }
   };
   thread.start();

   phaser.arrive();
   log("after arrive()", phaser);

   phaser.arriveAndDeregister();
   log("after arriveAndDeregister()", phaser);
}

In the example, a phaser is used to implement a one-time entry barrier.

private static final int PARTIES = 3;

public static void main(String[] args) {
   Phaser phaser = new Phaser(1);
   log("after constructor", phaser);

   for (int p = 0; p < PARTIES; p++) {
       int delay = p + 1;
       Runnable task = new Worker(delay, phaser);
       new Thread(task).start();
   }

   log("all threads waiting to start", phaser);
   sleep(1);

   log("before all threads started", phaser);
   phaser.arriveAndDeregister();
   log("after all threads started", phaser);

   sleep(10);
   log("all threads finished", phaser);
}

private static class Worker implements Runnable {

   private final int delay;
   private final Phaser phaser;

   Worker(int delay, Phaser phaser) {
       phaser.register();

       this.delay = delay;
       this.phaser = phaser;
   }

   @Override
   public void run() {
       phaser.arriveAndAwaitAdvance();
       work();
   }

   private void work() {
       logger.info("work {} started", delay);
       sleep(delay);
       logger.info("work {} finished", delay);
   }
}

In the example, a phaser is used to implement one-time entry and exit barriers.

private static final int PARTIES = 3;

public static void main(String[] args) {
   Phaser phaser = new Phaser(1);
   log("after constructor", phaser);

   for (int p = 0; p < PARTIES; p++) {
       int delay = p + 1;
       Runnable task = new Worker(delay, phaser);
       new Thread(task).start();
   }

   log("all threads waiting to start", phaser);
   sleep(1);

   log("before all threads started", phaser);
   phaser.arriveAndDeregister();
   log("after all threads started", phaser);

   phaser.register();
   while (!phaser.isTerminated()) {
       phaser.arriveAndAwaitAdvance();
       phaser.arriveAndDeregister();
   }

   log("all threads finished", phaser);
}

private static class Worker implements Runnable {

   private final int delay;
   private final Phaser phaser;

   Worker(int delay, Phaser phaser) {
       phaser.register();

       this.delay = delay;
       this.phaser = phaser;
   }

   @Override
   public void run() {
       phaser.arriveAndAwaitAdvance();
       work();
       phaser.arriveAndDeregister();
   }

   private void work() {
       logger.info("work {} started", delay);
       sleep(delay);
       logger.info("work {} finished", delay);
   }
}

In the example, a phaser is used to implement cyclic entry and exit barriers.

private static final int PARTIES = 3;
private static final int ITERATIONS = 3;

public static void main(String[] args) {
   Phaser phaser = new Phaser(1) {
       private final int maxPhase = ITERATIONS;

       @Override
       protected boolean onAdvance(int phase, int registeredParties) {
           return (phase >= maxPhase - 1) || (registeredParties == 0);
       }
   };
   log("after constructor", phaser);

   for (int p = 0; p < PARTIES; p++) {
       int delay = p + 1;
       Runnable task = new Worker(delay, phaser);
       new Thread(task).start();
   }

   log("all threads waiting to start", phaser);
   sleep(1);

   log("before all threads started", phaser);
   phaser.arriveAndDeregister();
   log("after all threads started", phaser);

   phaser.register();
   while (!phaser.isTerminated()) {
       phaser.arriveAndAwaitAdvance();
   }

   log("all threads finished", phaser);
}

private static class Worker implements Runnable {

   private final int delay;
   private final Phaser phaser;

   Worker(int delay, Phaser phaser) {
       phaser.register();

       this.delay = delay;
       this.phaser = phaser;
   }

   @Override
   public void run() {
       do {
           work();
           phaser.arriveAndAwaitAdvance();
       } while (!phaser.isTerminated());
   }

   void work() {
       logger.info("work {} started", delay);
       sleep(delay);
       logger.info("work {} finished", delay);
   }
}

Conclusion

The CountDownLatch class is suitable for one-time iteration with a fixed number of parties.

The CyclicBarrier class is suitable for one-time and cyclic iterations with a fixed number of parties.

The Phaser class is suitable for one-time and cyclic iterations with a variable number of parties. It can also be used with a fixed number of parties, but that is overkill.

Code examples are available in the GitHub repository.
