Loading...
墨滴

Bayanbulake

2021/12/25  阅读:51  主题:自定义主题1

Java多线程通信方式[一]

线程通信

多个线程在争夺同一个资源时,为了让这些线程协同工作、提高CPU利用率,可以让线程之间进行通信,具体可以通过wait()notify()notifyAll()实现:

  • wait():使当前线程处于等待状态(阻塞),直到其他线程调用此对象的notify()方法或者notifyAll()

  • notify():唤醒在此对象监视器上等待的单个线程;如果有多个线程同时在监视器上等待,则随机唤醒一个。

  • notifyAll():唤醒在对象监视器上等待的所有线程。

简言之,wait()、会使线程阻塞,notify()或者notifyAll()可以唤醒线程,使之成为就绪状态。

在实际使用这些方法时,还要注意以下几点:

  • 以上三个方法都是Objectnative方法,不是Thread类的。这是因为Java提供的锁机制是对象级的,而不是线程级的。

  • 这三个方法都必须在synchronized修饰的方法(或者代码块)中使用,否则会抛出异常java.lang.IllegaMonitorStateException

  • 在使用wait()时,为了避免并发带来的问题,通常建议将wait()方法写在循环内部,JDK在定义此方法时,也对此增加了注释说明。

下面通过一个生产者和消费者的案例,强化对线程通信的理解。该案例的逻辑如下:

  1. 生产者CarProducter不断地向共享缓冲区增加数据(我们使用cars++模拟)。
  2. 同时消费者不断地从共享缓冲区消费数据(cars--)。
  3. 共享缓冲区有固定的大小的容量(20)。
  4. 当产量达到20时,生产者将会不在生产,生产者的线程就会通过wait()使自己处于阻塞状态;直到消费者消费了一些产量(<20),再通过notify()notifyAll()唤醒生产者去继续生产。
  5. 当产量为0时,消费者会调用wait()使自己处于阻塞状态,直到生产者增加了产量后(>0),再通过notify()notifyAll()唤醒消费者去继续消费。

这样的规则,就会使得产量在0~20之间一直处于波动的状态。

package com.geovis.bin.custom.study.wangbin.lock.contion;

/**
 * @Author: Wangb
 * @EMail: 1149984363@qq.com
 * @Date: 24/12/2021 上午11:50
 * @Description
 */

//car的库存
class CarStock {
    //最多能存放20辆车
    int cars;

