Java基础知识2

1

参考

《疯狂Java讲义》 Hacker_ZhiDian的博客 《Java高并发程序设计》

多线程

建议请先学习我的深入JVM博客2中关于多线程的讲解。
之前的程序都是在做单线程的编程,所有的程序都只有一条顺序执行流,也就是
都从main()方法开始从上往下执行,如果执行时遇到阻塞,就会一直停留在该
处。开发一个服务器程序时,会接收到来自不同的客户端服务,不同的客户端
应该互不干扰,也就是有多个顺序执行流

线程概述

几乎所以的操作系统都支持同时运行多个任务,一个任务就是一个程序,一个
程序就是一个进程,而一个进程又包含很多顺序执行流,也就是包含多线程

线程和进程

进程是系统进行资源分配的调度的一个独立单位

  • 独立性 进程是系统中独立存在的实体,它可以拥有独立的资源,每一个进程
    都有自己的私有地址空间,在没有经过进程允许的情况下不能访问其他进程
  • 动态性 进程与程序的区别在于,程序是一个静态指令的集合,而进程是在一
    个正在系统中活动的指令集合,具有生命周期和各种不同状态
  • 并发性 多个进程可以在单个处理器上并发执行,相互之间不影响。也就是
    说同一时刻只有一个指令执行,但是多个进程指令快速轮换执行。而并行是
    在多个处理器上多个进程同时执行

多线程扩展了多进程的概念,使得同一个进程可以同时并发处理多个线程,线
程也被称为轻量级进程,线程是进程的执行单元,当进程被初始化时主线程也
被创建。多线程可以共享父进程中的共享变量同时也有自己的资源

ThreadAPI解析

以下是JDK11对Thread类的英文介绍以及我自己的理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
A thread is a thread of execution in a program. The Java Virtual Machine 
allows an application to have multiple threads of execution running concurrently.
Every thread has a priority. Threads with higher priority are executed
in preference to threads with lower priority. Each thread may or may not also
be marked as a daemon. When code running in some thread creates a new
Thread object, the new thread has its priority initially set equal to
the priority of the creating thread, and is a daemon thread if and only if
the creating thread is a daemon.

Java虚拟机允许多个线程同时运行,每一个线程都有一个优先级,拥有更高优先级的
线程优先执行,每一个线程都有可能被标记为一个守护线程,当在一个线程中又创建
一个新的线程,这个新线程的优先级默认是与创造它的线程相同,如果创造它的是一
个守护线程,那么新线程也是守护线程

When a Java Virtual Machine starts up, there is usually a single
non-daemon thread (which typically calls the method named main of
some designated class). The Java Virtual Machine continues to
execute threads until either of the following occurs:

当Java虚拟机开始运行的时候,经常会有一条非守护线程也就是主线程执行,当遇到
如下情况虚拟机运行停止

The exit method of class Runtime has been called and the security manager
has permitted the exit operation to take place.
All threads that are not daemon threads have died, either by returning from
the call to the run method or by throwing an exception that propagates
beyond the run method.

当Runtime类的exit方法被调用以及安全管理员已经允许exit方法执行
所有的非守护线程都已经正常结束,包括run方法的正常返回或者在run方法中抛出异常

There are two ways to create a new thread of execution. One is to declare
a class to be a subclass of Thread. This subclass should override the run
method of class Thread. An instance of the subclass can then be allocated
and started. For example, a thread that computes primes larger than a
stated value could be written as follows:

有两种方式创建一个新的线程,一种方式是声明一个子类继承Thread,这个子类需要
重写run方法,一个子类的实例可以被分配空间并执行start方法,写法如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class PrimeThread extends Thread {
long minPrime;
PrimeThread(long minPrime) {
this.minPrime = minPrime;
}

public void run() {
// compute primes larger than minPrime
. . .
}
}
..
PrimeThread p = new PrimeThread(143);
p.start();
1
2
3
4
5
6
7
8
The other way to create a thread is to declare a class that implements 
the Runnable interface. That class then implements the run method. An
instance of the class can then be allocated, passed as an argument when
creating Thread, and started. The same example in this other style looks
like the following:

另一种方式是声明一个类实现Runnable接口,这个类中实现run方法。创建这个类的实例
并作为参数传递给Thread类然后Thread类执行start方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class PrimeRun implements Runnable {
long minPrime;
PrimeRun(long minPrime) {
this.minPrime = minPrime;
}

public void run() {
// compute primes larger than minPrime
. . .
}
}
//由此可以看出多个Thread实例可以共用一个target
PrimeRun p = new PrimeRun(143);
new Thread(p).start();

