rabbitmq

package module
v2.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

README

eh-rabbitmq/v2

rabbitmq event bus implementation for eventhorizon

Documentation

Index

Constants

View Source
const DefaultNumRetries = 0

DefaultNumRetries is the retry value to use if not set in the context.

View Source
const (
	// InfiniteRetries is the maximum number for recovery or event delivery retries.
	InfiniteRetries int64 = math.MaxInt64
)

Variables

View Source
var (
	// ErrCouldNotBeRouted is returned when a mandatory message could not be routed.
	ErrCouldNotBeRouted = errors.New("message could not be routed")

	// ErrFailedToPublishChannelClosed occurs when the channel accessed but is closed.
	ErrFailedToPublishChannelClosed = errors.New("amqp channel is closed")

	// ErrErrHandlerNotRegistered is returned when calling RemoveHandler with a handler that is not registered.
	ErrHandlerNotRegistered = errors.New("handler not registered")

	// ErrInvalidEventHandler when an invalid eventhorizon.EventHandler was provided.
	ErrInvalidEventHandler = errors.New("invalid event handler")
)

Functions

func NewContextWithNumRetries

func NewContextWithNumRetries(ctx context.Context, numRetries int64) context.Context

NewContextWithNumRetries sets the retries value to use in the context. The number of retries is used to determine how often an event has failed to be handled.

func NumRetriesFromContext

func NumRetriesFromContext(ctx context.Context) int64

NumRetriesFromContext returns the number of retries from the context, or zero.

Types

type AMQPError

type AMQPError clarimq.AMQPError

func (*AMQPError) Error

func (e *AMQPError) Error() string

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is a local event bus that delegates handling of published events to all matching registered handlers, in order of registration.

func NewEventBus

func NewEventBus(addr, appID, clientID, exchange, topic string, options ...Option) (*EventBus, error)

NewEventBus creates an EventBus, with optional settings.

func (*EventBus) AddHandler

func (b *EventBus) AddHandler(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler) error

AddHandler implements the AddHandler method of the eventhorizon.EventBus interface.

func (*EventBus) AddHandlerWithOptions

func (b *EventBus) AddHandlerWithOptions(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler, options ...HandlerOption) error

AddHandlerWithOptions adds a new eventhorizon.Eventhandler with options.

func (*EventBus) Close

func (b *EventBus) Close() error

Close implements the Close method of the eventhorizon.EventBus interface.

func (*EventBus) Errors

func (b *EventBus) Errors() <-chan error

Errors implements the Errors method of the eventhorizon.EventBus interface.

func (*EventBus) HandleEvent

func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error

HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.

func (*EventBus) HandlerType

func (*EventBus) HandlerType() eh.EventHandlerType

HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.

func (*EventBus) PublishEvent

func (b *EventBus) PublishEvent(ctx context.Context, event eh.Event) error

PublishEvent publishes an event. Same as HandleEvent, but with better naming.

func (*EventBus) PublishEventWithOptions

func (b *EventBus) PublishEventWithOptions(ctx context.Context, event eh.Event, options ...PublishOption) error

PublishEventWithTopic publishes an event with options.

func (*EventBus) RegisteredHandlers

func (b *EventBus) RegisteredHandlers() []eh.EventHandlerType

RegisteredHandlers returns a slice of all registered handler types.

func (*EventBus) RemoveHandler

func (b *EventBus) RemoveHandler(handlerType eh.EventHandlerType) error

RemoveHandler removes a handler from the event bus by type.

func (*EventBus) SetupEventHandler deprecated

func (b *EventBus) SetupEventHandler(ctx context.Context, eventHandler EventHandler) error

SetupEventHandler sets up an Eventhandler to the event bus.

Deprecated: SetupEventHandler is deprecated please use SetupEventHandlers instead.

func (*EventBus) SetupEventHandlers

func (b *EventBus) SetupEventHandlers(ctx context.Context, handlers ...EventHandler) error

SetupEventHandlers sets up the given event handlers.

func (*EventBus) SetupEventHandlersWithMiddlewares added in v2.1.0

func (b *EventBus) SetupEventHandlersWithMiddlewares(
	ctx context.Context,
	middlewares []eh.EventHandlerMiddleware,
	handlers ...EventHandler,
) error

SetupEventHandlersWithMiddlewares sets up every given handler with the given middlewares. Providing middlewares is optional.

func (*EventBus) StartHandling

func (b *EventBus) StartHandling() error

StartHandling starts handling of all registered handlers.

type EventBusError

type EventBusError struct {
	eh.EventBusError
	HandlerType eh.EventHandlerType
}

EventBusError is an async error containing the error returned from a handler and the event that it happened on. Its a wrapper around the eventhorizon.EventBusError with extra information about the handler.

type EventHandler

type EventHandler interface {
	eh.EventHandler
	// Event returns the event types the handler is able to handle.
	Events() eh.MatchEvents
	// Topic returns the topic the handler is subscribed to.
	Topic() string
}

EventHandler is an interface for a type that handles events.

type HandlerOption

type HandlerOption func(*handler)

HandlerOption is a handler option.

func WithHandlerExchange

func WithHandlerExchange(name string) HandlerOption

WithHandlerExchange is an option to set the handler exchange.

func WithHandlerTopic

func WithHandlerTopic(topic string) HandlerOption

WithHandlerTopic is an option to set the handler topic.

type MaxRetriesExceededHandler

type MaxRetriesExceededHandler func(ctx context.Context, event eh.Event, errorMessage string) error

MaxRetriesExceededHandler is a function that is called when the maximum number of retries has been reached.

type Option

type Option func(*EventBus)

Option is an option setter used to configure creation.

func WithClariMQConnections

func WithClariMQConnections(publishingConn *clarimq.Connection, consumeConn *clarimq.Connection) Option

WithClariMQConnections sets the connections used for publishing and consuming events.

func WithClariMQPublishingCache

func WithClariMQPublishingCache(publishingCache clarimq.PublishingCache) Option

WithClariMQPublishingCache enables caching events that failed to be published.

func WithConsumerQuantity

func WithConsumerQuantity(concurrency int) Option

WithConsumerQuantity sets the number of concurrent consumers.

func WithEventCodec

func WithEventCodec(codec eh.EventCodec) Option

WithEventCodec uses the specified codec for encoding events.

func WithHandlerConsumeAfterAdd

func WithHandlerConsumeAfterAdd(consumeAfterAdd bool) Option

WithHandlerConsumeAfterAdd allows handlers to start consuming immediately after they have been added.

func WithLogging

func WithLogging(loggers ...clarimq.Logger) Option

WithLogging enables logging to the given loggers.

func WithMaxRecoveryRetry

func WithMaxRecoveryRetry(maxRetries int64) Option

WithMaxRecoveryRetry sets the max count for recovery retries.

Default: Infinite.

func WithRetry

func WithRetry(maxRetries int64, delays []time.Duration, handler MaxRetriesExceededHandler) Option

WithRetry enables event retries. If maxRetries is bigger than the number of delays provided, it will use the last value until maxRetries has been reached. Use InfiniteRetries to never drop the message.

Default maxRetries is Infinite.

type PublishOption

type PublishOption func(*publishOptions)

PublishOption is a publish option.

func WithPublishingExchange

func WithPublishingExchange(name string) PublishOption

WithPublishingExchange is an option to set the publishing exchange.

func WithPublishingTopic

func WithPublishingTopic(topic string) PublishOption

WithPublishingTopic is an option to set the publishing topic.

type RecoveryFailedError

type RecoveryFailedError struct {
	// contains filtered or unexported fields
}

ErrRecoveryFailed occurs when the recovery failed after a connection loss.

func (*RecoveryFailedError) ConnectionName

func (e *RecoveryFailedError) ConnectionName() string

ConnectionName returns the name of the connection that failed to recover.

func (*RecoveryFailedError) Error

func (e *RecoveryFailedError) Error() string

Error implements the Error method of the error interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL