The Java BlockingQueue interface is commonly used to implement the producer-consumer design pattern. This pattern addresses a common problem in which producer and consumer threads share a single queue to hand data from one to the other.
With a BlockingQueue, the producer calls put(), which waits for space to become available if the queue is full, and the consumer calls take(), which waits for an element to become available if the queue is empty.
The producer thread produces data and the consumer thread consumes it in FIFO order, because the LinkedBlockingQueue used in this example orders its elements first-in, first-out. FIFO ordering is a property of the implementation, not of the interface: PriorityBlockingQueue, for example, orders elements by priority instead.
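The blocking and FIFO behavior described above can be observed on a single thread by using the non-blocking counterparts of put() and take(). The following is a minimal sketch, not part of the original sample: the class name BlockingSemanticsSketch and the method demo() are made up for illustration, and the capacity of 2 is an arbitrary choice. It uses offer() and a timed poll(), which report failure at exactly the points where put() and take() would block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, introduced only for illustration.
public class BlockingSemanticsSketch {

    // Returns a log of each step so the behavior can be inspected.
    public static List<String> demo() throws InterruptedException {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2); // capacity 2 (assumed)

        log.add("offer a: " + queue.offer("a"));
        log.add("offer b: " + queue.offer("b"));
        // The queue is full: offer() returns false at once, where put() would block.
        log.add("offer c: " + queue.offer("c"));

        // Elements leave the queue in insertion (FIFO) order.
        log.add("take: " + queue.take());
        log.add("take: " + queue.take());

        // The queue is empty: the timed poll() gives up and returns null,
        // where take() would block until a producer adds an element.
        log.add("poll: " + queue.poll(100, TimeUnit.MILLISECONDS));
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        demo().forEach(System.out::println);
    }
}
```

Running main prints "offer a: true", "offer b: true", "offer c: false", "take: a", "take: b" and "poll: null", one per line. In real producer and consumer threads, put() and take() are usually the better fit, since they wait instead of failing.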
Example:
Blocking Queue Sample:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueSample {
    /**
     * Blocking queue sample: starts one producer and one consumer thread
     * that share a bounded queue.
     *
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue that holds at most 5 elements
        BlockingQueue<String> sharedBlockingQueue = new LinkedBlockingQueue<>(5);
        Runnable producer = new Producer("Producer", sharedBlockingQueue);
        Runnable consumer = new Consumer("Consumer", sharedBlockingQueue);
        Thread p = new Thread(producer);
        Thread c = new Thread(consumer);
        p.start();
        Thread.sleep(1000); // give the producer time to fill the blocking queue
        c.start();
    }
}
Producer:
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private final String name;
    private final BlockingQueue<String> bQueue;

    Producer(String name, BlockingQueue<String> sharedQueue) {
        this.name = name;
        this.bQueue = sharedQueue;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 20; ++i) {
            try {
                /*
                 * put() blocks while the shared queue is full and resumes
                 * once the consumer has removed an element.
                 */
                String data = "data" + i;
                bQueue.put(data);
                System.out.println(this.name + " Producing :" + data);
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
Consumer:
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private final String name;
    private final BlockingQueue<String> bQueue;

    Consumer(String name, BlockingQueue<String> sharedQueue) {
        this.name = name;
        this.bQueue = sharedQueue;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 20; ++i) {
            try {
                /*
                 * take() blocks while the shared queue is empty and resumes
                 * once the producer has added an element.
                 */
                String value = bQueue.take();
                System.out.println(this.name + " Consuming :" + value);
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop consuming
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
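Output (the exact interleaving varies between runs, because each thread prints only after its put() or take() call has returned):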
Producer Producing :data1
Producer Producing :data2
Producer Producing :data3
Producer Producing :data4
Producer Producing :data5
Producer Producing :data6
Consumer Consuming :data1
Producer Producing :data7
Consumer Consuming :data2
Producer Producing :data8
Consumer Consuming :data3
Producer Producing :data9
Consumer Consuming :data4
Consumer Consuming :data5
Producer Producing :data10
Consumer Consuming :data6
Producer Producing :data11
Consumer Consuming :data7
Producer Producing :data12
Consumer Consuming :data8
Producer Producing :data13
Consumer Consuming :data9
Producer Producing :data14
Consumer Consuming :data10
Producer Producing :data15
Consumer Consuming :data11
Producer Producing :data16
Consumer Consuming :data12
Producer Producing :data17
Consumer Consuming :data13
Producer Producing :data18
Consumer Consuming :data14
Producer Producing :data19
Consumer Consuming :data15
Producer Producing :data20
Consumer Consuming :data16
Consumer Consuming :data17
Consumer Consuming :data18
Consumer Consuming :data19
Consumer Consuming :data20
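The interleaving of the printed lines depends on thread scheduling, but the order in which the consumer receives elements does not. One way to check this is to collect the consumed values and inspect them after both threads have finished. The following is a minimal sketch under stated assumptions: the class FifoOrderCheck and the method consumeAll() are hypothetical names, not part of the sample above, and lambdas stand in for the Producer and Consumer classes to keep the block self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical check, introduced only for illustration.
public class FifoOrderCheck {

    // Passes `count` elements through a bounded queue and returns them
    // in the order the consumer thread received them.
    public static List<String> consumeAll(int count) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);
        List<String> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put("data" + i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), the consumer's writes are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumeAll(20));
    }
}
```

With a single producer and a single consumer, the returned list runs from data1 to data20 in order on every run, even though console lines from the two threads can interleave differently each time.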