gq

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2021 License: GPL-3.0 Imports: 11 Imported by: 0

README

gq

Go Reference GitHub go.mod Go version of a Go module Go Report Card

gq is a lightweight, scalable message queue backed by any of the most popular SQL DBs. It makes adding a scalable message queue to your SQL-backed application as easy as importing a module; no need to integrate and maintain extra infrastructure.

Supported SQL DBs backends

gq currently supports MySQL, Postgres, and CockroachDB.

Features

  • Flexible: Messages in gq are byte slices ([]byte), giving you the flexibility to marshal your message data to Protobuf, JSON, or any other binary encoding.
  • Scalable: gq employs techniques such as message batching and concurrent message processing to ensure it remains performant under high load.
  • Resilient: gq will automatically retry message pushing and processing a configurable number of times.
  • Configurable: gq Consumers and Producers are highly configurable, enabling you to balance throughput, latency and resource/network usage to achieve the desired tradeoffs for your use case.
  • Lightweight: gq is a small package, less than 1000 lines of code with few dependencies, making it a lightweight addition to your binary.

Definitions

  • Message queue: A First In, First Out (FIFO) queue which can be pushed to and popped from by multiple agents simultaneously.
  • Client: The Client maintains the connection to the database, and sets up the required database tables if they don't yet exist
  • Message: A Message is an ordinary byte slice ([]byte). This allows for maximum flexibility, as you can marshal data of any type into a byte slice using the encoding of your choice.
  • Producer: Producers are used to push messages onto the queue through their Push(message []byte) method.
  • Consumer: Consumers pops messages from the queue and process them using a user-provided callback of the form func process(message []byte) error.

User Guide

Usage
Creating a new Client

To create a new Client, call gq.NewClient(db *sql.DB, driverName string):

db, err := sql.Open("mysql", "user:password@...")
client, err := gq.NewClient(db, "mysql")
Creating a new Producer

To create a new Producer, call gq.Client.NewProducer(ctx context.Context):

producer, err := client.NewProducer(ctx)
Pushing messages

To push a message, marshal your message and then pass it to the Producer's Push method. Here's an example using a Protobuf encoding:

email := &pb.EmailMessage{
	To: "receiver@example.com",
	From: "sender@example.com",
	Body: "..."
}
msg, err := proto.Marshal(email)
producer.Push(msg)

Push is non-blocking, making it safe to use in your request handlers. Behind the scenes, Push sends the message onto a channel and returns. A group of message-pushing goroutines which start when the Producer is instantiated receives the message off the channel and pushes it onto the queue.

Creating a new Consumer

To create a new Consumer, call gq.Client.NewConsumer(ctx context.Context, process ProcessFunc):

sendEmail := func(message []byte) error {
	email := &pb.EmailMessage{}
	if err := proto.Unmarshal(message, email); err != nil {
		return fmt.Errorf("Failed to parse email: %s", err)
	}
	if err := SendEmail(email); err != nil {
		return fmt.Errorf("Failed to send email: %s", err)
	}
	return nil
}
consumer, err := client.NewConsumer(ctx, sendEmail)

The Consumer will start asynchronously pulling and processing messages immediately. Messages which return error from the process function will be requeued and retried a configurable number of times (3 by default).

Documentation

For detailed documentation, including more advanced Producer/Consumer configuration, refer to the go-docs.

Benchmarking against Redis

Results obtained using mq_benchmark

Throughput
❯ go run main.go redis false 10000 10000
2021/02/27 20:46:43 Begin redis test
2021/02/27 20:46:43 Received 10000 messages in 199.981995 ms
2021/02/27 20:46:43 Received 50004.500000 per second
2021/02/27 20:46:43 Sent 10000 messages in 200.138000 ms
2021/02/27 20:46:43 Sent 49965.523438 per second
2021/02/27 20:46:43 End redis test
❯ go run main.go gq false 10000 10000
2021/02/27 20:46:50 Begin gq test
2021/02/27 20:46:50 Sent 10000 messages in 14.238000 ms
2021/02/27 20:46:50 Sent 702345.812500 per second
2021/02/27 20:46:50 Received 10000 messages in 170.003998 ms
2021/02/27 20:46:50 Received 58822.144531 per second
2021/02/27 20:46:50 End gq test
Latency
❯ go run main.go redis true 10000 10000
2021/02/27 20:48:49 Begin redis test
2021/02/27 20:48:49 Sent 10000 messages in 66.870003 ms
2021/02/27 20:48:49 Sent 149543.890625 per second
2021/02/27 20:48:49 Mean latency for 10000 messages: 0.153254 ms
2021/02/27 20:48:49 End redis test
❯ go run main.go gq true 10000 10000
2021/02/27 20:48:36 Begin gq test
2021/02/27 20:48:36 Sent 10000 messages in 12.475000 ms
2021/02/27 20:48:36 Sent 801603.187500 per second
2021/02/27 20:48:36 Mean latency for 10000 messages: NaN ms
2021/02/27 20:48:36 End gq test

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a client of the message queue. It can be used to spawn any number of consumers or producers.

func NewClient

func NewClient(db *sql.DB, driverName string) (*Client, error)

NewClient creates a new Client, generating the database schema if it doesn't exist

func (Client) NewConsumer

func (c Client) NewConsumer(ctx context.Context, p ProcessFunc) (*Consumer, error)

NewConsumer creates a new gq Consumer. It begins pulling messages immediately, and passes each one to the supplied process function

func (Client) NewConsumerWithOptions added in v0.1.5

func (c Client) NewConsumerWithOptions(ctx context.Context, p ProcessFunc, opts ConsumerOptions) (*Consumer, error)

NewConsumerWithOptions creates a new gq Consumer with the supplied options.

func (Client) NewProducer

func (c Client) NewProducer(ctx context.Context) (*Producer, error)

NewProducer creates a new gq Producer

func (Client) NewProducerWithOptions added in v0.1.5

func (c Client) NewProducerWithOptions(ctx context.Context, opts ProducerOptions) (*Producer, error)

NewProducerWithOptions creates a new gq Producer with the supplied options

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a gq consumer

type ConsumerOptions

type ConsumerOptions struct {
	// PullPeriod is the period messages should be pulled at (default: 50ms).
	// This can be tuned to achieve the desired throughput/latency tradeoff
	PullPeriod time.Duration
	// MaxPullSize is the maximum number of messages to be pulled in one batch (default: 50)
	MaxBatchSize int
	// MaxProcessingRetries is the maximum number of times that a message will be requeued for re-processing after processing fails (default: 3)
	MaxProcessingRetries int
	// Concurrency is the number of concurrent goroutines to pull messages from (default: num cpus)
	Concurrency int
}

ConsumerOptions represents the options which can be used to tailor producer behaviour

type ProcessFunc

type ProcessFunc func(message []byte) error

ProcessFunc represents a function which is passed a message to process

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents a message queue producer

func (*Producer) Push

func (p *Producer) Push(message []byte)

Push pushes a message onto the queue

type ProducerOptions

type ProducerOptions struct {
	// PushPeriod is the period with which messages should be pushed (default: 50ms).
	// This can be tuned to achieve the desired throughput/latency tradeoff
	PushPeriod time.Duration
	// MaxRetryPeriods is the maximum number of push periods to retry a batch of messages for before discarding them (default: 3)
	MaxRetryPeriods int
	// Concurrency is the number of concurrent goroutines to push messages from (default: num cpus)
	Concurrency int
}

ProducerOptions represents the options which can be used to tailor producer behaviour

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL