Java 并发梳理

Java 并发

线程

what:cpu调度的最小单位

创建方式:

  • 继承Thread,实现重写run方法,创建Thread对象,调用start方法。
  • 实现Runnable,实现run方法,创建Runnable对象,传入Thread对象,调用start方法
  • 后文的中Callable

线程的状态:新建,可执行,执行,阻塞,死亡

1561517414845

yield只是让出cpu的使用权,下次还有可能抢占cpu

调用其它线程的join,当前线程会阻塞,知道那个线程执行完毕。

线程安全问题

多线程访并发访问共享资源,会导致数据安全问题。

synchronized 加悲观锁

通过synchronized添加锁,可以加载方法上和代码块上。在被锁起来的地方只会又一个线程进入,直到该代码执行完毕。该实现为jvm内置实现

1
2
3
public synchronized void test(){}

synchronized(obj){}

线程8锁问题:

synchronized锁需要看锁的对象是谁,线程需要锁对象为同一个的情况下,进入了锁区域另外的线程就必须等待。

  • 锁对象为普通对象
  • 所对象为class对象,即在静态方法上加锁

要根据具体锁对象来判断执行效果。持有同一锁对象,操作即为互斥。

Lock 乐观锁

代码实现,需要手动释放锁,不然会造成死锁,需要在finally释放锁

有三个实现类

  • ReentrantLock 可重入锁
  • ReadLock 读锁,通过new ReentrantReadWriteLock .readLock()获得
  • WriteLock写锁,通过new ReentrantReadWriteLock .writeLock()获得

读写锁:读写互斥,写写互斥,读读共享

Condition

在synchronized锁下面,jvm提供了obect.wait,和notify,这两个通信是依赖synchronized关键字的,从等待池到锁池。而在Lock锁下与之对应线程通信的则是Condition对象,condition.await()进入等待和condition.sigal()唤醒

Lock和synchronized的选择

  总结来说,Lock和synchronized有以下几点不同:

  1)Lock是一个接口基于AQS实现,而AQS基于CAS实现,而synchronized是Java中的关键字,synchronized是内置的语言实现;

  2)synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;

  3)Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;

  4)通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。

  5)Lock可以提高多个线程进行读操作的效率。

  在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。

锁的概念

  1.可重入锁

如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。

1
2
3
4
5
6
7
8
9
class MyClass {
public synchronized void method1() {
method2();
}

public synchronized void method2() {

}
}

synchronized和Lock都具备可重入性

2.可中断锁

  可中断锁:顾名思义,就是可以相应中断的锁。

  在Java中,synchronized就不是可中断锁,而Lock是可中断锁。

  如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。

lockInterruptibly()的用法时已经体现了Lock的可中断性。

3.公平锁

  公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。

  非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。

  在Java中,synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。

  而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。

ReentrantLock lock = new ReentrantLock(true);

true 为公平,false为不公平

另外在ReentrantLock类中定义了很多方法,比如:

  isFair() //判断锁是否是公平锁

  isLocked() //判断锁是否被任何线程获取了

  isHeldByCurrentThread() //判断锁是否被当前线程获取了

  hasQueuedThreads() //判断是否有线程在等待该锁

  在ReentrantReadWriteLock中也有类似的方法,同样也可以设置为公平锁和非公平锁。不过要记住,ReentrantReadWriteLock并未实现Lock接口,它实现的是ReadWriteLock接口。

4.读写锁

Volatile关键字

内存可见性是指当某个线程正在使用对象状态而另一个线程在同时修改该状态,需要确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。当多个线程进行操作共享数据时,可以保证内存中的数据可见。相较于 synchronized 是一种较为轻量级的同步策略

  1. volatile 不具备“互斥性”

  2. volatile 不能保证变量的“原子性”

CAS算法 无锁算法

原子性操作问题,CAS是一种硬件对并发的支持,cpu指令,用于管理对共享数据的访问,CAS是一种无锁的非阻塞算法实现

1、原子变量:在java.util.concurrent.atomic包下提供了基本的原子变量

​ 【1】Volatile保证内存可见性

​ 【2】CAS算法保证数据变量的原子性

2、CAS算法实现CAS包含了三个操作变量:

​ 【1】内存值V

​ 【2】内存预估值A

​ 【3】内存更新值B当且仅当V==A时,V=B;否则会重复尝试。

线程通信
基于synchronized 锁

wait:中断方法的执行,使本线程等待,暂时让出 cpu 的使用权,并允许其他线程使用这个同步方法。

notify:唤醒由于使用这个同步方法而处于等待线程的 某一个结束等待

notifyall:唤醒所有由于使用这个同步方法而处于等待的线程结束等待

锁池:假设线程A已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用这个对象的某个synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前必须先获得该对象的锁的拥有权,但是该对象的锁目前正被线程A拥有,所以这些线程就进入了该对象的锁池中。

等待池:假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁后,进入到了该对象的等待池中
所谓唤醒线程,另一种解释可以说是将线程由等待池移动到锁池,notifyAll调用后,会将全部线程由等待池移到锁池,然后参与锁的竞争,竞争成功则继续执行,如果不成功则留在锁池等待锁被释放后再次参与竞争。而notify只会唤醒一个线程。

基于Lock锁

从Lock中获得Condition,类似于上面的方式:

1
2
3
4
5
6
7
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long var1) throws InterruptedException;
boolean await(long var1, TimeUnit var3) throws InterruptedException;
boolean awaitUntil(Date var1) throws InterruptedException;
void signal();
void signalAll();

线程池

what

维护了一些线程的队列,线程创建和销毁是非常消耗资源的。利用线程池来重用这些线程,提高了响应速度

降低 资源消耗

提高响应速度

提高线程的 可管理性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
* 一、线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。
*
* 二、线程池的体系结构:
* java.util.concurrent.Executor : 负责线程的使用与调度的根接口
* |--**ExecutorService 子接口: 线程池的主要接口
* |--ThreadPoolExecutor 线程池的实现类
* |--ScheduledExecutorService 子接口:负责线程的调度
* |--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService
*
* 三、工具类 : Executors
* ExecutorService newFixedThreadPool() : 创建固定大小的线程池
* ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
* ExecutorService newSingleThreadExecutor() : 创建单个线程池。线程池中只有一个线程
*
* ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。
*/
how
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
execute 执行任务
submit 提交任务,并且返回Future
shutDown 中止线程池,设置线程池状态为SHUTDOWN,中断没有运行的线程
shtDownNow 中止线程池,设置线程池状态为SHUTDOWN, 尝试停止所有线程,并返回等待任务队列
不同应用场景使用不同的配置线程
任务的性质:CPU密集型任务、IO密集型任务和混合型任务
任务的优先级:高、中和低
任务的执行时间:长、中和短
任务的依赖性:是否依赖其他系统资源,如数据库连接。
CPU密集型任务
应配置尽可能小的线程,配置
N(CPU)+1
I/O密集型任务
业务读取较多,线程并不是一直在执行任务,则应配置尽可能多的线程
N(CPU) * 2
混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量.如果这两个任务执行时间相差太大,则没必要进行分解.
1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

关于线程池构造方法参数说明:

corePoolSize 核心线程数量

maximumPoolSize 最大线程数量

keepAliveTime 存活时间

TimeUnit 时间单位

workQueue 等待队列

拒绝策略
  • AbortPolicy:丢弃任务,抛出 RejectedExecutionException
  • CallerRunsPolicy:只用调用者所在线程来运行任务,有反馈机制,使任务提交的速度变慢)。
  • DiscardOldestPolicy
    若没有发生shutdown,尝试丢弃队列里最近的一个任务,并执行当前任务, 丢弃任务缓存队列中最老的任务,并且尝试重新提交新的任务
  • DiscardPolicy:不处理,丢弃掉, 拒绝执行,不抛异常
    当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略.如记录日志或持久化存储不能处理的任务

一个任务被提交到线程池中,首先会去检查核心线程是否达最大,如果没有则创建新线执行任务,如果核心线程达最大且都在执行任务,下一步将会去判断任务队列是否满,如果没有满则入队列等待,如果满了则创建新的线程,此时线程池最大能达到的线程数量即为最大线程数,当任务执行完毕,线程空闲之后等待存活时间后就会杀死线程。知道线程数量维持在核心线程数量。

这样就好比如一个工厂本来又四个人在工作,但是订单增加了,但是能工厂负责人认为工人还能承受住,于是就将累计的订单来排队,时间一长还是能完成,但是随着订单的增加,工厂的排队已经排不下了。工厂负责人招了了一些了临时工参与工作,随着时间推移订单还在增加,此时排队满了,工人工作的位置也满了。此时工厂负责人就要考虑使用那种策略来对待新的订单。当订单减少了,临时工没事做了,此时工厂负责人为了成本考虑,就要撤下临时工了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int c = ctl.get();
//工作任务小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))//创建线程
return;
c = ctl.get();
}
//大于核心线程数,尝试入工作队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}//入队列失败,尝试新增线程,此时线程数即将大于核心线程,但是要小于最大线程数
else if (!addWorker(command, false))
reject(command);//达到最大线程数,采取拒绝策略
关于获取Worker

线程池维持一个Worker的HashSet,Work本身也是一个Runnable,内部包含了一个第一个任务,和一个Thread对象(线程),也就是一个work持有一个线程。这个线程在addWorker成功时,会去开启线程。也就是调用线程池外围的runWorker方法。从这里就开始进入worker的线程任务了,在线程中开启while循环去获得Task,Task的获得同样时循环去阻塞队列拿runnable。如果工作队列空了,拿取超时,则会去减少Worker数量。

1
2
3
4
5
6
7
8
9
10
11
//Worker类
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//addWorke方法 
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//runThis方法
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

参考:https://www.nowcoder.com/discuss/152050?type=0&order=0&pos=6&page=0

JUC

阻塞队列

参考:https://blog.csdn.net/yudiandemingzi/article/details/82318390

内部基于Lock锁的通信实现等待阻塞。

并发容器

ConcurrentMap

CopyOnWriteList

原子变量

硬件级别实现,普通变量int进行自增时,在jvm表现看来,这不是一个原子操作,所以线程就有可能打断,从而造成数据安全问题。

而原子变量通过使用volatile来保证内存可见性以及通过cas算法保证原子性。

CountDownLatch 闭锁

也可以叫倒计数器

what:

1
2
3
4
5
6
//创建计数为10的闭锁
CountDownLatch countDownLatch = new CountDownLatch(10);
//计数减一
countDownLath.countDown();
//线程等待,直到计数为0
countDownLath.await();

在线程完成某项操作之前它允许让一个或多个线程进行等待。倒计数,在内部计数到零之前,所调用countDownLatch.await的线程会等待,直到计数到达0为止。

how:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package learn2;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatchTest test = new CountDownLatchTest();
test.runTest();
}

private void runTest() {
countDownLatch = new CountDownLatch(2);
Thread threadA = new Thread(runnable);
Thread threadB = new Thread(runnable);
threadA.start();
threadB.start();
try {
System.out.println("Main is waiting.");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Main is over");
}

CountDownLatch countDownLatch;

Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Current Thred is " + Thread.currentThread().getId() + ",print : " + i);
}
countDownLatch.countDown();
}
};
}

//结果
Main is waiting.
Current Thred is 12,print : 0
Current Thred is 13,print : 0
Current Thred is 12,print : 1
Current Thred is 13,print : 1
Current Thred is 12,print : 2
Current Thred is 12,print : 3
Current Thred is 12,print : 4
Current Thred is 12,print : 5
Current Thred is 13,print : 2
Current Thred is 12,print : 6
Current Thred is 13,print : 3
Current Thred is 12,print : 7
Current Thred is 13,print : 4
Current Thred is 12,print : 8
Current Thred is 13,print : 5
Current Thred is 13,print : 6
Current Thred is 12,print : 9
Current Thred is 13,print : 7
Current Thred is 13,print : 8
Current Thred is 13,print : 9
Main is over
CyclicBarrier

What:栅栏,类似于闭锁,但是呢,闭锁是等待事件,即线程等待到countDown数量为0为止。而栅栏则是,线程到达栅栏开始等待,知道所有线程都达到栅栏了,就不再等待。而且闭锁使用一次,栅栏可以重置,继续使用。

书上是这么说的:你希望创建一组任务,使他们并行执行,然后下一个步骤之前等待,直到所有任务完成,它使得所有并行任务在栅栏处排队,因此可以一致的向前移动。

