swarm

package module
v0.20.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2024 License: Apache-2.0 Imports: 10 Imported by: 26

README

swarm

Go channels for distributed queueing and event-driven systems

sub-moduledocfeaturesabout
API & Kernel
AWS EventBridge
AWS DynamoDB Stream
AWS S3 Event
AWS SQS Events
AWS SQS and SQS FIFO
AWS WebSocket API
AWS SNS
AWS Kinesis Events
AWS Kinesis
AWS ElastiCache
MQTT


Today's wrong abstractions lead to complexity on maintainability in the future. Usage of synchronous interfaces to reflect asynchronous nature of messaging queues is a good example of inaccurate abstraction. Usage of pure Go channels is a proper solution to distills asynchronous semantic of queueing systems into the idiomatic native Golang code. The library adapts Go channels for various systems and interface. Please let us know via GitHub issues your needs about queuing technologies.

Inspiration

The library encourages developers to use Golang struct for asynchronous communication with peers. It helps engineers to define domain models, write correct, maintainable code. This library (swarm) uses generic programming style to abstract queueing systems into the idiomatic Golang channels chan<- T and <-chan T. See the design pattern Golang channels for distributed event-driven architecture to learn philosophy and use-cases:

  1. readability: application uses pure Go code instead of vendor specific interfaces (learning time)
  2. portability: application is portable between various queuing systems or event brokers in same manner as sockets abstracts networking stacks (exchange queueing transport "on-the-fly" to resolve evolution of requirements)
  3. testability: unit testing focuses on pure biz logic, simplify dependency injections and mocking (pure unit tests).
  4. distribution: idiomatic architecture to build distributed topologies and scale-out Golang applications (clustering).
  5. serverless: of-the-shelf portable patterns for serverless applications (infrastructure as a code, aws cdk).

Getting started

The library requires Go 1.18 or later due to usage of generics.

The latest version of the library is available at main branch of this repository. All development, including new features and bug fixes, take place on the main branch using forking and pull requests as described in contribution guidelines. The stable version is available via Golang modules.

Use go get to retrieve the library and add it as dependency to your application.

go get -u github.com/fogfish/swarm

Quick example

Example below is most simplest illustration of enqueuing and dequeuing message from AWS SQS.

package main

import (
  "log/slog"

  "github.com/fogfish/swarm"
  "github.com/fogfish/swarm/broker/sqs"
  "github.com/fogfish/swarm/enqueue"
  "github.com/fogfish/swarm/dequeue"
)

func main() {
  // create broker for AWS SQS
  q, err := sqs.New("aws-sqs-queue-name")
	if err != nil {
		slog.Error("sqs broker has failed", "err", err)
		return
	}

  // create Golang channels
  rcv, ack := dequeue.Typed[string](q)
  out := swarm.LogDeadLetters(enqueue.Typed[string](q))

  // use Golang channels for I/O
  go func() {
    for msg := range rcv {
      out <- msg.Object
      ack <- msg
    }
  }()

  q.Await()
}

Check the design pattern Distributed event-driven Golang channels for deep-dive into library philosophy. Also note, each supported broker comes with runnable examples that shows the library.

Produce (enqueue) messages

The following code snippet shows a typical flow of producing the messages using the library.

import (
  "github.com/fogfish/swarm/broker/sqs"
  "github.com/fogfish/swarm/enqueue"
)

// Use pure Golang struct to define semantic of messages and events
type Note struct {
  ID   string `json:"id"`
  Text string `json:"text"`
}

// Spawn a new instance of the messaging broker
q, err := sqs.New("name-of-the-queue"), /* config options */)

// creates pair Golang channels dedicated for publishing
// messages of type [Note] through the messaging broker. The first channel
// is dedicated to emit messages. The second one is the dead letter queue that
// contains failed transmissions. 
enq, dlq := enqueue.Typed[Note](q)

// Enqueue message of type Note
enq <- Note{ID: "note", Text: "some text"}

// Close the broker and release all resources
q.Close()

Consume (dequeue) messages

The following code snippet shows a typical flow of consuming the messages using the library.

import (
  "github.com/fogfish/swarm/broker/sqs"
  "github.com/fogfish/swarm/dequeue"
)

// Use pure Golang struct to define semantic of messages and events
type Note struct {
  ID   string `json:"id"`
  Text string `json:"text"`
}

// Spawn a new instance of the messaging broker
q, err := sqs.New("name-of-the-queue", /* config options */)

// Create pair Golang channels dedicated for consuming
// messages of type Note from the messaging broker. The first channel
// is dedicated to receive messages. The second one is the channel to
// acknowledge consumption  
deq, ack := dequeue.Typed[Note](q)

