Documentation ¶
Index ¶
- Constants
- func NewKSinkBuilder(topic string, options ...KSinkOption) topology.NodeBuilder
- func NewKSource(topic string, opts ...KSourceOption) topology.Source
- func NewKTopologyBuilder(storeRegistry stores.Registry, logger log.Logger) topology.Builder
- type AggregateOpt
- type AggregateOpts
- type AutoTopicOpts
- type Branch
- type BranchedStream
- type BuilderOpt
- func BuilderWithAdminClient(admin kafka.Admin) BuilderOpt
- func BuilderWithBackendBuilder(builder backend.Builder) BuilderOpt
- func BuilderWithConsumerAdaptor(provider kafka.ConsumerProvider) BuilderOpt
- func BuilderWithConsumerProvider(provider kafka.GroupConsumerProvider) BuilderOpt
- func BuilderWithProducerProvider(provider kafka.ProducerProvider) BuilderOpt
- func BuilderWithStoreBuilder(builder stores.Builder) BuilderOpt
- type Config
- type Consumer
- type DefinedStream
- type DslOption
- type DslOptions
- type GlobalStoreBuilderWrapper
- type GlobalTable
- type GlobalTableConsumer
- type GlobalTableNode
- func (n *GlobalTableNode) Build(ctx topology.SubTopologyContext) (topology.Node, error)
- func (n *GlobalTableNode) Init(ctx topology.NodeContext) error
- func (n *GlobalTableNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (n *GlobalTableNode) State() topology.StateStore
- func (n *GlobalTableNode) StateType() topology.StateType
- func (n *GlobalTableNode) Type() topology.Type
- func (n *GlobalTableNode) WritesAt() []string
- type GlobalTableOption
- func GlobalTableWithLogger(logger log.Logger) GlobalTableOption
- func GlobalTableWithOffset(offset kafka.Offset) GlobalTableOption
- func GlobalTableWithStore(store stores.Store) GlobalTableOption
- func GlobalTableWithStoreChangelogOptions(opts ...state_stores.ChangelogBuilderOption) GlobalTableOption
- func GlobalTableWithStoreOptions(opts ...stores.Option) GlobalTableOption
- type HeaderExtractor
- type JoinOption
- type JoinOptions
- type JoinOpts
- type JoinType
- type KSinkBuilderConfigs
- type KSinkOption
- func ProduceWithHeadersExtractor(h HeaderExtractor) KSinkOption
- func ProduceWithKeyEncoder(encoder encoding.Encoder) KSinkOption
- func ProduceWithLogger(logger log.Logger) KSinkOption
- func ProduceWithPartitioner(partitioner Partitioner) KSinkOption
- func ProduceWithTombstoneFilter(f Tombstoner) KSinkOption
- func ProduceWithTopicNameFormatter(formatter TopicNameFormatter) KSinkOption
- func ProduceWithValEncoder(encoder encoding.Encoder) KSinkOption
- type KSource
- func (s *KSource) AutoCreate() bool
- func (s *KSource) Build(_ topology.SubTopologyContext) (topology.Node, error)
- func (s *KSource) CoPartitionedWith() topology.Source
- func (s *KSource) Encoder() topology.SourceEncoder
- func (s *KSource) InitialOffset() kafka.Offset
- func (s *KSource) Internal() bool
- func (s *KSource) RePartitionedAs() topology.Source
- func (s *KSource) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (s *KSource) Setup(ctx topology.SubTopologySetupContext) error
- func (s *KSource) ShouldCoPartitionedWith(source topology.Source)
- func (s *KSource) Topic() string
- func (s *KSource) TopicConfigs() kafka.TopicConfig
- func (s *KSource) Type() topology.Type
- type KSourceOption
- func ConsumeWithAutoTopicCreateEnabled(options ...TopicOpt) KSourceOption
- func ConsumeWithContextParamExtractors(fn ...SourceCtxParamExtractor) KSourceOption
- func ConsumeWithKeyEncoder(encoder encoding.Encoder) KSourceOption
- func ConsumeWithOffset(offset kafka.Offset) KSourceOption
- func ConsumeWithTopicNameFormatterFunc(fn TopicNameFormatter) KSourceOption
- func ConsumeWithValEncoder(encoder encoding.Encoder) KSourceOption
- type KSubTopology
- func (t *KSubTopology) AddProducer(p kafka.Producer)
- func (t *KSubTopology) Close() error
- func (t *KSubTopology) Id() topology.SubTopologyId
- func (t *KSubTopology) Init(ctx topology.SubTopologyContext) error
- func (t *KSubTopology) Nodes() []topology.Node
- func (t *KSubTopology) Producer() kafka.Producer
- func (t *KSubTopology) Sinks() []topology.Sink
- func (t *KSubTopology) Source(topic string) topology.Source
- func (t *KSubTopology) StateStores() map[string]topology.StateStore
- func (t *KSubTopology) Store(name string) topology.StateStore
- type KafkaVersion
- type KeyMapper
- type LocalQueryableStoreWrapper
- func (l *LocalQueryableStoreWrapper) Close() error
- func (l *LocalQueryableStoreWrapper) Get(ctx context.Context, key interface{}) (interface{}, error)
- func (l *LocalQueryableStoreWrapper) Instances() []topology.StateStore
- func (l *LocalQueryableStoreWrapper) Iterator(ctx context.Context) (stores.Iterator, error)
- func (l *LocalQueryableStoreWrapper) KeyEncoder() encoding.Encoder
- func (l *LocalQueryableStoreWrapper) Name() string
- func (l *LocalQueryableStoreWrapper) PrefixedIterator(ctx context.Context, keyPrefix interface{}, prefixEncoder encoding.Encoder) (stores.Iterator, error)
- func (l *LocalQueryableStoreWrapper) String() string
- func (l *LocalQueryableStoreWrapper) ValEncoder() encoding.Encoder
- type MultiStoreIterator
- func (i *MultiStoreIterator) Close()
- func (i *MultiStoreIterator) Error() error
- func (i *MultiStoreIterator) Key() (interface{}, error)
- func (i *MultiStoreIterator) Next()
- func (i *MultiStoreIterator) SeekToFirst()
- func (i *MultiStoreIterator) Valid() bool
- func (i *MultiStoreIterator) Value() (interface{}, error)
- type Partitioner
- type ProcessingGuarantee
- type QueryableStoreWrapper
- type RepartitionOpt
- func RePartitionAs(topic string) RepartitionOpt
- func RePartitionWithKeyEncoder(enc encoding.Encoder) RepartitionOpt
- func RePartitionWithPartitioner(partitioner Partitioner) RepartitionOpt
- func RePartitionWithTopicNameFormatter(formatter TopicNameFormatter) RepartitionOpt
- func RePartitionWithValEncoder(enc encoding.Encoder) RepartitionOpt
- type RepartitionOpts
- type Runner
- type RunnerContext
- type RunnerOpt
- type Serde
- type SinkOption
- type SourceCtxParamExtractor
- type Stream
- type StreamBuilder
- func (b *StreamBuilder) Build() (topology.Topology, error)
- func (b *StreamBuilder) GlobalTable(topic string, keyEnc, valEnc encoding.Encoder, storeName string, ...) GlobalTable
- func (b *StreamBuilder) KStream(topic string, keyEnc, valEnc encoding.Encoder, opts ...KSourceOption) Stream
- func (b *StreamBuilder) NewRunner() Runner
- func (b *StreamBuilder) StoreRegistry() stores.Registry
- func (b *StreamBuilder) Topology() topology.Builder
- type StreamOption
- type StreamOptions
- type StreamTopology
- type Table
- type TableOpt
- type TableOpts
- type Tombstoner
- type TopicNameFormatter
- type TopicOpt
- type Transformer
- type ValueMapper
Constants ¶
const GlobalTableOffsetDefault = kafka.OffsetEarliest
GlobalTableOffsetDefault defines the default starting offset used when GlobalTable stream syncing starts.
const GlobalTableOffsetLatest = kafka.OffsetLatest
GlobalTableOffsetLatest defines the end of the partition (the latest offset) as the starting point. Suitable for topics with retention policy delete since the topic can contain historical data.
Variables ¶
This section is empty.
Functions ¶
func NewKSinkBuilder ¶
func NewKSinkBuilder(topic string, options ...KSinkOption) topology.NodeBuilder
func NewKSource ¶
func NewKSource(topic string, opts ...KSourceOption) topology.Source
Types ¶
type AggregateOpt ¶
type AggregateOpt func(opts *AggregateOpts)
func AggregateWithKeyEncoder ¶
func AggregateWithKeyEncoder(encoder encoding.Encoder) AggregateOpt
func AggregateWithStoreOptions ¶
func AggregateWithStoreOptions(options ...state_stores.StoreBuilderOption) AggregateOpt
func AggregateWithStreamOptions ¶
func AggregateWithStreamOptions(streamOptions ...StreamOption) AggregateOpt
func AggregateWithValEncoder ¶
func AggregateWithValEncoder(encoder encoding.Encoder) AggregateOpt
type AggregateOpts ¶
type AggregateOpts struct {
// contains filtered or unexported fields
}
type AutoTopicOpts ¶
type AutoTopicOpts struct {
// contains filtered or unexported fields
}
type BranchedStream ¶
type BranchedStream struct {
// contains filtered or unexported fields
}
func (*BranchedStream) Branch ¶
func (brs *BranchedStream) Branch(name string) Stream
func (*BranchedStream) New ¶
func (brs *BranchedStream) New(name string, predicate processors.BranchPredicate) Stream
type BuilderOpt ¶
type BuilderOpt func(config *StreamBuilder)
func BuilderWithAdminClient ¶
func BuilderWithAdminClient(admin kafka.Admin) BuilderOpt
func BuilderWithBackendBuilder ¶
func BuilderWithBackendBuilder(builder backend.Builder) BuilderOpt
func BuilderWithConsumerAdaptor ¶
func BuilderWithConsumerAdaptor(provider kafka.ConsumerProvider) BuilderOpt
func BuilderWithConsumerProvider ¶
func BuilderWithConsumerProvider(provider kafka.GroupConsumerProvider) BuilderOpt
func BuilderWithProducerProvider ¶
func BuilderWithProducerProvider(provider kafka.ProducerProvider) BuilderOpt
func BuilderWithStoreBuilder ¶
func BuilderWithStoreBuilder(builder stores.Builder) BuilderOpt
type Config ¶
type Config struct { // ApplicationId will be used as // 1 - a consumer group when using streams, and tables // 2 - topic prefix when creating repartition topics // 3 - transaction ID prefix when using producer transactions ApplicationId string // BootstrapServers a list of kafka Brokers BootstrapServers []string SecurityProtocol string Store struct { Http struct { // Enabled enable state stores http server(debug purposes only) Enabled bool // Host stores http server host(eg: http://localhost:8080) Host string } // StateDir directory to store Persistable state stores StateDir string Changelog struct { // ReplicaCount store changelog topic(auto generated) replica count ReplicaCount int16 } } // InternalTopicsDefaultReplicaCount default replica count for auto generated topic(eg: repartition topics) InternalTopicsDefaultReplicaCount int16 Processing struct { // ConsumerCount number of stream consumers(default:1) to run. This can be used to scale application vertically ConsumerCount int // Guarantee end to end processing guarantee. 
Supported values are ExactlyOnce(default) and AtLeastOnce Guarantee ProcessingGuarantee // Buffer defines min time or min number of records before the flush starts Buffer tasks.BufferConfig // FailedMessageHandler used to handle failed messages(Process failures and serialization errors) FailedMessageHandler tasks.FailedMessageHandler } // Consumer default consumer properties Consumer *kafka.GroupConsumerConfig // Host application host(used to identify application instances) Host string // Producer default producer configs Producer *kafka.ProducerConfig // MetricsReporter default metrics reporter(default: NoopReporter) MetricsReporter metrics.Reporter // Logger default logger(default: NoopLogger) SaslCfg kafka.SaslCfg SSLCfg kafka.SSLCfg Tracer trace.TracerProvider TraceContext propagation.TraceContext Logger log.Logger // RepartitionTopicNameFormatter repartition topic name formatter function // (default ApplicationId-${storeName}-repartitioned) RepartitionTopicNameFormatter TopicNameFormatter // DefaultPartitioner if defined will be used in Sink operators. Eg: To, Repartition DefaultPartitioner Partitioner // ChangelogTopicNameFormatter changelog topic name formatter function // (default ApplicationId-${storeName}-store-changelog) ChangelogTopicNameFormatter state_stores.ChangelogTopicFormatter DltTopic string }
func NewStreamBuilderConfig ¶
func NewStreamBuilderConfig() *Config
type DefinedStream ¶
type DslOption ¶
type DslOption func(options *DslOptions)
func DslOptsSinkOptions ¶
func DslOptsSinkOptions(opts ...KSinkOption) DslOption
func DslOptsSourceOptions ¶
func DslOptsSourceOptions(opts ...KSourceOption) DslOption
func DslWithSinkOptions ¶
func DslWithSinkOptions(opts ...KSinkOption) DslOption
type DslOptions ¶
type DslOptions struct { StreamOptions // contains filtered or unexported fields }
type GlobalStoreBuilderWrapper ¶
type GlobalStoreBuilderWrapper struct {
// contains filtered or unexported fields
}
func (*GlobalStoreBuilderWrapper) Name ¶
func (s *GlobalStoreBuilderWrapper) Name() string
type GlobalTable ¶
type GlobalTable interface { StreamTopology Store() topology.LoggableStoreBuilder Stream }
type GlobalTableConsumer ¶
type GlobalTableConsumer struct {
// contains filtered or unexported fields
}
func (*GlobalTableConsumer) Init ¶
func (g *GlobalTableConsumer) Init(topologyBuilder topology.Topology) error
Init initializes the GlobalTableConsumer with the given topology and starts consuming messages from the sources of each GlobalTableTopology in the topology. For each source, it fetches the available partitions, creates a new task with the GlobalTableTopology and adds it to the task manager. Then, it creates two goroutines to run the task: one to stop the task when the program is shutting down and another to start the task and log any errors that occur. Finally, it adds the task to the runGroup.
Parameters: - topologyBuilder: the topology builder to use for creating GlobalTableTopologies
Returns:
- error: if there was an error fetching partitions, adding a task to the task manager, or if the task failed to start.
func (*GlobalTableConsumer) Ready ¶
func (g *GlobalTableConsumer) Ready() error
func (*GlobalTableConsumer) Stop ¶
func (g *GlobalTableConsumer) Stop() error
type GlobalTableNode ¶
type GlobalTableNode struct { topology.DefaultNode OffsetBackend backend.Backend StoreName string // contains filtered or unexported fields }
func (*GlobalTableNode) Build ¶
func (n *GlobalTableNode) Build(ctx topology.SubTopologyContext) (topology.Node, error)
func (*GlobalTableNode) Init ¶
func (n *GlobalTableNode) Init(ctx topology.NodeContext) error
func (*GlobalTableNode) Run ¶
func (n *GlobalTableNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*GlobalTableNode) State ¶
func (n *GlobalTableNode) State() topology.StateStore
func (*GlobalTableNode) StateType ¶
func (n *GlobalTableNode) StateType() topology.StateType
func (*GlobalTableNode) Type ¶
func (n *GlobalTableNode) Type() topology.Type
func (*GlobalTableNode) WritesAt ¶
func (n *GlobalTableNode) WritesAt() []string
type GlobalTableOption ¶
type GlobalTableOption func(options *globalTableOptions)
func GlobalTableWithLogger ¶
func GlobalTableWithLogger(logger log.Logger) GlobalTableOption
GlobalTableWithLogger overrides the default logger for the GlobalTable (default is log.NoopLogger).
func GlobalTableWithOffset ¶
func GlobalTableWithOffset(offset kafka.Offset) GlobalTableOption
GlobalTableWithOffset overrides the default starting offset used when GlobalTable syncing starts.
func GlobalTableWithStore ¶
func GlobalTableWithStore(store stores.Store) GlobalTableOption
func GlobalTableWithStoreChangelogOptions ¶
func GlobalTableWithStoreChangelogOptions(opts ...state_stores.ChangelogBuilderOption) GlobalTableOption
func GlobalTableWithStoreOptions ¶
func GlobalTableWithStoreOptions(opts ...stores.Option) GlobalTableOption
type HeaderExtractor ¶
type HeaderExtractor func(ctx context.Context, key, val interface{}) kafka.RecordHeaders
type JoinOption ¶
type JoinOption func(options *JoinOptions)
func JoinWithLeftValLookupFunc ¶
func JoinWithLeftValLookupFunc(fn processors.ValueLookupFunc) JoinOption
func JoinWithRightValLookupFunc ¶
func JoinWithRightValLookupFunc(fn processors.ValueLookupFunc) JoinOption
func JoinWithStreamOptions ¶
func JoinWithStreamOptions(opts ...StreamOption) JoinOption
func JoinWithValueLookupFunc ¶
func JoinWithValueLookupFunc(fn processors.ValueLookupFunc) JoinOption
type JoinOptions ¶
type JoinOptions struct {
// contains filtered or unexported fields
}
type KSinkBuilderConfigs ¶
type KSinkBuilderConfigs struct { Topic string Encoder topology.SinkEncoder }
type KSinkOption ¶
type KSinkOption func(sink *kSinkBuilder)
func ProduceWithHeadersExtractor ¶
func ProduceWithHeadersExtractor(h HeaderExtractor) KSinkOption
func ProduceWithKeyEncoder ¶
func ProduceWithKeyEncoder(encoder encoding.Encoder) KSinkOption
func ProduceWithLogger ¶
func ProduceWithLogger(logger log.Logger) KSinkOption
func ProduceWithPartitioner ¶
func ProduceWithPartitioner(partitioner Partitioner) KSinkOption
func ProduceWithTombstoneFilter ¶
func ProduceWithTombstoneFilter(f Tombstoner) KSinkOption
func ProduceWithTopicNameFormatter ¶
func ProduceWithTopicNameFormatter(formatter TopicNameFormatter) KSinkOption
func ProduceWithValEncoder ¶
func ProduceWithValEncoder(encoder encoding.Encoder) KSinkOption
type KSource ¶
type KSource struct { topology.DefaultNode // contains filtered or unexported fields }
func (*KSource) AutoCreate ¶
func (*KSource) CoPartitionedWith ¶
func (*KSource) Encoder ¶
func (s *KSource) Encoder() topology.SourceEncoder
func (*KSource) InitialOffset ¶
func (*KSource) RePartitionedAs ¶
func (*KSource) ShouldCoPartitionedWith ¶
func (*KSource) TopicConfigs ¶
func (s *KSource) TopicConfigs() kafka.TopicConfig
type KSourceOption ¶
type KSourceOption func(*KSource)
KSourceOption is used to customize the KSource.
func ConsumeWithAutoTopicCreateEnabled ¶
func ConsumeWithAutoTopicCreateEnabled(options ...TopicOpt) KSourceOption
func ConsumeWithContextParamExtractors ¶
func ConsumeWithContextParamExtractors(fn ...SourceCtxParamExtractor) KSourceOption
ConsumeWithContextParamExtractors adds a list of SourceCtxParamExtractor funcs to the topology.Source; each extractor is called in turn for every record.
func ConsumeWithKeyEncoder ¶
func ConsumeWithKeyEncoder(encoder encoding.Encoder) KSourceOption
func ConsumeWithOffset ¶
func ConsumeWithOffset(offset kafka.Offset) KSourceOption
func ConsumeWithTopicNameFormatterFunc ¶
func ConsumeWithTopicNameFormatterFunc(fn TopicNameFormatter) KSourceOption
func ConsumeWithValEncoder ¶
func ConsumeWithValEncoder(encoder encoding.Encoder) KSourceOption
type KSubTopology ¶
type KSubTopology struct {
// contains filtered or unexported fields
}
func (*KSubTopology) AddProducer ¶
func (t *KSubTopology) AddProducer(p kafka.Producer)
func (*KSubTopology) Close ¶
func (t *KSubTopology) Close() error
func (*KSubTopology) Id ¶
func (t *KSubTopology) Id() topology.SubTopologyId
func (*KSubTopology) Init ¶
func (t *KSubTopology) Init(ctx topology.SubTopologyContext) error
func (*KSubTopology) Nodes ¶
func (t *KSubTopology) Nodes() []topology.Node
func (*KSubTopology) Producer ¶
func (t *KSubTopology) Producer() kafka.Producer
func (*KSubTopology) Sinks ¶
func (t *KSubTopology) Sinks() []topology.Sink
func (*KSubTopology) StateStores ¶
func (t *KSubTopology) StateStores() map[string]topology.StateStore
func (*KSubTopology) Store ¶
func (t *KSubTopology) Store(name string) topology.StateStore
type KafkaVersion ¶
type KafkaVersion string
type LocalQueryableStoreWrapper ¶
type LocalQueryableStoreWrapper struct {
// contains filtered or unexported fields
}
func (*LocalQueryableStoreWrapper) Close ¶
func (l *LocalQueryableStoreWrapper) Close() error
func (*LocalQueryableStoreWrapper) Get ¶
func (l *LocalQueryableStoreWrapper) Get(ctx context.Context, key interface{}) (interface{}, error)
func (*LocalQueryableStoreWrapper) Instances ¶
func (l *LocalQueryableStoreWrapper) Instances() []topology.StateStore
func (*LocalQueryableStoreWrapper) KeyEncoder ¶
func (l *LocalQueryableStoreWrapper) KeyEncoder() encoding.Encoder
func (*LocalQueryableStoreWrapper) Name ¶
func (l *LocalQueryableStoreWrapper) Name() string
func (*LocalQueryableStoreWrapper) PrefixedIterator ¶
func (*LocalQueryableStoreWrapper) String ¶
func (l *LocalQueryableStoreWrapper) String() string
func (*LocalQueryableStoreWrapper) ValEncoder ¶
func (l *LocalQueryableStoreWrapper) ValEncoder() encoding.Encoder
type MultiStoreIterator ¶
type MultiStoreIterator struct {
// contains filtered or unexported fields
}
func (*MultiStoreIterator) Close ¶
func (i *MultiStoreIterator) Close()
func (*MultiStoreIterator) Error ¶
func (i *MultiStoreIterator) Error() error
func (*MultiStoreIterator) Key ¶
func (i *MultiStoreIterator) Key() (interface{}, error)
func (*MultiStoreIterator) Next ¶
func (i *MultiStoreIterator) Next()
func (*MultiStoreIterator) SeekToFirst ¶
func (i *MultiStoreIterator) SeekToFirst()
func (*MultiStoreIterator) Valid ¶
func (i *MultiStoreIterator) Valid() bool
func (*MultiStoreIterator) Value ¶
func (i *MultiStoreIterator) Value() (interface{}, error)
type Partitioner ¶
type ProcessingGuarantee ¶
type ProcessingGuarantee int8
const ( AtLeastOnce ProcessingGuarantee = iota ExactlyOnce )
type QueryableStoreWrapper ¶
type QueryableStoreWrapper interface { Instances() []topology.LoggableStateStore stores.ReadOnlyStore }
type RepartitionOpt ¶
type RepartitionOpt func(rpOpts *RepartitionOpts)
func RePartitionAs ¶
func RePartitionAs(topic string) RepartitionOpt
func RePartitionWithKeyEncoder ¶
func RePartitionWithKeyEncoder(enc encoding.Encoder) RepartitionOpt
func RePartitionWithPartitioner ¶
func RePartitionWithPartitioner(partitioner Partitioner) RepartitionOpt
func RePartitionWithTopicNameFormatter ¶
func RePartitionWithTopicNameFormatter(formatter TopicNameFormatter) RepartitionOpt
func RePartitionWithValEncoder ¶
func RePartitionWithValEncoder(enc encoding.Encoder) RepartitionOpt
type RepartitionOpts ¶
type RepartitionOpts struct {
// contains filtered or unexported fields
}
type RunnerContext ¶
type RunnerOpt ¶
type RunnerOpt func(runner *streamRunner)
func NotifyGlobalStoresReady ¶
func NotifyGlobalStoresReady(ch chan struct{}) RunnerOpt
type SinkOption ¶
type SinkOption interface{}
type SourceCtxParamExtractor ¶
SourceCtxParamExtractor extracts a key:val pair from a record. Used to bind key:val pairs into the record Context.
type Stream ¶
type Stream interface { Branch(branches ...processors.BranchDetails) []Stream Split(opts ...StreamOption) *BranchedStream SelectKey(selectKeyFunc processors.SelectKeyFunc, opts ...StreamOption) Stream Aggregate(store string, aggregatorFunc processors.AggregatorFunc, opts ...AggregateOpt) Table FlatMap(flatMapFunc processors.FlatMapFunc, opts ...StreamOption) Stream FlatMapValues(flatMapFunc processors.FlatMapValuesFunc, opts ...StreamOption) Stream Map(mapper processors.MapperFunc, opts ...StreamOption) Stream MapValue(valTransformFunc processors.MapValueFunc, opts ...StreamOption) Stream Filter(filter processors.FilterFunc, opts ...StreamOption) Stream Each(eachFunc processors.EachFunc, opts ...StreamOption) Stream NewProcessor(node topology.NodeBuilder, opts ...StreamOption) Stream JoinGlobalTable(table GlobalTable, keyMapper processors.KeyMapper, valMapper processors.JoinValueMapper, opts ...JoinOption) Stream LeftJoinGlobalTable(table GlobalTable, keyMapper processors.KeyMapper, valMapper processors.JoinValueMapper, opts ...JoinOption) Stream JoinTable(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Stream LeftJoinTable(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Stream // Through redirect the stream through an intermediate topic // Deprecated: use Repartition instead Through(topic string, options ...DslOption) Stream ToTable(store string, options ...TableOpt) Table Merge(stream Stream) Stream Repartition(topic string, opts ...RepartitionOpt) Stream AddStateStore(name string, keyEnc, valEnc encoding.Encoder, options ...state_stores.StoreBuilderOption) To(topic string, options ...KSinkOption) StreamTopology // contains filtered or unexported methods }
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder(config *Config, opts ...BuilderOpt) *StreamBuilder
func (*StreamBuilder) GlobalTable ¶
func (b *StreamBuilder) GlobalTable(topic string, keyEnc, valEnc encoding.Encoder, storeName string, options ...GlobalTableOption) GlobalTable
func (*StreamBuilder) KStream ¶
func (b *StreamBuilder) KStream(topic string, keyEnc, valEnc encoding.Encoder, opts ...KSourceOption) Stream
func (*StreamBuilder) NewRunner ¶
func (b *StreamBuilder) NewRunner() Runner
func (*StreamBuilder) StoreRegistry ¶
func (b *StreamBuilder) StoreRegistry() stores.Registry
func (*StreamBuilder) Topology ¶
func (b *StreamBuilder) Topology() topology.Builder
type StreamOption ¶
type StreamOption func(options *StreamOptions)
func DisableRepartition ¶
func DisableRepartition() StreamOption
DisableRepartition disables stream repartitioning even if it's marked for repartitioning. Useful when dealing with custom partitioners and joiners where message partition doesn't rely on its key.
func Named ¶
func Named(name string) StreamOption
type StreamOptions ¶
type StreamOptions struct {
// contains filtered or unexported fields
}
type StreamTopology ¶
type StreamTopology interface {
// contains filtered or unexported methods
}
type Table ¶
type Table interface { StreamTopology ToStream(opts ...StreamOption) Stream Filter(filter processors.FilterFunc, opts ...StreamOption) Table Each(eachFunc processors.EachFunc, opts ...StreamOption) Table Join(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Table JoinGlobalTable(table GlobalTable, keyMapper processors.KeyMapper, valMapper processors.JoinValueMapper, opts ...JoinOption) Table LeftJoin(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Table RightJoin(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Table OuterJoin(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Table // contains filtered or unexported methods }
type TableOpt ¶
type TableOpt func(opts *TableOpts)
func TableWithKeyEncoder ¶
func TableWithSourceAsChangelog ¶
func TableWithSourceAsChangelog() TableOpt
func TableWithStoreOptions ¶
func TableWithStoreOptions(options ...state_stores.StoreBuilderOption) TableOpt
func TableWithStreamOptions ¶
func TableWithStreamOptions(options ...StreamOption) TableOpt
func TableWithValEncoder ¶
type Tombstoner ¶
type TopicNameFormatter ¶
type TopicOpt ¶
type TopicOpt func(tpOpts *AutoTopicOpts)
func PartitionAs ¶
func WithPartitionCount ¶
func WithReplicaCount ¶
func WithTopicConfigs ¶
type Transformer ¶
type Transformer func(ctx topology.NodeContext, key, value interface{})
type ValueMapper ¶
Source Files ¶
- config.go
- consumer.go
- dsl_options.go
- global_store_builder_wrapper.go
- global_table_consumer.go
- global_table_node.go
- global_table_stream.go
- ksink.go
- ksink_builder.go
- ksource.go
- kstream.go
- kstream_options.go
- ksub_topology.go
- ksub_topology_builder.go
- ktable_stream.go
- ktopology.go
- ktopology_builder.go
- queryble_store_wrapper.go
- repartition_opts.go
- runner.go
- runner_context.go
- serde.go
- stream.go
- stream_builder.go
- stream_consumer.go
- stream_consumer_instance.go
- topic_opts.go