Documentation
¶
Index ¶
- func AddHandlerToConsumer[T any](consumer *Consumer, routingKey string, handler EventHandler[T])
- func InitMetrics(registerer prometheus.Registerer) error
- func WithBindingToExchange(exchange string) func(*consumerOption)
- func WithCorrelationID(correlationID string) func(*eventOptions)
- func WithDeadLetterQueue(queueName string) func(*consumerOption)
- func WithDefaultHandler(handler EventHandler[json.RawMessage]) func(*consumerOption)
- func WithEventID(eventID string) func(*eventOptions)
- func WithHandler[T any](routingKey string, handler EventHandler[T]) func(*consumerOption)
- func WithNotificationChannel(notificationCh chan<- Notification) func(*connectionOption)
- func WithQoS(prefetchCount, prefetchSize int) func(*consumerOption)
- func WithQuorumQueue() func(*consumerOption)
- func WithReconnectInterval(interval time.Duration) func(*connectionOption)
- func WithRetries(retries int) func(*consumerOption)
- func WithURI(URI string) func(*connectionOption)
- func XnWGQvXK() error
- type Connection
- type ConsumableEvent
- type Consumer
- type DeliveryInfo
- type EventHandler
- type Metadata
- type Notification
- type NotificationSource
- type NotificationType
- type PublishableEvent
- type Publisher
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddHandlerToConsumer ¶
func AddHandlerToConsumer[T any](consumer *Consumer, routingKey string, handler EventHandler[T])
AddHandlerToConsumer adds a handler for the given routing key. It is another way to add handlers when the consumer is already created and cannot use the options.
func InitMetrics ¶
func InitMetrics(registerer prometheus.Registerer) error
func WithBindingToExchange ¶
func WithBindingToExchange(exchange string) func(*consumerOption)
WithBindingToExchange specifies the exchange on which the queue will bind for the handlers provided.
func WithCorrelationID ¶
func WithCorrelationID(correlationID string) func(*eventOptions)
WithCorrelationID specifies the correlationID to be published if it is not used a random uuid will be generated.
func WithDeadLetterQueue ¶
func WithDeadLetterQueue(queueName string) func(*consumerOption)
WithDeadLetterQueue indicates which queue will receive the events that were NACKed for this consumer.
func WithDefaultHandler ¶
func WithDefaultHandler(handler EventHandler[json.RawMessage]) func(*consumerOption)
WithDefaultHandler specifies a handler that can be use for any type of routing key without a defined handler. This is mostly convenient if you don't care about the specific payload of the event, which will be received as a byte array.
func WithEventID ¶
func WithEventID(eventID string) func(*eventOptions)
WithEventID specifies the eventID to be published if it is not used a random uuid will be generated.
func WithHandler ¶
func WithHandler[T any](routingKey string, handler EventHandler[T]) func(*consumerOption)
WithHandler specifies under which routing key the provided handler will be invoked. The routing key indicated here will be bound to the queue if the WithBindingToExchange is supplied.
func WithNotificationChannel ¶
func WithNotificationChannel(notificationCh chan<- Notification) func(*connectionOption)
WithNotificationChannel specifies a go channel to receive messages such as connection established, reconnecting, event published, consumed, etc.
func WithQoS ¶
func WithQoS(prefetchCount, prefetchSize int) func(*consumerOption)
WithQoS specifies the prefetch count and size for the consumer.
func WithQuorumQueue ¶
func WithQuorumQueue() func(*consumerOption)
WithQuorumQueue specifies that the queue to consume will be created as quorum queue. Quorum queues are used when data safety is the priority.
func WithReconnectInterval ¶
WithReconnectInterval establishes how much time to wait between each attempt of connection.
func WithRetries ¶
func WithRetries(retries int) func(*consumerOption)
WithRetries specifies the retries count before the event is discarded or sent to dead letter. Quorum queues are required to use this feature. The event will be processed at max as retries + 1. If specified amount is 3, the event can be processed up to 4 times.
Types ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection represents a connection towards the AMQP server. A single connection should be enough for the entire application as the consuming and publishing is handled by channels.
func NewConnection ¶
func NewConnection(opts ...func(*connectionOption)) *Connection
NewConnection creates a new AMQP connection using the indicated options. If the consumer does not supply options, it will by default connect to a localhost instance on and try to reconnect every 10 seconds.
func (*Connection) Close ¶
func (c *Connection) Close() error
Closes connection with towards the AMQP server
func (*Connection) NewConsumer ¶
func (c *Connection) NewConsumer( queueName string, opts ...func(*consumerOption)) Consumer
NewConsumer creates a consumer for a given queue using the specified connection. Information messages such as channel status will be sent to the notification channel if it was specified on the connection struct. If no QoS is supplied the prefetch count will be of 20.
func (*Connection) NewPublisher ¶
func (c *Connection) NewPublisher() *Publisher
NewPublisher creates a publisher using the specified connection.
func (*Connection) Start ¶
func (c *Connection) Start() error
Start establishes the connection towards the AMQP server. Only returns errors when the uri is not valid (retry won't do a thing)
type ConsumableEvent ¶
type ConsumableEvent[T any] struct { Metadata DeliveryInfo DeliveryInfo Payload T }
ConsumableEvent[T] represents an event that can be consumed. The type parameter T specifies the type of the event's payload.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is used for consuming to events from an specified queue.
func (*Consumer) Consume ¶
Consume will start consuming events from the indicated queue. The first time this function is called it will return error if handlers or default handler are not specified or if queues, exchanges, bindings and qos creation don't succeed. In case this function gets called recursively due to channel reconnection, the errors will be pushed to the notification channel (if one has been indicated in the connection).
func (*Consumer) ConsumeParallel ¶
ConsumeParallel will start consuming events for the indicated queue. The first time this function is called it will return error if handlers or default handler are not specified and also if queues, exchanges, bindings or qos creation don't succeed. In case this function gets called recursively due to channel reconnection, the errors will be pushed to the notification channel (if one has been indicated in the connection). The difference between this and the regular Consume is that this one fires a go routine per each message received as opposed of sequentially.
type DeliveryInfo ¶
DeliveryInfo holds information of original queue, exchange and routing keys.
type EventHandler ¶
type EventHandler[T any] func(ctx context.Context, event ConsumableEvent[T]) error
EventHandler is the type definition for a function that is used to handle events of a specific type.
type Metadata ¶
type Metadata struct { ID string `json:"id"` CorrelationID string `json:"correlationId"` Timestamp time.Time `json:"timestamp"` }
Metadata holds the metadata of an event.
type Notification ¶
type Notification struct { Message string Type NotificationType Source NotificationSource }
func (Notification) String ¶
func (n Notification) String() string
type NotificationSource ¶
type NotificationSource string
const ( NotificationSourceConnection NotificationSource = "CONNECTION" NotificationSourceConsumer NotificationSource = "CONSUMER" NotificationSourcePublisher NotificationSource = "PUBLISHER" )
type NotificationType ¶
type NotificationType string
const ( NotificationTypeInfo NotificationType = "INFO" NotificationTypeError NotificationType = "ERROR" )
type PublishableEvent ¶
PublishableEvent represents an event that can be published. The Payload field holds the event's payload data, which can be of any type that can be marshal to json.
func NewPublishableEvent ¶
func NewPublishableEvent(payload any, opts ...func(*eventOptions)) PublishableEvent
NewPublishableEvent creates an instance of a PublishableEvent. In case the ID and correlation ID are not supplied via options random uuid will be generated.