// consume messages and then acknowledge it
go func() {
  for msg := range deq {
    /* ... do something with msg.Object and ack the message ...*/
    ack <- msg
  }
}()

// Await messages from the broker
q.Await()

Configure library behavior

The library uses "option pattern" for the configuration, which is divided into two parts: a generic I/O kernel configuration and a broker-specific configuration. Please note that each configuration option is prefixed with With and implemented in config.go files.

q, err := sqs.New("name-of-the-queue",
  // WithXXX performs broker configuration
  sqs.WithBatchSize(5),
  // WithConfig performs generic kernel configuration
  sqs.WithConfig(
    swarm.WithSource("name-of-my-component"),
    swarm.WithRetryConstant(10 * time.Millisecond, 3),
    swarm.WithPollFrequency(10 * time.Second),
    /* ... */
  ),
)

Message Delivery Guarantees

Usage of Golang channels as an abstraction raises a concern about grade of service on the message delivery guarantees. The library ensures exactly same grade of service as the underlying queueing system or event broker. Messages are delivered according to the promise once they are accepted by the remote side of queuing system. The library's built-in retry logic protects losses from temporary unavailability of the remote peer. However, Golang channels function as sophisticated "in-memory buffers," which can introduce a delay of a few milliseconds between scheduling a message to the channel and dispatching it to the remote peer. To handle catastrophic failures, choose one of the following policies to either accept or safeguard in-flight messages from potential loss.

At Most Once is best effort policy, where a message is published without any formal acknowledgement of receipt, and it isn't replayed. Some messages can be lost as subscribers are not required to acknowledge receipt.

The library implements asymmetric approaches for message handling. In the enqueue path, buffered Golang channels are used for both message emission and managing dead-letter queues. Similarly, the dequeue path uses buffered Golang channels to deliver messages to the consumer.

// Spawn a new instance of the messaging broker using At Most Once policy.
// The policy defines the capacity of Golang channel.
q, err := sqs.New("name-of-the-queue",
  swarm.WithPolicyAtMostOnce(1000),
)

// for compatibility reasons two channels are returned on the enqueue path but
// dead-letter-queue is nil
enq, dlq := enqueue.Typed[Note](q)

// for compatibility reasons two channels are returned on the dequeue path but
// ack channel acts as /dev/null discards any sent message
deq, ack := dequeue.Typed[Note](q)

At Least Once is the default policy used by the library. The policy assume usage of "acknowledgement" protocol, which guarantees a message will be re-sent until it is formally acknowledged by a recipient. Messages should never be lost but it might be delivered more than once causing duplicate work to consumer.

The library also implements asymmetric approaches for message handling. In the enqueue path, unbuffered Golang channels are used to emit messages and manage the dead-letter queue, resulting in a delayed guarantee. This means that enqueuing additional messages is blocked until the dead-letter queue is fully resolved. Alternatively, the application can opt for a synchronous protocol to enqueue messages.

In the dequeue path, buffered Golang channels are used to deliver messages to the consumer and acknowledge their processing. While consumer acknowledgment ensures reliable message delivery, it may lead to message duplication.

// Spawn a new instance of the messaging broker using At Least Once policy.
// At Least Once policy is the default one, no needs to explicitly declare it.
// Use it only if you need to define other capacity for dequeue channel than
// the default one, which creates unbuffered channel
q, err := sqs.New("name-of-the-queue",
  swarm.WithPolicyAtLeastOnce(1000),
)

// both channels are unbuffered
enq, dlq := enqueue.Typed[Note](q)

// buffered channels of capacity n
deq, ack := dequeue.Typed[Note](q)

Exactly Once is not supported by the library yet.

Delayed Guarantee vs Guarantee

Usage of At Least Once policy (unbuffered channels) provides the delayed guarantee for producers. Let's consider the following example. If queue broker fails to send message A then the channel enq is blocked at sending message B until the program consumes message A from the dead-letter queue channel.

enq, dlq := enqueue.Typed[User](q)

enq <- User{ID: "A", Text: "some text by A"} // failed to send
enq <- User{ID: "B", Text: "some text by B"} // blocked until dlq is processed 
enq <- User{ID: "C", Text: "some text by C"}

The delayed guarantee is efficient on batch processing, pipelining but might cause complication at transactional processing. Therefore, the library also support a synchronous variant to producing a message:

// Creates "synchronous" variant of the queue
user := enqueue.NewTyped[User](q)

// Synchronously enqueue the message. It ensure that message is scheduled for
// delivery to remote peer once function successfully returns.
if err := user.Enq(context.Background(), &User{ID: "A", Text: "some text by A"}); err != nil {
  // handle error
}

