Java基础知识3

1

参考

《疯狂Java讲义》 《Java高并发程序设计》 Hacker_ZhiDian的博客

线程池

对于一个服务器端的程序,对于每个用户的请求,为了提高服务器资源的利用率
和用户请求的响应速度,可以给每个用户都创建一个线程处理请求,如果用户访
问非常频繁那么频繁的创建和销毁线程会给服务器带来巨大开销,线程池可以理
解为一个处理任务的线程集合,在某个线程处理完任务后并不会立即销毁,而是
在线程池中保留一段时间,如果有新的请求那么这个线程就会继续处理任务

Java线程池中的线程分为核心线程和非核心线程,一个线程池中提供的线程数量
是有限的,如果核心线程数没有达到规定的线程的最大数量,那么会创建新的核
心线程来执行任务,否则就把任务放到任务队列的末尾,这个操作也有两种可能
如果队列未满就添加到任务队列,否则创建非核心线程执行任务,如果非核心线
程也达到最大值,就会执行饱和策略

线程池提交的整个流程
注意notify调用后锁对象依然由调用notify的那个线程持有,原先被阻塞的线程
必须等到锁对象释放才可以抢占锁资源,以下的例子可以说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package a;

public class SingleTon extends Thread{
public synchronized void wa() {
try {
System.out.println(Thread.currentThread().getName()+"开始等待");
wait();
System.out.println(Thread.currentThread().getName()+"等待结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void ov() {
try {
System.out.println(Thread.currentThread().getName()+"准备唤起");
notify();
System.out.println(Thread.currentThread().getName()+"唤起线程");
Thread.sleep(10000);
System.out.println(Thread.currentThread().getName()+
"唤起后wa执行了吗");
}catch (Exception e)
{
e.printStackTrace();
}
}
public void run()
{
new Thread(new Runnable(){
public void run()
{
wa();
}
},"线程1").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable(){
public void run()
{
ov();
}
},"线程2").start();
}
public static void main(String[] args) {
SingleTon a=new SingleTon();
a.run();
}
}

输出如下

1
2
3
4
5
线程1开始等待
线程2准备唤起
线程2唤起线程
线程2唤起后wa执行了吗
线程1等待结束

线程池源码分析

ThreadPoolExecutor继承于AbstractExecutorService类

1
2
public class ThreadPoolExecutor extends AbstractExecutorService {
}

AbstractExecutorService实现ExecutorService接口

1
2
public interface ExecutorService extends Executor {
}

这个接口又继承了Executor接口,Executors是一个类注意区分,一个线程池可以
接受任务类型有Runnable和Callable,分别对应了execute和submit方法

1
2
3
4
public interface Executor {
//这个方法就是向线程池中提交任务的核心方法,由var1执行任务
void execute(Runnable var1);
}

再回到最初看一下ExecutorService的源码,ExecutorService代表尽快执行
线程的线程池,只要线程池中有空闲线程就会立刻执行线程任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import java.util.Collection;
import java.util.List;

public interface ExecutorService extends Executor {
/*
这个方法用于关闭线程池,调用这个方法之后,通过execute 方法提交的任务将不会被接受,
但是其会等待线程池中任务队列中已有的任务和正在执行的任务执行完成之后再关闭线程池
*/
void shutdown();
/*
这个方法尝试立即关闭线程池,停止处理器正在执行的任务并拒绝接受新的任务,
并且将任务队列中未被执行的任务添加到一个 List 列表作为返回值返回,
*/
List<Runnable> shutdownNow();
//线程池是否已经被关闭
boolean isShutdown();
//判断线程池中所有任务是否被完全终止,只有调用shutdown或shutdownNow才可能返回true
boolean isTerminated();

/*
阻塞调用该方法的线程,直到发生了下面三种情况:
1、方法参数规定的时间段过去,此时方法返回
2、线程池中所有的任务执行完成并且线程池被关闭,方法返回
3、调用该方法的线程发生了 InterruptedException 异常,此时方法
会抛出 InterruptedException 异常
如果线程池中所有任务被成功的完成并且线程池成功关闭,那么方法返
回 true,否则方法返回 false
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
//提交新任务到线程池中,返回Future对象,这个对象封装了获取任务执行状态信息的方法
<T> Future<T> submit(Callable<T> task);
/*提交新任务到线程池中,将任务执行状态信息储存在 result 对象中,
最后返回一个 Future 对象,提供获取任务执行状态信息的方法*/
<T> Future<T> submit(Runnable task, T result);
//返回null
Future<?> submit(Runnable var1);
/*
提交多个任务到线程池中,并且阻塞调用该方法的线程,
直到所有的任务都被执行完成或者调用该方法的线程发生
InterruptedException 异常,此时方法会抛出该异常,
方法返回一个保存了每个任务的执行状态信息的 Future 对象的 List 对象
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1)
throws InterruptedException;
/*
功能同上,不过添加了时间参数限制,即为如果线程池没有在参数规定的时间内执行完成所
有的任务,那么方法会强制返回,此时,执行完成的任务对应的 Future 对象的
isDone() 方法返回 true,代表对应任务执行完成,其他任务对应的 Future 对象
的 isCancelled() 方法返回 true,代表任务未执行完成并且被取消
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1,
long var2, TimeUnit var4) throws InterruptedException;
/*
也是提交多个任务到线程池中,并且阻塞调用该方法的线程,但是这个方法是有任意
一个任务被执行完成就会返回,返回执行完成的那个任务的执行结果(即为对应任
务 Callable 对象的 call() 方法的返回结果)同样的如果在执行任务过程中调
用该方法的线程发生了中断,方法会抛出一个 ExecutionException 异常
如果没有任何一个任务成功执行,那么方法会抛出一个 ExecutionException 异常
*/
<T> T invokeAny(Collection<? extends Callable<T>> var1)
throws InterruptedException, ExecutionException;
/*
和上面方法同样的功能,在其基础上加了时间限制,即如果在规定时间内没有任何一
个提交的任务执行完成,该方法会返回,同时抛出一个 TimeoutException 异常
*/
<T> T invokeAny(Collection<? extends Callable<T>> var1, long var2,
TimeUnit var4) throws InterruptedException,
ExecutionException, TimeoutException;
}

接下来看一下AbstractExecutorService的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public abstract class AbstractExecutorService implements ExecutorService {
public Future<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException();
} else {
RunnableFuture<Void> ftask = this.newTaskFor(task, (Object)null);
this.execute(ftask);
return ftask;
}
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) {
throw new NullPointerException();
} else {
RunnableFuture<T> ftask = this.newTaskFor(task, result);
this.execute(ftask);
return ftask;
}
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) {
throw new NullPointerException();
} else {
RunnableFuture<T> ftask = this.newTaskFor(task);
this.execute(ftask);
return ftask;
}
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask(callable);
}
}