How:

1
2
3
4
5
6
7
//构造方法
CyclicBarrier(int parties);//parties 表示会有多少到达的栅栏的线程
CyclicBarrier(int parties, Runnable barrierAction);//barrierAction 所有线程到达栅栏,会触发

//关键方法
await();//某线程到达栅栏,并且等待,知道所有线程都到达栅栏
reset();//重置栅栏到初始状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package learn2;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrierTest test = new CyclicBarrierTest();
test.runTest();
}
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

private void runTest() {
for (int i = 0 ; i < 5; i++) {
Thread thread = new Thread(runnable, "thread " + i);
thread.start();
}
System.out.println("for is over");
}

private Runnable runnable = () -> {
System.out.println( Thread.currentThread().getName() + " waited");
try {
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " wake up");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
};
}

//outputs 大概就是这个效果

for is over
thread 2 waited
thread 1 waited
thread 3 waited
thread 0 waited
thread 4 waited
thread 4 wake up
thread 1 wake up
thread 0 wake up
thread 3 wake up
thread 2 wake up

Process finished with exit code 0
Callable

What:目前的Runnable执行方式是无法返回数据的。使用Callable创建线程以及配合FutureTask使用

FurtureTask.get时会所在线程锁等FurtureTask执行完毕

How:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.FutureTask;

/*
* 一、创建执行线程的方式三:实现 Callable 接口。 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。
*
* 二、执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。 FutureTask 是 Future 接口的实现类
*/
public class TestCallable {

public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();

//1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
FutureTask<Integer> result = new FutureTask<>(td);

new Thread(result).start();

//2.接收线程运算后的结果
try {
Integer sum = result.get(); //FutureTask 可用于 闭锁
System.out.println(sum);
System.out.println("------------------------------------");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

}

class ThreadDemo implements Callable<Integer>{

@Override
public Integer call() throws Exception {
int sum = 0;

for (int i = 0; i <= 100000; i++) {
sum += i;
}

return sum;
}

}
AQS

基于CAS实现的同步器,内部维持一个线程的队列,通过CAS标志锁的状态信息,控制队列的线程执行或者等待阻塞。

CAS

CompareAndSwap

比较内存值和内存预估值是否等同,是则修改内存值,否则重复尝试。

基于硬件指令支持的线程安全的操作,通过自旋锁去尝试修改变量,不成功则重复尝试,成功则解开自旋。这么一个过程抽象的看成是线程安全,基于CAS的状态修改,可以更加具体的控制实现线程并发问题。

ForkJoinPool

What:继承自AbstractExecutorService,是一种特殊的线程池,通过分治法来分拆合并任务,使用有限的线程进行处理任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package com.atguigu.juc;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

import org.junit.Test;

public class TestForkJoinPool {

public static void main(String[] args) {
Instant start = Instant.now();

ForkJoinPool pool = new ForkJoinPool();

ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 50000000000L);

Long sum = pool.invoke(task);

System.out.println(sum);

Instant end = Instant.now();

System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//166-1996-10590
}

@Test
public void test1(){
Instant start = Instant.now();

long sum = 0L;

for (long i = 0L; i <= 50000000000L; i++) {
sum += i;
}

System.out.println(sum);

Instant end = Instant.now();

System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//35-3142-15704
}

//java8 新特性
@Test
public void test2(){
Instant start = Instant.now();

Long sum = LongStream.rangeClosed(0L, 50000000000L)
.parallel()
.reduce(0L, Long::sum);

System.out.println(sum);

Instant end = Instant.now();

System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//1536-8118
}

}

class ForkJoinSumCalculate extends RecursiveTask<Long>{

/**
*
*/
private static final long serialVersionUID = -259195479995561737L;

private long start;
private long end;

private static final long THURSHOLD = 10000L; //临界值

public ForkJoinSumCalculate(long start, long end) {
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
long length = end - start;

if(length <= THURSHOLD){
long sum = 0L;

for (long i = start; i <= end; i++) {
sum += i;
}

return sum;
}else{
long middle = (start + end) / 2;

ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
left.fork(); //进行拆分,同时压入线程队列

ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end);
right.fork(); //

return left.join() + right.join();
}
}

}