多线程
多线程
- 1: 方法一: 继承Thread
- 2: 方法二实现Runnable接口
- 3: 方法三实现Callable接口
- 4: 方法四Executors框架
- 5: 线程池
- 6: 线程交替执行的实践
- 7: Callable捕获异常
1 - 方法一: 继承Thread
import java.io.IOException;
/**
* 方法一: 继承Thread
*/
public class M01ExtentThread {
public static void main(String[] args) throws IOException {
M01ExtentThread test = new M01ExtentThread();
MyThread thread1 = test.new MyThread();
MyThread thread2 = test.new MyThread();
thread1.setName("线程1");
thread2.setName("线程2");
thread1.start();
thread2.start();
System.out.println("主线程输出");
}
class MyThread extends Thread{
@Override
public void run() {
for(int i = 0; i < 3; i++) {
System.out.println(Thread.currentThread().getName()+"线程输出"+i);
}
}
}
}
2 - 方法二实现Runnable接口
/**
* 方法2实现Runnable接口
*/
public class M02ImplentRunnable {
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
for(int i = 0; i < 3; i++) {
System.out.println("runnablex线程输出"+i);
}
}
};
new Thread(runnable).start();
System.out.println("主线程输出");
}
}
3 - 方法三实现Callable接口
package cn.anzhongwei.lean.demo.thread;
import java.util.Random;
import java.util.concurrent.*;
/**
* 和Runnable接口不一样,Callable接口提供了一个call()方法作为线程执行体,call()方法比run()方法功能要强大。
*
* call()方法可以有返回值
*
* call()方法可以声明抛出异常
*
* Java5提供了Future接口来代表Callable接口里call()方法的返回值,并且为Future接口提供了一个实现类FutureTask,这个实现类既实现了Future接口,还实现了Runnable接口。
* 因此可以作为Thread类的target。在Future接口里定义了几个公共方法来控制它关联的Callable任务。
* public interface Future<V> {
* //视图取消该Future里面关联的Callable任务
* boolean cancel(boolean mayInterruptIfRunning);
* //如果在Callable任务正常完成前被取消,返回True
* boolean isCancelled();
* //若Callable任务完成,返回True
* boolean isDone();
* //返回Callable里call()方法的返回值,调用这个方法会导致程序阻塞,必须等到子线程结束后才会得到返回值
* V get() throws InterruptedException, ExecutionException;
* //返回Callable里call()方法的返回值,最多阻塞timeout时间,经过指定时间没有返回抛出TimeoutException
* V get(long timeout, TimeUnit unit)
* throws InterruptedException, ExecutionException, TimeoutException;
* }
*
* 介绍了相关的概念之后,创建并启动有返回值的线程的步骤如下:
*
* 1】创建Callable接口的实现类,并实现call()方法,然后创建该实现类的实例(从java8开始可以直接使用Lambda表达式创建Callable对象)。
*
* 2】使用FutureTask类来包装Callable对象,该FutureTask对象封装了Callable对象的call()方法的返回值
*
* 3】使用FutureTask对象作为Thread对象的target创建并启动线程(因为FutureTask实现了Runnable接口)
*
* 4】调用FutureTask对象的get()方法来获得子线程执行结束后的返回值
*/
public class M03ImplementCallable01 {
public static void main(String[] args) {
//1. 此处使用的是匿名类方式
// Callable<String> callable = new Callable<String>() {
// public String call() throws Exception {
// for(int i = 0; i < 3; i++) {
// System.out.println("callable线程输出"+i);
// }
// return "返回随机数"+(new Random().nextInt(100));
// }
// };
//2. 使用静态内部类
Callable<String> callable = new InnerStaticCallable();
//3. 使用外部实现类。略
FutureTask<String> future = new FutureTask<String>(callable);
Thread thread = new Thread(future);
thread.start();
try {
String resI = future.get();
System.out.println(resI);
} catch (InterruptedException ie) {
ie.printStackTrace();
} catch (ExecutionException ee) {
ee.printStackTrace();
}
//此种方法可以用, 但是不明确应用场景, 返回资源的值是在new FutureTask中传入的,
//runnable接口实现直接使用Thread的start方法运行就行,传到FutureTask中有啥用
Runnable runnable = new Runnable() {
@Override
public void run() {
for(int i = 0; i < 3; i++) {
System.out.println("Runnable线程输出"+i);
}
}
};
FutureTask<Integer> target = new FutureTask<>(runnable, 12);
Thread thread1 = new Thread(target);
thread1.start();
try {
Integer resI1 = target.get();
System.out.println(resI1);
} catch (InterruptedException ie) {
ie.printStackTrace();
} catch (ExecutionException ee) {
ee.printStackTrace();
}
System.out.println("主线程输出");
}
//也可以使用内部类方式
static class InnerStaticCallable implements Callable<String> {
@Override
public String call() {
for(int i = 0; i < 3; i++) {
System.out.println("callable线程输出"+i);
}
return "返回随机数"+(new Random().nextInt(100));
}
}
}
4 - 方法四Executors框架
import java.util.concurrent.*;
/**
* 1.5后引入的Executor框架的最大优点是把任务的提交和执行解耦。
* 要执行任务的人只需把Task描述清楚,然后提交即可。
* 这个Task是怎么被执行的,被谁执行的,什么时候执行的,提交的人就不用关心了。
* 具体点讲,提交一个Callable对象给ExecutorService(如最常用的线程池ThreadPoolExecutor),将得到一个Future对象,调用Future对象的get方法等待执行结果就好了。
* Executor框架的内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。
* 因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,
* 还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。
*
* 关于ExecutorService执行submit和execute的区别
* 1、接收的参数不一样
* 2、submit有返回值,而execute没有
* 3、submit方便Exception处理
* 如果你在你的task里会抛出checked或者unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那么就需要用到submit,通过捕获Future.get抛出的异常。
*/
public class M04ExccutorCallable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
Runnable runnable = new Runnable(){
@Override
public void run() {
for(int i = 0; i < 3; i++) {
System.out.println("Runnable接口"+i);
}
}
};
//执行Runnable接口,无返回值
executorService1.execute(runnable);
executorService1.shutdown();
//执行Callable接口,有返回值
Callable<String> stringCallable = new Callable(){
@Override
public String call() throws Exception {
for(int i = 0; i < 3; i++) {
System.out.println("Callable接口"+i);
}
return "Hello World";
}
};
Future<String> submit2 = executorService2.submit(stringCallable);
executorService2.shutdown();
//get操作会造成执行线程的阻塞
String submit2Res = submit2.get();
System.out.println(submit2Res);
System.out.println("main Thread output");
}
}
5 - 线程池
import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class TestThreadPoolExecutor {
/*
corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
值得注意的是,如果使用了无界的任务队列这个参数就没用了
执行:
1、2任务直接压入1、2线程直接创建并执行,
3任务放入缓冲队列,最大线程数未满,创建新线程执行3任务
*/
public static void main(String[] args) throws InterruptedException, IOException {
int corePoolSize = 2;
int maximumPoolSize = 10;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ThreadFactory threadFactory = new NameTreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory);
executor.prestartAllCoreThreads(); // 预启动所有核心线程
for (int i = 1; i <= 10; i++) {
// Thread.sleep(1);
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
System.out.println("当前队列数两"+executor.getQueue().size());
}
System.out.println(executor.getActiveCount());
Thread.sleep(10000);
System.out.println(executor.getActiveCount());
System.in.read(); //阻塞主线程
}
static class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
// try {
// Thread.sleep(200);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}
private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志记录等
System.err.println( r.toString() + " rejected");
// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
}
}
static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(this.toString() + " is running!");
Thread.sleep(1000); //让任务执行慢点
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getName() {
return name;
}
@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}
6 - 线程交替执行的实践
线程交替执行的实践 也就是 1a2b3c4d交替输出的问题
- 只能保证交替执行, 至于先输出1 还是先输出A 谁先抢到o锁谁就输出
public class Syncwaitnotify1 {
public static void main(String[] args) {
final Object o = new Object();
char[] aI = "123456789".toCharArray();
char[] aC = "ABCDEFGHI".toCharArray();
new Thread(()->{
//1. 先拿到o锁
synchronized (o) {
for(char c : aI) {
//2. 执行
System.out.print(c);
try {
o.notify();//唤醒线程队列中得下一个线程, 这里只有两个,就相当于唤醒另一个
o.wait();//让出锁,重新进入等待队列
}catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify();
}
}, "t1").start();
new Thread(()->{
//1. 和上面得Thread强锁
synchronized (o) {
for(char i : aC) {
System.out.print(i);
try {
o.notify();
o.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify();
}
}, "t2").start();
}
}
- 先等待一直等到CountDownLatch计数器为0, 所以这个程序一定先输出A
import java.util.concurrent.CountDownLatch;
public class Syncwaitnotify2 {
/*
CountDownLatch latch = new CountDownLatch(n);
latch.countDown();每执行一次,n-1
*/
private static CountDownLatch latch = new CountDownLatch(1);
public static void main(String[] args) {
final Object o = new Object();
char[] aI = "123456789".toCharArray();
char[] aC = "ABCDEFGHI".toCharArray();
new Thread(()->{
try{
latch.await();
}catch (InterruptedException e) {
e.printStackTrace();
}
//1. 先拿到o锁
synchronized (o) {
for(char c : aI) {
//2. 执行
System.out.print(c);
try {
o.notify();//唤醒线程队列中得下一个线程, 这里只有两个,就相当于唤醒另一个
o.wait();//让出锁,重新进入等待队列
}catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify();
}
}, "t1").start();
new Thread(()->{
//1. 和上面得Thread强锁
synchronized (o) {
for(char i : aC) {
System.out.print(i);
latch.countDown();
try {
o.notify();
o.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify();
}
}, "t2").start();
}
}
- 通过volatile 实现 volatile是原子操作并且线程可见,也可以保证顺序执行
/**
* 如果t1先拿到锁,此时t25 = false 在while中不断得会让出锁
* 等t2拿到锁后, 设置t25=true就可以正常交替执行了
*/
public class Syncwaitnotify3 {
private static volatile boolean t25 = false;
public static void main(String[] args) {
final Object o = new Object();
char[] aI = "123456789".toCharArray();
char[] aC = "ABCDEFGHI".toCharArray();
new Thread(()->{
//1. 如果t1先拿到锁
synchronized (o) {
//2. 此时t25=false
while (!t25) {
try {
//3. 让出线程, 进入等待队列
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for(char c : aI) {
System.out.print(c);
try {
o.notify();//唤醒线程队列中得下一个线程, 这里只有两个,就相当于唤醒另一个
o.wait();//让出锁,重新进入等待队列
}catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify();
}
}, "t1").start();
new Thread(()->{
//4. 此时t2后拿到锁
synchronized (o) {
for(char i : aC) {
System.out.print(i);
t25 = true;
try {
o.notify();
o.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify();
}
}, "t2").start();
}
}
- 指定线程锁,相互唤醒,第一次也使用latch.await(); 保证, 和2差不多
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockCondition {
public static void main(String[] args) {
char[] aI = "123456789".toCharArray();
char[] aC = "ABCDEFGHI".toCharArray();
Lock lock = new ReentrantLock();
Condition conditionT1 = lock.newCondition();
Condition conditionT2 = lock.newCondition();
CountDownLatch latch = new CountDownLatch(1);
new Thread(()->{
try {
//1. 启动线程直接进入等待
latch.await();
}catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
for(char c : aI) {
System.out.print(c);
conditionT2.signal();
conditionT1.await();
}
conditionT2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
new Thread(()->{
lock.lock();
try {
//2. 线程启动直接进入循环进行输出
for(char c : aC) {
System.out.print(c);
//3. 标记完成
latch.countDown();
//4. 唤醒conditionT1队列
conditionT1.signal();
conditionT2.await();
}
conditionT1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t2").start();
}
}
- LockSupport的实现
import java.util.concurrent.locks.LockSupport;
public class LockSupport1 {
static Thread t1=null, t2=null;
public static void main(String[] args) {
char[] aI = "123456789".toCharArray();
char[] aC = "ABCDEFGHI".toCharArray();
t1 = new Thread(() -> {
for(char c: aI) {
LockSupport.park();
System.out.print(c);
LockSupport.unpark(t2);
}
},"t1");
t2 = new Thread(() -> {
for(char c: aC) {
System.out.print(c);
LockSupport.unpark(t1);
LockSupport.park();
}
},"t2");
t1.start();
t2.start();
}
}
- TransferQueue的实现
import java.util.concurrent.LinkedTransferQueue;
public class TransferQueue1 {
public static void main(String[] args) {
char[] aI = "123456789".toCharArray();
char[] aC = "ABCDEFGHI".toCharArray();
java.util.concurrent.TransferQueue<Character> queue = new LinkedTransferQueue<>();
new Thread(() ->{
try{
for(char c: aI) {
System.out.print(queue.take());
queue.transfer(c);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() ->{
try{
for(char c: aC) {
queue.transfer(c);
System.out.print(queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
7 - Callable捕获异常
import java.util.concurrent.*;
/**
* Callable捕获异常
*/
public class CallableGetThrow {
public static void main(String[] args){
int timeout = 2;
ExecutorService executor = Executors.newSingleThreadExecutor();
Boolean result = false;
Future<Boolean> future = executor.submit(new TaskThread("发送请求"));//将任务提交给线程池
try {
System.out.println("try");
result = future.get(timeout, TimeUnit.SECONDS);//获取结果
future.cancel(true);
System.out.println("发送请求任务的返回结果:"+result); //2
} catch (InterruptedException e) {
System.out.println("线程中断出错。"+e);
future.cancel(true);// 中断执行此任务的线程
} catch (ExecutionException e) {
System.out.println("线程服务出错。");
future.cancel(true);
} catch (TimeoutException e) {// 超时异常
System.out.println("超时。");
future.cancel(true);
}finally{
System.out.println("线程服务关闭。");
executor.shutdown();
}
}
static class TaskThread implements Callable<Boolean> {
private String t;
public TaskThread(String temp){
this.t= temp;
}
/*
try
继续执行..........
发送请求任务的返回结果: true
线程服务关闭。
*/
// public Boolean call() throws InterruptedException {
// Thread.currentThread().sleep(1000);
// System.out.println("继续执行..........");
// return true;
// }
/*
try
超时。
线程服务关闭。
*/
// public Boolean call() throws InterruptedException {
// Thread.currentThread().sleep(3000);
// System.out.println("继续执行..........");
// return true;
// }
/*
* try
* start
* 线程服务出错。
* 线程服务关闭。
*/
public Boolean call() throws InterruptedException {
System.out.println("start");
throw new InterruptedException();
}
}
}