以上submit的重载方法,方法中先是调用newTashFor方法获取RunnableFuture
对象,RunnableFuture是一个接口,这个接口继承了Runnable和Future,因此
其对象既可以作为Runnable 对象来给execute(Runnable task) 提供参数,也
可以作为submit的返回值,而其实现类FutureTask则是实现了RunnableFuture
接口的方法,在其的run()方法中会调用创建FutureTask对象时传入的Callable
对象的call()方法或者是Runnable对象的run()方法

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

两个submit方法通过execute来向线程池中提交任务AbstractExecutorService
并没有实现execute方法,其子类实现了execute方法,这个方法的执行逻辑可以
分为三步

  1. 如果当前活动线程数 < 指定的核心线程数,则创建并启动一个线程来执行新
    提交的任务(此时新建的线程相当于核心线程)
  2. 如果当前活动线程数 >= 指定的核心线程数,且缓存队列未满,则将任务添
    加到缓存队列中
  3. 如果当前活动线程数 >= 指定的核心线程数,且缓存队列已满,则创建并启
    动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    // 代表线程池运行状态的常量
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;
    // 获取线程池的运行状态
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    // 获取线程池的当前线程总数
    private static int workerCountOf(int c) { return c & CAPACITY; }
    public void execute(Runnable command) {
    if (command == null) {
    throw new NullPointerException();
    } else {
    // ctl 是一个 AtomicInteger 类型的对象,即为原子类,
    // 可以将 ctl 理解成保存了线程池的线程数、运行状态等信息的变量,
    // 通过对应的方法可以提取出对应的信息,
    // 比如 workerCount(ctl) 方法可以得到当前线程池中的线程总数
    int c = this.ctl.get();
    // 第一步:如果线程数量小于核心线程数,则启动一个核心线程执行任务
    // 并且将当前任务作为该核心线程的第一个执行任务
    if (workerCountOf(c) < this.corePoolSize) {
    // 如果成功创建了核心线程处理任务,方法返回
    if (this.addWorker(command, true)) {
    return;
    }
    /* 防止在这个过程中又有新的任务提交了造成错误,于是需要再次获取检查
    一次变量值 */
    c = this.ctl.get();
    }
    //第二步:当前线程数量大于等于核心线程数,加入任务队列,成功的话会进行二次检查
    if (isRunning(c) && this.workQueue.offer(command)) {
    /* 同样的道理,为了防止添加任务到任务队列中又有新的任务提交造成错误,
    再次更新变量值 */
    int recheck = this.ctl.get();
    // 如果线程池不处于运行状态(shutdown、stop),
    // 并且将刚添加的任务成功从任务队列移除,执行饱和策略
    if (!isRunning(recheck) && this.remove(command)) {
    this.reject(command);
    } else if (workerCountOf(recheck) == 0) {
    //启动非核心线程执行,注意这里任务是null,其实里面会去取任务队列里的任务执行
    this.addWorker((Runnable)null, false);
    }
    //第三步:加入不了队列(即队列满了),尝试启动非核心线程
    //如果启动不了非核心线程执行,说明到达了最大线程数量的限制,会使用第7个参数抛出异常
    /*如果创建非核心线程执行任务失败,那么证明整个线程池的线程数达到
    最大线程数、任务队列已满,或者是调用了线程池的
    shutdown() 方法,拒绝接受任何新的任务,此时应该调用饱和策略*/
    } else if (!this.addWorker(command, false)) {
    this.reject(command);
    }

    }
    }

    }
    以上代码可以看出即便当前活动的线程有空闲的,只要这个活动的线程数量小
    于设定的核心线程数,那么依旧会启动一个新线程来执行任务execute方法中
    多次通过addWorker方法添加线程任务
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    //添加新的线程来将 firstTask任务作为第一任务执行,执行完成之后执
    //行线程池任务队列中其他任务,core 参数为是否添加核心线程
    private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
    int c = ctl.get();
    // 记录当前线程池运行状态
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // 如果线程池已经被停止或者关闭等,那么返回 false
    if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
    firstTask == null &&
    ! workQueue.isEmpty()))
    return false;

    for (;;) {
    // 获取线程池中线程数
    int wc = workerCountOf(c);
    //如果线程数大于线程池最大线程或者想添加核心线程来处理firstTask但是核心线程已经饱和
    // 或者添加非核心线程来处理 firstTask 但是线程池总线程数已达到饱和,返回 false
    if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;
    // 如果保存线程池状态信息的变量没发生变化,证明这个过程线程池是没有发生状态变化的,
    // 此时将 ctl 值更新,使得其包装的线程池工作线程数信息加一
    if (compareAndIncrementWorkerCount(c))
    break retry;
    // 防止这个过程中又有其他操作进行造成错误,再次读取 ctl 的值
    c = ctl.get(); // Re-read ctl
    // 如果线程池当前运行状态和线程池前面的运行状态不等,证明线程池状态发生改变,
    // 调到外层循环重新执行
    if (runStateOf(c) != rs)
    continue retry;
    // else CAS failed due to workerCount change; retry inner loop
    }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    // 添加新的工作对象来处理任务,
    // Worker 对象为 任务--线程 的包装类,创建该对象时会创建一个新的线程
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    // 同步块
    mainLock.lock();
    try {
    // Recheck while holding lock.
    // Back out on ThreadFactory failure or if
    // shut down before lock acquired.
    int rs = runStateOf(ctl.get());
    // 检测当前线程池运行状态和方法参数是否合法
    if (rs < SHUTDOWN ||
    (rs == SHUTDOWN && firstTask == null)) {
    if (t.isAlive()) // precheck that t is startable
    throw new IllegalThreadStateException();
    // 工作集中添加新建的线程包装类, workers 为一个 HashSet<Woker> 的对象
    // 保存的是线程池中的所有 Worker 对象
    workers.add(w);
    int s = workers.size();
    // 更新线程池中出现过的最大的线程数
    if (s > largestPoolSize)
    largestPoolSize = s;
    // 工作添加标志置为 true
    workerAdded = true;
    }
    } finally {
    mainLock.unlock();
    }
    if (workerAdded) {
    // 启动新建的线程,执行任务
    t.start();
    // 线程启动标志置为 true
    workerStarted = true;
    }
    }
    } finally {
    if (! workerStarted)
    addWorkerFailed(w);
    }
    return workerStarted;
    }
    创建新线程处理firstTask任务时通过新建Worker对象完成,Worker是
    ThreadPoolExecutor类的内部类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    private final class Worker extends AbstractQueuedSynchronizer implements 
    Runnable {
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
    this.setState(-1);
    this.firstTask = firstTask;
    /*在创建一个新的线程时传入一个Runnable对象,线程运行时执行
    该Runnable对象的run方法
    */
    this.thread = ThreadPoolExecutor.this.getThreadFactory()
    .newThread(this);
    }
    public void run() {
    ThreadPoolExecutor.this.runWorker(this);
    }
    }
    这个类封装了要执行的任务和执行这个任务的Thread线程对象,创建线程时传入 this
    参数,那么创建的线程在执行时就会调用这个对象的run方法,即调用runWorker方法
    到这里就应该知道线程池存储的并不是Thread对象,而是封装后的Worker对象,并且
    每新建一个Worker对象都会吧这个对象存入workers集合中,wokers集合实际是
    ThreadPoolExecutor的一个HashSet集合类型的成员变量
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    final void runWorker(ThreadPoolExecutor.Worker w) {
    Thread wt = Thread.currentThread();
    /*如果Worker对象的firstTask不为空,则先执行第一个任务,即创建Worker对象
    传入的Runnable参数*/
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;

    try {
    //从线程池的任务阻塞队列中取出任务,如果任务队列为空则会使线程陷入阻塞
    while(task != null || (task = this.getTask()) != null) {
    w.lock();
    if ((runStateAtLeast(this.ctl.get(), 536870912) ||
    Thread.interrupted() && runStateAtLeast(this.ctl.get(),
    536870912)) && !wt.isInterrupted()) {
    wt.interrupt();
    }

    try {
    this.beforeExecute(wt, task);

    try {
    //执行任务
    task.run();
    this.afterExecute(task, (Throwable)null);
    } catch (Throwable var14) {
    this.afterExecute(task, var14);
    throw var14;
    }
    } finally {
    task = null;
    ++w.completedTasks;
    w.unlock();
    }
    }

    completedAbruptly = false;
    } finally {
    this.processWorkerExit(w, completedAbruptly);
    }

    }
    在这个方法中线程会不断从线程池中取出任务并执行任务,直到取出的任务对象
    为空,这是线程池已经关闭或任务队列为空,这是就会跳出while循环进入
    finally语句块,接下来是getTask的源码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    private Runnable getTask() {
    boolean timedOut = false;
    //死循环,方法要么返回null要么返回Runnable对象代表取到任务
    while(true) {
    int c = this.ctl.get();
    if (runStateAtLeast(c, 0) && (runStateAtLeast(c, 536870912) ||
    this.workQueue.isEmpty())) {
    this.decrementWorkerCount();
    return null;
    }

    int wc = workerCountOf(c);
    boolean timed = this.allowCoreThreadTimeOut || wc >
    this.corePoolSize;
    if (wc <= this.maximumPoolSize && (!timed || !timedOut) || wc <= 1
    && !this.workQueue.isEmpty()) {
    try {
    Runnable r = timed ? (Runnable)this.workQueue.poll(
    this.keepAliveTime, TimeUnit.NANOSECONDS) :
    (Runnable)this.workQueue.take();
    if (r != null) {
    return r;
    }

    timedOut = true;
    } catch (InterruptedException var6) {
    timedOut = false;
    }
    } else if (this.compareAndDecrementWorkerCount(c)) {
    return null;
    }
    }
    }
    以下是一个线程池的实际用例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class ThreadPollTest
    {
    public static void main(String[] args) throws Execption
    {
    // 创建一个具有固定线程数 (6) 线程
    ExecutorService pool = Executors.newFixedThreadPool(6);
    // 使用 Lambda 表达式创建 Runnable对象
    Runnable target = () -> {
    for (int i= 0 ; i < 100 ; i ++ )
    System.out.println(Thread.currentThread().getName()+i);
    } ;
    //向线程池提交两个线程
    pool.submit(target);
    pool.submit(target);
    //关闭线程池
    pool.shutdown() ;
    }
    }

