bunnify

package module
v0.0.0-...-571b097
Published: Jan 30, 2025 License: MIT Imports: 13 Imported by: 0

README

Bunnify is a library to publish and consume events for AMQP.

Important: While the library works fine from my perspective, I am still changing things here and there. Even though the library is tagged with semver versions, I will only start respecting semver from v1.0.0 onward, and I do not guarantee backwards compatibility before that.

Features

Easy setup: Bunnify is designed to be easy to set up and use. Simply reference the library and start publishing and consuming events.

Automatic payload marshaling and unmarshaling: You can consume the same payload you published, without worrying about the details of marshaling and unmarshaling. Bunnify handles these actions for you, abstracting them away from the developer.

Automatic reconnection: If your connection to the AMQP server is interrupted, Bunnify will automatically handle the reconnection for you. This ensures that your events are published and consumed without interruption.

Built-in event metadata handling: The library automatically handles event metadata, including correlation IDs and other important details.

Retries and dead lettering: You can configure how many times an event can be retried and to send the event to a dead letter queue when the processing fails.

Tracing out of the box: Traces are automatically injected when publishing and extracted when consuming. The minimal setup required is shown in the tracer test.

Prometheus metrics: The Prometheus gatherer automatically collects the following metrics:

  • amqp_events_received
  • amqp_events_without_handler
  • amqp_events_not_parsable
  • amqp_events_nack
  • amqp_events_processed_duration
  • amqp_events_publish_succeed
  • amqp_events_publish_failed

Only the dependencies needed: The intention of the library is to avoid pulling in lots of unneeded dependencies. I will always triple check the dependencies and use as few libraries as possible to achieve the required functionality.

  • github.com/rabbitmq/amqp091-go: Handles the connection with AMQP protocol.
  • github.com/google/uuid: Generates UUIDs for event IDs and correlation IDs.
  • go.uber.org/goleak: Used in tests to verify that handling channels does not leak goroutines.
  • go.opentelemetry.io/otel: Handles the injection and extraction of the traces on the events.
  • github.com/prometheus/client_golang: Used in order to export metrics to Prometheus.

Outbox publisher: There is a submodule that you can add with go get github.com/ambitiousrep/bunnify/outbox. This publisher wraps the default bunnify publisher and stores all events in a database table, which is then polled asynchronously so the stored events can be published to AMQP. You can read more here.

Motivation

Every workplace I have been at had its own AMQP library. Most of the time the problems they try to solve are reconnection, logging, correlation, handling the correct body type for events, and dead lettering. Most of these libraries are good, but they are also built upon other internal libraries and carry company specifics that make them impossible to open source.

Developers are often spoiled by these libraries, since they provide a good developer experience, and that is great; but if you cannot use them in side projects or when you start your own company, what is the point?

Bunnify aims to provide a flexible and adaptable solution that can be used in a variety of environments and scenarios. By abstracting away many of the technical details of AMQP publishing and consumption, Bunnify makes it easy to get started with event-driven architecture without needing to be an AMQP expert.

Installation

go get github.com/ambitiousrep/bunnify

Examples

You can find all the working examples under the tests folder.

Consumer

https://github.com/ambitiousrep/bunnify/blob/f356a80625d9dcdaec12d05953447ebcc24a1b13/tests/consumer_publish_test.go#L38-L61

Dead letter consumer

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/tests/dead_letter_receives_event_test.go#L34-L67

Using a default handler

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/tests/consumer_publish_test.go#L133-L170

Publisher

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/tests/consumer_publish_test.go#L64-L78

Enable Prometheus metrics

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/tests/consumer_publish_metrics_test.go#L30-L34

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/tests/consumer_publish_metrics_test.go#L70-L76

Enable tracing

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/tests/consumer_publish_tracer_test.go#L18-L20

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/tests/consumer_publish_tracer_test.go#L49-L58

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/tests/consumer_publish_tracer_test.go#L33-L37

Retries

https://github.com/ambitiousrep/bunnify/blob/53c83127f94da86377ae38630e010b9693f376ef/tests/consumer_retries_test.go#L66-L87

Configuration

Both the connection and consumer structs can be configured with the typical functional options. You can find the options below:

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/bunnify/connection.go#L15-L37

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/bunnify/consumerOption.go#L18-L65
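For readers who have not met the functional options pattern mentioned above, here is a minimal standalone sketch of it. It does not import bunnify: the `connectionOption` fields, the `newConnectionOption` helper and the default URI are assumptions made for the example, and only the signatures of `WithURI` and `WithReconnectInterval` mirror the documented ones.

```go
package main

import (
	"fmt"
	"time"
)

// connectionOption is a hypothetical stand-in for the unexported struct
// that the documented options mutate; its fields are assumptions.
type connectionOption struct {
	uri               string
	reconnectInterval time.Duration
}

// WithURI mirrors the documented signature: it returns a function that
// sets a single field on the options struct.
func WithURI(uri string) func(*connectionOption) {
	return func(opt *connectionOption) { opt.uri = uri }
}

// WithReconnectInterval mirrors the documented signature in the same way.
func WithReconnectInterval(interval time.Duration) func(*connectionOption) {
	return func(opt *connectionOption) { opt.reconnectInterval = interval }
}

// newConnectionOption (hypothetical helper) starts from defaults and then
// applies each option in order, so later options win.
func newConnectionOption(opts ...func(*connectionOption)) connectionOption {
	opt := connectionOption{
		uri:               "amqp://localhost", // assumed default for this sketch
		reconnectInterval: 10 * time.Second,   // the documented default interval
	}
	for _, apply := range opts {
		apply(&opt)
	}
	return opt
}

func main() {
	opt := newConnectionOption(
		WithURI("amqp://0.0.0.0:5672"),
		WithReconnectInterval(2*time.Second),
	)
	fmt.Println(opt.uri, opt.reconnectInterval)
}
```

The appeal of the pattern is that a constructor can grow new settings without changing its signature, and callers only mention the settings they want to override.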

When publishing an event, you can override the event or the correlation ID if you need. This is also achievable with options:

https://github.com/ambitiousrep/bunnify/blob/76f7495ef660fd4c802af8e610ffbc9cca0e39ba/bunnify/publishableEvent.go#L22-L36

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddHandlerToConsumer

func AddHandlerToConsumer[T any](consumer *Consumer, routingKey string, handler EventHandler[T])

AddHandlerToConsumer adds a handler for the given routing key. It is an alternative way to add handlers when the consumer has already been created and the options can no longer be used.

func InitMetrics

func InitMetrics(registerer prometheus.Registerer) error

func WithBindingToExchange

func WithBindingToExchange(exchange string) func(*consumerOption)

WithBindingToExchange specifies the exchange to which the queue will be bound for the handlers provided.

func WithCorrelationID

func WithCorrelationID(correlationID string) func(*eventOptions)

WithCorrelationID specifies the correlation ID to be published. If it is not used, a random UUID will be generated.

func WithDeadLetterQueue

func WithDeadLetterQueue(queueName string) func(*consumerOption)

WithDeadLetterQueue indicates which queue will receive the events that were NACKed for this consumer.

func WithDefaultHandler

func WithDefaultHandler(handler EventHandler[json.RawMessage]) func(*consumerOption)

WithDefaultHandler specifies a handler that will be used for any routing key without a defined handler. This is mostly convenient if you don't care about the specific payload of the event, which will be received as raw JSON bytes (json.RawMessage).

func WithEventID

func WithEventID(eventID string) func(*eventOptions)

WithEventID specifies the event ID to be published. If it is not used, a random UUID will be generated.

func WithHandler

func WithHandler[T any](routingKey string, handler EventHandler[T]) func(*consumerOption)

WithHandler specifies the routing key under which the provided handler will be invoked. The routing key indicated here will be bound to the queue if WithBindingToExchange is also supplied.

func WithNotificationChannel

func WithNotificationChannel(notificationCh chan<- Notification) func(*connectionOption)

WithNotificationChannel specifies a Go channel that receives notifications such as connection established, reconnecting, event published, event consumed, etc.
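A minimal sketch of reading from such a channel follows. The `Notification` types copy the declarations documented further down this page; the `drain` helper, the output format and the two sample messages are invented for illustration. In a real program the library would be the one writing to the channel, and the loop would typically run in its own goroutine.

```go
package main

import "fmt"

// These declarations copy the documented notification types.
type NotificationType string
type NotificationSource string

const (
	NotificationTypeInfo  NotificationType = "INFO"
	NotificationTypeError NotificationType = "ERROR"
)

const (
	NotificationSourceConnection NotificationSource = "CONNECTION"
	NotificationSourceConsumer   NotificationSource = "CONSUMER"
	NotificationSourcePublisher  NotificationSource = "PUBLISHER"
)

type Notification struct {
	Message string
	Type    NotificationType
	Source  NotificationSource
}

// drain (hypothetical helper) logs notifications until the channel is
// closed and returns how many were informational and how many were errors.
func drain(ch <-chan Notification) (infos, errs int) {
	for n := range ch {
		if n.Type == NotificationTypeError {
			errs++
		} else {
			infos++
		}
		fmt.Printf("[%s][%s] %s\n", n.Type, n.Source, n.Message)
	}
	return infos, errs
}

func main() {
	// The messages below are made up; they only stand in for what the
	// library would send.
	ch := make(chan Notification, 2)
	ch <- Notification{Message: "connection established", Type: NotificationTypeInfo, Source: NotificationSourceConnection}
	ch <- Notification{Message: "publish failed", Type: NotificationTypeError, Source: NotificationSourcePublisher}
	close(ch)

	infos, errs := drain(ch)
	fmt.Println("infos:", infos, "errors:", errs)
}
```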

func WithQoS

func WithQoS(prefetchCount, prefetchSize int) func(*consumerOption)

WithQoS specifies the prefetch count and size for the consumer.

func WithQuorumQueue

func WithQuorumQueue() func(*consumerOption)

WithQuorumQueue specifies that the queue to consume will be created as quorum queue. Quorum queues are used when data safety is the priority.

func WithReconnectInterval

func WithReconnectInterval(interval time.Duration) func(*connectionOption)

WithReconnectInterval sets how long to wait between connection attempts.

func WithRetries

func WithRetries(retries int) func(*consumerOption)

WithRetries specifies the number of retries before the event is discarded or sent to the dead letter queue. Quorum queues are required to use this feature. The event will be processed at most retries + 1 times: if the specified amount is 3, the event can be processed up to 4 times.
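The retries + 1 arithmetic can be checked with a small simulation. This is not bunnify code and it does not talk to a broker: `maxDeliveries`, `processWithRetries` and `errHandlerFailed` are hypothetical names that only model the counting described above.

```go
package main

import (
	"errors"
	"fmt"
)

// errHandlerFailed is a hypothetical error used by the failing handler.
var errHandlerFailed = errors.New("handler failed")

// maxDeliveries returns how many times a handler can run for one event:
// one initial attempt plus `retries` redeliveries.
func maxDeliveries(retries int) int {
	return retries + 1
}

// processWithRetries calls handler until it succeeds or the delivery
// budget is spent. It returns the number of attempts made and whether the
// event was given up on (discarded or dead lettered in the real system).
func processWithRetries(retries int, handler func(attempt int) error) (int, bool) {
	limit := maxDeliveries(retries)
	for attempt := 1; attempt <= limit; attempt++ {
		if handler(attempt) == nil {
			return attempt, false
		}
	}
	return limit, true
}

func main() {
	alwaysFail := func(int) error { return errHandlerFailed }
	attempts, exhausted := processWithRetries(3, alwaysFail)
	fmt.Println(attempts, exhausted) // prints "4 true"
}
```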

func WithURI

func WithURI(URI string) func(*connectionOption)

WithURI allows the caller to specify the AMQP server. It should be in the format amqp://0.0.0.0:5672.

func XnWGQvXK

func XnWGQvXK() error

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents a connection towards the AMQP server. A single connection should be enough for the entire application as the consuming and publishing is handled by channels.

func NewConnection

func NewConnection(opts ...func(*connectionOption)) *Connection

NewConnection creates a new AMQP connection using the indicated options. If the caller does not supply options, it will by default connect to a localhost instance and try to reconnect every 10 seconds.

func (*Connection) Close

func (c *Connection) Close() error

Close closes the connection to the AMQP server.

func (*Connection) NewConsumer

func (c *Connection) NewConsumer(
	queueName string,
	opts ...func(*consumerOption)) Consumer

NewConsumer creates a consumer for a given queue using the specified connection. Information messages such as channel status will be sent to the notification channel if one was specified on the connection struct. If no QoS is supplied, the prefetch count will be 20.

func (*Connection) NewPublisher

func (c *Connection) NewPublisher() *Publisher

NewPublisher creates a publisher using the specified connection.

func (*Connection) Start

func (c *Connection) Start() error

Start establishes the connection to the AMQP server. It only returns an error when the URI is not valid, since retrying would not help in that case.

type ConsumableEvent

type ConsumableEvent[T any] struct {
	Metadata
	DeliveryInfo DeliveryInfo
	Payload      T
}

ConsumableEvent[T] represents an event that can be consumed. The type parameter T specifies the type of the event's payload.
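To make the role of the type parameter concrete, here is a standalone sketch that decodes a JSON body into `ConsumableEvent[T]` using only the standard library. The struct shapes copy the documented declarations; the `decode` helper, the `orderCreated` payload and the sample body are assumptions for the example, and the wire format bunnify actually uses may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Metadata, DeliveryInfo and ConsumableEvent copy the documented shapes.
type Metadata struct {
	ID            string    `json:"id"`
	CorrelationID string    `json:"correlationId"`
	Timestamp     time.Time `json:"timestamp"`
}

type DeliveryInfo struct {
	Queue      string
	Exchange   string
	RoutingKey string
}

type ConsumableEvent[T any] struct {
	Metadata
	DeliveryInfo DeliveryInfo
	Payload      T
}

// orderCreated is a hypothetical payload type.
type orderCreated struct {
	ID string `json:"id"`
}

// decode (hypothetical helper) unmarshals a body into an event whose
// payload has type T. The embedded Metadata fields are read from the top
// level of the JSON object, next to the payload.
func decode[T any](body []byte) (ConsumableEvent[T], error) {
	var evt ConsumableEvent[T]
	err := json.Unmarshal(body, &evt)
	return evt, err
}

func main() {
	body := []byte(`{"id":"evt-1","correlationId":"corr-1","timestamp":"2025-01-30T00:00:00Z","payload":{"id":"order-42"}}`)

	// Typed payload: the handler sees a ready-to-use struct.
	typed, err := decode[orderCreated](body)
	if err != nil {
		panic(err)
	}
	fmt.Println(typed.ID, typed.CorrelationID, typed.Payload.ID)

	// Raw payload: with json.RawMessage the payload bytes are kept as-is.
	raw, err := decode[json.RawMessage](body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw.Payload))
}
```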

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is used for consuming events from a specified queue.

func (*Consumer) Consume

func (c *Consumer) Consume() error

Consume will start consuming events from the indicated queue. The first time this function is called, it will return an error if neither handlers nor a default handler are specified, or if the creation of queues, exchanges, bindings or QoS does not succeed. In case this function gets called recursively due to channel reconnection, the errors will be pushed to the notification channel (if one has been indicated in the connection).

func (*Consumer) ConsumeParallel

func (c *Consumer) ConsumeParallel() error

ConsumeParallel will start consuming events from the indicated queue. The first time this function is called, it will return an error if neither handlers nor a default handler are specified, or if the creation of queues, exchanges, bindings or QoS does not succeed. In case this function gets called recursively due to channel reconnection, the errors will be pushed to the notification channel (if one has been indicated in the connection). The difference between this and the regular Consume is that this one fires a goroutine for each message received, as opposed to processing messages sequentially.
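The difference between the two modes can be illustrated with plain channels. This sketch does not use bunnify or AMQP: `consumeSequential`, `consumeParallel` and `feed` are hypothetical helpers, and waiting for all goroutines with a `sync.WaitGroup` is a choice made here so the example terminates cleanly, not a statement about how the library behaves.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// consumeSequential handles deliveries one at a time, in arrival order.
func consumeSequential(deliveries <-chan string, handle func(string)) {
	for msg := range deliveries {
		handle(msg)
	}
}

// consumeParallel starts one goroutine per delivery and returns once the
// channel is closed and every handler has finished.
func consumeParallel(deliveries <-chan string, handle func(string)) {
	var wg sync.WaitGroup
	for msg := range deliveries {
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			handle(m)
		}(msg)
	}
	wg.Wait()
}

// feed returns a closed, pre-filled channel that stands in for a stream of
// deliveries.
func feed(msgs ...string) <-chan string {
	ch := make(chan string, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	return ch
}

func main() {
	slow := func(string) { time.Sleep(50 * time.Millisecond) }

	start := time.Now()
	consumeSequential(feed("a", "b", "c"), slow)
	fmt.Println("sequential:", time.Since(start).Round(10*time.Millisecond))

	start = time.Now()
	consumeParallel(feed("a", "b", "c"), slow)
	fmt.Println("parallel:  ", time.Since(start).Round(10*time.Millisecond))
}
```

With a slow handler, the sequential run should take roughly the sum of the handler durations, while the parallel run should take roughly one handler duration. The trade-off is that a handler used in the parallel style must be safe for concurrent use and cannot rely on message ordering.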

type DeliveryInfo

type DeliveryInfo struct {
	Queue      string
	Exchange   string
	RoutingKey string
}

DeliveryInfo holds the original queue, exchange and routing key of the event.

type EventHandler

type EventHandler[T any] func(ctx context.Context, event ConsumableEvent[T]) error

EventHandler is the type definition for a function that is used to handle events of a specific type.
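One question a generic handler type raises is how handlers for different payload types can live in the same consumer. The sketch below shows one possible approach, type erasure through a closure, together with a fallback that plays the role of a default handler. Everything except the `EventHandler` definition is hypothetical (`rawHandler`, `wrap`, `dispatch`, `runExample`, the simplified `ConsumableEvent`), and this is not claimed to be how bunnify implements it.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// ConsumableEvent is reduced to its payload for this sketch.
type ConsumableEvent[T any] struct {
	Payload T
}

// EventHandler copies the documented type definition.
type EventHandler[T any] func(ctx context.Context, event ConsumableEvent[T]) error

// rawHandler is a hypothetical type-erased handler that receives the body
// as bytes, so handlers with different T can share one map.
type rawHandler func(ctx context.Context, body []byte) error

// wrap adapts a typed handler: it unmarshals the body into the handler's
// payload type and then calls the handler.
func wrap[T any](handler EventHandler[T]) rawHandler {
	return func(ctx context.Context, body []byte) error {
		var evt ConsumableEvent[T]
		if err := json.Unmarshal(body, &evt); err != nil {
			return fmt.Errorf("event not parsable: %w", err)
		}
		return handler(ctx, evt)
	}
}

// dispatch picks the handler registered for routingKey and falls back to
// the default handler when there is none.
func dispatch(ctx context.Context, handlers map[string]rawHandler, fallback rawHandler, routingKey string, body []byte) error {
	if h, ok := handlers[routingKey]; ok {
		return h(ctx, body)
	}
	if fallback != nil {
		return fallback(ctx, body)
	}
	return fmt.Errorf("no handler for routing key %q", routingKey)
}

// orderCreated is a hypothetical payload type.
type orderCreated struct {
	ID string `json:"id"`
}

// runExample wires one typed handler and one default handler, dispatches
// two events and returns what each handler observed.
func runExample() ([]string, error) {
	var seen []string
	handlers := map[string]rawHandler{
		"orderCreated": wrap[orderCreated](func(ctx context.Context, evt ConsumableEvent[orderCreated]) error {
			seen = append(seen, "typed: "+evt.Payload.ID)
			return nil
		}),
	}
	fallback := wrap[json.RawMessage](func(ctx context.Context, evt ConsumableEvent[json.RawMessage]) error {
		seen = append(seen, "default: "+string(evt.Payload))
		return nil
	})

	ctx := context.Background()
	if err := dispatch(ctx, handlers, fallback, "orderCreated", []byte(`{"payload":{"id":"order-42"}}`)); err != nil {
		return nil, err
	}
	if err := dispatch(ctx, handlers, fallback, "somethingElse", []byte(`{"payload":{"n":1}}`)); err != nil {
		return nil, err
	}
	return seen, nil
}

func main() {
	seen, err := runExample()
	if err != nil {
		panic(err)
	}
	for _, line := range seen {
		fmt.Println(line)
	}
}
```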

type Metadata

type Metadata struct {
	ID            string    `json:"id"`
	CorrelationID string    `json:"correlationId"`
	Timestamp     time.Time `json:"timestamp"`
}

Metadata holds the metadata of an event.

type Notification

type Notification struct {
	Message string
	Type    NotificationType
	Source  NotificationSource
}

func (Notification) String

func (n Notification) String() string

type NotificationSource

type NotificationSource string

const (
	NotificationSourceConnection NotificationSource = "CONNECTION"
	NotificationSourceConsumer   NotificationSource = "CONSUMER"
	NotificationSourcePublisher  NotificationSource = "PUBLISHER"
)

type NotificationType

type NotificationType string

const (
	NotificationTypeInfo  NotificationType = "INFO"
	NotificationTypeError NotificationType = "ERROR"
)

type PublishableEvent

type PublishableEvent struct {
	Metadata
	Payload any `json:"payload"`
}

PublishableEvent represents an event that can be published. The Payload field holds the event's payload data, which can be of any type that can be marshaled to JSON.

func NewPublishableEvent

func NewPublishableEvent(payload any, opts ...func(*eventOptions)) PublishableEvent

NewPublishableEvent creates an instance of a PublishableEvent. If the ID and correlation ID are not supplied via options, random UUIDs will be generated.
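The default-or-override behaviour described here can be sketched without the library. The code below re-implements the documented signatures for illustration only: the `eventOptions` fields are assumptions, `randomID` uses hex-encoded random bytes as a stand-in for a UUID, and setting `Timestamp` to the current time is also an assumption.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"time"
)

// Metadata and PublishableEvent copy the documented shapes.
type Metadata struct {
	ID            string    `json:"id"`
	CorrelationID string    `json:"correlationId"`
	Timestamp     time.Time `json:"timestamp"`
}

type PublishableEvent struct {
	Metadata
	Payload any `json:"payload"`
}

// eventOptions is a hypothetical stand-in for the unexported options struct.
type eventOptions struct {
	eventID       string
	correlationID string
}

func WithEventID(eventID string) func(*eventOptions) {
	return func(o *eventOptions) { o.eventID = eventID }
}

func WithCorrelationID(correlationID string) func(*eventOptions) {
	return func(o *eventOptions) { o.correlationID = correlationID }
}

// randomID stands in for UUID generation: 16 random bytes, hex encoded.
func randomID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// NewPublishableEvent generates random IDs first and lets options override
// them, so an ID is always present.
func NewPublishableEvent(payload any, opts ...func(*eventOptions)) PublishableEvent {
	o := eventOptions{eventID: randomID(), correlationID: randomID()}
	for _, apply := range opts {
		apply(&o)
	}
	return PublishableEvent{
		Metadata: Metadata{
			ID:            o.eventID,
			CorrelationID: o.correlationID,
			Timestamp:     time.Now(), // assumed: stamped at creation time
		},
		Payload: payload,
	}
}

func main() {
	evt := NewPublishableEvent(
		map[string]string{"id": "order-42"},
		WithCorrelationID("corr-1"), // e.g. propagated from an incoming event
	)
	body, err := json.Marshal(evt)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

Overriding the correlation ID is mainly useful when an event is published as a consequence of another one, so that both can be tied together later.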

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is used for publishing events.

func (*Publisher) Publish

func (p *Publisher) Publish(
	ctx context.Context,
	exchange, routingKey string,
	event PublishableEvent) error

Publish publishes an event to the specified exchange. If the channel is closed, it will retry until a channel is obtained.
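A retry-until-available loop of this kind can be sketched as follows. This is a generic illustration, not the library's implementation: `publishWithRetry`, `errNoChannel` and `runExample` are hypothetical, and stopping when the context is cancelled is an assumption that the documentation above does not state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNoChannel is a hypothetical sentinel meaning "no usable channel yet".
var errNoChannel = errors.New("channel not available")

// publishWithRetry calls send until it stops reporting errNoChannel,
// waiting interval between attempts. It gives up early if ctx is done.
func publishWithRetry(ctx context.Context, interval time.Duration, send func() error) (int, error) {
	attempts := 0
	for {
		attempts++
		err := send()
		if !errors.Is(err, errNoChannel) {
			return attempts, err // success (nil) or a non-retryable error
		}
		select {
		case <-ctx.Done():
			return attempts, ctx.Err()
		case <-time.After(interval):
		}
	}
}

// runExample simulates a channel that becomes available on the third try.
func runExample() (int, error) {
	calls := 0
	send := func() error {
		calls++
		if calls < 3 {
			return errNoChannel // channel still closed on the first two attempts
		}
		return nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return publishWithRetry(ctx, 10*time.Millisecond, send)
}

func main() {
	attempts, err := runExample()
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```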
