Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumeOption ¶
type ConsumeOption interface {
// contains filtered or unexported methods
}
ConsumeOption is used to configure a consumer.
func WithConsumeChannelLength ¶
func WithConsumeChannelLength(length int) ConsumeOption
WithConsumeChannelLength sets the channel buffer length for the resulting channel. Defaults to 0.
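To make the effect of the buffer length concrete, here is a minimal sketch of how Go channels behave at different lengths. It assumes the length is used as the capacity of the channel handed to producers, which the documentation implies but does not spell out; `sendsWithoutReceiver` is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// sendsWithoutReceiver reports how many sends succeed on a channel of the
// given buffer length while nothing is receiving from it.
func sendsWithoutReceiver(length int) int {
	c := make(chan interface{}, length)
	sent := 0
	for {
		select {
		case c <- sent:
			sent++
		default:
			// The next send would block, so the buffer is full.
			return sent
		}
	}
}

func main() {
	// A length of 0 gives an unbuffered channel: every send waits for a receiver.
	fmt.Println(sendsWithoutReceiver(0))
	// A length of 3 lets a producer get three values ahead of the consumer.
	fmt.Println(sendsWithoutReceiver(3))
}
```

A larger length lets producers run ahead of a slow consumer, at the cost of holding more undelivered values in memory.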
type Producer ¶
type Producer interface {
	// Produce is invoked for each consumer. The given context will be cancelled
	// when the producer should stop trying to write data. Until the context
	// is cancelled, it is expected to write available data to the given
	// channel. The Producer must not close the channel. If the producer
	// encounters an error, it is expected to do its own retry logic.
	Produce(ctx context.Context, request interface{}, c chan<- interface{})
}
Producer is used to produce data for subscribed consumers.
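The contract on Produce (write until cancelled, never close the channel) can be illustrated with a small sketch. The `Producer` interface is redeclared locally so the example compiles on its own; `sliceProducer` and `runSliceProducer` are hypothetical names, and waiting for cancellation after the data runs out is a choice made for this sketch rather than a documented requirement.

```go
package main

import (
	"context"
	"fmt"
)

// Producer mirrors the documented interface so that this sketch is
// self-contained; real code would use the package's own type.
type Producer interface {
	Produce(ctx context.Context, request interface{}, c chan<- interface{})
}

// sliceProducer is a hypothetical Producer that writes a fixed set of values.
type sliceProducer struct {
	values []string
}

// Produce writes each value to c and gives up as soon as ctx is cancelled.
// It never closes c.
func (p sliceProducer) Produce(ctx context.Context, request interface{}, c chan<- interface{}) {
	for _, v := range p.values {
		select {
		case c <- v:
		case <-ctx.Done():
			return
		}
	}
	// Nothing more to write; this sketch simply waits to be told to stop.
	<-ctx.Done()
}

// runSliceProducer drives the producer by hand and returns what it wrote.
func runSliceProducer() []interface{} {
	ctx, cancel := context.WithCancel(context.Background())
	c := make(chan interface{})
	done := make(chan struct{})

	var p Producer = sliceProducer{values: []string{"a", "b", "c"}}
	go func() {
		defer close(done)
		p.Produce(ctx, nil, c)
	}()

	var got []interface{}
	for i := 0; i < 3; i++ {
		got = append(got, <-c)
	}
	cancel() // ask the producer to stop
	<-done   // Produce returns once it observes the cancellation
	return got
}

func main() {
	fmt.Println(runSliceProducer())
}
```

Selecting on both the send and `ctx.Done()` keeps the producer from blocking forever on a consumer that has gone away.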
type ProducerFunc ¶
type ProducerFunc func(ctx context.Context, request interface{}, c chan<- interface{})
ProducerFunc is an adapter to allow ordinary functions to be a Producer.
func (ProducerFunc) Produce ¶
func (f ProducerFunc) Produce(ctx context.Context, request interface{}, c chan<- interface{})
Produce implements Producer.
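The adapter works the same way as `http.HandlerFunc` in the standard library. The sketch below redeclares `Producer` and `ProducerFunc` locally so it is self-contained; the underlying function type of `ProducerFunc` is an assumption inferred from the Produce signature, and `echoOnce` is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types, declared only so the sketch compiles
// on its own.
type Producer interface {
	Produce(ctx context.Context, request interface{}, c chan<- interface{})
}

// ProducerFunc's underlying type is assumed from the Produce signature.
type ProducerFunc func(ctx context.Context, request interface{}, c chan<- interface{})

// Produce calls f, which is what lets a plain function satisfy Producer.
func (f ProducerFunc) Produce(ctx context.Context, request interface{}, c chan<- interface{}) {
	f(ctx, request, c)
}

// echoOnce wraps a closure in ProducerFunc, invokes it through the Producer
// interface, and returns the single value it wrote.
func echoOnce(request interface{}) interface{} {
	var p Producer = ProducerFunc(func(ctx context.Context, req interface{}, c chan<- interface{}) {
		select {
		case c <- req: // echo the request back to the consumer
		case <-ctx.Done():
		}
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := make(chan interface{}, 1) // buffered so Produce can return immediately
	p.Produce(ctx, request, c)
	return <-c
}

func main() {
	fmt.Println(echoOnce("hello"))
}
```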
type StreamAggregator ¶
type StreamAggregator struct {
// contains filtered or unexported fields
}
StreamAggregator takes a dynamic list of producers and writes to interested consumers. When a producer is removed, it will be told to not retry any connect logic via the given context being cancelled. When a producer is added, it will be instructed to write to all the subscribed consumers. If producers are available when a consumer is added, each producer will start writing to the consumer.
func New ¶
func New(opts ...StreamAggregatorOption) *StreamAggregator
New constructs a new StreamAggregator with the given options.
func (*StreamAggregator) AddProducer ¶
func (a *StreamAggregator) AddProducer(key string, p Producer)
AddProducer adds a producer to the StreamAggregator.
func (*StreamAggregator) Consume ¶
func (a *StreamAggregator) Consume(ctx context.Context, request interface{}, opts ...ConsumeOption) <-chan interface{}
Consume starts consuming data from all the given Producers. As producers are added, each will start writing data to the consumer. The returned channel will be closed once the context is cancelled and all the producers exit.
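The lifecycle of the returned channel can be sketched with a simplified fan-in over a fixed list of producers. This is not the package's implementation: `fanIn`, `nameProducer`, and `demo` are hypothetical, the `Producer` interface is redeclared locally, and dynamic adding and removing of producers is left out. It only illustrates why the aggregator, and not the producers, is the right place to close the channel.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Producer mirrors the documented interface so the sketch is self-contained.
type Producer interface {
	Produce(ctx context.Context, request interface{}, c chan<- interface{})
}

// nameProducer writes its name once and then waits for cancellation.
type nameProducer struct{ name string }

func (p nameProducer) Produce(ctx context.Context, _ interface{}, c chan<- interface{}) {
	select {
	case c <- p.name:
	case <-ctx.Done():
		return
	}
	<-ctx.Done()
}

// fanIn is a simplified stand-in for Consume with a fixed producer list.
func fanIn(ctx context.Context, request interface{}, producers []Producer, length int) <-chan interface{} {
	c := make(chan interface{}, length)
	var wg sync.WaitGroup
	for _, p := range producers {
		wg.Add(1)
		go func(p Producer) {
			defer wg.Done()
			p.Produce(ctx, request, c)
		}(p)
	}
	go func() {
		<-ctx.Done() // the consumer is finished
		wg.Wait()    // every Produce call has returned
		close(c)     // safe: no producer can write any more
	}()
	return c
}

// demo reads one value per producer, cancels, and reports whether the
// channel was then closed.
func demo() (received int, closed bool) {
	ctx, cancel := context.WithCancel(context.Background())
	c := fanIn(ctx, nil, []Producer{nameProducer{"a"}, nameProducer{"b"}}, 0)
	for i := 0; i < 2; i++ {
		<-c
		received++
	}
	cancel()
	_, ok := <-c
	return received, !ok
}

func main() {
	fmt.Println(demo())
}
```

Because several producers share one channel, a producer that closed it would cause the others to panic on their next send, which is consistent with the rule that a Producer must not close the channel.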
func (*StreamAggregator) RemoveProducer ¶
func (a *StreamAggregator) RemoveProducer(key string)
RemoveProducer removes a producer from the StreamAggregator.
type StreamAggregatorOption ¶
type StreamAggregatorOption interface {
// contains filtered or unexported methods
}
StreamAggregatorOption is used to configure the StreamAggregator.
func WithLogger ¶
func WithLogger(l *log.Logger) StreamAggregatorOption
WithLogger configures a logger for the StreamAggregator.
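Option interfaces with only unexported methods usually follow the functional-options pattern. The sketch below shows that pattern under stated assumptions: `aggregator`, `option`, `optionFunc`, `withLogger`, and `newAggregator` are hypothetical stand-ins, and nothing here is taken from the package's actual internals. The `*log.Logger` itself comes from the standard library's `log.New`.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// aggregator is a hypothetical stand-in; the real StreamAggregator's fields
// are unexported and not documented.
type aggregator struct {
	log *log.Logger
}

// option has only an unexported method, so other packages can pass options
// around but cannot implement their own.
type option interface {
	configure(a *aggregator)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(a *aggregator)

func (f optionFunc) configure(a *aggregator) { f(a) }

// withLogger returns an option that installs l as the logger.
func withLogger(l *log.Logger) option {
	return optionFunc(func(a *aggregator) { a.log = l })
}

// newAggregator applies each option over a default that discards log output.
func newAggregator(opts ...option) *aggregator {
	a := &aggregator{log: log.New(io.Discard, "", 0)}
	for _, o := range opts {
		o.configure(a)
	}
	return a
}

// logged builds an aggregator with a buffer-backed logger and returns what
// was written to it.
func logged() string {
	var buf bytes.Buffer
	a := newAggregator(withLogger(log.New(&buf, "agg: ", 0)))
	a.log.Println("producer added")
	return buf.String()
}

func main() {
	fmt.Print(logged())
}
```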