线程组

Java用ThreadGroup来表示线程组,用于对一批线程进行分类管理。Java允许
程序对线程组进行控制,对线程组的控制相当于控制这一批线程,用户创建的
所有线程都属于线程组,如果没有指定则属于默认线程组,子线程默认与父线
程处在同一个线程组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 线程组测试
*/
public static class ThreadGroupTest {

// 获取主线程所在线程组,在主线程中执行
public static void printMainThreadGroup() {
// 获取当前线程所在的线程组对象
ThreadGroup group = Thread.currentThread().getThreadGroup();
System.out.println(Thread.currentThread().getName() +
" 线程所在的线程组:" + group.getName());
}

public static void startTest() {
printMainThreadGroup();
}
}

public static void main(String[] args) {
ThreadGroupTest.startTest();
}

主线程组的name是main,线程组除了可以包含线程也可以包含子线程组,类似文
件夹。Thread提供了几个构造器来设置新创建的线程属于哪个线程组

1
2
3
4
5
Thread(ThreadGroup group,Runnable target)
Thread(ThreadGroup group,Runnable target,String name) 线程名为name
Thread(ThreadGroup group,String name)
Thread没有提供修改线程所属的方法,但是提供获取线程组的方法
getThreadGroup()

以下是ThreadGroup的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
public class ThreadGroup implements UncaughtExceptionHandler {
private final ThreadGroup parent;
String name;
//线程的最大优先级
int maxPriority;
boolean destroyed;
boolean daemon;
int nUnstartedThreads;
int nthreads;
Thread[] threads;
int ngroups;
ThreadGroup[] groups;
private ThreadGroup() {
this.nUnstartedThreads = 0;
this.name = "system";
this.maxPriority = 10;
this.parent = null;
}
//创建一个新线程组,以调用这个构造方法的线程所在线程组为父线程组
public ThreadGroup(String name) {
this(Thread.currentThread().getThreadGroup(), name);
}
//创建一个新线程组,parent为指定的父线程组
public ThreadGroup(ThreadGroup parent, String name) {
this(checkParentAccess(parent), parent, name);
}
//返回在当前线程组和子线程组中活跃的线程数量(估计值)
public int activeCount() {
int result;
int ngroupsSnapshot;
ThreadGroup[] groupsSnapshot;
synchronized(this) {
if (this.destroyed) {
return 0;
}

result = this.nthreads;
ngroupsSnapshot = this.ngroups;
if (this.groups != null) {
groupsSnapshot = (ThreadGroup[])Arrays.copyOf(
this.groups, ngroupsSnapshot);
} else {
groupsSnapshot = null;
}
}

for(int i = 0; i < ngroupsSnapshot; ++i) {
result += groupsSnapshot[i].activeCount();
}

return result;
}
//返回在当前线程组和子线程组中活跃的线程组(估计值)
public int activeGroupCount() {
int ngroupsSnapshot;
ThreadGroup[] groupsSnapshot;
synchronized(this) {
if (this.destroyed) {
return 0;
}

ngroupsSnapshot = this.ngroups;
if (this.groups != null) {
groupsSnapshot = (ThreadGroup[])Arrays.copyOf(
this.groups, ngroupsSnapshot);
} else {
groupsSnapshot = null;
}
}

int n = ngroupsSnapshot;

for(int i = 0; i < ngroupsSnapshot; ++i) {
n += groupsSnapshot[i].activeGroupCount();
}

return n;
}
//判断当前执行这个方法的线程有没有权限更改这个线程组的属性
//如果没有抛出 SecurityException 异常
public final void checkAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccess(this);
}

}

//清除当前线程组和子线程组,需要保证当前线程组和子线程组所有线程都停止
public final void destroy()

// 将当前线程组和其子线程组中的线程拷贝到参数指定的线程数组中,
// 如果线程数组的长度小于线程组中线程的数量,那么多余的线程不会拷贝
int enumerate​(Thread[] list)

// 将当前线程组中的线程拷贝到参数指定的线程数组中,如果 recurse 参数为 true,
// 那么会递归将其子线程组中的线程也拷贝,
// 如果线程数组的长度小于线程组中线程的数量,那么多余的线程不会拷贝
int enumerate​(Thread[] list, boolean recurse)

// 将当前线程组(不包括本身)和其子线程组中的线程组拷贝到参数指定的线程组数组中,
// 如果线程组数组的长度小于线程组的数量,那么多余的线程组不会拷贝
int enumerate​(ThreadGroup[] list)

// 将当前线程组(不包括本身)中的子线程组拷贝到参数指定的线程组数组中,
//如果 recurse 参数为 true,
// 那么会递归将其子线程组中的子线程组也拷贝,
// 如果线程数组的长度小于线程组中线程的数量,那么多余的线程不会拷贝
int enumerate​(ThreadGroup[] list, boolean recurse)

//中断线程组中所有的线程
public void interrupt() {
if (this != currentThread()) {
this.checkAccess();
synchronized(this.blockerLock) {
Interruptible b = this.blocker;
if (b != null) {
this.interrupt0();
b.interrupt(this);
return;
}
}
}

this.interrupt0();
}

