substrate

package module
v0.0.0-...-5cd769b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2024 License: MIT Imports: 4 Imported by: 20

README

Substrate

go-doc

Substrate is a simple thin abstraction for message publishing and consumption. It presents a simple API set for durable, at-least-once message publishing and subscription, on a number of backend message broker types.

The API is not yet stable.

Current implementations and their status

Implementation Status
Apache Kafka beta
Nats streaming beta
Proximo alpha
Freezer alpha

Additional resources

  • substrate-tools - Provides wrappers and packages that are useful for various tasks, such as acknowledgement ordering and instrumentation.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrSinkAlreadyClosed is an error returned when user tries to publish a message or
	// close a sink after it was already closed.
	ErrSinkAlreadyClosed = errors.New("sink was closed already")
	// ErrSinkClosedOrFailedDuringSend is an error returned when a sink is closed or fails while sending a message.
	ErrSinkClosedOrFailedDuringSend = errors.New("sink was closed or failed while sending the message")
)

Functions

This section is empty.

Types

type AsyncMessageSink

type AsyncMessageSink interface {
	// PublishMessages publishes any messages found on the `messages`
	// channel and returns them on the `acks` channel once they have
	// been published.  This function will block until `ctx` is done,
	// or until an error occurs.  Messages will always be processed
	// and acknowledged in order.
	// Normal termination is achieved when the passed Context is done,
	// and will return the associated Context error.
	PublishMessages(ctx context.Context, acks chan<- Message, messages <-chan Message) error
	// Close permanently closes the AsyncMessageSink and frees underlying resources
	Close() error
	Statuser
}

AsyncMessageSink represents a message sink that allows publishing messages, and multiple messages can be in flight before any acks are recieved, depending upon the configuration of the underlying message sink.

type AsyncMessageSource

type AsyncMessageSource interface {
	// ConsumeMessages provides messages to the caller on the `messages`
	// channel and expects them to be sent back to the `acks` channel once
	// that have been handled properly.  This function will block until
	// `ctx` is done, or until an error occurs.
	// Normal termination is achieved when the passed Context is done,
	// and will return the associated Context error.
	ConsumeMessages(ctx context.Context, messages chan<- Message, acks <-chan Message) error
	// Close permanently closes the AsyncMessageSource and frees underlying resources
	Close() error
	Statuser
}

AsyncMessageSource represents a message source that allows consuming messages, and multiple messages can be in flight before any acks are sent, depending upon the configuration of the underlying message source.

type ConsumerMessageHandler

type ConsumerMessageHandler func(context.Context, Message) error

ConsumerMessageHandler is the callback function type that synchronous message consumers must implement.

type DiscardableMessage

type DiscardableMessage interface {
	Message
	// DiscardPayload discards the payload of the message. After calling this,
	// Calls to Data() will panic.
	// Calling this a second or subsequent time has no effect.
	DiscardPayload()
}

DiscardableMessage allows a consumer to discard the payload after use (but before acking) in order to release memory earlier. This can be useful in cases where a consumer reads a very large number of messages before acking any of them. Since not all backends implement this, a checked type assertion is recommended.

type InvalidAckError

type InvalidAckError struct {
	Acked    Message
	Expected Message
}

InvalidAckError means that a message acknowledgement was not as expected. This is possilbly from mis-use of the asynchronous APIs, for example acking out of order.

func (InvalidAckError) Error

func (e InvalidAckError) Error() string

type KeyedMessage

type KeyedMessage interface {
	Message
	Key() []byte
}

Some brokers have the notion of keyed messages. Callers may optionally implement this interface in their message types for the benefiy of those brokers.

type Message

type Message interface {
	Data() []byte
}

Message is the single type that represents all messages in substrate.

type Status

type Status struct {
	// Working indicates whether the source or sink is in a working state
	Working bool
	// Problems indicates and problems with the source or sink, whether or not they prevent it working.
	Problems []string
}

Status represents a snapshot of the state of a source or sink.

type Statuser

type Statuser interface {
	Status() (*Status, error)
}

Statuser is the interface that wraps the Status method.

type SynchronousMessageSink

type SynchronousMessageSink interface {
	// Close closed the SynchronousMessageSink, freeing underlying
	// resources.
	Close() error
	// PublishMessage publishes messages to the broker, waiting for
	// confirmation from the broker before returning.
	PublishMessage(context.Context, Message) error
	Statuser
}

SynchronousMessageSink represents a message source that allows "message at a time" publishing and relieves the consumer from having to deal with acknowledgements themselves.

func NewSynchronousMessageSink

func NewSynchronousMessageSink(ams AsyncMessageSink) SynchronousMessageSink

NewSynchronousMessageSink returns a new synchronous message sink, given an AsyncMessageSink. When Close is called on the SynchronousMessageSink, this is also propogated to the underlying SynchronousMessageSink

type SynchronousMessageSource

type SynchronousMessageSource interface {
	// Close closed the SynchronousMessageSource, freeing underlying
	// resources.
	Close() error
	// ConsumeMessages calls the `handler` function for each messages
	// available to consume.  If the handler returns no error, an
	// acknowledgement will be sent to the broker.  If an error is returned
	// by the handler, it will be propogated and returned from this
	// function.  This function will block until `ctx` is done or until an
	// error occurs.
	ConsumeMessages(ctx context.Context, handler ConsumerMessageHandler) error
	Statuser
}

SynchronousMessageSource represents a message source that allows "message at a time" consumption and relieves the consumer from having to deal with acknowledgements themselves.

func NewSynchronousMessageSource

func NewSynchronousMessageSource(ams AsyncMessageSource) SynchronousMessageSource

NewSynchronousMessageSource returns a new synchronous message source, given an AsyncMessageSource. When Close is called on the SynchronousMessageSource, this is also propogated to the underlying SynchronousMessageSource.

Directories

Path Synopsis
Package freezer provides freezer support for substrate
Package freezer provides freezer support for substrate
internal
Package kafka provides kafka support for substrate
Package kafka provides kafka support for substrate
Package natsstreaming provides kafka support for substrate
Package natsstreaming provides kafka support for substrate
Package proximo provides proximo support for substrate
Package proximo provides proximo support for substrate
Package suburl provides a generic URL based interface for obtaining substrate source and sink objects.
Package suburl provides a generic URL based interface for obtaining substrate source and sink objects.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL