    java.util.concurrent包是Java 5的一个重大改进,java.util.concurrent包提供了多种线程间同步和通信的机制,比如Executors, Queues, Timing, Synchronizers和Concurrent Collections等。与synchronized关键字和Object.notify()等方法相比,这些类和方法的抽象层次都较高。Effective Java中提到,其中比较重要的同步和通信机制有Executor框架、Concurrent Collections和Synchronizers三种。  

    其中Synchronizers包含了五种: Semaphore信号量,CounDownLatch倒计时锁存器,CyclicBarrier循环栅栏,Phaser和Exchanger。 JCIP中提到,Exchanger可以看做一种特殊的Barrier。Effective Java 提到用的比较多的主要是Semaphore信号量和CounDownLatch倒计时锁存器。本文主要讲解我认为比较重要的Semaphore信号量、CounDownLatch计数锁存器和CyclibBarrier。每一种都按照它们的概念、jdk实现、所提供的方法和使用(traveler或者jdk, or sample code)来进行介绍。

1 Semaphore

semaphore,信号量,是众多synchronizer中的一个。在操作系统中就存在互斥量和信号量这样的概念。 semaphore跟锁机制存在一定的相似性,semaphore也是一种锁机制,所不同的是,reentrantLock是只允许一个线程获得锁,而信号量持有多个许可(permits),允许多个线程获得许可并执行。从这个意义上看,重入锁是许可只有1的信号量。它们所提供的方法也非常接近。


1.1 实现


1.1.1 构造函数:


1     public Semaphore(int permits) {2         sync = new NonfairSync(permits); 3 }



1     public Semaphore(int permits, boolean fair) {2 sync = fair ? new FairSync(permits) : new NonfairSync(permits); 3 }


1.1.2 其他方法


acquire() :

1     public void acquire() throws InterruptedException {2 sync.acquireSharedInterruptibly(1); 3 }


acquire(int permits)

1     public void acquire(int permits) throws InterruptedException { 2 if (permits < 0) throw new IllegalArgumentException(); 3 sync.acquireSharedInterruptibly(permits); 4 }




tryAcquire()& tryAcquire(int) 

tryAcquire(long,TimeUnit)& tryAcquire(int, long,TimeUnit)

release()& release(int)

1.2 使用示例:

1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 import java.util.concurrent.Semaphore; 4  5 public class TIJ_semaphore { 6     public static void main(String[] args) { 7         ExecutorService exec = Executors.newCachedThreadPool(); 8         final Semaphore semp = new Semaphore(5); // 5 permits 9 10         for (int index = 0; index < 20; index++) {11             final int NO = index;12             Runnable run = new Runnable() {13                 public void run() {14                     try {
// if 1 permit avaliable, thread will get a permits and go; if no permit avaliable, thread will block until 1 avaliable15 semp.acquire(); 16 System.out.println("Accessing: " + NO);17 Thread.sleep((long) (10000);18 semp.release();19 } catch (InterruptedException e) {20 }21 }22 };23 exec.execute(run);24 }25 exec.shutdown();26 }


1 Accessing: 0 2 Accessing: 2 3 Accessing: 3 4 Accessing: 4 5 Accessing: 1 6 (等待10s) 7 Accessing: 5 8 Accessing: 6 9 Accessing: 1410 Accessing: 811 Accessing: 712 (等待10s)13 Accessing: 1014 Accessing: 915 Accessing: 1116 Accessing: 1517 Accessing: 1218 (等待10s)19 Accessing: 1320 Accessing: 1621 Accessing: 1722 Accessing: 1923 Accessing: 18
2 CountDownLatch

2.1 实现


2.2 方法


1 public void await() throws InterruptedException {2         sync.acquireSharedInterruptibly(1);3     }


boolean await(long,timeUnit):

如果等待超时,则返回false; 如果时间为0或者为负,则立刻返回。

1  public boolean await(long timeout, TimeUnit unit)2         throws InterruptedException {3         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));4     }




1 public void countDown() {2         sync.releaseShared(1);3     }


2.3 使用:



1    public Map
> doDelete(final String reqUri, final HttpHeaders _headers)2 throws RestAPIException3 {4 setMethod(Method.DELETE);5 setUriTemplate(reqUri);6 headers = _headers;7 return processBroadcast();8 }


