Documentation ¶
Overview ¶
Package ax is a toolkit for writing distributed message-driven applications, with features specifically focused on CQRS and Event Sourcing.
Index ¶
- Variables
- func MarshalEnvelope(env Envelope) ([]byte, error)
- func MarshalMessage(m Message) (contentType string, data []byte, err error)
- type Command
- type CommandExecutor
- type Envelope
- type EnvelopeProto
- func (*EnvelopeProto) Descriptor() ([]byte, []int) (deprecated)
- func (x *EnvelopeProto) GetCausationId() string
- func (x *EnvelopeProto) GetCorrelationId() string
- func (x *EnvelopeProto) GetCreatedAt() *timestamppb.Timestamp
- func (x *EnvelopeProto) GetMessage() *anypb.Any
- func (x *EnvelopeProto) GetMessageId() string
- func (x *EnvelopeProto) GetSendAt() *timestamppb.Timestamp
- func (*EnvelopeProto) ProtoMessage()
- func (x *EnvelopeProto) ProtoReflect() protoreflect.Message
- func (x *EnvelopeProto) Reset()
- func (x *EnvelopeProto) String() string
- type Event
- type EventRecorder
- type ExecuteOption
- type Message
- type MessageContext
- type MessageID
- type MessageType
- type MessageTypeSet
- func (s MessageTypeSet) Add(mt MessageType) MessageTypeSet
- func (s MessageTypeSet) Has(mt MessageType) bool
- func (s MessageTypeSet) Intersection(o MessageTypeSet) MessageTypeSet
- func (s MessageTypeSet) Len() int
- func (s MessageTypeSet) Members() []MessageType
- func (s MessageTypeSet) Union(o MessageTypeSet) MessageTypeSet
- type PublishOption
- type SendOption
- type Sender
Constants ¶
This section is empty.
Variables ¶
var File_github_com_jmalloc_ax_envelope_proto protoreflect.FileDescriptor
Functions ¶
func MarshalEnvelope ¶
func MarshalEnvelope(env Envelope) ([]byte, error)
MarshalEnvelope marshals env to a binary representation.
Types ¶
type Command ¶
type Command interface {
	Message

	// IsCommand() is a "marker method" used to indicate that a message is
	// intended to be used as a command.
	IsCommand()
}
Command is a message that requests some action take place.
Commands are always sent to a single handler within a single endpoint.
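The marker-method idea can be sketched without importing ax. This is a minimal illustration and not the library's code: the `Message` interface below is a simplified stand-in (the real one also embeds proto.Message), and `PlaceOrder` and `describe` are hypothetical names introduced for the example.

```go
package main

import "fmt"

// Message is a simplified stand-in for ax.Message. The real interface also
// embeds proto.Message, which is omitted here to keep the sketch stdlib-only.
type Message interface {
	MessageDescription() string
}

// Command mirrors the shape of ax.Command: a Message plus a marker method.
type Command interface {
	Message
	IsCommand()
}

// PlaceOrder is a hypothetical application-defined command.
type PlaceOrder struct {
	OrderID string
}

// MessageDescription returns a human-readable description of the message.
func (m *PlaceOrder) MessageDescription() string {
	return "place order " + m.OrderID
}

// IsCommand marks PlaceOrder as a command. It has no behavior of its own.
func (m *PlaceOrder) IsCommand() {}

// describe reports whether m carries the command marker.
func describe(m Message) string {
	if _, ok := m.(Command); ok {
		return "command: " + m.MessageDescription()
	}
	return "message: " + m.MessageDescription()
}

func main() {
	fmt.Println(describe(&PlaceOrder{OrderID: "A-1"})) // command: place order A-1
}
```

Because the marker method takes no arguments and returns nothing, it exists purely so the type system can tell commands apart from other messages.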
type CommandExecutor ¶
type CommandExecutor func(Command, ...ExecuteOption)
CommandExecutor is a function that queues a command to be executed. It is used to send commands within workflows. See saga.NewWorkflow().
type Envelope ¶
type Envelope struct {
	// MessageID is a globally unique identifier for a single message.
	//
	// Among other things, the message ID is often used during message
	// de-duplication in order to provide exactly-once handling semantics.
	MessageID MessageID

	// CausationID is the ID of the message that directly caused this message to
	// occur.
	//
	// Messages can be thought of as occurring within a tree of messages. The
	// CausationID identifies the direct parent message within that tree.
	//
	// When a message is injected into the messaging system via a MessageBus,
	// the CausationID is set to the MessageID, that is, the message is its own
	// cause.
	//
	// When a message is sent via a MessageContext, the CausationID is
	// automatically set to the MessageID of the message being handled in that
	// context.
	CausationID MessageID

	// CorrelationID is the ID of the message that (perhaps indirectly) caused
	// this message to occur.
	//
	// Messages can be thought of as occurring within a tree of messages. The
	// CorrelationID identifies the message at the root of that tree.
	//
	// When a message is injected into the message system via a MessageBus,
	// the CorrelationID is set to the MessageID, that is, the message is at the
	// root of the tree.
	//
	// When a message is sent via a MessageContext, the CorrelationID is
	// automatically set to the CorrelationID of the message being handled in
	// that context.
	CorrelationID MessageID

	// CreatedAt is the time at which the message was created. This typically
	// correlates to the time at which the message was passed to a Sender.
	//
	// It is populated via the regular Go system clock, and as such there are
	// almost no guarantees about the accuracy of the time. It must not be
	// assumed that messages will arrive in any chronological order.
	//
	// Depending on the application this field may not be appropriate for use as an
	// "occurred" time. Care must be taken to choose appropriately between
	// CreatedAt and SendAt for each use case.
	CreatedAt time.Time

	// SendAt is the time at which the message should be sent by the endpoint,
	// which may be after the CreatedAt time.
	//
	// Depending on the application this field may not be appropriate for use as an
	// "occurred" time. Care must be taken to choose appropriately between
	// CreatedAt and SendAt for each use case.
	SendAt time.Time

	// Message is the application-defined message encapsulated by the envelope.
	Message Message
}
Envelope is a container for a message and its associated meta-data.
func NewEnvelope ¶
NewEnvelope creates a new message envelope containing m.
It generates a UUID-based message ID and configures the envelope such that m is at the root of a new tree of messages.
func NewEnvelopeFromProto ¶
func NewEnvelopeFromProto(env *EnvelopeProto) (Envelope, error)
NewEnvelopeFromProto returns a new envelope from its protocol-buffers representation.
func UnmarshalEnvelope ¶
UnmarshalEnvelope unmarshals an envelope from its serialized representation.
func (Envelope) AsProto ¶
func (e Envelope) AsProto() (*EnvelopeProto, error)
AsProto returns a Protocol Buffers representation of the envelope.
func (Envelope) Delay ¶
Delay returns the delay between the message's creation time and the time at which it is to be sent.
func (Envelope) NewChild ¶
NewChild returns a new message envelope containing m.
It generates a UUID-based message ID and configures the envelope such that m is a child of e.Message within an existing tree of messages.
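The relationship between root and child envelopes described for NewEnvelope and NewChild can be sketched as follows. This is a reduced model, not the ax implementation: `envelope`, `newRoot`, and `newChild` are hypothetical names, only the three ID fields are kept, and IDs are passed in explicitly instead of being generated as UUIDs.

```go
package main

import "fmt"

// envelope is a hypothetical stand-in for ax.Envelope that keeps only the
// three ID fields relevant to the message tree.
type envelope struct {
	MessageID     string
	CausationID   string
	CorrelationID string
}

// newRoot starts a new tree: the message is its own cause and its own root.
func newRoot(id string) envelope {
	return envelope{MessageID: id, CausationID: id, CorrelationID: id}
}

// newChild creates an envelope caused by e. The parent's message ID becomes
// the causation ID, and the correlation ID is inherited unchanged.
func (e envelope) newChild(id string) envelope {
	return envelope{
		MessageID:     id,
		CausationID:   e.MessageID,
		CorrelationID: e.CorrelationID,
	}
}

func main() {
	root := newRoot("m1")
	child := root.newChild("m2")
	grandchild := child.newChild("m3")

	fmt.Printf("%+v\n", grandchild)
	// {MessageID:m3 CausationID:m2 CorrelationID:m1}
}
```

Following CausationID values walks up the tree one parent at a time, while CorrelationID jumps straight to the root.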
func (Envelope) Type ¶
func (e Envelope) Type() MessageType
Type returns the message type of the message contained in the envelope.
type EnvelopeProto ¶
type EnvelopeProto struct {
	MessageId     string                 `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
	CausationId   string                 `protobuf:"bytes,2,opt,name=causation_id,json=causationId,proto3" json:"causation_id,omitempty"`
	CorrelationId string                 `protobuf:"bytes,3,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"`
	CreatedAt     *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
	SendAt        *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=send_at,json=sendAt,proto3" json:"send_at,omitempty"`
	Message       *anypb.Any             `protobuf:"bytes,6,opt,name=message,proto3" json:"message,omitempty"`
	// contains filtered or unexported fields
}
EnvelopeProto is a Protocol Buffers representation of an Envelope.
func (*EnvelopeProto) Descriptor (deprecated)
func (*EnvelopeProto) Descriptor() ([]byte, []int)
Deprecated: Use EnvelopeProto.ProtoReflect.Descriptor instead.
func (*EnvelopeProto) GetCausationId ¶
func (x *EnvelopeProto) GetCausationId() string
func (*EnvelopeProto) GetCorrelationId ¶
func (x *EnvelopeProto) GetCorrelationId() string
func (*EnvelopeProto) GetCreatedAt ¶
func (x *EnvelopeProto) GetCreatedAt() *timestamppb.Timestamp
func (*EnvelopeProto) GetMessage ¶
func (x *EnvelopeProto) GetMessage() *anypb.Any
func (*EnvelopeProto) GetMessageId ¶
func (x *EnvelopeProto) GetMessageId() string
func (*EnvelopeProto) GetSendAt ¶
func (x *EnvelopeProto) GetSendAt() *timestamppb.Timestamp
func (*EnvelopeProto) ProtoMessage ¶
func (*EnvelopeProto) ProtoMessage()
func (*EnvelopeProto) ProtoReflect ¶ added in v0.4.2
func (x *EnvelopeProto) ProtoReflect() protoreflect.Message
func (*EnvelopeProto) Reset ¶
func (x *EnvelopeProto) Reset()
func (*EnvelopeProto) String ¶
func (x *EnvelopeProto) String() string
type Event ¶
type Event interface {
	Message

	// IsEvent() is a "marker method" used to indicate that a message is
	// intended to be used as an event.
	IsEvent()
}
Event is a message that indicates some action has already taken place.
Events are published by one endpoint and (potentially) consumed by many.
type EventRecorder ¶
type EventRecorder func(Event)
EventRecorder is a function that records the occurrence of events. It is used to record events within aggregates. See saga.NewAggregate().
type ExecuteOption ¶
ExecuteOption configures an envelope containing a command message to exhibit some specific behavior.
func Delay ¶
func Delay(d time.Duration) ExecuteOption
Delay is an option that delays sending the message until a duration has passed. Events cannot be delayed.
func DelayUntil ¶
func DelayUntil(t time.Time) ExecuteOption
DelayUntil is an option that delays sending the message until a specific time. Events cannot be delayed.
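One way to picture how Delay and DelayUntil relate to the CreatedAt and SendAt fields of an envelope is the functional-options sketch below. It is an assumption for illustration only: the definition of ExecuteOption is not shown on this page, and `envelope`, `option`, `delay`, `delayUntil`, `newEnvelope`, `pending`, and `exampleDelays` are all hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// envelope is a hypothetical stand-in holding only the two timestamps.
type envelope struct {
	CreatedAt time.Time
	SendAt    time.Time
}

// option is a hypothetical functional option that adjusts an envelope.
type option func(*envelope)

// delay schedules the send d after the creation time.
func delay(d time.Duration) option {
	return func(e *envelope) { e.SendAt = e.CreatedAt.Add(d) }
}

// delayUntil schedules the send at an absolute time.
func delayUntil(t time.Time) option {
	return func(e *envelope) { e.SendAt = t }
}

// newEnvelope creates an envelope that is sent immediately unless an option
// changes SendAt.
func newEnvelope(now time.Time, opts ...option) envelope {
	e := envelope{CreatedAt: now, SendAt: now}
	for _, o := range opts {
		o(&e)
	}
	return e
}

// pending returns the gap between creation and the scheduled send time.
func (e envelope) pending() time.Duration {
	return e.SendAt.Sub(e.CreatedAt)
}

// exampleDelays builds one envelope per option and formats the resulting gaps.
func exampleDelays() (string, string) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	a := newEnvelope(now, delay(30*time.Second))
	b := newEnvelope(now, delayUntil(now.Add(2*time.Minute)))
	return a.pending().String(), b.pending().String()
}

func main() {
	a, b := exampleDelays()
	fmt.Println(a, b) // 30s 2m0s
}
```

In this model a relative delay and an absolute send time both reduce to setting SendAt, so the gap can always be recovered by subtracting CreatedAt.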
type Message ¶
type Message interface {
	proto.Message

	// MessageDescription returns a human-readable description of the message.
	//
	// Assume that the description will be used inside log messages or displayed
	// in audit logs. They should be understood by non-developers who are
	// familiar with the application's business domain.
	//
	// Follow the same conventions as for error messages:
	// https://github.com/golang/go/wiki/CodeReviewComments#error-strings
	MessageDescription() string
}
Message is a unit of communication.
type MessageContext ¶
type MessageContext struct {
	// Envelope is the message envelope containing the message to be handled.
	Envelope Envelope
	// contains filtered or unexported fields
}
MessageContext provides context about the message being handled.
func NewMessageContext ¶
func NewMessageContext(
	env Envelope,
	span opentracing.Span,
	logger twelf.Logger,
) MessageContext
NewMessageContext returns a message context for the given envelope.
func (*MessageContext) Log ¶
func (c *MessageContext) Log(f string, v ...interface{})
Log writes an application-level log message about the handling of the message.
The log messages should be understood by non-developers who are familiar with the application's business domain.
type MessageID ¶
MessageID uniquely identifies a message.
func GenerateMessageID ¶
func GenerateMessageID() MessageID
GenerateMessageID generates a new unique identifier for a message.
func MustParseMessageID ¶
MustParseMessageID parses s into a message ID and returns it. It panics if s is empty.
func ParseMessageID ¶
ParseMessageID parses s into a message ID and returns it. It returns an error if s is empty.
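The pairing of an error-returning parser with a panicking "Must" variant is a common Go idiom and can be sketched as below. The signatures of ParseMessageID and MustParseMessageID and the definition of MessageID are not shown on this page, so `messageID`, `parseMessageID`, and `mustParseMessageID` are hypothetical stand-ins, and the string-based representation is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// messageID is a hypothetical stand-in for ax.MessageID. Representing it as
// a string is an assumption made for this sketch.
type messageID string

// parseMessageID rejects the empty string and accepts anything else.
func parseMessageID(s string) (messageID, error) {
	if s == "" {
		return "", errors.New("message ID must not be empty")
	}
	return messageID(s), nil
}

// mustParseMessageID panics where parseMessageID would return an error.
func mustParseMessageID(s string) messageID {
	id, err := parseMessageID(s)
	if err != nil {
		panic(err)
	}
	return id
}

func main() {
	_, err := parseMessageID("")
	fmt.Println(err != nil) // true

	fmt.Println(mustParseMessageID("abc")) // abc
}
```

The panicking form is usually reserved for inputs known to be valid at compile time, such as constants in tests; untrusted input goes through the error-returning form.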
type MessageType ¶
type MessageType struct {
	// Name is the fully-qualified Protocol Buffers name of the message type.
	Name string

	// StructType is the struct that represents messages of this type.
	//
	// Note that only a pointer-to-struct type will satisfy the Message
	// interface.
	StructType reflect.Type
}
MessageType provides information about a particular message type.
func TypeByGoType ¶
func TypeByGoType(t reflect.Type) (mt MessageType)
TypeByGoType returns the message type for the given Go type.
It panics if t does not implement Message.
Note that messages are only added to the registry when their respective Go package is imported.
func TypeByName ¶
func TypeByName(n string) (mt MessageType, ok bool)
TypeByName returns the message type for a fully-qualified Protocol Buffers message name.
If n is the name of a registered implementation of Message, then mt is the type of that message, and ok is true; otherwise, ok is false.
Note that messages are only added to the registry when their respective Go package is imported.
func (MessageType) IsCommand ¶
func (mt MessageType) IsCommand() bool
IsCommand returns true if the message type satisfies the Command interface.
func (MessageType) IsEvent ¶
func (mt MessageType) IsEvent() bool
IsEvent returns true if the message type satisfies the Event interface.
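A check like IsCommand or IsEvent can be expressed with the standard reflect package. The sketch below is an assumption about how such a check could work, not a description of ax internals; `command`, `placeOrder`, `isCommand`, and `markerCheck` are hypothetical names. It also shows why only the pointer-to-struct type carries the marker when methods use pointer receivers.

```go
package main

import (
	"fmt"
	"reflect"
)

// command is a hypothetical marker interface, reduced to the marker method.
type command interface {
	IsCommand()
}

// placeOrder is a hypothetical message type with a pointer-receiver marker.
type placeOrder struct{}

func (*placeOrder) IsCommand() {}

// commandType is the reflect.Type of the command interface itself.
var commandType = reflect.TypeOf((*command)(nil)).Elem()

// isCommand reports whether values of type t satisfy the command interface.
func isCommand(t reflect.Type) bool {
	return t.Implements(commandType)
}

// markerCheck tests the pointer type and the plain struct type in turn.
func markerCheck() (ptr, val bool) {
	ptr = isCommand(reflect.TypeOf(&placeOrder{}))
	val = isCommand(reflect.TypeOf(placeOrder{}))
	return ptr, val
}

func main() {
	ptr, val := markerCheck()
	fmt.Println(ptr, val) // true false
}
```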
func (MessageType) MessageName ¶
func (mt MessageType) MessageName() string
MessageName returns the Protocol Buffers message name without the package name.
func (MessageType) New ¶
func (mt MessageType) New() Message
New returns a new pointer to a zero-value message of this type.
func (MessageType) PackageName ¶
func (mt MessageType) PackageName() string
PackageName returns the Protocol Buffers package name for this message type.
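The split between package name and message name can be illustrated by cutting a fully-qualified name at its last dot. This is a sketch under an assumption: it treats everything before the final dot as the package, which would misclassify nested message types, and it is not taken from the ax source. `splitName` and the sample name `acme.orders.PlaceOrder` are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// splitName splits a fully-qualified Protocol Buffers name into a package
// part and a message part at the last dot. A name with no dot is treated as
// having an empty package.
func splitName(full string) (pkg, msg string) {
	i := strings.LastIndex(full, ".")
	if i < 0 {
		return "", full
	}
	return full[:i], full[i+1:]
}

func main() {
	pkg, msg := splitName("acme.orders.PlaceOrder")
	fmt.Println(pkg, msg) // acme.orders PlaceOrder
}
```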
func (MessageType) String ¶
func (mt MessageType) String() string
func (MessageType) ToSet ¶
func (mt MessageType) ToSet() MessageTypeSet
ToSet returns a MessageTypeSet containing mt as its only member.
type MessageTypeSet ¶
type MessageTypeSet struct {
// contains filtered or unexported fields
}
MessageTypeSet is a collection of unique message types.
func NewMessageTypeSet ¶
func NewMessageTypeSet(mt ...MessageType) MessageTypeSet
NewMessageTypeSet returns a set containing the message types in mt.
func TypesByGoType ¶
func TypesByGoType(t ...reflect.Type) MessageTypeSet
TypesByGoType returns a set containing the message types of the Go types in t.
It panics if any of the types do not implement Message.
func TypesOf ¶
func TypesOf(m ...Message) MessageTypeSet
TypesOf returns a set containing the message types of the messages in m.
func (MessageTypeSet) Add ¶
func (s MessageTypeSet) Add(mt MessageType) MessageTypeSet
Add returns a new set containing the members of this set and mt.
func (MessageTypeSet) Has ¶
func (s MessageTypeSet) Has(mt MessageType) bool
Has returns true if mt is a member of the set.
func (MessageTypeSet) Intersection ¶
func (s MessageTypeSet) Intersection(o MessageTypeSet) MessageTypeSet
Intersection returns the set intersection of s and o.
func (MessageTypeSet) Len ¶
func (s MessageTypeSet) Len() int
Len returns the number of types in the set.
func (MessageTypeSet) Members ¶
func (s MessageTypeSet) Members() []MessageType
Members returns the message types in the set.
func (MessageTypeSet) Union ¶
func (s MessageTypeSet) Union(o MessageTypeSet) MessageTypeSet
Union returns the set union of s and o.
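The method set above reads like an immutable set: Add, Union, and Intersection each return a new set instead of modifying the receiver. A minimal sketch of that style follows. The real MessageTypeSet has unexported fields that this page does not show, so `typeSet`, its map-based storage, string members, and the sorted output of Members are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// typeSet is a hypothetical immutable set keyed by message type name.
type typeSet struct {
	members map[string]struct{}
}

// newTypeSet returns a set containing the given names.
func newTypeSet(names ...string) typeSet {
	m := make(map[string]struct{}, len(names))
	for _, n := range names {
		m[n] = struct{}{}
	}
	return typeSet{members: m}
}

// Has reports whether n is a member of the set.
func (s typeSet) Has(n string) bool {
	_, ok := s.members[n]
	return ok
}

// Len returns the number of members.
func (s typeSet) Len() int { return len(s.members) }

// Members returns the names in sorted order. Sorting is a choice made here
// to get deterministic output.
func (s typeSet) Members() []string {
	out := make([]string, 0, len(s.members))
	for n := range s.members {
		out = append(out, n)
	}
	sort.Strings(out)
	return out
}

// Add returns a new set containing the members of s and n.
func (s typeSet) Add(n string) typeSet { return s.Union(newTypeSet(n)) }

// Union returns a new set with the members of both s and o.
func (s typeSet) Union(o typeSet) typeSet {
	out := newTypeSet(s.Members()...)
	for n := range o.members {
		out.members[n] = struct{}{}
	}
	return out
}

// Intersection returns a new set with the members common to s and o.
func (s typeSet) Intersection(o typeSet) typeSet {
	out := newTypeSet()
	for n := range s.members {
		if o.Has(n) {
			out.members[n] = struct{}{}
		}
	}
	return out
}

func main() {
	a := newTypeSet("app.PlaceOrder", "app.OrderPlaced")
	b := a.Add("app.CancelOrder")

	fmt.Println(a.Len(), b.Len())            // 2 3
	fmt.Println(a.Intersection(b).Members()) // [app.OrderPlaced app.PlaceOrder]
}
```

Returning a fresh set from every operation means a set can be shared between goroutines or stored in configuration without defensive copying.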
type PublishOption ¶
PublishOption configures an envelope containing an event message to exhibit some specific behavior.
type SendOption ¶
type SendOption interface {
	ExecuteOption
	PublishOption
}
SendOption is an option that can be used for both commands and events.
type Sender ¶
type Sender interface {
	// ExecuteCommand sends a command message.
	//
	// Commands are routed to a single endpoint as per the routing rules of the
	// outbound message pipeline.
	ExecuteCommand(context.Context, Command, ...ExecuteOption) (Envelope, error)

	// PublishEvent sends an event message.
	//
	// Events are routed to endpoints that subscribe to messages of that type.
	PublishEvent(context.Context, Event, ...PublishOption) (Envelope, error)
}
Sender is an interface for sending messages.
Directories ¶
Path | Synopsis |
---|---|
 | Package axcli generates Cobra CLI commands that send Ax messages. |
 | Package axmysql provides MySQL-backed implementations of various interfaces consumed by Ax. |
delayedmessage | Package delayedmessage provides MySQL-specific implementations of the interfaces in Ax's top-level "delayedmessage" package. |
internal/envelopestore | Package envelopestore provides common implementation used by repositories that persist outbound envelopes. |
messagestore | Package messagestore provides MySQL-specific implementations of the interfaces in Ax's top-level "messagestore" package. |
outbox | Package outbox provides MySQL-specific implementations of the interfaces in Ax's top-level "outbox" package. |
persistence | Package persistence provides MySQL-specific implementations of the interfaces in Ax's top-level "persistence" package. |
saga | Package saga provides MySQL-specific implementations of the interfaces in Ax's top-level "saga" package. |
 | Package axrmq provides a RabbitMQ-based message transport. |
 | Package axtest contains testing utilities and reusable test suites for implementations of various Ax components. |
delayedmessagetests | Package delayedmessagetests contains functional test suites for implementations of delayed messaging features. |
outboxtests | Package outboxtests contains functional test suites for implementations of outbox related features. |
 | Package delayedmessage provides tools for delaying the sending of messages. |
 | Package endpoint contains components that are composed to configure an 'endpoint', which can send and receive messages to other endpoints. |
examples | |
 | Package ident contains utilities for representing arbitrary identifiers. |
internal | |
 | Package marshaling provides utilities for marshaling and unmarshaling messages and other types. |
 | Package messagestore contains interfaces used by Ax to read and write from persisted streams of messages. |
 | Package observability provides tools for observing the behavior of an application, such as logging, metrics collection and tracing. |
 | Package outbox provides an inbound pipeline stage that ensures message idempotence by employing the "outbox pattern". |
 | Package persistence defines the interfaces Ax uses to persist data. |
 | Package projection contains tools for projecting state from messages. |
 | Package routing provides pipeline stages for routing messages to endpoints and within endpoints. |
 | Package saga provides tools for building stateful message handlers. |
mapping | Package mapping contains subpackages that implement various message-to-saga mapping strategies. |
mapping/direct | Package direct provides a saga mapping strategy that maps messages to saga instances by having the saga implement a method that returns the instance ID directly. |
mapping/keyset | Package keyset provides a saga mapping strategy that maps messages to instances by looking up which instance is associated with a key derived from the message. |
persistence | Package persistence contains subpackages that implement various saga persistence strategies. |
persistence/crud | Package crud provides an implementation of saga.Persister that persists saga instances using "CRUD" semantics. |
persistence/eventsourcing | Package eventsourcing provides an implementation of saga.Persister that persists saga instances as a stream of events with optional snapshots. |