欢迎光临
我们一起进阶

Java并发编程(十四):ReentrantLock

扫码或搜索:沉默王二
发送 290992
即可立即永久解锁本站全部文章

Java并发编程Lock,ReentrantLock的工作原理及使用方式

 jdk提供synchronized实现线程同步,但有些场景下并不灵活,如多个同步方法,每次只能有一个线程访问;而Lock则可以非常灵活的在代码中实现同步机制

I. Lock的使用

在之前学习阻塞队列中,较多地方使用 ReadWriteLock, Condition,接下来在探究实现原理之前,先研究下锁的使用

Lock 接口的定义

public interface Lock {

     // 获取锁,若当前lock被其他线程获取;则此线程阻塞等待lock被释放
     // 如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁
    void lock();

    // 获取锁,若当前锁不可用(被其他线程获取);
    // 则阻塞线程,等待获取锁,则这个线程能够响应中断,即中断线程的等待状态
    void lockInterruptibly() throws InterruptedException;

    // 来尝试获取锁,如果获取成功,则返回true;
    // 如果获取失败(即锁已被其他线程获取),则返回false
    // 也就是说,这个方法无论如何都会立即返回
    boolean tryLock();

    // 在拿不到锁时会等待一定的时间
    // 等待过程中,可以被中断
    // 超过时间,依然获取不到,则返回false;否则返回true
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    // 释放锁
    void unlock();

    // 返回一个绑定该lock的Condtion对象
    // 在Condition#await()之前,锁会被该线程持有
    // Condition#await() 会自动释放锁,在wait返回之后,会自动获取锁
    Condition newCondition();
}

1. ReentrantLock

可重入锁。jdk中ReentrantLock是唯一实现了Lock接口的类
可重入的意思是一个线程拥有锁之后,可以再次获取锁,

基本使用

最基本的使用场景,就是利用lock和unlock来实现线程同步

以轮班为实例进行说明,要求一个人下班之后,另一个人才能上班,即不能两个人同时上班,具体实现可以如下

public class LockDemo {

    private Lock lock = new ReentrantLock();

    private void workOn() {
        System.out.println(Thread.currentThread().getName() + ":上班!");
    }

    private void workOff() {
        System.out.println(Thread.currentThread().getName() + ":下班");
    }


    public void work() {
        try {
            lock.lock();
            workOn();
            System.out.println(Thread.currentThread().getName() 
              + "工作中!!!!");
            Thread.sleep(100);
            workOff();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockDemo lockDemo = new LockDemo();

        int i = 0;
        List<Thread> list = new ArrayList<>(30);
        do {
            Thread a = new Thread(new Runnable() {
                @Override
                public void run() {
                    lockDemo.work();
                }
            }, "小A_" + i);

            Thread b = new Thread(new Runnable() {
                @Override
                public void run() {
                    lockDemo.work();
                }
            }, "小B_" + i);


          list.add(a);
          list.add(b);
        } while (i++ < 10);
        list.parallelStream().forEach(Thread::start);

        Thread.sleep(3000);
        System.out.println("main over!");
    }
}

上面的示例,主要给出了lock() 和 unlock() 的配套使用,当一个线程在上班迁尝试获取锁,如果获取到,则只有在下班之后才会释放锁,保证在其上班的过程中,不会有线程也跑来上岗抢饭碗,输出如下

小A_3:上班!
小A_3工作中!!!!
小A_3:下班
小A_1:上班!
小A_1工作中!!!!
小A_1:下班
// .... 省略部分
小B_7:上班!
小B_7工作中!!!!
小B_7:下班
小B_5:上班!
小B_5工作中!!!!
小B_5:下班
main over!

从基本的使用中,确定lock的使用姿势一般如下:

