返回信息流关于java生产者消费者的问题,面试时经常遇到,在网上找了个这份代码
package Thread;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
/**
* Simple Java program to demonstrate How to use wait, notify and notifyAll()
* method in Java by solving producer consumer problem.
*
* @author Javin Paul
*/
public class ProducerConsumerInJava {
public static void main(String args[]) {
System.out.println("How to use wait and notify method in Java");
System.out.println("Solving Producer Consumper Problem");
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 10;
Thread producer = new Producer(buffer, maxSize, "PRODUCER");
Thread consumer = new Consumer(buffer, maxSize, "CONSUMER");
producer.start(); consumer.start(); }
}
/**
* Producer Thread will keep producing values for Consumer
* to consumer. It will use wait() method when Queue is full
* and use notify() method to send notification to Consumer
* Thread.
*
* @author WINDOWS 8
*
*/
class Producer extends Thread
{
private Queue<Integer> queue;
private int maxSize;
public Producer(Queue<Integer> queue, int maxSize, String name){
super(name); this.queue = queue; this.maxSize = maxSize;
}
@Override public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == maxSize) {
try {
System.out .println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
Random random = new Random();
int i = random.nextInt();
System.out.println("Producing value : " + i); queue.add(i); queue.notifyAll();
}
}
}
}
/**
* Consumer Thread will consumer values form shared queue.
* It will also use wait() method to wait if queue is
* empty. It will also use notify method to send
* notification to producer thread after consuming values
* from queue.
*
* @author WINDOWS 8
*
*/
class Consumer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Consumer(Queue<Integer> queue, int maxSize, String name){
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override public void run() {
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue");
try {
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Consuming value : " + queue.remove()); queue.notifyAll();
}
}
}
}
运行之后,基本是这样的输出。
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
Producing value : 121236026
Producing value : 1088779884
Producing value : 2114880110
Producing value : 553933662
Producing value : 1142835122
Producing value : -1442476004
Producing value : 552048650
Producing value : 1469959203
Producing value : -993350885
Producing value : 1941523712
Queue is full, Producer thread waiting for consumer to take something from queue
Consuming value : 121236026
Consuming value : 1088779884
Consuming value : 2114880110
Consuming value : 553933662
Consuming value : 1142835122
Consuming value : -1442476004
Consuming value : 552048650
Consuming value : 1469959203
Consuming value : -993350885
Consuming value : 1941523712
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
Producing value : 1208260243
直观上看是队列满了才开始消费,然后队列空了,再开始生产。要怎么改成生产-消费是乱序进行的呢?
这是一条镜像帖。来源:北邮人论坛 / java / #56945同步于 2017/8/3
该镜像源已超过 30 天没有更新,可能在源站已被删除。
Java机器人发帖
【问题】求教关于java生产者-消费者的问题
dragontwf
2017/8/3镜像同步14 回复
订阅后,新回复会通过你的通知中心匿名送达。
9 条回复
如果不sleep,notify之后,本身的线程继续获取锁的几率更大么?
【 在 mrcuber 的大作中提到: 】
: 生产或者消费完了之后sleep一段时间,增大其他线程获得锁的几率。
没有理论依据,仅仅是自己的猜想:
notify之后,生产者和消费者都没有持有锁,要去竞争。这个竞争其实是不公平的,线程切换(producer切到consumer或者反向)要比继续在当前线程执行任务耗费更多资源,操作系统更倾向于不切换。所以输出会变成一直生产或者一直消费。
试了一下,我之前说的还有问题,应该在synchronized语句块结束之后sleep,此时两个线程都不持有锁(update:这段有误,Thread.sleep()方法不释放锁,只是暂停执行,让切换线程与不切换耗费的资源在一个数量级),两个线程竞争锁时才会更公平。另外,sleep时间最好是随机数,否则调度器切换线程时还会有固定偏好。
【 在 dragontwf 的大作中提到: 】
: 如果不sleep,notify之后,本身的线程继续获取锁的几率更大么?
自己写一个store类呀.
不要用原生的队列呗..
自己编写的类用 数组...然后加上 头标..尾标...
加上判断空函数...判断满函数..
这么搞..生产和消费就不用竞争锁了..
确实,synchronized结束后sleep才有用,亲测可用,有没有不加sleep的方法呢,加了sleep等于降低了吞吐量吧
【 在 mrcuber 的大作中提到: 】
: 没有理论依据,仅仅是自己的猜想:
: notify之后,生产者和消费者都没有持有锁,要去竞争。这个竞争其实是不公平的,线程切换(producer切到consumer或者反向)要比继续在当前线程执行任务耗费更多资源,操作系统更倾向于不切换。所以输出会变成一直生产或者一直消费。
: 试了一下,我之前说的还有问题,应该在synchronized语句块结束之后sleep,此时两个线程都不持有锁(update:这段有误,Thread.sleep()方法不释放锁,只是暂停执行,让切换线程与不切换耗费的资源在一个数量级),两个线程竞争锁时才会更公平。另外,sleep时间最好是随机数,否则调度器切换线程时还会有固定偏好。
为什么用内部类不行,分开写就可以呢?
【 在 buwenyuwu 的大作中提到: 】
: 别用内部类,分成Pro、Con、Store三个类写,可以交叉进行。
我尝试了在syn块之外调用yield方法,貌似也没有用。(好像是可以的,之前看走眼了)
我又尝试了一下ReentrantLock,并指定公平模式(非公平模式仍然没有用)。这种情况下,生产和消费是交替进行的。仅根据输出来看,队列中仅有1个元素或者0个元素,也就是说队列的其他9个空间没有利用上。
为什么会出现生产消费交替出现的情况呢?由于我指定了公平模式,公平模式是指,当同步阻塞队列(AbstractQueuedSynchronizer中维护的等待队列,简称sync queue)非空时(准确地说,sync queue中除了头结点之外含有其他有效节点时),当前线程在执行lock方法时,必然会入队,即进入sync queue的尾部。而与公平模式相对应的非公平模式是指,某个线程执行lock()方法时,会首先尝试获取锁,如果失败才会入队,这种方式可能会导致队列中处于等待状态的线程饿死。
其实下面代码中的await和signalAll并不会起作用。
```Java
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerInJava {
static ReentrantLock queueLock = new ReentrantLock(true);
static Condition queueEmpty, queueFull;
static {
queueEmpty = queueLock.newCondition();
queueFull = queueLock.newCondition();
}
public static void main(String args[]) {
System.out.println("How to use wait and notify method in Java");
System.out.println("Solving Producer Consumper Problem");
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 10;
Thread producer = new Producer(buffer, maxSize, "PRODUCER");
Thread consumer = new Consumer(buffer, maxSize, "CONSUMER");
producer.start();
consumer.start();
}
}
/**
* Producer Thread will keep producing values for Consumer
* to consumer. It will use wait() method when Queue is full
* and use notify() method to send notification to Consumer
* Thread.
*
* @author WINDOWS 8
*/
class Producer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Producer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
try {
ProducerConsumerInJava.queueLock.lock();
while (queue.size() == maxSize) {
try {
System.out.println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue");
ProducerConsumerInJava.queueFull.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
Random random = new Random();
int i = random.nextInt();
System.out.println("Producing value : " + i);
queue.add(i);
ProducerConsumerInJava.queueEmpty.signalAll();
} finally {
ProducerConsumerInJava.queueLock.unlock();
}
}
}
}
/**
* Consumer Thread will consumer values form shared queue.
* It will also use wait() method to wait if queue is
* empty. It will also use notify method to send
* notification to producer thread after consuming values
* from queue.
*
* @author WINDOWS 8
*/
class Consumer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Consumer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
try {
ProducerConsumerInJava.queueLock.lock();
while (queue.isEmpty()) {
System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue");
try {
ProducerConsumerInJava.queueEmpty.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Consuming value : " + queue.remove());
ProducerConsumerInJava.queueFull.signalAll();
}finally {
}
}
}
}
```
如果直接用BlockingQueue来实现的话,好像是可以满足你的需求的(即满足消费-生产随机,又不降低吞吐量)。实现类可以选择ArrayBlockingQueue,其内部维护了一个数组,putIndex和takeIndex两个索引,一个用于取元素,一个用于放元素。当队列满或者空时,会依赖于ReentrantLock以及Condition达到一个类似于wait/notify机制的效果。
```Java
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerInJava {
public static void main(String args[]) {
System.out.println("How to use wait and notify method in Java");
System.out.println("Solving Producer Consumper Problem");
int maxSize = 10;
BlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(maxSize);
Thread producer = new Producer(buffer, maxSize, "PRODUCER");
Thread consumer = new Consumer(buffer, maxSize, "CONSUMER");
producer.start();
consumer.start();
}
}
/**
* Producer Thread will keep producing values for Consumer
* to consumer. It will use wait() method when Queue is full
* and use notify() method to send notification to Consumer
* Thread.
*
* @author WINDOWS 8
*/
class Producer extends Thread {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
}
@Override
public void run() {
while (true) {
Random random = new Random();
int i = random.nextInt();
try {
queue.put(i);
} catch (InterruptedException e) {
}
System.out.println("Producing value : " + i);
}
}
}
/**
* Consumer Thread will consumer values form shared queue.
* It will also use wait() method to wait if queue is
* empty. It will also use notify method to send
* notification to producer thread after consuming values
* from queue.
*
* @author WINDOWS 8
*/
class Consumer extends Thread {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
System.out.println("Consuming value : " + queue.take());
} catch (InterruptedException e) {
}
}
}
}
```
另外,题外话,怎么贴出带有行号的code?
【 在 dragontwf 的大作中提到: 】
: 确实,synchronized结束后sleep才有用,亲测可用,有没有不加sleep的方法呢,加了sleep等于降低了吞吐量吧