线程的创建和启动

Java用Thread类来代表线程,所有的线程对象都必须是Thread类或其子类的
实例。接下来看一下Thread源码内部的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private static int threadInitNumber;
public Thread() {
this((ThreadGroup)null, (Runnable)null, "Thread-" + nextThreadNum(),
0L);
}
//target就是实现Runnable接口的类,这是第二种创建方式的构造器
public Thread(Runnable target) {
this((ThreadGroup)null, target, "Thread-" + nextThreadNum(), 0L);
}

public Thread(ThreadGroup group, Runnable target, String name, long
stackSize) {
this(group, target, name, stackSize, (AccessControlContext)null, true);
}
//这个方法就是为Thread实例命名服务的,Thread-threadInitNumber
private static synchronized int nextThreadNum() {
return threadInitNumber++;
}
//也可以自定义name
public Thread(String name) {
this((ThreadGroup)null, (Runnable)null, name, 0L);
}

//线程的优先级默认是NORM_PRIORITY,优先级越高越容易优先执行,当不代表一定优先执行
public static final int MIN_PRIORITY = 1;
public static final int NORM_PRIORITY = 5;
public static final int MAX_PRIORITY = 10;
//可以设置线程的优先级,从1到10
public final void setPriority(int newPriority) {
this.checkAccess();
if (newPriority <= 10 && newPriority >= 1) {
ThreadGroup g;
if ((g = this.getThreadGroup()) != null) {
if (newPriority > g.getMaxPriority()) {
newPriority = g.getMaxPriority();
}

this.setPriority0(this.priority = newPriority);
}

} else {
throw new IllegalArgumentException();
}
}
//静态方法,获取当前正在执行的线程
@HotSpotIntrinsicCandidate
public static native Thread currentThread();

//函数式接口,可以使用Lambda表达式创建对象
@FunctionalInterface
public interface Runnable {
void run();
}

线程的生命周期

线程被创建并启动后并不是立即进入执行状态也不是一直处于执行状态,线程
的生命周期可以分为5个部分 这张是指点的图

  1. 新建 只是new一个普通的线程实例对象
  2. 就绪 必须调用start方法后线程处于就绪状态,何时运行取决于JVM线程调度
  3. 运行 当处于就绪状态的线程获取CPU执行就处于运行状态,在运行过程中可
    能会中断使其他线程获得执行机会
  4. 阻塞 大多数操作系统采用抢占式调度策略
  • 调用sleep方法线程主动放弃所占用的资源
  • 线程调用一个阻塞IO的方法
  • 线程试图获取一个同步监视器,但是这个监视器被其他线程持有
  • 线程再等待某个通知notify
  • 线程调用suspend方法将线程挂起,一般不用

线程由阻塞状态重新进入就绪状态的情况如下

  • 调用sleep的线程过了指定时间
  • 线程调用的阻塞式IO方法已经返回
  • 线程成功获得同步监视器
  • 线程等待某个通知时其他线程发出一个通知
  • 处于挂起状态的线程调用resume恢复方法
  1. 死亡 不可以对一个已经死亡的线程再次调用start方法
  • run方法正常结束
  • 线程抛出一个未捕获的异常Exception或error
  • 直接调用线程的stop方法来结束一个线程,一般不用

守护线程

在调用start方法之前调用setDaemon(true)就能把一个线程变为守护线程
当所有非守护线程都结束后就算守护线程没有执行完也会结束,不要把重要
的任务放在守护线程

控制线程

