Delta
Delta is a message queue backed by SQLite for persistence. It provides a simple and efficient way to handle message queuing with the durability and reliability of SQLite.
It is loosely inspired by nats.io, but it runs as a single instance and is persistent from the start.
Features
- Persistence: Messages are stored in an SQLite database, ensuring durability.
- Pub/Sub: Supports publish/subscribe messaging pattern.
- Queue: Supports message queuing with load balancing.
- Request/Reply: Supports request/reply messaging pattern.
- Multiple Streams: Allows creating multiple streams for different namespaces.
- Globs: Supports subscribing on a Glob for pattern-based subscriptions.
Installation
To install Delta, use go get:
go get github.com/modfin/delta
Usage
Creating a Message Queue Instance
package main

import (
	"log/slog"

	"github.com/modfin/delta"
)

func main() {
	mq, err := delta.New("file:delta.db", delta.WithLogger(slog.Default()))
	if err != nil {
		panic(err)
	}
	defer mq.Close()
}
Pub/Sub
The publish/subscribe messaging pattern allows multiple subscribers to receive messages from a topic, also known as "one to many" or broadcasting.
Publishing Messages
Publish a message to a specific topic using the Publish method. There is no need to create or declare the topic beforehand.
pub, err := mq.Publish("a.b.c.d", []byte("hello world"))
if err != nil {
	panic(err)
}
Subscribing to Messages
Subscribe to a specific topic using the Subscribe method. The subscription can use a glob pattern to match multiple topics. For example, location.* will match location.us, location.eu, etc., but not location.us.new-york. To match nested topics, use **, which is essentially a prefix match: location.** will match location.us, location.us.new-york, location.us.new-york.new-york-city and so on.
Messages can be consumed from a chan or with the Next method, which blocks until a message is received or the subscription is closed.
sub, err := mq.Subscribe("a.b.*.d")
if err != nil {
	panic(err)
}
go func() {
	for msg := range sub.Chan() { // or 'msg, ok := sub.Next()' can be used
		fmt.Printf("[Chan] Received message: %s <- %s\n", msg.Topic, string(msg.Payload))
	}
}()
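The glob rules described above can be sketched as a small stand-alone matcher. This is only an illustration of the stated semantics, not Delta's own matching code: the function name matchTopic is hypothetical, and it assumes that ** is only meaningful as the last token and needs at least one further token to match.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern.
// "*" matches exactly one dot-separated token; a trailing "**" matches
// one or more remaining tokens (an assumption, see the note above).
// Illustrative only; this is not Delta's matcher.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	for i, tok := range p {
		if tok == "**" && i == len(p)-1 {
			return len(t) > i // at least one token left for "**"
		}
		if i >= len(t) {
			return false // pattern is longer than the topic
		}
		if tok != "*" && tok != t[i] {
			return false // literal token mismatch
		}
	}
	return len(p) == len(t)
}

func main() {
	for _, topic := range []string{"location.us", "location.us.new-york"} {
		fmt.Println(topic, matchTopic("location.*", topic), matchTopic("location.**", topic))
	}
}
```
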
Using Queues
A queue is a group of subscribers that receive messages from a topic. Each message is delivered to only one subscriber in the group.
The queue places no constraints on how messages are published; only consumption is load balanced.
for i := 0; i < 3; i++ {
	i := i
	sub, err := mq.Queue("a.b.*.d", "queue1") // queue1 is the name of the queue which is used to identify it
	if err != nil {
		panic(err)
	}
	go func() {
		for msg := range sub.Chan() {
			fmt.Printf("Received message from queue by worker %d: %s\n",
				i,
				string(msg.Payload),
			)
		}
	}()
}
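To make the "each message is delivered to only one subscriber" guarantee concrete, here is a minimal sketch that uses nothing but the standard library. It does not use Delta and says nothing about how Delta balances load internally; deliverOnce is a hypothetical helper in which several workers read from one shared channel, so every message is consumed exactly once.

```go
package main

import (
	"fmt"
	"sync"
)

// deliverOnce hands msgs to a group of workers over one shared channel,
// so each message is received by exactly one worker. It returns how many
// messages each worker handled. Illustrative only; not Delta's code.
func deliverOnce(msgs []string, workers int) []int {
	ch := make(chan string)
	counts := make([]int, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range ch {
				counts[id]++ // each worker only touches its own slot
			}
		}(w)
	}
	for _, m := range msgs {
		ch <- m // received by whichever worker is ready
	}
	close(ch)
	wg.Wait()
	return counts
}

func main() {
	msgs := []string{"m0", "m1", "m2", "m3", "m4", "m5"}
	fmt.Println("messages handled per worker:", deliverOnce(msgs, 3))
}
```

The split between workers varies from run to run, but the counts always add up to the number of messages sent.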
Request/Reply
The request/reply messaging pattern allows a client to send a request to a service and receive a response.
// "test" is the name of the queue group.
// It is important to use a queue group here so that each request is
// handled by only one worker.
for i := 0; i < 3; i++ {
	i := i
	sub, err := mq.Queue("greet.*", "test")
	if err != nil {
		panic(err)
	}
	go func() {
		for msg := range sub.Chan() {
			_, name, _ := strings.Cut(msg.Topic, ".")
			_, err := msg.Reply([]byte(fmt.Sprintf("from worker %d > hello %s, ", i, name)))
			if err != nil {
				panic(err)
			}
		}
	}()
}
resp, err := mq.Request(context.Background(), "greet.alice", nil)
if err != nil {
	panic(err)
}
msg, ok := resp.Next()
if ok {
	fmt.Printf("Received reply: %s\n", string(msg.Payload))
}
Subscription From
The SubscribeFrom
method allows you to subscribe to messages from a specific topic starting from a given historical time. This is useful when you want to process messages that were published after a certain point in time or you want to re-process messages.
Example Usage
// Publish some messages
for i := 0; i < 10; i++ {
	payload := []byte("message " + strconv.Itoa(i))
	_, err := mq.Publish("example.topic", payload)
	if err != nil {
		panic(err)
	}
}
// Record the point in time to subscribe from
from := time.Now()
// Publish more messages
for i := 10; i < 20; i++ {
	payload := []byte("message " + strconv.Itoa(i))
	_, err := mq.Publish("example.topic", payload)
	if err != nil {
		panic(err)
	}
}
sub, err := mq.SubscribeFrom("example.topic", from)
if err != nil {
	panic(err)
}
// Read messages from the subscription
for i := 10; i < 20; i++ {
	msg, ok := sub.Next()
	if !ok {
		panic("failed to read message")
	}
	fmt.Println("Received historic message:", string(msg.Payload))
}
Benchmarks
Some benchmarks, but remember, generic benchmarks are shit :) Anyway, it seems to perform decently.
It fans out with very little overhead, and writes per second seem to be pretty stable (since it's single-threaded).
// Performed while waking up readers, which costs the writers some performance.
BenchmarkMultipleSubscribers/1-22                  8_242 read-msg/s   9_196 write-msg/s
BenchmarkMultipleSubscribers/4-22                 31_810 read-msg/s   8_037 write-msg/s
BenchmarkMultipleSubscribers/num-cpu_(22)-22     212_433 read-msg/s   9_774 write-msg/s
BenchmarkMultipleSubscribers/2x_num-cpu_(44)-22  411_696 read-msg/s   9_631 write-msg/s
BenchmarkMultipleSubscribersSize/_0.1mb-22  1_313 read-MB/s   13_130 read-msg/s    59.7 write-MB/s   597.3 write-msg/s
BenchmarkMultipleSubscribersSize/_1.0mb-22  3_517 read-MB/s    3_517 read-msg/s   160.6 write-MB/s   160.6 write-msg/s
BenchmarkMultipleSubscribersSize/10.0mb-22  3_856 read-MB/s      386 read-msg/s   184.5 write-MB/s    18.5 write-msg/s
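One way to read the fan-out numbers is to compare the measured read rate with the ideal one, which would be subscribers × writes per second if every subscriber receives every message. The sketch below does that arithmetic for the first four rows. It assumes that the benchmark name suffix (1, 4, 22, 44) is the number of subscribers, which the names suggest but the text does not state; fanOutEfficiency is a hypothetical helper.

```go
package main

import "fmt"

// fanOutEfficiency returns the measured read rate divided by the ideal
// rate of subscribers * writes per second. 1.0 would mean no overhead.
func fanOutEfficiency(subscribers int, readPerSec, writePerSec float64) float64 {
	return readPerSec / (float64(subscribers) * writePerSec)
}

func main() {
	// Values copied from the BenchmarkMultipleSubscribers rows above.
	rows := []struct {
		subs        int
		read, write float64
	}{
		{1, 8242, 9196},
		{4, 31810, 8037},
		{22, 212433, 9774},
		{44, 411696, 9631},
	}
	for _, r := range rows {
		fmt.Printf("%2d subscribers: %.2f of ideal fan-out\n",
			r.subs, fanOutEfficiency(r.subs, r.read, r.write))
	}
}
```

Under that assumption the ratio lands between roughly 0.90 and 0.99 for these rows, which is consistent with the claim that fan-out adds little overhead.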
License
This project is licensed under the MIT License. See the LICENSE file for details.