private Map
> processBroadcast() throws RestAPIException { ... final Map
dServers = RestRequestSender.deployedServers; if (!dServers.isEmpty()) { final CountDownLatch doneSignal = new CountDownLatch(dServers.size()); ... for (final String key : dServers.keySet()) { ... executor.submit(new RRSRunnable(doneSignal, context, results, key, totalRecordsCounter)); } try { if (!doneSignal.await(Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt(), TimeUnit.MINUTES)) // if timeout will retrun false { XLog.warning("MDM api broadcast timed out after " + Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt() + " minutes. Timeout is set via notes.ini key " + Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getNotesIniName()); } } catch (final InterruptedException ie) { throw new RestAPIException("Interrupted", ie); } } return results; // if doneSingnal.await has been intrerrupted, will this line still execute? }


class RRSRunnable implements Runnable   {      private final CountDownLatch    doneSignal;      ...      RRSRunnable(final CountDownLatch doneSignal, final SaaSServerContext context,                  final Map
> results, final String serverKey, final MaxRecordCounter recordCounter) { this.doneSignal = doneSignal; ... } @Override public void run() { ... try { processRequest(responses, context.getHostName(), client, context, recordCounter); final long elapsedTime = System.currentTimeMillis() - startTime; XLog.fine("Traveler API request completed. orgid=" + orgId + ";request=" + reqUri + ";pool=" + serverKey + ";time=" + elapsedTime + "ms"); } catch (final RestAPIException rae) { exceptionServerKey = serverKey; exception = rae; } finally { doneSignal.countDown(); } } }


2.4 更加一般的Latch:


2.4.1 nts Latch简介:

在nts生产代码中也实现了一种Latch,Latch允许多个线程间的协作:在这种Latch中,有working thread和latching thread之分:

workingThread在做一些工作,latchingThread希望当这些工作完成的时候,锁存这些工作,然后得到workingThread的工作结果。workingThread和latchingThread共享一个Latch对象,workingThread会调用start方法,通知它正在开始针对特定Object的工作已经开始了。同时,latchingThread将调用latch方法,并传进它希望等待的Object。 当workingThread完成对某一Object(start方法传入的)的工作后,它将调用finish方法,传入该对象,以及工作的结果对象。当finish方法被调用后,调用latch方法的线程被唤醒,返回工作结果给latch方法的调用者。多个线程可以锁存同一个将要完成某些工作的object。一旦任意一个线程调用了finish方法,他们都将被唤醒并返回结果对象。如果调用latch方法时,针对latch对象的工作还没有开始,线程立刻返回,并不会block. 所以start(Object)应该首先被调用。

workingThread调用start(Object)方法,表明它开始工作。 同时,latchingThread调用latch(Object,long)方法,等待workingThread的执行完成。 workingThread执行finish(Object,Object)方法,表示工作完成,此时,latchingThread醒来。start(Object) finish(Object,Object) --> working thread 第二个参数为结果。 ?
latch(Object,long) --> latching thread

2.4.2 nts Latch 实现:


1    public boolean start(final Object obj) 2    { 3       final long timeStart = System.currentTimeMillis(); 4       boolean rv = false; 5       Barrier b = null; 6       synchronized (this) 7       { 8          if (!latched.containsKey(obj)) 9          {10             b = new Barrier("Latch:" + name + "_Obj:" + obj, 1);11             latched.put(obj, b); // latched is a synchronizedHashMap12             rv = true;13          }14       }15       XLog.exiting("name=" + name, "obj=" + obj, "barrier=" + b, "rv=" + rv, ("Elapsed time="16             + (System.currentTimeMillis() - timeStart) + "ms"));17       return rv;18    }



1    public void finish(final Object obj, final Object result) 2    { 3       final long timeStart = System.currentTimeMillis(); 4       final Barrier b; 5       synchronized (this) 6       { 7          b = latched.remove(obj); 8          if (null != b) 9          {10             // there are waiters that need the result11             b.result = result;12             try13             {14                b.enter(0);15             }16             catch (final InterruptedException e)17             {18                // ignored19             }20          }21       }22       XLog.exiting("name=" + name, "obj=" + obj, "result=" + result, "barrier=" + b, ("Elapsed time="23             + (System.currentTimeMillis() - timeStart) + "ms"));24    }


3 CyclicBarrier

CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它(一个线程)才执行; 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。


3.1 实现



1     /** The lock for guarding barrier entry */2     private final ReentrantLock lock = new ReentrantLock();3     /** Condition to wait on until tripped */4     private final Condition trip = lock.newCondition();


3.1.1 构造函数

当在等待栅栏的线程个数到达预定义的个数时,barrier 发生trip, 但是因为没有预定义的动作,所以不执行任何动作。

1     public CyclicBarrier(int parties) {2         this(parties, null); 3 }


1    public CyclicBarrier(int parties, Runnable barrierAction) {2         if (parties <= 0) throw new IllegalArgumentException(); 3 this.parties = parties; 4 this.count = parties; 5 this.barrierCommand = barrierAction; 6 }




public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }

 await(Long time, TimeUtil unit):

1     public int await(long timeout, TimeUnit unit)2 throws InterruptedException, 3 BrokenBarrierException, 4 TimeoutException { 5 return dowait(true, unit.toNanos(timeout)); 6 }


1  private int dowait(boolean timed, long nanos) 2         throws InterruptedException, BrokenBarrierException, 3                TimeoutException { 4         final ReentrantLock lock = this.lock; 5         lock.lock(); 6         try {
final Generation g = generation; 7 ... 8 if (Thread.interrupted()) { 9 breakBarrier();10 throw new InterruptedException();11 }12 13 int index = --count;14 if (index == 0) { // tripped15 boolean ranAction = false;16 try {17 final Runnable command = barrierCommand;18 if (command != null)19 command.run();20 ranAction = true;21 nextGeneration();22 return 0;23 } finally {24 if (!ranAction)25 breakBarrier();26 }27 }28 29 // loop until tripped, broken, interrupted, or timed out30 for (;;) {31 try {32 if (!timed)33 /* The lock associated with this Condition is atomically      released and the current thread becomes disabled for thread scheduling      purposes */ trip.await(); 34 else if (nanos > 0L)35 nanos = trip.awaitNanos(nanos);36 } catch (InterruptedException ie) {37 if (g == generation && ! g.broken) {38 breakBarrier();39 throw ie;40 } else {41 // We're about to finish waiting even if we had not42 // been interrupted, so this interrupt is deemed to43 // "belong" to subsequent execution.44 Thread.currentThread().interrupt();45 }46 }47 48 if (g.broken)49 throw new BrokenBarrierException();50 51 if (g != generation)52 return index;53 54 if (timed && nanos <= 0L) {55 breakBarrier();56 throw new TimeoutException();57 }58 }59 } finally {60 lock.unlock();61 }62 }


1     private void breakBarrier() {2         generation.broken = true;3         count = parties;4         trip.signalAll();5     }


3.2 使用:

在jdk和traveler code中没有使用。这符合Effective Java 69中的描述。在实际使用中较少见到。


4 Synchronizere与wait()/wait(long)/wait(long,int)/notify()/notifyAll()的比较:

    在effective java 69中提到,wait()/wait(long)/wait(long,int)/notify()/notifyAll()不易使用且容易出错。一般来讲应该优选更高级的concurrent container 或者 synchronizer。 其中synchronizer比较常用的是Semaphore和CountDownLatch,而CyclicBarrier和Exchanger 则使用的比较少。说从易用性上来讲,wait()/notify()/notifyAll()更象是汇编语言,并发容器和synchronizer更像是高级语言。 但我在nts代码中看到了很多wait/notify,而Semaphore和CountDownLatch则用的很少。

    Lock/Condition()是使用AQS实现的,Lock/Condition() 组合可以用来替代Object.wait()/notify()。 而高级的synchronizer: CountDownLatch& Semaphore也是基于AQS实现的。所以理论上,可以替代wait/notify。 Semaphore和CountDownLatch也都包含了跟wait(long timeout)相对应的方法。

    考虑以下在下面的例子中,是否可以用Semaphore& CountDownLatch来代替wait/notify ?

1  public Connection getConnection(final boolean highpriority) throws SQLException  2    {  3       final long sTime = System.currentTimeMillis();  4       Connection conn = null;   6       boolean createConnection = false;  7       boolean waitConnection = false;  8   9       try 10       { 11          while (true) 12          { 13             createConnection = false; 14             waitConnection = false; 15          17             synchronized (dbConnections) 18             { 19                 20                final List
dbConnectionsToBeFreedTemp = new ArrayList
(); 21 synchronized (dbConnectionsToBeFreed) 22 { 23 if (!dbConnectionsToBeFreed.isEmpty()) 24 { 25 dbConnectionsToBeFreedTemp.addAll(dbConnectionsToBeFreed); 26 dbConnectionsToBeFreed.clear(); 27 if (1 == dbConnectionsToBeFreedTemp.size()) 28 { 29 // only one, so only notify one 30 dbConnectionsToBeFreed.notify(); // Semaphore.release() 31 } 32 else 33 { 34 dbConnectionsToBeFreed.notifyAll(); // CountDownLatch.await() 35 } 36 } 37 if (isThrottDown && connectionCount <= Tier.ONE.value * .20F) 38 { 39 isThrottDown = false; 40 dbConnectionsToBeFreed.notifyAll(); 41 } 42 } 43 44 if (!dbConnectionsToBeFreedTemp.isEmpty()) 45 { 46 for (final Connection connToFree : dbConnectionsToBeFreedTemp) 47 { 48 if (!closeConnectionIfAgedOut(connToFree)) 49 { 50 dbConnections.add(connToFree); 51 } 52 } 53 dbConnectionsToBeFreedTemp.clear(); 54 } 55 56 57 if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean()) 58 { 59 getCurrentTier(); 60 61 if ((!dbConnections.isEmpty() && !isThrottDown) 62 && (highpriority || ((connectionCount - dbConnections.size()) < currentTier.value))) //Prevent non-high priority requests from grabbing freed high priority connections if connection is capped 63 { 64 conn = dbConnections.remove(0); 65 } 66 67 if (null == conn) 68 { 69 // See if we should create a new connection or have to wait 70 // if none are in the stack, then see if we should make another 71 if ((connectionCount < currentTier.value || (highpriority && (connectionCount < maxConnectionCount))) 72 && !isThrottDown) 73 { 74 75 createConnection = true; 76 connectionCount++; 77 } 78 else if (!isScheduled && conn == null && !highpriority && !createConnection 79 && currentTier != Tier.THREE && !isThrottDown) 80 { 81 WallClock.getInstance().addAlarm(ALARM_NAME, 82 Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmStepUpTier); 83 isScheduled = true; 84 waitConnection = true; 85 } 86 else if (conn == null && !createConnection && currentTier == Tier.THREE 87 && connectionCount - dbConnections.size() >= Tier.THREE.value && !isScheduled) 88 { 89 WallClock.getInstance().addAlarm(ALARM_THROTTLE_DOWN, 90 Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmThrottleDown); 91 isScheduled = true; 92 waitConnection = true; 93 } 94 else 95 { 96 // Connections are maxed out, so we have to wait 97 waitConnection = true; 98 } 99 }100 }101 else102 {103 if (!dbConnections.isEmpty()104 && (highpriority || ((connectionCount - dbConnections.size()) <= maxConnectionCount)))105 {106 conn = dbConnections.remove(0);107 }108 109 if (null == conn)110 {111 // See if we should create a new connection or have to wait112 // if none are in the stack, then see if we should make another113 if ((connectionCount < maxConnectionCount) || highpriority)114 {115 116 createConnection = true;117 connectionCount++;118 }119 else120 {121 // Connections are maxed out, so we have to wait122 waitConnection = true;123 }124 }125 }126 }127 128 if (null != conn)129 {130 // we have a Connection, so we are done131 break;132 }133 else if (createConnection)134 {135 if (!isDerby)136 {137 // update user and password from Configuration138 connProps.put("user", Configuration.NTS_DBUSER.getString());139 connProps.put("password", Configuration.NTS_DBPASSWORD.getString());140 }141 142 try143 {144 conn = DriverManager.getConnection(url, connProps); 145 }146 catch (final SQLException sqle)147 {148 connectionCount--; // count must be decremented since a connection was never created149 DatabaseStatus.reportException(sqle);150 throw sqle;151 }152 153 if (conn != null)154 {155 DatabaseStatus.clearException();156 if (peakConnections < connectionCount)157 {158 peakConnections = connectionCount;159 Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_COUNT, peakConnections);160 Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_TIME, new Date().toString());161 }162 allConnections.put(Integer.valueOf(conn.hashCode()), PersistentStore.createDeathTimeStamp());163 164 break;165 }166 else if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean())167 {168 // Unexpected Database exceptions will result in connection == null.169 // Instead of retrying the connection, throw a checked exception.170 throw new DBConnectionsAllUsedException("There are no DB Connections available");171 }172 else173 {174 // don't break which will do the retry175 }176 }177 else if (waitConnection)178 {179 try180 {181 Stats.inc(Stats.DB_THREADS_WAITING_FOR_CONNECTION);182 synchronized (dbConnectionsToBeFreed)183 {184 dbConnectionsToBeFreed.wait(); // Semaphore.acquire() CountDownLatch.countdown()185 }186 }187 finally188 {189 Stats.dec(Stats.DB_THREADS_WAITING_FOR_CONNECTION);190 }191 // don't break which will do the retry192 }193 else194 {195 // should never hit this case196 break;197 }198 }199 }200 catch (final InterruptedException ie)201 {202 // expected203 throw new SQLException("Exception waiting for DB connection.", ie);204 }205 finally206 {207 208 }209 210 return conn;211 }


5 进一步问题:

5.1 CyclicBarrier方法,await(): 为什么要提供这两种方法来包裹wrapper dowait方法?


5.2 Semaphore和CountDownLatch互换


1 final Semaphore sem = new Semaphore(0); 2 for (int i = 0; i < num_threads; ++ i) 3 { 4   Thread t = new Thread() { 5     public void run() 6     { 7       try 8       { 9         doStuff();10       }11       finally12       {13         sem.release();14       }15     }16   };17   t.start();18 }19 20 sem.acquire(num_threads);




1 final CountDownLatch latch = new CountDownLatch(num_threads); 2 for (int i = 0; i < num_threads; ++ i) 3 { 4   Thread t = new Thread() { 5     public void run() 6     { 7       try 8       { 9         doStuff();10       }11       finally12       {13         latch.countDown();14       }15     }16   };17   t.start();18 }19 20 latch.await();



