使用Java实现生产者消费者队列
参考: https://www.jianshu.com/p/66e8b5ab27f6
1、使用wait()和notify()实现
/**
 * Producer/consumer demo built on intrinsic locking with {@code wait()} and
 * {@code notifyAll()}.
 *
 * <p>A shared counter stands in for the task buffer. Producers block while the
 * counter equals {@code FULL}; consumers block while it is zero. Every access
 * to the counter happens while holding the monitor of {@code MYLOCK}.
 */
public class TestMultiThread {

    /** Upper bound of the counter; producers wait once it is reached. */
    private static final int FULL = 10;

    /**
     * Number of pending tasks. Kept as a primitive so that comparisons are by
     * value (comparing two boxed {@code Integer}s with {@code ==} compares
     * references).
     */
    private static int count = 0;

    /** Monitor object shared by all producers and consumers. */
    private static final Class<TestMultiThread> MYLOCK = TestMultiThread.class;

    /**
     * Starts four producer threads and four consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        for (int i = 0; i < 4; i++) {
            new Thread(testMultiThread.new Producer()).start();
            new Thread(testMultiThread.new Consumer()).start();
        }
    }

    /** Adds ten tasks, pausing three seconds before each one. */
    class Producer implements Runnable {
        /**
         * Runs the produce loop. If the thread is interrupted while sleeping
         * or waiting, the interrupt flag is restored and the loop ends.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                    synchronized (MYLOCK) {
                        // Re-check in a loop: notifyAll() wakes every waiter,
                        // and another producer may have filled the slot first.
                        while (count == FULL) {
                            MYLOCK.wait();
                        }
                        ++count;
                        System.out.println("生产者:" + Thread.currentThread().getName() + "生产数据,目前共有:" + count + "个任务");
                        MYLOCK.notifyAll();
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop working.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Removes ten tasks, pausing three seconds before each one. */
    class Consumer implements Runnable {
        /**
         * Runs the consume loop. If the thread is interrupted while sleeping
         * or waiting, the interrupt flag is restored and the loop ends.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                    synchronized (MYLOCK) {
                        // Re-check in a loop: another consumer may have taken
                        // the task between the notification and our wake-up.
                        while (count == 0) {
                            MYLOCK.wait();
                        }
                        --count;
                        System.out.println("消费者:" + Thread.currentThread().getName() + "消费数据,目前共有:" + count + "个任务");
                        MYLOCK.notifyAll();
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop working.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
2、使用可重入锁ReentrantLock实现
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demo built on a {@link ReentrantLock} with two
 * {@link Condition}s.
 *
 * <p>A counter stands in for the task buffer. Producers wait on
 * {@code notFull} while the counter equals {@code FULL}; consumers wait on
 * {@code notEmpty} while it is zero.
 */
public class TestMultiThread {

    /** Upper bound of the counter; producers wait once it is reached. */
    private static final int FULL = 10;

    /**
     * Number of pending tasks. An instance field so that it is guarded by the
     * same object that owns {@code lock}.
     */
    private int count = 0;

    /** Guards {@code count}. */
    private final Lock lock = new ReentrantLock();

    /** Signalled after a task has been consumed. */
    private final Condition notFull = lock.newCondition();

