Documentation ¶
Index ¶
- Variables
- func GetFqn(obj interface{}) string
- func GetTypeFQN(t reflect.Type) string
- type AMQPOutbox
- type Builder
- type Bus
- type BusMessage
- type BusSwitch
- type DefaultBus
- func (b *DefaultBus) GetHealth() HealthCard
- func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
- func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
- func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error
- func (b *DefaultBus) NotifyHealth(health chan error)
- func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, ...) error
- func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, ...) (*BusMessage, error)
- func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error
- func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, ...) error
- func (b *DefaultBus) Shutdown() (shutdwonErr error)
- func (b *DefaultBus) Start() error
- type HandlerRegister
- type Health
- type HealthCard
- type Invocation
- type Message
- type MessageFilter
- type MessageHandler
- type MessagePolicy
- type Messaging
- type RegisterDeadletterHandler
- type Registration
- type RequestSagaTimeout
- type Safety
- type Saga
- type SagaConfFn
- type SagaRegister
- type SagaTimeoutMessage
- type Serializer
- type TxOutbox
- type TxProvider
Constants ¶
This section is empty.
Variables ¶
var ( //MaxRetryCount defines the max times a retry can run MaxRetryCount uint = 3 )
Functions ¶
func GetFqn ¶
func GetFqn(obj interface{}) string
GetFqn gets the "fully qualified name" of an interface, meaning the package path + typename
func GetTypeFQN ¶
GetTypeFQN gets the "fully qualified name" of a type, meaning the package path + typename
Types ¶
type AMQPOutbox ¶
type AMQPOutbox struct {
// contains filtered or unexported fields
}
AMQPOutbox sends messages to the amqp transport
func (*AMQPOutbox) NotifyConfirm ¶
func (out *AMQPOutbox) NotifyConfirm(ack, nack chan uint64)
NotifyConfirm sends an amqp notification
func (*AMQPOutbox) Post ¶
func (out *AMQPOutbox) Post(exchange, routingKey string, amqpMessage amqp.Publishing) (uint64, error)
Post implements Outbox.Send
type Builder ¶
type Builder interface { PurgeOnStartUp() Builder WithDeadlettering(deadletterExchange string) Builder /* Txnl sets the bus to be transactional using a persisted saga store provider: pg for PostgreSQL connStr: connection string in the format of the passed in provider */ Txnl(provider, connStr string) Builder //WithSerializer provides the ability to plugin custom serializers WithSerializer(serializer Serializer) Builder /* WorkerNum sets the number of worker go routines consuming messages from the queue The default value if this option is not set is 1 */ WorkerNum(workers uint, prefetchCount uint) Builder /* WithConfirms enables publisher confirms */ WithConfirms() Builder //WithPolicies defines the default policies that are applied for evey outgoing amqp messge WithPolicies(policies ...MessagePolicy) Builder //ConfigureHealthCheck defines the default timeout in seconds for the db ping check ConfigureHealthCheck(timeoutInSeconds time.Duration) Builder //Build the bus Build(svcName string) Bus }
Builder is the main interface that should be used to create an instance of a Bus
type Bus ¶
type Bus interface { HandlerRegister RegisterDeadletterHandler BusSwitch Messaging SagaRegister Health }
Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
type BusMessage ¶
type BusMessage struct { ID string CorrelationID string SagaID string SagaCorrelationID string Semantics string /*cmd or evt*/ Payload Message PayloadFQN string RPCID string }
BusMessage is the structure that gets sent to the underlying transport
func NewBusMessage ¶
func NewBusMessage(payload Message) *BusMessage
NewBusMessage factory method for creating a BusMessage that wraps the given payload
func NewFromAMQPHeaders ¶
func NewFromAMQPHeaders(headers amqp.Table) *BusMessage
NewFromAMQPHeaders creates a BusMessage from headers of an amqp message
func (*BusMessage) GetAMQPHeaders ¶
func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table)
GetAMQPHeaders converts everything but the payload to an AMQP headers Table
func (*BusMessage) SetFromAMQPHeaders ¶
func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table)
SetFromAMQPHeaders converts everything but the payload from an AMQP headers Table
func (*BusMessage) SetPayload ¶
func (bm *BusMessage) SetPayload(payload Message)
SetPayload sets the payload and makes sure that Name is saved
type BusSwitch ¶
type BusSwitch interface { /* Start starts the bus, once the bus is started messages get consiumed from the queue and handlers get invoced. Register all handlers prior to calling GBus.Start() */ Start() error /* Shutdown the bus and close connection to the underlying broker */ Shutdown() error }
BusSwitch starts and shuts down the bus
type DefaultBus ¶
type DefaultBus struct { *Safety Outgoing *AMQPOutbox Outbox TxOutbox PrefetchCount uint AmqpConnStr string AMQPChannel *amqp.Channel SvcName string Registrations []*Registration RPCHandlers map[string]MessageHandler HandlersLock *sync.Mutex RPCLock *sync.Mutex SenderLock *sync.Mutex ConsumerLock *sync.Mutex RegisteredSchemas map[string]bool DelayedSubscriptions [][]string PurgeOnStartup bool Glue SagaRegister TxProvider TxProvider IsTxnl bool WorkerNum uint Serializer Serializer DLX string DefaultPolicies []MessagePolicy Confirm bool DbPingTimeout time.Duration // contains filtered or unexported fields }
DefaultBus implements the Bus interface
func (*DefaultBus) GetHealth ¶
func (b *DefaultBus) GetHealth() HealthCard
GetHealth implements Health.GetHealth
func (*DefaultBus) HandleDeadletter ¶
HandleDeadletter implements GBus.HandleDeadletter
func (*DefaultBus) HandleEvent ¶
func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
HandleEvent implements GBus.HandleEvent
func (*DefaultBus) HandleMessage ¶
func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error
HandleMessage implements GBus.HandleMessage
func (*DefaultBus) NotifyHealth ¶
func (b *DefaultBus) NotifyHealth(health chan error)
NotifyHealth implements Health.NotifyHealth
func (*DefaultBus) Publish ¶
func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error
Publish implements GBus.Publish(topic, message)
func (*DefaultBus) RPC ¶
func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)
RPC implements GBus.RPC
func (*DefaultBus) RegisterSaga ¶
func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error
RegisterSaga implements GBus.RegisterSaga
func (*DefaultBus) Send ¶
func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, policies ...MessagePolicy) error
Send implements GBus.Send(destination string, message interface{})
func (*DefaultBus) Shutdown ¶
func (b *DefaultBus) Shutdown() (shutdwonErr error)
Shutdown implements GBus.Shutdown()
type HandlerRegister ¶
type HandlerRegister interface { /* HandleMessage registers a handler to a specific message type Use this method to register handlers for commands and reply messages Use the HandleEvent method to subscribe on events and register a handler */ HandleMessage(message Message, handler MessageHandler) error /* HandleEvent registers a handler for a specific message type published to an exchange with a specific topic */ HandleEvent(exchange, topic string, event Message, handler MessageHandler) error }
HandlerRegister registers message handlers to specific messages and events
type Health ¶
type Health interface { NotifyHealth(health chan error) GetHealth() HealthCard }
Health reports on health issues in which the bus needs to be restarted
type HealthCard ¶
HealthCard holds the health values of the bus
type Invocation ¶
type Invocation interface { Reply(ctx context.Context, message *BusMessage) error Bus() Messaging Tx() *sql.Tx Ctx() context.Context Routing() (exchange, routingKey string) }
Invocation is the context for a specific processed message
type Message ¶
type Message interface {
SchemaName() string
}
Message is a common interface that is passed to the serializers to allow decoding and encoding of content
type MessageFilter ¶
func NewMessageFilter ¶
func NewMessageFilter(exchange, routingKey string, message Message) *MessageFilter
func (*MessageFilter) Matches ¶
func (filter *MessageFilter) Matches(exchange, routingKey, msgName string) bool
type MessageHandler ¶
type MessageHandler func(invocation Invocation, message *BusMessage) error
MessageHandler is the signature for all command handlers
type MessagePolicy ¶
type MessagePolicy interface {
Apply(publishing *amqp.Publishing)
}
MessagePolicy defines a user policy for outgoing amqp messages. User policies can control message TTL, durability, etc.
type Messaging ¶
type Messaging interface { /* Send a command or a command response to a specific service one-to-one semantics */ Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error /* Publish and event, one-to-many semantics */ Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error /* RPC calls the service passing him the request BusMessage and blocks until a reply is received or timeout experied. */ RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error) }
Messaging is the interface used to send and publish messages to the bus
type RegisterDeadletterHandler ¶
type RegisterDeadletterHandler interface {
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
}
RegisterDeadletterHandler provides the ability to handle messages that were rejected as poison and arrive at the deadletter queue
type Registration ¶
type Registration struct { Handler MessageHandler // contains filtered or unexported fields }
func NewRegistration ¶
func NewRegistration(exchange, routingKey string, message Message, handler MessageHandler) *Registration
func (*Registration) Matches ¶
func (sub *Registration) Matches(exchange, routingKey, msgName string) bool
type RequestSagaTimeout ¶
type RequestSagaTimeout interface { TimeoutDuration() time.Duration Timeout(invocation Invocation, message *BusMessage) error }
RequestSagaTimeout is the interface a saga needs to implement to get timeout services
type Saga ¶
type Saga interface { //StartedBy returns the messages that when received should create a new saga instance StartedBy() []Message /* RegisterAllHandlers passes in the HandlerRegister so that the saga can register the messages that it handles */ RegisterAllHandlers(register HandlerRegister) //IsComplete retruns if the saga is complete and can be discarded IsComplete() bool //New is a factory method used by the bus to crerate new instances of a saga New() Saga }
Saga is the base interface for all Sagas.
type SagaConfFn ¶
SagaConfFn is a function to allow configuration of a saga in the context of the gbus
type SagaRegister ¶
type SagaRegister interface {
RegisterSaga(saga Saga, conf ...SagaConfFn) error
}
SagaRegister registers sagas to the bus
type SagaTimeoutMessage ¶
type SagaTimeoutMessage struct {
SagaID string
}
SagaTimeoutMessage is the timeout message for Sagas
func (SagaTimeoutMessage) SchemaName ¶
func (SagaTimeoutMessage) SchemaName() string
SchemaName implements gbus.Message
type Serializer ¶
type Serializer interface { Name() string Encode(message Message) ([]byte, error) Decode(buffer []byte, schemaName string) (Message, error) Register(obj Message) }
Serializer is the base interface for all message serializers