streams

package
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2023 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const GlobalTableOffsetDefault = kafka.OffsetEarliest

GlobalTableOffsetDefault is the default starting offset (the earliest available offset) used by the GlobalTable when GlobalTable stream syncing starts.

View Source
const GlobalTableOffsetLatest = kafka.OffsetLatest

GlobalTableOffsetLatest starts GlobalTable syncing from the latest offset (the end of the partition) instead of the beginning. Suitable for topics with retention policy delete, since such a topic can contain historical data.

Variables

This section is empty.

Functions

func NewKSinkBuilder

func NewKSinkBuilder(topic string, options ...KSinkOption) topology.NodeBuilder

func NewKSource

func NewKSource(topic string, opts ...KSourceOption) topology.Source

func NewKTopologyBuilder

func NewKTopologyBuilder(storeRegistry stores.Registry, logger log.Logger) topology.Builder

Types

type AggregateOpt

type AggregateOpt func(opts *AggregateOpts)

func AggregateWithKeyEncoder

func AggregateWithKeyEncoder(encoder encoding.Encoder) AggregateOpt

func AggregateWithStoreOptions

func AggregateWithStoreOptions(options ...state_stores.StoreBuilderOption) AggregateOpt

func AggregateWithStreamOptions

func AggregateWithStreamOptions(streamOptions ...StreamOption) AggregateOpt

func AggregateWithValEncoder

func AggregateWithValEncoder(encoder encoding.Encoder) AggregateOpt

type AggregateOpts

type AggregateOpts struct {
	// contains filtered or unexported fields
}

type AutoTopicOpts

type AutoTopicOpts struct {
	// contains filtered or unexported fields
}

type Branch

type Branch struct {
	// contains filtered or unexported fields
}

type BranchedStream

type BranchedStream struct {
	// contains filtered or unexported fields
}

func (*BranchedStream) Branch

func (brs *BranchedStream) Branch(name string) Stream

func (*BranchedStream) New

func (brs *BranchedStream) New(name string, predicate processors.BranchPredicate) Stream

type BuilderOpt

type BuilderOpt func(config *StreamBuilder)

func BuilderWithAdminClient

func BuilderWithAdminClient(admin kafka.Admin) BuilderOpt

func BuilderWithBackendBuilder

func BuilderWithBackendBuilder(builder backend.Builder) BuilderOpt

func BuilderWithConsumerAdaptor

func BuilderWithConsumerAdaptor(provider kafka.ConsumerProvider) BuilderOpt

func BuilderWithConsumerProvider

func BuilderWithConsumerProvider(provider kafka.GroupConsumerProvider) BuilderOpt

func BuilderWithProducerProvider

func BuilderWithProducerProvider(provider kafka.ProducerProvider) BuilderOpt

func BuilderWithStoreBuilder

func BuilderWithStoreBuilder(builder stores.Builder) BuilderOpt

type Config

