Thread类之线程间通讯

Thread

Posted by Jay on July 3, 2019

Thread类之线程间通讯

一、volatile与synchronized关键字

  • volatile关键字保证变量在线程间的可见性;
  • synchronized关键字保证了线程对变量访问的互斥性。

synchronized举例如下:

public class Synchronized {
	public static void main(String[] args){
		// 同步代码块,使用了monitorenter、monitorexit指令
	    synchronized (Synchronized.class) {

		}

		// 同步方法,依靠方法修饰符上的ACC_SYNCHRONIZED完成
		m();
	}

	public static synchronized void m() {

	}
}

二、等待、通知机制/Wait、Notify机制

1. Wait/Notify相关方法
  • wait()

    在其他线程调用此对象的notify()方法或notifyAll()方法前,导致当前线程等待。此方法的行为就好像它仅执行 wait(0) 调用一样。调用该方法时,当前线程必须拥有该对象的监视器锁。调用后释放锁。

  • wait(long)

    在其他线程调用此对象的notify() 方法或 notifyAll()方法,或者超过指定的时间量前,导致当前线程等待。调用该方法时,当前线程必须拥有该对象的监视器锁。

  • wait(long, int)

    wait(long)类似,增加了纳秒级的控制。

  • notify()

    唤醒在此对象监视器上等待的单个线程。如果所有线程都在此对象上等待,则会选择唤醒其中一个线程。选择是任意性的,并在对实现做出决定时发生。直到当前线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。此方法只应由作为此对象监视器的所有者的线程来调用。

  • notifyAll()

    唤醒在此对象监视器上等待的所有线程。直到当前线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。此方法只应由作为此对象监视器的所有者的线程来调用。

2.等待、通知机制

​ 示例如下:

public class WaitNotify {
	// 条件
	private static boolean flag = true;
	// 对象锁
	private static Object lock = new Object();
	private static SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");

	public static void main(String[] args) {
		Thread waitThread = new Thread(new Wait(), "WaitThread");
		Thread notifyThread = new Thread(new Notify(), "NotifyThread");
		waitThread.start();

		SleepUtils.second(5);

		notifyThread.start();
	}

	// 等待线程
	private static class Wait implements Runnable {
		@Override
		public void run() {
			// 获取对象的监视器锁
			synchronized (lock) {
				// 条件不满足,等待,并释放锁
				while (flag) {
					try {
						System.out.println(Thread.currentThread().getName() + " wait at " + format.format(new Date()));
						lock.wait(); // 在这里等待
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				// 被notify,条件满足,重新获取到对象监视器锁,继续执行后续的任务
				System.out.println(Thread.currentThread().getName() + " done at " + format.format(new Date()));
			}
		}
	}

	// 通知线程
	private static class Notify implements Runnable {
		@Override
		public void run() {
			// 先获取对象的监视器锁
			synchronized (lock) {
				// 改变条件
				flag = false;
				// 通知。不会马上释放lock的锁,需要当前线程释放了锁之后,等待线程才可能从wait()方法返回
				lock.notify();
				System.out.println(Thread.currentThread().getName() + " notify at " + format.format(new Date()));
				SleepUtils.second(5); // 睡眠之后释放锁
			}
		}
	}

}
// 输出
/*
WaitThread wait at 21:38:36
NotifyThread notify at 21:38:42
WaitThread done at 21:38:47
*/

上述示例可以说明的细节如下:

  • 使用wait()/notify()/notifyAll()时需要先要获取对象锁;
  • 调用wait()后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列;
  • notify()notifyAll()调用后,等待线程依旧不会从wait()方法返回,需要调用notify()notifyAll()的线程释放锁之后,等待线程才有机会从wait()处返回;
  • notify()方法将等待队列中的一个等待线程从等待队列中移动到同步队列中,notifyAll()方法将等待队列中所有的线程全部移动到同步队列,被移动的线程状态由WAITING变为BLOCKED
  • wait()返回的前提是获得了调用对象的锁。

等待、通知机制依赖于同步机制,目的是确保等待线程从wait()返回时能够感知到通知线程对变量做出的修改。

三、等待、通知的模式

1.等待方(消费者)
  • 先获取对象的锁;
  • 如果条件不满足,调用对象的wait()方法,被通知后仍要检查条件(防止假醒);
  • 条件满足后执行后续的逻辑。
synchronized(对象){
    while(条件不满足) {
        对象.wait();
    }
    对应的处理逻辑
}

#####2.通知方(生成者)

