Documentation ¶
Overview ¶
Package broadcast contains the Broadcast interface and a simple local implementation.
Index ¶
- Constants
- func InitConsumer[T any](opts Options[T]) eventbus.Source[T]
- func ProducerOpts[T, R any](opts Options[T]) []eventbus.SubscriptionOption[R]
- func RelayProducer[T, R any](ctx context.Context, src eventbus.Source[T], ...)
- type Broadcast
- type MessageAttributes
- type Option
- func WithAttributeProcessor[T any](addAttributes func(m T, attrs MessageAttributes), ...) Option[T]
- func WithMerge[T any](merge eventbus.SubscriptionMerger[T], maxLatency time.Duration, ...) Option[T]
- func WithOrderingKey[T any](orderingKey func(T) string) Option[T]
- func WithParseFunc[T any](parseFunc func([]byte) (T, error)) Option[T]
- func WithRouting[T any](routingKey eventbus.RoutingKey[T, string], ...) Option[T]
- func WithUnboundedChannel[T any](interval time.Duration) Option[T]
- type Options
- func (b *Options[T]) AcceptMessage(attrs MessageAttributes) bool
- func (b *Options[T]) AddAttributes(_ context.Context, msg T, attrs MessageAttributes)
- func (b *Options[T]) HasRoute(attrs MessageAttributes) bool
- func (b *Options[T]) OrderingKey(msg T) string
- func (b *Options[T]) ParseTo(msg []byte) (T, error)
- func (b *Options[T]) RoutingKey(_ context.Context, msg T) string
- type Processor
Constants ¶
const (
	// AttributeOrigin is the name of the attribute which is set to the node ID of the node which published the message.
	AttributeOrigin = "_origin"
	// AttributeType is the name of the attribute which is set to the type of the message.
	AttributeType = "_type"
	// AttributeAccountID is the name of the attribute which is set to the account ID of the account which published the message.
	AttributeAccountID = "_account_id"
	// AttributeRoutingKey is the name of the attribute which is set to the routing key of the message.
	AttributeRoutingKey = "_routing_key"
)
Functions ¶
func InitConsumer ¶
InitConsumer initializes the consumer and returns the routing source if routing is configured.
func ProducerOpts ¶
func ProducerOpts[T, R any](opts Options[T]) []eventbus.SubscriptionOption[R]
ProducerOpts returns the producer options for the broadcast.
func RelayProducer ¶
func RelayProducer[T, R any](ctx context.Context, src eventbus.Source[T], filter eventbus.SubscriptionFilter[T, R], dst eventbus.Receiver[R], opts Options[T])
RelayProducer relays messages from the source to the destination using the specified filter. It uses opts to determine whether the source should be merged and whether the channel should be unbounded.
Types ¶
type Broadcast ¶
type Broadcast[T any] interface {
	// Producer is used to send a message to pub/sub to be received by all nodes in the cluster.
	Producer() eventbus.Receiver[T]
	// Consumer returns the source that can be subscribed to in order to receive messages from other nodes in the cluster.
	Consumer() eventbus.Source[T]
}
Broadcast covers both the noun and the verb sense of the word "broadcast": it is used to send a message to pub/sub so that all nodes in the cluster receive it, and to receive messages published by other nodes in the cluster.
func NewLocalBroadcast ¶
func NewLocalBroadcast[T any](ctx context.Context, logger *zap.Logger, options ...Option[T]) Broadcast[T]
NewLocalBroadcast returns a new broadcast interface. This broadcast interface is used when pub/sub is not enabled. It does not require a message type because it does not use pub/sub.
type MessageAttributes ¶
MessageAttributes represents the attributes of a pubsub message.
func (MessageAttributes) Type ¶
func (m MessageAttributes) Type() string
Type returns the type of the message.
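As a rough illustration of how the reserved attribute names relate to the Type method, the sketch below uses a hypothetical map-based stand-in called attributes. The real underlying type of MessageAttributes is not shown in this documentation, so both the map representation and the idea that Type reads the "_type" entry are assumptions.

```go
package main

import "fmt"

// These values are copied from the Constants section above.
const (
	AttributeOrigin     = "_origin"
	AttributeType       = "_type"
	AttributeAccountID  = "_account_id"
	AttributeRoutingKey = "_routing_key"
)

// attributes is a hypothetical stand-in for MessageAttributes.
// Assumption: attributes behave like a string-to-string map.
type attributes map[string]string

// Type returns the value stored under AttributeType.
// Assumption: this is how the documented Type method is backed.
func (a attributes) Type() string { return a[AttributeType] }

func main() {
	a := attributes{
		AttributeOrigin: "node-1",
		AttributeType:   "user.updated",
	}
	fmt.Println(a.Type())
}
```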
type Option ¶
Option is used to configure a Broadcast.
func WithAttributeProcessor ¶
func WithAttributeProcessor[T any](addAttributes func(m T, attrs MessageAttributes), attributeFilter func(attrs MessageAttributes) bool) Option[T]
WithAttributeProcessor applies a filter to messages before they are processed. It takes a function to add attributes to outgoing messages and a filter to process incoming messages. If the filter returns false, the message will be skipped.
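The two callbacks described above can be sketched as follows. The callback shapes match the WithAttributeProcessor signature, but MessageAttributes is replaced by a hypothetical map-based stand-in, and Invoice, addRegion, onlyEU, and accept are illustrative names that are not part of the package. The accept helper follows the documented AcceptMessage rule that a nil filter accepts every message.

```go
package main

import "fmt"

// MessageAttributes is a hypothetical stand-in; the real type is not shown here.
type MessageAttributes map[string]string

// Invoice is an illustrative message type.
type Invoice struct{ Region string }

// addRegion has the shape of the addAttributes callback: it annotates
// outgoing messages with a custom "region" attribute.
func addRegion(m Invoice, attrs MessageAttributes) { attrs["region"] = m.Region }

// onlyEU has the shape of the attributeFilter callback: it keeps only
// incoming messages whose "region" attribute is "eu".
func onlyEU(attrs MessageAttributes) bool { return attrs["region"] == "eu" }

// accept applies a filter, treating a nil filter as "accept everything".
func accept(filter func(MessageAttributes) bool, attrs MessageAttributes) bool {
	if filter == nil {
		return true
	}
	return filter(attrs)
}

func main() {
	attrs := MessageAttributes{}
	addRegion(Invoice{Region: "us"}, attrs)
	fmt.Println(accept(onlyEU, attrs)) // false: the message would be skipped
	fmt.Println(accept(nil, attrs))    // true: no filter configured
}
```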
func WithMerge ¶
func WithMerge[T any](merge eventbus.SubscriptionMerger[T], maxLatency time.Duration, maxEventsToMerge int) Option[T]
WithMerge configures the broadcast to merge events before sending them to the consumer. The merge function is called to merge events and the maxLatency and maxEventsToMerge are used to determine when to send the merged events.
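To make the interaction of maxLatency and maxEventsToMerge concrete, here is a minimal sketch of a merge loop that flushes when either limit is reached. It is not the package's implementation: mergeLoop and sumBatches are hypothetical names, and the merge callback is a plain func(acc, next T) T because the definition of eventbus.SubscriptionMerger is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// mergeLoop folds events from in into a single pending value and sends it to
// out when maxEvents events have been merged or maxLatency has elapsed since
// the first pending event, whichever comes first.
func mergeLoop[T any](in <-chan T, out chan<- T, merge func(acc, next T) T, maxLatency time.Duration, maxEvents int) {
	defer close(out)
	var (
		acc     T
		count   int
		timer   *time.Timer
		timeout <-chan time.Time // nil while nothing is pending
	)
	flush := func() {
		if count == 0 {
			return
		}
		out <- acc
		var zero T
		acc, count = zero, 0
		timer.Stop()
		timeout = nil
	}
	for {
		select {
		case ev, ok := <-in:
			if !ok {
				flush() // emit whatever is still pending, then stop
				return
			}
			if count == 0 {
				acc = ev
				timer = time.NewTimer(maxLatency)
				timeout = timer.C
			} else {
				acc = merge(acc, ev)
			}
			count++
			if count >= maxEvents {
				flush()
			}
		case <-timeout:
			flush()
		}
	}
}

// sumBatches runs mergeLoop over a fixed slice with a very long latency, so
// only the event-count limit triggers a flush.
func sumBatches(events []int, maxEvents int) []int {
	in := make(chan int, len(events))
	for _, ev := range events {
		in <- ev
	}
	close(in)
	out := make(chan int, len(events))
	mergeLoop[int](in, out, func(acc, next int) int { return acc + next }, time.Hour, maxEvents)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(sumBatches([]int{1, 2, 3, 4, 5}, 2)) // [3 7 5]
}
```

A smaller maxLatency bounds how stale a merged event can become, while a smaller maxEventsToMerge bounds how much work a single merged event represents.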
func WithOrderingKey ¶
WithOrderingKey specifies the ordering key to use for the message. It should be unique for each message type and must be less than 1KB with the messageType: prefix. If WithRouting is specified without WithOrderingKey, the RoutingKey of the message will be used as the ordering key.
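The size limit and the routing-key fallback described above can be sketched as below. This is an illustration under stated assumptions: the prefix is taken to be the message type followed by a colon, "1KB" is taken to mean 1024 bytes, and fullOrderingKey and chooseKey are hypothetical helpers that do not exist in the package.

```go
package main

import (
	"errors"
	"fmt"
)

// maxOrderingKeyBytes assumes that "1KB" means 1024 bytes.
const maxOrderingKeyBytes = 1024

var errKeyTooLong = errors.New("ordering key must be less than 1KB including the prefix")

// fullOrderingKey prepends the assumed "messageType:" prefix and rejects keys
// that are not strictly below the size limit.
func fullOrderingKey(messageType, key string) (string, error) {
	full := messageType + ":" + key
	if len(full) >= maxOrderingKeyBytes {
		return "", errKeyTooLong
	}
	return full, nil
}

// chooseKey prefers an explicit ordering key function and falls back to the
// routing key function when only routing is configured.
func chooseKey[T any](msg T, orderingKey, routingKey func(T) string) string {
	switch {
	case orderingKey != nil:
		return orderingKey(msg)
	case routingKey != nil:
		return routingKey(msg)
	default:
		return ""
	}
}

func main() {
	byAccount := func(id string) string { return "acct-" + id }
	key := chooseKey[string]("42", nil, byAccount) // no ordering key: routing key is used
	full, err := fullOrderingKey("invoice", key)
	fmt.Println(full, err)
}
```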
func WithParseFunc ¶ added in v1.17.0
WithParseFunc returns an Option that sets the function used to parse raw message bytes into a value of type T.
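A parse function only needs the shape func([]byte) (T, error). The sketch below shows one possible function of that shape for JSON payloads; UserUpdated and parseUserUpdated are illustrative names, and JSON is an assumed wire format rather than something this documentation specifies.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// UserUpdated is an illustrative message type.
type UserUpdated struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// parseUserUpdated decodes a JSON payload into a UserUpdated value. Its
// signature matches func([]byte) (T, error) with T = UserUpdated.
func parseUserUpdated(data []byte) (UserUpdated, error) {
	var ev UserUpdated
	if err := json.Unmarshal(data, &ev); err != nil {
		return UserUpdated{}, fmt.Errorf("parse UserUpdated: %w", err)
	}
	return ev, nil
}

func main() {
	ev, err := parseUserUpdated([]byte(`{"id":"u1","name":"Ada"}`))
	fmt.Println(ev.ID, ev.Name, err)
}
```

A function like this could then be passed as the parseFunc argument, for example WithParseFunc(parseUserUpdated).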
func WithRouting ¶
func WithRouting[T any](routingKey eventbus.RoutingKey[T, string], subscriptionKey eventbus.SubscriptionKey[string]) Option[T]
WithRouting returns an Option that configures the routing key and subscription key for the broadcast.
type Options ¶
type Options[T any] struct { // contains filtered or unexported fields }
Options are options for a broadcast. They are used to configure the broadcast's producer and consumer.
func MakeBroadcastOptions ¶
MakeBroadcastOptions creates an Options value from the given options.
func (*Options[T]) AcceptMessage ¶
func (b *Options[T]) AcceptMessage(attrs MessageAttributes) bool
AcceptMessage returns true if the message should be processed. Returns true if the attributeFilter is nil.
func (*Options[T]) AddAttributes ¶
func (b *Options[T]) AddAttributes(_ context.Context, msg T, attrs MessageAttributes)
AddAttributes adds attributes to the message. Does nothing if the addAttributes is nil.
func (*Options[T]) HasRoute ¶
func (b *Options[T]) HasRoute(attrs MessageAttributes) bool
HasRoute returns true if the message should be routed. Returns true if the routingSource is nil.
func (*Options[T]) OrderingKey ¶
OrderingKey returns the ordering key for the message. Returns an empty string if the orderingKey is nil.
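The accessor methods above share one pattern: each wraps an optional callback and returns a safe default when that callback was never configured. The sketch below shows that pattern with the functional-options idiom. It is an assumption about how such a type could be built, not the package's code; options, option, withOrderingKey, and makeOptions are hypothetical lowercase stand-ins.

```go
package main

import "fmt"

// options is a hypothetical stand-in for Options[T] holding one optional callback.
type options[T any] struct {
	orderingKey func(T) string
}

// option is a hypothetical stand-in for Option[T], modeled as a function that
// mutates an options value.
type option[T any] func(*options[T])

// withOrderingKey records the ordering key callback.
func withOrderingKey[T any](f func(T) string) option[T] {
	return func(o *options[T]) { o.orderingKey = f }
}

// makeOptions applies each option in order to a zero-valued options struct.
func makeOptions[T any](opts ...option[T]) options[T] {
	var o options[T]
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

// OrderingKey returns an empty string when no callback is configured,
// matching the documented behavior of Options.OrderingKey.
func (o *options[T]) OrderingKey(msg T) string {
	if o.orderingKey == nil {
		return ""
	}
	return o.orderingKey(msg)
}

func main() {
	none := makeOptions[string]()
	fmt.Printf("%q\n", none.OrderingKey("a")) // ""

	keyed := makeOptions(withOrderingKey(func(s string) string { return "k:" + s }))
	fmt.Printf("%q\n", keyed.OrderingKey("a")) // "k:a"
}
```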
type Processor ¶
type Processor[T any] interface {
	// AddAttributes is used to add attributes to the message before sending to pub/sub.
	AddAttributes(m *T, attributes MessageAttributes)
	// AcceptMessage is used to determine if the message should be accepted or ignored after receiving from pub/sub.
	AcceptMessage(attributes MessageAttributes) bool
	// OrderingKey specifies the ordering key to use for the message. It should be unique for each message type and must
	// be less than 1KB with the messageType: prefix.
	OrderingKey(m *T) string
}
Processor is used to process messages before sending to pub/sub and after receiving from pub/sub.