type Config struct {
	// ApplicationId will be used as
	// 	1 - a consumer group when using streams, and tables
	// 	2 - topic prefix when creating repartition topics
	// 	3 - transaction ID prefix when using producer transactions
	ApplicationId string
	// BootstrapServers a list of kafka Brokers
	BootstrapServers []string
	Store            struct {
		Http struct {
			// Enabled enable state stores http server(debug purposes only)
			Enabled bool
			// Host stores http server host(eg: http://localhost:8080)
			Host string
		}
		// StateDir directory to store Persistable state stores
		StateDir  string
		Changelog struct {
			// ReplicaCount store changelog topic(auto generated) replica count
			ReplicaCount int16
		}
	}
	// InternalTopicsDefaultReplicaCount default replica count for auto generated topic(eg: repartition topics)
	InternalTopicsDefaultReplicaCount int16
	Processing                        struct {
		// ConsumerCount number of stream consumers(default:1) to run. This can be used to scale application vertically
		ConsumerCount int
		// Guarantee end to end processing guarantee. Supported values are ExactlyOnce(default) and AtLeastOnce
		Guarantee ProcessingGuarantee
		// Buffer defines min time or min number of records before the flush starts
		Buffer tasks.BufferConfig
		// FailedMessageHandler used to handle failed messages(Process failures and serialization errors)
		FailedMessageHandler tasks.FailedMessageHandler
	}
	// Consumer default consumer properties
	Consumer *kafka.GroupConsumerConfig
	// Host application host(used to identify application instances)
	Host string
	// Producer default producer configs
	Producer *kafka.ProducerConfig
	// MetricsReporter default metrics reporter(default: NoopReporter)
	MetricsReporter metrics.Reporter
	// Logger default logger(default: NoopLogger)
	Logger log.Logger

	// RepartitionTopicNameFormatter repartition topic name formatter function
	// (default ApplicationId-${storeName}-repartitioned)
	RepartitionTopicNameFormatter TopicNameFormatter

	// DefaultPartitioner if defined will be used in Sink operators. Eg: To, Repartition
	DefaultPartitioner Partitioner

	// ChangelogTopicNameFormatter changelog topic name formatter function
	// (default ApplicationId-${storeName}-store-changelog)
	ChangelogTopicNameFormatter state_stores.ChangelogTopicFormatter
}

func NewStreamBuilderConfig

func NewStreamBuilderConfig() *Config

type Consumer

type Consumer interface {
	Run(topologyBuilder topology.Topology) error
	Init(topologyBuilder topology.Topology) error
	Stop() error
	Ready() error
}

type DslOption

type DslOption func(options *DslOptions)

func DslOptsSinkOptions

func DslOptsSinkOptions(opts ...KSinkOption) DslOption

func DslOptsSourceOptions

func DslOptsSourceOptions(opts ...KSourceOption) DslOption

func DslWithSinkOptions

func DslWithSinkOptions(opts ...KSinkOption) DslOption

type DslOptions

type DslOptions struct {
	StreamOptions
	// contains filtered or unexported fields
}

type GlobalStoreBuilderWrapper

type GlobalStoreBuilderWrapper struct {
	// contains filtered or unexported fields
}

func (*GlobalStoreBuilderWrapper) Build

func (s *GlobalStoreBuilderWrapper) Build(name string, options ...stores.Option) (stores.Store, error)

func (*GlobalStoreBuilderWrapper) Name

type GlobalTable

type GlobalTable interface {
	StreamTopology
	Store() topology.LoggableStoreBuilder
	Stream
}

type GlobalTableConsumer

type GlobalTableConsumer struct {
	// contains filtered or unexported fields
}

func (*GlobalTableConsumer) Init

func (g *GlobalTableConsumer) Init(topologyBuilder topology.Topology) error

Init initializes the GlobalTableConsumer with the given topology and starts consuming messages from the sources of each GlobalTableTopology in the topology. For each source, it fetches the available partitions, creates a new task with the GlobalTableTopology and adds it to the task manager. Then, it creates two goroutines to run the task: one to stop the task when the program is shutting down and another to start the task and log any errors that occur. Finally, it adds the task to the runGroup.

Parameters: - topologyBuilder: the topology builder to use for creating GlobalTableTopologies

Returns:

  • error: if there was an error fetching partitions, adding a task to the task manager, or if the task failed to start.

func (*GlobalTableConsumer) Ready

func (g *GlobalTableConsumer) Ready() error

func (*GlobalTableConsumer) Run

func (*GlobalTableConsumer) Stop

func (g *GlobalTableConsumer) Stop() error

type GlobalTableNode

type GlobalTableNode struct {
	topology.DefaultNode
	OffsetBackend backend.Backend
	StoreName     string
	// contains filtered or unexported fields
}

func (*GlobalTableNode) Build

func (*GlobalTableNode) Init

func (*GlobalTableNode) Run

func (n *GlobalTableNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*GlobalTableNode) State

func (n *GlobalTableNode) State() topology.StateStore

func (*GlobalTableNode) StateType

func (n *GlobalTableNode) StateType() topology.StateType

func (*GlobalTableNode) Type

func (n *GlobalTableNode) Type() topology.Type

func (*GlobalTableNode) WritesAt

func (n *GlobalTableNode) WritesAt() []string

type GlobalTableOption

type GlobalTableOption func(options *globalTableOptions)

func GlobalTableWithLogger

func GlobalTableWithLogger(logger log.Logger) GlobalTableOption

GlobalTableWithLogger overrides the default logger for the GlobalTable (default is log.NoopLogger).

func GlobalTableWithOffset

func GlobalTableWithOffset(offset kafka.Offset) GlobalTableOption

GlobalTableWithOffset overrides the default starting offset used when GlobalTable syncing starts.

func GlobalTableWithStore

func GlobalTableWithStore(store stores.Store) GlobalTableOption

func GlobalTableWithStoreChangelogOptions

func GlobalTableWithStoreChangelogOptions(opts ...state_stores.ChangelogBuilderOption) GlobalTableOption

func GlobalTableWithStoreOptions

func GlobalTableWithStoreOptions(opts ...stores.Option) GlobalTableOption

type HeaderExtractor

type HeaderExtractor func(ctx context.Context, key, val interface{}) kafka.RecordHeaders

type JoinOption

type JoinOption func(options *JoinOptions)

func JoinWithLeftValLookupFunc

func JoinWithLeftValLookupFunc(fn processors.ValueLookupFunc) JoinOption

func JoinWithRightValLookupFunc

func JoinWithRightValLookupFunc(fn processors.ValueLookupFunc) JoinOption

func JoinWithStreamOptions

func JoinWithStreamOptions(opts ...StreamOption) JoinOption

func JoinWithValueLookupFunc

func JoinWithValueLookupFunc(fn processors.ValueLookupFunc) JoinOption

type JoinOptions

type JoinOptions struct {
	// contains filtered or unexported fields
}

type JoinOpts

type JoinOpts interface {
	// contains filtered or unexported methods
}

type JoinType

type JoinType int
const (
	LeftJoin JoinType = iota
	InnerJoin
)

type KSinkBuilderConfigs

type KSinkBuilderConfigs struct {
	Topic   string
	Encoder topology.SinkEncoder
}

type KSinkOption

type KSinkOption func(sink *kSinkBuilder)

func ProduceWithHeadersExtractor

func ProduceWithHeadersExtractor(h HeaderExtractor) KSinkOption

func ProduceWithKeyEncoder

func ProduceWithKeyEncoder(encoder encoding.Encoder) KSinkOption

func ProduceWithLogger

func ProduceWithLogger(logger log.Logger) KSinkOption

func ProduceWithPartitioner

func ProduceWithPartitioner(partitioner Partitioner) KSinkOption

func ProduceWithTombstoneFilter

func ProduceWithTombstoneFilter(f Tombstoner) KSinkOption

func ProduceWithTopicNameFormatter

func ProduceWithTopicNameFormatter(formatter TopicNameFormatter) KSinkOption

func ProduceWithValEncoder

func ProduceWithValEncoder(encoder encoding.Encoder) KSinkOption

type KSource

type KSource struct {
	topology.DefaultNode
	// contains filtered or unexported fields
}

func (*KSource) AutoCreate

func (s *KSource) AutoCreate() bool

func (*KSource) Build

func (*KSource) CoPartitionedWith

func (s *KSource) CoPartitionedWith() topology.Source

func (*KSource) Encoder

func (s *KSource) Encoder() topology.SourceEncoder

func (*KSource) InitialOffset

func (s *KSource) InitialOffset() kafka.Offset

func (*KSource) Internal

func (s *KSource) Internal() bool

func (*KSource) RePartitionedAs

func (s *KSource) RePartitionedAs() topology.Source

func (*KSource) Run

func (s *KSource) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*KSource) Setup

func (*KSource) ShouldCoPartitionedWith

func (s *KSource) ShouldCoPartitionedWith(source topology.Source)

func (*KSource) Topic

func (s *KSource) Topic() string

func (*KSource) TopicConfigs

func (s *KSource) TopicConfigs() kafka.TopicConfig

func (*KSource) Type

func (s *KSource) Type() topology.Type

type KSourceOption

type KSourceOption func(*KSource)

KSourceOption is used to customize the KSource

func ConsumeWithAutoTopicCreateEnabled

func ConsumeWithAutoTopicCreateEnabled(options ...TopicOpt) KSourceOption

func ConsumeWithContextParamExtractors

func ConsumeWithContextParamExtractors(fn ...SourceCtxParamExtractor) KSourceOption

ConsumeWithContextParamExtractors adds a list of SourceCtxParamExtractor funcs to the topology.Source; each extractor will be called in a loop for each record.

func ConsumeWithKeyEncoder

func ConsumeWithKeyEncoder(encoder encoding.Encoder) KSourceOption

func ConsumeWithOffset

func ConsumeWithOffset(offset kafka.Offset) KSourceOption

func ConsumeWithTopicNameFormatterFunc

func ConsumeWithTopicNameFormatterFunc(fn TopicNameFormatter) KSourceOption

func ConsumeWithValEncoder

func ConsumeWithValEncoder(encoder encoding.Encoder) KSourceOption

type KSubTopology

type KSubTopology struct {
	// contains filtered or unexported fields
}

func (*KSubTopology) AddProducer

func (t *KSubTopology) AddProducer(p kafka.Producer)

func (*KSubTopology) Close

func (t *KSubTopology) Close() error

func (*KSubTopology) Id

func (*KSubTopology) Init

func (*KSubTopology) Nodes

func (t *KSubTopology) Nodes() []topology.Node

func (*KSubTopology) Producer

func (t *KSubTopology) Producer() kafka.Producer

func (*KSubTopology) Sinks

func (t *KSubTopology) Sinks() []topology.Sink

func (*KSubTopology) Source

func (t *KSubTopology) Source(topic string) topology.Source

func (*KSubTopology) StateStores

func (t *KSubTopology) StateStores() map[string]topology.StateStore

func (*KSubTopology) Store

func (t *KSubTopology) Store(name string) topology.StateStore

type KafkaVersion

type KafkaVersion string

type KeyMapper

type KeyMapper func(ctx context.Context, key, value interface{}) (mappedKey interface{}, err error)

type LocalQueryableStoreWrapper

type LocalQueryableStoreWrapper struct {
	// contains filtered or unexported fields
}

func (*LocalQueryableStoreWrapper) Close

func (l *LocalQueryableStoreWrapper) Close() error

func (*LocalQueryableStoreWrapper) Get

func (l *LocalQueryableStoreWrapper) Get(ctx context.Context, key interface{}) (interface{}, error)

func (*LocalQueryableStoreWrapper) Instances

func (*LocalQueryableStoreWrapper) Iterator

func (*LocalQueryableStoreWrapper) KeyEncoder

func (*LocalQueryableStoreWrapper) Name

func (*LocalQueryableStoreWrapper) PrefixedIterator

func (l *LocalQueryableStoreWrapper) PrefixedIterator(ctx context.Context, keyPrefix interface{}, prefixEncoder encoding.Encoder) (stores.Iterator, error)

func (*LocalQueryableStoreWrapper) String

func (l *LocalQueryableStoreWrapper) String() string

func (*LocalQueryableStoreWrapper) ValEncoder

type MultiStoreIterator

type MultiStoreIterator struct {
	// contains filtered or unexported fields
}

func (*MultiStoreIterator) Close

func (i *MultiStoreIterator) Close()

func (*MultiStoreIterator) Error

func (i *MultiStoreIterator) Error() error

func (*MultiStoreIterator) Key

func (i *MultiStoreIterator) Key() (interface{}, error)

