Hey ProducerAndCustomer
The best preparation for tomorrow is doing your best today.–对明天做好的准备就是今天做到最好!
在生产者与消费者模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程,生产者线程负责提交用户请求,消费者线程负责具体的处理生产者计较的任务.生产者和消费者之间通过共享内存缓冲区进行通信.
鉴于此,实现一个基于生产者-消费者模式的求整数平方的并行程序.
生产者线程实现如下: 它构建 PCData 数据对象 并且进行放入到 BlockingQueue内存缓冲区中
//用于 生产数据 然后放到 缓存中
public class Producter implements Runnable {
private volatile boolean isRunning = true;
// 内存缓冲区 用于 共享数据
private BlockingQueue<PCData> queue;
// 用于整数的 自增取值 保证了线程安全性
private static AtomicInteger count = new AtomicInteger();
// 睡眠时间
private static final int SlEEPTIME = 100;
public Producter(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
// 创建 数据载体
PCData data = null;
Random r = new Random();
System.out.println("start product id =" + Thread.currentThread().getId());
try {
while (isRunning){
Thread.sleep(r.nextInt(SlEEPTIME));
// 得到 数据
data = new PCData(count.incrementAndGet());
System.out.println(data + " is put into queue");
// 这里用offer方法往阻塞队列里面添加对象,此方法表示若队列满了,则等待2毫秒,2毫秒后若队列还是满的,则丢弃数据.
if(!queue.offer(data,2, TimeUnit.SECONDS)){
System.out.println("field to put data :" + data);
}
}
} catch (Exception e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop(){
isRunning = false;
}
}
里面的 offer()方法源码如下 :
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 如果 对象为空直接 报错
if (e == null) throw new NullPointerException();
// 等待的时间长度
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//上锁
putLock.lockInterruptibly();
try {
// 判断 当前值是否 等于 最大数值 (在本例中 最大是10)
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
} 对应的 消费者如下:
public class Customer implements Runnable {
// 创建 内存缓冲区
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 100;
public Customer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
// 得到 当前线程id
System.out.println("start customer id = " + Thread.currentThread().getId());
Random random = new Random();
try {
while (true) {
// 得到 缓冲区里面的对象
PCData data = queue.take();
if (data != null) {
// 得到 值 data
int re = data.getIntData() * data.getIntData();
// 进行整数平方运算
System.out.println(MessageFormat.format("{0}*{1}={2}", data.getIntData(), data.getIntData(), re));
Thread.sleep(random.nextInt(SLEEPTIME));
}
}
} catch (Exception e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
PCData 作为生产者与消费者之间的共享数据模型 ,爱码如下:
public class PCData {
private final int intData;
public PCData(int intData) {
this.intData = intData;
}
public PCData(String data) {
intData = Integer.valueOf(data);
}
public int getIntData() {
return intData;
}
@Override
public String toString() {
return intData+": data";
}
}
测试类如下 :
public class TestProAndCus {
public static void main(String[] args) {
// 创建 内存缓冲区 最大容量值 是 10
BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
// 创建 三个生产者
Producter producter1 = new Producter(queue);
Producter producter2 = new Producter(queue);
Producter producter3 = new Producter(queue);
// 创建 三个消费者
Customer customer1 = new Customer(queue);
Customer customer2 = new Customer(queue);
Customer customer3 = new Customer(queue);
// 创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 执行 生产者
executorService.execute(producter1);
executorService.execute(producter2);
executorService.execute(producter3);
// 执行消费者
executorService.execute(customer1);
executorService.execute(customer2);
executorService.execute(customer3);
try {
Thread.sleep(1000);
producter1.stop();
producter2.stop();
producter3.stop();
Thread.sleep(3000);
executorService.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果 如下:
start product id =11
start customer id = 14
start product id =13
start customer id = 15
start product id =12
start customer id = 16
1: data is put into queue
2: data is put into queue
2*2=4
1*1=1
3: data is put into queue
3*3=9
4: data is put into queue
4*4=16
5: data is put into queue
5*5=25
6: data is put into queue
6*6=36
7: data is put into queue
7*7=49
8: data is put into queue
9: data is put into queue
10: data is put into queue
11: data is put into queue
12: data is put into queue
8*8=64
9*9=81
10*10=100
.....
如下便完成了 生产者与消费者并行的问题.