Параллелизм Java - интерфейс BlockingQueue

Интерфейс java.util.concurrent.BlockingQueue является подинтерфейсом интерфейса Queue и дополнительно поддерживает такие операции, как ожидание, пока очередь не станет пустой перед извлечением элемента, и ожидание, пока в очереди не станет доступным место перед сохранением элемента. .

BlockingQueue методы

Sr. No. Метод и описание
1

boolean add(E e)

Вставляет указанный элемент в эту очередь, если это возможно сделать немедленно, без нарушения ограничений емкости, возвращая true в случае успеха и генерируя исключение IllegalStateException, если в настоящее время нет свободного места.

2

boolean contains(Object o)

Возвращает истину, если эта очередь содержит указанный элемент.

3

int drainTo(Collection<? super E> c)

Удаляет все доступные элементы из этой очереди и добавляет их в данную коллекцию.

4

int drainTo(Collection<? super E> c, int maxElements)

Удаляет не более указанного количества доступных элементов из этой очереди и добавляет их в данную коллекцию.

5

boolean offer(E e)

Вставляет указанный элемент в эту очередь, если это возможно сделать немедленно, без нарушения ограничений емкости, возвращая истину в случае успеха и ложь, если в данный момент нет свободного места.

6

boolean offer(E e, long timeout, TimeUnit unit)

Вставляет указанный элемент в эту очередь, ожидая до указанного времени ожидания, если необходимо, чтобы пространство стало доступным.

7

E poll(long timeout, TimeUnit unit)

Извлекает и удаляет заголовок этой очереди, ожидая до указанного времени ожидания, если необходимо, чтобы элемент стал доступным.

8

void put(E e)

Вставляет указанный элемент в эту очередь, ожидая, если необходимо, освободится место.

9

int remainingCapacity()

Возвращает количество дополнительных элементов, которые эта очередь может в идеале (при отсутствии ограничений памяти или ресурсов) принять без блокировки, или Integer.MAX_VALUE, если нет внутреннего ограничения.

10

boolean remove(Object o)

Удаляет единственный экземпляр указанного элемента из этой очереди, если он присутствует.

11

E take()

Извлекает и удаляет заголовок этой очереди, при необходимости ожидая, пока элемент станет доступным.

пример

Следующая программа TestThread показывает использование интерфейса BlockingQueue в среде на основе потоков.

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);

      new Thread(producer).start();
      new Thread(consumer).start();

      Thread.sleep(4000);
   }  


   static class Producer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Producer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         Random random = new Random();

         try {
            int result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }	   
   }

   static class Consumer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Consumer(BlockingQueue queue) {
         this.queue = queue;
      }
      
      @Override
      public void run() {
         
         try {
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

Это даст следующий результат.

Вывод

Added: 52
Removed: 52
Added: 70
Removed: 70
Added: 27
Removed: 27