func (*MultiStoreIterator) Next

func (i *MultiStoreIterator) Next()

func (*MultiStoreIterator) SeekToFirst

func (i *MultiStoreIterator) SeekToFirst()

func (*MultiStoreIterator) Valid

func (i *MultiStoreIterator) Valid() bool

func (*MultiStoreIterator) Value

func (i *MultiStoreIterator) Value() (interface{}, error)

type Partitioner

type Partitioner func(ctx context.Context, key, val interface{}, numPartitions int32) (int32, error)

type ProcessingGuarantee

type ProcessingGuarantee int8
const (
	AtLeastOnce ProcessingGuarantee = iota
	ExactlyOnce
)

type QueryableStoreWrapper

type QueryableStoreWrapper interface {
	Instances() []topology.LoggableStateStore
	stores.ReadOnlyStore
}

type RepartitionOpt

type RepartitionOpt func(rpOpts *RepartitionOpts)

func RePartitionAs

func RePartitionAs(topic string) RepartitionOpt

func RePartitionWithKeyEncoder

func RePartitionWithKeyEncoder(enc encoding.Encoder) RepartitionOpt

func RePartitionWithPartitioner

func RePartitionWithPartitioner(partitioner Partitioner) RepartitionOpt

func RePartitionWithTopicNameFormatter

func RePartitionWithTopicNameFormatter(formatter TopicNameFormatter) RepartitionOpt

func RePartitionWithValEncoder

func RePartitionWithValEncoder(enc encoding.Encoder) RepartitionOpt

type RepartitionOpts

type RepartitionOpts struct {
	// contains filtered or unexported fields
}

type Runner

type Runner interface {
	Run(topology topology.Topology, opts ...RunnerOpt) error
	Stop() error
}

type RunnerContext

type RunnerContext interface {
	context.Context
	ConsumerGroupMeta() (*kafka.GroupMeta, error)
	TopicMeta() kafka.TopicMeta
}

type RunnerOpt

type RunnerOpt func(runner *streamRunner)

func NotifyGlobalStoresReady

func NotifyGlobalStoresReady(ch chan struct{}) RunnerOpt

type SinkOption

type SinkOption interface{}

type SourceCtxParamExtractor

type SourceCtxParamExtractor func(record kafka.Record) (key string, val interface{})

SourceCtxParamExtractor extracts a key:val pair from a record. Used to bind key:val pairs into the record Context.

type Stream

