blockqueue

package module
v0.0.4-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

README

Block Queue

Block Queue is a lightweight and cost-effective queue messaging system with pub/sub mechanism for a cheap, robust, reliable, and durable messaging system.

Built on the sturdy foundations of SQLite3, NutsDB, and now supporting the Turso Database, Block Queue prioritizes efficiency by minimizing network latency and ensuring cost-effectiveness.

Block Queue now also supports PostgreSQL, expanding its versatility and compatibility.

Why BlockQueue

While Kafka, Redis, or SQS is an excellent product, it is quite complex and requires a lot of resources. My purpose is to build this BlockQueue for simplicity, low resources, and cheap.

Features

  • 💸 Cost-Effective: Designed with affordability in mind, Block Queue provides a budget-friendly solution for messaging needs.
  • 📢 Pub/Sub Mechanism: The inclusion of a publish/subscribe mechanism allows for easy communication and real-time updates.
  • 📶 Less Network Latency: Prioritizing efficiency, Block Queue minimizes network latency to persistence to enhance overall performance.

How to Install

Binary

You can read it on our wiki page at: https://github.com/yudhasubki/blockqueue/wiki/Welcome-to-BlockQueue

Driver

BlockQueue supports drivers using SQLite or Turso. You can define the driver in the config.yaml under the http.driver setting (either sqlite or turso).

Running on Go
go get -u github.com/yudhasubki/blockqueue

Using SQLite:

    // github.com/yudhasubki/blockqueue/pkg/sqlite or you can define your own
    sqlite, err := sqlite.New(cfg.SQLite.DatabaseName, sqlite.Config{
		BusyTimeout: cfg.SQLite.BusyTimeout,
	})
    if err != nil {
        return err
    }

    // github.com/yudhasubki/blockqueue/pkg/etcd or you can define your own
    etcd, err := etcd.New(
		cfg.Etcd.Path,
		etcd.WithSync(cfg.Etcd.Sync),
	)
    if err != nil {
        return err
    }

    stream := blockqueue.New(sqlite, etcd)
    err = stream.Run(ctx)
    if err != nil {
        return err
    }

Using Turso:

    // github.com/yudhasubki/blockqueue/pkg/sqlite or you can define your own
    sqlite, err := turso.New("libsql://dbname-username.turso.io?authToken=[TOKEN]")
    if err != nil {
        return err
    }

    // github.com/yudhasubki/blockqueue/pkg/etcd or you can define your own
    etcd, err := etcd.New(
		cfg.Etcd.Path,
		etcd.WithSync(cfg.Etcd.Sync),
	)
    if err != nil {
        return err
    }

    stream := blockqueue.New(sqlite, etcd)
    err = stream.Run(ctx)
    if err != nil {
        return err
    }

Architecture

Publish Architecture

Consumer Architecture

Failed Redelivery Architecture

How it works

Create Topic
curl --location 'http://your-host/topics' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cart",
    "subscribers": [
        {
            "name": "counter",
            "option": {
                "max_attempts": 5,
                "visibility_duration": "5m"
            }
        },
        {
            "name": "created",
            "option": {
                "max_attempts": 5,
                "visibility_duration": "5m"
            }
        }
    ]
}'
Subscriber Options
Key Value Description
max_attempts 1, 2, 3 max redeliver message
visibility_duration 5m, 6m, 1h if message not ack yet message, will send now() + visibility_duration
Create New Subscribers
curl --location 'http://your-host/topics/cart/subscribers' \
--header 'Content-Type: application/json' \
--data '[
    {
        "name": "counter",
        "option": {
            "max_attempts": 5,
            "visibility_duration": "5m"
        }
    }
]
'
Delete Subscriber
curl --location --request DELETE ''http://your-host/topics/{topic_name}/subscribers/{subscriber_name}'
Publish Message
curl --location 'http://your-host/topics/{topic_name}/messages' \
--header 'Content-Type: application/json' \
--data '{
    "message": "hi message from topic {topic_name}"
}'
Read Message

To read a message, you just need to pass the subscriber name into URL Path and with timeout. This ensures horizontal scalability and guarantees that the message is sent once.

curl --location 'http://your-host/topics/{topic_name}/subscribers/{subscriber_name}?timeout=5s'

Note: A message at-least-once message delivery.

Delete Message

After reading and successfully processing a message, you must delete it, as the message will persist based on queue retry policy on subscriber option.

curl --location --request DELETE 'http://your-host/topics/{topic_name}/subscribers/{subscriber_name}/messages/{message_id}'
Subscriber Message Status

If you want to check how many unpublished or unacked message, you can immediately hit this endpoint

curl --location 'localhost:8080/topics/{your_topic}/subscribers'

Benchmark

Macbook Pro M1, Apple Chip Memory 8GiB, here is some benchmarks:

Publish 10 Virtual Users
k6 run --vus=10 --duration=30s --summary-trend-stats="med,p(95),p(99.9)" publish.js

data_received..............: 187 MB  6.2 MB/s
data_sent..................: 217 MB  7.2 MB/s
http_req_blocked...........: med=0s    p(95)=1µs      p(99.9)=68µs
http_req_connecting........: med=0s    p(95)=0s       p(99.9)=0s
http_req_duration..........: med=135µs p(95)=437µs    p(99.9)=7.63ms
http_req_failed............: 100.00% ✓ 1142146      ✗ 0
http_req_receiving.........: med=4µs   p(95)=21µs     p(99.9)=1.62ms
http_req_sending...........: med=2µs   p(95)=10µs     p(99.9)=344µs
http_req_tls_handshaking...: med=0s    p(95)=0s       p(99.9)=0s
http_req_waiting...........: med=124µs p(95)=407µs    p(99.9)=5.48ms
http_reqs..................: 1142146 38067.366226/s
iteration_duration.........: med=162µs p(95)=507.66µs p(99.9)=11.55ms
iterations.................: 1142146 38067.366226/s
vus........................: 10      min=10         max=10
vus_max....................: 10      min=10         max=10

38K message per seconds.
Publish 100 Virtual Users
k6 run --vus=100 --duration=30s --summary-trend-stats="med,p(95),p(99.9)" publish.js

100 Virtual Users

data_received..............: 195 MB  6.5 MB/s
data_sent..................: 226 MB  7.5 MB/s
http_req_blocked...........: med=0s     p(95)=2µs    p(99.9)=111µs
http_req_connecting........: med=0s     p(95)=0s     p(99.9)=0s
http_req_duration..........: med=1.17ms p(95)=7.38ms p(99.9)=59.32ms
http_req_failed............: 100.00% ✓ 1186606      ✗ 0
http_req_receiving.........: med=5µs    p(95)=32µs   p(99.9)=10.6ms
http_req_sending...........: med=2µs    p(95)=14µs   p(99.9)=905.18µs
http_req_tls_handshaking...: med=0s     p(95)=0s     p(99.9)=0s
http_req_waiting...........: med=1.15ms p(95)=7.18ms p(99.9)=55.42ms
http_reqs..................: 1186606 39541.018601/s
iteration_duration.........: med=1.27ms p(95)=8.58ms p(99.9)=66.45ms
iterations.................: 1186606 39541.018601/s
vus........................: 100     min=100        max=100
vus_max....................: 100     min=100        max=100

39,5k message per seconds.

Roadmap

  • Protocol
    • HTTP
    • TCP
  • Metrics
  • WAL
  • SDK
    • Go
    • PHP
  • Perfomance Test

Acknowledgment

This package is inspired by the following:

License

The BlockQueue is open-sourced software licensed under the Apache 2.0 license.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrListenerShutdown             = errors.New("listener shutdown")
	ErrListenerNotFound             = errors.New("listener not found")
	ErrListenerDeleted              = errors.New("listener was deleted")
	ErrListenerRetryMessageNotFound = errors.New("error ack message. message_id not found")
)
View Source
var (
	ErrJobNotFound = errors.New("job not found")
)

Functions

This section is empty.

Types

type BlockQueue

type BlockQueue[V chan bqio.ResponseMessages] struct {
	// contains filtered or unexported fields
}

func New

func New[V chan bqio.ResponseMessages](db Driver, kv *etcd.Etcd) *BlockQueue[V]

func (*BlockQueue[V]) Ack

func (q *BlockQueue[V]) Ack(ctx context.Context, topic core.Topic, subscriberName, messageId string) error

func (*BlockQueue[V]) AddJob

func (q *BlockQueue[V]) AddJob(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error

func (*BlockQueue[V]) AddSubscriber

func (q *BlockQueue[V]) AddSubscriber(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error

func (*BlockQueue[V]) Close

func (q *BlockQueue[V]) Close()

func (*BlockQueue[V]) DeleteJob

func (q *BlockQueue[V]) DeleteJob(topic core.Topic) error

func (*BlockQueue[V]) DeleteSubscriber

func (q *BlockQueue[V]) DeleteSubscriber(ctx context.Context, topic core.Topic, subcriber string) error

func (*BlockQueue[V]) GetSubscribersStatus

func (q *BlockQueue[V]) GetSubscribersStatus(ctx context.Context, topic core.Topic) (bqio.SubscriberMessages, error)

func (*BlockQueue[V]) GetTopics

func (q *BlockQueue[V]) GetTopics(ctx context.Context, filter core.FilterTopic) (core.Topics, error)

func (*BlockQueue[V]) Publish

func (q *BlockQueue[V]) Publish(ctx context.Context, topic core.Topic, request bqio.Publish) error

func (*BlockQueue[V]) Read

func (q *BlockQueue[V]) Read(ctx context.Context, topic core.Topic, subscriber string) (bqio.ResponseMessages, error)

func (*BlockQueue[V]) Run

func (q *BlockQueue[V]) Run(ctx context.Context) error

type Driver

type Driver interface {
	Conn() *sqlx.DB
	Close() error
}

type Http

type Http struct {
	Stream *BlockQueue[chan io.ResponseMessages]
}

func (*Http) Router

func (h *Http) Router() http.Handler

type Job

type Job[V chan io.ResponseMessages] struct {
	Id   uuid.UUID
	Name string
	// contains filtered or unexported fields
}

type Listener

type Listener[V chan blockio.ResponseMessages] struct {
	Id            string
	JobId         string
	PriorityQueue *pqueue.PriorityQueue[V]
	// contains filtered or unexported fields
}

Directories

Path Synopsis
cmd
example
pkg
cas
io

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL