image (pic: source from here)

以上的圖是利用Redis 達到 Message Queue 的方式,也是Disque要達到的事情.

##前言

這一兩個月,比較常聽到大家討論的就是Disque的使用方式與疑問.本來我對於Message Queue的系統(尤其是backend那一塊)比較不熟.於是還是花了一點時間把Disque裝起來,並且把sample code跑了一下.希望對於基本架構有一些了解.

##關於Disque

DisqueRedis原作者Salvatore Sanfilippo根據大家在Redis上面針對Message Queue處理的部份來加強,並且下去拿Redis的source code加以修改,改造出這套專門處理Message Queue的系統.

主要特色如下: (參考這裡)

  • 消息發送可以選擇至少一次或者最多一次。
  • 消息需要消費者確認。
  • 如果沒有確認,會一直重發,直至到期。確認信息會廣播給擁有消息副本的所有結點,然後消息會被垃圾收集或者刪除。
  • 隊列是持久的。
  • Disque默認只運行在內存裡,持久性是通過同步備份實現的。
  • 隊列為了保證最大吞吐量,不是全局一致的,但會儘力提供排序。
  • 在壓力大的時候,消息不會丟棄,但會拒絶新的消息。
  • 消費者和生產者可以通過命令查看隊列中的消息。
  • 隊列盡力提供FIFO。
  • 一組master作為中介,客戶端可以與任一結點通信。
  • 中介有命名的隊列,無需消費者和生產者干預。
  • 消息發送是事務性的,保證集群中會有所需數量的副本。
  • 消息接收不是事務性的。
  • 消費者默認是接收時是阻塞的,但也可以選擇查看新消息。
  • 生產者在隊列滿時發新消息可以得到錯誤信息,也可以讓集群非同步地複製消息。
  • 支持延遲作業,粒度是秒,最久可以長達數年。但需要消耗內存。
  • 消費者和生產者可以連接不同的結點。

###優缺點與比較:

優點其實蠻容易被瞭解的:

  • 容易安裝使用,而且小.本身就有資料庫(類似 redis)
  • 當不需要太複雜的傳輸格式與介面的時候,disque效能應該不差 (base on Redis v.s. RabbitMQ)

幾個缺點,可能需要注意:

  • disque 目前還是alpha
  • disque 目前只有單線程

關於與Kafka的比較,Salvatore Sanfilippo在他的推特有以下有趣的回應:

    Salvatore Sanfilippo: Disque VS Kafka is the new Redis VS PosgreSQL which was the new Apple VS Orange :-)

##安裝與使用

###安裝與使用Disque

  • git clone https://github.com/antirez/disque
  • make
  • make test (make sure port and compiling result success.)
  • cd src
  • run disque-server

###簡單的操作

當你跑disque之後,就會自動連接local disque server.

    //Add a job by producer
    ADDJOB job1 "This is sample job1" 0   #create job1 with comment
    ->DI3b2954204f8f86168198266221515fb011a1eea005a0SQ #server response task id.
    
    //Get a job by consumer
    GETJOB from job1
    ->1) 1) "job1"
    ->2) "DI3b2954204f8f86168198266221515fb011a1eea005a0SQ"
    ->3) "This is sample job1"

透過Golang 來測試job-queue

主要是使用go-disque的source code 來當作job-queue 的機制.

以下有三段程式碼,分別是兩個worker跟一個發送者,發送者(disque-enque)會發送兩個訊息排程給disque,當worker上線或是開始抓取訊息後,就會執行該訊息定義的事項.

主要程式碼並沒有做太大修改:

Worker(Consumer 消費者): Downloader : 會下載指定網頁的資料

    package main
    
    import (
    	"github.com/EverythingMe/go-disque/tasque"
    
    	"crypto/md5"
    	"fmt"
    	"io"
    	"net/http"
    	"os"
    	// "time"
    )
    
    // Step 1: Define a handler that has a unique name
    var Downloader = tasque.FuncHandler(func(t *tasque.Task) error {
    
    	u := t.Params["url"].(string)
    	res, err := http.Get(u)
    	if err != nil {
    		return err
    	}
    	defer res.Body.Close()
    
    	fp, err := os.Create(fmt.Sprintf("/tmp/%x", md5.Sum([]byte(u))))
    	if err != nil {
    		return err
    	}
    	defer fp.Close()
    
    	if _, err := io.Copy(fp, res.Body); err != nil {
    		return err
    	}
    	fmt.Printf("Downloaded %s successfully\n", u)
    
    	return nil
    
    }, "download")
    
    // Step 2: Registering the handler and starting a Worker
    
    func main() {
    
    	// Worker with 10 concurrent goroutines. In real life scenarios set this to much higher values...
    	worker := tasque.NewWorker(10, "127.0.0.1:7711")
    
    	// register our downloader
    	worker.Handle(Downloader)
    
    	// Run the worker
    	worker.Run()
    
    }

Worker(Consumer 消費者): foo : 顯示指定字串

    package main
    
    import (
    	"fmt"
    	"github.com/EverythingMe/go-disque/tasque"
    )
    
    // Step 1: Define a handler that has a unique name
    var fooWorker = tasque.FuncHandler(func(t *tasque.Task) error {
    	u := t.Params["text"].(string)
    	fmt.Printf("foo worker runs, param is %s\n", u)
    	return nil
    
    }, "foo")
    
    // Step 2: Registering the handler and starting a Worker
    
    func main() {
    
    	// Worker with 10 concurrent goroutines. In real life scenarios set this to much higher values...
    	worker := tasque.NewWorker(10, "127.0.0.1:7711")
    
    	// register our downloader
    	worker.Handle(fooWorker)
    
    	// Run the worker
    	worker.Run()
    
    }

最後就是把工作派發出去的Enque (角色上被稱為是製造者 Producer)

package main

import (
	"fmt"
	"github.com/EverythingMe/go-disque/tasque"
	"time"
)

func main() {

	client := tasque.NewClient(5*time.Second, "127.0.0.1:7711")
	task := tasque.NewTask("download").Set("url", "http://google.com")
	err := client.Do(task)
	if err != nil {
		panic(err)
	}
	fmt.Println("First work queue run..")

	client = tasque.NewClient(5*time.Second, "127.0.0.1:7711")
	task = tasque.NewTask("foo").Set("text", "I am the kind of world.")
	err = client.Do(task)
	if err != nil {
		panic(err)
	}
	fmt.Println("Second work queue run..")
}

##心得

Disque善用了Redis的特性,並且幫大家把一些基本功能都勾勒出來.簡單講就是看到太多人把Messagq Queue弄在上面,原作者才會這樣改.

不過由於我本身用到的地方比較少,我相信有機會要使用的時後應該會更容易上手才對.

##相關文章


Buy Me A Coffee

Evan

Attitude is everything