我是山姆鍋

有時候在需要訊息傳遞的應用場景,因為需求簡單不想使用到像 RabbitMQ
這種企業級的訊息中介軟體 (middleware), 使用 Redis
雖然夠簡單但畢竟還是需要執行另一個進程 (process),總還是有殺雞用牛刀的感覺。
但是另一方面又有訊息不能遺失的需求,自己處理訊息持久性 (persistence) 實在麻煩, 畢竟訊息儲存還是要支援 ACID
特性才能號稱做到不遺失。

說到這,山姆鍋相信有些人已經想到使用內嵌式資料庫 (embedded
database) 來實現,例如 SQLite。 雖然山姆鍋也會用 SQLite
做其它用途,但本文山姆鍋使用的是另一個比較少人知道或使用的資料庫:LMDB。

對於懷疑使用資料庫作為訊息佇列是否恰當的人,可以參考
「為什麼使用資料庫當作訊息佇列不是問題?」
這篇文章。本文所討論的情況是訊息生產者與消費者都在同一個進程,透過 Event
機制來同步, 不會有忙碌迴圈 (busy waiting) 的問題。

LMDB

LMDB, 全名是 Lightning Memory-Mapped
Database,是一個內嵌式資料庫,可歸為 NoSQL 資料庫陣營。 LMDB 的主要特性:

  1. 支援交易 (transaction),確保資料持久性 (persistence)。
  2. 資料庫採記憶體映射 (memory-mapped):
    利用作業系統的虛擬記憶體管理作為緩衝 (buffer) 機制。
  3. 可以由多個進程 (process) 共同存取同一個資料庫。這點山姆鍋沒試過,但可作為單一主機進程間的通訊機制。

其它相關資訊,可以參考 LMDB 官方網站

Python 的 LMDB 綁定,以 lmdb
最為完整, 文件對大部份需求說明的也夠詳細。

基本上,LMDB 提供有序的 (ordered) key-value
資料庫,在同一個資料庫中記錄會依照鍵值 (keys) 排序。
這個特性很適合訊息佇列,訊息以時間為基礎的鍵值放入資料庫中,然後從最舊的訊息開始取出。

訊息佇列實作

底下程式片段是山姆鍋基於 LMDB 以及 gevent [1]
的簡單實作,只支援單一訊息接收端 (receiver)。
為什麼只支援單一接收端,這跟山姆鍋的需求有關就不詳述。

其中的 clock,請參考 用 Python
實作混合式邏輯時鐘

這篇文章。

佇列物件建構

要建構 LmdbQueue 物件,需要有 LMDB
的環境 (Environment) 物件,且為了讓同一個資料庫可以支援多個佇列 (queue),每個佇列使用一個前置詞 (prefix) 用來區分。
每個放入佇列中的訊息都會給予一個唯一的鍵值 (key),這個鍵值包涵前置詞以及目前的混合式邏輯時間,
確保不會有出現同一鍵值的情況發生。

訊息排序與存取

訊息的鍵值以 <prefix>'T'<timestamp> 這樣的格式組成,其中:

  1. <prefix> 是佇列的前置詞。
  2. ‘T’ 是介於 0x00 跟 0xff (255) 之間任意挑選的值,沒有特殊意義。
  3. <timestamp> 是訊息置入時的邏輯時間。

運用這樣格式的設計,就可以確保所有訊息的鍵值都會介於 `<prefix>0x00`
<prefix>0xff 之間。 程式碼中分別使用
begin_key 以及 end_key 來代表這兩個值。
藉由這兩個邊界值,我們就可以很方便的使用 set_range
這個方法找到佇列的第一個訊息,
需要的話也可以用來遍歷 (iterate) 佇列的所有訊息。

訊息格式轉換

訊息的鍵值使用 struct
這個套件來從多個資料項轉成精簡的格式。 訊息在放入佇列之前會經過
MessagePack
序列化 (serialized) 成 bytes; 在取出時會被反序列化成當初的資料型態。

交易 (事務) 處理

預設情況下,每個方法各自在獨立的交易中執行,如果需要讓多個操作在同一個交易執行的話,需要先產生
交易物件再傳入被呼叫的方法。

佇列物件方法

各個方法 (method) 的簡短說明如下:

  1. get:

    提供同步 (blocking) 的方式來取出佇列中的訊息,可以設定是否要同步,操作逾時 (timeout) 等。

  2. get_nowait:

    功能類似 get 方法,但沒有訊息的話會,不等待直接回傳
    None。

  3. put:

    允許訊息生產者 (producers) 把訊息放入佇列中。

  4. put_multi:

    允許一次放入多個訊息到佇列中。

測試用程式碼

使用 pytest 的測試案例 [2]

test_queue.pyview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

from gevent import monkey
monkey.patch_all(thread=False) # 1

import gevent
import lmdb
import pytest

from ava.queue import LmdbQueue


@pytest.fixture
def db_env(tmpdir):
dir = tmpdir.mkdir("data")
_db_env = lmdb.Environment(dir.dirname)
return _db_env


class TestLmdbQueue(object):

def test_create_queue(self, db_env):
queue = LmdbQueue(db_env)

def test_get_and_put(self, db_env):
queue = LmdbQueue(db_env)
queue.put(2)
queue.put(3)
queue.put(1)

# queue.dump()
assert 2 == queue.get_nowait()
assert 3 == queue.get_nowait()
assert 1 == queue.get_nowait()
assert queue.get_nowait() is None

def test_get_with_timeout(self, db_env):
queue = LmdbQueue(db_env)

def producer():
gevent.sleep(0.5)
queue.put(1)

gevent.spawn(producer)
assert 1 == queue.get(timeout=1)

def test_get_indefinitely(self, db_env):
queue = LmdbQueue(db_env)

def producer():
gevent.sleep(0.5)
queue.put(1)

gevent.spawn(producer)
assert 1 == queue.get()

def test_put_with_transaction(self, db_env):
queue = LmdbQueue(db_env)

with db_env.begin(write=True) as txn:
queue.put(1, txn)
queue.put(1, txn)

assert 1 == queue.get_nowait()
assert 1 == queue.get_nowait()

with pytest.raises(RuntimeError):
with db_env.begin(write=True) as txn:
queue.put(2, txn)
queue.put(2, txn)
raise RuntimeError()

assert None == queue.get_nowait()

def test_queue_with_prefix(self, db_env):
queue1 = LmdbQueue(db_env, prefix=b'myqueue1')
queue2 = LmdbQueue(db_env, prefix=b'myqueue2')

with db_env.begin(write=True) as txn:
queue1.put(1, txn)
queue1.put(1, txn)
queue2.put(2, txn)
queue2.put(2, txn)

assert 1 == queue1.get_nowait()
assert 2 == queue2.get_nowait()
assert 1 == queue1.get_nowait()
assert 2 == queue2.get_nowait()

def test_put_multi(self, db_env):
queue = LmdbQueue(db_env)

items = [1, 2, '3']
with db_env.begin(write=True) as txn:
queue.put_multi(items, txn)

assert 1 == queue.get_nowait()
assert 2 == queue.get_nowait()
assert '3' == queue.get_nowait()

標記 1: gevent 透過 monkey patch
讓多數標準程式庫不用修改也可以支援並發 (concurrent) 執行。

結語

在一般稍微有點規模的分散式系統,通常會使用訊息中介軟體。如果訊息的遺失無關緊要,其實也就不用自找麻煩。
偏偏總會有些場合,尤其是在開發客戶端應用時,會需要先將要傳遞的訊息暫存在持久性儲存體,如硬碟。
但要確保訊息或其它資料的永久性,並沒有想像中的簡單,需要考慮到很多的例外狀況:
寫到一半系統當機怎麼處理?同時兩個執行緒在寫入怎麼處理?
也因為如此,SQLite 或者 LMDB 這種內嵌式資料庫,
對於需要支援資料永久性的客戶端應用來說,是不可少的工具。



  1. 應該也可以適用在多執行緒的程式,只是山姆鍋大多時候是使用 gevent。對於需要執行大量並發 (concurrent) 的 I/O 工作,又不想要非同步編程模式 (使用 callbacks) 複雜性的人,gevent 值得考慮。 ↩︎

  2. 以現在的標準來說,不寫測試程式會被歸類為不好的 (bad) 的工程師,山姆鍋自然要提供一下 :D。 ↩︎