v0.0.0-...-6c519ef

Published: Mar 15, 2018 License: Apache-2.0 Imports: 6 Imported by: 0




A stream aggregator is used to aggregate data in the fan-in scheme.


This repository should be imported as:

import streamaggregator ""

Dynamic Producers and Consumers

The aggregator lets producers and consumers be added and removed freely.

When a new producer is added, the existing consumers start receiving data from it. When a new consumer is added, all previously added producers start writing to it.

Lifecycle Management

Contexts are used to manage the lifecycle of the consumer. Cancelling the consumer's context signals the producers to drain (and to stop any reconnect logic). Once every producer has exited, the consumer's channel is closed. Contexts were chosen due to their tight integration with gRPC.
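The lifecycle described above can be sketched without the package itself. The following is a minimal, self-contained illustration, not the package's implementation: produce, fanIn and readThenCancel are hypothetical names introduced here, and a sync.WaitGroup is assumed as the mechanism for closing the channel only after every producer has exited.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// produce is a hypothetical producer: it writes labelled values to c until
// ctx is cancelled, and it never closes c itself.
func produce(ctx context.Context, id string, c chan<- interface{}) {
	for i := 0; ; i++ {
		select {
		case c <- fmt.Sprintf("%s-%d", id, i):
		case <-ctx.Done():
			return // cancelled: stop writing and exit
		}
	}
}

// fanIn starts one goroutine per producer id and closes the shared channel
// only after every producer has returned.
func fanIn(ctx context.Context, ids ...string) <-chan interface{} {
	c := make(chan interface{})
	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			produce(ctx, id, c)
		}(id)
	}
	go func() {
		wg.Wait()
		close(c)
	}()
	return c
}

// readThenCancel reads n values, cancels the context, and reports how many
// values were read before cancelling and whether the channel ended up closed.
func readThenCancel(n int) (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := fanIn(ctx, "a", "b")
	read := 0
	for ; read < n; read++ {
		<-c
	}
	cancel()
	for range c {
		// discard anything written before the producers noticed the cancel
	}
	_, ok := <-c
	return read, !ok
}

func main() {
	read, closed := readThenCancel(3)
	fmt.Println(read, closed) // 3 true
}
```

The consumer keeps draining after cancelling because a producer may complete one more write before it observes the cancelled context.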






type ConsumeOption

type ConsumeOption interface {
	// contains filtered or unexported methods
}

ConsumeOption is used to configure a consumer.

func WithConsumeChannelLength

func WithConsumeChannelLength(length int) ConsumeOption

WithConsumeChannelLength sets the channel buffer length for the resulting channel. Defaults to 0.
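One common way to build such an option is the functional-options pattern. The sketch below is an assumption about how it could work, not the package's actual code: the methods of ConsumeOption are unexported and not shown on this page, so consumeConfig, configure, consumeOptionFunc and newChannel are hypothetical names. It shows that a length of 0 yields an unbuffered channel.

```go
package main

import "fmt"

// consumeConfig is a hypothetical stand-in for the consumer settings.
type consumeConfig struct {
	chanLen int // zero value means an unbuffered channel
}

// ConsumeOption mirrors the documented interface; the method name is assumed.
type ConsumeOption interface {
	configure(*consumeConfig)
}

// consumeOptionFunc adapts a plain function to ConsumeOption.
type consumeOptionFunc func(*consumeConfig)

func (f consumeOptionFunc) configure(c *consumeConfig) { f(c) }

// WithConsumeChannelLength sets the buffer length of the consumer channel.
func WithConsumeChannelLength(length int) ConsumeOption {
	return consumeOptionFunc(func(c *consumeConfig) { c.chanLen = length })
}

// newChannel applies the options in order and builds the consumer channel.
func newChannel(opts ...ConsumeOption) chan interface{} {
	var cfg consumeConfig
	for _, o := range opts {
		o.configure(&cfg)
	}
	return make(chan interface{}, cfg.chanLen)
}

func main() {
	fmt.Println(cap(newChannel()))                             // 0
	fmt.Println(cap(newChannel(WithConsumeChannelLength(10)))) // 10
}
```

A buffered channel lets producers get ahead of a slow consumer by up to the given number of values before their writes block.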

type Producer

type Producer interface {
	// Produce is invoked for each consumer. The given context will be cancelled
	// when the producer should stop trying to write data. Until the context
	// is cancelled, it is expected to write available data to the given
	// channel. The Producer must not close the channel. If the producer
	// encounters an error, it is expected to do its own retry logic.
	Produce(ctx context.Context, request interface{}, c chan<- interface{})
}

Producer is used to produce data for subscribed consumers.

type ProducerFunc

type ProducerFunc func(ctx context.Context, request interface{}, c chan<- interface{})

ProducerFunc is an adapter to allow ordinary functions to be a Producer.

func (ProducerFunc) Produce

func (f ProducerFunc) Produce(ctx context.Context, request interface{}, c chan<- interface{})

Produce implements Producer.
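As an illustration of the adapter, the sketch below redeclares Producer and ProducerFunc locally with the signatures shown above so that it runs on its own. The body of Produce (calling f) is the usual shape for such adapters and is assumed here; echoOnce and runEcho are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

// Producer and ProducerFunc are local copies of the documented declarations.
type Producer interface {
	Produce(ctx context.Context, request interface{}, c chan<- interface{})
}

type ProducerFunc func(ctx context.Context, request interface{}, c chan<- interface{})

// Produce calls f, which lets an ordinary function satisfy Producer.
func (f ProducerFunc) Produce(ctx context.Context, request interface{}, c chan<- interface{}) {
	f(ctx, request, c)
}

// echoOnce is a hypothetical producer function: it writes the request back
// once, giving up if the context is cancelled first. It does not close c.
func echoOnce(ctx context.Context, request interface{}, c chan<- interface{}) {
	select {
	case c <- request:
	case <-ctx.Done():
	}
}

// runEcho wraps echoOnce as a Producer and returns the value it writes.
func runEcho(request interface{}) interface{} {
	var p Producer = ProducerFunc(echoOnce)
	c := make(chan interface{}, 1) // buffered so the single write cannot block
	p.Produce(context.Background(), request, c)
	return <-c
}

func main() {
	fmt.Println(runEcho("hello")) // hello
}
```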

type StreamAggregator

type StreamAggregator struct {
	// contains filtered or unexported fields
}

StreamAggregator takes a dynamic list of producers and writes their data to interested consumers. When a consumer is removed, its producers are told, via cancellation of the given context, not to retry any connect logic. When a producer is added, it is instructed to write to all subscribed consumers. If producers are available when a consumer is added, each producer starts writing to that consumer.

func New

New constructs a new StreamAggregator with the given options.

func (*StreamAggregator) AddProducer

func (a *StreamAggregator) AddProducer(key string, p Producer)

AddProducer adds a producer to the StreamAggregator.

func (*StreamAggregator) Consume

func (a *StreamAggregator) Consume(ctx context.Context, request interface{}, opts ...ConsumeOption) <-chan interface{}

Consume starts consuming data from all the given Producers. As producers are added, each will start writing data to the consumer. The returned channel is closed once the context is cancelled and all the producers exit.
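To make the interaction between AddProducer and Consume concrete, here is a minimal stand-in written for this page. It is a sketch under stated assumptions and not the package's implementation: miniAggregator, consumer, start, tagger and demo are hypothetical names, options and RemoveProducer are omitted for brevity, and a mutex plus a per-consumer sync.WaitGroup is assumed as the bookkeeping.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Producer matches the interface documented above.
type Producer interface {
	Produce(ctx context.Context, request interface{}, c chan<- interface{})
}

// consumer is a hypothetical record of one Consume call.
type consumer struct {
	ctx     context.Context
	request interface{}
	c       chan interface{}
	wg      sync.WaitGroup // counts this consumer's running producers
}

// miniAggregator is an illustrative stand-in, not the package's StreamAggregator.
type miniAggregator struct {
	mu        sync.Mutex
	producers map[string]Producer
	consumers map[*consumer]struct{}
}

// start runs p for cs in its own goroutine. Callers must hold a.mu.
func (a *miniAggregator) start(p Producer, cs *consumer) {
	cs.wg.Add(1)
	go func() {
		defer cs.wg.Done()
		p.Produce(cs.ctx, cs.request, cs.c)
	}()
}

// AddProducer registers p and starts it for every active consumer.
func (a *miniAggregator) AddProducer(key string, p Producer) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.producers[key] = p
	for cs := range a.consumers {
		a.start(p, cs)
	}
}

// Consume starts every known producer; the channel closes after ctx is done and they exit.
func (a *miniAggregator) Consume(ctx context.Context, request interface{}) <-chan interface{} {
	cs := &consumer{ctx: ctx, request: request, c: make(chan interface{})}
	a.mu.Lock()
	a.consumers[cs] = struct{}{}
	for _, p := range a.producers {
		a.start(p, cs)
	}
	a.mu.Unlock()
	go func() {
		<-ctx.Done()
		a.mu.Lock()
		delete(a.consumers, cs) // no producer can be started for cs after this
		a.mu.Unlock()
		cs.wg.Wait()
		close(cs.c)
	}()
	return cs.c
}

// tagger is a hypothetical producer that writes its own name until cancelled.
type tagger string

func (t tagger) Produce(ctx context.Context, _ interface{}, c chan<- interface{}) {
	for {
		select {
		case c <- string(t):
		case <-ctx.Done():
			return
		}
	}
}

// demo reports whether a producer added after Consume reached the consumer.
func demo() (sawLate bool) {
	a := &miniAggregator{producers: map[string]Producer{}, consumers: map[*consumer]struct{}{}}
	a.AddProducer("early", tagger("early"))
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := a.Consume(ctx, nil)
	a.AddProducer("late", tagger("late"))
	for v := range c { // the loop ends when the channel is closed
		if v == "late" {
			sawLate = true
			cancel()
		}
	}
	return sawLate
}

func main() {
	fmt.Println(demo()) // true
}
```

Removing the consumer from the map under the lock before waiting ensures no new producer goroutine is registered on the WaitGroup once the wait has begun, so the channel is never closed while a producer could still write to it.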

func (*StreamAggregator) RemoveProducer

func (a *StreamAggregator) RemoveProducer(key string)

RemoveProducer removes a producer from the StreamAggregator.

type StreamAggregatorOption

type StreamAggregatorOption interface {
	// contains filtered or unexported methods
}

StreamAggregatorOption is used to configure the StreamAggregator.

func WithLogger

func WithLogger(l *log.Logger) StreamAggregatorOption

WithLogger configures a logger for the StreamAggregator.