Java线程提供了一些便捷的方法控制线程的执行,比如线程的开始start,
线程的休眠sleep,线程的停止stop,目前stop方法已经不推荐使用,停
止一个线程有三种方式

  1. run方法正常结束,有些情况下run方法会一直执行,比如服务端不断接
    受客户端的请求,可以设置一个标志位控制run方法的结束
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public void run()
    {
    boolean flag=true;
    while(flag)
    {
    ...
    if(done)
    {
    ...
    flag=false;
    }
    }
    }
  2. stop 调用stop会导致调用它的线程释放该线程所有的锁,导致同步数据
    出现问题。不会再继续执行剩下的代码,并会抛出ThreadDeath异常
  3. interrupt 调用该方法仅仅在线程中打一个标记,并不会立刻停止线程,
    本质还是通过boolean 标志来控制线程的结束。不过要注意,当调用 Object
    类的wait方法或者线程类的 join sleep 等方法时,如果当前线程已经中断
    (中断标志标记为true),调用以上那些方法时将会抛出一个异常,同时清除
    线程的中断标志,抛出InterruptedException异常
  • Thread.interrupt(),设置当前中断标记为true
  • Thread.isInterrupted(),检测当前的中断标记
  • Thread.interrupted(),检测当前的中断标记,然后重置中断标记为false
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public void run() {
    while (Thread.currentThread().isInterrupted() == false) {
    if (/*任务完成*/) {
    Thread.currentThread().interrupt();
    } else {
    // do something ...
    }
    }
    }
    //实际可能发生的情况,以下是指点的代码
    public void run() {
    for (int i = 0; Thread.currentThread().isInterrupted() == false;
    i++) {
    // 如果 i 大于 5 则设置当前线程中断标志为 true,
    // 在此之后 Thread.currentThread().isInterrupted() 方法返回 true
    if (i > 5) {
    Thread.currentThread().interrupt();
    }
    System.out.println("i: " + i);
    try {
    Thread.sleep(1000);
    // 在抛出异常的时候会设置当前线程中断标志为 false,
    // 在此之后 Thread.currentThread().isInterrupted() 方法返回 false
    } catch (InterruptedException e) {
    e.printStackTrace();
    return ; // 防止死循环,在捕货异常时直接返回结束 run 方法
    }
    }
    }

join线程

Thread提供了让一个线程等待另一个线程完成的方法–join。当在某个执行
流中调用其他线程的join方法时,调用线程将被阻塞,直到被join方法加入
的join线程执行完为止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class JoinThread extends Thread
{
public JoinThread(String name)
{
super(name);
}
public void run()
{
for(int i=0;i<10;i++)
System.out.println(i);
}
public static void main(String[] args) throws Exception
{
new JoinThread("新线程").start();
for(int i=0;i<100;i++)
{
if(i==10)
{
JoinThread jt=new JoinThread("被join的线程");
jt.start();
/*
在main线程的执行流中调用了join方法,所以
main线程被停止,只有两个子线程在执行,当
jt线程执行完后主线程才会继续执行,join方
法也可以加时间参数,过了这个时间方法失效
*/
jt.join();
}
}
}
}

看一下源码,在main线程中调用jt.join(),main线程会获取线程对象jt的
锁,调用到wait方法时当前线程就会一直等待并释放jt对象的锁,而jt中的
线程可以执行,执行完死亡时会调用自己的notifyAll方法,这时主线程就
可以继续执行

1
2
3
4
while(isAlive())
{
this.wait(0);//与wait()等效,不考虑等待时间,获取通知前一直等待
}

sleep

如果让当前正在执行的线程暂停一段时间,并进入阻塞状态,可是使用Thread
的静态方法sleep 实现,可以指定一个时间参数,单位是毫秒。在暂停的时间
内,即便没有其他线程需要执行该线程也不会执行。一般不要在线程对象实例
使用wait或notify方法

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws Exception
{
for(int i=0;i<10;i++)
{
if(i==5)
//主线程暂停1s
Thread.sleep(1000);
}
}

yield

yield也是一个静态方法,可以让正在执行的线程暂停但是不会阻塞该线程,只
是将线程转入就绪状态,让系统线程调度器重新调度一次,很有可能该线程又
重新执行

线程的同步

先参考深入JVM2中关于Java线程、工作内存和主内存的讲解

原子性

对于一个操作系统来说,如果一个操作在执行过程中一定不会被打断直到完成这个
操作,否则这个操作不会执行,那么这个操作就就有原子性。例如a=1具有原子性,
a++和a+=1不具有原子性,需要了解这些代码在JVM虚拟机中具体的指令情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int a;
public static void add()
{
a++;
}
public static void set()
{
a=1;
}
public static void addd()
{
a+=1;
}
public static void ad()
{
a=a+1;
}
public static int get()
{
return a;
}

编译.java文件,查看相应的字节码指令。add addd ad三个方法的字节码指令
都一样,说明 a++ a+=1 a=a+1 完全等效,set方法只有三条指令,先将常量
1 压入操作数栈顶,然后从操作数栈顶取出值写入内存常量池中的#2所在的变
量,这只需要一条指令putstatic,可以认为a=1是具有原子性的操作,而a++
有多条指令,比如执行了iadd后切换线程,将导致putstatic没有执行没有及
时更新a++的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//set #2在常量池中指示a
iconst_1
putstatic #2
return

