Java 多线程

线程,由线程ID,当前指令指针,寄存器集合,以及堆栈组成。一个进程内可以包含多个线程,线程之间可以共享资源

创建线程

实现 Runnable 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Main {

public static void main(String[] args) {

PrintChar printA = new PrintChar('A', 100);
PrintChar printB = new PrintChar('B', 100);

Thread threadA = new Thread(printA);
Thread threadB = new Thread(printB);

threadA.start();
threadB.start();
}
}

class PrintChar implements Runnable {

private char charToPrint;
private int times;

public PrintChar(char charToPrint, int times) {
this.charToPrint = charToPrint;
this.times = times;
}

@Override
public void run() {
for (int i = 0; i < times; i++) {
System.out.print(charToPrint);
}
}
}

注意: 任务中的 run() 方法描述了如何完成这个任务,通过 thread.start() Java 虚拟机会自动调用这个方法。如果直接调用 thread.run(),这仅仅只是在同一个线程中执行该方法,并没有产生新线程

继承 Thread 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Main {

public static void main(String[] args) {

PrintChar printA = new PrintChar('A', 100);
PrintChar printB = new PrintChar('B', 100);

printA.start();
printB.start();
}
}

class PrintChar extends Thread {

private char charToPrint;
private int times;

public PrintChar(char charToPrint, int times) {
this.charToPrint = charToPrint;
this.times = times;
}

@Override
public void run() {
for (int i = 0; i < times; i++) {
System.out.print(charToPrint);
}
}
}

建议: 采用 Runnable 接口的方式,因为 Thread 在继承上存在局限性

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.execute(new PrintChar('A', 100));
executorService.execute(new PrintChar('B', 100));

executorService.shutdown();
}
}

class PrintChar implements Runnable {

private char charToPrint;
private int times;

public PrintChar(char charToPrint, int times) {
this.charToPrint = charToPrint;
this.times = times;
}

@Override
public void run() {
for (int i = 0; i < times; i++) {
System.out.print(charToPrint);
}
}
}

executorService.shutdown() 关闭执行器,但允许完成执行器中的任务
executorService.shutdownNow() 关闭执行器,返回未完成任务列表,return List<Runnable>

进程同步

synchronized 关键字

  • 实例方法加锁,给调用该方法的对象加锁
  • 静态方法加锁,给类加锁
  • 语句加锁,用于任何对象

语句加锁,表达式 expr 必须给出对象的引用。允许加锁部分代码,不必整个方法。

1
2
3
synchronized (expr) {
statements;
}

实例方法加锁可以转换语句加锁

1
2
3
4
5
6
7
8
9
public synchronized void xMethod() {
//method body
}

public void xMethod() {
synchronized (this) {
//method body
}
}

Lock 显示加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Account {
private static Lock lock = new ReentrantLock();
private int balance = 0;

public int getBalance() {
return balance;
}

public void deposit(int amount) {
lock.lock();

balance += amount;

lock.unlock();
}
}

ReentrantLock() 等价于 ReentrantLock(false),公平策略锁,没有特定的获得顺序
ReentrantLock(true),等待时间最长的线程将获得锁

线程间协作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

private static Buffer buffer = new Buffer();

public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new ProducerTask());
executorService.execute(new ConsumerTask());
executorService.shutdown();
}

private static class ProducerTask implements Runnable {

@Override
public void run() {
try {
int i = 1;
while (true) {
System.out.println("Producer writes " + i);
buffer.write(i++);
Thread.sleep((int) (Math.random() * 10000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private static class ConsumerTask implements Runnable {

@Override
public void run() {
try {
while (true) {
System.out.println("\t\t Consumer reads " + buffer.read());
Thread.sleep((int) (Math.random() * 10000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private static class Buffer {

private static final int CAPACITY = 1; //buffer size
private LinkedList<Integer> queue = new LinkedList<Integer>();
private static Lock lock = new ReentrantLock();
private static Condition notEmpty = lock.newCondition();
private static Condition notFull = lock.newCondition();

public void write(int value) {
lock.lock();

try {
while (queue.size() == CAPACITY) {
System.out.println("Wait for notFull condition");
notFull.await();
}

queue.offer(value);
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public int read() {
int value = 0;
lock.lock();

try {
while (queue.isEmpty()) {
System.out.println("\t\t Wait for notEmpty condition");
notEmpty.await();
}

value = queue.poll();
notFull.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
return value;
}
}
}
}

通过 Lock 对象的 newCondition() ,实现进程通信:

  • await(),当前线程等待,直到被唤醒
  • signal(),唤醒一个线程
  • signalAll(),唤醒所有线程

阻塞队列

试图向一个满队列添加元素或者从空队列中删除元素时会导致线程阻塞

ArrayBlockingQueue,使用数组实现阻塞队列,必须指定容量
LinkedBlockingDeque,使用链表实现阻塞队列,可以创造不受限或者受限的队列
PriorityBlockingQueue,优先队列属性,可以创造不受限或者受限的队列
创造不受限的阻塞队列,put 方法永远不会堵塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.LinkedList;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(1);

public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new ProducerTask());
executorService.execute(new ConsumerTask());
executorService.shutdown();
}

private static class ProducerTask implements Runnable {

@Override
public void run() {
try {
int i = 1;
while (true) {
System.out.println("Producer writes " + i);
buffer.put(i++);
Thread.sleep((int) (Math.random() * 10000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private static class ConsumerTask implements Runnable {

@Override
public void run() {
try {
while (true) {
System.out.println("\t\t Consumer reads " + buffer.take());
Thread.sleep((int) (Math.random() * 10000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Account {

private static Semaphore semaphore = new Semaphore(1);
private int balance = 0;

public int getBalance() {
return balance;
}

public void deposit(int amount) {

try {
semaphore.acquire();
balance += amount;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}

Semaphore(num) 等价于 Semaphore(num,false),公平策略为 false,许可总数为 num
Semaphore(num,true),公平策略为 true
acquire,获取信号量许可,许可总数减一,当无许可可用时,线程锁住
release,释放信号量许可,许可总数加一

死锁

每个线程已经锁定一个对象,而且正在等待锁定另一个对象

1
2
3
4
5
6
7
8
9
10
11
12
13
//线程1
synchronized (object1){
synchronized (object2){

}
}

//线程2
synchronized (object2) {
synchronized (object1) {

}
}

使用资源排序可以避免死锁发生,即给每一个需要锁的对象指定一个顺序,确保按照顺序来获得锁

线程状态

线程可以是以下五种状态之一:新建,就绪(可运行),运行,阻塞,结束(死亡)

同步集合

Java 集合框架中的类不是线程安全的,如果同时被多个线程访问和更新,内容可能被破坏

Collections 类提供多种静态方法来将集合转化为同步版本,例如:

  • synchronizedList(List<Object>),返回同步线性表
  • synchronizedMap(Map<Object, Object>),返回同步图

内部实现:

1
2
3
4
5
public boolean add(E o) {
synchronized (this) {
return c.add(o);
}
}

注意:VectorStackHashtable 属于旧类,应使用 ArrayListLinkedListMap 代替

同步集合是线程安全的,但是迭代器有快速失败的特性,意味着当下层集合被另一个线程修改时,如果在整个集合使用一个迭代器,迭代器会抛出异常。使用以下代码防止:

1
2
3
4
5
6
7
8
Set hashSet = Collections.synchronizedSet(new HashSet<>());
synchronized (hashSet){
Iterator iterator=hashSet.iterator();

while(iterator.hasNext()){
System.out.println(iterator.next());
}
}