如何看待生产者与消费者?

基于生产者和消费者实现整数平方功能

Posted by MasterJen on November 18, 2018

Hey ProducerAndCustomer

The best preparation for tomorrow is doing your best today.–对明天做好的准备就是今天做到最好!

在生产者与消费者模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程,生产者线程负责提交用户请求,消费者线程负责具体的处理生产者计较的任务.生产者和消费者之间通过共享内存缓冲区进行通信.

鉴于此,实现一个基于生产者-消费者模式的求整数平方的并行程序.

生产者线程实现如下: 它构建 PCData 数据对象 并且进行放入到 BlockingQueue内存缓冲区中

 //用于 生产数据 然后放到 缓存中
 public class Producter implements Runnable {
     private volatile boolean isRunning = true;
 //    内存缓冲区  用于 共享数据
     private BlockingQueue<PCData> queue;
 //  用于整数的 自增取值  保证了线程安全性
     private static AtomicInteger count = new AtomicInteger();
 //    睡眠时间
     private static final int SlEEPTIME = 100;
 
     public Producter(BlockingQueue<PCData> queue) {
         this.queue = queue;
     }   
     @Override
     public void run() {
 //        创建 数据载体
         PCData data = null;
         Random r = new Random();
         System.out.println("start product id =" + Thread.currentThread().getId());
         try {
             while (isRunning){
                 Thread.sleep(r.nextInt(SlEEPTIME));
 //                得到 数据
                 data = new PCData(count.incrementAndGet());
                 System.out.println(data + " is put into queue");
 //               这里用offer方法往阻塞队列里面添加对象,此方法表示若队列满了,则等待2毫秒,2毫秒后若队列还是满的,则丢弃数据.
                 if(!queue.offer(data,2, TimeUnit.SECONDS)){
                     System.out.println("field to put data :" + data);
                 }
 
             }
         } catch (Exception e) {
             e.printStackTrace();
             Thread.currentThread().interrupt();
         }
     }
     public void stop(){
         isRunning = false;
     }
 }

里面的 offer()方法源码如下 :

  public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 如果  对象为空直接 报错
        if (e == null) throw new NullPointerException();
        // 等待的时间长度
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //上锁  
        putLock.lockInterruptibly();
        try {
        // 判断 当前值是否 等于 最大数值 (在本例中 最大是10)
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    } 对应的 消费者如下:

public class Customer implements Runnable {
    //    创建 内存缓冲区
    private BlockingQueue<PCData> queue;
    private static final int SLEEPTIME = 100;
    public Customer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
//        得到 当前线程id
        System.out.println("start customer id = " + Thread.currentThread().getId());
        Random random = new Random();
        try {
            while (true) {
//                得到  缓冲区里面的对象
                PCData data = queue.take();
                if (data != null) {
//                  得到 值 data
                    int re = data.getIntData() * data.getIntData();
//                    进行整数平方运算
                    System.out.println(MessageFormat.format("{0}*{1}={2}", data.getIntData(), data.getIntData(), re));
                    Thread.sleep(random.nextInt(SLEEPTIME));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

PCData 作为生产者与消费者之间的共享数据模型 ,爱码如下:

public class PCData {
    private final int intData;

    public PCData(int intData) {
        this.intData = intData;
    }

    public PCData(String data) {
        intData = Integer.valueOf(data);
    }

    public int getIntData() {
        return intData;
    }

    @Override
    public String toString() {
        return intData+":  data";
    }
}

测试类如下 :

public class TestProAndCus {
      public static void main(String[] args) {
//          创建 内存缓冲区  最大容量值 是 10
          BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
//          创建 三个生产者
          Producter producter1 = new Producter(queue);
          Producter producter2 = new Producter(queue);
          Producter producter3 = new Producter(queue);
//        创建 三个消费者
          Customer customer1 = new Customer(queue);
          Customer customer2 = new Customer(queue);
          Customer customer3 = new Customer(queue);
//         创建线程池
          ExecutorService executorService = Executors.newCachedThreadPool();
//        执行 生产者
          executorService.execute(producter1);
          executorService.execute(producter2);
          executorService.execute(producter3);
//          执行消费者
          executorService.execute(customer1);
          executorService.execute(customer2);
          executorService.execute(customer3);
          try {
              Thread.sleep(1000);
              producter1.stop();
              producter2.stop();
              producter3.stop();
              Thread.sleep(3000);
              executorService.shutdown();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
}

结果 如下:

start product id =11
start customer id = 14
start product id =13
start customer id = 15
start product id =12
start customer id = 16
1:  data is put into queue
2:  data is put into queue
2*2=4
1*1=1
3:  data is put into queue
3*3=9
4:  data is put into queue
4*4=16
5:  data is put into queue
5*5=25
6:  data is put into queue
6*6=36
7:  data is put into queue
7*7=49
8:  data is put into queue
9:  data is put into queue
10:  data is put into queue
11:  data is put into queue
12:  data is put into queue
8*8=64
9*9=81
10*10=100
.....

如下便完成了 生产者与消费者并行的问题.