opinionatedevents

v0.1.0-beta.16
Published: Nov 30, 2024 License: MIT Imports: 20 Imported by: 3

README

⚠️ Work in Progress

This package is still a work in progress and is not recommended for use.

Opinionated Events

Table of Contents

  1. Install
  2. The problem
  3. This solution
  4. Quickstart
    1. Local
    2. Postgres
    3. Custom

Install

go get github.com/markusylisiurunen/go-opinionatedevents

The problem

When building real-world applications, we often have situations where we need to perform multiple, dependent tasks in response to some trigger. For example, when a new user is created, we might want to do any number of the following actions:

  • Create a new user in our database.
  • Send the user a verification email.
  • Add the user's information to our CRM.
  • Add the new user to an emailing list.
  • Create a corresponding customer in Stripe.

These were just a few examples of one possible situation. In general, we can describe such a situation with a graph-like structure of different actions that need to happen in a particular order. Still, they may not necessarily need to block the "primary" action, nor do they need to be executed within a single transaction.

Event-driven architectures can help solve this problem by introducing a layer in the middle that decouples the primary action from the secondary or tertiary actions. This new layer takes care of broadcasting events to all interested parties, retrying failed deliveries with a backoff, persisting events over system outages, and so on. Our responsibility is only to publish the events of interest to the event bus and to implement the logic for handling them.

Many existing services can be used as a message broker. In addition to open-source solutions like RabbitMQ, some cloud providers have their own managed services, such as AWS SNS and Cloud Pub/Sub. Without a proper abstraction layer between our code and the chosen message broker, we might run into issues when setting up our local development environment or testing our code. It would be good not to depend on any specific message broker implementation but rather to be able to switch between different ways of delivering (or not delivering at all) events from one service to another.

This package tries to provide a minimal abstraction between our code and the chosen message broker. The goal is to require no local dependencies and to make asynchronous, event-based systems easy to test.

This solution

This solution is split into three separate concepts: a publisher, a bridge, and a destination.

The destination is the component that receives messages to be delivered. Its only concern is to try to send the received messages to the target system and nothing more, which makes it a deliberately simple component. The meaning of "delivered" depends on which destination we are using. For example, a message is delivered to Cloud Pub/Sub once Cloud Pub/Sub has acknowledged receiving it.

The bridge's responsibility is to use the given destination to deliver a published message, either synchronously or asynchronously. Depending on which type of bridge is used, it may retry failed attempts, buffer messages before sending them, or apply any other logic between the code publishing a message and the message actually being sent to the destination.

The final piece is the publisher. This is the client-facing component whose API is used to publish the actual messages to the event bus.

Quickstart

Local

func GetLocalPublisher() *events.Publisher {
    // define the local destination(s) (i.e. the services you have running locally, including the current service)
    destOne := events.NewHTTPDestination("http://localhost:8080/_events/local")
    destTwo := events.NewHTTPDestination("http://localhost:8081/_events/local")

    // initialise the publisher with an async bridge
    publisher, err := events.NewPublisher(
        events.PublisherWithAsyncBridge(10, 200, destOne, destTwo),
    )
    if err != nil {
        panic(err)
    }

    return publisher
}

Postgres

For PostgreSQL, you can set up the publisher as follows.

const (
    connectionString string = "<database connection string>"
)

func GetPostgresPublisher() *events.Publisher {
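    // create a new database connection (a driver registered as "postgres",
    // such as github.com/lib/pq, must be imported for sql.Open to succeed)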
    db, err := sql.Open("postgres", connectionString)
    if err != nil {
        panic(err)
    }

    // make a new postgres destination
    destination, err := events.NewPostgresDestination(db)
    if err != nil {
        panic(err)
    }

    // initialise the publisher with a sync bridge
    publisher, err := events.NewPublisher(
        // IMPORTANT: you must use the sync bridge with Postgres destination
        events.PublisherWithSyncBridge(destination),
    )
    if err != nil {
        panic(err)
    }

    return publisher
}

Now, if you want to publish events within a database transaction, you can do it like so.

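func PublishWithTransaction(ctx context.Context, db *sql.DB) error {
    publisher := GetPostgresPublisher()
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // returns sql.ErrTxDone and does nothing after a successful commit

    msg, err := events.NewMessage("test.test", nil)
    if err != nil {
        return err
    }
    // PublishOne is the single-message method listed in the API index below
    if err := publisher.PublishOne(events.WithTx(ctx, tx), msg); err != nil {
        return err
    }
    return tx.Commit()
}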

Custom

type StdOutDestination struct{}

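// Deliver matches the Destination interface listed in the API index below,
// which receives a context and a batch of messages.
func (d *StdOutDestination) Deliver(ctx context.Context, batch []*events.Message) error {
    for _, msg := range batch {
        fmt.Printf("received a message: %s\n", msg.GetName())
    }
    return nil
}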

func NewStdOutDestination() *StdOutDestination {
    return &StdOutDestination{}
}

func GetCustomPublisher() *events.Publisher {
    dest := NewStdOutDestination()

    publisher, err := events.NewPublisher(
        events.PublisherWithAsyncBridge(10, 200, dest),
    )
    if err != nil {
        panic(err)
    }

    return publisher
}

Documentation

Functions

func ConstantBackoff

func ConstantBackoff(delay time.Duration) *constantBackoff

func ExponentialBackoff

func ExponentialBackoff(c float64, a float64, b float64, l time.Duration) *exponentialBackoff

func Fatal

func Fatal(err error) error

func IsFatal

func IsFatal(err error) bool

func LinearBackoff

func LinearBackoff(c float64, k float64, l time.Duration) *linearBackoff

func NewHTTPDestination

func NewHTTPDestination(endpoint string) *httpDestination

func NewPostgresDestination

func NewPostgresDestination(db *sql.DB, options ...postgresDestinationOption) (*postgresDestination, error)

func NewPostgresSource

func NewPostgresSource(db *sql.DB, options ...postgresSourceOption) (*postgresSource, error)

func PostgresDestinationWithSchema

func PostgresDestinationWithSchema(schema string) postgresDestinationOption

func PostgresSourceWithIntervalTrigger

func PostgresSourceWithIntervalTrigger(interval time.Duration) postgresSourceOption

func PostgresSourceWithMaxWorkers

func PostgresSourceWithMaxWorkers(maxWorkers uint) postgresSourceOption

func PostgresSourceWithNotifyTrigger

func PostgresSourceWithNotifyTrigger(connectionString string) postgresSourceOption

func PostgresSourceWithSchema

func PostgresSourceWithSchema(schema string) postgresSourceOption

func PublisherWithAsyncBridge

func PublisherWithAsyncBridge(
	maxAttempts int,
	waitBetweenAttempts int,
	destinations ...Destination,
) publisherOption

func PublisherWithSyncBridge

func PublisherWithSyncBridge(destinations ...Destination) publisherOption

func ReceiverWithSource

func ReceiverWithSource(source Source) receiverOption

func WithTx

func WithTx(ctx context.Context, tx sqlTx) context.Context

Types

type Backoff

type Backoff interface {
	DeliverAfter(attempt int) time.Duration
}

type Delivery

type Delivery interface {
	GetAttempt() int
	GetQueue() string
	GetMessage() *Message
}

type Destination

type Destination interface {
	Deliver(ctx context.Context, batch []*Message) error
}

type Message

type Message struct {
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(name string, payload any, options ...MessageOption) (*Message, error)

func (*Message) GetDeliverAt

func (msg *Message) GetDeliverAt() time.Time

func (*Message) GetName

func (msg *Message) GetName() string

func (*Message) GetPayload

func (msg *Message) GetPayload(payload any) error

func (*Message) GetPublishedAt

func (msg *Message) GetPublishedAt() time.Time

func (*Message) GetTopic

func (msg *Message) GetTopic() string

func (*Message) GetUUID

func (msg *Message) GetUUID() string

func (*Message) MarshalJSON

func (msg *Message) MarshalJSON() ([]byte, error)

func (*Message) UnmarshalJSON

func (msg *Message) UnmarshalJSON(data []byte) error

type MessageOption

type MessageOption func(*Message)

func WithDeliverAt

func WithDeliverAt(when time.Time) MessageOption

type OnMessageHandler

type OnMessageHandler func(ctx context.Context, delivery Delivery) error

type OnMessageMiddleware

type OnMessageMiddleware func(next OnMessageHandler) OnMessageHandler

func WithBackoff

func WithBackoff(backoff Backoff) OnMessageMiddleware

func WithLimit

func WithLimit(limit int) OnMessageMiddleware

type PostgresSourceQueueDeclareParams

type PostgresSourceQueueDeclareParams struct {
	Topic string
	Queue string
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(opts ...publisherOption) (*Publisher, error)

func (*Publisher) Drain

func (p *Publisher) Drain()

func (*Publisher) OnDeliveryFailure

func (p *Publisher) OnDeliveryFailure(handler func(batch []*Message)) func()

func (*Publisher) PublishMany

func (p *Publisher) PublishMany(ctx context.Context, batch []*Message) error

func (*Publisher) PublishOne

func (p *Publisher) PublishOne(ctx context.Context, msg *Message) error

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

func NewReceiver

func NewReceiver(opts ...receiverOption) (*Receiver, error)

func (*Receiver) Deliver

func (r *Receiver) Deliver(ctx context.Context, delivery Delivery) error

func (*Receiver) GetMessagesWithHandlers

func (r *Receiver) GetMessagesWithHandlers(queue string) []string

func (*Receiver) GetQueuesWithHandlers

func (r *Receiver) GetQueuesWithHandlers() []string

func (*Receiver) On

func (r *Receiver) On(queue string, name string, onMessage OnMessageHandler) error

func (*Receiver) Start

func (r *Receiver) Start(ctx context.Context) error

type Source

type Source interface {
	Start(ctx context.Context, receiver *Receiver) error
}

Directories

Path Synopsis
examples
publisher Module
receiver Module