  1. 创建锁对象 Lock lock = new ReentrantLock()
  2. 在希望保证线程同步的代码之前显示调用 lock.lock() 尝试获取锁,若被其他线程占用,则阻塞
  3. 执行完之后,一定得手动释放锁,否则会造成死锁 lock.unlock(); 一般来讲,把释放锁的逻辑,放在需要线程同步的代码包装外的finally块中

即常见的使用姿势应该是

try {
  lock.lock();
  // .....
} finally {
  lock.unlock();
}

一个疑问,若没被加锁,仅只执行lock.unlock()是否会有问题?

测试如下

@Test
public void testLock() {
    Lock lock = new ReentrantLock();
    // lock.lock();
    // lock.unlock();
    lock.unlock();
    System.out.println("123");
}

执行之后,发现抛了异常(去掉上面的注释,即加锁一次,释放锁两次也会抛下面异常)

另一个疑问,代码块中多次上锁,释放锁只一次,是否会有问题?

将前面的TestDemo方法稍稍改动一下

public void work() {
    try {
        lock.lock();
        workOn();
        lock.lock();
        System.out.println(Thread.currentThread().getName() 
          + "工作中!!!!");
        Thread.sleep(100);
        workOff();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

再次执行,发现其他线程都无法再次获取到锁了,运行的gif图如下

因此可以得出结论:

  1. lock() 和 unlock() 配套使用,不要出现一个比另一个用得多的情况
  2. 不要出现lock(),lock()连续调用的情况,即两者之间没有释放锁unlock()的显示调用

2. Condtion 及 Lock的配合使用

在JDK的阻塞队列中,很多地方就利用了Condition和Lock来实现出队入队的并发安全性,以ArrayBlockingQueue为例

内部定义了锁,非空条件,非满条件

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    // ... 省略
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

出队,入队的实现如下(屏蔽一些与锁无关逻辑)

// 入队逻辑
public void put(E e) throws InterruptedException {
    // ...
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) 
        // 如果队列已满,则执行Condtion notFull的等待方法
        // 本线程会释放锁,等待其他线程出队之后,执行 notFull.singal()方法
            notFull.await();
        enqueue(e);
    } finally {
        // 释放锁
        lock.unlock();
    }
}

private void enqueue(E x) {
    // 入队,notEmpty 条件执行,唤醒被 notEmpty.await() 阻塞的出队线程
    notEmpty.signal();
}


// 出队
public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
      while (count == 0)
          // 队列为空,线程执行notEmpty.wait(),阻塞并释放锁
          // 等待其他入队线程执行  notEmpty.signal(); 后被唤醒
          notEmpty.await();
      return dequeue();
  } finally {
      lock.unlock();
  }
}

private E dequeue() {
  // ...
  // 出队,notFull 条件执行,唤醒被 notFull.await() 阻塞的入队线程
  notFull.signal();
}

下面看下Condition的定义

public interface Condition {

    // 使当前线程处于等待状态,释放与Condtion绑定的lock锁
    // 直到 singal()方法被调用后,被唤醒(若中断,就game over了)
    // 唤醒后,该线程会再次获取与条件绑定的 lock锁
    void await() throws InterruptedException;

    // 相比较await()而言,不响应中断
    void awaitUninterruptibly();

    // 在wait()的返回条件基础上增加了超时响应,返回值表示当前剩余的时间
    // < 0 ,则表示超时
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    // 同上,只是时间参数不同而已
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    // 同上,只是时间参数不同而已
    boolean awaitUntil(Date deadline) throws InterruptedException;

    // 表示条件达成,唤醒一个被条件阻塞的线程
    void signal();

    // 唤醒所有被条件阻塞的线程。
    void signalAll();
}

通过上面的注释,也就是说Condtion一般是与Lock配套使用,应用在多线程协同工作的场景中;即一个线程的执行,期望另一个线程执行完毕之后才完成

针对这种方式,我们写个测试类,来实现累加,要求如下:

  1. 线程1实现 start-middle的累加;线程2实现middle-end的累加
  2. 线程3实现线程1和线程2计算结果的相加

上面这种情况下,线程3的执行,要求线程1和线程2都执行完毕

说明:下面实现只是为了演示Condition和Lock的使用,上面这种场景有更好的选择,如Thread.join()或者利用Fork/Join都更加优雅

public class LockCountDemo {

    private int start = 10;
    private int middle = 90;
    private int end = 200;

    private volatile int tmpAns1 = 0;
    private volatile int tmpAns2 = 0;

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private AtomicInteger count = new AtomicInteger(0);


    private int add(int i, int j) {
        try {
            lock.lock();
            int sum = 0;
            for (int tmp = i; tmp < j; tmp++) {
                sum += tmp;
            }
            return sum;
        } finally {
            atomic();
            lock.unlock();
        }
    }


    private int sum() throws InterruptedException {
        try {
            lock.lock();
            condition.await();
            return tmpAns1 + tmpAns2;
        } finally {
            lock.unlock();
        }
    }

    private void atomic() {
        if (2 == count.addAndGet(1)) {
            condition.signal();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        LockCountDemo demo = new LockCountDemo();
        Thread thread1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + 
                " : 开始执行");
            demo.tmpAns1 = demo.add(demo.start, demo.middle);
            System.out.println(Thread.currentThread().getName() +
                    " : calculate ans: " + demo.tmpAns1);
        }, "count1");

        Thread thread2 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() +
                " : 开始执行");
            demo.tmpAns2 = demo.add(demo.middle, demo.end + 1);
            System.out.println(Thread.currentThread().getName() +
                    " : calculate ans: " + demo.tmpAns2);
        }, "count2");

        Thread thread3 = new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() +
                    " : 开始执行");
                int ans = demo.sum();
                System.out.println("the total result: " + ans);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "sum");


        thread3.start();
        thread1.start();
        thread2.start();

        Thread.sleep(3000);
        System.out.println("over");
    }
}

输出如下

sum : 开始执行
count2 : 开始执行
count1 : 开始执行
count1 : calculate ans: 3960
the total result: 20055
count2 : calculate ans: 16095
over

小结Condition的使用:

  • Condition与Lock配套使用,通过 Lock#newConditin() 进行实例化
  • Condition#await() 会释放lock,线程阻塞;直到线程中断or Condition#singal()被执行,唤醒阻塞线程,并重新获取lock
  • 经典case可以参考jdk的阻塞队列实现(ArrayBlockingQueue, LinkedBlockingQueue)

II. ReentrantLock实现原理

1. AbstractQueuedSynchronizer (简称AQS)

AQS是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题

AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    //取值为 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一
    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    // Link to next node waiting on condition, 
    // or the special value SHARED
    volatile Thread thread;

    Node nextWaiter;
}

2. lock()实现

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

源码分析:

  • 首先用一个CAS操作,判断state是否是0(表示当前锁未被占用), 如果是0则把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功
  • 当多个线程同时尝试占用同一个锁时,CAS操作只能保证一个线程操作成功,其他被阻塞

acquire的逻辑如下:
tyrAcquire: 尝试获取锁,非阻塞立即返回

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // 没有线程占用锁
        if (compareAndSetState(0, acquires)) {
          // 占用锁成功,设置为独占
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        // 本线程之前已经获取到锁
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新重入次数
        setState(nextc);
        return true;
    }
    return false;
}

非公平锁tryAcquire的流程是:

  • 检查state字段,若为0,表示锁未被占用,那么尝试占用
  • 若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数。
  • 如果以上两点都没有成功,则获取锁失败,返回false

addWaiter: 入队

private Node addWaiter(Node mode) {
    // 创建一个节点,并将其挂在链表的最后
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
    enq(node);
    return node;
}

acquireQueued挂起

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // /标记线程是否被中断过
        boolean interrupted = false;
        for (;;) {
            //获取前驱节点
            final Node p = node.predecessor();
            // 若该节点为有效的队列头(head指向的Node内部实际为空)
            // 尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 获取成功,将当前节点设置为head节点
                p.next = null; // help GC
                failed = false;
                //返回是否被中断过
                return interrupted;
            }
            // 判断获取失败后是否可以挂起,若可以则挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                 // 线程若被中断,设置interrupted为true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

线程挂起的逻辑

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //前驱节点的状态
    if (ws == Node.SIGNAL) // 前驱节点状态为signal,返回true
        return true;
    if (ws > 0) { 
    // 从队尾向前寻找第一个状态不为CANCELLED的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前驱节点的状态设置为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

线程入队后能够挂起的前提是,它的前驱节点的状态为SIGNAL,它的含义是“Hi,前面的兄弟,如果你获取锁并且出队后,记得把我唤醒!”。

shouldParkAfterFailedAcquire会先判断当前节点的前驱状态是否符合要求:

  • 若符合则返回true,然后调用parkAndCheckInterrupt,将自己挂起
  • 如果不符合,再看前驱节点是否>0(CANCELLED)
  • 若是那么向前遍历直到找到第一个符合要求的前驱
  • 若不是则将前驱节点的状态设置为SIGNAL

小结下lock()流程

  1. CAS判断state是否为0来表示锁是否被占用;若未被占用,则独占锁
  2. 否则,尝试获取锁 acquire()
  3. 若尝试获取锁成功(锁就是被当前线程占用的,重入计数+1即可;或者锁正好被释放)
  4. 获取锁失败,则需要创建一个Node节点(包含了线程信息)入队
  5. 挂起线程 acquireQueued, 挂起之前,会先尝试获取锁,值有确认失败之后,则挂起锁,并设置前置Node的状态为SIGNAL(以保障在释放锁的时候,可以保证唤醒Node的后驱节点线程)

3. unlock的实现

尝试释放锁,成功,需要清楚各种状态(计数,释放独占锁)

此外还需要额外判断队列下个节点是否需要唤醒,然后决定唤醒被挂起的线程;

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 尝试释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
        // 查看头结点的状态是否为SIGNAL,如果是则唤醒头结点的下个节点关联的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 计算释放后state值
    if (Thread.currentThread() != getExclusiveOwnerThread())
        // 如果不是当前线程占用锁,那么抛出异常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 锁被重入次数为0,表示释放成功,清空独占线程
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

小结

  1. 创建锁对象 Lock lock = new ReentrantLock()
  2. 在希望保证线程同步的代码之前显示调用 lock.lock() 尝试获取锁,若被其他线程占用,则阻塞
  3. 执行完之后,一定得手动释放锁,否则会造成死锁 lock.unlock(); 一般来讲,把释放锁的逻辑,放在需要线程同步的代码包装外的finally块中
  4. lock() 和 unlock() 配套使用,不要出现一个比另一个用得多的情况
  5. 不要出现lock(),lock()连续调用的情况,即两者之间没有释放锁unlock()的显示调用
  6. Condition与Lock配套使用,通过 Lock#newConditin() 进行实例化
  7. Condition#await() 会释放lock,线程阻塞;直到线程中断or Condition#singal()被执行,唤醒阻塞线程,并重新获取lock
  8. ReentrantLock#lock的流程图大致如下

赞(1) 打赏
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

小白学堂,学的不止是技术,更是前程

关于我们免责声明

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