type Stream interface {
	Branch(branches ...processors.BranchDetails) []Stream
	Split(opts ...StreamOption) *BranchedStream
	SelectKey(selectKeyFunc processors.SelectKeyFunc, opts ...StreamOption) Stream
	Aggregate(store string, aggregatorFunc processors.AggregatorFunc, opts ...AggregateOpt) Table

	FlatMap(flatMapFunc processors.FlatMapFunc, opts ...StreamOption) Stream
	FlatMapValues(flatMapFunc processors.FlatMapValuesFunc, opts ...StreamOption) Stream
	Map(mapper processors.MapperFunc, opts ...StreamOption) Stream
	MapValue(valTransformFunc processors.MapValueFunc, opts ...StreamOption) Stream
	Filter(filter processors.FilterFunc, opts ...StreamOption) Stream
	Each(eachFunc processors.EachFunc, opts ...StreamOption) Stream
	NewProcessor(node topology.NodeBuilder, opts ...StreamOption) Stream
	JoinGlobalTable(table GlobalTable, keyMapper processors.KeyMapper, valMapper processors.JoinValueMapper, opts ...JoinOption) Stream
	LeftJoinGlobalTable(table GlobalTable, keyMapper processors.KeyMapper, valMapper processors.JoinValueMapper, opts ...JoinOption) Stream
	JoinTable(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Stream
	LeftJoinTable(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Stream

	// Through redirect the stream through an intermediate topic
	// Deprecated: use Repartition instead
	Through(topic string, options ...DslOption) Stream
	ToTable(store string, options ...TableOpt) Table
	Merge(stream Stream) Stream
	Repartition(topic string, opts ...RepartitionOpt) Stream
	AddStateStore(name string, keyEnc, valEnc encoding.Encoder, options ...state_stores.StoreBuilderOption)
	To(topic string, options ...KSinkOption)
	StreamTopology
	// contains filtered or unexported methods
}

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

func NewStreamBuilder

func NewStreamBuilder(config *Config, opts ...BuilderOpt) *StreamBuilder

func (*StreamBuilder) Build

func (b *StreamBuilder) Build() (topology.Topology, error)

func (*StreamBuilder) GlobalTable

func (b *StreamBuilder) GlobalTable(topic string, keyEnc, valEnc encoding.Encoder, storeName string, options ...GlobalTableOption) GlobalTable

func (*StreamBuilder) KStream

func (b *StreamBuilder) KStream(topic string, keyEnc, valEnc encoding.Encoder, opts ...KSourceOption) Stream

func (*StreamBuilder) NewRunner

func (b *StreamBuilder) NewRunner() Runner

func (*StreamBuilder) StoreRegistry

func (b *StreamBuilder) StoreRegistry() stores.Registry

func (*StreamBuilder) Topology

func (b *StreamBuilder) Topology() topology.Builder

type StreamOption

type StreamOption func(options *StreamOptions)

func DisableRepartition

func DisableRepartition() StreamOption

DisableRepartition disables stream repartitioning even if it's marked for repartitioning. Useful when dealing with custom partitioners and joiners where message partition doesn't rely on its key.

func Named

func Named(name string) StreamOption

type StreamOptions

type StreamOptions struct {
	// contains filtered or unexported fields
}

type StreamTopology

type StreamTopology interface {
	// contains filtered or unexported methods
}

type Table

type Table interface {
	StreamTopology

	ToStream(opts ...StreamOption) Stream
	Filter(filter processors.FilterFunc, opts ...StreamOption) Table
	Each(eachFunc processors.EachFunc, opts ...StreamOption) Table
	Join(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Table
	JoinGlobalTable(table GlobalTable, keyMapper processors.KeyMapper, valMapper processors.JoinValueMapper, opts ...JoinOption) Table
	LeftJoin(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Table
	RightJoin(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Table
	OuterJoin(table Table, valMapper processors.JoinValueMapper, opts ...JoinOption) Table
	// contains filtered or unexported methods
}

type TableOpt

type TableOpt func(opts *TableOpts)

func TableWithKeyEncoder

func TableWithKeyEncoder(enc encoding.Encoder) TableOpt

func TableWithSourceAsChangelog

func TableWithSourceAsChangelog() TableOpt

func TableWithStoreOptions

func TableWithStoreOptions(options ...state_stores.StoreBuilderOption) TableOpt

func TableWithStreamOptions

func TableWithStreamOptions(options ...StreamOption) TableOpt

func TableWithValEncoder

func TableWithValEncoder(enc encoding.Encoder) TableOpt

type TableOpts

type TableOpts struct {
	// contains filtered or unexported fields
}

type Tombstoner

type Tombstoner func(ctx context.Context, key, val interface{}) (tombstone bool)

type TopicNameFormatter

type TopicNameFormatter func(topic string) func(ctx topology.BuilderContext, nodeId topology.NodeId) string

type TopicOpt

type TopicOpt func(tpOpts *AutoTopicOpts)

func PartitionAs

func PartitionAs(src topology.Source) TopicOpt

func WithPartitionCount

func WithPartitionCount(count int32) TopicOpt

func WithReplicaCount

func WithReplicaCount(count int16) TopicOpt

func WithTopicConfigs

func WithTopicConfigs(configs map[string]string) TopicOpt

type Transformer

type Transformer func(ctx topology.NodeContext, key, value interface{})

type ValueMapper

type ValueMapper func(ctx context.Context, left, right interface{}) (joined interface{}, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL