Documentation
¶
Index ¶
- Constants
- type BuilderOption
- func WithBackendBuilder(builder backend.Builder) BuilderOption
- func WithChangelogBuilder(builder changelog.Builder) BuilderOption
- func WithConsumerBuilder(builder consumer.Builder) BuilderOption
- func WithKafkaAdmin(kafkaAdmin admin.KafkaAdmin) BuilderOption
- func WithOffsetManager(offsetManager offsets.Manager) BuilderOption
- func WithPartitionConsumerBuilder(builder consumer.PartitionConsumerBuilder) BuilderOption
- func WithProducerBuilder(builder producer.Builder) BuilderOption
- func WithStateStoreBuilder(builder store.StateStoreBuilder) BuilderOption
- func WithStoreBuilder(builder store.Builder) BuilderOption
- type DefaultBuilders
- type GlobalTable
- type GlobalTableOffset
- type GlobalTableOption
- type GlobalTableStreamConfig
- type Instances
- type InstancesOptions
- type KSink
- func (s *KSink) AddChild(node topology.Node)
- func (s *KSink) AddChildBuilder(builder topology.NodeBuilder)
- func (s *KSink) Build() (topology.Node, error)
- func (s *KSink) ChildBuilders() []topology.NodeBuilder
- func (s *KSink) Childs() []topology.Node
- func (s *KSink) Close() error
- func (s *KSink) ID() int32
- func (s *KSink) Info() map[string]string
- func (s *KSink) Name() string
- func (*KSink) Next() bool
- func (s *KSink) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)
- func (s *KSink) SinkType() string
- func (*KSink) Type() topology.Type
- type Option
- type Repartition
- type RepartitionOption
- type RepartitionOptions
- type RepartitionTopic
- type Side
- type SinkOption
- type SinkRecord
- type SourceNode
- func (sn *SourceNode) AddChild(node topology.Node)
- func (sn *SourceNode) AddChildBuilder(builder topology.NodeBuilder)
- func (sn *SourceNode) Build() (topology.Node, error)
- func (sn *SourceNode) ChildBuilders() []topology.NodeBuilder
- func (sn *SourceNode) Childs() []topology.Node
- func (sn *SourceNode) Close()
- func (sn *SourceNode) Name() string
- func (sn *SourceNode) Next() bool
- func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (sn *SourceNode) Type() topology.Type
- type StoreWriter
- type Stream
- type StreamBuilder
- func (b *StreamBuilder) Build(streams ...Stream) error
- func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) GlobalTable
- func (b *StreamBuilder) StoreRegistry() store.Registry
- func (b *StreamBuilder) Stream(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) Stream
- type StreamBuilderConfig
- type StreamConfigs
- type StreamInstance
Constants ¶
View Source
const ( LeftJoin join.Type = iota InnerJoin )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BuilderOption ¶
type BuilderOption func(*DefaultBuilders)
func WithBackendBuilder ¶
func WithBackendBuilder(builder backend.Builder) BuilderOption
func WithChangelogBuilder ¶
func WithChangelogBuilder(builder changelog.Builder) BuilderOption
func WithConsumerBuilder ¶
func WithConsumerBuilder(builder consumer.Builder) BuilderOption
func WithKafkaAdmin ¶
func WithKafkaAdmin(kafkaAdmin admin.KafkaAdmin) BuilderOption
func WithOffsetManager ¶
func WithOffsetManager(offsetManager offsets.Manager) BuilderOption
func WithPartitionConsumerBuilder ¶
func WithPartitionConsumerBuilder(builder consumer.PartitionConsumerBuilder) BuilderOption
func WithProducerBuilder ¶
func WithProducerBuilder(builder producer.Builder) BuilderOption
func WithStateStoreBuilder ¶
func WithStateStoreBuilder(builder store.StateStoreBuilder) BuilderOption
func WithStoreBuilder ¶
func WithStoreBuilder(builder store.Builder) BuilderOption
type DefaultBuilders ¶
type DefaultBuilders struct {
	Producer producer.Builder
	Consumer consumer.Builder
	PartitionConsumer consumer.PartitionConsumerBuilder
	Store store.Builder
	Backend backend.Builder
	StateStore store.StateStoreBuilder
	OffsetManager offsets.Manager
	KafkaAdmin admin.KafkaAdmin
	// contains filtered or unexported fields
}
type GlobalTable ¶
type GlobalTable interface { Stream }
type GlobalTableOffset ¶
type GlobalTableOffset int64
Starting offset for the global table partition.
const GlobalTableOffsetDefault GlobalTableOffset = 0
GlobalTableOffsetDefault defines the starting offset used when GlobalTable stream syncing starts.
const GlobalTableOffsetLatest GlobalTableOffset = -1
GlobalTableOffsetLatest defines the beginning of the partition. Suitable for stream topics, since the topic can contain historical data.
type GlobalTableOption ¶
type GlobalTableOption func(options *globalTableOptions)
func GlobalTableWithBackendWriter ¶
func GlobalTableWithBackendWriter(writer StoreWriter) GlobalTableOption
GlobalTableWithBackendWriter overrides the persisting behavior of the GlobalTable, e.g.:
func(r *data.Record, store store.Store) error {
	// tombstone handling
	if r.Value == nil {
		if err := store.Backend().Delete(r.Key); err != nil {
			return err
		}
	}
	return store.Backend().Set(r.Key, r.Value, 0)
}
func GlobalTableWithLogger ¶
func GlobalTableWithLogger(logger log.Logger) GlobalTableOption
GlobalTableWithLogger overrides the default logger for the GlobalTable (default is NoopLogger).
func GlobalTableWithOffset ¶
func GlobalTableWithOffset(offset GlobalTableOffset) GlobalTableOption
GlobalTableWithOffset overrides the default starting offset used when GlobalTable syncing starts.
type GlobalTableStreamConfig ¶
type GlobalTableStreamConfig struct { ConsumerBuilder consumer.PartitionConsumerBuilder BackendBuilder backend.Builder OffsetManager offsets.Manager KafkaAdmin admin.KafkaAdmin Metrics metrics.Reporter Logger log.Logger }
type Instances ¶
type Instances struct {
// contains filtered or unexported fields
}
func NewStreams ¶
func NewStreams(builder *StreamBuilder, options ...InstancesOptions) *Instances
type InstancesOptions ¶
type InstancesOptions func(config *instancesOptions)
func NotifyOnStart ¶
func NotifyOnStart(c chan bool) InstancesOptions
func WithReBalanceHandler ¶
func WithReBalanceHandler(h consumer.ReBalanceHandler) InstancesOptions
type KSink ¶
type KSink struct {
	Id int32
	KeyEncoder encoding.Encoder
	ValEncoder encoding.Encoder
	Producer producer.Producer
	ProducerBuilder producer.Builder
	TopicPrefix string
	Repartitioned bool
	KeyEncoderBuilder encoding.Builder
	ValEncoderBuilder encoding.Builder
	// contains filtered or unexported fields
}
func NewKSinkBuilder ¶
func (*KSink) AddChildBuilder ¶
func (s *KSink) AddChildBuilder(builder topology.NodeBuilder)
func (*KSink) ChildBuilders ¶
func (s *KSink) ChildBuilders() []topology.NodeBuilder
type Option ¶
type Option func(*kStreamOptions)
func WithConfig ¶
func WithConfig(configs StreamConfigs) Option
func WithLogger ¶
func WithLogger(logger log.Logger) Option
func WithWorkerPoolOptions ¶
func WithWorkerPoolOptions(poolConfig *worker_pool.PoolConfig) Option
type Repartition ¶
type Repartition struct { Enable bool StreamSide Side KeyEncoder encoding.Builder ValueEncoder encoding.Builder Topic RepartitionTopic }
func (Repartition) Validate ¶
func (r Repartition) Validate(s Side) error
type RepartitionOption ¶
type RepartitionOption func(sink *RepartitionOptions)
func RepartitionLeftStream ¶
func RepartitionLeftStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption
func RepartitionRightStream ¶
func RepartitionRightStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption
type RepartitionOptions ¶
type RepartitionOptions struct { LeftTopic topic RightTopic topic LeftRepartition Repartition RightRepartition Repartition }
func (*RepartitionOptions) Apply ¶
func (iOpts *RepartitionOptions) Apply(options ...RepartitionOption)
type RepartitionTopic ¶
type SinkOption ¶
type SinkOption func(sink *KSink)
func WithCustomRecord ¶
func WithCustomRecord(f func(ctx context.Context, in SinkRecord) (out SinkRecord, err error)) SinkOption
func WithProducer ¶
func WithProducer(p producer.Builder) SinkOption
type SinkRecord ¶
type SinkRecord struct {
Key, Value interface{}
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
Headers []*sarama.RecordHeader // only set if kafka is version 0.11+
}
type SourceNode ¶
type SourceNode struct {
	Id int32
	// contains filtered or unexported fields
}
func (*SourceNode) AddChild ¶
func (sn *SourceNode) AddChild(node topology.Node)
func (*SourceNode) AddChildBuilder ¶
func (sn *SourceNode) AddChildBuilder(builder topology.NodeBuilder)
func (*SourceNode) ChildBuilders ¶
func (sn *SourceNode) ChildBuilders() []topology.NodeBuilder
func (*SourceNode) Childs ¶
func (sn *SourceNode) Childs() []topology.Node
func (*SourceNode) Close ¶
func (sn *SourceNode) Close()
func (*SourceNode) Name ¶
func (sn *SourceNode) Name() string
func (*SourceNode) Next ¶
func (sn *SourceNode) Next() bool
func (*SourceNode) Run ¶
func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*SourceNode) Type ¶
func (sn *SourceNode) Type() topology.Type
type Stream ¶
type Stream interface {
	Branch(branches []branch.Details, opts ...Option) []Stream
	SelectKey(selectKeyFunc processors.SelectKeyFunc) Stream
	TransformValue(valueTransformFunc processors.ValueTransformFunc) Stream
	Transform(transformer processors.TransFunc) Stream
	Filter(filter processors.FilterFunc) Stream
	Process(processor processors.ProcessFunc) Stream
	JoinGlobalTable(table Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper, typ join.Type) Stream
	JoinKTable(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream
	JoinStream(stream Stream, valMapper join.ValueMapper, opts ...RepartitionOption) Stream
	//LeftJoin(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream
	Through(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption) Stream
	To(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption)
}
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder(config *StreamBuilderConfig, options ...BuilderOption) *StreamBuilder
func (*StreamBuilder) Build ¶
func (b *StreamBuilder) Build(streams ...Stream) error
func (*StreamBuilder) GlobalTable ¶
func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, store string, options ...GlobalTableOption) GlobalTable
func (*StreamBuilder) StoreRegistry ¶
func (b *StreamBuilder) StoreRegistry() store.Registry
type StreamBuilderConfig ¶
type StreamBuilderConfig struct {
	ApplicationId string
	AsyncProcessing bool
	BootstrapServers []string // kafka Brokers
	WorkerPool *worker_pool.PoolConfig
	Store struct {
		BackendBuilder backend.Builder
		ChangeLog struct {
			MinInSycReplicas int // min number of insync replications in other nodes
			ReplicationFactor int
			Suffix string
			Buffered bool
			BufferedSize int
		}
		Http struct {
			Enabled bool
			Host string
		}
	}
	DLQ struct {
		Enabled bool
		BootstrapServers []string
		TopicFormat string
		//Type dlq.DqlType // G, T
		Topic string // if global
	}
	Host string
	ChangeLog struct {
		Enabled bool
		Replicated bool
		MinInSycReplicas int // min number of insync replications in other nodes
		ReplicationFactor int
		Suffix string
		Buffer struct {
			Enabled bool
			Size int
			FlushInterval time.Duration
		}
	}
	Consumer *consumer.Config
	ConsumerCount int
	*sarama.Config
	Producer *producer.Config
	MetricsReporter metrics.Reporter
	Logger log.Logger
	DefaultBuilders *DefaultBuilders
}
func NewStreamBuilderConfig ¶
func NewStreamBuilderConfig() *StreamBuilderConfig
func (*StreamBuilderConfig) String ¶
func (c *StreamBuilderConfig) String(b *StreamBuilder) string
type StreamConfigs ¶
type StreamConfigs map[string]interface{}
type StreamInstance ¶
type StreamInstance struct {
// contains filtered or unexported fields
}
func (*StreamInstance) Start ¶
func (s *StreamInstance) Start(wg *sync.WaitGroup) error
Start starts the high-level consumer for all streams.
func (*StreamInstance) Stop ¶
func (s *StreamInstance) Stop()
Source Files
¶
Click to show internal directories.
Click to hide internal directories.