Java线程池应用实例

线程池的学习

  • 基本概念
    • 好处
    • 应用场景
    • ThreadPoolExecutor
    • 实例理解:
    • 执行流程
  • 自定义线程池
    • 4大核心参数
    • 测试demo
    • 输出结果:
    • 结论
  • ExecutorService
    • 常用方法
    • 思考
    • 获取ExecutorService
    • 代码示例
    • 总结
  • ScheduleExecutorService
    • 常用获取方式如下
    • ScheduledExecutorService常用方法如下:
    • 代码示例:
    • 总结:
  • Future
    • Future的常用方法如下:
    • 代码示例
  • 综合案例
    • 秒杀商品
      • 思路提示
      • 代码步骤:
      • 代码示例:
    • ATM机取款
      • 思路提示:
      • 代码示例
  • 线程池的五大使用步骤:

基本概念

线程池是多线程的一种处理方式,处理过程中将任务添加到队列中,线程创建完成后自动启动这些任务,任务就是实现了Runnable或Callable接口的实例对象

好处

可以根据系统需求和硬件环境灵活控制线程的数量,对线程进行统一管理

应用场景

1、网购秒杀
2、云盘上传和下载
3、12306网上购票

ThreadPoolExecutor

public ThreadPoolExecutor
(int corePoolSize,//核心线程数:当一个任务提交到线程池中,如果当前运行的线程数量小于核心线程数,会新开一个线程来执行任务
int maximumPoolSize,//最大线程数:当大于核心线程数时,可以设置一个最大线程数
long keepAliveTime,//最大空闲时间(存活时间):当一个任务没有运行时候,task可以存活的时间
TimeUnit unit,//时间单位:枚举类型,可设置天、小时、时分秒等
BlockingQueue<Runnable> workQueue,//任务队列(临时缓冲区):当任务达到核心线程数量时,再有task提交到线程池需要在任务队列先排队,当任务队列满了之后会根据最大线程数创建新线程
ThreadFactory threadFactory,//线程工厂:创建线程的工厂
RejectedExecutionHandler handler) {//饱和处理机制:当核心线程数、最大线程数贺任务队列都满了需要处理的事情						  

实例理解:

假设某银行营业部有两名正式工,一名临时工,一个空闲等待座位,当有a、b、c、d客户依次来办理业务,a、b由两名正式工接待工作,c客户由临时工接待,d客户在座位等待,如果营业部是线程池,核心线程数(两名正式工)就是2,最大线程数(外加临时工)为3,任务队列(座位缓冲区)为1,当此时来了客户e办理业务,银行只能按照饱和处理机制拒绝接待客户e,当营业部临时工空闲时间超过1个小时后,经理就会让临时工(线程销毁)下班,有一个allowCoreThreadTimeOut变量控制是否允许销毁核心线程,默认为false,即时正式工闲着也不得提前下班

执行流程

自定义线程池

4大核心参数

1、核心线程数
按照8020原则设计,例如:一个任务执行需要0.1秒,系统80%每秒产生100个任务,那么一秒钟处理完需要10个线程,核心线程数就是10
2、任务队列长度
核心线程数/单个任务执行时间*2即可,任务队列长度为200
3、最大线程数
最大线程数还需要参照每秒产生的最大任务数,例如:系统每秒产生的最大任务数为1000,最大线程数=(最大任务数-任务队列长度)*单个任务执行时间
即最大线程数=(1000-200)*0.1=80个
4、最大空闲时间
根据系统参数设定,没有固定的参考值

测试demo

1:编写任务类(MyTask),实现Runnable接口;
2:编写线程类(MyWorker),用于执行任务需要持有所有任务
3:编写线程池类(MyThreadPool),包含提交任务执行任务的能力:
4:编写测试类(MyTest),创建线程池对象提交多个任务测试
MyTask

/**
 * 包含任务编号,每个任务执行时间为0.2秒
 */
public class MyTask implements Runnable {

    private int id;

    /**
     * 由于run方法是重写接口中的方法,id属性初始化必须使用构造方法完成
     */
    public MyTask(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
//        System.out.println("线程:" + name + "线程即将执行任务" + id);
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程:" + name + "线程完成了任务" + id);
    }

    @Override
    public String toString() {
        return "MyTask{" +
                "id=" + id +
                '}';
    }
}

MyWork

import java.util.List;

/**
 * 设计一个集合,用于保存所有任务
 */
public class MyWork extends Thread {

    //保存线程的名字
    private String name;
    private List<Runnable> tasks;

    public MyWork(String name, List<Runnable> tasks) {
        super(name);
        this.tasks = tasks;
    }

    @Override
    public void run() {
        //判断集合中是否有任务,只要有就一直执行任务
        while (tasks.size() > 0) {
            Runnable runnable = tasks.remove(0);
            runnable.run();
        }
    }
}

MyThreadPool

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

/**
 * 这是自定义的线程池类;
 * 成员变量:
 * 1:任务队列集合需要控制线程安全问题
 * 2:当前线程数量
 * 3:核心线程数量
 * 4:最大线程数量
 * 5:任务队列的长度
 * 成员方法
 * 1: 提交任务;
 * 将任务添加到集合中,需要判断是否超出了任务总长度2: 执行任务;
 * 判断当前线程的数量,决定创建核心线程还是非核心线程
 */
public class MyThreadPool {

    /**
     * 1:任务队列集合需要控制线程安全问题
     * 2:当前线程数量
     * 3:核心线程数量
     * 4:最大线程数量
     * 5:任务队列的长度
     */
    private List<Runnable> tasks = Collections.synchronizedList(new LinkedList<>());
    private int num;
    private int corePoolSize;
    private int maxSize;
    private int workSize;

    public MyThreadPool(int corePoolSize, int maxSize, int workSize) {
        this.corePoolSize = corePoolSize;
        this.maxSize = maxSize;
        this.workSize = workSize;
    }

    public void submit(Runnable runnable) {
        //判断当前集合中任务数量是否超出了最大任务数量
        if (tasks.size() >= workSize) {
            System.out.println("任务:" + runnable + "被丢弃了...");
        } else {
            tasks.add(runnable);
            //执行任务
            execTask(runnable);
        }

    }

    private void execTask(Runnable runnable) {
        //判断当前线程池中的线程总数量,是否超出了核心数
        if (num < corePoolSize) {
            new MyWork("核心线程:" + num, tasks).start();
            num++;
        } else if (num < maxSize) {
            new MyWork("非核心线程:" + num, tasks).start();
            num++;
        } else {
            System.out.println("任务:" + runnable + "被缓存了...");
        }
    }

}

JavaTest

/**
 * 1、创建线程池对象
 * 2、提交多个任务
 */
public class JavaTest {
    public static void main(String[] args) {
        //1、创建线程池对象
        MyThreadPool pool = new MyThreadPool(2, 4, 20);
        //2.提交多个任务:当线程超过24时会出现线程被丢弃现象
        for (int i = 0; i < 3; i++) {
            //3.创建任务对象,并提交给线程池
            MyTask myTask = new MyTask(i);
            pool.submit(myTask);
        }
    }
}

输出结果:

当for循环中i为2,只执行两个核心线程
当for循环中i为4,执行两个核心线程,两个非核心线程
当for循环中i为5,执行两个核心线程,两个非核心线程,一个线程被缓存
当for循环中i为25,执行两个核心线程,两个非核心线程,20个线程被缓存,1个缓存被丢弃

结论

2个核心线程,2个非核心线程,20个任务队列,之后都会被饱和处理机制给丢弃

ExecutorService

ExecutorService接口是java内置的线程池接口,通过学习接口中的方法,可以快速掌握java内置线程池的基本使用

常用方法

<T> Future<T> submit(Callable<T> task):执行带返回值的任务,返回一个Future对象
Future<?> submit(Runnable task):执行 Runnable 任务,并返回一个表示该任务的 Future.
<T> Future<T> submit(Runnable task,T result):执行 Runnable 任务,并返回一个表示该任务的 Future.
void shutdown:启动一次顺序关闭,执行以前提交的任务,但不接受新任务.
List<Runnable> shutdownNow:停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表

思考

既然ExecutorService是一个接口,接口是无法直接创建对象的,那么我们该如何获取ExecutorService的对象呢?

获取ExecutorService

可以利用JDK中的Executors 类中的静态方法常用获取方式如下:

static ExecutorService newCachedThreadPool():创建一个默认的线程池对象,里面的线程数不固定,且在第一次使用时才创建,适合硬件条件好的情况
static ExecutorService newFixedThreadPool(int nThreads):创建一个有固定线程数的线程池
static ExecutorService newSingleThreadExecutor():创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程

static ExecutorService newCachedThreadPool(ThreadFactory threadFactory):线程池中的所有线程都使用ThreadFrctory来创建这样的线程无需手动启动,自动执行
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory):创建一个有固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)创建一个使用单个 worker 线程的 Executor,且线程池中的所有线程都使用ThreadFactory来创建