    //通知生产者去生产车
    public synchronized void productCar() {
        try {
            if (cars <= 20) {
                System.out.println("生产车...."+ cars);
                Thread.sleep(100);
                //通知正在监听CarStock并且处于阻塞状态的线程(即处于wait()状态的消费者)
                notifyAll();
                cars++;

            } else {//超过了最大库存20
                //使自己(当前的生产者线程)处于阻塞状态,等待消费者消执行car--(即等待消费者调用notifyAll()方法)
                wait();

            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //通知消费者去消费车
    public synchronized void consumeCar() {
        try {
            if (cars > 0) {
                System.out.println("销售车...."+ cars);
                Thread.sleep(100);
                notifyAll();
                cars--;
                //通知正在监听CarStock并且处于阻塞状态的线程(即处于wait()状态的生产者)
            } else {
                //使自己(当前的消费者线程)处于阻塞状态,等待消费者消执行car++(即等待生产者调用notifyAll()方法)
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//生产者
class CarProducter implements Runnable {
    CarStock carStock;
    public CarProducter(CarStock clerk) {
        this.carStock = clerk;
    }

    @Override
    public void run() {
        while (true) {
            carStock.productCar(); //生产车
        }
    }
}

//消费者
class CarConsumer implements Runnable {
    CarStock carStock;

    public CarConsumer(CarStock carStock) {
        this.carStock = carStock;
    }

    @Override
    public void run() {
        while (true) {
            carStock.consumeCar();//消费车
        }
    }
}

//测试方法
public class ProducerAndConsumer {
    public static void main(String[] args) {
        CarStock carStock = new CarStock();
        //注意:生产者线程和消费者线程,使用的是同一个carStock对象
        CarProducter product = new CarProducter(carStock);
        CarConsumer resumer = new CarConsumer(carStock);
        //2个生产者,2个消费者
        Thread tProduct1 = new Thread(product);
        Thread tProduct2 = new Thread(product);
        Thread tResumer1 = new Thread(resumer);
        Thread tResumer2 = new Thread(resumer);
        tProduct1.start();
        tProduct2.start();
        tResumer1.start();
        tResumer2.start();
    }
}

程序运行起来,如下图所示:

上面的代码是一个非常简单的生产者和消费者共享变量的的程序。

接下来,我们使用队列和线程池技术对该程序进行改进,并且此次共享数据是一个BlockingQueue队列,该队列可以保存100个Car Data对象。

汽车实体类

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

public class CarData {
    private int id;

    //其他字段
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }
}

汽车库存类CarStock,包含共享缓冲区BlockingQueue对象

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class CarStock {
    //统计一共生产了多少辆车
    private static int count = 0;
    //存放CarData对象的共享缓冲区
    private BlockingQueue<CarData> queue;

    public CarStock(BlockingQueue<CarData> queue) {
        this.queue = queue;
    }

    //生产车
    public synchronized void productCar() {
        try {
            CarData carData = new CarData();
            //向CarData队列增加一个CarData对象
            boolean success = this.queue.offer(carData, 2, TimeUnit.SECONDS);
            if (success) {
                int id = ++count;
                carData.setId(id);
                System.out.println("生产CarData,编号:" + id + ",库存:" + queue.size());
                Thread.sleep((int) (1000 * Math.random()));
                notifyAll();
            } else {
                System.out.println("生产CarData失败....");
            }
            if (queue.size() < 100) {
            } else {

                System.out.println("库存已满,等待消费...");
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //消费车
    public synchronized void resumeCar() {
        try {
            // 从CarData队列中,拿走一个CarData对象
            CarData carData = this.queue.poll(2, TimeUnit.SECONDS);
            if (carData != null) {
                Thread.sleep((int) (1000 * Math.random()));
                notifyAll();
                System.out.println("消费CarData,编号:" + carData.getId() + ",库存: " + queue.size());
            } else {
                System.out.println("消费CarData失败....");
            }
            if (queue.size() > 0) {

            } else {
                System.out.println("库存为空,等待生产...");
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

生产者类

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

public class CarProducter implements Runnable {
    //共享缓存区
    private CarStock carPool;
    //多线程的执行状态,用于控制线程的启停
    private volatile boolean isRunning = true;

    public CarProducter(CarStock carPool) {
        this.carPool = carPool;
    }

    @Override
    public void run() {
        while (isRunning) {
            carPool.productCar();
        }
    }

    //停止当前线程
    public void stop() {
        this.isRunning = false;
    }
}

消费者类

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

//消费者
public class CarConsumer implements Runnable {
    //共享缓存区:CarData队列
    private CarStock carPool;

    public CarConsumer(CarStock carPool) {
        this.carPool = carPool;
    }

    @Override
    public void run() {
        while (true) {
            carPool.resumeCar();
        }
    }
}

测试类

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class TestProducerAndConsumer {

    public static void main(String[] args) throws Exception {
        //共享缓存区:CarData队列
        BlockingQueue<CarData> queue = new LinkedBlockingQueue<CarData>(100);
        //CarData库存,包含了queue队列
        CarStock carStock = new CarStock(queue);
        //生产者
        CarProducter carProducter1 = new CarProducter(carStock);
        CarProducter carProducter2 = new CarProducter(carStock);
        CarProducter carProducter3 = new CarProducter(carStock);
        //消费者
        CarConsumer carConsumer1 = new CarConsumer(carStock);
        CarConsumer carConsumer2 = new CarConsumer(carStock);
        CarConsumer carConsumer3 = new CarConsumer(carStock);
        //将生产者和消费者加入线程池运行
        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(carProducter1);
        cachePool.execute(carProducter2);
        cachePool.execute(carProducter3);
        cachePool.execute(carConsumer1);
        cachePool.execute(carConsumer2);
        cachePool.execute(carConsumer3);
//  carProducter1.stop();停止p1生产
//  cachePool.shutdown();//关闭线程池 
    }
}

测试类代码的执行结果,如下图所示:

以上就是使用JDKObject类中的方法实现的线程通信,为了避免文章过于冗长,后面的文章我们还会介绍其他方式的线程通信。

Bayanbulake

2021/12/25  阅读:51  主题:自定义主题1

作者介绍

Bayanbulake