alice

package module
v0.1.28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2023 License: MIT Imports: 8 Imported by: 7

README

Follow me down the rabbit hole...

“I'm late! I'm late! For a very important date!” - White Rabbit.

Alice

GoDoc

Alice is a wrapper around the Streadway amqp package, designed to be easier to use and offer automatic handling of many errors out of the box.


Credit for the cute Gopher goes to Ian Derksen

Features

  • Automatic broker reconnect (attempted at a user-defined interval)
  • Automatic producer and consumer reconnect upon channel error
  • Every message handled in a new routine
  • Separate TCP connections for producers and consumers
  • Queues and exchanges are objects, which can be reused for multiple consumers and/or producers
  • Fully mocked broker, consumer and producer for testing

Installation

go get github.com/christinoleo/alice

Quickstart

package main

import (
	"fmt"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/christinoleo/alice"
)

// main connects to a local RabbitMQ broker, declares a direct exchange and a
// queue, and consumes messages from that queue until the process is killed.
func main() {
	// Turn on logging at Info level (see SetLogLevel for the available levels)
	alice.SetLogLevel(1)

	// Create a connection configuration.
	// The host is given without a port; the port is passed separately.
	connectionConfig := alice.CreateConfig(
		"guest",
		"guest",
		"localhost",
		5672,
		true,
		time.Second*10,
	)

	// Create a broker using the connection config
	broker, err := alice.CreateBroker(connectionConfig)
	if err != nil {
		// Nothing below can work without a broker, so stop here.
		log.Fatal(err)
	}

	// Create an exchange called 'test-exchange' using direct routing
	exchange, err := alice.CreateExchange("test-exchange", alice.Direct, false, true, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Create a queue called 'test-queue'
	q := alice.CreateQueue(exchange, "test-queue", false, false, true, false, nil)

	// Create a consumer bound to this queue, listening for messages with routing key 'key'
	c, err := broker.CreateConsumer(q, "key", "consumer-tag")
	if err != nil {
		// Without this check a failed creation would be used below.
		log.Fatal(err)
	}

	// Start consuming messages
	// Every received message is passed to the handleMessage function
	go c.ConsumeMessages(nil, false, handleMessage)

	// Block forever so the consumer goroutine keeps running
	select {}
}

// handleMessage prints the body of a received delivery as text and then
// acknowledges that single delivery.
func handleMessage(msg amqp.Delivery) {
	// Body is a []byte; convert it so the payload is printed as text
	// rather than as a list of byte values.
	fmt.Println(string(msg.Body))

	// Acknowledge only this delivery (multiple=false). Handlers run in
	// separate goroutines, so acknowledging all prior deliveries could ack
	// messages that other handlers are still processing.
	if err := msg.Ack(false); err != nil {
		log.Println(err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultConfig = &ConnectionConfig{
	user:           "guest",
	password:       "guest",
	host:           "localhost",
	port:           5672,
	autoReconnect:  true,
	reconnectDelay: time.Second * 10,
}

DefaultConfig is the default configuration for RabbitMQ.

User: "guest", password: "guest", host: "localhost", port: 5672, autoReconnect: true, reconnectDelay: time.Second * 10

Functions

func SetLogLevel

func SetLogLevel(level int)

SetLogLevel sets the level of logging Alice uses. Levels are:

-2 Off
-1 Trace
0 Debug
1 Info
2 Warning
3 Error
4 Fatal
5 Panic

Types

type Broker

type Broker interface {
	CreateConsumer(queue *Queue, bindingKey string, consumerTag string) (Consumer, error)
	CreateProducer(exchange *Exchange) (Producer, error)
}

A Broker models a broker

func CreateBroker

func CreateBroker(config *ConnectionConfig) (Broker, error)

CreateBroker creates a broker

config: *ConnectionConfig, the connection configuration that should be used to connect to the broker
Returns Broker and a possible error

func CreateMockBroker

func CreateMockBroker() Broker

CreateMockBroker creates a new MockBroker (mock)

type ConnectionConfig

type ConnectionConfig struct {
	// contains filtered or unexported fields
}

ConnectionConfig is a config structure to use when setting up a RabbitMQ connection

func CreateConfig

func CreateConfig(user string, password string, host string, port int, autoReconnect bool, reconnectDelay time.Duration) *ConnectionConfig

CreateConfig creates a connection configuration with the supplied parameters

func (*ConnectionConfig) SetAutoReconnect

func (config *ConnectionConfig) SetAutoReconnect(autoReconnect bool)

SetAutoReconnect sets whether the connection should try reconnecting

func (*ConnectionConfig) SetHost

func (config *ConnectionConfig) SetHost(host string)

SetHost sets the broker connection host URI

func (*ConnectionConfig) SetPassword

func (config *ConnectionConfig) SetPassword(password string)

SetPassword sets the password to use for the connection to the broker

func (*ConnectionConfig) SetPort

func (config *ConnectionConfig) SetPort(port int)

SetPort sets the broker connection port

func (*ConnectionConfig) SetReconnectDelay

func (config *ConnectionConfig) SetReconnectDelay(reconnectDelay time.Duration)

SetReconnectDelay sets the delay between reconnection attempts

func (*ConnectionConfig) SetUser

func (config *ConnectionConfig) SetUser(user string)

SetUser sets the user to use for the connection to the broker

type Consumer

type Consumer interface {
	ConsumeMessages(args amqp.Table, autoAck bool, messageHandler func(amqp.Delivery))
	Shutdown() error
}

A Consumer models a broker consumer

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange models a RabbitMQ exchange

func CreateDefaultExchange

func CreateDefaultExchange(name string, exchangeType ExchangeType) (*Exchange, error)

CreateDefaultExchange returns an exchange with the following specifications:

durable: true, autodelete: false, internal: false, noWait: false, args: nil

func CreateExchange

func CreateExchange(name string, exchangeType ExchangeType, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table) (*Exchange, error)

CreateExchange creates an exchange according to the specified arguments

func (*Exchange) SetArgs

func (e *Exchange) SetArgs(args amqp.Table)

SetArgs sets additional exchange arguments

func (*Exchange) SetAutoDelete

func (e *Exchange) SetAutoDelete(autoDelete bool)

SetAutoDelete sets exchange autoDelete

func (*Exchange) SetDurable

func (e *Exchange) SetDurable(durable bool)

SetDurable sets exchange durability

func (*Exchange) SetInternal

func (e *Exchange) SetInternal(internal bool)

SetInternal sets whether exchange is internal

func (*Exchange) SetName

func (e *Exchange) SetName(name string)

SetName sets the exchange name

func (*Exchange) SetNoWait

func (e *Exchange) SetNoWait(noWait bool)

SetNoWait sets the noWait flag

func (*Exchange) SetType

func (e *Exchange) SetType(exchangeType ExchangeType) error

SetType sets the exchange type

type ExchangeType

type ExchangeType string

ExchangeType denotes the types of exchanges RabbitMQ has

const (
	// Direct delivers messages to queues based on the message routing key
	Direct ExchangeType = "direct"

	// Fanout delivers messages to all connected queues
	Fanout ExchangeType = "fanout"

	// Topic delivers messages based on the matching between the message routing key and the pattern used to bind queues to the exchange
	Topic ExchangeType = "topic"

	// Headers delivers messages based on header values, similar to direct routing
	Headers ExchangeType = "headers"
)

func (*ExchangeType) IsValid

func (t *ExchangeType) IsValid() bool

IsValid determines whether an exchangeType is valid

func (ExchangeType) String

func (t ExchangeType) String() string

type MockBroker

type MockBroker struct {
	Messages map[*Queue]chan amqp.Delivery // The messages sent in a queue
	// contains filtered or unexported fields
}

MockBroker implements the Broker interface (mock)

func (*MockBroker) CreateConsumer

func (b *MockBroker) CreateConsumer(queue *Queue, bindingKey string, consumerTag string) (Consumer, error)

CreateConsumer creates a new consumer (mock)

func (*MockBroker) CreateProducer

func (b *MockBroker) CreateProducer(exchange *Exchange) (Producer, error)

CreateProducer creates a new producer (mock)

type MockConsumer

type MockConsumer struct {
	ReceivedMessages []amqp.Delivery
	// contains filtered or unexported fields
}

A MockConsumer implements the Consumer interface

func (*MockConsumer) ConsumeMessages

func (c *MockConsumer) ConsumeMessages(args amqp.Table, autoAck bool, messageHandler func(amqp.Delivery))

ConsumeMessages consumes messages sent to the consumer

func (*MockConsumer) Shutdown

func (c *MockConsumer) Shutdown() error

Shutdown shuts down the consumer

type MockProducer

type MockProducer struct {
	// contains filtered or unexported fields
}

A MockProducer implements the Producer interface

func (*MockProducer) PublishMessage

func (p *MockProducer) PublishMessage(msg []byte, key *string, headers *amqp.Table)

PublishMessage publishes a message

func (*MockProducer) PublishMessageJsonHeaders

func (p *MockProducer) PublishMessageJsonHeaders(body *[]byte, routingKey string, headers *map[string]interface{})

PublishMessageJsonHeaders publishes a message with JSON headers

func (*MockProducer) Shutdown

func (p *MockProducer) Shutdown() error

Shutdown shuts this producer down

type Producer

type Producer interface {
	PublishMessageJsonHeaders(body *[]byte, routingKey string, headers *map[string]interface{})
	PublishMessage(msg []byte, key *string, headers *amqp.Table)
	Shutdown() error
}

A Producer models a broker producer

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue models a RabbitMQ queue

func CreateDefaultQueue

func CreateDefaultQueue(exchange *Exchange, name string) *Queue

CreateDefaultQueue creates and returns a queue with the following parameters:

durable: true, exclusive: false, autoDelete: false, noWait: false, args: nil

func CreateQueue

func CreateQueue(exchange *Exchange, name string, durable bool, exclusive bool, autoDelete bool, noWait bool, arguments amqp.Table) *Queue

CreateQueue returns a queue created with the accompanied specifications

func (*Queue) SetArgs

func (q *Queue) SetArgs(args amqp.Table)

SetArgs sets additional queue arguments

func (*Queue) SetAutoDelete

func (q *Queue) SetAutoDelete(autoDelete bool)

SetAutoDelete sets queue auto deletion

func (*Queue) SetDurable

func (q *Queue) SetDurable(durable bool)

SetDurable sets queue durability

func (*Queue) SetExclusive

func (q *Queue) SetExclusive(exclusive bool)

SetExclusive sets queue exclusivity

func (*Queue) SetName

func (q *Queue) SetName(name string)

SetName sets the queue name

func (*Queue) SetNoWait

func (q *Queue) SetNoWait(noWait bool)

SetNoWait sets the noWait flag

type RabbitBroker

type RabbitBroker struct {
	// contains filtered or unexported fields
}

A RabbitBroker implements the Broker interface

func (*RabbitBroker) CreateConsumer

func (b *RabbitBroker) CreateConsumer(queue *Queue, bindingKey string, consumerTag string) (Consumer, error)

CreateConsumer creates a consumer

queue: *Queue, the queue this consumer should bind to
bindingKey: string, the key with which this consumer binds to the queue
consumerTag: string, the consumer tag to register this consumer under
Returns: Consumer and a possible error

func (*RabbitBroker) CreateProducer

func (b *RabbitBroker) CreateProducer(exchange *Exchange) (Producer, error)

CreateProducer creates a producer

exchange: *Exchange, the exchange this producer will produce to
Returns: Producer and a possible error

type RabbitConsumer

type RabbitConsumer struct {
	// contains filtered or unexported fields
}

RabbitConsumer models a RabbitMQ consumer

func (*RabbitConsumer) ConsumeMessages

func (c *RabbitConsumer) ConsumeMessages(args amqp.Table, autoAck bool, messageHandler func(amqp.Delivery))

ConsumeMessages starts the consumption of messages from the queue the consumer is bound to

args: amqp.Table, additional arguments for this consumer
autoAck: bool, whether to automatically acknowledge messages
messageHandler: func(amqp.Delivery), a handler for incoming messages. The handler is called in a new goroutine for every message

func (*RabbitConsumer) ReconnectChannel

func (c *RabbitConsumer) ReconnectChannel() error

ReconnectChannel tries to re-open this consumer's channel

func (*RabbitConsumer) Shutdown

func (c *RabbitConsumer) Shutdown() error

Shutdown shuts down the consumer

type RabbitProducer

type RabbitProducer struct {
	// contains filtered or unexported fields
}

RabbitProducer models a RabbitMQ producer

func (*RabbitProducer) PublishMessage

func (p *RabbitProducer) PublishMessage(msg []byte, key *string, headers *amqp.Table)

PublishMessage publishes a message with the given routing key

func (*RabbitProducer) PublishMessageJsonHeaders

func (p *RabbitProducer) PublishMessageJsonHeaders(body *[]byte, routingKey string, headers *map[string]interface{})

PublishMessageJsonHeaders publishes a message with the specified routing key

body: *[]byte, the message to send, most likely a marshalled JSON object
routingKey: string, the key by which this message will be routed on the exchange
headers: *map[string]interface{}, the headers for this message, containing things like userID and sessionID

func (*RabbitProducer) ReconnectChannel

func (p *RabbitProducer) ReconnectChannel()

ReconnectChannel tries to re-open this producer's channel

func (*RabbitProducer) Shutdown

func (p *RabbitProducer) Shutdown() error

Shutdown closes this producer's channel

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL