简介
java.util.concurrent包是Java 5的一个重大改进,java.util.concurrent包提供了多种线程间同步和通信的机制,比如Executors, Queues, Timing, Synchronizers和Concurrent Collections等。与synchronized关键字和Object.notify()等方法相比,这些类和方法的抽象层次都较高。Effective Java中提到,其中比较重要的同步和通信机制有Executor框架、Concurrent Collections和Synchronizers三种。
其中Synchronizers包含了五种: Semaphore信号量,CounDownLatch倒计时锁存器,CyclicBarrier循环栅栏,Phaser和Exchanger。 JCIP中提到,Exchanger可以看做一种特殊的Barrier。Effective Java 提到用的比较多的主要是Semaphore信号量和CounDownLatch倒计时锁存器。本文主要讲解我认为比较重要的Semaphore信号量、CounDownLatch计数锁存器和CyclibBarrier。每一种都按照它们的概念、jdk实现、所提供的方法和使用(traveler或者jdk, or sample code)来进行介绍。
1 Semaphoresemaphore,信号量,是众多synchronizer中的一个。在操作系统中就存在互斥量和信号量这样的概念。 semaphore跟锁机制存在一定的相似性,semaphore也是一种锁机制,所不同的是,reentrantLock是只允许一个线程获得锁,而信号量持有多个许可(permits),允许多个线程获得许可并执行。从这个意义上看,重入锁是许可只有1的信号量。它们所提供的方法也非常接近。
1.1 实现
跟ReentrantLock一样,Semaphore也是以AQS为基础来实现的。
1.1.1 构造函数:
非公平版本:
1 public Semaphore(int permits) {2 sync = new NonfairSync(permits); 3 }
可以选择是否公平的版本:
1 public Semaphore(int permits, boolean fair) {2 sync = fair ? new FairSync(permits) : new NonfairSync(permits); 3 }
1.1.2 其他方法
跟ReentrantLock不同的是,每种acquire方法都分为有参数的和不带参数的两个版本:
acquire() :1 public void acquire() throws InterruptedException {2 sync.acquireSharedInterruptibly(1); 3 }
acquire(int permits)
1 public void acquire(int permits) throws InterruptedException { 2 if (permits < 0) throw new IllegalArgumentException(); 3 sync.acquireSharedInterruptibly(permits); 4 }
与此类似的还有:
acquireUninterruptibly()&acquireUninterruptibly(int)
tryAcquire()& tryAcquire(int)
tryAcquire(long,TimeUnit)& tryAcquire(int, long,TimeUnit)
release()& release(int)
1.2 使用示例:
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 import java.util.concurrent.Semaphore; 4 5 public class TIJ_semaphore { 6 public static void main(String[] args) { 7 ExecutorService exec = Executors.newCachedThreadPool(); 8 final Semaphore semp = new Semaphore(5); // 5 permits 9 10 for (int index = 0; index < 20; index++) {11 final int NO = index;12 Runnable run = new Runnable() {13 public void run() {14 try { // if 1 permit avaliable, thread will get a permits and go; if no permit avaliable, thread will block until 1 avaliable15 semp.acquire(); 16 System.out.println("Accessing: " + NO);17 Thread.sleep((long) (10000);18 semp.release();19 } catch (InterruptedException e) {20 }21 }22 };23 exec.execute(run);24 }25 exec.shutdown();26 }
程序输出结果为:
1 Accessing: 0 2 Accessing: 2 3 Accessing: 3 4 Accessing: 4 5 Accessing: 1 6 (等待10s) 7 Accessing: 5 8 Accessing: 6 9 Accessing: 1410 Accessing: 811 Accessing: 712 (等待10s)13 Accessing: 1014 Accessing: 915 Accessing: 1116 Accessing: 1517 Accessing: 1218 (等待10s)19 Accessing: 1320 Accessing: 1621 Accessing: 1722 Accessing: 1923 Accessing: 18
2 CountDownLatch
2.1 实现
内部使用AQS实现
2.2 方法await()等待,无超时,可以被中断
1 public void await() throws InterruptedException {2 sync.acquireSharedInterruptibly(1);3 }
boolean await(long,timeUnit):
如果等待超时,则返回false; 如果时间为0或者为负,则立刻返回。
1 public boolean await(long timeout, TimeUnit unit)2 throws InterruptedException {3 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));4 }
countDown():
把Latch的计数减1,如果计数到达0,则释放所有正在等待的线程。
1 public void countDown() {2 sync.releaseShared(1);3 }
2.3 使用:
绝大多数synchronizer在jdk中没有使用,原因很简单:这些synchronizer是抽象层次较高的,所以一般只有应用程序才会直接使用。
而在nts生产环境中,只有一处admin.rest.api.RestRequestSender使用了CountDownLatch:
1 public Map> doDelete(final String reqUri, final HttpHeaders _headers)2 throws RestAPIException3 {4 setMethod(Method.DELETE);5 setUriTemplate(reqUri);6 headers = _headers;7 return processBroadcast();8 }
doDelete调用了processBraodcast:
private Map> processBroadcast() throws RestAPIException { ... final Map dServers = RestRequestSender.deployedServers; if (!dServers.isEmpty()) { final CountDownLatch doneSignal = new CountDownLatch(dServers.size()); ... for (final String key : dServers.keySet()) { ... executor.submit(new RRSRunnable(doneSignal, context, results, key, totalRecordsCounter)); } try { if (!doneSignal.await(Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt(), TimeUnit.MINUTES)) // if timeout will retrun false { XLog.warning("MDM api broadcast timed out after " + Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt() + " minutes. Timeout is set via notes.ini key " + Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getNotesIniName()); } } catch (final InterruptedException ie) { throw new RestAPIException("Interrupted", ie); } } return results; // if doneSingnal.await has been intrerrupted, will this line still execute? }
RRSRunnable代码如下:
class RRSRunnable implements Runnable { private final CountDownLatch doneSignal; ... RRSRunnable(final CountDownLatch doneSignal, final SaaSServerContext context, final Map> results, final String serverKey, final MaxRecordCounter recordCounter) { this.doneSignal = doneSignal; ... } @Override public void run() { ... try { processRequest(responses, context.getHostName(), client, context, recordCounter); final long elapsedTime = System.currentTimeMillis() - startTime; XLog.fine("Traveler API request completed. orgid=" + orgId + ";request=" + reqUri + ";pool=" + serverKey + ";time=" + elapsedTime + "ms"); } catch (final RestAPIException rae) { exceptionServerKey = serverKey; exception = rae; } finally { doneSignal.countDown(); } } }
2.4 更加一般的Latch:
CountDownLatch是一种特殊的Latch,jcip第八章用countdownLatch实现了一种valueLatch。
2.4.1 nts Latch简介:
在nts生产代码中也实现了一种Latch,Latch允许多个线程间的协作:在这种Latch中,有working thread和latching thread之分:
workingThread在做一些工作,latchingThread希望当这些工作完成的时候,锁存这些工作,然后得到workingThread的工作结果。workingThread和latchingThread共享一个Latch对象,workingThread会调用start方法,通知它正在开始针对特定Object的工作已经开始了。同时,latchingThread将调用latch方法,并传进它希望等待的Object。 当workingThread完成对某一Object(start方法传入的)的工作后,它将调用finish方法,传入该对象,以及工作的结果对象。当finish方法被调用后,调用latch方法的线程被唤醒,返回工作结果给latch方法的调用者。多个线程可以锁存同一个将要完成某些工作的object。一旦任意一个线程调用了finish方法,他们都将被唤醒并返回结果对象。如果调用latch方法时,针对latch对象的工作还没有开始,线程立刻返回,并不会block. 所以start(Object)应该首先被调用。
workingThread调用start(Object)方法,表明它开始工作。 同时,latchingThread调用latch(Object,long)方法,等待workingThread的执行完成。 workingThread执行finish(Object,Object)方法,表示工作完成,此时,latchingThread醒来。start(Object) finish(Object,Object) --> working thread 第二个参数为结果。 ?latch(Object,long) --> latching thread2.4.2 nts Latch 实现:
start:
1 public boolean start(final Object obj) 2 { 3 final long timeStart = System.currentTimeMillis(); 4 boolean rv = false; 5 Barrier b = null; 6 synchronized (this) 7 { 8 if (!latched.containsKey(obj)) 9 {10 b = new Barrier("Latch:" + name + "_Obj:" + obj, 1);11 latched.put(obj, b); // latched is a synchronizedHashMap12 rv = true;13 }14 }15 XLog.exiting("name=" + name, "obj=" + obj, "barrier=" + b, "rv=" + rv, ("Elapsed time="16 + (System.currentTimeMillis() - timeStart) + "ms"));17 return rv;18 }
finish:
1 public void finish(final Object obj, final Object result) 2 { 3 final long timeStart = System.currentTimeMillis(); 4 final Barrier b; 5 synchronized (this) 6 { 7 b = latched.remove(obj); 8 if (null != b) 9 {10 // there are waiters that need the result11 b.result = result;12 try13 {14 b.enter(0);15 }16 catch (final InterruptedException e)17 {18 // ignored19 }20 }21 }22 XLog.exiting("name=" + name, "obj=" + obj, "result=" + result, "barrier=" + b, ("Elapsed time="23 + (System.currentTimeMillis() - timeStart) + "ms"));24 }
3 CyclicBarrier
CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它(一个线程)才执行; 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
3.1 实现:
使用Lock和Condition实现。不同于AQS。(Condition是基于AQS实现的)
CyclicBarrier包含下面的域:
1 /** The lock for guarding barrier entry */2 private final ReentrantLock lock = new ReentrantLock();3 /** Condition to wait on until tripped */4 private final Condition trip = lock.newCondition();
3.1.1 构造函数
当在等待栅栏的线程个数到达预定义的个数时,barrier 发生trip, 但是因为没有预定义的动作,所以不执行任何动作。
1 public CyclicBarrier(int parties) {2 this(parties, null); 3 }
当barrier发生trip时,会由最后一个进入该barrier的线程执行特定的动作:
1 public CyclicBarrier(int parties, Runnable barrierAction) {2 if (parties <= 0) throw new IllegalArgumentException(); 3 this.parties = parties; 4 this.count = parties; 5 this.barrierCommand = barrierAction; 6 }
CyclicBarrier方法:
await():
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
await(Long time, TimeUtil unit):
1 public int await(long timeout, TimeUnit unit)2 throws InterruptedException, 3 BrokenBarrierException, 4 TimeoutException { 5 return dowait(true, unit.toNanos(timeout)); 6 }
上述两种方法都调用了dowait方法:
1 private int dowait(boolean timed, long nanos) 2 throws InterruptedException, BrokenBarrierException, 3 TimeoutException { 4 final ReentrantLock lock = this.lock; 5 lock.lock(); 6 try { final Generation g = generation; 7 ... 8 if (Thread.interrupted()) { 9 breakBarrier();10 throw new InterruptedException();11 }12 13 int index = --count;14 if (index == 0) { // tripped15 boolean ranAction = false;16 try {17 final Runnable command = barrierCommand;18 if (command != null)19 command.run();20 ranAction = true;21 nextGeneration();22 return 0;23 } finally {24 if (!ranAction)25 breakBarrier();26 }27 }28 29 // loop until tripped, broken, interrupted, or timed out30 for (;;) {31 try {32 if (!timed)33 /* The lock associated with this Condition is atomically released and the current thread becomes disabled for thread scheduling purposes */ trip.await(); 34 else if (nanos > 0L)35 nanos = trip.awaitNanos(nanos);36 } catch (InterruptedException ie) {37 if (g == generation && ! g.broken) {38 breakBarrier();39 throw ie;40 } else {41 // We're about to finish waiting even if we had not42 // been interrupted, so this interrupt is deemed to43 // "belong" to subsequent execution.44 Thread.currentThread().interrupt();45 }46 }47 48 if (g.broken)49 throw new BrokenBarrierException();50 51 if (g != generation)52 return index;53 54 if (timed && nanos <= 0L) {55 breakBarrier();56 throw new TimeoutException();57 }58 }59 } finally {60 lock.unlock();61 }62 }
breakBarrier:
1 private void breakBarrier() {2 generation.broken = true;3 count = parties;4 trip.signalAll();5 }
3.2 使用:
在jdk和traveler code中没有使用。这符合Effective Java 69中的描述。在实际使用中较少见到。
4 Synchronizere与wait()/wait(long)/wait(long,int)/notify()/notifyAll()的比较:
在effective java 69中提到,wait()/wait(long)/wait(long,int)/notify()/notifyAll()不易使用且容易出错。一般来讲应该优选更高级的concurrent container 或者 synchronizer。 其中synchronizer比较常用的是Semaphore和CountDownLatch,而CyclicBarrier和Exchanger 则使用的比较少。说从易用性上来讲,wait()/notify()/notifyAll()更象是汇编语言,并发容器和synchronizer更像是高级语言。 但我在nts代码中看到了很多wait/notify,而Semaphore和CountDownLatch则用的很少。
Lock/Condition()是使用AQS实现的,Lock/Condition() 组合可以用来替代Object.wait()/notify()。 而高级的synchronizer: CountDownLatch& Semaphore也是基于AQS实现的。所以理论上,可以替代wait/notify。 Semaphore和CountDownLatch也都包含了跟wait(long timeout)相对应的方法。
考虑以下在下面的例子中,是否可以用Semaphore& CountDownLatch来代替wait/notify ?
1 public Connection getConnection(final boolean highpriority) throws SQLException 2 { 3 final long sTime = System.currentTimeMillis(); 4 Connection conn = null; 6 boolean createConnection = false; 7 boolean waitConnection = false; 8 9 try 10 { 11 while (true) 12 { 13 createConnection = false; 14 waitConnection = false; 15 17 synchronized (dbConnections) 18 { 19 20 final ListdbConnectionsToBeFreedTemp = new ArrayList (); 21 synchronized (dbConnectionsToBeFreed) 22 { 23 if (!dbConnectionsToBeFreed.isEmpty()) 24 { 25 dbConnectionsToBeFreedTemp.addAll(dbConnectionsToBeFreed); 26 dbConnectionsToBeFreed.clear(); 27 if (1 == dbConnectionsToBeFreedTemp.size()) 28 { 29 // only one, so only notify one 30 dbConnectionsToBeFreed.notify(); // Semaphore.release() 31 } 32 else 33 { 34 dbConnectionsToBeFreed.notifyAll(); // CountDownLatch.await() 35 } 36 } 37 if (isThrottDown && connectionCount <= Tier.ONE.value * .20F) 38 { 39 isThrottDown = false; 40 dbConnectionsToBeFreed.notifyAll(); 41 } 42 } 43 44 if (!dbConnectionsToBeFreedTemp.isEmpty()) 45 { 46 for (final Connection connToFree : dbConnectionsToBeFreedTemp) 47 { 48 if (!closeConnectionIfAgedOut(connToFree)) 49 { 50 dbConnections.add(connToFree); 51 } 52 } 53 dbConnectionsToBeFreedTemp.clear(); 54 } 55 56 57 if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean()) 58 { 59 getCurrentTier(); 60 61 if ((!dbConnections.isEmpty() && !isThrottDown) 62 && (highpriority || ((connectionCount - dbConnections.size()) < currentTier.value))) //Prevent non-high priority requests from grabbing freed high priority connections if connection is capped 63 { 64 conn = dbConnections.remove(0); 65 } 66 67 if (null == conn) 68 { 69 // See if we should create a new connection or have to wait 70 // if none are in the stack, then see if we should make another 71 if ((connectionCount < currentTier.value || (highpriority && (connectionCount < maxConnectionCount))) 72 && !isThrottDown) 73 { 74 75 createConnection = true; 76 connectionCount++; 77 } 78 else if (!isScheduled && conn == null && !highpriority && !createConnection 79 && currentTier != Tier.THREE && !isThrottDown) 80 { 81 WallClock.getInstance().addAlarm(ALARM_NAME, 82 Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmStepUpTier); 83 isScheduled = true; 84 waitConnection = true; 85 } 86 else if (conn == null && !createConnection && currentTier == Tier.THREE 87 && connectionCount - dbConnections.size() >= Tier.THREE.value && !isScheduled) 88 { 89 WallClock.getInstance().addAlarm(ALARM_THROTTLE_DOWN, 90 Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmThrottleDown); 91 isScheduled = true; 92 waitConnection = true; 93 } 94 else 95 { 96 // Connections are maxed out, so we have to wait 97 waitConnection = true; 98 } 99 }100 }101 else102 {103 if (!dbConnections.isEmpty()104 && (highpriority || ((connectionCount - dbConnections.size()) <= maxConnectionCount)))105 {106 conn = dbConnections.remove(0);107 }108 109 if (null == conn)110 {111 // See if we should create a new connection or have to wait112 // if none are in the stack, then see if we should make another113 if ((connectionCount < maxConnectionCount) || highpriority)114 {115 116 createConnection = true;117 connectionCount++;118 }119 else120 {121 // Connections are maxed out, so we have to wait122 waitConnection = true;123 }124 }125 }126 }127 128 if (null != conn)129 {130 // we have a Connection, so we are done131 break;132 }133 else if (createConnection)134 {135 if (!isDerby)136 {137 // update user and password from Configuration138 connProps.put("user", Configuration.NTS_DBUSER.getString());139 connProps.put("password", Configuration.NTS_DBPASSWORD.getString());140 }141 142 try143 {144 conn = DriverManager.getConnection(url, connProps); 145 }146 catch (final SQLException sqle)147 {148 connectionCount--; // count must be decremented since a connection was never created149 DatabaseStatus.reportException(sqle);150 throw sqle;151 }152 153 if (conn != null)154 {155 DatabaseStatus.clearException();156 if (peakConnections < connectionCount)157 {158 peakConnections = connectionCount;159 Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_COUNT, peakConnections);160 Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_TIME, new Date().toString());161 }162 allConnections.put(Integer.valueOf(conn.hashCode()), PersistentStore.createDeathTimeStamp());163 164 break;165 }166 else if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean())167 {168 // Unexpected Database exceptions will result in connection == null.169 // Instead of retrying the connection, throw a checked exception.170 throw new DBConnectionsAllUsedException("There are no DB Connections available");171 }172 else173 {174 // don't break which will do the retry175 }176 }177 else if (waitConnection)178 {179 try180 {181 Stats.inc(Stats.DB_THREADS_WAITING_FOR_CONNECTION);182 synchronized (dbConnectionsToBeFreed)183 {184 dbConnectionsToBeFreed.wait(); // Semaphore.acquire() CountDownLatch.countdown()185 }186 }187 finally188 {189 Stats.dec(Stats.DB_THREADS_WAITING_FOR_CONNECTION);190 }191 // don't break which will do the retry192 }193 else194 {195 // should never hit this case196 break;197 }198 }199 }200 catch (final InterruptedException ie)201 {202 // expected203 throw new SQLException("Exception waiting for DB connection.", ie);204 }205 finally206 {207 208 }209 210 return conn;211 }
5 进一步问题:
5.1 CyclicBarrier方法,await(): 为什么要提供这两种方法来包裹wrapper dowait方法?
5.2 Semaphore和CountDownLatch互换
1 final Semaphore sem = new Semaphore(0); 2 for (int i = 0; i < num_threads; ++ i) 3 { 4 Thread t = new Thread() { 5 public void run() 6 { 7 try 8 { 9 doStuff();10 }11 finally12 {13 sem.release();14 }15 }16 };17 t.start();18 }19 20 sem.acquire(num_threads);
1 final CountDownLatch latch = new CountDownLatch(num_threads); 2 for (int i = 0; i < num_threads; ++ i) 3 { 4 Thread t = new Thread() { 5 public void run() 6 { 7 try 8 { 9 doStuff();10 }11 finally12 {13 latch.countDown();14 }15 }16 };17 t.start();18 }19 20 latch.await();
6 参考文献: http://www.cnblogs.com/dolphin0520/p/3920397.html
----- 单例 -----
单例与static方法的区别: 一个是实例,可以传递,另外一个不可以。(好像也没什么用,传递单例)。
http://stackoverflow.com/questions/519520/difference-between-static-class-and-singleton-pattern#