Documentation ¶
Index ¶
- Variables
- func GetDeliveryLogEntries(delivery amqp.Delivery) logrus.Fields
- func GetFqn(obj interface{}) string
- func GetMessageName(delivery amqp.Delivery) string
- func GetTypeFQN(t reflect.Type) string
- type AMQPOutbox
- type Builder
- type Bus
- type BusConfiguration
- type BusMessage
- func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table)
- func (bm *BusMessage) GetTraceLog() (fields []log.Field)
- func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery)
- func (bm *BusMessage) SetIdempotencyKey(idempotencyKey string)
- func (bm *BusMessage) SetPayload(payload Message)
- func (bm *BusMessage) TargetSaga(sagaID string)
- type BusSwitch
- type Deadlettering
- type DefaultBus
- func (b *DefaultBus) GetHealth() HealthCard
- func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler)
- func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
- func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error
- func (b *DefaultBus) NotifyHealth(health chan error)
- func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, ...) error
- func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, ...) (*BusMessage, error)
- func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error
- func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
- func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, ...) error
- func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler)
- func (b *DefaultBus) Shutdown() (shutdwonErr error)
- func (b *DefaultBus) Start() error
- type DeliveryInfo
- type Glogged
- type HandlerRegister
- type Health
- type HealthCard
- type Invocation
- type Logged
- type Message
- type MessageFilter
- type MessageHandler
- type MessagePolicy
- type Messaging
- type OutboxConfiguration
- type RawMessageHandler
- type RawMessageHandling
- type Registration
- type RequestSagaTimeout
- type Safety
- type Saga
- type SagaConfFn
- type SagaGlue
- type SagaInvocation
- type SagaRegister
- type SagaTimeoutMessage
- type Semantics
- type Serializer
- type TimeoutManager
- type TxOutbox
- type TxProvider
Constants ¶
This section is empty.
Variables ¶
var ( //MaxRetryCount defines the max times a retry can run. //Default is 3 but it is configurable MaxRetryCount uint = 3 //BaseRetryDuration defines the basic milliseconds that the retry algorithm uses //for a random retry time. Default is 10 but it is configurable. BaseRetryDuration = 10 * time.Millisecond //RPCHeaderName used to define the header in grabbit for RPC RPCHeaderName = "x-grabbit-msg-rpc-id" ResurrectedHeaderName = "x-resurrected-from-death" FirstDeathRoutingKeyHeaderName = "x-first-death-routing-key" )
Functions ¶
func GetDeliveryLogEntries ¶ added in v1.1.6
func GetFqn ¶
func GetFqn(obj interface{}) string
GetFqn gets the "fully qualified name" of an interface. meaning the package path + typename
func GetMessageName ¶ added in v1.1.1
GetMessageName extracts the valuee of the custom x-msg-name header from an amq delivery
func GetTypeFQN ¶
GetTypeFQN gets the "fully qualified name" of a type. meaning the package path + typename
Types ¶
type AMQPOutbox ¶
type AMQPOutbox struct { SvcName string // contains filtered or unexported fields }
AMQPOutbox sends messages to the amqp transport
func (*AMQPOutbox) NotifyConfirm ¶
func (out *AMQPOutbox) NotifyConfirm(ack, nack chan uint64)
NotifyConfirm send an amqp notification
func (*AMQPOutbox) Post ¶
func (out *AMQPOutbox) Post(exchange, routingKey string, amqpMessage amqp.Publishing) (uint64, error)
Post implements Outbox.Send
func (*AMQPOutbox) Shutdown ¶ added in v1.1.0
func (out *AMQPOutbox) Shutdown()
Shutdown stops the outbox
type Builder ¶
type Builder interface { PurgeOnStartUp() Builder WithDeadlettering(deadletterExchange string) Builder /* Txnl sets the bus to be transactional using a persisted saga store provider: mysql for mysql database connStr: connection string in the format of the passed in provider */ Txnl(provider, connStr string) Builder //WithSerializer provides the ability to plugin custom serializers WithSerializer(serializer Serializer) Builder /* WorkerNum sets the number of worker go routines consuming messages from the queue The default value if this option is not set is 1 */ WorkerNum(workers uint, prefetchCount uint) Builder /* WithConfirms enables publisher confirms */ WithConfirms() Builder //WithPolicies defines the default policies that are applied for evey outgoing amqp messge WithPolicies(policies ...MessagePolicy) Builder //ConfigureHealthCheck defines the default timeout in seconds for the db ping check ConfigureHealthCheck(timeoutInSeconds time.Duration) Builder //RetriesNum defines the number of retries upon error WithConfiguration(config BusConfiguration) Builder //Build the bus Build(svcName string) Bus //WithLogger set custom logger instance WithLogger(logger logrus.FieldLogger) Builder }
Builder is the main interface that should be used to create an instance of a Bus
type Bus ¶
type Bus interface { HandlerRegister Deadlettering RawMessageHandling BusSwitch Messaging SagaRegister Health Logged }
Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
type BusConfiguration ¶ added in v1.0.2
type BusConfiguration struct { MaxRetryCount uint BaseRetryDuration int //TODO:Change type to uint OutboxCfg OutboxConfiguration }
BusConfiguration provides configuration passed to the bus builder
type BusMessage ¶
type BusMessage struct { ID string IdempotencyKey string CorrelationID string SagaID string SagaCorrelationID string Semantics Semantics /*cmd or evt*/ Payload Message PayloadFQN string RPCID string }
BusMessage the structure that gets sent to the underlying transport
func NewBusMessage ¶
func NewBusMessage(payload Message) *BusMessage
NewBusMessage factory method for creating a BusMessage that wraps the given payload
func NewFromDelivery ¶ added in v1.1.1
func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error)
NewFromDelivery creates a BusMessage from an amqp delivery
func (*BusMessage) GetAMQPHeaders ¶
func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table)
func (*BusMessage) GetTraceLog ¶
func (bm *BusMessage) GetTraceLog() (fields []log.Field)
GetTraceLog returns an array of log entires containing all of the message properties
func (*BusMessage) SetFromAMQPHeaders ¶
func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery)
SetFromAMQPHeaders convert from AMQP headers Table everything but a payload
func (*BusMessage) SetIdempotencyKey ¶ added in v1.1.6
func (bm *BusMessage) SetIdempotencyKey(idempotencyKey string)
func (*BusMessage) SetPayload ¶
func (bm *BusMessage) SetPayload(payload Message)
SetPayload sets the payload and makes sure that Name is saved
func (*BusMessage) TargetSaga ¶ added in v1.1.3
func (bm *BusMessage) TargetSaga(sagaID string)
TargetSaga allows sending the message to a specific Saga instance
type BusSwitch ¶
type BusSwitch interface { /* Start starts the bus, once the bus is started messages get consiumed from the queue and handlers get invoced. Register all handlers prior to calling GBus.Start() */ Start() error /* Shutdown the bus and close connection to the underlying broker */ Shutdown() error }
BusSwitch starts and shutdowns the bus
type Deadlettering ¶ added in v1.1.0
type Deadlettering interface { /* HandleDeadletter is deprecated use RawMessageHandling.SetGlobalRawMessageHandler instead. This function will be removed in future grabbit releases */ HandleDeadletter(handler RawMessageHandler) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error }
Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type DefaultBus ¶
type DefaultBus struct { *Safety *Glogged Outbox TxOutbox PrefetchCount uint AmqpConnStr string SvcName string Registrations []*Registration RPCHandlers map[string]MessageHandler HandlersLock *sync.Mutex RPCLock *sync.Mutex SenderLock *sync.Mutex ConsumerLock *sync.Mutex RegisteredSchemas map[string]bool DelayedSubscriptions [][]string PurgeOnStartup bool Glue SagaGlue TxProvider TxProvider WorkerNum uint Serializer Serializer DLX string DefaultPolicies []MessagePolicy Confirm bool DbPingTimeout time.Duration // contains filtered or unexported fields }
DefaultBus implements the Bus interface
func (*DefaultBus) GetHealth ¶
func (b *DefaultBus) GetHealth() HealthCard
GetHealth implements Health.GetHealth
func (*DefaultBus) HandleDeadletter ¶
func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler)
HandleDeadletter implements Deadlettering.HandleDeadletter
func (*DefaultBus) HandleEvent ¶
func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
HandleEvent implements GBus.HandleEvent
func (*DefaultBus) HandleMessage ¶
func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error
HandleMessage implements GBus.HandleMessage
func (*DefaultBus) NotifyHealth ¶
func (b *DefaultBus) NotifyHealth(health chan error)
NotifyHealth implements Health.NotifyHealth
func (*DefaultBus) Publish ¶
func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error
Publish implements GBus.Publish(topic, message)
func (*DefaultBus) RPC ¶
func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)
RPC implements GBus.RPC
func (*DefaultBus) RegisterSaga ¶
func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error
RegisterSaga impements GBus.RegisterSaga
func (*DefaultBus) ReturnDeadToQueue ¶ added in v1.1.0
func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
ReturnDeadToQueue returns a message to its original destination
func (*DefaultBus) Send ¶
func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, policies ...MessagePolicy) error
Send implements GBus.Send(destination string, message interface{})
func (*DefaultBus) SetGlobalRawMessageHandler ¶ added in v1.1.1
func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler)
SetGlobalRawMessageHandler implements RawMessageHandling.SetGlobalRawMessageHandler
func (*DefaultBus) Shutdown ¶
func (b *DefaultBus) Shutdown() (shutdwonErr error)
Shutdown implements GBus.Start()
type DeliveryInfo ¶ added in v1.1.0
DeliveryInfo provdes information as to the attempted deilvery of the invocation
type Glogged ¶ added in v1.0.3
type Glogged struct {
// contains filtered or unexported fields
}
Glogged provides an easy way for structs with in the grabbit package to participate in the general logging schema of the bus
func (*Glogged) Log ¶ added in v1.0.3
func (gl *Glogged) Log() logrus.FieldLogger
Log returns the set default log or a new instance of a logrus.FieldLogger
func (*Glogged) SetLogger ¶ added in v1.0.3
func (gl *Glogged) SetLogger(entry logrus.FieldLogger)
SetLogger sets the default logrus.FieldLogger that should be used when logging a new message
type HandlerRegister ¶
type HandlerRegister interface { /* HandleMessage registers a handler to a specific message type Use this method to register handlers for commands and reply messages Use the HandleEvent method to subscribe on events and register a handler */ HandleMessage(message Message, handler MessageHandler) error /* HandleEvent registers a handler for a specific message type published to an exchange with a specific topic */ HandleEvent(exchange, topic string, event Message, handler MessageHandler) error }
HandlerRegister registers message handlers to specific messages and events
type Health ¶
type Health interface { NotifyHealth(health chan error) GetHealth() HealthCard }
Health reports om health issues in which the bus needs to be restarted
type HealthCard ¶
HealthCard that holds the health values of the bus
type Invocation ¶
type Invocation interface { Logged Reply(ctx context.Context, message *BusMessage) error Bus() Messaging Tx() *sql.Tx Ctx() context.Context InvokingSvc() string Routing() (exchange, routingKey string) DeliveryInfo() DeliveryInfo }
Invocation context for a specific processed message
type Logged ¶ added in v1.0.3
type Logged interface { SetLogger(entry logrus.FieldLogger) Log() logrus.FieldLogger }
Logged represents a grabbit component that can be logged
type Message ¶
type Message interface {
SchemaName() string
}
Message a common interface that passes to the serializers to allow decoding and encoding of content
type MessageFilter ¶
MessageFilter matches rabbitmq topic patterns
func NewMessageFilter ¶
func NewMessageFilter(exchange, routingKey string, message Message) *MessageFilter
NewMessageFilter creates a new MessageFilter
func (*MessageFilter) Matches ¶
func (filter *MessageFilter) Matches(exchange, routingKey, msgName string) bool
Matches the passed in exchange, routingKey, msgName with the defined filter
type MessageHandler ¶
type MessageHandler func(invocation Invocation, message *BusMessage) error
MessageHandler signature for all command handlers
func (MessageHandler) Name ¶ added in v1.1.0
func (mg MessageHandler) Name() string
Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
type MessagePolicy ¶
type MessagePolicy interface {
Apply(publishing *amqp.Publishing)
}
MessagePolicy defines a user policy for out going amqp messages User policies can control message ttl, durability etc..
type Messaging ¶
type Messaging interface { /* Send a command or a command response to a specific service one-to-one semantics */ Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error /* Publish and event, one-to-many semantics */ Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error /* RPC calls the service passing him the request BusMessage and blocks until a reply is received or timeout experied. */ RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error) }
Messaging interface to send and publish messages to the bus
type OutboxConfiguration ¶ added in v1.1.6
type OutboxConfiguration struct { /* Ackers the number of goroutines configured to drain incoming ack/nack signals from the broker. Increase this value if you are experiencing deadlocks. Default is 10 */ Ackers uint //PageSize is the amount of pending messsage records the outbox selects from the database every iteration, the default is 500 PageSize uint //MetricsInterval is the duration the outbox waits between each metrics report, default is 15 seconds MetricsInterval time.Duration //SendInterval is the duration the outbox waits before each iteration, default is 1 second SendInterval time.Duration /* ScavengeInterval is the duration the outbox waits before attempting to re-send messages that were already sent to the broker but were not yet confirmed. Default is 60 seconds */ ScavengeInterval time.Duration }
OutboxConfiguration configures the transactional outbox
type RawMessageHandler ¶ added in v1.1.1
RawMessageHandler signature for handlers that handle raw amqp deliveries
func (RawMessageHandler) Name ¶ added in v1.1.1
func (dlmg RawMessageHandler) Name() string
Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
type RawMessageHandling ¶ added in v1.1.1
type RawMessageHandling interface { /* SetGlobalRawMessageHandler registers a handler that gets called for each amqp.Delivery that is delivered to the service queue. The handler will get called with a scoped transaction that is a different transaction than the ones that regular message handlers are scoped by as we want the RawMessage handler to get executed even if the amqp.Delivery can not be serialized by the bus to one of the registered schemas In case a bus has both a raw message handler and regular ones the bus will first call the raw message handler and afterward will call any registered message handlers. if the global raw handler returns an error the message gets rejected and any additional handlers will not be called. You should not use the global raw message handler to drive business logic as it breaks the local transactivity guarantees grabbit provides and should only be used in specialized cases. If you do decide to use this feature try not shooting yourself in the foot. */ SetGlobalRawMessageHandler(handler RawMessageHandler) }
RawMessageHandling provides the ability to consume and send raq amqp messages with the transactional guarantees that the bus provides
type Registration ¶
type Registration struct { Handler MessageHandler // contains filtered or unexported fields }
Registration represents a message handler's registration for a given exchange, topic and msg combination
func NewRegistration ¶
func NewRegistration(exchange, routingKey string, message Message, handler MessageHandler) *Registration
NewRegistration creates a new registration
func (*Registration) Matches ¶
func (sub *Registration) Matches(exchange, routingKey, msgName string) bool
Matches the registration with the given xchange, routingKey, msgName
type RequestSagaTimeout ¶
type RequestSagaTimeout interface { TimeoutDuration() time.Duration Timeout(tx *sql.Tx, bus Messaging) error }
RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
type Saga ¶
type Saga interface { //StartedBy returns the messages that when received should create a new saga instance StartedBy() []Message /* RegisterAllHandlers passes in the HandlerRegister so that the saga can register the messages that it handles */ RegisterAllHandlers(register HandlerRegister) //IsComplete retruns if the saga is complete and can be discarded IsComplete() bool //New is a factory method used by the bus to crerate new instances of a saga New() Saga }
Saga is the base interface for all Sagas.
type SagaConfFn ¶
SagaConfFn is a function to allow configuration of a saga in the context of the gbus
type SagaGlue ¶ added in v1.1.0
type SagaGlue interface { SagaRegister Logged Start() error Stop() error }
SagaGlue glues together all the parts needed in order to orchistrate saga instances
type SagaInvocation ¶ added in v1.1.2
type SagaInvocation interface { ReplyToInitiator(ctx context.Context, message *BusMessage) error //HostingSvc returns the svc name that is executing the service HostingSvc() string //SagaID returns the saga id of the currently invoked saga instance SagaID() string }
SagaInvocation allows saga instances to reply to their creator even when not in the conext of handling the message that starts the saga. A message handler that is attached to a saga instance can safly cast the passed in invocation to SagaInvocation and use the ReplyToInitiator function to send a message to the originating service that sent the message that started the saga
type SagaRegister ¶
type SagaRegister interface {
RegisterSaga(saga Saga, conf ...SagaConfFn) error
}
SagaRegister registers sagas to the bus
type SagaTimeoutMessage ¶
type SagaTimeoutMessage struct {
SagaID string
}
SagaTimeoutMessage is the timeout message for Saga's
func (SagaTimeoutMessage) SchemaName ¶
func (SagaTimeoutMessage) SchemaName() string
SchemaName implements gbus.Message
type Serializer ¶
type Serializer interface { Name() string Encode(message Message) ([]byte, error) Decode(buffer []byte, schemaName string) (Message, error) Register(obj Message) }
Serializer is the base interface for all message serializers
type TimeoutManager ¶ added in v1.1.0
type TimeoutManager interface { //RegisterTimeout requests the TimeoutManager to register a timeout for a specific saga instance RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error //ClearTimeout clears a timeout for a specific saga ClearTimeout(tx *sql.Tx, sagaID string) error //SetTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires SetTimeoutFunction(func(tx *sql.Tx, sagaID string) error) //Start starts the timeout manager Start() error //Stop shuts the timeout manager down Stop() error }
TimeoutManager abstracts the implementation of determining when a saga should be timed out