代码示例

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
 * 练习Executors获取ExecutorService,然后调用方法,提交任务
 */
public class MyTest01 {
    public static void main(String[] args) {
        //1、使用工厂类获取线程池对象
        ExecutorService service = Executors.newCachedThreadPool();
        //指定3个线程执行
//        ExecutorService service = Executors.newFixedThreadPool(3);
        //单一线程执行
//        ExecutorService service = Executors.newSingleThreadExecutor();
        //2、提交任务
//        test01(service);
        test02();
        //关闭线程池:在单一线程才能看到效果
//        service.shutdown();
        //一旦关闭无法再提交
//        service.submit(new MyRunnable01(888));
    }

    private static void test02() {
        //单一线程执行
//        ExecutorService service = Executors.newSingleThreadExecutor(new ThreadFactory()
        //指定3个线程执行
//        ExecutorService service = Executors.newFixedThreadPool(3,new ThreadFactory()
        ExecutorService service = Executors.newCachedThreadPool(new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "自定义的线程名称" + n++);
            }
        });
        for (int i = 0; i < 10; i++) {
            //如果线程在忙会重新创建新的线程,当线程空闲到一定时间会自动销毁,默认时间为60s
            service.submit(new MyRunnable01(i));
        }
        //立刻关闭线程池,线程池中有缓存未执行则取消执行,并返回这些任务
