4.线程间通信


4. 线程间通信

4.1 锁与同步

JAVA 中的锁都是对象锁,是基于对象的。

4.1.1 线程同步

线程同步是指约束线程按照一定的顺序执行。

线程同步可以通过锁 synchronized 来实现:

public class ObjectLock {

    private static Object lock = new Object(); // 临界资源,互斥锁

    static class ThreadA implements Runnable {
        @Override
        public void run() {
            synchronized (lock) { // 线程运行时,加对象锁
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread A " + i);
                }
            }
        }
    }

    static class ThreadB implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread B " + i);
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(1000); // 让主线程睡眠一会,防止 B 先拿到锁
        new Thread(new ThreadB()).start();
    }
}

4.2 等待/通知机制

通过 lock.wait()lock.notify() 可以实现等待/通知机制。

  • lock.wait() 会使已获得锁的对象主动释放锁,并进入 WAITING 状态;
  • 一个获得了对象锁的线程,通过使用 lock.notify() 函数来随机唤醒一个正在等待该锁的线程。 通过 lock.notifyAll() 函数来唤醒所有正在等待该锁的线程。
    • 使用 notify() 方法并不会主动去放弃锁;
    • 若要释放锁可以调用 wait() 方法去主动阻塞这个线程,或者等待线程执行结束;
    • 如果先使用 wait() 方法去主动阻塞这个线程,则必须先调用 notify() 方法来唤醒其他线程,因为在 wait() 后该线程已经被阻塞。

4.3 信号量 与 volatile 关键字

信号量机制:通过共享的信号量,来实现临界资源的进程(线程)互斥,解决进程(线程)间的通信问题。

volatile 关键字

volatile 关键字实现内存的可见性,某线程对该变量的改动对其他线程可见。通过 volatile 关键字修饰的变量可以当做线程通信的信号量

  • volatile 需要进行原子操作。
    • 实现原子操作,可以通过 synchronized 进行加锁
    • 或者使用 AtomicInteger 等原子类
  • 信号量 通常用于多线程。

volatile 关键字所指的内存可见性,是指

  • 当一个线程对 volatile 修饰的变量进行写操作时,JMM 会立刻把该线程对应的本地内存中的共享变量的值刷新到主内存中;
  • 当另一个线程对 volatile 变量进行读取的时候,JMM 会立即把本地内存设置为无效,并从主内存中读取变量。

4.4 管道通信

管道是指基于管道流的通信方式。JDK 提供的管道流实现方式:

  • PipedWriter (字符流)
  • PipedReader (字符流)
  • PipedOutputStream (字节流)
  • PipedInpuitStream (字节流)

用途:线程之间的信息传送(如字符串),文件等。

public class Pipe {
    static class ReaderThread implements Runnable {
        private PipedReader reader;

        public ReaderThread(PipedReader reader) {
            this.reader = reader;
        }

