Documentation ¶
Index ¶
- Constants
- Variables
- func NewContextWithNumRetries(ctx context.Context, numRetries int64) context.Context
- func NumRetriesFromContext(ctx context.Context) int64
- type AMQPError
- type EventBus
- func (b *EventBus) AddHandler(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler) error
- func (b *EventBus) AddHandlerWithOptions(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler, ...) error
- func (b *EventBus) Close() error
- func (b *EventBus) Errors() <-chan error
- func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error
- func (*EventBus) HandlerType() eh.EventHandlerType
- func (b *EventBus) PublishEvent(ctx context.Context, event eh.Event) error
- func (b *EventBus) PublishEventWithOptions(ctx context.Context, event eh.Event, options ...PublishOption) error
- func (b *EventBus) RegisteredHandlers() []eh.EventHandlerType
- func (b *EventBus) RemoveHandler(handlerType eh.EventHandlerType) error
- func (b *EventBus) SetupEventHandler(ctx context.Context, eventHandler EventHandler) errordeprecated
- func (b *EventBus) SetupEventHandlers(ctx context.Context, handlers ...EventHandler) error
- func (b *EventBus) SetupEventHandlersWithMiddlewares(ctx context.Context, middlewares []eh.EventHandlerMiddleware, ...) error
- func (b *EventBus) StartHandling() error
- type EventBusError
- type EventHandler
- type HandlerOption
- type MaxRetriesExceededHandler
- type Option
- func WithClariMQConnections(publishingConn *clarimq.Connection, consumeConn *clarimq.Connection) Option
- func WithClariMQPublishingCache(publishingCache clarimq.PublishingCache) Option
- func WithConsumerQuantity(concurrency int) Option
- func WithEventCodec(codec eh.EventCodec) Option
- func WithHandlerConsumeAfterAdd(consumeAfterAdd bool) Option
- func WithLogging(loggers ...clarimq.Logger) Option
- func WithMaxRecoveryRetry(maxRetries int64) Option
- func WithRetry(maxRetries int64, delays []time.Duration, handler MaxRetriesExceededHandler) Option
- type PublishOption
- type RecoveryFailedError
Constants ¶
const DefaultNumRetries = 0
DefaultNumRetries is the retry value to use if not set in the context.
const ( // InfiniteRetries is the maximum number for recovery or event delivery retries. InfiniteRetries int64 = math.MaxInt64 )
Variables ¶
var ( // ErrCouldNotBeRouted is returned when a mandatory message could not be routed. ErrCouldNotBeRouted = errors.New("message could not be routed") // ErrFailedToPublishChannelClosed occurs when the channel accessed but is closed. ErrFailedToPublishChannelClosed = errors.New("amqp channel is closed") // ErrErrHandlerNotRegistered is returned when calling RemoveHandler with a handler that is not registered. ErrHandlerNotRegistered = errors.New("handler not registered") // ErrInvalidEventHandler when an invalid eventhorizon.EventHandler was provided. ErrInvalidEventHandler = errors.New("invalid event handler") )
Functions ¶
func NewContextWithNumRetries ¶
NewContextWithNumRetries sets the retries value to use in the context. The number of retries is used to determine how often an event has failed to be handled.
func NumRetriesFromContext ¶
NumRetriesFromContext returns the number of retries from the context, or zero.
Types ¶
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus is a local event bus that delegates handling of published events to all matching registered handlers, in order of registration.
func NewEventBus ¶
func NewEventBus(addr, appID, clientID, exchange, topic string, options ...Option) (*EventBus, error)
NewEventBus creates an EventBus, with optional settings.
func (*EventBus) AddHandler ¶
func (b *EventBus) AddHandler(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler) error
AddHandler implements the AddHandler method of the eventhorizon.EventBus interface.
func (*EventBus) AddHandlerWithOptions ¶
func (b *EventBus) AddHandlerWithOptions(ctx context.Context, matcher eh.EventMatcher, eventHandler eh.EventHandler, options ...HandlerOption) error
AddHandlerWithOptions adds a new eventhorizon.Eventhandler with options.
func (*EventBus) Errors ¶
Errors implements the Errors method of the eventhorizon.EventBus interface.
func (*EventBus) HandleEvent ¶
HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (*EventBus) HandlerType ¶
func (*EventBus) HandlerType() eh.EventHandlerType
HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
func (*EventBus) PublishEvent ¶
PublishEvent publishes an event. Same as HandleEvent, but with better naming.
func (*EventBus) PublishEventWithOptions ¶
func (b *EventBus) PublishEventWithOptions(ctx context.Context, event eh.Event, options ...PublishOption) error
PublishEventWithTopic publishes an event with options.
func (*EventBus) RegisteredHandlers ¶
func (b *EventBus) RegisteredHandlers() []eh.EventHandlerType
RegisteredHandlers returns a slice of all registered handler types.
func (*EventBus) RemoveHandler ¶
func (b *EventBus) RemoveHandler(handlerType eh.EventHandlerType) error
RemoveHandler removes a handler from the event bus by type.
func (*EventBus) SetupEventHandler
deprecated
func (b *EventBus) SetupEventHandler(ctx context.Context, eventHandler EventHandler) error
SetupEventHandler sets up an Eventhandler to the event bus.
Deprecated: SetupEventHandler is deprecated please use SetupEventHandlers instead.
func (*EventBus) SetupEventHandlers ¶
func (b *EventBus) SetupEventHandlers(ctx context.Context, handlers ...EventHandler) error
SetupEventHandlers sets up the given event handlers.
func (*EventBus) SetupEventHandlersWithMiddlewares ¶ added in v2.1.0
func (b *EventBus) SetupEventHandlersWithMiddlewares( ctx context.Context, middlewares []eh.EventHandlerMiddleware, handlers ...EventHandler, ) error
SetupEventHandlersWithMiddlewares sets up every given handler with the given middlewares. Providing middlewares is optional.
func (*EventBus) StartHandling ¶
StartHandling starts handling of all registered handlers.
type EventBusError ¶
type EventBusError struct { eh.EventBusError HandlerType eh.EventHandlerType }
EventBusError is an async error containing the error returned from a handler and the event that it happened on. Its a wrapper around the eventhorizon.EventBusError with extra information about the handler.
type EventHandler ¶
type EventHandler interface { eh.EventHandler // Event returns the event types the handler is able to handle. Events() eh.MatchEvents // Topic returns the topic the handler is subscribed to. Topic() string }
EventHandler is an interface for a type that handles events.
type HandlerOption ¶
type HandlerOption func(*handler)
HandlerOption is a handler option.
func WithHandlerExchange ¶
func WithHandlerExchange(name string) HandlerOption
WithHandlerExchange is an option to set the handler exchange.
func WithHandlerTopic ¶
func WithHandlerTopic(topic string) HandlerOption
WithHandlerTopic is an option to set the handler topic.
type MaxRetriesExceededHandler ¶
MaxRetriesExceededHandler is a function that is called when the maximum number of retries has been reached.
type Option ¶
type Option func(*EventBus)
Option is an option setter used to configure creation.
func WithClariMQConnections ¶
func WithClariMQConnections(publishingConn *clarimq.Connection, consumeConn *clarimq.Connection) Option
WithClariMQConnections sets the connections used for publishing and consuming events.
func WithClariMQPublishingCache ¶
func WithClariMQPublishingCache(publishingCache clarimq.PublishingCache) Option
WithClariMQPublishingCache enables caching events that failed to be published.
func WithConsumerQuantity ¶
WithConsumerQuantity sets the number of concurrent consumers.
func WithEventCodec ¶
func WithEventCodec(codec eh.EventCodec) Option
WithEventCodec uses the specified codec for encoding events.
func WithHandlerConsumeAfterAdd ¶
WithHandlerConsumeAfterAdd allows handlers to start consuming immediately after they have been added.
func WithLogging ¶
func WithLogging(loggers ...clarimq.Logger) Option
WithLogging enables logging to the given loggers.
func WithMaxRecoveryRetry ¶
WithMaxRecoveryRetry sets the max count for recovery retries.
Default: Infinite.
func WithRetry ¶
func WithRetry(maxRetries int64, delays []time.Duration, handler MaxRetriesExceededHandler) Option
WithRetry enables event retries. If maxRetries is bigger than the number of delays provided, it will use the last value until maxRetries has been reached. Use InfiniteRetries to never drop the message.
Default maxRetries is Infinite.
type PublishOption ¶
type PublishOption func(*publishOptions)
PublishOption is a publish option.
func WithPublishingExchange ¶
func WithPublishingExchange(name string) PublishOption
WithPublishingExchange is an option to set the publishing exchange.
func WithPublishingTopic ¶
func WithPublishingTopic(topic string) PublishOption
WithPublishingTopic is an option to set the publishing topic.
type RecoveryFailedError ¶
type RecoveryFailedError struct {
// contains filtered or unexported fields
}
ErrRecoveryFailed occurs when the recovery failed after a connection loss.
func (*RecoveryFailedError) ConnectionName ¶
func (e *RecoveryFailedError) ConnectionName() string
ConnectionName returns the name of the connection that failed to recover.
func (*RecoveryFailedError) Error ¶
func (e *RecoveryFailedError) Error() string
Error implements the Error method of the error interface.