//        List<Runnable> runnableList = service.shutdownNow();
//        System.out.println(runnableList);
    }

    private static void test01(ExecutorService service) {
        for (int i = 0; i < 10; i++) {
            //如果线程在忙会重新创建新的线程,当线程空闲到一定时间会自动销毁,默认时间为60s
            service.submit(new MyRunnable01(i));
        }
    }
}

/**
 * 任务类:在任务中打印出哪一个线程正在执行任务
 */
class MyRunnable01 implements Runnable {

    private int id;

    public MyRunnable01(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "执行了任务..." + id);
    }

    @Override
    public String toString() {
        return "MyRunnable01{" +
                "id=" + id +
                '}';
    }
}

单一线程shutdownNow执行结果:

自定义的线程名称1执行了任务...0
[java.util.concurrent.FutureTask@548c4f57, 
java.util.concurrent.FutureTask@1218025c, 
java.util.concurrent.FutureTask@816f27d, 
java.util.concurrent.FutureTask@87aac27, 
java.util.concurrent.FutureTask@3e3abc88, 
java.util.concurrent.FutureTask@6ce253f1, 
java.util.concurrent.FutureTask@53d8d10a, 
java.util.concurrent.FutureTask@e9e54c2, 
java.util.concurrent.FutureTask@65ab7765]

总结

1、前三个创建service实例,然后调用submit提交线程执行
2、后三种在类中传入匿名内部类,重写newThread方法,在方法体里实现具体业务

ScheduleExecutorService

当我们的线程池提交后需要延迟一定时间来执行任务,ExecutorService就不适用了,Java给了内置线程池ScheduleExecutorService
ScheduledExecutorService是ExecutorService的子接口,具备了延迟运行或定期执行任务的能力

常用获取方式如下

static ScheduledExecutorService newSingleThreadScheduledExecutor:(创建一个单线程执行程序,它允许在给定延迟后运行命令或者定期地执行
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):创建一个可重用固定线程数的线程池且允许延迟运行或定期执行任务
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory):创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory):创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建,且允许延迟运行或定期执行任务

