Documentation ¶
Index ¶
- func UseSagas(store saga.Store) func(endpoint *ServiceBusEndpoint)
- type Endpoint
- type IncomingMessageConfiguration
- func (config *IncomingMessageConfiguration) Handle(handler func(ctx *IncomingMessageContext)) *IncomingMessageConfiguration
- func (config *IncomingMessageConfiguration) Mutate(behavior IncomingMutation) *IncomingMessageConfiguration
- func (config *IncomingMessageConfiguration) StartSaga(saga string) *IncomingMessageConfiguration
- type IncomingMessageContext
- func (context *IncomingMessageContext) Bind(obj interface{}) error
- func (context *IncomingMessageContext) Publish(messageType string, msg interface{}, options ...OutgoingMutation) error
- func (context *IncomingMessageContext) Reply(messageType string, msg interface{}, options ...OutgoingMutation) error
- func (context *IncomingMessageContext) RequestSaga(sagaType string) (*saga.Context, error)
- func (context *IncomingMessageContext) Send(messageType string, destination string, msg interface{}, ...) error
- func (context *IncomingMessageContext) SendLocal(messageType string, msg interface{}, options ...OutgoingMutation) error
- type IncomingMutation
- type MessageConfiguration
- type OutgoingMessageConfiguration
- type OutgoingMessageContext
- type OutgoingMutation
- type RetryConfiguration
- type RetryPolicy
- type ServiceBusEndpoint
- func (endpoint *ServiceBusEndpoint) Message(messageType string) *MessageConfiguration
- func (endpoint *ServiceBusEndpoint) Publish(messageType string, msg interface{}, options ...OutgoingMutation) error
- func (endpoint *ServiceBusEndpoint) SagaStore() saga.Store
- func (endpoint *ServiceBusEndpoint) Send(messageType string, destination string, msg interface{}, ...) error
- func (endpoint *ServiceBusEndpoint) SendLocal(messageType string, msg interface{}, options ...OutgoingMutation) error
- func (endpoint *ServiceBusEndpoint) Start() error
- type Transport
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func UseSagas ¶
func UseSagas(store saga.Store) func(endpoint *ServiceBusEndpoint)
Types ¶
type Endpoint ¶
type Endpoint interface {
	Message(messageType string) *MessageConfiguration
	Start() error
	Publish(messageType string, msg interface{}, options ...OutgoingMutation) error
	Send(messageType string, destination string, msg interface{}, options ...OutgoingMutation) error
	SendLocal(messageType string, msg interface{}, options ...OutgoingMutation) error
	SagaStore() saga.Store
}
type IncomingMessageConfiguration ¶
type IncomingMessageConfiguration struct {
// contains filtered or unexported fields
}
func (*IncomingMessageConfiguration) Handle ¶
func (config *IncomingMessageConfiguration) Handle(handler func(ctx *IncomingMessageContext)) *IncomingMessageConfiguration
Handle registers the function that handles the incoming message context.
func (*IncomingMessageConfiguration) Mutate ¶
func (config *IncomingMessageConfiguration) Mutate(behavior IncomingMutation) *IncomingMessageConfiguration
Mutates the incoming message context with the given function. Multiple mutations will be executed in order of declaration.
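The ordering rule can be illustrated with a minimal, self-contained sketch. It does not import the package: IncomingMessageContext is reduced to its Headers field, and applyInOrder is a hypothetical helper that stands in for whatever the endpoint does internally. Only the "in order of declaration" behavior is taken from the documentation.

```go
package main

import "fmt"

// IncomingMessageContext is a local stand-in that keeps only the Headers
// field of the documented struct.
type IncomingMessageContext struct {
	Headers map[string]interface{}
}

// IncomingMutation mirrors the documented function type.
type IncomingMutation func(ctx *IncomingMessageContext)

// applyInOrder is a hypothetical helper, not part of the package. It runs the
// mutations in the order they were passed, so a later one sees and may
// overwrite the changes of an earlier one.
func applyInOrder(ctx *IncomingMessageContext, mutations ...IncomingMutation) {
	for _, mutate := range mutations {
		mutate(ctx)
	}
}

// demoMutationOrder applies two mutations that write the same header and
// returns the value that survives.
func demoMutationOrder() string {
	ctx := &IncomingMessageContext{Headers: map[string]interface{}{}}
	setTenant := func(ctx *IncomingMessageContext) { ctx.Headers["tenant"] = "default" }
	overrideTenant := func(ctx *IncomingMessageContext) { ctx.Headers["tenant"] = "acme" }

	applyInOrder(ctx, setTenant, overrideTenant)

	tenant, _ := ctx.Headers["tenant"].(string)
	return tenant
}

func main() {
	fmt.Println(demoMutationOrder()) // the mutation declared last wins
}
```

With the real API, the equivalent would presumably be two chained Mutate calls on the same IncomingMessageConfiguration.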
func (*IncomingMessageConfiguration) StartSaga ¶
func (config *IncomingMessageConfiguration) StartSaga(saga string) *IncomingMessageConfiguration
Start a saga of the given type whenever a message of this configuration is received.
type IncomingMessageContext ¶
type IncomingMessageContext struct {
	Headers              map[string]interface{}
	Origin               string
	Payload              []byte
	Type                 string
	CorrelationId        string
	CorrelationTimestamp time.Time
	MessageId            string
	Timestamp            time.Time
	Priority             uint8
	Ack                  func()
	Retry                func()
	Discard              func()
	Fail                 func()
	Test                 string
	// contains filtered or unexported fields
}
IncomingMessageContext holds the information of an incoming message for the ServiceBusEndpoint instance that handled it.
func (*IncomingMessageContext) Bind ¶
func (context *IncomingMessageContext) Bind(obj interface{}) error
Bind the message payload to a struct.
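A handler would typically call Bind with a pointer to the struct it expects. The sketch below is self-contained and makes one loud assumption: the payload is JSON. The documentation does not name the serialization format, so the Bind body here is a stand-in, and OrderPlaced and decodeOrder are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// IncomingMessageContext is a local stand-in with only the fields used here.
type IncomingMessageContext struct {
	Type    string
	Payload []byte
}

// Bind has the documented signature. ASSUMPTION: the payload is JSON; the
// real package may use a different encoding.
func (context *IncomingMessageContext) Bind(obj interface{}) error {
	return json.Unmarshal(context.Payload, obj)
}

// OrderPlaced is a hypothetical message body.
type OrderPlaced struct {
	OrderID string `json:"orderId"`
	Amount  int    `json:"amount"`
}

// decodeOrder binds the payload to an OrderPlaced and wraps any error with
// the message type so that failures are easier to trace.
func decodeOrder(ctx *IncomingMessageContext) (OrderPlaced, error) {
	var order OrderPlaced
	if err := ctx.Bind(&order); err != nil {
		return OrderPlaced{}, fmt.Errorf("bind %s: %w", ctx.Type, err)
	}
	return order, nil
}

func main() {
	ctx := &IncomingMessageContext{
		Type:    "OrderPlaced",
		Payload: []byte(`{"orderId":"A-17","amount":3}`),
	}
	order, err := decodeOrder(ctx)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(order.OrderID, order.Amount)
}
```

Passing a pointer matters: Bind receives an interface{} and can only fill in the struct if it can reach the caller's value.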
func (*IncomingMessageContext) Publish ¶
func (context *IncomingMessageContext) Publish(messageType string, msg interface{}, options ...OutgoingMutation) error
Publish a message to all subscribers.
func (*IncomingMessageContext) Reply ¶
func (context *IncomingMessageContext) Reply(messageType string, msg interface{}, options ...OutgoingMutation) error
Reply with a message to the origin of the current message context.
func (*IncomingMessageContext) RequestSaga ¶
func (context *IncomingMessageContext) RequestSaga(sagaType string) (*saga.Context, error)
Request a saga from the persistence store and apply a transaction lock.
func (*IncomingMessageContext) Send ¶
func (context *IncomingMessageContext) Send(messageType string, destination string, msg interface{}, options ...OutgoingMutation) error
Send a message to a specific ServiceBusEndpoint.
func (*IncomingMessageContext) SendLocal ¶
func (context *IncomingMessageContext) SendLocal(messageType string, msg interface{}, options ...OutgoingMutation) error
Send the message to the local ServiceBusEndpoint.
type IncomingMutation ¶
type IncomingMutation func(ctx *IncomingMessageContext)
type MessageConfiguration ¶
type MessageConfiguration struct {
// contains filtered or unexported fields
}
func (*MessageConfiguration) AsIncoming ¶
func (config *MessageConfiguration) AsIncoming() *IncomingMessageConfiguration
Declare this message configuration to be an incoming message.
func (*MessageConfiguration) AsOutgoing ¶
func (config *MessageConfiguration) AsOutgoing() *OutgoingMessageConfiguration
Declare this message configuration to be an outgoing message.
type OutgoingMessageConfiguration ¶
type OutgoingMessageConfiguration struct {
// contains filtered or unexported fields
}
func (*OutgoingMessageConfiguration) Mutate ¶
func (config *OutgoingMessageConfiguration) Mutate(behavior OutgoingMutation) *OutgoingMessageConfiguration
Mutates the outgoing message context with the given function. Multiple mutations will be executed in order of declaration.
func (*OutgoingMessageConfiguration) Retry ¶
func (config *OutgoingMessageConfiguration) Retry(maxRetries int, policy RetryPolicy) *OutgoingMessageConfiguration
type OutgoingMessageContext ¶
type OutgoingMessageContext struct {
	Origin               string
	Type                 string
	CorrelationId        string
	CorrelationTimestamp time.Time
	MessageId            string
	Timestamp            time.Time
	Payload              interface{}
	Priority             uint8
	Headers              map[string]interface{}
	Version              string
	IsCancelled          bool
	// contains filtered or unexported fields
}
func CreateOutgoingContext ¶
func CreateOutgoingContext(endpoint Endpoint) *OutgoingMessageContext
func (*OutgoingMessageContext) Cancel ¶
func (context *OutgoingMessageContext) Cancel()
type OutgoingMutation ¶
type OutgoingMutation func(ctx *OutgoingMessageContext)
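Cancel and OutgoingMutation are undocumented, so the following is only a sketch of how they might fit together. It assumes that Cancel does nothing more than set IsCancelled and that the sender checks that flag after running the mutations. The types are local stand-ins, and dropEmptyPayload and shouldSend are hypothetical names.

```go
package main

import "fmt"

// OutgoingMessageContext is a local stand-in with only the fields used here.
type OutgoingMessageContext struct {
	Type        string
	Payload     interface{}
	IsCancelled bool
}

// Cancel has the documented signature. ASSUMPTION: it only sets IsCancelled.
func (context *OutgoingMessageContext) Cancel() {
	context.IsCancelled = true
}

// OutgoingMutation mirrors the documented function type.
type OutgoingMutation func(ctx *OutgoingMessageContext)

// dropEmptyPayload is a hypothetical mutation that cancels any message
// without a payload.
func dropEmptyPayload(ctx *OutgoingMessageContext) {
	if ctx.Payload == nil {
		ctx.Cancel()
	}
}

// shouldSend is a hypothetical helper, not part of the package. It applies
// the mutations in order and reports whether the message is still to be sent.
func shouldSend(ctx *OutgoingMessageContext, options ...OutgoingMutation) bool {
	for _, option := range options {
		option(ctx)
	}
	return !ctx.IsCancelled
}

func main() {
	empty := &OutgoingMessageContext{Type: "OrderPlaced"}
	full := &OutgoingMessageContext{Type: "OrderPlaced", Payload: "A-17"}

	fmt.Println(shouldSend(empty, dropEmptyPayload))
	fmt.Println(shouldSend(full, dropEmptyPayload))
}
```

Because Publish, Send and SendLocal all accept variadic OutgoingMutation options, a mutation like this could presumably be passed per call as well as registered through OutgoingMessageConfiguration.Mutate.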
type RetryConfiguration ¶
type RetryConfiguration struct {
	MaxRetries int
	Policy     RetryPolicy
}
func (*RetryConfiguration) Execute ¶
func (cfg *RetryConfiguration) Execute(f func() error) error
type RetryPolicy ¶
type ServiceBusEndpoint ¶
type ServiceBusEndpoint struct {
// contains filtered or unexported fields
}
func (*ServiceBusEndpoint) Message ¶
func (endpoint *ServiceBusEndpoint) Message(messageType string) *MessageConfiguration
Declare a message configuration.
func (*ServiceBusEndpoint) Publish ¶
func (endpoint *ServiceBusEndpoint) Publish(messageType string, msg interface{}, options ...OutgoingMutation) error
Publish a message to all subscribers.
func (*ServiceBusEndpoint) SagaStore ¶
func (endpoint *ServiceBusEndpoint) SagaStore() saga.Store
func (*ServiceBusEndpoint) Send ¶
func (endpoint *ServiceBusEndpoint) Send(messageType string, destination string, msg interface{}, options ...OutgoingMutation) error
Send a message to a specific ServiceBusEndpoint.
func (*ServiceBusEndpoint) SendLocal ¶
func (endpoint *ServiceBusEndpoint) SendLocal(messageType string, msg interface{}, options ...OutgoingMutation) error
Send the message to the local ServiceBusEndpoint.
func (*ServiceBusEndpoint) Start ¶
func (endpoint *ServiceBusEndpoint) Start() error
Start receiving and sending messages with the ServiceBus and set up the transport topology.
type Transport ¶
type Transport interface {
	Start(endpointName string) error
	RegisterRouting(route string) error
	UnregisterRouting(route string) error
	Publish(message *OutgoingMessageContext) error
	Send(destination string, command *OutgoingMessageContext) error
	SendLocal(command *OutgoingMessageContext) error
	MessageReceived(chan *IncomingMessageContext) chan *IncomingMessageContext
}