如何實作簡單的持久性訊息佇列?
有時候在需要訊息傳遞的應用場景,因為需求簡單不想使用到像 RabbitMQ
這種企業級的訊息中介軟體 (middleware), 使用 Redis
雖然夠簡單但畢竟還是需要執行另一個進程 (process),總還是有殺雞用牛刀的感覺。
但是另一方面又有訊息不能遺失的需求,自己處理訊息持久性 (persistence) 實在麻煩, 畢竟訊息儲存還是要支援 ACID
特性才能號稱做到不遺失。
說到這,山姆鍋相信有些人已經想到使用內嵌式資料庫 (embedded
database) 來實現,例如 SQLite。 雖然山姆鍋也會用 SQLite
做其它用途,但本文山姆鍋使用的是另一個比較少人知道或使用的資料庫:LMDB。
對於懷疑使用資料庫作為訊息佇列是否恰當的人,可以參考
「為什麼使用資料庫當作訊息佇列不是問題?」
這篇文章。本文所討論的情況是訊息生產者與消費者都在同一個進程,透過 Event
機制來同步, 不會有忙碌迴圈 (busy waiting) 的問題。
LMDB
LMDB, 全名是 Lightning Memory-Mapped
Database,是一個內嵌式資料庫,可歸為 NoSQL 資料庫陣營。 LMDB 的主要特性:
- 支援交易 (transaction),確保資料持久性 (persistence)。
- 資料庫採記憶體映射 (memory-mapped):
利用作業系統的虛擬記憶體管理作為緩衝 (buffer) 機制。 - 可以由多個進程 (process) 共同存取同一個資料庫。這點山姆鍋沒試過,但可作為單一主機進程間的通訊機制。
其它相關資訊,可以參考 LMDB 官方網站 。
Python 的 LMDB 綁定,以 lmdb
最為完整, 文件對大部份需求說明的也夠詳細。
基本上,LMDB 提供有序的 (ordered) key-value
資料庫,在同一個資料庫中記錄會依照鍵值 (keys) 排序。
這個特性很適合訊息佇列,訊息以時間為基礎的鍵值放入資料庫中,然後從最舊的訊息開始取出。
訊息佇列實作
底下程式片段是山姆鍋基於 LMDB 以及 gevent [1]
的簡單實作,只支援單一訊息接收端 (receiver)。
為什麼只支援單一接收端,這跟山姆鍋的需求有關就不詳述。
其中的 clock,請參考 用 Python
實作混合式邏輯時鐘
這篇文章。
佇列物件建構
要建構 LmdbQueue 物件,需要有 LMDB
的環境 (Environment) 物件,且為了讓同一個資料庫可以支援多個佇列 (queue),每個佇列使用一個前置詞 (prefix) 用來區分。
每個放入佇列中的訊息都會給予一個唯一的鍵值 (key),這個鍵值包涵前置詞以及目前的混合式邏輯時間,
確保不會有出現同一鍵值的情況發生。
訊息排序與存取
訊息的鍵值以 <prefix>'T'<timestamp>
這樣的格式組成,其中:
- <prefix> 是佇列的前置詞。
- ‘T’ 是介於 0x00 跟 0xff (255) 之間任意挑選的值,沒有特殊意義。
- <timestamp> 是訊息置入時的邏輯時間。
運用這樣格式的設計,就可以確保所有訊息的鍵值都會介於 `<prefix>0x00`
與 <prefix>0xff
之間。 程式碼中分別使用
begin_key
以及 end_key
來代表這兩個值。
藉由這兩個邊界值,我們就可以很方便的使用 set_range
這個方法找到佇列的第一個訊息,
需要的話也可以用來遍歷 (iterate) 佇列的所有訊息。
訊息格式轉換
訊息的鍵值使用 struct
這個套件來從多個資料項轉成精簡的格式。 訊息在放入佇列之前會經過
MessagePack
序列化 (serialized) 成 bytes; 在取出時會被反序列化成當初的資料型態。
交易 (事務) 處理
預設情況下,每個方法各自在獨立的交易中執行,如果需要讓多個操作在同一個交易執行的話,需要先產生
交易物件再傳入被呼叫的方法。
佇列物件方法
各個方法 (method) 的簡短說明如下:
-
get:
提供同步 (blocking) 的方式來取出佇列中的訊息,可以設定是否要同步,操作逾時 (timeout) 等。
-
get_nowait:
功能類似
get
方法,但沒有訊息的話會,不等待直接回傳
None。 -
put:
允許訊息生產者 (producers) 把訊息放入佇列中。
-
put_multi:
允許一次放入多個訊息到佇列中。
測試用程式碼
使用 pytest 的測試案例 [2] :
1 | # -*- coding: utf-8 -*- |
標記 1: gevent 透過 monkey patch
讓多數標準程式庫不用修改也可以支援並發 (concurrent) 執行。
結語
在一般稍微有點規模的分散式系統,通常會使用訊息中介軟體。如果訊息的遺失無關緊要,其實也就不用自找麻煩。
偏偏總會有些場合,尤其是在開發客戶端應用時,會需要先將要傳遞的訊息暫存在持久性儲存體,如硬碟。
但要確保訊息或其它資料的永久性,並沒有想像中的簡單,需要考慮到很多的例外狀況:
寫到一半系統當機怎麼處理?同時兩個執行緒在寫入怎麼處理?
也因為如此,SQLite 或者 LMDB 這種內嵌式資料庫,
對於需要支援資料永久性的客戶端應用來說,是不可少的工具。