Documentation ¶
Index ¶
- Constants
- Variables
- func ContainsAny[E comparable](s []E, v []E) bool
- func KVStore[K, V any](storeBuilder func(name string, p int32) (StoreBackend, error), ...) func(name string, p int32) (Store, error)
- func MustRegisterProcessor[Kin, Vin, Kout, Vout any](t *TopologyBuilder, p ProcessorBuilder[Kin, Vin, Kout, Vout], name string, ...)
- func MustRegisterSink[K, V any](t *TopologyBuilder, name, topic string, keySerializer Serializer[K], ...)
- func MustRegisterSource[K, V any](t *TopologyBuilder, name string, topic string, keyDeserializer Deserializer[K], ...)
- func MustSetParent(t *TopologyBuilder, parent, child string)
- func NewPartitionGroupBalancer(log *slog.Logger, pgs []*PartitionGroup) kgo.GroupBalancer
- func NullLogger() *slog.Logger
- func RegisterProcessor[Kin, Vin, Kout, Vout any](t *TopologyBuilder, p ProcessorBuilder[Kin, Vin, Kout, Vout], name string, ...) error
- func RegisterSink[K, V any](t *TopologyBuilder, name, topic string, keySerializer Serializer[K], ...) error
- func RegisterSource[K, V any](t *TopologyBuilder, name string, topic string, keyDeserializer Deserializer[K], ...) error
- func RegisterStore(t *TopologyBuilder, storeBuilder StoreBuilder, name string)
- func SetParent(t *TopologyBuilder, parent, child string) error
- type App
- type AssignedOrRevoked
- type BalanceError
- type Deserializer
- type Flusher
- type Input
- type InputProcessor
- type InternalProcessorContext
- type KeyValueStore
- func (t *KeyValueStore[K, V]) Checkpoint(ctx context.Context, id string) error
- func (t *KeyValueStore[K, V]) Close() error
- func (t *KeyValueStore[K, V]) Flush() error
- func (t *KeyValueStore[K, V]) Get(k K) (V, error)
- func (t *KeyValueStore[K, V]) Init() error
- func (t *KeyValueStore[K, V]) Set(k K, v V) error
- type Nexter
- type Node
- type NullWriter
- type Option
- type Output
- type PartitionGroup
- type PartitionGroupBalancer
- func (w *PartitionGroupBalancer) IsCooperative() bool
- func (w *PartitionGroupBalancer) JoinGroupMetadata(topicInterests []string, currentAssignment map[string][]int32, ...) []byte
- func (w *PartitionGroupBalancer) MemberBalancer(members []kmsg.JoinGroupResponseMember) (b kgo.GroupMemberBalancer, topics map[string]struct{}, err error)
- func (w *PartitionGroupBalancer) ParseSyncAssignment(assignment []byte) (map[string][]int32, error)
- func (w *PartitionGroupBalancer) ProtocolName() string
- type Processor
- type ProcessorBuilder
- type ProcessorContext
- type ProcessorInterceptor
- type ProcessorNode
- type Record
- type RecordProcessor
- type RoutineState
- type SerDe
- type Serializer
- type SinkNode
- type SourceNode
- type Store
- type StoreBackend
- type StoreBackendBuilder
- type StoreBuilder
- type Task
- func (t *Task) ClearOffsets()
- func (t *Task) Close(ctx context.Context) error
- func (t *Task) Flush(ctx context.Context) error
- func (t *Task) GetOffsetsToCommit() map[string]kgo.EpochOffset
- func (t *Task) Init() error
- func (t *Task) Process(ctx context.Context, records ...*kgo.Record) error
- func (t *Task) String() string
- type TaskManager
- func (t *TaskManager) Assigned(assigned map[string][]int32) error
- func (t *TaskManager) Close(ctx context.Context) error
- func (t *TaskManager) Commit(ctx context.Context) error
- func (t *TaskManager) Revoked(revoked map[string][]int32) error
- func (t *TaskManager) TaskFor(topic string, partition int32) (*Task, error)
- type Topology
- type TopologyBuilder
- type TopologyProcessor
- type TopologySink
- type TopologySource
- type TopologyStore
- type Worker
- type WrappingMemberBalancer
Constants ¶
const ( StateCreated = "CREATED" StatePartitionsAssigned = "PARTITIONS_ASSIGNED" StateRunning = "RUNNING" StateCloseRequested = "CLOSE_REQUESTED" StateClosed = "CLOSED" )
Variables ¶
var ErrInternal = errors.New("internal")
var (
ErrKeyNotFound = errors.New("store: key not found")
)
var ErrNodeAlreadyExists = errors.New("node exists already")
var ErrNodeNotFound = errors.New("node not found")
var ErrTaskNotFound = errors.New("task not found")
var WithBrokers = func(brokers []string) Option { return func(s *App) { s.brokers = brokers } }
var WithCommitInterval = func(commitInterval time.Duration) Option { return func(s *App) { s.commitInterval = commitInterval } }
var WithLog = func(log *slog.Logger) Option { return func(s *App) { s.log = log } }
var WithWorkersCount = func(n int) Option { return func(s *App) { s.numRoutines = n } }
Functions ¶
func ContainsAny ¶
func ContainsAny[E comparable](s []E, v []E) bool
Contains reports whether v is present in s.
func MustRegisterProcessor ¶
func MustRegisterProcessor[Kin, Vin, Kout, Vout any](t *TopologyBuilder, p ProcessorBuilder[Kin, Vin, Kout, Vout], name string, parent string, stores ...string)
func MustRegisterSink ¶
func MustRegisterSink[K, V any](t *TopologyBuilder, name, topic string, keySerializer Serializer[K], valueSerializer Serializer[V], parent string)
func MustRegisterSource ¶
func MustRegisterSource[K, V any](t *TopologyBuilder, name string, topic string, keyDeserializer Deserializer[K], valueDeserializer Deserializer[V])
func MustSetParent ¶
func MustSetParent(t *TopologyBuilder, parent, child string)
func NewPartitionGroupBalancer ¶
func NewPartitionGroupBalancer(log *slog.Logger, pgs []*PartitionGroup) kgo.GroupBalancer
func NullLogger ¶
func RegisterProcessor ¶
func RegisterProcessor[Kin, Vin, Kout, Vout any](t *TopologyBuilder, p ProcessorBuilder[Kin, Vin, Kout, Vout], name string, parent string, stores ...string) error
func RegisterSink ¶
func RegisterSink[K, V any](t *TopologyBuilder, name, topic string, keySerializer Serializer[K], valueSerializer Serializer[V], parent string) error
func RegisterSource ¶
func RegisterSource[K, V any](t *TopologyBuilder, name string, topic string, keyDeserializer Deserializer[K], valueDeserializer Deserializer[V]) error
func RegisterStore ¶
func RegisterStore(t *TopologyBuilder, storeBuilder StoreBuilder, name string)
func SetParent ¶
func SetParent(t *TopologyBuilder, parent, child string) error
Types ¶
type AssignedOrRevoked ¶
type BalanceError ¶
type BalanceError struct {
// contains filtered or unexported fields
}
func (*BalanceError) IntoSyncAssignment ¶
func (e *BalanceError) IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment
func (*BalanceError) IntoSyncAssignmentOrError ¶
func (e *BalanceError) IntoSyncAssignmentOrError() ([]kmsg.SyncGroupRequestGroupAssignment, error)
type Deserializer ¶
type InputProcessor ¶
InputProcessor is a partial interface covering only the generic input K/V, without requiring the caller to know the generic types of the output.
type InternalProcessorContext ¶
type InternalProcessorContext[Kout any, Vout any] struct { // contains filtered or unexported fields }
func NewInternalkProcessorContext ¶
func NewInternalkProcessorContext[Kout any, Vout any]( outputs map[string]InputProcessor[Kout, Vout], stores map[string]Store, ) *InternalProcessorContext[Kout, Vout]
func (*InternalProcessorContext[Kout, Vout]) Forward ¶
func (c *InternalProcessorContext[Kout, Vout]) Forward(ctx context.Context, k Kout, v Vout)
func (*InternalProcessorContext[Kout, Vout]) ForwardTo ¶
func (c *InternalProcessorContext[Kout, Vout]) ForwardTo(ctx context.Context, k Kout, v Vout, childName string)
func (*InternalProcessorContext[Kout, Vout]) GetStore ¶
func (c *InternalProcessorContext[Kout, Vout]) GetStore(name string) Store
type KeyValueStore ¶
type KeyValueStore[K, V any] struct { // contains filtered or unexported fields }
func NewKeyValueStore ¶
func NewKeyValueStore[K, V any]( store StoreBackend, keySerializer Serializer[K], valueSerializer Serializer[V], keyDeserializer Deserializer[K], valueDeserializer Deserializer[V], ) *KeyValueStore[K, V]
func (*KeyValueStore[K, V]) Checkpoint ¶
func (t *KeyValueStore[K, V]) Checkpoint(ctx context.Context, id string) error
func (*KeyValueStore[K, V]) Close ¶
func (t *KeyValueStore[K, V]) Close() error
func (*KeyValueStore[K, V]) Flush ¶
func (t *KeyValueStore[K, V]) Flush() error
func (*KeyValueStore[K, V]) Get ¶
func (t *KeyValueStore[K, V]) Get(k K) (V, error)
func (*KeyValueStore[K, V]) Init ¶
func (t *KeyValueStore[K, V]) Init() error
func (*KeyValueStore[K, V]) Set ¶
func (t *KeyValueStore[K, V]) Set(k K, v V) error
type Nexter ¶
type Nexter[K, V any] interface { AddNext(InputProcessor[K, V]) }
type Node ¶
Node does not know about any specific types of nodes, because it would otherwise need to have an ounbounded number of generic types. Generic types are hidden inside the actual implementations using the Node interfaces.
type NullWriter ¶
type NullWriter struct{}
type PartitionGroup ¶
type PartitionGroup struct {
// contains filtered or unexported fields
}
PartitionGroup is a sub-graph of nodes that must be co-partitioned as they depend on each other.
type PartitionGroupBalancer ¶
type PartitionGroupBalancer struct {
// contains filtered or unexported fields
}
PartitionGroupBalancer is a balancer that uses kgo's Cooperative-sticky balancer under the hood, but enforces co-partitioning as defined by the given PartitionGroups.
func (*PartitionGroupBalancer) IsCooperative ¶
func (w *PartitionGroupBalancer) IsCooperative() bool
func (*PartitionGroupBalancer) JoinGroupMetadata ¶
func (*PartitionGroupBalancer) MemberBalancer ¶
func (w *PartitionGroupBalancer) MemberBalancer(members []kmsg.JoinGroupResponseMember) (b kgo.GroupMemberBalancer, topics map[string]struct{}, err error)
func (*PartitionGroupBalancer) ParseSyncAssignment ¶
func (w *PartitionGroupBalancer) ParseSyncAssignment(assignment []byte) (map[string][]int32, error)
func (*PartitionGroupBalancer) ProtocolName ¶
func (w *PartitionGroupBalancer) ProtocolName() string
type Processor ¶
type Processor[Kin any, Vin any, Kout any, Vout any] interface { Init(ProcessorContext[Kout, Vout]) error Close() error Process(ctx context.Context, k Kin, v Vin) error }
Processor is a low-level interface. The implementation can retain the ProcessorContext passed into Init and use it to access state stores and forward data to downstream nodes. This is fairly low-level and allows for a lot of flexibility, but may be inconvenient to use for more specialized use cases. More high-level interfaces can be built on top of this, i.e. a Processor that receives input, and forwards it to only one downstream node.
type ProcessorBuilder ¶
ProcessorBuilder creates an actual processor for a specific TopicPartition.
type ProcessorContext ¶
type ProcessorContext[Kout any, Vout any] interface { // Forward to all child nodes. Forward(ctx context.Context, k Kout, v Vout) // Forward to specific child node. Panics if child node is not found. ForwardTo(ctx context.Context, k Kout, v Vout, childName string) // TBD: should forward return error ? or are errs...ignored? // Get state store by name. Returns nil if not found. GetStore(name string) Store }
type ProcessorInterceptor ¶
type ProcessorNode ¶
type ProcessorNode[Kin any, Vin any, Kout any, Vout any] struct { // contains filtered or unexported fields }
func (*ProcessorNode[Kin, Vin, Kout, Vout]) Close ¶
func (p *ProcessorNode[Kin, Vin, Kout, Vout]) Close() error
func (*ProcessorNode[Kin, Vin, Kout, Vout]) Init ¶
func (p *ProcessorNode[Kin, Vin, Kout, Vout]) Init() error
type RecordProcessor ¶
type RoutineState ¶
type RoutineState string
type SerDe ¶
type SerDe[T any] struct { Serializer Serializer[T] Deserializer Deserializer[T] }
type Serializer ¶
type SinkNode ¶
type SinkNode[K any, V any] struct { KeySerializer Serializer[K] ValueSerializer Serializer[V] // contains filtered or unexported fields }
func NewSinkNode ¶
func NewSinkNode[K, V any](client *kgo.Client, topic string, keySerializer Serializer[K], valueSerializer Serializer[V]) *SinkNode[K, V]
type SourceNode ¶
type SourceNode[K any, V any] struct { KeyDeserializer Deserializer[K] ValueDeserializer Deserializer[V] DownstreamProcessors []InputProcessor[K, V] }
SourceNode[K,V] receives kgo records, and forward these to all downstream processors.
func (*SourceNode[K, V]) AddNext ¶
func (n *SourceNode[K, V]) AddNext(next InputProcessor[K, V])
type StoreBackend ¶
type StoreBackendBuilder ¶
type StoreBackendBuilder func(name string, p int32) (StoreBackend, error)
type StoreBuilder ¶
TODO/FIXME make store name part of params
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func (*Task) ClearOffsets ¶
func (t *Task) ClearOffsets()
func (*Task) GetOffsetsToCommit ¶
func (t *Task) GetOffsetsToCommit() map[string]kgo.EpochOffset
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
func (*TaskManager) Assigned ¶
func (t *TaskManager) Assigned(assigned map[string][]int32) error
Assigned handles topic-partition assignment: it creates tasks as needed.
func (*TaskManager) Commit ¶
func (t *TaskManager) Commit(ctx context.Context) error
Commit triggers a commit. This flushes all tasks' stores, and then performs a commit of all tasks' processed records.
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
Topology is a fully built DAG that can be used in a kstreams app.
func (*Topology) CreateTask ¶
type TopologyBuilder ¶
type TopologyBuilder struct {
// contains filtered or unexported fields
}
func NewTopologyBuilder ¶
func NewTopologyBuilder() *TopologyBuilder
func (*TopologyBuilder) Build ¶
func (tb *TopologyBuilder) Build() (*Topology, error)
func (*TopologyBuilder) MustBuild ¶
func (tb *TopologyBuilder) MustBuild() *Topology
type TopologyProcessor ¶
type TopologySource ¶
type TopologyStore ¶
type TopologyStore struct { Name string Build StoreBuilder }
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func NewWorker ¶
func NewWorker(log *slog.Logger, name string, t *Topology, group string, brokers []string, commitInterval time.Duration) (*Worker, error)
Config
type WrappingMemberBalancer ¶
type WrappingMemberBalancer struct {
// contains filtered or unexported fields
}
func (*WrappingMemberBalancer) Balance ¶
func (wb *WrappingMemberBalancer) Balance(topics map[string]int32) kgo.IntoSyncAssignment
func (*WrappingMemberBalancer) BalanceOrError ¶
func (wb *WrappingMemberBalancer) BalanceOrError(topics map[string]int32) (kgo.IntoSyncAssignment, error)