        @Override
        public void run() {
            System.out.println("this is reader");
            int receive = 0;
            try {
                while ((receive = reader.read()) != -1) {
                    System.out.print((char)receive);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class WriterThread implements Runnable {

        private PipedWriter writer;

        public WriterThread(PipedWriter writer) {
            this.writer = writer;
        }

        @Override
        public void run() {
            System.out.println("this is writer");
            int receive = 0;
            try {
                writer.write("test");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();
        writer.connect(reader); // 这里注意一定要连接,才能通信

        new Thread(new ReaderThread(reader)).start();
        Thread.sleep(1000);
        new Thread(new WriterThread(writer)).start();
    }
}

// 输出:
this is reader
this is writer
test

4.5 通信相关的 API

  • join() 方法:让当前线程陷入等待状态,等待 a.join() 的这个 a 线程执行完毕,然后再继续执行当前线程。
  • wait() 方法:释放锁,进入等待状态,并释放 CPU 资源。
    • wait() 必须放在同步块、同步方法
  • sleep() 方法: 线程休眠,不释放锁,但会释放 CPU 的资源占用。
    • 易发生 死锁

4.6 ThreadLocal

ThreadLocal 线程本地变量。它让每个线程有自己“独立”的变量,各个线程之间互不影响

  • ThreadLocal 为每个线程创建一个副本,每个线程可以访问自己内部的副本变量。
  • ThreadLocal 可以通过 set(), get() 方法来赋值或取值。

4.6.0 源码分析

set(), get(), remove()

在调用 get(), set(), remove() 时,首先都会通过当前所在的线程来拿到对应的 ThreadLocalMap,然后对线程自身的 ThreadLocalMap 进行操作。

// 获取当前线程的 ThreadLocal 变量的值
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 如果还没有调用 set() 赋值,则赋初始值
    return setInitialValue();
}

// 设置当前线程 ThreadLocal 变量的值
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

// 删除当前线程 ThreadLocal 某个变量
public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

ThreadLocalMap 的引用在当前线程 Thread 中,该引用的初始化在 ThreadLocal.createMap() 时:

public class Thread implements Runnable {

    // ... 其他方法

    ThreadLocal.ThreadLocalMap threadLocals = null;
}

public class ThreadLocal<T> { 

    // ... 其他方法

    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
}
initialValue() 赋初值

赋初值的方法 setInitialValue(),通过重写 initialValue() 方法可以修改默认的初始值:

  • 首先会去取到当前线程的 ThreadLocalMap,该 map 的引用是当前线程的 threadLocals
    • 如果 map 是 null,则新建一个 map
    • 如果不空,则 set 线程私有值
private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}

// ThreadLocal 变量的初始值,重写该方法即可修改初值
protected T initialValue() {
    return null;
}

对当前线程新建 ThreadLocalMap, 该:

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
ThreadLocalMap

ThreadLocalMap 的每一个 Entry 都被设置为弱引用 WeakReference 对象,不会导致内存泄漏。

static class ThreadLocalMap {
    // Entry 是 弱引用
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }

    // map 的初始大小是 16
    private static final int INITIAL_CAPACITY = 16;

    // 桶数组
    private Entry[] table;

    // map 中的元素数目
    private int size = 0;

    // 扩容的阈值
    private int threshold; // Default to 0
}

4.6.1 用途

ThreadLocal 用于将类的某个静态变量(user ID 或者 transaction ID)与线程状态关联。

  • ThreadLocal 用来解决数据库连接、Session 管理等。因为数据库连接和 Session 管理涉及多个复杂对象的初始化和关闭。如果在每个线程中声明一些私有变量来进行操作,那这个线程就变得不那么“轻量”了,需要频繁的创建和关闭连接。

4.6.2 ThreadLocal 细节分析

Java 中的每个线程都对应一个自己的线程栈,栈中的内容只对当前线程可见。

ThreadLocal 对象的实例是分配在堆上,其实例是被创建它的类持有,包括 ThreadLocal 的值也是被创建它的类持有。这些值是分配在堆上,通过 ThreadLocal 的实现原理来实现只有线程可见。

4.6.3 InheritableThreadLocal 实现多个线程访问 ThreadLocal 变量

副本值可以被子线程所继承。不仅仅是当前线程可以存取副本值,而且它的子线程也可以存取这个副本值。


文章作者: Yu Yang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Yu Yang !
 上一篇
9.Java 对象锁 9.Java 对象锁
9. 锁Java中的锁都是对象锁,Class 类是特殊的 Java 对象,所以类锁也是对象锁。 每个类只有一个 Class 对象,类锁就是 Class 对象的锁。 为什么任意一个 Java 对象都能成为锁对象?对象派生自 Object,
下一篇 
Java 线程池 Java 线程池
12. 线程池 Thread Pool12.1 线程池简介线程池 Thread Pool 是一种基于池化思想管理线程的工具,经常出现在多线程服务器中。 如数据库连接池、httpClient连接池。 Java 中的线程池是 ThreadPo
  目录