Documentation ¶
Overview ¶
Package endpoint contains components that are composed to configure an 'endpoint', which can send and receive messages to other endpoints.
Index ¶
- Constants
- Variables
- func WithEnvelope(p context.Context, env InboundEnvelope) context.Context
- type Acknowledger
- type AttemptID
- type BufferedSink
- type Endpoint
- type InboundEnvelope
- type InboundPipeline
- type InboundRejecter
- type InboundTransport
- type MessageSink
- type Operation
- type OutboundEnvelope
- type OutboundPipeline
- type OutboundRejecter
- type OutboundTracer
- type OutboundTransport
- type RetryPolicy
- type SelfValidatingMessage
- type SelfValidator
- type SinkSender
- type TimeLimiter
- type TransportStage
- type Validator
Constants ¶
const DefaultTimeout = 5 * time.Second
DefaultTimeout is the default timeout duration to use if none is given.
Variables ¶
var DefaultRetryPolicy = NewExponentialBackoffPolicy( 3, 0, 1*time.Second, 1*time.Hour, )
DefaultRetryPolicy is the default RetryPolicy.
It allows for 3 immediate attempts, after which each attempt is delayed exponentially, for a maximum of 10 attempts before the message is rejected.
var DefaultValidators = []Validator{ &SelfValidator{}, }
DefaultValidators is the set of validators used to validate outgoing messages if no other set of validators is configured on the endpoint.
Functions ¶
func WithEnvelope ¶
func WithEnvelope(p context.Context, env InboundEnvelope) context.Context
WithEnvelope returns a new context derived from p that contains env. The envelope can be retreived from the context with GetEnvelope().
Types ¶
type Acknowledger ¶
type Acknowledger interface { // Ack acknowledges the message, indicating that is was handled successfully // and does not need to be retried. Ack(ctx context.Context) error // Retry requeues the message so that it is retried at some point in the // future. // // d is a hint as to how long the transport should wait before retrying // this message. Retry(ctx context.Context, err error, d time.Duration) error // Reject indicates that the message could not be handled and should not be // retried. Depending on the transport, this may move the message to some form // of error queue or otherwise drop the message completely. Reject(ctx context.Context, err error) error }
Acknowledger is an interface for acknowledging a specific inbound message.
type AttemptID ¶
AttemptID uniquely identifies an attempt to process a message.
func GenerateAttemptID ¶
func GenerateAttemptID() AttemptID
GenerateAttemptID generates a new unique identifier for a processing attempt.
func MustParseAttemptID ¶
MustParseAttemptID parses s into an attempt ID and returns it. It panics if s is empty.
func ParseAttemptID ¶
ParseAttemptID parses s into an attempt ID and returns it. It returns an error if s is empty.
type BufferedSink ¶
type BufferedSink struct {
// contains filtered or unexported fields
}
BufferedSink is a MessageSink that buffers message envelopes in memory.
func (*BufferedSink) Accept ¶
func (s *BufferedSink) Accept(ctx context.Context, env OutboundEnvelope) error
Accept buffers env in memory.
func (*BufferedSink) Envelopes ¶
func (s *BufferedSink) Envelopes() []OutboundEnvelope
Envelopes returns the message envelopes that have been buffered.
func (*BufferedSink) Reset ¶
func (s *BufferedSink) Reset()
Reset removes the buffered message envelopes.
func (*BufferedSink) TakeEnvelopes ¶
func (s *BufferedSink) TakeEnvelopes() []OutboundEnvelope
TakeEnvelopes returns the message envelopes that have been buffered and resets the sink in a single operation.
type Endpoint ¶
type Endpoint struct { Name string OutboundTransport OutboundTransport InboundTransport InboundTransport InboundPipeline InboundPipeline OutboundPipeline OutboundPipeline RetryPolicy RetryPolicy SenderValidators []Validator Tracer opentracing.Tracer // contains filtered or unexported fields }
Endpoint is a named source and recipient of messages.
type InboundEnvelope ¶
type InboundEnvelope struct { ax.Envelope // SourceEndpoint is the endpoint that sent the message. SourceEndpoint string // AttemptID uniquely identifies the attempt to process this message. AttemptID AttemptID // AttemptCount is the number of times that an attempt has been made to process // this message. // // Messages may be retried after a failure handling the message, or if // an endpoint crashes, for example. Not all transports support an attempt // count. If the attempt count is unknown, it is set to zero. // // The attempt count may be reset if a message is manually re-queued after // being rejected by the retry policy. AttemptCount uint // SpanContext is the tracing context that was propagated with the message. SpanContext opentracing.SpanContext }
InboundEnvelope is a specialization of ax.Envelope for messages that are received by an endpoint.
Inbound envelopes traverse an InboundPipeline.
func GetEnvelope ¶
func GetEnvelope(ctx context.Context) (env InboundEnvelope, ok bool)
GetEnvelope returns the message envelope contained in ctx. If ctx does not contain an envelope then ok is false.
type InboundPipeline ¶
type InboundPipeline interface { // Initialize is called during initialization of the endpoint, after the // transport is initialized. It can be used to inspect or further // configure the endpoint as per the needs of the pipeline. Initialize(ctx context.Context, ep *Endpoint) error // Accept forwards an inbound message through the pipeline until // it is handled by some application-defined message handler(s). Accept(ctx context.Context, sink MessageSink, env InboundEnvelope) error }
InboundPipeline is an interface for a message pipeline that processes messages received from the message transport.
A "stage" within the pipeline is simply an implementation of the InboundPipeline interface that forwards messages to another pipeline.
type InboundRejecter ¶
type InboundRejecter struct { Validators []Validator Next InboundPipeline }
InboundRejecter is an inbound pipeline stage that validates messages before forwarding them to the next pipeline stage. It uses a set of validators distinct from those configured in the endpoint.
func (*InboundRejecter) Accept ¶
func (i *InboundRejecter) Accept( ctx context.Context, sink MessageSink, env InboundEnvelope, ) error
Accept forwards an inbound message to the next pipeline stage only if it is successfully validated.
func (*InboundRejecter) Initialize ¶
func (i *InboundRejecter) Initialize( ctx context.Context, ep *Endpoint, ) error
Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.
type InboundTransport ¶
type InboundTransport interface { // Initialize sets up the transport to communicate as an endpoint named ep. Initialize(ctx context.Context, ep string) error // Subscribe configures the transport to listen to messages of type mt that are // sent using op. Subscribe(ctx context.Context, op Operation, mt ax.MessageTypeSet) error // Receive returns the next message sent to this endpoint. // It blocks until a message is available, or ctx is canceled. Receive(ctx context.Context) (InboundEnvelope, Acknowledger, error) }
InboundTransport is an interface for receiving messages from endpoints.
type MessageSink ¶
type MessageSink interface { // Accept processes the message encapsulated in env. Accept(ctx context.Context, env OutboundEnvelope) error }
MessageSink is an interface that accepts outbound message envelopes as input.
type Operation ¶
type Operation int
Operation is an enumeration of transport operations that can be performed in order to send an outbound message.
const ( // OpSendUnicast is an outbound transport operation that sends a message to // a specific endpoint as determined by the outbound message's // DestinationEndpoint property. OpSendUnicast Operation = iota // OpSendMulticast is an outbound transport operation that sends a message // to all of its subscribers. OpSendMulticast )
type OutboundEnvelope ¶
type OutboundEnvelope struct { ax.Envelope // Operation is the operation to be performed on the message. It dictates // how the message is sent by the transport. Operation Operation // DestinationEndpoint is the endpoint to which the message is sent when // Operation is OpSendUnicast. The field is ignored for other operations. DestinationEndpoint string // SpanContext is the tracing context to propagate with the message. SpanContext opentracing.SpanContext }
OutboundEnvelope is a specialization of ax.Envelope for messages that are sent by an endpoint.
Outbound envelopes traverse an OutboundPipeline.
type OutboundPipeline ¶
type OutboundPipeline interface { MessageSink // Initialize is called during initialization of the endpoint, after the // transport is initialized. It can be used to inspect or further // configure the endpoint as per the needs of the pipeline. Initialize(ctx context.Context, ep *Endpoint) error }
OutboundPipeline is an interface for a message pipeline that processes messages that are sent via the message transport.
A "stage" within the pipeline is simply an implementation of the OutboundPipeline interface that forwards messages to another pipeline.
type OutboundRejecter ¶
type OutboundRejecter struct { Validators []Validator Next OutboundPipeline }
OutboundRejecter is an outbound pipeline stage that validates messages before forwarding them to the next pipeline stage. It uses a set of validators distinct from those configured in the endpoint.
func (*OutboundRejecter) Accept ¶
func (o *OutboundRejecter) Accept( ctx context.Context, env OutboundEnvelope, ) error
Accept forwards an outbound message to the next pipeline stage only if it is successfully validated.
func (*OutboundRejecter) Initialize ¶
func (o *OutboundRejecter) Initialize( ctx context.Context, ep *Endpoint, ) error
Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.
type OutboundTracer ¶
type OutboundTracer struct { Tracer opentracing.Tracer Next OutboundPipeline }
OutboundTracer is an implementation of OutboundPipeline that starts a new OpenTracing span before forwarding to the next stage.
func (OutboundTracer) Accept ¶
func (s OutboundTracer) Accept(ctx context.Context, env OutboundEnvelope) error
Accept processes the message encapsulated in env.
func (OutboundTracer) Initialize ¶
func (s OutboundTracer) Initialize(ctx context.Context, ep *Endpoint) error
Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.
type OutboundTransport ¶
type OutboundTransport interface { // Initialize sets up the transport to communicate as an endpoint named ep. Initialize(ctx context.Context, ep string) error // Send sends env via the transport. Send(ctx context.Context, env OutboundEnvelope) error }
OutboundTransport is an interface for sending messages to endpoints.
type RetryPolicy ¶
type RetryPolicy func(InboundEnvelope, error) (time.Duration, bool)
RetryPolicy is a function responsible for determining whether or not a message should be retried.
It returns the delay that should occur before retrying, and a bool indicating whether or not the message should be retried at all.
func NewExponentialBackoffPolicy ¶
func NewExponentialBackoffPolicy( ir, mr uint, bt, mt time.Duration, ) RetryPolicy
NewExponentialBackoffPolicy returns a retry policy that allows a fixed number of immediate attempts after which retries are delayed exponentially for a until some maximum delay is reached.
Optionally, the message can be rejected after some fixed number of retries.
ir is the number of immediate attempts. mr is the maximum total attempts before rejecting the message. If mr is zero, the message is retried indefinitely.
bt is a "base" delay between retries. It is used as a multplier for the backoff duration. mt is the maximum delay between retries.
type SelfValidatingMessage ¶
type SelfValidatingMessage interface { ax.Message // Validate returns a non-nil error if the message is invalid. It is up to // message implementation to check validity criteria. // // This method is invoked by SelfValidator that is one the default // validators to verify the message. Validate() error }
SelfValidatingMessage is a message that can perform its own superficial validation.
type SelfValidator ¶
type SelfValidator struct{}
SelfValidator is one of the default message validators that validates the message if it implements SelfValidatingMessage interface.
type SinkSender ¶
type SinkSender struct { Sink MessageSink Validators []Validator }
SinkSender is an implementation of ax.Sender that passes messages to a message sink.
func (SinkSender) ExecuteCommand ¶
func (s SinkSender) ExecuteCommand( ctx context.Context, m ax.Command, opts ...ax.ExecuteOption, ) (ax.Envelope, error)
ExecuteCommand sends a command message.
If ctx contains a message envelope, m is sent as a child of the message in that envelope.
func (SinkSender) PublishEvent ¶
func (s SinkSender) PublishEvent( ctx context.Context, m ax.Event, opts ...ax.PublishOption, ) (ax.Envelope, error)
PublishEvent sends an event message.
If ctx contains a message envelope, m is sent as a child of the message in that envelope.
type TimeLimiter ¶
type TimeLimiter struct { Timeout time.Duration Next InboundPipeline }
TimeLimiter is an inbound pipeline that sets a context timeout before forwarding on to the next stage.
func (TimeLimiter) Accept ¶
func (tl TimeLimiter) Accept(ctx context.Context, sink MessageSink, env InboundEnvelope) error
Accept forwards an inbound message through the pipeline until it is handled by some application-defined message handler(s).
func (TimeLimiter) Initialize ¶
func (tl TimeLimiter) Initialize(ctx context.Context, ep *Endpoint) error
Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.
type TransportStage ¶
type TransportStage struct {
// contains filtered or unexported fields
}
TransportStage is an outbound pipeline stage that forwards messages to a transport. It is typically used as the last stage in an outbound pipeline.
func (*TransportStage) Accept ¶
func (s *TransportStage) Accept(ctx context.Context, env OutboundEnvelope) error
Accept sends env via the transport.
func (*TransportStage) Initialize ¶
func (s *TransportStage) Initialize(ctx context.Context, ep *Endpoint) error
Initialize is called during initialization of the endpoint, after the transport is initialized. It can be used to inspect or further configure the endpoint as per the needs of the pipeline.
type Validator ¶
type Validator interface { // Validate checks if m is valid. // // It returns a non-nil error if the message is invalid. The meaning of // 'valid' in is implementation-defined. Validate(ctx context.Context, m ax.Message) error }
Validator is an interface for validating messages.
Application-defined validators can be implemented to provide superficial and domain validation. Each endpoint has a set of validators that are used to validate outgoing messages. Additionally, the validation.InboundRejecter and validation.OutboundRejecter can be used to perform message validation at any point in a pipeline.