Documentation ¶
Index ¶
- Constants
- func NoOPLoggerFN(message string, fields Fields)
- func StartSupervisor(rabbids *Rabbids, intervalChecks time.Duration) (stop func(), err error)
- type Binding
- type Config
- type Connection
- type Consumer
- type ConsumerConfig
- type DeadLetter
- type ExchangeConfig
- type Fields
- type File
- type LoggerFN
- type Message
- type MessageHandler
- type MessageHandlerFunc
- type Options
- type Producer
- type ProducerOption
- type Publishing
- type PublishingError
- type PublishingOption
- type QueueConfig
- type Rabbids
- type Serializer
Constants ¶
const ( Version = "0.0.1" DefaultTimeout = 2 * time.Second DefaultSleep = 500 * time.Millisecond DefaultRetries = 5 )
Variables ¶
This section is empty.
Functions ¶
func NoOPLoggerFN ¶
func StartSupervisor ¶
StartSupervisor init a new supervisor that will start all the consumers from Rabbids and check if the consumers are alive, if not alive it will be restarted. It returns the stop function to gracefully shutdown the consumers and an error if fail to create the consumers the first time.
Types ¶
type Binding ¶
type Binding struct { Exchange string `mapstructure:"exchange"` RoutingKeys []string `mapstructure:"routing_keys"` Options Options `mapstructure:"options"` }
Binding describe how a queue connects to a exchange.
type Config ¶
type Config struct { // Connections describe the connections used by consumers. Connections map[string]Connection `mapstructure:"connections"` // Exchanges have all the exchanges used by consumers. // This exchanges are declared on startup of the rabbids client. Exchanges map[string]ExchangeConfig `mapstructure:"exchanges"` // DeadLetters have all the deadletters queues used internally by other queues // This will be declared at startup of the rabbids client. DeadLetters map[string]DeadLetter `mapstructure:"dead_letters"` // Consumers describes configuration list for consumers. Consumers map[string]ConsumerConfig `mapstructure:"consumers"` // Registered Message handlers used by consumers Handlers map[string]MessageHandler }
Config describes all available options to declare all the components used by rabbids Consumers and Producers.
func ConfigFromFile ¶
ConfigFromFilename read a YAML file and convert it into a Config struct with all the configuration to build the Consumers and producers. Also, it Is possible to use environment variables values inside the YAML file. The syntax is like the syntax used inside the docker-compose file. To use a required variable just use like this: ${ENV_NAME} and to put an default value you can use: ${ENV_NAME:=some-value} inside any value. If a required variable didn't exist, an error will be returned.
func ConfigFromFilename ¶
ConfigFromFilename is a wrapper to open the file and pass to ConfigFromFile.
func (*Config) RegisterHandler ¶
func (c *Config) RegisterHandler(consumerName string, h MessageHandler)
RegisterHandler is used to set the MessageHandler used by one Consumer. The consumerName MUST be equal as the name used by the Consumer (the key inside the map of consumers).
type Connection ¶
type Connection struct { DSN string `mapstructure:"dsn"` Timeout time.Duration `mapstructure:"timeout"` Sleep time.Duration `mapstructure:"sleep"` Retries int `mapstructure:"retries"` }
Connection describe a config for one connection.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a high level rabbitMQ consumer.
type ConsumerConfig ¶
type ConsumerConfig struct { Connection string `mapstructure:"connection"` Workers int `mapstructure:"workers"` PrefetchCount int `mapstructure:"prefetch_count"` DeadLetter string `mapstructure:"dead_letter"` Queue QueueConfig `mapstructure:"queue"` Options Options `mapstructure:"options"` }
ConsumerConfig describes consumer's configuration.
type DeadLetter ¶
type DeadLetter struct {
Queue QueueConfig `mapstructure:"queue"`
}
DeadLetter describe all the dead letters queues to be declared before declare other queues.
type ExchangeConfig ¶
type ExchangeConfig struct { Type string `mapstructure:"type"` Options Options `mapstructure:"options"` }
ExchangeConfig describes exchange's configuration.
type MessageHandler ¶
type MessageHandler interface { // Handle a single message, this method MUST be safe for concurrent use Handle(m Message) // Close the handler, this method is called when the consumer is closing Close() }
MessageHandler is the base interface used to consumer AMPQ messages.
type MessageHandlerFunc ¶
type MessageHandlerFunc func(m Message)
MessageHandlerFunc implements the MessageHandler interface.
func (MessageHandlerFunc) Close ¶
func (h MessageHandlerFunc) Close()
func (MessageHandlerFunc) Handle ¶
func (h MessageHandlerFunc) Handle(m Message)
type Options ¶
type Options struct { Durable bool `mapstructure:"durable"` Internal bool `mapstructure:"internal"` AutoDelete bool `mapstructure:"auto_delete"` Exclusive bool `mapstructure:"exclusive"` NoWait bool `mapstructure:"no_wait"` NoLocal bool `mapstructure:"no_local"` AutoAck bool `mapstructure:"auto_ack"` Args amqp.Table `mapstructure:"args"` }
Options describes optionals configuration for consumer, queue, bindings and exchanges declaration.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is an high level rabbitMQ producer instance.
func NewProducer ¶
func NewProducer(dsn string, opts ...ProducerOption) (*Producer, error)
NewProcucer create a new high level rabbitMQ producer instance
dsn is a string in the AMQP URI format the ProducerOptions can be:
rabbids.WithLogger - to set a logger instance rabbids.WithFactory - to use one instance of a factory. when added the factory is used to declare the topics in the first time the topic is used. rabbids.WithSerializer - used to set a specific serializer the default is the a JSON serializer.
func (*Producer) Close ¶
Close will close all the underline channels and close the connection with rabbitMQ. Any Emit call after calling the Close method will panic.
func (*Producer) Emit ¶
func (p *Producer) Emit() chan<- Publishing
Emit emits a message to rabbitMQ but does not wait for the response from the broker. Errors with the Publishing (encoding, validation) or with the broker will be sent to the EmitErr channel. It's your responsibility to handle these errors somehow.
func (*Producer) EmitErr ¶
func (p *Producer) EmitErr() <-chan PublishingError
EmitErr returns a channel used to receive all the errors from Emit channel. The error handle is not required but and the send inside this channel is buffered. WARNING: If the channel gets full, new errors will be dropped to avoid stop the producer internal loop.
func (*Producer) GetAMQPChannel ¶
GetAMQPChannel returns the current connection channel.
func (*Producer) GetAMQPConnection ¶
func (p *Producer) GetAMQPConnection() *amqp.Connection
GetAGetAMQPConnection returns the current amqp connetion.
func (*Producer) Send ¶
func (p *Producer) Send(m Publishing) error
Send a message to rabbitMQ. In case of connection errors, the send will block and retry until the reconnection is done. It returns an error if the Serializer returned an error OR the connection error persisted after the retries.
type ProducerOption ¶
ProducerOption represents an option function to add some functionality or change the producer state on creation time.
func WithCustomName ¶
func WithCustomName(name string) ProducerOption
func WithLogger ¶
func WithLogger(log LoggerFN) ProducerOption
WithLogger will override the default logger (no Operation Log).
func WithSerializer ¶
func WithSerializer(s Serializer) ProducerOption
type Publishing ¶
type Publishing struct { // Exchange name Exchange string // The routing key Key string // Data to be encoded inside the message Data interface{} // Delay is the duration to wait until the message is delivered to the queue. // The max delay period is 268,435,455 seconds, or about 8.5 years. Delay time.Duration amqp.Publishing // contains filtered or unexported fields }
Publishing have the fields for sending a message to rabbitMQ.
func NewDelayedPublishing ¶
func NewDelayedPublishing(queue string, delay time.Duration, data interface{}, options ...PublishingOption) Publishing
SendWithDelay send a message to arrive the queue only after the time is passed. The minimum delay is one second, if the delay is less than the minimum, the minimum will be used. The max delay period is 268,435,455 seconds, or about 8.5 years.
func NewPublishing ¶
func NewPublishing(exchange, key string, data interface{}, options ...PublishingOption) Publishing
NewPublishing create a message to be sent by some consumer.
type PublishingError ¶
type PublishingError struct { Publishing Err error }
PublishingError is returned by the async error reporting. When an async publishing message is sent and an error happens the Publishing and the error will be sent to the EmitErr channel. To get this channel, call the EmitErr method inside the producer.
type PublishingOption ¶
type PublishingOption func(*Publishing)
PublishingOption represents an option you can pass to setup some data inside the Publishing.
func WithPriority ¶
func WithPriority(v int) PublishingOption
WithPriority change the priority of the Publishing message.
type QueueConfig ¶
type QueueConfig struct { Name string `mapstructure:"name"` Bindings []Binding `mapstructure:"bindings"` Options Options `mapstructure:"options"` }
QueueConfig describes queue's configuration.
type Rabbids ¶
type Rabbids struct {
// contains filtered or unexported fields
}
Rabbids is the main block used to create and run rabbitMQ consumers and producers.
func (*Rabbids) CreateConsumer ¶
CreateConsumer create a new consumer for a specific name using the config provided.
func (*Rabbids) CreateConsumers ¶
CreateConsumers will iterate over config and create all the consumers.
func (*Rabbids) CreateProducer ¶
func (r *Rabbids) CreateProducer(connectionName string, customOpts ...ProducerOption) (*Producer, error)
CreateConsumer create a new consumer using the connection inside the config.
type Serializer ¶
type Serializer interface { Marshal(interface{}) ([]byte, error) // Name return the name used on the content type of the messsage Name() string }
Serializer is the base interface for all message serializers.