高效使用阻塞隊列:生產者-消費者模型的最佳實踐 | 阻塞隊列 | 3 種實現

高效使用阻塞隊列:生產者-消費者模型的最佳實踐 | 阻塞隊列 | 3 種實現

Overview of Content

生產者-消費者模型(Producer-Consumer Model)是一種常見的多執行緒佈局,這裡我們將深入探討其核心元素和應用場景

首先,我們會介紹生產者和消費者這兩個角色,並解釋它們如何通過阻塞隊列進行有效的通信,我們將探討阻塞隊列的特性,特別是其在資源管理中的重要性

接著,我們將詳述不同類型的阻塞隊列,包括 ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue 等,分析每種隊列的特性及其適用場景。我們還會對比 ArrayBlockingQueueLinkedBlockingQueue 的鎖機制,幫助您理解它們在多執行緒環境中的不同表現

最後,我們將展示如何實現生產者-消費者模型,包括使用 ArrayBlockingQueuePriorityQueueReentrantLock 的具體實現方法,並探討這些方法在提升系統效能方面的優勢

透過本篇文章,您將全面了解生產者-消費者模型及其相關技術,並能夠在實際應用中選擇最適合的解決方案

寫文章分享不易,如有引用參考請詳註出處,如有指導、意見歡迎留言(如果覺得寫得好也請給我一些支持),感謝 😀

個人程式分享時比較注重「縮排」,所以可能不適合手機的排版閱讀,建議切換至「電腦版」、「平板版」視窗看


認識生產者、消費者

生產者-消費者模型(Producer-Consumer Model)是一種常見的多執行緒佈局,這裡我們先捨去「多執行序概念」,先了解這兩個角色

生產者Producer):負責生成數據或任務

消費者Consumer):從隊列中取出數據、任務並進行處理

而這兩種通常會一同處理同一個事物,目標是加速處理資料的效率… 那為了達到這個目標,我們需要什麼呢?請繼續往下看…

生產者、消費者模型:所需元素

● 要製作生產者、消費者模型我們需要以下幾個元素

通訊容器:這個容器就是隊列(Queue

需要有一個橋樑將生產者、消費者連接起來,這個橋樑就是「隊列」,這樣兩種才可以通訊(相互傳遞是否已經處理完畢,或是生產出需要處理的物件)

● 為什麼隊列?不是其他數據結構

因為隊列(Queue)這種數據結構本身就具有 FIFO 的特性,這個特性擁有「排隊」的功能,能夠「有順序性」的處理事件

graph TD subgraph Queue Task1 --> Task2 --> Task3 end Producer -.-> |生成數據或任務| Queue Queue -.-> |取出數據 and 任務| Consumer subgraph 生產者-消費者模型 Producer["生產者 (Producer)"] Queue["隊列 (Queue)"] Consumer["消費者 (Consumer)"] end

這個通訊容器「可阻塞」

阻塞意味著「等待」,因為不管是生產者還是消費者,雙方往往不會有資源對等的情況,更常碰到的是其中一方缺乏資源,而另外一方不斷搜尋(Pulling)的情況

這意味著資源的耗費,所以 生產者、消費者模型 往往需要一個可阻塞的容器

為什麼需要阻塞

阻塞在執行序來說就是休眠,意味這讓出 CPU 資源,這種特性運作在隊列中代表的含義就是等待任務(並且不耗費 CPU 資源),可以大大提升應用的性能

graph TD Producer -.->|生成數據或任務| Queue Queue -.->|取出數據 and 任務| Consumer subgraph 生產者-消費者模型 Producer["生產者 (Producer)"] Queue["阻塞隊列 (Blocking Queue)"] Consumer["消費者 (Consumer)"] end classDef queueClass fill:#f9f,stroke:#333,stroke-width:2px; Queue:::queueClass

● 再來,我們需要 這個通訊容器「執行序安全」:也就同步機制

在這裡我們再加入多執行序(Multi-Thread)的特性,我們知道多執行序在運行時沒有任何的鎖 🔒 那就是一種非安全操作,所以我們需要這個通訊容器內擁有鎖 🔒

而怎麼鎖,幾把鎖,又該如何挑選鎖… 可以點擊以下連結去了解鎖,而這篇文章底下會說明如何實現「生產者、消費者模型」

對鎖 🔒 不清楚的讀者,可以點擊 全面解析多執行緒與同步技術:SYNC、CAS、ThreadLocal | 公平鎖、可重入鎖、樂觀鎖 參考

graph TD Producer -.->|生成數據或任務| Queue Queue -.->|取出數據 and 任務| Consumer subgraph " 🔒 🔒 🔒 生產者-消費者模型 🔒 🔒 🔒 " Producer["生產者 (Producer)"] Queue["阻塞隊列 (Blocking Queue)"] Consumer["消費者 (Consumer)"] end classDef queueClass fill:#f9f,stroke:#333,stroke-width:2px; Queue:::queueClass

阻塞隊列特性

堵塞:執行緒在阻塞時會讓出 CPU 資源,再讓出 CPU 資源後,系統就會將 CPU 運算資源讓給其他執行序

阻塞隊列:阻塞隊列是「產生者、消費者模式」中的一個元素,在不符合條件時就會進行阻塞動作(可以想成等待),其操作必須符合 兩個條件

A. 作為「消費者」角色,當 隊列為空時,消費者要獲取元素,會呈現執行序 等待狀態(阻塞),直到有數據放入佇列(通常帶有通知功能)

graph TD Producer -->|生成數據或任務| Queue Queue -->|取出數據 and 任務| Consumer subgraph 生產者-消費者模型 Producer["生產者 (Producer)"] Queue["阻塞隊列 (Blocking Queue)"] Consumer["消費者 (Consumer)"] end classDef queueClass fill:#f9f,stroke:#333,stroke-width:2px; Queue:::queueClass subgraph 阻塞特性 阻塞-->讓出CPU資源 讓出CPU資源-->其他執行緒獲取CPU end Consumer -.-> |1. 無元素可消耗| 阻塞 其他執行緒獲取CPU -.-> |2. 通知| Producer

B. 作為「生產者」角色,當 隊列滿時,生產者要加入元素,也會呈現等待狀態(阻塞)等佇列有空間,並同時讓出 CPU 資源(通常帶有通知功能)

graph TD Producer -->|生成數據或任務| Queue Queue -->|取出數據 and 任務| Consumer subgraph 生產者-消費者模型 Producer["生產者 (Producer)"] Queue["阻塞隊列 (Blocking Queue)"] Consumer["消費者 (Consumer)"] end classDef queueClass fill:#f9f,stroke:#333,stroke-width:2px; Queue:::queueClass subgraph 阻塞特性 阻塞-->讓出CPU資源 讓出CPU資源-->其他執行緒獲取CPU end Producer -.-> |1. 隊列滿| 阻塞 其他執行緒獲取CPU -.-> |2. 通知| Consumer

生產者、消費者模型:應用場景

資源管理

生產者-消費者模型可以「有效、安全」地管理和分配系統資源,避免資源的競爭和衝突

異步處理

生產者和消費者可以在不同的執行緒中執行,實現任務的異步處理,提高系統的響應速度

多執行緒處理

在多執行緒環境中,生產者生成數據,消費者處理數據,這樣可以充分利用多核處理器的性能,提高應用的效率(提升利用 CPU 的吞吐量

併發編程中使用生產者 & 消費者模式能夠解決絕大多數的併發問題,如果發生生產 & 消費兩者效率不同時就必須使用一個容器去解決,該容器就是隊列,隊列會產生一種緩衝的功能


堵塞隊列:Blocking-Queue

我們前面已經說明了為何生產者、消費者模型要使用隊列,那接著我們就來看看 Java 內置的阻塞隊列有哪些?這些隊列又哪有些特性、特色

BlockingQueue 界面:阻塞隊列核心方法

● Java BlockingQueue 是一個界面(interface)它定義了阻塞隊列的核心方法,下表是 BlockingQueue 類的重點方法,其中 只有 put、take 會有阻塞 現象

● 對列表「添加」元素

NameParamsreturnFunc
addE(泛型)boolean如果不違反容量限制則回傳 true,沒空間拋出 IllegalStateException
offerEboolean就像是 add(E) 方法,但並不會拋出異常
offerE, time, unitboolean限定時間內要放入數據
put(會阻塞)Evoid將數據加入 BlockingQueue 裡面,如果沒有空間,則調用此方法的執行序被阻斷,直至有空間放入

● 對列表「移除」元素

NameParamsreturnFunc
polltimeE搜尋 & 移除 Header 數據
polltime, unitE限定時間內要取出數據
take(會阻塞)voidE執行序一直等待直到取出任務
drainToCollection<? super E>int一次性取出所有任務,添加任務到指定集合,返回任務數量
drainToCollection<? super E>, maxint一次性取出限制數量任務,返回任務數量

● 「提交任務」到列表

NameParamsreturnFunc
executeRunnablevoid提交任務去給執行序池執行

● 下表則是比較 BlockingQueue 相同目的之下(以下分為插入、移除、檢查),所擁有的不同方法,這些方法也會有不同的反應

方法目的操作失敗時拋出異常操作時不拋出,使用返回值操作失敗時一直堵塞
插入add(E)offer(E)put()
移除remove()poll()take()
檢查element()peek()---

BlockingQueue 實現:各種阻塞隊列特性

● 我們上面說過 BlockingQueue 是一個界面不能直接實例化,所以這裡就要來介紹有哪些類實作了 BlockingQueue 界面,不同的類個別都會有不同特性,我們可以根據自身的業務需求去做選擇…

首先,我們再來重新複習一下「隊列」的功能:隊列是 在生產者、消費者之間所需要的 容器,目的是為了 平衡兩方生產、消耗不均的問題,否則沒辦法協作;

生產者、消費者之間不需要知道對方,不會直接產生關係 (就像是生產的流水線關係)

● Java BlockingQueue 有幾種實現類 (要注意※ 各有不同特性),以下我們將阻塞隊列分為「有界」、「無界」來看

有界 & 無界 ?

既然是排隊就會有所謂的上限,而有界就是有上限,無界就是無上限

而無界的特性就是,放入不會被堵塞,取得會堵塞,當寫入空間不足時會不斷地擴容,值到超出上限發生了 OOM,系會被把這個進程給 kill

有界隊列

A. ArrayBlockingQueue 隊列 : 由數據結構靜態 Queue 所組成的 「有界」 堵塞隊列

它的特性是 FIFO,默認 不保證執行序(Thread)公平 的訪問對列,但仍可透過參數設定調整為公平(如下圖)

什麼是公平的訪問

如果是不公平訪問,意味著 無論一個執行緒在隊列前面等待了多長時間,新的執行緒仍然可能在它之前獲得訪問權(效率高)

而公平訪問則是 按照阻塞的先後順序訪問隊列(效率較低)

B. LinkedBlockingQueue 隊列 : 由數據結構動態 Queue 所組成的 「有界」 堵塞隊列

它的特性是 FIFO,能高效的處理數據主要是因為,對於生產者消費者使用了個別「獨立」的鎖,意味著可以分開等待時間,使用高並發進型數據處理

Queue 的容量預設為 Integer#MAX_VALUE 的數量,但也可透過參數自己設定

C. LinkedBlockingDeque 隊列 : 由數據結構雙向 Queue 所組成的 「雙向」 堵塞隊列

多執行序同時入隊時,競爭少一半所以速度較快,雙端可取可放

透過設定值可以影響到它是有界或是無界

無界隊列

A. PriorityBlockingQueue 隊列 : 支持優先即排序的 「無界」 堵塞隊列

其特色是插入隊列時無堵塞,而從隊列中取出元素就有堵塞行為

默認初始容量為 11,可自定 compareTo 方法,自訂比較器

B. LinkedTransferQueue 隊列 : 由數據結構 Queue 所組成的 「無界」 堵塞隊列,實現了重要界面 TransferQueue

特性是提供比 LinkedBlockingQueue 更高效的轉移操作

除了擁有 LinkedBlockingQueue 的所有功能外,它還提供了一個 transfer 方法,該方法允許將一個元素直接轉移給消費者,而不是將其排入隊列

該隊列特別適合在生產者和消費者的數量相差不大且需要快速傳遞的場景中使用。它也能在生產者和消費者數量不平衡的情況下,通過等待策略來實現高效運行

C. DelayQueue 隊列 : 使用優先即排序的「無界」堵塞隊列,剩餘時間越短越快取出,並且時間到後才能取出

DelayQueue 是一個泛型,泛型有規範必須實現 Delayed 界面,並且該界面實現 Comparable<Delayed> 界面,支持可延遲獲取元素

D. SynchroniusQueue 隊列 : 不儲存元素的隊列,解偶生產者 & 消費者

okHttp 有使用,目的是添加元素一失敗,馬上創建新的執行序去訪問網路資源

ArrayBlockingQueue & LinkedBlockingQueue:鎖的差別 🔒

如果不清楚「鎖 🔒」的概念,可以先去了解 全面解析多執行緒與同步技術:SYNC、CAS、ThreadLocal | 公平鎖、可重入鎖、樂觀鎖

ArrayBlockingQueueLinkedBlockingQueue 兩者都是使用 ReentrantLock 顯式可重入鎖,但是兩者 鎖的實現有差別

● ArrayBlockingQueue 實現的隊列,生產、消費是使用「同把鎖」,效率較低,因為等待生產者與消費者之間必須相互等待,無法獨立作業

● LinkedBlockingQueue 鎖是分離的,生產使用的是 putLock,消費使用的是 takeLock,可分開做等待、喚醒的動作,加強的效率

B. 生產 & 消費時的操作不同

ArrayBlockingQueue 使用 Array 陣列,所以在取值時速度較快

LinkedBlockingQueue 使用 Linked 數據結構,再取值時需要再 new Node<E> 進行插入、移除,所以 Linked 效率會低一點

C. 初始化大小

Array 必須指定初始化陣列的數值,用此數值來創建陣列大小

Linked 在創建時不需要指定大小,因為它是使用串列,有頭指標

SynchronousQueue

SynchronousQueue 其特色是它只會儲存當前元素 (只存一個元素,但是有稱為隊列),每一個 put 操作都必須等待一個 take 消費,否則不能添加元素


// SynchronousQueue.java
// 內部並沒有等待

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

LinkedTransferQueue

LinkedTransferQueue 特色主要在於 transfer 方法

方法名功能
transfer當數據還在等待時間 (時間到才能被消費),生產者一個立即給消費者; 當沒有消費者時如同 put 會做等待
tryTransfer該方法用來試探傳入元素是否能直接給消費者,如果沒有消費者則返回 false,不做等待,算是一種試探

LinkedBlockingDequeue

● 其使用的方式是雙端隊列,雙端隊列可以增加數據的吞吐量


實現生產者、消費者模型

接下來我們就透過 Java 中已有的一些共能(鎖、阻塞隊列)來實現產生產者與消費者模型

ArrayBlockingQueue 實現模型

● ArrayBlockingQueue 內部就實現了堵塞,如果我們想要單純的使用 Java 內置的阻塞隊列來實現生產者、消費者模型也是可以的,範例如下

● 使用 ArrayBlockingQueue 阻塞隊列

以下簡單的使用靜態資源(static member


abstract class abCommonClass {
    protected static final int SIZE = 10;
    protected static ArrayBlockingQueue<String> a = new ArrayBlockingQueue<>(SIZE, true);
}

● 為什麼要使用靜態資源?

其實只是為個簡單示範而已,這樣我在外部就不需要創建隊列… 而使用靜態(static)成員的原因是因為,「生產者、消費者」的目標容器要相同,否則就無法協作!

消費者consumer):

首先消費者進入無限循環,不斷的取出元素,直到元素都被取完就「阻塞」(poll 方法的特性,如果沒有元素可取 poll 就會阻塞當前執行緒)


class abConsumer extends abCommonClass implements Runnable {

    @Override
    public void run() {
        while(true) {
            try {
                TimeUnit.SECONDS.sleep(3);
                a.poll();
                System.out.println("Poll Data " + a.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

生產者producer):

相同,生產者進入無限循環,不斷的放入元素,直到隊列被填滿就「阻塞」(offer 方法的特性,如果隊列滿 offer 就會阻塞當前執行緒)


class abProducer extends abCommonClass implements Runnable {

    @Override
    public void run() {
        while(true) {
            try {
                TimeUnit.SECONDS.sleep(1);
                a.offer("String");
                System.out.println("Offer Data " + a.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

● 測試自實現的生產者-消費者模型


public class testArrayBlockingQueue {

    public static void main(String[] args) {
        abConsumer c = new abConsumer();
        abProducer p = new abProducer();

        new Thread(c).start();
        new Thread(p).start();
    }
}

--實做--

PriorityQueue、Synchronized 實現模型

● 以下我們自己去實現實現生產者、消費者模型

🔒:鎖是為了要顧慮到生產者、消費者模型的執行緒安全

這邊簡單的使用靜態資源(成員 p)作為鎖


abstract class commonKey {
    // 限制 Queue 的大小
    protected static final int SIZE = 10;
    
    // Java 的阻塞隊列
    protected static PriorityQueue<String> p = new PriorityQueue<>(SIZE);
}

● Java 的阻塞隊列內就已經有鎖了,為什麼還要鎖?

這是因為阻塞隊列的鎖是針對隊列內的操作元素,而不是外部的操作,我們現在的鎖是針對 生產者、消費者操作的鎖

消費者consumer):

首先消費者開始運行後,1.進入 while 無限循環,不斷判斷是否有元素要消耗,並且在 2. 判斷元素時使用鎖 🔒 保障資源的安全消耗

鎖配合 synchronized 關鍵字

如果 3. 「沒有元素」要消耗則執行序進入 wait() 阻塞休眠,4. 消耗則使用 poll() 方法,並呼叫 notify 通知生產這繼續生產


class consumer extends commonKey implements Runnable {

    @Override
    public void run() {
        while(true) {
            synchronized (p) {
                while(p.size() == 0) {
                    try {
                        System.out.println("Queue Empty can not Poll");
                        p.wait();	// Object's method
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        p.notify();
                    }
                }

                try {
                    //TimeUnit.SECONDS.sleep(1);
                    System.out.println("Poll Data " + p.size());
                    p.poll();
                    p.notify();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

生產者producer):

同樣生產者者開始運行後,1.進入 while 無限循環,不斷判斷是否有元素需要消耗,並且在 2. 判斷元素時使用鎖 🔒 保障資源的安全消耗

鎖配合 synchronized 關鍵字

如果 3. 「元素超過隊列限制」則生產者執行序進入 wait() 阻塞休眠,4. 生產則使用 offer() 方法,並呼叫 notify 通知消費者繼續消費


class producer extends commonKey implements Runnable {

    @Override
    public void run() {
        while(true) {
            synchronized (p) {
                while(p.size() == SIZE) {
                    try {
                         System.out.println("Queue Full can not offer anymore");
                         p.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        p.notify();
                    }
                }


                try {
                    TimeUnit.SECONDS.sleep(1);
                     System.out.println("producer " + p.size());
                    p.offer("Hello");
                    p.notify();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

● 測試自實現的生產者-消費者模型


public class testBlockingQueue {

    public static void main(String[] args) {
        consumer c = new consumer();
        producer p = new producer();

        Thread t1 = new Thread(c);
        Thread t2 = new Thread(p);

        t1.start();
        t2.start();
    }

}

PriorityQueue、ReentrantLock 實現模型:更高效能

● 接著,如同上個小節的範例,上個小節的範例在使用鎖時使用 synchronized 關鍵字配合「同把鎖」來達到生產者、消費者

這種實現雖然可以完成模型,不過效能不夠好,所以這裡我們透過鎖的優化(使用 ReentrantLock),來達成 生產者、消費者使用「不同條件來操作鎖 🔒」

🔒:鎖是為了要顧慮到生產者、消費者模型的執行緒安全

這邊進階改成 ReentrantLock 作為鎖,並分別出兩個條件分別給生產者(使用 con 成員)、消費者控制(使用 pro 成員)


class rCommonReentrant {
    // 隊列大小限制
    protected static final int SIZE = 10;

    // Java 的阻塞隊列
    protected static final PriorityQueue<String> p = new PriorityQueue<>(SIZE);

    // Java 的可重入鎖
    protected static ReentrantLock r = new ReentrantLock();

    // 創建兩個條件去解鎖
    protected static Condition con = r.newCondition();
    protected static Condition pro = r.newCondition();
}

消費者Consumer):

這裡我們只說明不同點,首先 1. 使用鎖不再是透過 synchronized 關鍵字,而是使用 lockunlock 操作,再來 2. 鎖的等待使用消費者自身的 Condition#await 方法

在消耗完元素後使用生產者的 Condition#signal 喚醒生產者


class rConsumer extends rCommonReentrant implements Runnable {

    @Override
    public void run() {
        while(true) {
            try {
                r.lock();

                while(p.size() == 0) {
                    System.out.println("Queue Empty can not Poll");
                    con.await();
                }

                //TimeUnit.SECONDS.sleep(1);

                p.poll();
                System.out.println("Poll Data " + p.size());

                // 緩醒生產者
                pro.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                r.unlock();
            }
        }
    }
}

生產者Producer):

這裡我們只說明不同點,首先 1. 使用鎖不再是透過 synchronized 關鍵字,而是使用 lockunlock 操作,再來 2. 鎖的等待使用生產者自身的 Condition#await 方法

在生產完元素後使用消費者的 Condition#signal 喚醒消費者


class rProducer extends rCommonReentrant implements Runnable {

    @Override
    public void run() {
        while(true) {

            try {
                r.lock();

                while(p.size() == 10) {
                     System.out.println("Queue Full can not offer anymore");
                     pro.await();	
                }
                
                // 故意拖延生產
                TimeUnit.SECONDS.sleep(1);

                p.offer("Hello");
                System.out.println("offer Data " + p.size());

                // 喚醒消費者
                con.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                r.unlock();
            }
        }
    }
}

● 測試自實現的生產者-消費者模型


public class reenBlocking {

    public static void main(String[] args) {
        rConsumer c = new rConsumer();
        rProducer p = new rProducer();

        new Thread(c).start();
        new Thread(p).start();
    }

}


更多的 Java 語言相關文章

Java 語言深入

● 在這個系列中,我們全方位地探討了 Java 語言的各個核心主題,旨在幫助你徹底掌握這門強大的編程語言。無論你是想深入理解 Java 的基礎類型與變數作用域,還是探索異常處理與運算子的細節,這些文章都將為您提供寶貴的知識

深入 Java 物件導向

● 探索 Java 物件導向的奧妙,掌握介面、抽象類、繼承等重要概念!幫助你針對物件導向設計有更深入的了解!


Leave a Comment

Comments

No comments yet. Why don’t you start the discussion?

發表迴響