/*设置当前线程组中线程允许的最大优先级,线程组中已经存在的优先级大于这个
pri 参数的线程不会受影响,小于这个 pri 参数的线程和之后添加的线
程设置的最大优先级不能超过这个值,如果当前线程组的父线程组不为 null
,那么当前线程组的最大优先级设置为pri和父线程组优先级较小的那一个,
否则直接设置为 pri */
void setMaxPriority​(int pri)

}

一个线程只有调用start方法后才会被加入线程组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private Thread(ThreadGroup g, Runnable target, String name, 
long stackSize, AccessControlContext acc, boolean inheritThreadLocals) {
...
//这是一个ThreadGroup的方法
g.addUnstarted();
...
}
//字面意思理解添加未start的线程
void addUnstarted() {
synchronized(this) {
if (this.destroyed) {
throw new IllegalThreadStateException();
} else {
++this.nUnstartedThreads;
}
}
}
//当线程调用start方法的内部情况
public synchronized void start() {
if (this.threadStatus != 0) {
throw new IllegalThreadStateException();
} else {
//这个方法将线程加入线程组
this.group.add(this);
boolean started = false;

try {
this.start0();
started = true;
} finally {
try {
if (!started) {
this.group.threadStartFailed(this);
}
} catch (Throwable var8) {
}

}

}
}
void add(Thread t) {
synchronized(this) {
if (this.destroyed) {
throw new IllegalThreadStateException();
} else {
if (this.threads == null) {
this.threads = new Thread[4];
} else if (this.nthreads == this.threads.length) {
this.threads = (Thread[])Arrays.copyOf(this.threads,
this.nthreads * 2);
}
//将线程加入线程数组
this.threads[this.nthreads] = t;
++this.nthreads;
--this.nUnstartedThreads;
}
}
}

线程池和线程组

线程组注重对多个线程的管理,线程池注重的是利用多个线程执行大量任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 线程组测试
*/
public static class ThreadGroupTest {

// 获取主线程所在线程组,在主线程中执行
public static void printMainThreadGroup() {
// 获取当前线程所在的线程组对象
ThreadGroup group = Thread.currentThread().getThreadGroup();
System.out.println(Thread.currentThread().getName() +
" 线程所在的线程组:" + group.getName());
}

// 通过 ThreadGroup 批量停止线程
public static void stopThreadsByThreadGroup() {
ThreadGroup tg = new ThreadGroup("线程组1");
// 新建 10 个子线程并添加到 tg 线程组中
for (int i = 0; i < 10; i++) {
new Thread(tg, "子线程" + (i+1)) ,{
@Override
public void run() {
// 当前线程的中断标志为 false 的时候,继续循环
while (!currentThread().isInterrupted()) {
System.out.println(currentThread().getName() + "打印");
}
}
}.start();
}
try {
// 主线程休眠 3 秒
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
// 设置线程中断标志为 true,以中断线程组中的线程
tg.interrupt();
}

public static void startTest() {
stopThreadsByThreadGroup();
}

public static void main(String[] args) {
ThreadGroupTest.startTest();
}

主线程在休眠3s后中断线程组中的所有线程,不需要一个个中断所有线程

ThreadLocal

ThreadLocal代表一个线程局部变量,通过把数据放在ThreadLocal中就可以
让每个线程创建该变量的副本,从而避免并发访问问题

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
This class provides ThreadLocal variables. These variables differ from 
their normal counterparts in that each thread that accesses one (via its get
or set method) has its own, independently initialized copy of the
variable. ThreadLocal objects are typically private static variables
in classes that wish to associate state with a thread (e.g., a user ID
or Transaction ID).
这个类提供线程本地变量,这些变量即每个线程都独自拥有的不同于线程中普通的副本变量
初始化时每个线程副本都独自拥有,ThreadLocal是线程的私有字段(用户ID 交易ID)


Each thread holds an implicit reference to its copy of a ThreadLocal as long a
s the thread is alive and the ThreadLocal object is accessible; after a
thread goes away, all of its copies of ThreadLocal variables are subject
to garbage collection (unless other references to these copies exist).
每个线程都有对线程局部变量副本的隐式引用,只要线程处于活跃状态以及ThreadLocal
是可以访问的那么就可以访问这个隐式引用,线程结束后所有线程保存的ThreadLocal
对象都会被垃圾回收,除非还有引用指向这些对象

每一个使用ThreadLocal的线程都有一个变量副本,每个线程独立改变这个副本,
不会与其他先冲突

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* ThreadLocal 线程变量副本保存测试
*/
public static class ThreadLocalTest {
// 新建一个 ThreadLocal 对象
static ThreadLocal<Integer> value = new ThreadLocal<Integer>();

public static void startTest() {
// 新建 5 个子线程,run 方法中调用新建的 ThreadLocal 对象 value 的
//get/set 方法来获取/设置对应值
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
public void run() {
// 当当前线程中 value 值不大于 5 时候继续循环
while (value.get() <= 5) {
System.out.println(Thread.currentThread().getName() +
" 的 value 值:" + value.get());
// 当前线程的 value 自增一
value.set(value.get() + 1);
}
};
}, "线程 " + (i+1)).start();
}
}
}

public static void main(String[] args) {
ThreadLocalTest.startTest();
}

这个程序会报空指针异常,可是value是新建的一个对象,是否是没有
赋初始值,如果在startTest开头加value.set(0)依然报空指针异常
接下来分析一下get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public T get() {
Thread t = Thread.currentThread();
//通过当前线程得到ThreadLocalMap
ThreadLocal.ThreadLocalMap map = this.getMap(t);
if (map != null) {
ThreadLocal.ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
T result = e.value;
return result;
}
}
//如果对象为空调用这个设置初始值的方法
return this.setInitialValue();
}

ThreadLocal.ThreadLocalMap getMap(Thread t) {
//返回threadLocals
return t.threadLocals;
}
//Thread中定义了这个变量,Thread的构造方法中并未对这个变量赋值
ThreadLocalMap threadLocals;
//threadLocals默认为null,所以会调用以下方法
private T setInitialValue() {
//获取initialValue的返回值
T value = this.initialValue();
Thread t = Thread.currentThread();
ThreadLocal.ThreadLocalMap map = this.getMap(t);
if (map != null) {
map.set(this, value);
} else {
this.createMap(t, value);
}

if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal)this);
}

return value;
}
//返回null,到这里可知get为什么返回null
protected T initialValue() {
return null;
}
//setInitialValue方法中也会给threadLocals创建一个新的对象
//ThreadLocalMap类似一个映射表,键就是当前ThreadLocal对象
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocal.ThreadLocalMap(this, firstValue);
}

创建ThreadLocal对象时重写initialValue方法并返回一个非null的默认值
作为每个线程存储的变量的默认值,调用get方法时就不会返回null值。接下
来看一下ThreadLocalMap的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
static class ThreadLocalMap {
private ThreadLocal.ThreadLocalMap.Entry[] table;
private int size = 0;
private int threshold;
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
this.value = v;
}
}
}

这个类很像HashMap,是ThreadLocal的静态子类,键是一个ThreadLocal对象
调用ThradLocal的get方法时,首先会获取当前线程的ThreadLocalMap对象,
如果这个对象不为空,那么就会获取这个ThreadLocalMap对象的Entry对象,
并获取Entry对象的value返回值,如果为空就会创建ThreadLocalMap对象,
value值就是initialVlaue中的返回值
上图的意思就是多个线程中的ThreadLocalMap将同一个ThreadLocal对象作为
ThreadLocalMap的键,但是存储的值是相互独立的。再来看一下set方法