//add
getstatic #2
iconst_1
iadd
putstatic #2
return

//addd
getstatic #2
iconst_1
iadd
putstatic #2
return

//ad
getstatic #2
iconst_1
iadd
putstatic #2
return

//get
getstatic #2
ireturn

线程安全问题

接下来我用指点的例子分析一下 多次运行的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 售卖火车票的测试类
*/
public static class SellTickets {
static int tickets = 10; // 10 张火车票

protected static void sell() {
System.out.println(Thread.currentThread().getName() + "卖出了第 "
+ tickets-- + " 张票");
}

public static void startSell() {
// 开启 5 个线程售票
for (int i = 0; i < 5; i++) {
new Thread("窗口" + (i+1)) {
@Override
public void run() {
while (tickets > 0) {
sell();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
}

当这些结果看起来没有逻辑,接下来分析一下内部的操作。首先肯定是从1开始创
建5个线程,线程1创建完成后休眠切换到线程2以此切换到线程5,线程5的sell
方法先执行,当打印的是9,说明线程1在完成ticke–之后线程进行了切换,并
且已经把更新的值由工作内存写回主内存,所以线程5拿到更新的值后执行sell
方法。第二个结果出现卖相同票的情况,线程3和线程2都是从主内存中拷贝相同
的ticket,因为线程3执行完sell方法后并未将更新的ticket值从工作内存写
回主内存,导致线程2拿到的依然是主内存中未更新的值

同步操作

线程的同步是指一个线程对内存进行操作时,其他线程都不可以对这个内存地址
进行操作,直到该线程完成操作,其他线程才可以对内存地址进行操作

之前的车票问题主要原因是sell方法不具有原子性,也就是需要同一时刻只有
一个线程能进入sell方法,当一个线程执行完sell方法其余线程才可以执行
sell方法。所以可以将sell方法加锁,有锁的时候别的线程只能等待该线程
执行完sell方法,执行完后就解锁别的线程就可以抢占执行sell

synchronized同步机制

synchronized可以修饰方法、代码块,但是不能修饰变量和构造器,默认实现锁
机制,(线程获取锁资源和线程释放锁资源),同步方法的同步监视器是this,
而this总代表调用该方法的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//对于实例方法锁住的是对象,对于静态方法锁住的是类
/*当线程执行到这里的时候会检查调用该方法的对象是否已经被锁住
如果被锁住则等待锁的释放
*/
public synchronized void sell()
{

}
public void sell()
{
//obj就表示当前对象
synchronized(obj)
{

}
}

关于如何理解锁对象我在网上找到了一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Test {
public static void main(String[] args) throws InterruptedException {
Human a=new Human();
Human b=new Human();
new Thread(() -> {
try {
a.drink();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
},"A").start();
Thread.sleep(1000);
new Thread(() -> {
try {
a.sleep();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
},"B").start();
}

}
class Human
{
public void eat() {
System.out.println(Thread.currentThread().getName()+ ": *****eat*****");
}
public synchronized void drink() throws Exception {
System.out.println("先执行drink");
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName()+
": *****drink*****");
}
public synchronized void sleep() {
System.out.println("先执行sleep");
System.out.println(Thread.currentThread().getName()+
": *****sleep*****");
}
}

执行以上代码会发现输出如下,一开始保证线程A一定先执行,可是在drink中
线程中先会休眠10s,但是这时候线程B确没有开始执行,从这可以看出锁的是
同一个对象,两个synchronized方法必须有一个先执行完释放锁对象另一个
才可以执行。注意接下来的操作每一个都是独立的

1
2
3
4
5
先执行drink
该线程B执行了
A: *****drink*****
先执行sleep
B: *****sleep*****

接下来将修饰sleep方法的synchronized关键字去掉,可以发现sleep先执行,
说明sleep方法不受锁影响

1
2
3
4
5
先执行drink
该线程B执行了
先执行sleep
B: *****sleep*****
A: *****drink*****

接下来将sleep方法用staitc修饰,使用static说明锁住的是类而不是对象,两
者没有任何关联不冲突

1
2
3
4
5
先执行drink
该线程B执行了
先执行sleep
B: *****sleep*****
A: *****drink*****

接下来线程B用对象b调用sleep方法,a和b是两个对象,synchronized锁住的
是对象,不同对象不冲突

1
2
3
4
5
先执行drink
该线程B执行了
先执行sleep
B: *****sleep*****
A: *****drink*****

大家可以吧synchronized换成ReentrantLock,ReentrantLock是锁资源,多
个线程共同抢占一个锁资源,跟synchronized本质很像

死锁

当两个线程相互等待对方释放同步监视器就会导致死锁,死锁一旦发生就不会
产生任何异常或错误,只是所有线程处于阻塞状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class A
{
public synchronized void foo(B b)
{
...
Thread.sleep(2000);
b.last();
}
public synchronized void last()
{
...
}
}
class B
{
public synchronized void bar(A a)
{
...
Thread.sleep();
a.last();
}
public synchronized void last()
{

}
}
public class C
{
A a=new A();
B b=new B();
public void init()
{
a.foo(b);
}
public void run()
{
b.bar(a);
}
public static void main(String[] args)
{
C c=new C();
new Thread(c).start();
c.init();
}
}

比如先进入主线程main,对象a获得锁,然后sleep切换到副线程,对象b加锁,
sleep切换到主线程,由于b已经被加锁所以无法执行b.last方法,线程阻塞
切换到副线程,sleep结束后由于a还未解锁所以无法执行a.last方法。
Object类提供了一些方法用于实现更精细的线程之间的同步控制,这些方法只
能在synchronized修饰的方法或代码块中使用

1
2
3
4
5
6
7
8
/*调用这个方法的线程释放对象锁并陷入无限等待,直到某个线程调用这个
对象的notify或notifyAll方法,线程被唤醒后就进入就绪状态,如果中断
标志为true会抛出IterruptException异常 */
Ojbect.wait();
//唤醒一个调用对象的wait方法而陷入等待的线程,如果有多个线程都在等待,随机唤起一个
Object.notify();
//唤醒所有调用对象的t的wait方法而陷入等待的线程
Object.notiryAll();

接下来用指点的例子分析一下以上方法的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* 使用 synchronized 和对象的 wait 方法、notifyAll 方法模拟银行账户转账
*/
public static class TransferTest {
int[] accountBalance; // 每个账户的余额,为了简便,这里直接假设为 int 类型

public TransferTest(int[] accountBalance) {
if (accountBalance == null) {
// 此处应做特殊处理
return ;
}
// 账户信息赋初值
this.accountBalance = accountBalance;
}

// 获得账户的总额
public long getAccountSum() {
long res = 0;
for (int i = 0; i < accountBalance.length; i++) {
System.out.println("账户" + i + "余额:" + accountBalance[i]);
res += accountBalance[i];
}
return res;
}

/**
* 进行转账的同步方法
* @param fromIndex 转账方账户下标
* @param toIndex 接受方账户下标
* @param money 转账金额
*/
protected synchronized void transfer(int fromIndex, int toIndex, int
money) {
if (money < 0) {
// 此处应做特殊处理
return ;
}
System.out.println("账户" + fromIndex + "想向" + toIndex + "账户转账"
+ money + "元");
/* 如果转账方账户余额不足,那么调用当前对象的
wait 方法使得当前线程释放对象锁陷入无限等待,
直到其它线程调用了 notify 或者 notifyAll 方法,*/
while (accountBalance[fromIndex] < money) {
System.out.println("账户余额不足,无法转账!");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 账户余额变更
accountBalance[fromIndex] -= money;
accountBalance[toIndex] += money;
System.out.println("转账成功");
notifyAll(); // 唤醒所有因调用了当前对象的 wait 方法而陷入等待的线程
}

public void startTransfer() {
Random random = new Random();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// 转账方、接收方、转账金额
int fromAccount;
int toAccount;
int money;
for (int j = 0; j < 10; j++) {
fromAccount = random.nextInt(accountBalance.length);
toAccount = random.nextInt(accountBalance.length);
if (fromAccount == toAccount) {
j--;
continue;
}
money = random.nextInt(500);
transfer(fromAccount, toAccount, money);
}
}
}).start();
}
}
}

public static void main(String[] args) {
TransferTest test = new TransferTest(new int[]{500, 500, 500, 500, 500,
500, 500, 500, 500, 500});
test.startTransfer();

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
System.out.println("当前账户总余额: " + test.getAccountSum());
}

Condition

ReentrantLock类中提供了一个方法newCondition()来获取Condition实例对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Condition {
//与Object的wait方法相同,直到其他线程调用Condition的signal或signalAll
void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long var1) throws InterruptedException;

boolean await(long var1, TimeUnit var3) throws InterruptedException;

boolean awaitUntil(Date var1) throws InterruptedException;
/*唤醒在Lock对象上等待的线程,如果所有线程都在该Lock对象上等待,
则会选择唤醒其中一个线程
*/
void signal();
//唤醒所有等待的线程
void signalAll();
}

以上这些方法需要在获取ReentrantLock锁资源的情况下才能使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
* 使用 ReentrantLock 类和 Condition 类模拟银行账户转账
*/
public static class TransferTest2 {
int[] accountBalance; // 每个账户的余额,为了简便,这里直接假设为 int 类型

ReentrantLock lock = new ReentrantLock(); // 创建锁对象
Condition con = lock.newCondition();

public TransferTest2(int[] accountBalance) {
if (accountBalance == null) {
// 此处应做特殊处理
return ;
}
// 账户信息赋初值
this.accountBalance = accountBalance;
}

// 获得账户的总额
public long getAccountSum() {
long res = 0;
for (int i = 0; i < accountBalance.length; i++) {
System.out.println("账户" + i + "余额:" + accountBalance[i]);
res += accountBalance[i];
}
return res;
}

/**
* 进行转账的同步方法
* @param fromIndex 转账方账户下标
* @param toIndex 接受方账户下标
* @param money 转账金额
*/
protected void transfer(int fromIndex, int toIndex, int money) {
// 当前线程尝试获取锁资源
lock.lock();
try {
if (money < 0) {
// 此处应做特殊处理
return ;
}
System.out.println("账户" + fromIndex + "想向" + toIndex +
"账户转账" + money + "元");
/* 如果转账方账户余额不足,那么调用 con 对象的
await 方法使得当前线程释放对象锁陷入无限等待,
直到其它线程调用了 con 对象的 signal 或者 signalAll 方法,*/
while (accountBalance[fromIndex] < money) {
System.out.println("账户余额不足,无法转账!");
try {
con.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 账户余额变更
accountBalance[fromIndex] -= money;
accountBalance[toIndex] += money;
System.out.println("转账成功");
con.signalAll();
// 唤醒所有因调用了 con 对象的 await 方法而陷入等待的线程
} catch (Exception e) {

} finally {
lock.unlock(); // 当前线程释放锁资源
}
}

public void startTransfer() {
Random random = new Random();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// 转账方、接收方、转账金额
int fromAccount;
int toAccount;
int money;
for (int j = 0; j < 10; j++) {
fromAccount = random.nextInt(accountBalance.length);
toAccount = random.nextInt(accountBalance.length);
if (fromAccount == toAccount) {
j--;
continue;
}
money = random.nextInt(500);
transfer(fromAccount, toAccount, money);
}
}
}).start();
}
}
}

public static void main(String[] args) {
TransferTest2 test = new TransferTest2(new int[]{500, 500, 500, 500,
500, 500, 500, 500, 500, 500});
test.startTransfer();

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
System.out.println("当前账户总余额: " + test.getAccountSum());
}

ReentrantLock和synchronized的区别

ReentrantLock比synchronized更加灵活,必须手动加锁解锁,而灵活之处
在于ReentrantLock有一个tryLock方法,这个方法也会尝试获取锁,但是
获取失败也不会阻塞,而是直接返回锁的结果,这样就可以在线程获取锁资
源失败的时候让这个线程做别的事而不是一直等待

1
2
3
4
5
6
// 如果在参数给定的时间内成功获取锁资源,那么执行相关任务
if (lock.tryLock() || lock.tryLock(time, unit)) {
// do something...
} else {
// do something else...
}

volatile

用volatile修饰的变量,在线程中被修改后会立刻同步到主内存中,保证在任
意时刻,某个线程从主内存获取的值是最新的,之前在深入JVM2中分析了不能
实现同步,这里就不再解释了。volatile的另一个作用就是防止指令重排序,
指令重排序就是在Java编译期间编译器可能基于优化程序的目的对目的代码
中翻译成的机器指令进行重排序,线程内表现为串行(多个任务,一个接着
一个执行)的语义,在多线程的环境下指令重排可能导致错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 此处变量要用 volatile 关键字修饰,避免因指令重排序导致错误
volatile boolean isInitialized = false;

// 以下方法在线程 A 中执行,在里面进行程序初始化的操作
public void initialize() {
// 进行初始化操作 ......、

isInitialized = true;
// 初始化完成之后设置初始化完成标志为 true,即表示程序初始化完成
}

// 以下方法在线程 B 执行
public void startTask() {
while (!isInitialized) {
sleep(); // 继续等待初始化完成
}
// 初始化完成之后开始执行任务
executeTask();
}

如果不使用volatile修饰可能导致isInitialized=true在初始化之前被调

DCL优化

DCL Double-Check-Lock 是单例模式的一种实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SingleTon {
private static SingleTon singleTon;

private SingleTon() {}

public static SingleTon getInstance() {
if (singleTon == null) {
synchronized (SingleTon.class) {
if (singleTon == null) {
singleTon = new SingleTon();
}
}
}
return singleTon;
}
}

新建一个Singleton包含三个字节码指令

1
2
3
new // 为新建的对象分配内存空间,并将地址压入操作数栈顶
dup // 复制操作数栈顶值,并将其压入栈顶
invokespecial // 调用实例初始化方法<init>:(),初始化对象

然后putstatic指令将创建的对象赋值给静态引用singleTon,这时singleTon
就不为 null ,如果不加 volatile 则可能导致指令重排序, putstatic 在
invokespecial之前执行,然后立即切换线程,这时线程获取的singleTon已经
不为null,但是此时还未执行初始化操作,可能导致返回的对象有异常

1
2
3
4
new // 为新建的对象分配内存空间,并将地址压入操作数栈顶
dup // 复制操作数栈顶值,并将其压入栈顶
putstatic // 将静态引用 singleTon 指向新建的对象
invokespecial // 调用实例初始化方法<init>:(),初始化对象

如果加上volatile就会禁止指令重排序

原子类

JDK5中引入了AtomicInteger AtomicLong AtomicReference等特殊的原子类
保证使用这些类时可以不主动加入额外的同步手段来保证程序的正确性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* AtomicInteger 类的使用,AtomicInteger 类本身对其中的一些操作提供了多线程程序的同步控制
*/
public static class AtomicIntegerTest {
private static AtomicInteger value = new AtomicInteger(0);

public static int getValue() {
return value.get();
}

// 数字递增方法,每次把 value 的值递增 2
public static void evevIncreament() {
value.addAndGet(2);
}

public static void startTest() {
// 新建 10 个子线程用于进行累加操作
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
evevIncreament();
}
}
}).start();
}
// 主线程不断取 value 的值,如果是奇数,那么打印并退出程序
int value = 0;
while (true) {
value = getValue();
if (value % 2 != 0) {
System.out.println(value);
System.exit(0);
}
}
}
}

public static void main(String[] args) {
AtomicIntegerTest.startTest();
}

线程池

阻塞队列

阻塞队列是可以使线程陷入阻塞状态的存储队列。JDK5提供了一个BlockingQueue
接口,这个接口是Queue的子接口,不是用于存储而是用做线程同步的工具,该
接口具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果
该队列已满,则该线程被阻塞。当消费者线程从BlockingQueue中取出元素时
如果该队列为空则线程被阻塞,程序的两个线程通过交替向BlockingQueue
中放入元素、取出元素可以很好地控制线程的通信
BlockingQueue提供了两个支持阻塞的方法

  • put(E e) 将元素e放入队列中,如果队列中元素已满则阻塞生产者线程
  • take() 从队列头部取出元素,如果队列为空则阻塞消费者线程

自定义阻塞队列

以下是我的仿照指点代码的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class BlockQueue {
private Object[] array;
private int num;
ReentrantLock lock=new ReentrantLock();
Condition full;
Condition empty;
public BlockQueue(int size){
array=new Object[size];
full=lock.newCondition();
empty=lock.newCondition();
}
//生产
public void put(Object e) throws InterruptedException{
if(e==null)
throw new IllegalArgumentException();
else{
lock.lock();
try{
while(num==array.length){
System.out.println("产品已满不能生产");
full.await();
}
array[num++]=e;
//此时产品至少有1个
empty.signalAll();
System.out.println(Thread.currentThread().getName()+
"成功生产一个产品,总数为"+num);
}finally{
lock.unlock();
}
}
}
public Object take() throws InterruptedException{
lock.lock();
try{
while(num==0){
System.out.println("没有产品不能消费");
empty.await();
}
Object obj=array[--num];
//此时产品一定未满
full.signalAll();
System.out.println(Thread.currentThread().getName()+
"成功消费一个产品,总数为"+num);
return obj;
}finally{
lock.unlock();
}
}
}

接下来再Main类中创建线程,读者可以运行一下观察结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class Main extends Thread {
public static void main(String[] args) throws InterruptedException {
BlockQueue queue=new BlockQueue(5);
for(int i=0;i<5;i++)
{
Product pro=new Product(i,queue);
pro.start();
}
for(int i=0;i<5;i++){
Customer cus=new Customer(i,queue);
cus.start();
}
}
}
class Product extends Thread{
BlockQueue queue;
int times=0;
public Product(int i,BlockQueue queue){
super("生产者线程"+i);
this.queue=queue;
}
public void run(){
while(true){
try {
Object obj = new Object();
times++;
queue.put(obj);
if(times==10)
break;
}catch (InterruptedException ex){
ex.printStackTrace();
}
}
}
}
class Customer extends Thread{
BlockQueue queue;
int times=0;
public Customer(int i,BlockQueue queue){
super("消费者线程"+i);
this.queue=queue;
}
public void run(){
while(true){
try {
times++;
queue.take();
if(times==10)
break;
}catch (InterruptedException ex){
ex.printStackTrace();
}
}
}

}

常见的阻塞队列

ArrayBlockingQueue

接下来分析一下ArrayBlockingQueue的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class ArrayBlockingQueue<E> extends AbstractQueue<E> 
implements BlockingQueue<E>, Serializable {
//存储元素的数组
final Object[] items;
//取下标
int takeIndex;
//存下标
int putIndex;
//元素个数
int count;
final ReentrantLock lock;
//控制消费者线程唤起和阻塞
private final Condition notEmpty;
//控制生产者线程唤起和阻塞
private final Condition notFull;
public void put(E e) throws InterruptedException {
//先检查与元素是否为空
Objects.requireNonNull(e);
ReentrantLock lock = this.lock;
//如果当前线程为中断则获取该锁
lock.lockInterruptibly();

try {
while(this.count == this.items.length) {
this.notFull.await();
}

this.enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
Object[] items = this.items;
items[this.putIndex] = e;
if (++this.putIndex == items.length) {
this.putIndex = 0;
}

++this.count;
//插入操作一定保证数组是有元素的,这时唤起消费者线程
this.notEmpty.signal();
}
public E take() throws InterruptedException {
ReentrantLock lock = this.lock;
lock.lockInterruptibly();
Object var2;
try {
while(this.count == 0) {
this.notEmpty.await();
}
var2 = this.dequeue();
} finally {
lock.unlock();
}
return var2;
}
private E dequeue() {
Object[] items = this.items;
E e = items[this.takeIndex];
items[this.takeIndex] = null;
if (++this.takeIndex == items.length) {
this.takeIndex = 0;
}
--this.count;
if (this.itrs != null) {
this.itrs.elementDequeued();
}
//取出操作保证数组一定不会满,唤醒生产者线程
this.notFull.signal();
return e;
}
}

接下来使用ArrayBlockingQueue来实现消费者和生产者线程,用了一下指定
的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* ArrayBlockingQueue 类的使用,使用 ArrayBlockingQueue 实现生产者、消费者问题:
*/
public static class ArrayBlockingQueueTest {
static int productCount = 0;
// 模拟产品的类
public static class Product {
private String productName;

public Product(String name) {
this.productName = name;
}
}
// 创建一个容量为 5 的公平阻塞队列,
// 公平即为先被阻塞的线程在被唤醒后可以先得到锁资源继续上次未完成的操作
private static ArrayBlockingQueue<Product> queue = new
ArrayBlockingQueue<Product>(5, true);
// 创建生产者线程
private static Thread productThread = new Thread() {
@Override
public void run() {
Product pro = null;
while (true) {
try {
pro = new Product("产品" + (++productCount));
// 插入元素到队尾
queue.put(pro);
System.out.println(pro.productName + " 存入成功!");
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
};
// 创建消费者线程
private static Thread customThread = new Thread() {
@Override
public void run() {
Product pro = null;
while (true) {
try {
// 取出队头元素
pro = queue.take();
if (pro != null) {
System.out.println(pro.productName + " 取出成功!");
}
} catch (InterruptedException e){
e.printStackTrace();
}
}
};
};

public static void startTest() {
productThread.start();
customThread.start();
}
}

public static void main(String[] args) {
CustomBlockingQueueTest.startTest();
}
Author: 高明
Link: https://skysea-gaoming.github.io/2020/10/17/Java%E5%9F%BA%E7%A1%80%E7%9F%A5%E8%AF%862/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.