delta


README

Delta


Delta is a message queue backed by SQLite for persistence. It provides a simple and efficient way to handle message queuing with the durability and reliability of SQLite.

It is somewhat inspired by nats.io, but is single-instance and persistent from the start.

Features

  • Persistence: Messages are stored in an SQLite database, ensuring durability.
  • Pub/Sub: Supports publish/subscribe messaging pattern.
  • Queue: Supports message queuing with load balancing.
  • Request/Reply: Supports request/reply messaging pattern.
  • Multiple Streams: Allows creating multiple streams for different namespaces.
  • Globs: Supports glob patterns in subscriptions for pattern-based topic matching.

Installation

To install Delta, use go get:

go get github.com/modfin/delta

Usage

Creating a Message Queue Instance
package main

import (
	"github.com/modfin/delta"
	"log/slog"
)

func main() {
	mq, err := delta.New("file:delta.db", delta.WithLogger(slog.Default()))
	if err != nil {
		panic(err)
	}
	defer mq.Close()
}
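
For tests or throwaway setups, the helpers listed in the API index further down can be combined. A minimal sketch, assuming URITemp returns a URI for a temporary database and that Stream scopes a separate namespace ("orders" is just an illustrative name, not part of the library):

// Open a throwaway store whose files are removed again on Close.
mq, err := delta.New(delta.URITemp(), delta.DBRemoveOnClose())
if err != nil {
    panic(err)
}
defer mq.Close()

// The "Multiple Streams" feature: Stream returns an *MQ scoped to its own
// namespace ("orders" is an example name).
orders, err := mq.Stream("orders")
if err != nil {
    panic(err)
}
fmt.Println(orders.CurrentStream()) // presumably prints "orders"
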
Pub/Sub

The publish/subscribe messaging pattern allows multiple subscribers to receive messages from a topic: "one to many", or broadcasting.

Publishing Messages

Publish a message to a specific topic using the Publish method. There is no need to create or declare the topic beforehand.

pub, err := mq.Publish("a.b.c.d", []byte("hello world"))
if err != nil {
    panic(err)
}
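
The API index further down also lists PublishAsync, which returns a *Publication carrying a Done channel and an Err field. A hedged sketch of waiting for an asynchronous publish to complete (the exact completion semantics are an assumption):

pub := mq.PublishAsync("a.b.c.d", []byte("hello async"))

// Wait for the publication to finish; Err is assumed to hold any
// persistence error once Done is closed.
<-pub.Done()
if pub.Err != nil {
    panic(pub.Err)
}
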
Subscribing to Messages

Subscribe to a specific topic using the Subscribe method. The subscription can use a glob pattern to match multiple topics. For example, location.* will match location.us, location.eu etc. but not location.us.new-york. For prefix matching there is **: location.** will match location.us, location.us.new-york, location.us.new-york.new-york-city and so on (see the sketch after the example below).

Messages can be consumed through a chan or by using the Next method, which blocks until a message is received or the subscription is closed.

sub, err := mq.Subscribe("a.b.*.d")
if err != nil {
    panic(err)
}

go func() {
    for msg := range sub.Chan() { // or 'msg, ok := sub.Next()' can be used
        fmt.Printf("[Chan] Received message: %s <- %s\n", msg.Topic, string(msg.Payload))
    }
}()
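
For the prefix match described above, a minimal sketch using ** (the location topics are just illustrative names):

// Matches location.us, location.us.new-york, location.us.new-york.new-york-city, ...
all, err := mq.Subscribe("location.**")
if err != nil {
    panic(err)
}

go func() {
    for msg := range all.Chan() {
        fmt.Printf("[Glob] Received message: %s <- %s\n", msg.Topic, string(msg.Payload))
    }
}()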


Using Queues

A queue is a group of subscribers that receive messages from a topic. Each message is delivered to only one subscriber in the group. It does not put any constraints on how messages are published to the queue; only the consumption is load balanced.


for i := 0; i < 3; i++ {
    i := i // capture the loop variable for the goroutine (needed before Go 1.22)
    // "queue1" is the name of the queue group; subscribers sharing it are load balanced
    sub, err := mq.Queue("a.b.*.d", "queue1")
    if err != nil {
        panic(err)
    }

    go func() {
        for msg := range sub.Chan() {
            fmt.Printf("Received message from queue by worker %d: %s\n",
                i,
                string(msg.Payload),
            )
        }
    }()
}

Request/Reply

The request/reply messaging pattern allows a client to send a request to a service and receive a response.


// "test" is the name of the queue group which is used to identify it.
// It is important to use a queue group to ensure that a request is
// not handled by multiple workers.
for i := 0; i < 3; i++ {
    i := i // capture the loop variable for the goroutine (needed before Go 1.22)
    sub, err := mq.Queue("greet.*", "test")
    if err != nil {
        panic(err)
    }
    go func() {
        for msg := range sub.Chan() {
            _, name, _ := strings.Cut(msg.Topic, ".")
            _, err := msg.Reply([]byte(fmt.Sprintf("from worker %d > hello %s", i, name)))
            if err != nil {
                panic(err)
            }
        }
    }()
}


resp, err := mq.Request(context.Background(), "greet.alice", nil)
if err != nil {
    panic(err)
}
msg, ok := resp.Next()
if ok {
    fmt.Printf("Received reply: %s\n", string(msg.Payload))
}
SubscribeFrom

The SubscribeFrom method allows you to subscribe to messages from a specific topic starting from a given historical time. This is useful when you want to process messages that were published after a certain point in time, or when you want to re-process messages.

Example Usage

// Publish some messages
for i := 0; i < 10; i++ {
    payload := []byte("message " + strconv.Itoa(i))
    _, err := mq.Publish("example.topic", payload)
    if err != nil {
        panic(err)
    }
}

// Record the point in time to subscribe from later
from := time.Now()

// Publish more messages
for i := 10; i < 20; i++ {
    payload := []byte("message " + strconv.Itoa(i))
    _, err := mq.Publish("example.topic", payload)
    if err != nil {
        panic(err)
    }
}

sub, err := mq.SubscribeFrom("example.topic", from)
if err != nil {
    panic(err)
}
// Read messages from the subscription
for i := 10; i < 20; i++ {
    msg, ok := sub.Next()
    if !ok {
        panic("failed to read message")
    }
    fmt.Println("Received historic message:", string(msg.Payload))
}

Benchmarks

Some benchmarks, but remember, generic benchmarks are shit :) Anyway, it seems to perform decently. It fans out with very little overhead, and writes per second seem to be pretty stable (since writes are single-threaded).


// Performed with waking up readers, which takes a performance hit on writers.

BenchmarkMultipleSubscribers/1-22                     8_242 read-msg/s    9_196 write-msg/s
BenchmarkMultipleSubscribers/4-22                    31_810 read-msg/s    8_037 write-msg/s
BenchmarkMultipleSubscribers/num-cpu_(22)-22        212_433 read-msg/s    9_774 write-msg/s
BenchmarkMultipleSubscribers/2x_num-cpu_(44)-22     411_696 read-msg/s    9_631 write-msg/s


BenchmarkMultipleSubscribersSize/_0.1mb-22   1_313 read-MB/s     13_130 read-msg/s     59.7 write-MB/s     597.3 write-msg/s
BenchmarkMultipleSubscribersSize/_1.0mb-22   3_517 read-MB/s      3_517 read-msg/s    160.6 write-MB/s     160.6 write-msg/s
BenchmarkMultipleSubscribersSize/10.0mb-22   3_856 read-MB/s        386 read-msg/s    184.5 write-MB/s      18.5 write-msg/s

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	OptimizeLatency = iota
	OptimizeThroughput
)
View Source
const DEFAULT_STREAM = "default"

Variables

This section is empty.

Functions

func RemoveStore

func RemoveStore(uri string, logger *slog.Logger) error

func URIFromPath

func URIFromPath(path string) string

func URITemp

func URITemp() string

Types

type MQ

type MQ struct {
	// contains filtered or unexported fields
}

func New

func New(uri string, op ...Op) (*MQ, error)

func (*MQ) Close

func (c *MQ) Close() error

Close closes the message queue and all its namespaces

func (*MQ) CurrentStream

func (c *MQ) CurrentStream() string

func (*MQ) Publish

func (mq *MQ) Publish(topic string, payload []byte) (*Publication, error)

func (*MQ) PublishAsync

func (mq *MQ) PublishAsync(topic string, payload []byte) *Publication

func (*MQ) Queue

func (mq *MQ) Queue(topic string, key string) (*Subscription, error)

func (*MQ) Request

func (mq *MQ) Request(ctx context.Context, topic string, payload []byte) (*Subscription, error)

func (*MQ) Stream

func (c *MQ) Stream(stream string, ops ...Op) (*MQ, error)

func (*MQ) Subscribe

func (mq *MQ) Subscribe(topic string) (*Subscription, error)

func (*MQ) SubscribeFrom

func (mq *MQ) SubscribeFrom(topic string, from time.Time) (*Subscription, error)

type Msg

type Msg struct {
	MessageId uint64
	Topic     string
	Payload   []byte
	At        time.Time
	// contains filtered or unexported fields
}

func (*Msg) Ack

func (m *Msg) Ack() error

func (*Msg) Reply

func (m *Msg) Reply(payload []byte) (Msg, error)

type Op

type Op func(*MQ) error

func DBRemoveOnClose

func DBRemoveOnClose() Op

DBRemoveOnClose is a helper function to remove the database files on close

func DBSyncOff

func DBSyncOff() Op

DBSyncOff is a helper function to set

synchronous = off

this is useful for write performance but affects read performance and durability
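
For example, a sketch of enabling it when opening a store:

mq, err := delta.New("file:delta.db", delta.DBSyncOff())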

func WithLogger

func WithLogger(log *slog.Logger) Op

WithLogger sets the logger for the message queue

type Publication

type Publication struct {
	Msg
	Err error
	// contains filtered or unexported fields
}

func (*Publication) Done

func (p *Publication) Done() <-chan struct{}

type Subscription

type Subscription struct {
	Unsubscribe func()
	// contains filtered or unexported fields
}

func (*Subscription) Chan

func (s *Subscription) Chan() <-chan Msg

func (*Subscription) Id

func (s *Subscription) Id() string

func (*Subscription) Next

func (s *Subscription) Next() (Msg, bool)

func (*Subscription) Topic

func (s *Subscription) Topic() string