1
2
3
4
5
6
7
8
9
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocal.ThreadLocalMap map = this.getMap(t);
if (map != null) {
map.set(this, value);
} else {
this.createMap(t, value);
}
}

刚刚的问题也得到了解决,除了在创建ThreadLocal的时候重写initialValue
方法赋初始值,也可以使用set方法赋初始值然后使用get得到,不过这种方法
对每个线程来说都要先使用set赋初始值
也可以这样写

1
2
3
4
5
6
7
static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
// 重写 initialValue 方法,提供给每个线程保存的对象一个默认的值
@Override
protected Integer initialValue() {
return 0;
};
};

并发

乐观锁与悲观锁

参考 https://www.jianshu.com/p/d2ac26ca6525
这其实是一种思想,并不只是关系数据库系统才有这个概念。乐观锁适用于读多
写少的情况,悲观锁适用于写多读少的情况

悲观锁

当对数据库中一条数据进行修改的时候,可以直接对该数据进行加锁来防止并
发,这种借助数据库锁机制,在修改数据之前先锁定,再修改的方式被称为悲
观并发控制,从名字看这个锁具有独占排他性,指的是对数据被外界修改持保
守态度,在整个数据处理过程中数据处于锁定状态
这是一种对数据持有悲观态度的并发控制方式,总是假设最坏情况,每次读取
数据的时候都默认别的线程会更改数据。悲观锁可以细分为共享锁和排他锁

  1. 共享锁 又称为读锁,简称S锁,共享就是说多个事务对同一数据可以共享
    一把锁,都能访问数据但是只能读不能改
  2. 排他锁 又称为写锁,简称X锁,只有一个事务可以获取数据的排他锁,其
    他事务不能获取,并且获取后可以进行修改

悲观锁为数据处理提供了安全性,但是加锁会让数据库产生额外的开销,还有
可能变为死锁

乐观锁

乐观锁假设数据一般不会产生冲突,所以在数据产生提交更新的时候才会对数
据的冲突与否进行检测,如果冲突则返回给用户错误的信息,让用户决定如何
去做,多用于读操作多的场景,可以提高程序吞吐量
乐观锁并不刻意使用数据库本身的锁机制,而是依据数据本身来保证数据的安
全性,有两种实现方式

  1. CAS实现 Java 中java.util.concurrent.atomic包下面的原子变量使用
    了乐观锁的一种CAS实现方式
  2. 版本号控制:一般是在数据表中加上一个数据版本号version字段,表示数
    据被修改的次数。当数据被修改时,version值会 +1。当线程A要更新数据值时
    ,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的
    version值与当前数据库中的version值相等时才更新,否则重试更新操作
    ,直到更新成功

乐观并发控制相信事务之间的数据竞争概率比较小,所以直到提交才会锁定

具体实现

悲观锁实现

依靠数据库提供的锁机制

  1. 在对记录进行修改前,先尝试为该记录加上排他锁(exclusive locks)
  2. 如果加锁失败,说明该记录正在被修改,那么当前查询可能要等待或者
    抛出异常。具体响应方式由开发者根据实际需要决定
  3. 如果成功加锁,那么就可以对记录做修改,事务完成后就会解锁了
  4. 期间如果有其他对该记录做修改或加排他锁的操作,都会等待解锁或直
    接抛出异常

要使用悲观锁必须关闭Mysql的自动提交属性,自动提交就是说当执行一个
更新操作Mysql会自动将结果提交

1
2
3
4
5
begin;
//通过for update进行加锁
select quantity from items where id=1 for update;
update items set quantity=2 where id=1;
commit;

乐观锁实现

不需要使用数据库的锁机制,主要就是两个步骤:冲突检测和数据更新。其
实现方式之一就是CAS,CAS是乐观锁技术,当多个线程尝试使用CAS同时
更新一个变量时,只有一个线程能更新变量,其余失败的线程并不会被
挂起,而是告知这次竞争中失败,并可以再次尝试

1
2
3
4
//先查询处商品库存信息,quantity=3
select quantity from items where id=1;
//修改为2
update items set quantity=2 where id=1 and quantity=3;

在更新之前,先查询库存表中当前库存数,然后在update的时候以库存数作
为一个修改条件,当提交更新的时候判断数据库表对应记录的当前库存数与
第一次取出来的库存数进行比对,如果数据库表当前库存数与第一次取出来
的库存数相等,则予以更新,否则认为是过期数据。以上存在一个严重的问
题,也就是ABA问题

  1. 比如说线程one从数据库中取出库存数3,这时候线程two也从数据库中取
    出库存数3,并且线程two进行了一些操作变成了2
  2. 然后线程two又将库存数变成3,这时候线程one进行 CAS 操作发现数据库
    中仍然是3,然后线程one操作成功。
  3. 尽管线程one的 CAS 操作成功,但是不代表这个过程就是没有问题的

有一个很好的办法解决ABA问题,就是通过一个单独的可以递增的version
字段
乐观锁在每次执行数据修改操作时,都会带上一个版本号,一旦版本号和数据
的版本号一致就可以执行修改操作并对版本号执行+1操作,否则执行失败,
因为每次操作版本号都会增加,所以就不会出现ABA问题,还可以使用时间
戳,因为时间戳天然具有顺序递增性

CAS

参考 https://www.jianshu.com/p/98220486426a
CAS Compare And Swap。是解决多线程并行情况下使用锁造成性能损耗的
一种机制,CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)
如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为
新值,否则处理器不做任何操作。无论哪种情况,它都会在 CAS指令之前
返回该位置的值。CAS 有效地说明了我认为位置V应该包含值A,如果包
含该值,则将B 放到这个位置,否则不要更改该位置,只告诉我这个位
置现在的值即可。在 Java 中,sun.misc.Unsafe 类提供了硬件级别
的原子操作来实现这个CAS。java.util.concurrent包下大量的类都
使用了这个 Unsafe.java 类的 CAS 操作。

CAS的应用

java.util.concurrent.atomic 包下的类大多是使用 CAS 操作来实现的,
比如 AtomicInteger、AtomicBoolean、AtomicLong。一般来说在竞争不是
特别激烈的时候,使用该包下的原子操作性能比使用 synchronized 关键
字的方式高效的多。以下是一个计数例子

1
2
3
4
5
6
7
8
9
10
11
12
//使用synchronized加锁,同一时间只有一个线程获取锁,类似于悲观锁
private int count=0;
public synchronized void add()
{
count++;
}
//Atomic原子类
private AtomicInteger count=new AtomicInteger();
public void add()
{
count.increamentAndGet();
}

多个线程可以并发执行AtomicInteger的incrementAndGet(),意思就是把
count 的值累加1,接着返回累加后最新的值。实际上,Atomic 原子类底层
用的不是传统意义的锁机制,而是无锁化的 CAS 机制,通过 CAS 机制保证
多线程修改一个数值的安全性
如果有三个线程并发修改一个AtomicInteger的值,底层的实现如下

  1. 首先,每个线程都会先获取当前的值。然后执行一个原子的CAS操作
  2. 然后CAS操作里,会比较一下现在的值是不是刚才获取到的那个值。如果是
    ,说明没人改过这个值,那设置成累加1之后的一个值
  3. 在执行CAS的时候,发现之前获取的值跟当前的值不一样,会导致CAS失败
    。失败之后,进入一个无限循环,再次获取值,接着执行CAS操作

CAS性能优化

大量的线程同时并发修改一个AtomicInteger,可能有很多线程会不停的自旋,
进入一个无限重复的循环中。这些线程不停地获取值,然后发起CAS操作,但是
发现这个值被别人改过了,于是再次进入下一个循环,获取值,发起CAS 操作
又失败了,再次进入下一个循环。在大量线程高并发更新AtomicInteger的时
候,这种问题可能会比较明显,导致大量线程空循环,自旋转,性能和效率
都不是特别好。Java8提供了一个新的类LongAdder,这个类尝试使用分段
CAS以及自动分段迁移的方式来大幅度提升多线程高并发执行CAS操作的性

LongAdder 核心思想就是热点分离,这一点和 ConcurrentHashMap 的设计
思想相似。就是将value值分离成一个数组,当多线程访问时通过hash算法映
射到其中的一个数字进行计数。而最终的结果,就是这些数组的求和累加。这
样一来,就减小了锁的粒度

AQS

AQS AbstractQueuedSynchronizer 是JDK下提供的一个用于实现基于FIFO
等待队列的阻塞锁和相关的同步器的一个框架。使用一个原子的 int value
state来作为同步器的状态,例如独占锁,1代表已占有,0代表未占有,通
过该类提供的原子修改status 方法,可以把它同步器的基础框架类来实现
各种同步器,AQS还定义了一个实现了Condition接口的ConditionObject
内部类。Condition将Object 监视器方法 (wait、notify和notifyAll)
分解成截然不同的对象,以便通过将这些对象与任意Lock实现组合使用

公平锁和非公平锁

参考 https://zhuanlan.zhihu.com/p/115543000
公平锁表示线程获取锁的顺序是按照线程加锁的顺序获取的,即先来先得先进
先出

  • 优点 所有线程都能获取资源
  • 缺点 吞吐量下降,除了第一个线程其余线程都会阻塞,唤醒线程开销大

非公平锁是一种获取锁的抢占机制,随机获得锁。默认情况下ReentrantLock
使用的是非公平锁

  • 优点 可以减少唤醒线程的开销
Author: 高明
Link: https://skysea-gaoming.github.io/2020/10/30/Java%E5%9F%BA%E7%A1%80%E7%9F%A5%E8%AF%863/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.