    /** Signalled after a task has been produced. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Starts four producer threads and four consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        for (int i = 0; i < 4; i++) {
            new Thread(testMultiThread.new Producer()).start();
            new Thread(testMultiThread.new Consumer()).start();
        }
    }

    /** Adds ten tasks, pausing three seconds before each one. */
    class Producer implements Runnable {
        /**
         * Runs the produce loop. If the thread is interrupted while sleeping
         * or awaiting, the interrupt flag is restored and the loop ends.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Acquire the lock OUTSIDE the try block so that the finally
                // clause only ever unlocks a lock this thread actually holds.
                lock.lock();
                try {
                    while (count == FULL) {
                        notFull.await();
                    }
                    ++count;
                    System.out.println("生产者:" + Thread.currentThread().getName() + "生产数据,目前共有:" + count + "个任务");
                    notEmpty.signal();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /** Removes ten tasks, pausing five seconds before each one. */
    class Consumer implements Runnable {
        /**
         * Runs the consume loop. If the thread is interrupted while sleeping
         * or awaiting, the interrupt flag is restored and the loop ends.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Acquire the lock OUTSIDE the try block so that the finally
                // clause only ever unlocks a lock this thread actually holds.
                lock.lock();
                try {
                    while (count == 0) {
                        notEmpty.await();
                    }
                    --count;
                    System.out.println("消费者:" + Thread.currentThread().getName() + "消费数据,目前共有:" + count + "个任务");
                    notFull.signal();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
3、使用阻塞队列BlockingQueue实现
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer/consumer demo built on a bounded {@link BlockingQueue}.
 *
 * <p>Blocking when the queue is full or empty is delegated to
 * {@code put()} and {@code take()}. A separate {@link AtomicInteger} is used
 * only for the progress messages.
 */
public class TestMultiThread {

    /**
     * Counter shown in the progress messages. It is updated in a separate step
     * after the queue operation, so the printed value can briefly differ from
     * the real queue size (it can even dip below zero when a consumer reports
     * before the producer of the item it took).
     */
    private final AtomicInteger count = new AtomicInteger(0);

    // Bounded blocking queue holding at most 10 items.
    final static BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);

    /** Monitor used to keep each counter update and its message together. */
    private final Class<TestMultiThread> MYLOCK = TestMultiThread.class;

    /**
     * Starts five producer threads and five consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        for (int i = 0; i < 5; i++) {
            new Thread(testMultiThread.new Producer()).start();
            new Thread(testMultiThread.new Consumer()).start();
        }
    }

    /** Puts ten items into the queue, pausing three seconds before each one. */
    class Producer implements Runnable {
        /**
         * Runs the produce loop. If the thread is interrupted while sleeping
         * or blocked in {@code put()}, the interrupt flag is restored and the
         * loop ends.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                    blockingQueue.put(1);
                    synchronized (MYLOCK) {
                        System.out.println("生产者:" + Thread.currentThread().getName() + "生产数据,目前共有:" + count.incrementAndGet() + "个任务");
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop working.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Takes ten items from the queue, pausing five seconds before each one. */
    class Consumer implements Runnable {
        /**
         * Runs the consume loop. If the thread is interrupted while sleeping
         * or blocked in {@code take()}, the interrupt flag is restored and the
         * loop ends.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(5000);
                    blockingQueue.take();
                    synchronized (MYLOCK) {
                        System.out.println("消费者:" + Thread.currentThread().getName() + "消费数据,目前共有:" + count.decrementAndGet() + "个任务");
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop working.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
4、使用信号量Semaphore实现
import java.util.concurrent.Semaphore;

/**
 * Producer/consumer demo built on three {@link Semaphore}s.
 *
 * <p>{@code notFull} starts with 10 permits (free slots), {@code notEmpty}
 * starts with 0 permits (available tasks), and {@code mutex} is a one-permit
 * semaphore that serialises access to the counter.
 */
public class TestMultiThread {

    /**
     * Number of pending tasks. An instance field so that it is guarded by the
     * same object that owns {@code mutex}.
     */
    private int count = 0;

    /** Free slots; a producer takes one permit per task produced. */
    private final Semaphore notFull = new Semaphore(10);

    /** Available tasks; a consumer takes one permit per task consumed. */
    private final Semaphore notEmpty = new Semaphore(0);

    /** One-permit semaphore guarding {@code count}. */
    private final Semaphore mutex = new Semaphore(1);