ScheduledExecutorService常用方法如下:

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):时间单位是unit,数量是delay:延迟后执行callable
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):时间单位是unit;数量是delay:延迟后执行command
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):时间单位是unit,数量是initialDelay,每间隔period时间重复执行一次command
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):间隔delay时间后首次执行,然后再间隔delay时间再执行

代码示例:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * 测试ScheduleExecutorService接口中延迟执行任务和重复执行任务的功能
 */
public class ScheduleExecutorService01 {
    public static void main(String[] args) {
        //1、获取一个具备延迟执行任务的线程池对象
        ScheduledExecutorService scheduledThreadPool1 = Executors.newScheduledThreadPool(3);
        //2、创建多个任务对象,提交任务,每个任务延迟2秒执行
        scheduledThreadPool1.schedule(new MyRunnable(1), 2, TimeUnit.SECONDS);
        System.out.println("over");
        ScheduledExecutorService scheduledThreadPool2 = Executors.newScheduledThreadPool(3, new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "自定义线程名:" + n++);
            }
        });
        //2、创建多个任务对象,提交任务,延迟1秒执行第一个任务,之后延迟2秒执行
        scheduledThreadPool2.scheduleAtFixedRate(new MyRunnable(1), 1, 2, TimeUnit.SECONDS);
        System.out.println("over");
        ScheduledExecutorService scheduledThreadPool3 = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "自定义线程名:" + n++);
            }
        });
        //2、创建多个任务对象,提交任务,延迟1秒执行第一个任务,之后延迟2秒执行
        scheduledThreadPool3.scheduleWithFixedDelay(new MyRunnable(1), 1, 2, TimeUnit.SECONDS);
        System.out.println("over");
    }
}

class MyRunnable implements Runnable {

    private int id;

    public MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "执行了任务:" + id);
    }
}

总结:

schedule:延迟多久执行一次
scheduleAtFixedRate:间隔多久重复去执行
scheduleWithFixedDelay:第一个任务执行完成后,间隔多久去执行下一个任务

Future

刚刚在学习java内置线程池使用时,没有考虑线程计算的结果,但开发中,我们有时需要利用线程进行一些计算,然后获取这些计算的结果,而java中的Future接口专门用于描述异步计算结果,我们可以通过Future 对象获取线程计算的结果:

Future的常用方法如下:

boolean cancel(boolean maylnterruptlfRunning)
取消任务
V get()
获取计算完成后的结果
V get(long timeout, TimeUnit unit)
等待给定的时间后获取其结果
boolean isCancelled()
在任务正常完成前取消,则返回 true。
boolean isDone()
任务已完成,则返回 true。

代码示例

import java.util.concurrent.*;

/**
 * 练习异步计算结果
 */
public class FutureTest {
    public static void main(String[] args) throws Exception {
        //1、获取线程池对象
        ExecutorService executorService = Executors.newCachedThreadPool();
        //2、创建Callable类型的任务对象
        Future<Integer> future = executorService.submit(new MyCall(1, 2));
        //3、判断任务是否完成
//        test01(future);
//        future.cancel(true);
//        boolean cancelled = future.isCancelled();
//        System.out.println("任务执行的结果是:" + cancelled);
        //由于等待时间过短,任务来不及执行完成会报异常
        Integer integer = future.get(1, TimeUnit.SECONDS);
        System.out.println("任务执行的结果是:" + integer);
    }

    private static void test01(Future<Integer> future) throws InterruptedException, ExecutionException {
        boolean done = future.isDone();
        System.out.println("第一次判断任务是否完成:" + done);
        boolean cancelled = future.isCancelled();
        System.out.println("第一次判断任务是否取消:" + cancelled);
        Integer result = future.get();//一直等待任务的执行,直到执行完成为止
        System.out.println("任务执行的结果是:" + result);
        boolean done2 = future.isDone();
        System.out.println("第二次判断任务是否完成:" + done2);
        boolean cancelled2 = future.isCancelled();
        System.out.println("第二次判断任务是否取消:" + cancelled2);
    }
}