Order of Messages

The library guarantee ordering of the messages when they are produced over same Golang channel. Let's consider a following example:

user, _ := enqueue.Typed[User](q)
note, _ := enqueue.Typed[Note](q)

user <- &User{ID: "A", Text: "some text by A"}
note <- &Note{ID: "B", Text: "some note A"}
user <- &User{ID: "C", Text: "some text by A"}

The library guarantees following clauses A before C and C after A because both messages are produced to single channel user. It do not guarantee clauses A before B, B before C or C after B because multiple channels are used.

The library does not provide any higher guarantee than underlying message broker. For example, using SQS would not guarantee any ordering while SQS FIFO makes sure that messages of same type is ordered.

Octet Streams

The library support slices of bytes []byte as message type. It opens an opportunity for the many encoding options like JSON, Gob, etc.

import (
  queue "github.com/fogfish/swarm/queue/bytes"
)

enq, dlq := enqueue.Bytes(q, "Note")
deq, ack := enqueue.Bytes(q, "Note")

Please see example about binary producer and consumer.

Generic events

An event represents an immutable fact placed into the queuing system. It is conceptually similar to the Action defined by schema.org.

An action performed by a direct agent and indirect participants upon a direct object.

This type facilitates the development of event-driven solutions that treat data as a collection of immutable facts, which can be queried and processed in real-time. These applications process a logical event log, where each event represents a change to the current state of an object, such as which attributes were inserted, updated, or deleted (essentially a diff). Each event uniquely identifies the affected object using a unique identifier.

Unlike other solutions, this approach does not use an envelope for events. Instead, it pairs metadata and data side by side, making it more extendable.


type Meta struct {
  swarm.Meta
  About string `json:"about"`
}

type User struct {
	ID   string `json:"id"`
	Text string `json:"text"`
}

// creates Golang channels to produce / consume messages
enq, dlq := enqueue.Event[Meta, User](q)
deq, ack := enqueue.Event[Meta, User](q)

Please see example about event producer and consumer.

Error Handling

The error handling on channel level is governed either by dead-letter queue or acknowledge protocol. The library provides swarm.WithStdErr configuration option to pass the side channel to consume global errors. Use it as top level error handler.

stderr := make(chan error)
q, err := sqs.New("swarm-test",
  sqs.WithConfig(
    swarm.WithStdErr(stderr),
  ),
)

for err := range stderr {
  // error handling loop
}

Fail Fast

The existing message routing architecture assumes that a micro-batch of messages is read from the broker, dispatched to channels, and then waits for acknowledgments. A new micro-batch is not read until all messages are acknowledged, or the TimeToFlight timer expires. In time-critical systems or serverless applications, a "fail fast" strategy is more effective (e.g., a Lambda function doesn't need to idle until the timeout).

Send negative acknowledgement to ack channel to indicate error on message processing.

deq, ack := dequeue.Typed[Note](q)

// consume messages and then acknowledge it
for msg := range deq {
  // negative ack on the error
  if err := doSomething(msg.Object); err != nil {
    ack <- msg.Fail(err)
    continue
  } 
  ack <- msg
}

Serverless

The library primarily support development of serverless event-driven application using AWS service. The library provides AWS CDK Golang constructs to spawn consumers. See example of serverless consumer and corresponding AWS CDK application.

It consistently implements a pattern - "create Broker, attach Sinks".

package main

import (
  "github.com/fogfish/scud"
  "github.com/fogfish/swarm/broker/eventbridge"
)

func main() {
  app := awscdk.NewApp(nil)
  stack := awscdk.NewStack(app, jsii.String("swarm-example-eventbridge"),
    &awscdk.StackProps{
      Env: &awscdk.Environment{
        Account: jsii.String(os.Getenv("CDK_DEFAULT_ACCOUNT")),
        Region:  jsii.String(os.Getenv("CDK_DEFAULT_REGION")),
      },
    },
 )

  // create broker
  broker := eventbridge.NewBroker(stack, jsii.String("Broker"), nil)
  broker.NewEventBus(nil)

  broker.NewSink(
    &eventbridge.SinkProps{
      Source: []string{"swarm-example-eventbridge"},
      Function: &scud.FunctionGoProps{
        SourceCodeModule: "github.com/fogfish/swarm/broker/eventbridge",
        SourceCodeLambda:  "examples/dequeue/typed",
      },
    },
  )

  app.Synth(nil)
}

Race condition in Serverless

In a serverless environment, performing dequeue and enqueue operations can lead to race conditions. Specifically, the dequeue loop may complete before other emitted messages are processed.

rcv, ack := dequeue.Typed[/* .. */](broker1)
snd, dlq := enqueue.Typed[/* .. */](broker2)

for msg := range rcv {
  snd <- // ...

  // The ack would cause sleep of function in serverless.
  // snd channel might not be flushed before function sleep.
  // The library does not provide yet ultimate solution.  
  ack <- msg   
}

Unfortunately, the library does not provide yet ultimate solution. Either sleep of sync senders are required.

How To Contribute

The library is Apache Version 2.0 licensed and accepts contributions via GitHub pull requests:

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Added some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

The build and testing process requires Go version 1.16 or later.

build and test library.

git clone https://github.com/fogfish/swarm
cd swarm
go test ./...

commit message

The commit message helps us to write a good release note, speed-up review process. The message should address two question what changed and why. The project follows the template defined by chapter Contributing to a Project of Git book.

bugs

If you experience any issues with the library, please let us know via GitHub issues. We appreciate detailed and accurate reports that help us to identity and replicate the issue.

Bring Your Own Queue

TBD

License

See LICENSE

Documentation

Index

Constants

View Source
const (
	EnvConfigPollFrequency  = "CONFIG_SWARM_POLL_FREQUENCY"
	EnvConfigTimeToFlight   = "CONFIG_SWARM_TIME_TO_FLIGHT"
	EnvConfigNetworkTimeout = "CONFIG_SWARM_NETWORK_TIMEOUT"
)

Environment variable to config kernel

View Source
const (
	ErrServiceIO = faults.Type("service i/o failed")
	ErrEnqueue   = faults.Type("enqueue is failed")
	ErrDequeue   = faults.Type("dequeue is failed")
)
View Source
const Version = "v0.20.2"

Variables

This section is empty.

Functions

func ErrTimeout added in v0.20.1

func ErrTimeout(op string, timer time.Duration) error

func LogDeadLetters added in v0.13.0

func LogDeadLetters[T any](out chan<- T, err <-chan T) chan<- T

Consumes dead letter messages

swarm.LogDeadLetters(queue.Enqueue(...))

func TypeOf added in v0.20.0

func TypeOf[T any](category ...string) string

TypeOf returns normalized name of the type T.

Types

type Bag added in v0.4.0

type Bag struct {
	// Message category ~ topic
	Category string

	// Unique brief summary of the message
	Digest string

	// Error on the message processing
	Error error

	// I/O Context of the message, as obtained from broker
	IOContext any

	// Message raw content
	Object []byte
}

Bag is an abstract container for octet stream. Bag is used by the transport to abstract message on the wire.

type Codec added in v0.20.0

type Codec interface {
	Encode([]byte) ([]byte, error)
	Decode([]byte) ([]byte, error)
}

type Config

type Config struct {
	// Source is a direct performer of the event.
	// A software service that emits action to the stream.
	Source string

	// Quality of Service Policy
	Policy Policy

	// Queue capacity (enhance with individual capacities)
	CapOut int
	CapDlq int
	CapRcv int
	CapAck int

	// Retry Policy for service calls
	Backoff Retry

	// Standard Error I/O channel
	StdErr chan<- error

	// Size of poller pool in the system
	PollerPool int

	// Frequency to poll broker api
	PollFrequency time.Duration

	// Time To Flight is a time required by the client to acknowledge the message
	TimeToFlight time.Duration

	// Timeout for any network operations
	NetworkTimeout time.Duration

	// Fail fast the message if category is not known to kernel.
	FailOnUnknownCategory bool

	// PacketCodec for binary packets
	PacketCodec Codec
}

func NewConfig added in v0.9.0

func NewConfig() Config

type Event added in v0.5.0

type Event[M, T any] struct {
	Meta *M `json:"meta,omitempty"`
	Data *T `json:"data,omitempty"`
}

Event defines immutable fact(s) placed into the queueing system. Event resembles the concept of Action as it is defined by schema.org.

> An action performed by a direct agent and indirect participants upon a direct object.

This type supports development of event-driven solutions that treat data as a collection of immutable facts, which are queried and processed in real-time. These applications processes logical log of events, each event defines a change to current state of the object, i.e. which attributes were inserted, updated or deleted (a kind of diff). The event identifies the object that was changed together with using unique identifier.

In contrast with other solutions, the event does not uses envelop approach. Instead, it side-car meta and data each other, making extendible

type Meta added in v0.20.0

type Meta struct {
	//
	// Unique identity for event.
	// It is automatically defined by the library upon the transmission unless
	// defined by sender. Preserving ID across sequence of messages allows
	// building request/response semantic.
	ID string `json:"id,omitempty"`

	//
	// Canonical IRI that defines a type of action.
	// It is automatically defined by the library upon the transmission unless
	// defined by sender.
	Type curie.IRI `json:"type,omitempty"`

	//
	// Direct performer of the event, a software service that emits action to
	// the stream. It is automatically defined by the library upon the transmission
	// unless defined by sender.
	Agent curie.IRI `json:"agent,omitempty"`

	//
	// ISO8601 timestamps when action has been created
	// It is automatically defined by the library upon the transmission
	Created time.Time `json:"created,omitempty"`

	//
	// Indicates target performer of the event, a software service that is able to
	Target curie.IRI `json:"target,omitempty"`

	//
	// Indirect participants, a user who initiated an event.
	Participant curie.IRI `json:"participant,omitempty"`
}

The default metadata associated with event.

type Msg

type Msg[T any] struct {
	// Message category ~ topic
	Category string

	// Unique brief summary of the message
	Digest string

	// Error on the message processing
	Error error

	// I/O Context of the message, as obtained from broker
	IOContext any

	// Message decoded content
	Object T
}

Msg is a generic envelop type for incoming messages. It contains both decoded object and its digest used to acknowledge message.

func ToMsg added in v0.20.0

func ToMsg[T any](bag Bag, object T) Msg[T]

func (Msg[T]) Fail added in v0.12.0

func (msg Msg[T]) Fail(err error) Msg[T]

Fail message with error

type Option added in v0.9.0

type Option func(conf *Config)

Configuration option for queueing broker

func WithConfigFromEnv added in v0.13.2

func WithConfigFromEnv() Option

Configure from Environment, (all timers in seconds) - CONFIG_SWARM_POLL_FREQUENCY - CONFIG_SWARM_TIME_TO_FLIGHT - CONFIG_SWARM_NETWORK_TIMEOUT

func WithFailOnUnknownCategory added in v0.20.1

func WithFailOnUnknownCategory() Option

Fail fast the message if category is not known to kernel.

func WithLogStdErr added in v0.13.0

func WithLogStdErr() Option

Configure broker to log standard errors

func WithNetworkTimeout added in v0.9.0

func WithNetworkTimeout(t time.Duration) Option

Timeout for Network I/O

func WithPacketCodec added in v0.20.1

func WithPacketCodec(codec Codec) Option

Configure codec for binary packets

func WithPolicyAtLeastOnce added in v0.9.0

func WithPolicyAtLeastOnce(n int) Option

AtLeastOnce policy ensures delivery of the message to broker

The policy only impacts behavior of Golang channels created by the broker

func WithPolicyAtMostOnce added in v0.9.0

func WithPolicyAtMostOnce(n int) Option

AtMostOnce is best effort policy, where a message is published without any formal acknowledgement of receipt, and it isn't replayed.

The policy only impacts behavior of Golang channels created by the broker

func WithPollFrequency added in v0.9.0

func WithPollFrequency(t time.Duration) Option

Frequency to poll broker api

func WithPollerPool added in v0.20.2

func WithPollerPool(n int) Option

Number of poller in the system

func WithRetry added in v0.9.0

func WithRetry(backoff Retry) Option

Custom retry policy

func WithRetryConstant added in v0.9.0

func WithRetryConstant(t time.Duration, n int) Option

Retry operation for N times, with T wait time in between

func WithRetryExponential added in v0.9.0

func WithRetryExponential(t time.Duration, n int, f float64) Option

Retry operation for N times, with exponential increments by T on each step

func WithRetryLinear added in v0.9.0

func WithRetryLinear(t time.Duration, n int) Option

Retry operation for N times, with linear increments by T on each step

func WithRetryNo added in v0.9.0

func WithRetryNo() Option

No retires

func WithSource added in v0.9.0

func WithSource(agent string) Option

Source is a direct performer of the event. A software service that emits action to the stream.

func WithStdErr added in v0.9.0

func WithStdErr(stderr chan<- error) Option

Configure broker to route global errors to channel

func WithTimeToFlight added in v0.9.0

func WithTimeToFlight(t time.Duration) Option

Time To Flight for message from broker API to consumer

type Policy added in v0.4.0

type Policy int

Grade of Service Policy

const (
	PolicyAtMostOnce Policy = iota
	PolicyAtLeastOnce
	PolicyExactlyOnce
)

type Retry added in v0.9.0

type Retry interface {
	Retry(f func() error) error
}

Directories

Path Synopsis
broker
eventbridge Module
eventddb Module
events3 Module
eventsqs Module
sqs Module
websocket Module
qtest module
queue module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL