Documentation ¶
Index ¶
- Constants
- func GlobalTableWithLogger(logger log.Logger) globalTableOption
- func GlobalTableWithOffset(offset GlobalTableOffset) globalTableOption
- type DefaultBuilders
- type GlobalTable
- type GlobalTableOffset
- type GlobalTableStreamConfig
- type Instances
- type InstancesOptions
- type KSink
- func (s *KSink) AddChild(node node.Node)
- func (s *KSink) AddChildBuilder(builder node.NodeBuilder)
- func (s *KSink) Build() (node.Node, error)
- func (s *KSink) ChildBuilders() []node.NodeBuilder
- func (s *KSink) Childs() []node.Node
- func (s *KSink) Close() error
- func (s *KSink) ID() int32
- func (s *KSink) Info() map[string]string
- func (s *KSink) Name() string
- func (*KSink) Next() bool
- func (s *KSink) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)
- func (s *KSink) SinkType() string
- func (*KSink) Type() node.Type
- type Option
- type Repartition
- type RepartitionOption
- type RepartitionOptions
- type RepartitionTopic
- type Side
- type SinkOption
- type SourceNode
- func (sn *SourceNode) AddChild(node node.Node)
- func (sn *SourceNode) AddChildBuilder(builder node.NodeBuilder)
- func (sn *SourceNode) Build() (node.Node, error)
- func (sn *SourceNode) ChildBuilders() []node.NodeBuilder
- func (sn *SourceNode) Childs() []node.Node
- func (sn *SourceNode) Close()
- func (sn *SourceNode) Name() string
- func (sn *SourceNode) Next() bool
- func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (sn *SourceNode) Type() node.Type
- type Stream
- type StreamBuilder
- func (b *StreamBuilder) Build(streams ...Stream) error
- func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) GlobalTable
- func (b *StreamBuilder) StoreRegistry() store.Registry
- func (b *StreamBuilder) Stream(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) Stream
- type StreamBuilderConfig
- type StreamConfigs
- type StreamInstance
Constants ¶
View Source
const ( LeftJoin join.JoinType = iota InnerJoin )
Variables ¶
This section is empty.
Functions ¶
func GlobalTableWithLogger ¶
func GlobalTableWithLogger(logger log.Logger) globalTableOption
func GlobalTableWithOffset ¶
func GlobalTableWithOffset(offset GlobalTableOffset) globalTableOption
Types ¶
type DefaultBuilders ¶
type DefaultBuilders struct { Producer producer.Builder Consumer consumer.Builder PartitionConsumer consumer.PartitionConsumerBuilder Store store.Builder Backend backend.Builder StateStore store.StateStoreBuilder OffsetManager offsets.Manager KafkaAdmin admin.KafkaAdmin // contains filtered or unexported fields }
type GlobalTable ¶
type GlobalTable interface { Stream }
type GlobalTableOffset ¶
type GlobalTableOffset int
const GlobalTableOffsetDefault GlobalTableOffset = 0
The table will start syncing from the locally stored offset or the topic's oldest offset.
const GlobalTableOffsetLatest GlobalTableOffset = -1
The table will start syncing from the topic's latest offset (suitable for stream topics, since the topic can contain historical data).
type GlobalTableStreamConfig ¶
type GlobalTableStreamConfig struct { ConsumerBuilder consumer.PartitionConsumerBuilder BackendBuilder backend.Builder OffsetManager offsets.Manager KafkaAdmin admin.KafkaAdmin Metrics metrics.Reporter Logger log.Logger }
type Instances ¶
type Instances struct {
// contains filtered or unexported fields
}
func NewStreams ¶
func NewStreams(builder *StreamBuilder, options ...InstancesOptions) *Instances
type InstancesOptions ¶
type InstancesOptions func(config *instancesOptions)
func NotifyOnStart ¶
func NotifyOnStart(c chan bool) InstancesOptions
func WithReBalanceHandler ¶
func WithReBalanceHandler(h consumer.ReBalanceHandler) InstancesOptions
type KSink ¶
type KSink struct { Id int32 KeyEncoder encoding.Encoder ValEncoder encoding.Encoder Producer producer.Producer ProducerBuilder producer.Builder TopicPrefix string Repartitioned bool KeyEncoderBuilder encoding.Builder ValEncoderBuilder encoding.Builder // contains filtered or unexported fields }
func NewKSinkBuilder ¶
func (*KSink) AddChildBuilder ¶
func (s *KSink) AddChildBuilder(builder node.NodeBuilder)
func (*KSink) ChildBuilders ¶
func (s *KSink) ChildBuilders() []node.NodeBuilder
type Option ¶
type Option func(*kStreamConfig)
func WithConfig ¶
func WithConfig(configs StreamConfigs) Option
func WithLogger ¶
func WithWorkerPool ¶
func WithWorkerPool(poolConfig *task_pool.PoolConfig) Option
type Repartition ¶
type Repartition struct { Enable bool StreamSide Side KeyEncoder encoding.Builder ValueEncoder encoding.Builder Topic RepartitionTopic }
func (Repartition) Validate ¶
func (r Repartition) Validate(s Side) error
type RepartitionOption ¶
type RepartitionOption func(sink *RepartitionOptions)
func RepartitionLeftStream ¶
func RepartitionLeftStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption
func RepartitionRightStream ¶
func RepartitionRightStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption
type RepartitionOptions ¶
type RepartitionOptions struct { LeftTopic topic RightTopic topic LeftRepartition Repartition RightRepartition Repartition }
func (*RepartitionOptions) Apply ¶
func (iOpts *RepartitionOptions) Apply(options ...RepartitionOption)
type RepartitionTopic ¶
type SinkOption ¶
type SinkOption func(sink *KSink)
func WithProducer ¶
func WithProducer(p producer.Builder) SinkOption
type SourceNode ¶
type SourceNode struct { Id int32 // contains filtered or unexported fields }
func (*SourceNode) AddChild ¶
func (sn *SourceNode) AddChild(node node.Node)
func (*SourceNode) AddChildBuilder ¶
func (sn *SourceNode) AddChildBuilder(builder node.NodeBuilder)
func (*SourceNode) ChildBuilders ¶
func (sn *SourceNode) ChildBuilders() []node.NodeBuilder
func (*SourceNode) Childs ¶
func (sn *SourceNode) Childs() []node.Node
func (*SourceNode) Close ¶
func (sn *SourceNode) Close()
func (*SourceNode) Name ¶
func (sn *SourceNode) Name() string
func (*SourceNode) Next ¶
func (sn *SourceNode) Next() bool
func (*SourceNode) Run ¶
func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*SourceNode) Type ¶
func (sn *SourceNode) Type() node.Type
type Stream ¶
type Stream interface { Branch(branches []branch.Details, opts ...Option) []Stream SelectKey(selectKeyFunc processors.SelectKeyFunc) Stream TransformValue(valueTransformFunc processors.ValueTransformFunc) Stream Transform(transformer processors.TransFunc) Stream Filter(filter processors.FilterFunc) Stream Process(processor processors.ProcessFunc) Stream JoinGlobalTable(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper, typ join.JoinType) Stream JoinKTable(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream JoinStream(stream Stream, valMapper join.ValueMapper, opts ...RepartitionOption) Stream //LeftJoin(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream Through(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption) Stream To(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption) }
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder(config *StreamBuilderConfig) *StreamBuilder
func (*StreamBuilder) Build ¶
func (b *StreamBuilder) Build(streams ...Stream) error
func (*StreamBuilder) GlobalTable ¶
func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, store string, options ...globalTableOption) GlobalTable
func (*StreamBuilder) StoreRegistry ¶
func (b *StreamBuilder) StoreRegistry() store.Registry
type StreamBuilderConfig ¶
type StreamBuilderConfig struct { ApplicationId string AsyncProcessing bool BootstrapServers []string // kafka Brokers WorkerPool *task_pool.PoolConfig Store struct { BackendBuilder backend.Builder ChangeLog struct { MinInSycReplicas int // min number of insync replications in other nodes ReplicationFactor int Suffix string Buffered bool BufferedSize int } Http struct { Host string } } DLQ struct { Enabled bool BootstrapServers []string TopicFormat string //Type dlq.DqlType // G, T Topic string // if global } Host string ChangeLog struct { Enabled bool Replicated bool MinInSycReplicas int // min number of insync replications in other nodes ReplicationFactor int Suffix string Buffer struct { Enabled bool Size int FlushInterval time.Duration } } Consumer *consumer.Config ConsumerCount int *sarama.Config Producer *producer.Config MetricsReporter metrics.Reporter Logger log.Logger DefaultBuilders *DefaultBuilders }
func NewStreamBuilderConfig ¶
func NewStreamBuilderConfig() *StreamBuilderConfig
func (*StreamBuilderConfig) String ¶
func (c *StreamBuilderConfig) String(b *StreamBuilder) string
type StreamConfigs ¶
type StreamConfigs map[string]interface{}
type StreamInstance ¶
type StreamInstance struct {
// contains filtered or unexported fields
}
func (*StreamInstance) Start ¶
func (s *StreamInstance) Start(wg *sync.WaitGroup) error
Start starts the high-level consumer for all streams.
func (*StreamInstance) Stop ¶
func (s *StreamInstance) Stop()
Source Files ¶
Click to show internal directories.
Click to hide internal directories.