    /**
     * Starts four producer threads and four consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        for (int i = 0; i < 4; i++) {
            new Thread(testMultiThread.new Producer()).start();
            new Thread(testMultiThread.new Consumer()).start();
        }
    }

    /** Adds ten tasks, pausing three seconds before each one. */
    class Producer implements Runnable {
        /**
         * Runs the produce loop. Permits are released only after the matching
         * acquire succeeded. If the thread is interrupted, any permit already
         * taken is handed back, the interrupt flag is restored and the loop
         * ends.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                    notFull.acquire();
                } catch (InterruptedException e) {
                    // Nothing has been acquired yet, so nothing is released.
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    mutex.acquire();
                } catch (InterruptedException e) {
                    // Hand back the free slot taken above; no task was produced.
                    notFull.release();
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    ++count;
                    System.out.println("生产者:" + Thread.currentThread().getName() + "生产数据,目前共有:" + count + "个任务");
                } finally {
                    // Both acquires succeeded, so both releases are balanced.
                    notEmpty.release();
                    mutex.release();
                }
            }
        }
    }

    /** Removes ten tasks, pausing five seconds before each one. */
    class Consumer implements Runnable {
        /**
         * Runs the consume loop. Permits are released only after the matching
         * acquire succeeded. If the thread is interrupted, any permit already
         * taken is handed back, the interrupt flag is restored and the loop
         * ends.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(5000);
                    notEmpty.acquire();
                } catch (InterruptedException e) {
                    // Nothing has been acquired yet, so nothing is released.
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    mutex.acquire();
                } catch (InterruptedException e) {
                    // Hand back the task permit taken above; nothing was consumed.
                    notEmpty.release();
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    --count;
                    System.out.println("消费者:" + Thread.currentThread().getName() + "消费数据,目前共有:" + count + "个任务");
                } finally {
                    // Both acquires succeeded, so both releases are balanced.
                    notFull.release();
                    mutex.release();
                }
            }
        }
    }
}
5、使用管道输入输出流PipedInputStream,PipedOutputStream实现
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/**
 * Producer/consumer demo built on a connected {@link PipedOutputStream} /
 * {@link PipedInputStream} pair.
 *
 * <p>One producer thread writes ten random numbers into the pipe and one
 * consumer thread reads them back. Each side closes its own end of the pipe
 * when it finishes.
 */
public class TestMultiThread {

    /**
     * Connects the two pipe ends and starts one producer and one consumer
     * thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestMultiThread testMultiThread = new TestMultiThread();
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream();
        try {
            pos.connect(pis);
            new Thread(testMultiThread.new Producer(pos)).start();
            new Thread(testMultiThread.new Consumer(pis)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Writes ten random numbers in [0, 100) to the pipe, one every three seconds. */
    class Producer implements Runnable {
        private final PipedOutputStream pos;

        /**
         * @param pos write end of the pipe; closed by this producer when it finishes
         */
        public Producer(PipedOutputStream pos) {
            this.pos = pos;
        }

        /**
         * Runs the produce loop and closes the write end afterwards, so the
         * reader sees end-of-stream. An interrupt restores the interrupt flag
         * and ends the loop; an I/O failure is reported and ends the loop.
         */
        @Override
        public void run() {
            // try-with-resources closes the write end on every exit path.
            try (PipedOutputStream out = pos) {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(3000);
                    int num = (int) (Math.random() * 100);
                    System.out.println("生产者:" + Thread.currentThread().getName() + "生产了数字:" + num);
                    out.write(num);
                    out.flush();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Reads up to ten numbers from the pipe, one every three seconds. */
    class Consumer implements Runnable {
        private final PipedInputStream pis;

        /**
         * @param pis read end of the pipe; closed by this consumer when it finishes
         */
        public Consumer(PipedInputStream pis) {
            this.pis = pis;
        }

        /**
         * Runs the consume loop and closes the read end afterwards. The loop
         * stops early when {@code read()} reports end-of-stream. An interrupt
         * restores the interrupt flag and ends the loop; an I/O failure is
         * reported and ends the loop.
         */
        @Override
        public void run() {
            // try-with-resources closes the read end on every exit path.
            try (PipedInputStream in = pis) {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(3000);
                    int num = in.read();
                    if (num == -1) {
                        // End of stream: the write end was closed, no more data.
                        break;
                    }
                    System.out.println("消费者:" + Thread.currentThread().getName() + "消费数字:" + num);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}