Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- func (b *Broker) Close(id rlid.RLID) error
- func (b *Broker) Lock()
- func (b *Broker) NumPublishers() int
- func (b *Broker) NumSubscribers() int
- func (b *Broker) Publish(publisherID rlid.RLID, event *api.EventWrapper) error
- func (b *Broker) RLock()
- func (b *Broker) RUnlock()
- func (b *Broker) Register() (rlid.RLID, <-chan PublishResult, error)
- func (b *Broker) Run(errc chan<- error)
- func (b *Broker) Shutdown() error
- func (b *Broker) Subscribe(topics ...ulid.ULID) (rlid.RLID, <-chan *api.EventWrapper, error)
- func (b *Broker) Unlock()
- type PublishResult
Constants ¶
const ( NackMaxEventSizeExceeded = "the maximum size for event data was exceeded, the event could not be processed" NackTopicUnknown = "could not publish event to unknown topic" NackTopicArchived = "cannot publish event to archived/readonly topic" NackTopicDeleted = "cannot publish event to a deleted topic" NackPermissionDenied = "client does not have permission to perform this operation" NackConsensusFailure = "could not commit event, please try again" NackShardingFailure = "unable to assign event key to a shard, please try again" NackRedirect = "redirect to node handling topic or shard, please close and reconnect and try again" NackInternal = "an internal error occurred, please try again shortly" )
Standard error messages for nack codes
const BufferSize = 16384
Variables ¶
var ( ErrBrokerNotRunning = errors.New("operation could not be completed: broker is not running") ErrUnknownID = errors.New("no publisher or subscriber registered with specified id") )
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Every Ensign node is composed of a single Broker routine that collects events from publisher handlers, commits the events through consensus, writes the events to disk, and ensures that registered consumer groups receive any published events they are subscribed to. Essentially, the Broker fans in events from multiple, concurrent publisher streams, ensures the events are replicated and written, then fans out the events to one or more subscriber streams. The Broker uses an internal buffer that applies backpressure to the publisher streams when the buffer is full.
func (*Broker) Close ¶
Close either a publisher or subscriber so no events will be sent from the broker.
func (*Broker) NumPublishers ¶
func (*Broker) NumSubscribers ¶
func (*Broker) Publish ¶
Publish an event from the specified publisher. When the event is committed, an acknowledgement or error is sent on the channel specified when registering.
func (*Broker) Register ¶
func (b *Broker) Register() (rlid.RLID, <-chan PublishResult, error)
Register a publisher to receive an ack/nack channel for events that are published using the publisher ID specified. If the broker is not running, an error is returned so that the publisher can shutdown the stream.
func (*Broker) Shutdown ¶
Gracefully shutdown the broker. If a consensus or write operation is underway, then shutdown blocks until it is concluded. The broker then stops handling incoming events from publishers and closes all registered publishers and subscribers. This has the effect of closing any open event stream handlers.
type PublishResult ¶
type PublishResult struct { LocalID []byte // The localID on the event sent by the publisher for client-side correlation Committed time.Time // The timestamp the event was committed (if it was committed) Code api.Nack_Code // The reason why the result errored; if not unknown the result is treated as an error Error string // An error message, should be set if the nack code is set }
PublishResult is sent back to the publisher stream that created an event to let it send an ack/nack back to to the client. If the event was correctly committed and emitted the result will contain an Ack message. If the event was unable to be processed by the broker (e.g. not committed, not written, etc.) then the result will contain a Nack message.
func (PublishResult) Ack ¶
func (p PublishResult) Ack() *api.Ack
func (PublishResult) Nack ¶
func (p PublishResult) Nack() *api.Nack
func (PublishResult) Reply ¶
func (p PublishResult) Reply() *api.PublisherReply
Reply composes a publisher reply to return to the client via the publish stream. If the Code is > 0 (e.g. is not NACK_UNKNOWN) then a Nack is returned; otherwise an Ack is returned. This method performs no data validation other than to set the error message to a standard message for the code if it is a nack.