ax

v0.5.0

Warning: This package is not in the latest version of its module.
Published: May 3, 2022 License: MIT Imports: 17 Imported by: 0

README

Ax

Ax is a toolkit for writing distributed message-driven applications in Go. It has features specifically focused on CQRS and Event Sourcing.

go get -u github.com/jmalloc/ax/src/ax

This project is EXPERIMENTAL. Expect frequent breaking changes to the API.

Documentation

Overview

Package ax is a toolkit for writing distributed message-driven applications, with features specifically focused on CQRS and Event Sourcing.

Index

Constants

This section is empty.

Variables

var File_github_com_jmalloc_ax_envelope_proto protoreflect.FileDescriptor

Functions

func MarshalEnvelope

func MarshalEnvelope(env Envelope) ([]byte, error)

MarshalEnvelope marshals env to a binary representation.

func MarshalMessage

func MarshalMessage(m Message) (contentType string, data []byte, err error)

MarshalMessage marshals m to a binary representation.

Types

type Command

type Command interface {
	Message

	// IsCommand() is a "marker method" used to indicate that a message is
	// intended to be used as a command.
	IsCommand()
}

Command is a message that requests some action take place.

Commands are always sent to a single handler within a single endpoint.
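The marker-method pattern can be sketched without any dependencies. In the example below, Message is a simplified local stand-in (the real ax.Message also embeds proto.Message), and OpenAccount, AccountOpened and Kind are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// Message is a simplified stand-in for ax.Message. The real interface also
// embeds proto.Message, which is omitted to keep this sketch self-contained.
type Message interface {
	MessageDescription() string
}

// Command mirrors the shape of ax.Command: a Message plus a marker method.
type Command interface {
	Message
	IsCommand()
}

// Event mirrors the shape of ax.Event.
type Event interface {
	Message
	IsEvent()
}

// OpenAccount is a hypothetical command message.
type OpenAccount struct {
	AccountID string
}

func (m *OpenAccount) MessageDescription() string {
	return "open account " + m.AccountID
}

// IsCommand has no behavior; its presence marks OpenAccount as a command.
func (*OpenAccount) IsCommand() {}

// AccountOpened is a hypothetical event message.
type AccountOpened struct {
	AccountID string
}

func (m *AccountOpened) MessageDescription() string {
	return "opened account " + m.AccountID
}

// IsEvent has no behavior; its presence marks AccountOpened as an event.
func (*AccountOpened) IsEvent() {}

// Kind reports how a message is classified by its marker method.
func Kind(m Message) string {
	switch m.(type) {
	case Command:
		return "command"
	case Event:
		return "event"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(Kind(&OpenAccount{AccountID: "a1"}))
	fmt.Println(Kind(&AccountOpened{AccountID: "a1"}))
}
```

Because the marker methods use pointer receivers, only *OpenAccount (not OpenAccount) satisfies Command in this sketch.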

type CommandExecutor

type CommandExecutor func(Command, ...ExecuteOption)

CommandExecutor is a function that queues a command to be executed. It is used to send commands within workflows. See saga.NewWorkflow().

type Envelope

type Envelope struct {
	// MessageID is a globally unique identifier for a single message.
	//
	// Among other things, the message ID is often used during message
	// de-duplication in order to provide exactly-once handling semantics.
	MessageID MessageID

	// CausationID is the ID of the message that directly caused this message to
	// occur.
	//
	// Messages can be thought of as occurring within a tree of messages. The
	// CausationID identifies the direct parent message within that tree.
	//
	// When a message is injected into the messaging system via a MessageBus,
	// the CausationID is set to the MessageID, that is, the message is its own
	// cause.
	//
	// When a message is sent via a MessageContext, the CausationID is
	// automatically set to the MessageID of the message being handled in that
	// context.
	CausationID MessageID

	// CorrelationID is the ID of the message that (perhaps indirectly) caused
	// this message to occur.
	//
	// Messages can be thought of as occurring within a tree of messages. The
	// CorrelationID identifies the message at the root of that tree.
	//
	// When a message is injected into the message system via a MessageBus,
	// the CorrelationID is set to the MessageID, that is, the message is at the
	// root of the tree.
	//
	// When a message is sent via a MessageContext, the CorrelationID is
	// automatically set to the CorrelationID of the message being handled in
	// that context.
	CorrelationID MessageID

	// CreatedAt is the time at which the message was created. This typically
	// correlates to the time at which the message was passed to a Sender.
	//
	// It is populated via the regular Go system clock, and as such there are
	// almost no guarantees about the accuracy of the time. It must not be
	// assumed that messages will arrive in any chronological order.
	//
	// Depending on the application this field may not be appropriate for use as an
	// "occurred" time. Care must be taken to choose appropriately between
	// CreatedAt and SendAt for each use case.
	CreatedAt time.Time

	// SendAt is the time at which the message should be sent by the endpoint,
	// which may be after the CreatedAt time.
	//
	// Depending on the application this field may not be appropriate for use as an
	// "occurred" time. Care must be taken to choose appropriately between
	// CreatedAt and SendAt for each use case.
	SendAt time.Time

	// Message is the application-defined message encapsulated by the envelope.
	Message Message
}

Envelope is a container for a message and its associated meta-data.

func NewEnvelope

func NewEnvelope(m Message) Envelope

NewEnvelope creates a new message envelope containing m.

It generates a UUID-based message ID and configures the envelope such that m is at the root of a new tree of messages.

func NewEnvelopeFromProto

func NewEnvelopeFromProto(env *EnvelopeProto) (Envelope, error)

NewEnvelopeFromProto returns a new envelope from its protocol-buffers representation.

func UnmarshalEnvelope

func UnmarshalEnvelope(data []byte) (Envelope, error)

UnmarshalEnvelope unmarshals an envelope from its serialized representation.

func (Envelope) AsProto

func (e Envelope) AsProto() (*EnvelopeProto, error)

AsProto returns a Protocol Buffers representation of the envelope.

func (Envelope) Delay

func (e Envelope) Delay() time.Duration

Delay returns the delay between the message's creation time and the time at which it is to be sent.

func (Envelope) Equal

func (e Envelope) Equal(env Envelope) bool

Equal returns true if e and env contain the same data.

func (Envelope) NewChild

func (e Envelope) NewChild(m Message) Envelope

NewChild returns a new message envelope containing m.

It generates a UUID-based message ID and configures the envelope such that m is a child of e.Message within an existing tree of messages.
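The causation and correlation rules described in the Envelope field comments can be modeled in a few lines. This is a minimal sketch using a local Envelope type with string IDs; nextID and NewRoot are hypothetical helpers (a counter stands in for UUID generation), and none of this is the library's actual implementation.

```go
package main

import (
	"fmt"
	"strconv"
)

// Envelope is a local model that keeps only the three ID fields.
type Envelope struct {
	MessageID     string
	CausationID   string
	CorrelationID string
}

var counter int

// nextID is a hypothetical stand-in for UUID-based ID generation.
func nextID() string {
	counter++
	return "msg-" + strconv.Itoa(counter)
}

// NewRoot models a message at the root of a new tree: the message is its
// own cause and its own correlation root.
func NewRoot() Envelope {
	id := nextID()
	return Envelope{MessageID: id, CausationID: id, CorrelationID: id}
}

// NewChild models a message caused by e: the causation ID points at the
// direct parent, while the correlation ID is inherited from the parent.
func (e Envelope) NewChild() Envelope {
	return Envelope{
		MessageID:     nextID(),
		CausationID:   e.MessageID,
		CorrelationID: e.CorrelationID,
	}
}

func main() {
	root := NewRoot()
	child := root.NewChild()
	grandchild := child.NewChild()

	fmt.Printf("%+v\n%+v\n%+v\n", root, child, grandchild)
}
```

In this model every descendant shares the root's CorrelationID, so a whole tree can be found by filtering on one value, while CausationID gives the parent link needed to rebuild the tree's shape.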

func (Envelope) Type

func (e Envelope) Type() MessageType

Type returns the message type of the message contained in the envelope.

type EnvelopeProto

type EnvelopeProto struct {
	MessageId     string                 `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
	CausationId   string                 `protobuf:"bytes,2,opt,name=causation_id,json=causationId,proto3" json:"causation_id,omitempty"`
	CorrelationId string                 `protobuf:"bytes,3,opt,name=correlation_id,json=correlationId,proto3" json:"correlation_id,omitempty"`
	CreatedAt     *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
	SendAt        *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=send_at,json=sendAt,proto3" json:"send_at,omitempty"`
	Message       *anypb.Any             `protobuf:"bytes,6,opt,name=message,proto3" json:"message,omitempty"`
	// contains filtered or unexported fields
}

EnvelopeProto is a Protocol Buffers representation of an Envelope.

func (*EnvelopeProto) Descriptor deprecated

func (*EnvelopeProto) Descriptor() ([]byte, []int)

Deprecated: Use EnvelopeProto.ProtoReflect.Descriptor instead.

func (*EnvelopeProto) GetCausationId

func (x *EnvelopeProto) GetCausationId() string

func (*EnvelopeProto) GetCorrelationId

func (x *EnvelopeProto) GetCorrelationId() string

func (*EnvelopeProto) GetCreatedAt

func (x *EnvelopeProto) GetCreatedAt() *timestamppb.Timestamp

func (*EnvelopeProto) GetMessage

func (x *EnvelopeProto) GetMessage() *anypb.Any

func (*EnvelopeProto) GetMessageId

func (x *EnvelopeProto) GetMessageId() string

func (*EnvelopeProto) GetSendAt

func (x *EnvelopeProto) GetSendAt() *timestamppb.Timestamp

func (*EnvelopeProto) ProtoMessage

func (*EnvelopeProto) ProtoMessage()

func (*EnvelopeProto) ProtoReflect added in v0.4.2

func (x *EnvelopeProto) ProtoReflect() protoreflect.Message

func (*EnvelopeProto) Reset

func (x *EnvelopeProto) Reset()

func (*EnvelopeProto) String

func (x *EnvelopeProto) String() string

type Event

type Event interface {
	Message

	// IsEvent() is a "marker method" used to indicate that a message is
	// intended to be used as an event.
	IsEvent()
}

Event is a message that indicates some action has already taken place.

Events are published by one endpoint and (potentially) consumed by many.

type EventRecorder

type EventRecorder func(Event)

EventRecorder is a function that records the occurrence of events. It is used to record events within aggregates. See saga.NewAggregate().

type ExecuteOption

type ExecuteOption interface {
	ApplyExecuteOption(env *Envelope) error
}

ExecuteOption configures an envelope containing a command message to exhibit some specific behavior.

func Delay

func Delay(d time.Duration) ExecuteOption

Delay is an option that delays sending the message until a duration has passed. Events cannot be delayed.

func DelayUntil

func DelayUntil(t time.Time) ExecuteOption

DelayUntil is an option that delays sending the message until a specific time. Events cannot be delayed.
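One way options of this shape could work is sketched below. It assumes that a delay option adjusts SendAt relative to CreatedAt and that Envelope.Delay is the difference between the two; both are assumptions for illustration rather than confirmed library behavior. The Envelope type is a local model, and delayOption, delayUntilOption and apply are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// Envelope is a local model that keeps only the two time fields.
type Envelope struct {
	CreatedAt time.Time
	SendAt    time.Time
}

// Delay is assumed here to be the gap between creation and send time.
func (e Envelope) Delay() time.Duration {
	return e.SendAt.Sub(e.CreatedAt)
}

// ExecuteOption has the same shape as the interface documented above.
type ExecuteOption interface {
	ApplyExecuteOption(env *Envelope) error
}

// delayOption is a hypothetical option that pushes SendAt forward by d.
type delayOption struct{ d time.Duration }

func (o delayOption) ApplyExecuteOption(env *Envelope) error {
	env.SendAt = env.CreatedAt.Add(o.d)
	return nil
}

// Delay returns an option that delays sending by d.
func Delay(d time.Duration) ExecuteOption { return delayOption{d: d} }

// delayUntilOption is a hypothetical option that sets SendAt to a fixed time.
type delayUntilOption struct{ t time.Time }

func (o delayUntilOption) ApplyExecuteOption(env *Envelope) error {
	env.SendAt = o.t
	return nil
}

// DelayUntil returns an option that delays sending until t.
func DelayUntil(t time.Time) ExecuteOption { return delayUntilOption{t: t} }

// apply runs each option against env, stopping at the first error.
func apply(env *Envelope, opts ...ExecuteOption) error {
	for _, o := range opts {
		if err := o.ApplyExecuteOption(env); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	now := time.Date(2022, 5, 3, 12, 0, 0, 0, time.UTC)
	env := Envelope{CreatedAt: now, SendAt: now}

	if err := apply(&env, Delay(30*time.Second)); err != nil {
		fmt.Println("error:", err)
		return
	}

	fmt.Println(env.Delay())
}
```

Because the options in this sketch implement only ApplyExecuteOption, they cannot be passed where a PublishOption is expected, which is one way the type system could enforce that events are never delayed.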

type Message

type Message interface {
	proto.Message

	// MessageDescription returns a human-readable description of the message.
	//
	// Assume that the description will be used inside log messages or displayed
	// in audit logs. They should be understood by non-developers who are
	// familiar with the application's business domain.
	//
	// Follow the same conventions as for error messages:
	// https://github.com/golang/go/wiki/CodeReviewComments#error-strings
	MessageDescription() string
}

Message is a unit of communication.

func UnmarshalMessage

func UnmarshalMessage(ct string, data []byte) (Message, error)

UnmarshalMessage unmarshals an Ax message from some serialized representation. ct is the MIME content-type for the binary data.

type MessageContext

type MessageContext struct {
	// Envelope is the message envelope containing the message to be handled.
	Envelope Envelope
	// contains filtered or unexported fields
}

MessageContext provides context about the message being handled.

func NewMessageContext

func NewMessageContext(
	env Envelope,
	span opentracing.Span,
	logger twelf.Logger,
) MessageContext

NewMessageContext returns a message context for the given envelope.

func (*MessageContext) Log

func (c *MessageContext) Log(f string, v ...interface{})

Log writes an application-level log message about the handling of the message.

The log messages should be understood by non-developers who are familiar with the application's business domain.

type MessageID

type MessageID struct {
	ident.ID
}

MessageID uniquely identifies a message.

func GenerateMessageID

func GenerateMessageID() MessageID

GenerateMessageID generates a new unique identifier for a message.

func MustParseMessageID

func MustParseMessageID(s string) MessageID

MustParseMessageID parses s into a message ID and returns it. It panics if s is empty.

func ParseMessageID

func ParseMessageID(s string) (MessageID, error)

ParseMessageID parses s into a message ID and returns it. It returns an error if s is empty.
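The Parse/MustParse pairing follows a common Go convention, sketched below with a local MessageID type. The real MessageID embeds ident.ID; the string field and the error text here are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// MessageID is a local model; the real type embeds ident.ID.
type MessageID struct {
	value string
}

func (id MessageID) String() string { return id.value }

// ParseMessageID returns an error if s is empty, matching the documented
// contract. Any further validation in the library is not modeled here.
func ParseMessageID(s string) (MessageID, error) {
	if s == "" {
		return MessageID{}, errors.New("message ID must not be empty")
	}
	return MessageID{value: s}, nil
}

// MustParseMessageID wraps ParseMessageID and panics instead of returning
// an error. It suits constants and tests where the input is known good.
func MustParseMessageID(s string) MessageID {
	id, err := ParseMessageID(s)
	if err != nil {
		panic(err)
	}
	return id
}

func main() {
	if _, err := ParseMessageID(""); err != nil {
		fmt.Println("rejected:", err)
	}

	id := MustParseMessageID("example-id")
	fmt.Println("parsed:", id)
}
```

Prefer the error-returning form for IDs that arrive from outside the process, and reserve the Must form for values fixed at compile time.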

type MessageType

type MessageType struct {
	// Name is the fully-qualified Protocol Buffers name of the message type.
	Name string

	// StructType is the struct that represents messages of this type.
	//
	// Note that only a pointer-to-struct type will satisfy the Message
	// interface.
	StructType reflect.Type
}

MessageType provides information about a particular message type.

func TypeByGoType

func TypeByGoType(t reflect.Type) (mt MessageType)

TypeByGoType returns the message type for the given Go type.

It panics if t does not implement Message.

Note that messages are only added to the registry when their respective Go package is imported.

func TypeByName

func TypeByName(n string) (mt MessageType, ok bool)

TypeByName returns the message type for a fully-qualified Protocol Buffers message name.

If n is the name of a registered implementation of Message, then mt is the type of that message, and ok is true; otherwise, ok is false.

Note that messages are only added to the registry when their respective Go package is imported.
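The import-time registration note can be illustrated with a toy registry. The sketch below is an assumption about the general mechanism, not the library's code: registry and register are hypothetical names, and "bank.OpenAccount" is a made-up message name.

```go
package main

import "fmt"

// MessageType is a local model that keeps only the Name field.
type MessageType struct {
	Name string
}

// registry is a hypothetical name-to-type table.
var registry = map[string]MessageType{}

// register is a hypothetical helper that adds a type to the registry.
func register(name string) {
	registry[name] = MessageType{Name: name}
}

// init runs when the package is loaded. A type registered this way is only
// visible to lookups if the package that defines it has been imported.
func init() {
	register("bank.OpenAccount")
}

// TypeByName follows the documented comma-ok contract.
func TypeByName(n string) (mt MessageType, ok bool) {
	mt, ok = registry[n]
	return mt, ok
}

func main() {
	if mt, ok := TypeByName("bank.OpenAccount"); ok {
		fmt.Println("found:", mt.Name)
	}

	if _, ok := TypeByName("bank.Unknown"); !ok {
		fmt.Println("not registered: bank.Unknown")
	}
}
```

If a lookup unexpectedly reports ok == false, a missing import of the package that defines the message is a likely cause.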

func TypeOf

func TypeOf(m Message) MessageType

TypeOf returns the message type of m.

func (MessageType) IsCommand

func (mt MessageType) IsCommand() bool

IsCommand returns true if the message type satisfies the Command interface.
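A check like this can be written with the reflect package. The sketch below uses local Message and Command interfaces and a hypothetical OpenAccount type; it shows one possible approach and also demonstrates why only the pointer-to-struct type satisfies the interface when methods have pointer receivers.

```go
package main

import (
	"fmt"
	"reflect"
)

// Message and Command are simplified local stand-ins for the ax interfaces.
type Message interface {
	MessageDescription() string
}

type Command interface {
	Message
	IsCommand()
}

// OpenAccount is a hypothetical command with pointer-receiver methods.
type OpenAccount struct{}

func (*OpenAccount) MessageDescription() string { return "open account" }
func (*OpenAccount) IsCommand()                 {}

// commandType is the reflect.Type of the Command interface itself.
var commandType = reflect.TypeOf((*Command)(nil)).Elem()

// ImplementsCommand reports whether the dynamic type of v satisfies Command.
// v must not be a nil interface value.
func ImplementsCommand(v interface{}) bool {
	return reflect.TypeOf(v).Implements(commandType)
}

// StructIsCommand takes a struct type and checks its pointer type, which is
// the form a MessageType.StructType-style field would need.
func StructIsCommand(structType reflect.Type) bool {
	return reflect.PointerTo(structType).Implements(commandType)
}

func main() {
	fmt.Println(ImplementsCommand(&OpenAccount{}))
	fmt.Println(ImplementsCommand(OpenAccount{}))
	fmt.Println(StructIsCommand(reflect.TypeOf(OpenAccount{})))
}
```

The value OpenAccount{} does not satisfy Command because its method set excludes pointer-receiver methods, so the check has to be made against *OpenAccount.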

func (MessageType) IsEvent

func (mt MessageType) IsEvent() bool

IsEvent returns true if the message type satisfies the Event interface.

func (MessageType) MessageName

func (mt MessageType) MessageName() string

MessageName returns the Protocol Buffers message name without the package name.

func (MessageType) New

func (mt MessageType) New() Message

New returns a new pointer to a zero-value message of this type.

func (MessageType) PackageName

func (mt MessageType) PackageName() string

PackageName returns the Protocol Buffers package name for this message type.
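The relationship between Name, PackageName and MessageName can be sketched by splitting on the last dot. This is a simplification that assumes top-level messages only (a nested message such as pkg.Outer.Inner would be split incorrectly), and "ax.examples.bank.OpenAccount" is a made-up name.

```go
package main

import (
	"fmt"
	"strings"
)

// MessageType is a local model that keeps only the Name field.
type MessageType struct {
	Name string
}

// PackageName returns everything before the last dot, or "" if there is none.
func (mt MessageType) PackageName() string {
	if i := strings.LastIndex(mt.Name, "."); i != -1 {
		return mt.Name[:i]
	}
	return ""
}

// MessageName returns everything after the last dot, or the whole name if
// it contains no dot.
func (mt MessageType) MessageName() string {
	if i := strings.LastIndex(mt.Name, "."); i != -1 {
		return mt.Name[i+1:]
	}
	return mt.Name
}

func main() {
	mt := MessageType{Name: "ax.examples.bank.OpenAccount"}

	fmt.Println(mt.PackageName())
	fmt.Println(mt.MessageName())
}
```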

func (MessageType) String

func (mt MessageType) String() string

func (MessageType) ToSet

func (mt MessageType) ToSet() MessageTypeSet

ToSet returns a MessageTypeSet containing mt as its only member.

type MessageTypeSet

type MessageTypeSet struct {
	// contains filtered or unexported fields
}

MessageTypeSet is a collection of unique message types.

func NewMessageTypeSet

func NewMessageTypeSet(mt ...MessageType) MessageTypeSet

NewMessageTypeSet returns a set containing the message types in mt.

func TypesByGoType

func TypesByGoType(t ...reflect.Type) MessageTypeSet

TypesByGoType returns a set containing the message types of the Go types in t.

It panics if any of the types do not implement Message.

func TypesOf

func TypesOf(m ...Message) MessageTypeSet

TypesOf returns a set containing the message types of the messages in m.

func (MessageTypeSet) Add

func (s MessageTypeSet) Add(mt MessageType) MessageTypeSet

Add returns a new set containing the members of this set and mt.

func (MessageTypeSet) Has

func (s MessageTypeSet) Has(mt MessageType) bool

Has returns true if mt is a member of the set.

func (MessageTypeSet) Intersection

func (s MessageTypeSet) Intersection(o MessageTypeSet) MessageTypeSet

Intersection returns the set intersection of s and o.

func (MessageTypeSet) Len

func (s MessageTypeSet) Len() int

Len returns the number of types in the set.

func (MessageTypeSet) Members

func (s MessageTypeSet) Members() []MessageType

Members returns the message types in the set.

func (MessageTypeSet) Union

func (s MessageTypeSet) Union(o MessageTypeSet) MessageTypeSet

Union returns the set union of s and o.
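The set operations return new sets rather than modifying the receiver. A minimal map-backed model of that behavior is sketched below; the internal representation is an assumption, since the real type's fields are unexported.

```go
package main

import "fmt"

// MessageType is a local model that keeps only the Name field.
type MessageType struct {
	Name string
}

// MessageTypeSet is a local, map-backed model of an immutable set.
type MessageTypeSet struct {
	members map[MessageType]struct{}
}

// NewMessageTypeSet returns a set containing the message types in mt.
func NewMessageTypeSet(mt ...MessageType) MessageTypeSet {
	m := make(map[MessageType]struct{}, len(mt))
	for _, t := range mt {
		m[t] = struct{}{}
	}
	return MessageTypeSet{members: m}
}

// Has returns true if mt is a member of the set.
func (s MessageTypeSet) Has(mt MessageType) bool {
	_, ok := s.members[mt]
	return ok
}

// Len returns the number of types in the set.
func (s MessageTypeSet) Len() int { return len(s.members) }

// Union copies both sets into a fresh map, leaving s and o untouched.
func (s MessageTypeSet) Union(o MessageTypeSet) MessageTypeSet {
	m := make(map[MessageType]struct{}, len(s.members)+len(o.members))
	for t := range s.members {
		m[t] = struct{}{}
	}
	for t := range o.members {
		m[t] = struct{}{}
	}
	return MessageTypeSet{members: m}
}

// Add is expressed as a union with a single-member set.
func (s MessageTypeSet) Add(mt MessageType) MessageTypeSet {
	return s.Union(NewMessageTypeSet(mt))
}

// Intersection keeps only the members of s that are also in o.
func (s MessageTypeSet) Intersection(o MessageTypeSet) MessageTypeSet {
	m := make(map[MessageType]struct{})
	for t := range s.members {
		if o.Has(t) {
			m[t] = struct{}{}
		}
	}
	return MessageTypeSet{members: m}
}

func main() {
	a := NewMessageTypeSet(MessageType{Name: "A"}, MessageType{Name: "B"})
	b := a.Add(MessageType{Name: "C"})

	fmt.Println(a.Len(), b.Len())
	fmt.Println(a.Intersection(b).Len())
}
```

Returning a new set from every operation means a set can be shared between goroutines or stored in configuration without defensive copying.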

type PublishOption

type PublishOption interface {
	ApplyPublishOption(env *Envelope) error
}

PublishOption configures an envelope containing an event message to exhibit some specific behavior.

type SendOption

type SendOption interface {
	ExecuteOption
	PublishOption
}

SendOption is an option that can be used for both commands and events.

type Sender

type Sender interface {
	// ExecuteCommand sends a command message.
	//
	// Commands are routed to a single endpoint as per the routing rules of the
	// outbound message pipeline.
	ExecuteCommand(context.Context, Command, ...ExecuteOption) (Envelope, error)

	// PublishEvent sends an event message.
	//
	// Events are routed to endpoints that subscribe to messages of that type.
	PublishEvent(context.Context, Event, ...PublishOption) (Envelope, error)
}

Sender is an interface for sending messages.

Directories

Path Synopsis
Package axcli generates Cobra CLI commands that send Ax messages.
Package axmysql provides MySQL-backed implementations of various interfaces consumed by Ax.
delayedmessage
Package delayedmessage provides MySQL-specific implementations of the interfaces in Ax's top-level "delayedmessage" package.
internal/envelopestore
Package envelopestore provides common implementation used by repositories that persist outbound envelopes.
messagestore
Package messagestore provides MySQL-specific implementations of the interfaces in Ax's top-level "messagestore" package.
outbox
Package outbox provides MySQL-specific implementations of the interfaces in Ax's top-level "outbox" package.
persistence
Package persistence provides MySQL-specific implementations of the interfaces in Ax's top-level "persistence" package.
saga
Package saga provides MySQL-specific implementations of the interfaces in Ax's top-level "saga" package.
Package axrmq provides a RabbitMQ-based message transport.
Package axtest contains testing utilities and reusable test suites for implementations of various Ax components.
delayedmessagetests
Package delayedmessagetests contains functional test suites for implementations of delayed messaging features.
outboxtests
Package outboxtests contains functional test suites for implementations of outbox related features.
Package delayedmessage provides tools for delaying the sending of messages.
Package endpoint contains components that are composed to configure an 'endpoint', which can send and receive messages to other endpoints.
examples
Package ident contains utilities for representing arbitrary identifiers.
internal
Package marshaling provides utilities for marshaling and unmarshaling messages and other types.
Package messagestore contains interfaces used by Ax to read and write from persisted streams of messages.
Package observability provides tools for observing the behavior of an application, such as logging, metrics collection and tracing.
Package outbox provides an inbound pipeline stage that ensures message idempotence by employing the "outbox pattern".
Package persistence defines the interfaces Ax uses to persist data.
Package projection contains tools for projecting state from messages.
Package routing provides pipeline stages for routing messages to endpoints and within endpoints.
Package saga provides tools for building stateful message handlers.
mapping
Package mapping contains subpackages that implement various message-to-saga mapping strategies.
mapping/direct
Package direct provides a saga mapping strategy that maps messages to saga instances by having the saga implement a method that returns the instance ID directly.
mapping/keyset
Package keyset provides a saga mapping strategy that maps messages to instances by looking up which instance is associated with a key derived from the message.
persistence
Package persistence contains subpackages that implement various saga persistence strategies.
persistence/crud
Package crud provides an implementation of saga.Persister that persists saga instances using "CRUD" semantics.
persistence/eventsourcing
Package eventsourcing provides an implementation of saga.Persister that persists saga instances as a stream of events with optional snapshots.
