broker

package
v0.8.0
Published: Aug 1, 2023 License: BSD-3-Clause Imports: 9 Imported by: 0

Documentation

Constants

const (
	NackMaxEventSizeExceeded = "the maximum size for event data was exceeded, the event could not be processed"
	NackTopicUnknown         = "could not publish event to unknown topic"
	NackTopicArchived        = "cannot publish event to archived/readonly topic"
	NackTopicDeleted         = "cannot publish event to a deleted topic"
	NackPermissionDenied     = "client does not have permission to perform this operation"
	NackConsensusFailure     = "could not commit event, please try again"
	NackShardingFailure      = "unable to assign event key to a shard, please try again"
	NackRedirect             = "redirect to node handling topic or shard, please close and reconnect and try again"
	NackInternal             = "an internal error occurred, please try again shortly"
)

Standard error messages for nack codes

const BufferSize = 16384

Variables

var (
	ErrBrokerNotRunning = errors.New("operation could not be completed: broker is not running")
	ErrUnknownID        = errors.New("no publisher or subscriber registered with specified id")
)
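Because these are sentinel errors, a caller would normally test for them with errors.Is rather than by comparing message strings. The sketch below redeclares the two variables locally so that it compiles on its own; the classify helper and the actions it returns are hypothetical and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package's sentinel errors so the sketch is self-contained.
var (
	ErrBrokerNotRunning = errors.New("operation could not be completed: broker is not running")
	ErrUnknownID        = errors.New("no publisher or subscriber registered with specified id")
)

// classify is a hypothetical helper that maps an error to the action a
// stream handler might take.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrBrokerNotRunning):
		return "shut down stream"
	case errors.Is(err, ErrUnknownID):
		return "already closed"
	default:
		return "unexpected"
	}
}

func main() {
	// errors.Is still matches after the error has been wrapped with %w.
	wrapped := fmt.Errorf("publish failed: %w", ErrBrokerNotRunning)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(ErrUnknownID))
}
```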

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Every Ensign node is composed of a single Broker routine that collects events from publisher handlers, commits the events through consensus, writes the events to disk, and ensures that registered consumer groups receive any published events they are subscribed to. Essentially, the Broker fans in events from multiple, concurrent publisher streams, ensures the events are replicated and written, then fans out the events to one or more subscriber streams. The Broker uses an internal buffer that applies backpressure to the publisher streams when the buffer is full.
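The fan-in, fan-out, and backpressure behaviour described above can be illustrated with plain channels. This is a minimal sketch and not the Broker's implementation: it skips consensus and disk writes entirely, and the names fanInOut and bufferSize (a deliberately tiny stand-in for BufferSize) are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// bufferSize is a hypothetical, deliberately small stand-in for BufferSize.
const bufferSize = 4

// fanInOut starts nPubs publishers that each send nEvents events into one
// bounded channel (fan in), then copies every event to nSubs subscriber
// channels (fan out). It returns how many events each subscriber received.
func fanInOut(nPubs, nEvents, nSubs int) []int {
	// A send on a full buffered channel blocks, which is the backpressure.
	in := make(chan string, bufferSize)

	var pubs sync.WaitGroup
	for p := 0; p < nPubs; p++ {
		pubs.Add(1)
		go func(p int) {
			defer pubs.Done()
			for e := 0; e < nEvents; e++ {
				in <- fmt.Sprintf("pub%d-event%d", p, e)
			}
		}(p)
	}
	// Close the shared channel once every publisher has finished.
	go func() { pubs.Wait(); close(in) }()

	outs := make([]chan string, nSubs)
	counts := make([]int, nSubs)
	var subs sync.WaitGroup
	for s := range outs {
		outs[s] = make(chan string, bufferSize)
		subs.Add(1)
		go func(s int) {
			defer subs.Done()
			for range outs[s] {
				counts[s]++ // only goroutine s writes counts[s]
			}
		}(s)
	}

	// The "broker" loop: take each event once and hand it to every subscriber.
	for event := range in {
		for _, out := range outs {
			out <- event
		}
	}
	for _, out := range outs {
		close(out)
	}
	subs.Wait()
	return counts
}

func main() {
	fmt.Println(fanInOut(3, 5, 2)) // prints [15 15]
}
```

A slow subscriber in this sketch eventually stalls the broker loop, which in turn fills the input buffer and blocks the publishers.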

func New

func New() *Broker

func (*Broker) Close

func (b *Broker) Close(id rlid.RLID) error

Close the publisher or subscriber with the specified id so that the broker no longer sends anything to it.

func (*Broker) Lock

func (b *Broker) Lock()

Locks both the pubmu and submu mutexes.
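A common way to guard two maps with two mutexes is to acquire them in a fixed order and release them in reverse, so that two callers can never each hold one lock while waiting for the other. The sketch below assumes that ordering and that both fields are sync.RWMutex values; the source does not state either, and the registry type with its counters is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in that holds two independently
// lockable pieces of state, named after the mutexes mentioned above.
type registry struct {
	pubmu sync.RWMutex
	submu sync.RWMutex
	pubs  int
	subs  int
}

// Lock and RLock acquire pubmu first, then submu (an assumed order).
func (r *registry) Lock()  { r.pubmu.Lock(); r.submu.Lock() }
func (r *registry) RLock() { r.pubmu.RLock(); r.submu.RLock() }

// Unlock and RUnlock release in the reverse order.
func (r *registry) Unlock()  { r.submu.Unlock(); r.pubmu.Unlock() }
func (r *registry) RUnlock() { r.submu.RUnlock(); r.pubmu.RUnlock() }

// total reads both counters under the read locks.
func (r *registry) total() int {
	r.RLock()
	defer r.RUnlock()
	return r.pubs + r.subs
}

// reset clears both counters under the write locks.
func (r *registry) reset() {
	r.Lock()
	defer r.Unlock()
	r.pubs, r.subs = 0, 0
}

func main() {
	r := &registry{pubs: 2, subs: 3}
	fmt.Println(r.total()) // prints 5
	r.reset()
	fmt.Println(r.total()) // prints 0
}
```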

func (*Broker) NumPublishers

func (b *Broker) NumPublishers() int

func (*Broker) NumSubscribers

func (b *Broker) NumSubscribers() int

func (*Broker) Publish

func (b *Broker) Publish(publisherID rlid.RLID, event *api.EventWrapper) error

Publish an event from the specified publisher. When the event is committed, an acknowledgement or error is sent on the channel specified when registering.

func (*Broker) RLock

func (b *Broker) RLock()

RLocks both the pubmu and submu mutexes.

func (*Broker) RUnlock

func (b *Broker) RUnlock()

RUnlocks both the pubmu and submu mutexes.

func (*Broker) Register

func (b *Broker) Register() (rlid.RLID, <-chan PublishResult, error)

Register a publisher to receive an ack/nack channel for events that are published using the publisher ID specified. If the broker is not running, an error is returned so that the publisher can shut down the stream.
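The register, publish, then wait-for-result flow might look like the following from a stream handler's point of view. This is a sketch against a hypothetical miniBroker with simplified types (int ids, string local ids, instant "commits"), not the real Broker, rlid.RLID, or PublishResult.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errNotRunning = errors.New("broker is not running")
	errUnknownID  = errors.New("no publisher registered with specified id")
)

// result is a simplified stand-in for PublishResult.
type result struct {
	LocalID string // echoes the id the publisher attached to the event
	Nack    string // empty when the event was acknowledged
}

// miniBroker is a hypothetical, heavily simplified stand-in for Broker.
type miniBroker struct {
	mu      sync.Mutex
	running bool
	nextID  int
	pubs    map[int]chan result
}

func newMiniBroker(running bool) *miniBroker {
	return &miniBroker{running: running, pubs: make(map[int]chan result)}
}

// Register hands back a publisher id and a receive-only result channel.
func (b *miniBroker) Register() (int, <-chan result, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.running {
		return 0, nil, errNotRunning
	}
	b.nextID++
	ch := make(chan result, 1)
	b.pubs[b.nextID] = ch
	return b.nextID, ch, nil
}

// Publish "commits" immediately and reports on the publisher's channel.
// The channel holds one result, so callers must read before publishing again.
func (b *miniBroker) Publish(id int, localID string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch, ok := b.pubs[id]
	if !ok {
		return errUnknownID
	}
	ch <- result{LocalID: localID}
	return nil
}

func main() {
	b := newMiniBroker(true)
	id, results, err := b.Register()
	if err != nil {
		fmt.Println("closing stream:", err)
		return
	}
	if err := b.Publish(id, "evt-1"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	res := <-results
	fmt.Println(res.LocalID, res.Nack == "") // prints evt-1 true
}
```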

func (*Broker) Run

func (b *Broker) Run(errc chan<- error)

Run the broker; any fatal errors will be sent on the specified channel.
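A caller of a Run(errc) style method typically owns the error channel and selects on it alongside its own stop signal. The sketch below uses a hypothetical runner type with the same method shape; whether the real Run blocks or returns immediately is not stated here, so the sketch starts it in a goroutine, which is safe either way.

```go
package main

import (
	"errors"
	"fmt"
)

// errDisk is a hypothetical fatal error used only for this sketch.
var errDisk = errors.New("disk failure")

// runner is a hypothetical stand-in with the same Run(errc) shape.
type runner struct{ fail error }

// Run reports a fatal error on errc if one is configured.
func (r runner) Run(errc chan<- error) {
	if r.fail != nil {
		errc <- r.fail
	}
}

// supervise starts Run and waits for either a fatal error or a stop signal.
func supervise(r runner, done <-chan struct{}) error {
	errc := make(chan error, 1) // buffered so reporting never blocks Run
	go r.Run(errc)
	select {
	case err := <-errc:
		return err
	case <-done:
		return nil
	}
}

func main() {
	// No stop signal is ever sent, so supervise returns the fatal error.
	fmt.Println(supervise(runner{fail: errDisk}, make(chan struct{})))

	// No error is ever sent, so supervise returns when done is closed.
	done := make(chan struct{})
	close(done)
	fmt.Println(supervise(runner{}, done))
}
```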

func (*Broker) Shutdown

func (b *Broker) Shutdown() error

Gracefully shut down the broker. If a consensus or write operation is underway, then shutdown blocks until it is concluded. The broker then stops handling incoming events from publishers and closes all registered publishers and subscribers. This has the effect of closing any open event stream handlers.
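The ordering described above (wait for in-flight work, then close every registered channel) can be sketched with a sync.WaitGroup. The miniBroker type, commitAsync, and shutdownDemo are hypothetical; the sleep stands in for a consensus or write operation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// miniBroker is a hypothetical stand-in that tracks in-flight work.
type miniBroker struct {
	inflight  sync.WaitGroup
	committed bool
	subs      []chan string
}

// commitAsync simulates a consensus or write operation that takes d to finish.
func (b *miniBroker) commitAsync(d time.Duration) {
	b.inflight.Add(1)
	go func() {
		defer b.inflight.Done()
		time.Sleep(d)
		b.committed = true
	}()
}

// Shutdown blocks until in-flight work is done, then closes all channels,
// which ends any handler that is ranging over them.
func (b *miniBroker) Shutdown() error {
	b.inflight.Wait()
	for _, sub := range b.subs {
		close(sub)
	}
	b.subs = nil
	return nil
}

// shutdownDemo reports whether the commit finished before Shutdown returned
// and whether the subscriber channel was closed.
func shutdownDemo() (committed, closed bool) {
	sub := make(chan string)
	b := &miniBroker{subs: []chan string{sub}}
	b.commitAsync(20 * time.Millisecond)
	b.Shutdown()
	_, open := <-sub // a receive on a closed channel returns immediately
	return b.committed, !open
}

func main() {
	fmt.Println(shutdownDemo()) // prints true true
}
```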

func (*Broker) Subscribe

func (b *Broker) Subscribe(topics ...ulid.ULID) (rlid.RLID, <-chan *api.EventWrapper, error)

Subscribe to events filtered by topic ids. All recent events will be sent on the event wrapper channel once they are committed. If the broker is not running, an error is returned so that the consumer group can shut down the stream.
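Topic filtering of this kind amounts to a set-membership check before delivery. The sketch below uses hypothetical event and subscribe definitions with string topic names instead of ulid.ULID values. In this sketch an empty topic list matches nothing; how the real Subscribe treats an empty list is not stated here.

```go
package main

import "fmt"

// event is a simplified stand-in for *api.EventWrapper.
type event struct {
	ID    string
	Topic string
}

// subscribe returns a channel carrying only the events whose topic is in
// the requested set. The events are supplied up front to keep the sketch
// deterministic; a real broker would deliver them as they are committed.
func subscribe(events []event, topics ...string) <-chan event {
	wanted := make(map[string]struct{}, len(topics))
	for _, t := range topics {
		wanted[t] = struct{}{}
	}
	out := make(chan event, len(events))
	for _, e := range events {
		if _, ok := wanted[e.Topic]; ok {
			out <- e
		}
	}
	close(out)
	return out
}

// receivedIDs drains a subscription and collects the event ids in order.
func receivedIDs(events []event, topics ...string) []string {
	var ids []string
	for e := range subscribe(events, topics...) {
		ids = append(ids, e.ID)
	}
	return ids
}

func main() {
	events := []event{{"1", "orders"}, {"2", "billing"}, {"3", "orders"}}
	fmt.Println(receivedIDs(events, "orders")) // prints [1 3]
}
```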

func (*Broker) Unlock

func (b *Broker) Unlock()

Unlocks both the pubmu and submu mutexes.

type PublishResult

type PublishResult struct {
	LocalID   []byte        // The localID on the event sent by the publisher for client-side correlation
	Committed time.Time     // The timestamp the event was committed (if it was committed)
	Code      api.Nack_Code // The reason why the result errored; if not unknown the result is treated as an error
	Error     string        // An error message, should be set if the nack code is set
}

PublishResult is sent back to the publisher stream that created an event to let it send an ack/nack back to the client. If the event was correctly committed and emitted, the result will contain an Ack message. If the event could not be processed by the broker (e.g. not committed, not written, etc.), then the result will contain a Nack message.

func (PublishResult) Ack

func (p PublishResult) Ack() *api.Ack

func (PublishResult) Nack

func (p PublishResult) Nack() *api.Nack

func (PublishResult) Reply

func (p PublishResult) Reply() *api.PublisherReply

Reply composes a publisher reply to return to the client via the publish stream. If the Code is > 0 (i.e. is not NACK_UNKNOWN) then a Nack is returned; otherwise an Ack is returned. This method performs no data validation other than to set the error message to a standard message for the code if it is a nack.
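The ack/nack decision described above can be sketched with local stand-in types. Only the zero value meaning "unknown" comes from the description; the other numeric codes, the reply struct, and the choice to fill in the standard message only when Error is empty are assumptions made for this sketch. The message strings are copied from the constants in this package.

```go
package main

import "fmt"

// nackCode is a hypothetical stand-in for api.Nack_Code.
type nackCode int

const (
	nackUnknown nackCode = iota // zero value: the result is not an error
	nackTopicUnknown
	nackPermissionDenied
)

// standardMessages maps a code to its standard message.
var standardMessages = map[nackCode]string{
	nackTopicUnknown:     "could not publish event to unknown topic",
	nackPermissionDenied: "client does not have permission to perform this operation",
}

// publishResult is a simplified stand-in for PublishResult.
type publishResult struct {
	LocalID string
	Code    nackCode
	Error   string
}

// reply is a simplified stand-in for *api.PublisherReply.
type reply struct {
	Acked   bool
	LocalID string
	Error   string
}

// Reply returns a nack when Code is greater than zero, otherwise an ack.
func (p publishResult) Reply() reply {
	if p.Code > nackUnknown {
		msg := p.Error
		if msg == "" {
			msg = standardMessages[p.Code] // assumed: fill only when empty
		}
		return reply{LocalID: p.LocalID, Error: msg}
	}
	return reply{Acked: true, LocalID: p.LocalID}
}

func main() {
	fmt.Println(publishResult{LocalID: "a"}.Reply().Acked)
	fmt.Println(publishResult{LocalID: "b", Code: nackTopicUnknown}.Reply().Error)
}
```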
