Condition
Condition的作用是对锁进行更精确的控制。
Condition中的await()
方法相当于Object的wait()
方法,Condition中的signal()
方法相当于Object的notify()
方法,Condition中的signalAll()
相当于Object的notifyAll()
方法。
不同的是,Object中的方法是和”同步锁”(synchronized)捆绑使用的;Condition需要与”互斥锁”/“共享锁”捆绑使用。
Condition函数列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| void await()
boolean await(long time, TimeUnit unit)
long awaitNanos(long nanosTimeout)
void awaitUninterruptibly()
boolean awaitUntil(Date deadline)
void signal()
void signalAll()
|
Condition示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x); return x; } finally { lock.unlock(); } } }
public class ConditionTest2 { private static BoundedBuffer bb = new BoundedBuffer(); public static void main(String[] args) { for (int i=0; i<10; i++) { new PutThread("p"+i, i).start(); new TakeThread("t"+i).start(); } } static class PutThread extends Thread { private int num; public PutThread(String name, int num) { super(name); this.num = num; } public void run() { try { Thread.sleep(1); bb.put(num); } catch (InterruptedException e) { } } } static class TakeThread extends Thread { public TakeThread(String name) { super(name); } public void run() { try { Thread.sleep(10); Integer num = (Integer)bb.take(); } catch (InterruptedException e) { } } } }
|
Condition await()及signal()原理分析
我们知道AQS自己维护的队列是当前等待资源的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行。直到队列为空。
而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:
线程1调用reentrantLock.lock时,线程被加入到AQS的等待队列中。
线程1调用await方法被调用时,该线程从AQS中移除,对应操作是锁的释放。
线程1·接着马上被加入到Condition的等待队列中,等待signal信号的唤醒。
线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,加入到AQS的等待队列中。
线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1并没有被唤醒。
signal方法执行完毕,线程2调用reentrantLock.unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1恢复执行。
直到释放所整个过程执行完毕。
LockSupport
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport中的park()和unpark()的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend和Thread.resume所可能引发的死锁”问题。
因为park()和unpark()有许可的存在;调用park()的线程和另一个试图将其unpark()的线程之间的竞争将保持活性。
park()和unpark()调用的是unsafe中的方法,应该直接调用了JNI,原理就不在深究
LockSupport函数列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| static Object getBlocker(Thread t)
static void park()
static void park(Object blocker)
static void parkNanos(long nanos)
static void parkNanos(Object blocker, long nanos)
static void parkUntil(long deadline)
static void parkUntil(Object blocker, long deadline)
static void unpark(Thread thread)
|
LockSupport示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import java.util.concurrent.locks.LockSupport;
public class LockSupportTest1 {
private static Thread mainThread;
public static void main(String[] args) {
ThreadA ta = new ThreadA("ta"); mainThread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+" start ta"); ta.start();
System.out.println(Thread.currentThread().getName()+" block"); LockSupport.park(mainThread);
System.out.println(Thread.currentThread().getName()+" continue"); }
static class ThreadA extends Thread{
public ThreadA(String name) { super(name); }
public void run() { System.out.println(Thread.currentThread().getName()+" wakup others"); LockSupport.unpark(mainThread); } } }
|
park和wait的区别。wait让线程阻塞前,必须通过synchronized获取同步锁。