LockSupport源码分析

LockSupport

Posted by Jay on February 1, 2019

LockSupport源码分析

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。简而言之,当调用LockSupport.park时,表示当前线程将会等待,直至获得许可,当调用LockSupport.unpark时,必须把等待获得许可的线程作为参数进行传递,才能让此线程继续运行。

一、LockSupport分析

1.属性
// Hotspot implementation via intrinsics API
// Unsafe实例
private static final sun.misc.Unsafe UNSAFE;
// 表示Thread parkBlocker字段内存偏移地址
private static final long parkBlockerOffset;
// 表示Thread threadLocalRandomSeed字段的内存偏移地址
private static final long SEED;
// 表示Thread threadLocalRandomProbe字段的内存偏移地址
private static final long PROBE;
// 表示Thread threadLocalRandomSecondarySeed字段的内存偏移地址
private static final long SECONDARY;

static {
    try {
        // 获取Unsafe实例
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        // Class<Thread>
        Class<?> tk = Thread.class;
        // 获取Thread的parkBlocker字段的内存偏移地址
        parkBlockerOffset = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("parkBlocker"));
        // 获取Thread的threadLocalRandomSeed字段的内存偏移地址
        SEED = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomSeed"));
        // 获取Thread的threadLocalRandomProbe字段的内存偏移地址
        PROBE = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomProbe"));
        // 获取Thread的threadLocalRandomSecondarySeed字段的内存偏移地址
        SECONDARY = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
    } catch (Exception ex) { throw new Error(ex); }
}

UNSAFE字段表示sun.misc.Unsafe类实例,可参考文章Unsafe使用指南的分析,一般程序中不允许直接调用;而long型的字段表示Thread实例对象相应字段在内存中的偏移地址,可以通过该偏移地址获取或者设置该字段的值。

2.构造方法
private LockSupport() {} // Cannot be instantiated. 构造器私有化,LockSupport不允许实例化
3.核心方法分析

在分析LockSupport中的方法之前,先引入sun.misc.Unsafe类中的parkunpark函数,因为LockSupport的核心方法都是基于Unsafe类中定义的parkunpark函数实现的,下面给出两个函数的定义。

public native void park(boolean isAbsolute, long time);
public native void unpark(Thread thread);

对两个函数的说明如下

  ① park函数,阻塞线程,并且该线程在下列情况发生之前都会被阻塞:① 调用unpark函数② 该线程被中断。③ 设置的时间到了。并且当time为绝对时间时,isAbsolutetrue,否则isAbsolutefalse。当time为0时,表示无限等待,直到unpark发生。

  ② unpark函数,即激活调用park后阻塞的线程。

(1) park

park方法有两个重载版本:

// 阻塞当前线程
public static void park()
public static void park(Object blocker)

两个方法的区别在于park()方法没有blocker参数,即没有设置线程的parkBlocker字段。park(Object)方法如下:

// 阻塞当前线程
public static void park(Object blocker) {
    Thread t = Thread.currentThread(); // 当前线程
    setBlocker(t, blocker); // 设置blocker
    UNSAFE.park(false, 0L); // 阻塞线程,一直阻塞(0L)
    setBlocker(t, null); // 继续运行之后,设置blocker
}

调用park方法时,首先获取当前线程,然后设置当前线程的parkBlocker字段,即调用setBlocker方法,之后调用Unsafe类的park方法,之后再调用setBlocker方法。那么问题来了,为什么要在此park方法中要调用两次setBlocker方法呢?原因其实很简单,调用park方法时,当前线程首先设置好parkBlocker字段,然后再调用Unsafepark方法,此后,当前线程就已经阻塞了,等待该线程的unpark方法被调用,所以后面的一个setBlocker方法无法运行,unpark方法被调用,该线程获得许可后,就可以继续运行了,也就运行第二个setBlocker,把该线程的parkBlocker字段设置为null,这样就完成了整个park方法的逻辑。如果没有第二个setBlocker,那么之后没有调用park(Object blocker),而直接调用getBlocker方法,得到的还是前一个park(Object blocker)设置的blocker,显然是不符合逻辑的。总之,必须要保证在park(Object blocker)整个方法执行完后,该线程的parkBlocker字段又恢复为null。所以,park(Object)方法里必须要调用setBlocker方法两次。setBlocker方法如下。 

// 设置线程t的parkBlocker字段为arg
private static void setBlocker(Thread t, Object arg) {
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

另外一个无参重载版本park()方法如下:

// 阻塞当前线程
public static void park() {
    UNSAFE.park(false, 0L); // 阻塞线程,一直阻塞(0L)
}

调用了park方法后,会禁用当前线程,除非许可可用。在以下三种情况之一发生之前,当前线程都将处于休眠状态,即下列情况发生时,当前线程会获取许可,可以继续运行。

  ① 其他某个线程将当前线程作为目标调用unpark

  ② 其他某个线程中断当前线程。

  ③ 该调用不合逻辑地(即毫无理由地)返回。

(2).parkNanos

此方法表示在许可可用前阻塞当前线程,并最多等待指定的等待时间。

// 限时(纳秒)阻塞当前线程
public static void parkNanos(Object blocker, long nanos) {
    if (nanos > 0) {
        Thread t = Thread.currentThread(); // 当前线程
        setBlocker(t, blocker); // set Blocker
        UNSAFE.park(false, nanos); // 限时阻塞  ns
        setBlocker(t, null); // set blocker = null
    }
}
(3).parkUntil
// 限时阻塞当前线程(绝对时间,毫秒)
public static void parkUntil(Object blocker, long deadline) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(true, deadline); // true表示使用的是绝对时间,单位毫秒
    setBlocker(t, null);
}
(4).unpark
// 取消阻塞当前线程
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

二、示例

下面对比使用对象监视器锁和LockSupport,来实现线程之间的通信。

1. 对象监视器锁
public class ObjectMonitorLockTest {
    public static void main(String[] args) {

        ThreadA ta = new ThreadA("ta");

        synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
            try {
                System.out.println(Thread.currentThread().getName()+" start ta");
                ta.start(); // 主线程运行,ta阻塞

                System.out.println(Thread.currentThread().getName()+" block");
                // 主线程等待,ta运行
                ta.wait();

                System.out.println(Thread.currentThread().getName()+" continue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadA extends Thread{

        public ThreadA(String name) {
            super(name);
        }

        @Override
      public void run() {
            synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+" wakeup others");
                notify();    // 唤醒“当前对象上的等待线程”
            }
        }
    }
}
// 输出
main start ta
main block
ta wakeup others
main continue
2. LockSupport
public class LockSupportTest {
    private static Thread mainThread;

    public static void main(String[] args) {
        ThreadA t = new ThreadA("test");
        mainThread = Thread.currentThread();

        System.out.println(Thread.currentThread().getName()+" start ta");
        t.start();

        System.out.println(Thread.currentThread().getName()+" block");
        LockSupport.park(mainThread);

        System.out.println(Thread.currentThread().getName()+" continue");

    }

    static class ThreadA extends Thread{

        public ThreadA(String name) {
            super(name);
        }

        @Override
      public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" wakeup others");
            LockSupport.unpark(mainThread);
        }

    }

}
// 输出
main start ta
main block
test wakeup others
main continue

参考文献