broadcast

package
v1.30.0
Published: Sep 26, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package broadcast contains the Broadcast interface and a simple local implementation.

Constants

const (
	// AttributeOrigin is the name of the attribute which is set to the node ID of the node which published the message.
	AttributeOrigin = "_origin"
	// AttributeType is the name of the attribute which is set to the type of the message.
	AttributeType = "_type"
	// AttributeAccountID is the name of the attribute which is set to the account ID of the account which published the message.
	AttributeAccountID = "_account_id"
	// AttributeRoutingKey is the name of the attribute which is set to the routing key of the message.
	AttributeRoutingKey = "_routing_key"
)

Functions

func InitConsumer

func InitConsumer[T any](opts Options[T]) eventbus.Source[T]

InitConsumer initializes the consumer and returns the routing source if routing is configured.

func ProducerOpts

func ProducerOpts[T, R any](opts Options[T]) []eventbus.SubscriptionOption[R]

ProducerOpts returns the producer options for the broadcast.

func RelayProducer

func RelayProducer[T, R any](ctx context.Context, src eventbus.Source[T], filter eventbus.SubscriptionFilter[T, R], dst eventbus.Receiver[R], opts Options[T])

RelayProducer relays messages from the source to the destination using the specified filter. It uses opts to determine whether the source should be merged and whether the channel should be unbounded.
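The relay-with-filter idea can be illustrated with plain channels. This is only a sketch: the eventbus types are not shown on this page, so filterFunc, relay and demo are hypothetical stand-ins, and the merge and unbounded-channel behavior is left out.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// filterFunc is a hypothetical stand-in for eventbus.SubscriptionFilter[T, R].
// It converts a T into an R and reports whether the result should be forwarded.
type filterFunc[T, R any] func(T) (R, bool)

// relay forwards filtered messages from src to dst until src is closed
// or ctx is cancelled.
func relay[T, R any](ctx context.Context, src <-chan T, filter filterFunc[T, R], dst chan<- R) {
	for {
		select {
		case <-ctx.Done():
			return
		case v, ok := <-src:
			if !ok {
				return
			}
			r, keep := filter(v)
			if !keep {
				continue
			}
			select {
			case dst <- r:
			case <-ctx.Done():
				return
			}
		}
	}
}

// demo relays the numbers 1..4 and keeps only the even ones, as strings.
func demo() []string {
	src := make(chan int, 4)
	dst := make(chan string, 4)
	for i := 1; i <= 4; i++ {
		src <- i
	}
	close(src)
	relay[int, string](context.Background(), src, func(n int) (string, bool) {
		return strconv.Itoa(n), n%2 == 0
	}, dst)
	close(dst)
	var got []string
	for s := range dst {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(demo()) // prints "[2 4]"
}
```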

Types

type Broadcast

type Broadcast[T any] interface {
	// Producer is used to send a message to pub/sub to be received by all nodes in the cluster.
	Producer() eventbus.Receiver[T]

	// Consumer returns the source that can be subscribed to in order to receive messages from other nodes in the cluster.
	Consumer() eventbus.Source[T]
}

Broadcast covers both the verb and the noun sense of the word "broadcast": it is used to send a message to pub/sub to be received by all nodes in the cluster, and it is used to receive messages from other nodes in the cluster.

func NewLocalBroadcast

func NewLocalBroadcast[T any](ctx context.Context, logger *zap.Logger, options ...Option[T]) Broadcast[T]

NewLocalBroadcast returns a new broadcast interface. This broadcast interface is used when pub/sub is not enabled. It does not require a message type because it does not use pub/sub.

type MessageAttributes

type MessageAttributes map[string]string

MessageAttributes represents the attributes of a pubsub message.

func (MessageAttributes) Type

func (m MessageAttributes) Type() string

Type returns the type of the message.
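A minimal sketch of how the reserved attribute names and Type might fit together. The constants and the map type are copied from this page so the example compiles on its own; the body of Type is an assumption (the page only says it returns the type of the message), and the attribute values are made up.

```go
package main

import "fmt"

// Local copies of two of the documented constants.
const (
	AttributeOrigin = "_origin"
	AttributeType   = "_type"
)

// MessageAttributes mirrors the documented map type.
type MessageAttributes map[string]string

// Type is an assumed implementation: it reads the AttributeType entry
// and yields an empty string when the entry is missing.
func (m MessageAttributes) Type() string {
	return m[AttributeType]
}

func main() {
	attrs := MessageAttributes{
		AttributeOrigin: "node-1",       // hypothetical node ID
		AttributeType:   "user.updated", // hypothetical message type
	}
	fmt.Println(attrs.Type()) // prints "user.updated"
}
```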

type Option

type Option[T any] func(*Options[T])

Option is used to configure a Broadcast.

func WithAttributeProcessor

func WithAttributeProcessor[T any](addAttributes func(m T, attrs MessageAttributes), attributeFilter func(attrs MessageAttributes) bool) Option[T]

WithAttributeProcessor applies a filter to messages before they are processed. It takes a function to add attributes to outgoing messages and a filter to process incoming messages. If the filter returns false, the message will be skipped.
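The two callbacks could look like the following. This sketch does not call the package itself: Event, addRegion, onlyEU and accept are hypothetical names, and accept only imitates the documented AcceptMessage rule that a nil filter accepts every message.

```go
package main

import "fmt"

// MessageAttributes mirrors the documented map type.
type MessageAttributes map[string]string

// Event is a hypothetical message type.
type Event struct {
	Region string
}

// addRegion is a candidate addAttributes callback: it copies a field of the
// outgoing message into the attributes.
func addRegion(e Event, attrs MessageAttributes) {
	attrs["region"] = e.Region
}

// onlyEU is a candidate attributeFilter callback: incoming messages whose
// "region" attribute is not "eu" are skipped.
func onlyEU(attrs MessageAttributes) bool {
	return attrs["region"] == "eu"
}

// accept imitates the documented AcceptMessage behavior: a nil filter
// accepts everything.
func accept(filter func(MessageAttributes) bool, attrs MessageAttributes) bool {
	if filter == nil {
		return true
	}
	return filter(attrs)
}

func main() {
	attrs := MessageAttributes{}
	addRegion(Event{Region: "us"}, attrs)
	fmt.Println(accept(onlyEU, attrs)) // prints "false"
	fmt.Println(accept(nil, attrs))    // prints "true"
}
```

With the real package, the two functions would presumably be passed as WithAttributeProcessor(addRegion, onlyEU).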

func WithMerge

func WithMerge[T any](merge eventbus.SubscriptionMerger[T], maxLatency time.Duration, maxEventsToMerge int) Option[T]

WithMerge configures the broadcast to merge events before sending them to the consumer. The merge function is called to merge events and the maxLatency and maxEventsToMerge are used to determine when to send the merged events.
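One way to read the maxLatency and maxEventsToMerge parameters is as two flush triggers for a running merge. The sketch below shows that reading with plain channels. It is an assumption, not the package's implementation: mergeFunc is a hypothetical stand-in for eventbus.SubscriptionMerger, whose real signature is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// mergeFunc is a hypothetical stand-in for eventbus.SubscriptionMerger.
type mergeFunc[T any] func(acc, next T) T

// relayMerged folds incoming events together with merge and flushes the
// accumulated event to out when maxEvents events have been merged, when
// maxLatency has elapsed since the first pending event, or when in is closed.
func relayMerged[T any](in <-chan T, out chan<- T, merge mergeFunc[T], maxLatency time.Duration, maxEvents int) {
	defer close(out)
	var (
		acc     T
		pending int
		timer   <-chan time.Time // nil while nothing is pending
	)
	flush := func() {
		if pending > 0 {
			out <- acc
			pending = 0
			timer = nil
		}
	}
	for {
		select {
		case ev, ok := <-in:
			if !ok {
				flush()
				return
			}
			if pending == 0 {
				acc = ev
				timer = time.After(maxLatency)
			} else {
				acc = merge(acc, ev)
			}
			pending++
			if pending >= maxEvents {
				flush()
			}
		case <-timer:
			flush()
		}
	}
}

// runDemo sums integers in batches of at most three events.
func runDemo() []int {
	in := make(chan int)
	out := make(chan int, 4)
	sum := func(a, b int) int { return a + b }
	go relayMerged[int](in, out, sum, time.Second, 3)
	for i := 1; i <= 4; i++ {
		in <- i
	}
	close(in)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runDemo())
}
```

In this run the first three events are merged into one value by the count trigger, and the fourth is flushed when the input closes.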

func WithOrderingKey

func WithOrderingKey[T any](orderingKey func(T) string) Option[T]

WithOrderingKey specifies the ordering key to use for the message. It should be unique for each message type and must be less than 1KB with the messageType: prefix. If WithRouting is specified without WithOrderingKey, the RoutingKey of the message will be used as the ordering key.
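A possible orderingKey callback, with a size check for the limit stated above. The sketch assumes that the "messageType: prefix" means the key is prefixed with the message type followed by a colon. Order, orderKey and fitsLimit are hypothetical names.

```go
package main

import "fmt"

// Order is a hypothetical message type.
type Order struct {
	CustomerID string
}

// maxOrderingKeyBytes reflects the "less than 1KB" limit stated above.
const maxOrderingKeyBytes = 1024

// orderKey is a candidate orderingKey callback: all orders of one customer
// share a key, so they would be delivered in order relative to each other.
func orderKey(o Order) string {
	return "customer/" + o.CustomerID
}

// fitsLimit reports whether the key stays under the limit once it is
// prefixed with "<messageType>:" (an assumed reading of the documentation).
func fitsLimit(messageType, key string) bool {
	return len(messageType)+len(":")+len(key) < maxOrderingKeyBytes
}

func main() {
	key := orderKey(Order{CustomerID: "42"})
	fmt.Println(key, fitsLimit("order", key)) // prints "customer/42 true"
}
```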

func WithParseFunc added in v1.17.0

func WithParseFunc[T any](parseFunc func([]byte) (T, error)) Option[T]

WithParseFunc returns an Option that sets the function used to parse message bytes into type T.

func WithRouting

func WithRouting[T any](routingKey eventbus.RoutingKey[T, string], subscriptionKey eventbus.SubscriptionKey[string]) Option[T]

WithRouting returns an Option that configures the routing key and subscription key for the broadcast.

func WithUnboundedChannel

func WithUnboundedChannel[T any](interval time.Duration) Option[T]

WithUnboundedChannel configures the broadcast to use an unbounded channel for sending messages to the consumer. This is useful for bursts of messages that need to be processed as quickly as possible.
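For readers unfamiliar with the term, an unbounded channel can be approximated by a goroutine that queues items in a slice, so that a burst from the sender never waits for a slow receiver. The sketch below shows this general technique only. It is not the package's implementation, and it ignores the interval parameter, whose meaning is not documented on this page.

```go
package main

import "fmt"

// unbounded returns a channel that yields everything sent on in, in order,
// buffering any backlog in a slice that grows as needed.
func unbounded[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var queue []T
		for in != nil || len(queue) > 0 {
			var send chan T // stays nil (never ready) while the queue is empty
			var head T
			if len(queue) > 0 {
				send = out
				head = queue[0]
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil // stop receiving, keep draining the queue
					continue
				}
				queue = append(queue, v)
			case send <- head:
				queue = queue[1:]
			}
		}
	}()
	return out
}

// demo sends a burst of 1000 items before reading anything back.
func demo() int {
	in := make(chan int)
	out := unbounded[int](in)
	for i := 0; i < 1000; i++ {
		in <- i // accepted even though nobody reads from out yet
	}
	close(in)
	count := 0
	for range out {
		count++
	}
	return count
}

func main() {
	fmt.Println(demo()) // prints "1000"
}
```

The trade-off is memory: the queue grows without limit if the consumer never catches up.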

type Options

type Options[T any] struct {
	// contains filtered or unexported fields
}

Options are options for a broadcast. They are used to configure the broadcast's producer and consumer.

func MakeBroadcastOptions

func MakeBroadcastOptions[T any](options []Option[T]) Options[T]

MakeBroadcastOptions creates an Options value from the given options.
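Option and MakeBroadcastOptions follow the functional options pattern. The sketch below reproduces the pattern with lowercase, hypothetical stand-ins (options, option, withOrderingKey, makeOptions), because the real Options fields are unexported. It also mirrors the documented rule that OrderingKey returns an empty string when no ordering key function is set.

```go
package main

import "fmt"

// options is a hypothetical stand-in for Options[T].
type options[T any] struct {
	orderingKey func(T) string
}

// option is a hypothetical stand-in for Option[T].
type option[T any] func(*options[T])

// withOrderingKey stores the ordering key function in the options.
func withOrderingKey[T any](f func(T) string) option[T] {
	return func(o *options[T]) { o.orderingKey = f }
}

// makeOptions applies each option in turn to a zero-valued options struct.
func makeOptions[T any](opts []option[T]) options[T] {
	var o options[T]
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

// OrderingKey returns an empty string if no ordering key function is set.
func (o *options[T]) OrderingKey(msg T) string {
	if o.orderingKey == nil {
		return ""
	}
	return o.orderingKey(msg)
}

func main() {
	configured := makeOptions([]option[string]{
		withOrderingKey(func(s string) string { return "k:" + s }),
	})
	fmt.Println(configured.OrderingKey("a")) // prints "k:a"

	empty := makeOptions[string](nil)
	fmt.Println(empty.OrderingKey("a") == "") // prints "true"
}
```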

func (*Options[T]) AcceptMessage

func (b *Options[T]) AcceptMessage(attrs MessageAttributes) bool

AcceptMessage returns true if the message should be processed. Returns true if the attributeFilter is nil.

func (*Options[T]) AddAttributes

func (b *Options[T]) AddAttributes(_ context.Context, msg T, attrs MessageAttributes)

AddAttributes adds attributes to the message. Does nothing if the addAttributes is nil.

func (*Options[T]) HasRoute

func (b *Options[T]) HasRoute(attrs MessageAttributes) bool

HasRoute returns true if the message should be routed. Returns true if the routingSource is nil.

func (*Options[T]) OrderingKey

func (b *Options[T]) OrderingKey(msg T) string

OrderingKey returns the ordering key for the message. Returns an empty string if the orderingKey is nil.

func (*Options[T]) ParseTo added in v1.17.0

func (b *Options[T]) ParseTo(msg []byte) (T, error)

ParseTo parses bytes to type T. Returns an error if no parseFunc is set.
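A parse function with the signature expected by WithParseFunc could decode JSON, for example. In this sketch Ping, parsePing and parseTo are hypothetical; parseTo only imitates the documented rule that an error is returned when no parse function is set.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Ping is a hypothetical message type.
type Ping struct {
	ID int `json:"id"`
}

// parsePing has the shape func([]byte) (T, error) with T = Ping.
func parsePing(b []byte) (Ping, error) {
	var p Ping
	err := json.Unmarshal(b, &p)
	return p, err
}

// parseTo imitates the documented ParseTo behavior: it fails when no parse
// function is configured and otherwise delegates to it.
func parseTo[T any](parse func([]byte) (T, error), b []byte) (T, error) {
	if parse == nil {
		var zero T
		return zero, errors.New("no parse function configured")
	}
	return parse(b)
}

func main() {
	p, err := parseTo(parsePing, []byte(`{"id": 7}`))
	fmt.Println(p.ID, err) // prints "7 <nil>"

	_, err = parseTo[Ping](nil, nil)
	fmt.Println(err != nil) // prints "true"
}
```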

func (*Options[T]) RoutingKey

func (b *Options[T]) RoutingKey(_ context.Context, msg T) string

RoutingKey returns the routing key for the message. Returns an empty string if the routingKey is nil.

type Processor

type Processor[T any] interface {
	// AddAttributes is used to add attributes to the message before sending to pub/sub.
	AddAttributes(m *T, attributes MessageAttributes)

	// AcceptMessage is used to determine if the message should be accepted or ignored after receiving from pub/sub.
	AcceptMessage(attributes MessageAttributes) bool

	// OrderingKey specifies the ordering key to use for the message. It should be unique for each message type and must
	// be less than 1KB with the messageType: prefix.
	OrderingKey(m *T) string
}

Processor is used to process messages before sending to pub/sub and after receiving from pub/sub.
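A type satisfying Processor might look like this. The interface and MessageAttributes are copied from this page so the sketch compiles on its own. Job, tenantProcessor and the "tenant" attribute name are hypothetical.

```go
package main

import "fmt"

// MessageAttributes mirrors the documented map type.
type MessageAttributes map[string]string

// Processor mirrors the documented interface.
type Processor[T any] interface {
	AddAttributes(m *T, attributes MessageAttributes)
	AcceptMessage(attributes MessageAttributes) bool
	OrderingKey(m *T) string
}

// Job is a hypothetical message type.
type Job struct {
	Tenant string
}

// tenantProcessor tags outgoing jobs with their tenant and accepts only
// incoming jobs that belong to its own tenant.
type tenantProcessor struct {
	tenant string
}

func (p tenantProcessor) AddAttributes(m *Job, attrs MessageAttributes) {
	attrs["tenant"] = m.Tenant
}

func (p tenantProcessor) AcceptMessage(attrs MessageAttributes) bool {
	return attrs["tenant"] == p.tenant
}

func (p tenantProcessor) OrderingKey(m *Job) string {
	return m.Tenant
}

// Compile-time check that tenantProcessor satisfies Processor[Job].
var _ Processor[Job] = tenantProcessor{}

func main() {
	p := tenantProcessor{tenant: "acme"}
	attrs := MessageAttributes{}
	p.AddAttributes(&Job{Tenant: "acme"}, attrs)
	fmt.Println(p.AcceptMessage(attrs)) // prints "true"
}
```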