class MyCall implements Callable<Integer> {

    private int a;
    private int b;

    public MyCall(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer call() throws Exception {
        String name = Thread.currentThread().getName();
        System.out.println(name + "准备开始计算...");
        Thread.sleep(2000);
        System.out.println(name + "计算完成...");
        return a + b;
    }
}

综合案例

秒杀商品

案例介绍:假如某网上商城推出活动,新上架10部新手机免费送客户体验,要求所有参与活动的人员在规定的时间同时参与秒杀争抢,假如有20人同时参与了该活动,请使用线程池模拟这个场景,保证前10人秒杀成功,后10人秒杀失败,要求:

1、使用线程池创建线程
2、解决线程安全问题

思路提示

1:既然商品总数量是10个,那么我们可以在创建线程池的时候初始化线程数是10个及以下,设计线程池最大数量为10个
2:当某个线程执行完任务之后,可以让其他秒杀的人继续使用该线程参与秒杀
3:使用synchronized控制线程安全防止出现错误数据

代码步骤:

1:编写任务类,主要是送出手机给秒杀成功的客户
2:编写主程序类创建20个任务(模拟20个客户);
3:创建线程池对象并接收20个任务开始执行任务

代码示例:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 测试任务类
 */
public class MyTest {
    public static void main(String[] args) {
        //1、创建一个线程池对象
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(15));
        //2、循环创建任务对象
        for (int i = 0; i <= 10; i++) {
            MyTask myTask = new MyTask("客户" + i);
            executor.submit(myTask);
        }
        //3、关闭线程池
        executor.shutdown();
    }

}
class MyTask implements Runnable {

    //设计一个变量,用于表示商品的数量
    private static int id = 10;
    //表示商品名称的变量
    private String userName;

    public MyTask(String userName) {
        this.userName = userName;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(userName + "正在使用" + name + "参与秒杀任务...");
        synchronized (MyTask.class) {
            if (id > 0) {
                System.out.println(userName + "使用" + name + "秒杀:" + id + "号商品成功啦!");
            } else {
                System.out.println(userName + "使用" + name + "秒杀失败啦!");
            }
        }
    }
}

ATM机取款

设计一个程序,使用两个线程模拟在两个地点同时从一个账号中取钱,假如卡中一共有1000元,每个线程取800元要求演示结果一个线程取款成功,剩余200元,另一个线程取款失败,余额不足要求:

1:使用线程池创建线程
2:解决线程安全问题

思路提示:

1:线程池可以利用Executors工厂类的静态方法,创建线程池对象;
2:解决线程安全问题可以使用synchronized方法控制取钱的操作;
3:在取款前,先判断余额是否足够,且保证余额和取钱行为的原子性;

代码示例

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class MyTest {
    public static void main(String[] args) {
        //1、创建线程池对象
        ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
            int id = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "ATM" + id--);
            }
        });
        //2、创建两个任务并提交
        for (int i = 0; i < 2; i++) {
            pool.submit(new MyTask("客户" + i, 800));
        }
        //3、关闭线程池
        pool.shutdown();
    }
}


class MyTask implements Runnable {

    //用户姓名
    private String userName;
    //取款金额
    private double money;
    //总金额
    private static double total = 1000;

    public MyTask(String userName, double money) {
        this.userName = userName;
        this.money = money;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(userName + "正在使用" + name + "取款" + money + "元");
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (MyTask.class) {
            if (total - money > 0) {
                System.out.println(userName + "正在使用" + name + "取款" + money + "元成功,余额" + (total - money));
                total -= money;
            } else {
                System.out.println(userName + "正在使用" + name + "取款" + money + "元失败,余额" + total);
            }
        }
    }
}

线程池的五大使用步骤:

1:利用Executors工厂类的静态方法创建线程池对象
2:编写Runnable或Callable实现类的实例对象
3:利用ExecutorService的submit方法或ScheduledExecutorService的schedule方法提交并执行线程任务
4:如果有执行结果则处理异步执行结果(Future)
5:调用shutdown0方法关闭线程池

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
乘风的头像乘风管理团队
上一篇 2023年12月15日
下一篇 2023年12月15日

相关推荐