  • 获取对象的锁;
  • 改变条件;
  • 通知所有在对象上等待的线程。
synchronized(对象) {
    改变条件
    对象.notify();
}

四、管道输入输出流

用于线程之间的数据传输,传输媒介是内存。示例如下:

public class Piped {
	public static void main(String[] args) throws IOException {
		PipedWriter writer = new PipedWriter();
		PipedReader reader = new PipedReader();
		// 连接输出流和输入流
		writer.connect(reader);

		Thread printThread = new Thread(new Print(reader), "PrintThread");
		printThread.start();

		int value;
		try {
			while ((value = System.in.read()) != -1) {
				writer.write(value);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			writer.close();
		}

	}

	private static class Print implements Runnable {
		private PipedReader reader;

		public Print(PipedReader reader) {
			this.reader = reader;
		}

		@Override
		public void run() {
			int value;
			try {
				while ((value = reader.read()) != -1) {
					System.out.print((char) value);
				}
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				try {
					reader.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

五、Thread.join()方法

1.相关方法
  • join()

    等待该线程终止。等于join(0)

  • join(long millis)

    等待该线程终止的时间最长为 millis 毫秒。超时为 0 意味着要一直等下去。该方法的实现根据isAlive()状态,循环调用this.wait()方法。当线程终止时,会调用线程自身的notifyAll()方法。

  • joni(long millis, int)

    等待该线程终止的时间最长为 millis 毫秒。超时为 0 意味着要一直等下去。该方法的实现根据isAlive()状态,循环调用this.wait()方法。当线程终止时,会调用线程自身的notifyAll()方法。

2.作用与实例

线程A调用B.join()方法(BThread实例),含义是当前线程A等待线程B终止之后才能从B.join()返回。示例如下:

public class JoinTest {
	public static void main(String[] args){
		Thread previous = Thread.currentThread();
		for (int i = 0; i < 10; i++) {
			// 每个线程拥有前一个线程的引用,只有前一个线程终止执行,这个线程才能从等待中返回,继续执行
			Thread current = new Thread(new Domino(previous), String.valueOf(i));
			current.start();
			previous = current;
		}
		SleepUtils.second(5);
		System.out.println(Thread.currentThread().getName() + " terminate.");
	}

	private static class Domino implements Runnable{
		private Thread previous;

		public Domino(Thread previous) {
			this.previous = previous;
		}

		@Override
		public void run() {
			try {
				previous.join();
				System.out.println(Thread.currentThread().getName() + " terminate.");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

/**
输出
main terminate.
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.
/*

六、ThreadLocal

线程本地变量。ThreadLocal的实现原理是每一个Thread维护一个ThreadLocalMap映射表,映射表的keyThreadLocal实例,并且使用的是ThreadLocal弱引用value是具体需要存储的Object。一个线程可以根据一个ThreadLocal实例获取绑定到该线程的值。

通过set()方法设置值,在当前线程下通过get()方法获取原先设置的值。示例如下:

public class ThreadLocalTest {
	// 如果get()方法调用之前未调用set()方法,则在调用get()时,会先调用initialValue()方法进行初始化
	// 每个线程调用一次
	private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){
		@Override
		protected Long initialValue() {
			return System.currentTimeMillis();
		}
	};

	public static void begin() {
		TIME_THREADLOCAL.set(System.currentTimeMillis());
	}

	public static long end() {
		return System.currentTimeMillis() - TIME_THREADLOCAL.get();
	}


	public static void main(String[] args){
		ThreadLocalTest.begin();

		SleepUtils.second(1);

		System.out.println("Cost: " + ThreadLocalTest.end() + " ms.");
	}
}
/*
输出:
Cost: 1002 ms.
*/