Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BufferConfig ¶
type BufferConfig struct {
	// Size defines the minimum number of records before the flush
	// starts (this includes messages in the state store changelogs).
	// Please note that this value has to be less than the
	// producer's queue.buffering.max.messages.
	//
	// Deprecated: no longer applicable.
	Size int
	// FlushInterval defines the minimum wait time before the flush starts.
	FlushInterval time.Duration
}
type FailedMessageHandler ¶
type Generator ¶
type Generator struct{}
func (*Generator) Generate ¶
func (a *Generator) Generate(tps []kafka.TopicPartition, topologyBuilder topology.Topology) TaskGeneration
Generate generates the Task Assignment for a given TopicPartition combination by assigning partitions to SubTopologyBuilders.
type Record ¶
func NewTaskRecord ¶
type Task ¶
type Task interface {
	ID() TaskID
	Init() error
	Restore() error
	Sync() error
	Ready() error
	Start(ctx context.Context, claim kafka.PartitionClaim, groupSession kafka.GroupSession)
	Store(name string) topology.StateStore
	Stop() error
}
type TaskAssignment ¶
type TaskAssignment []*TaskMapping
func (TaskAssignment) String ¶
func (a TaskAssignment) String() string
type TaskContext ¶
func (TaskContext) TaskID ¶
func (ctx TaskContext) TaskID() string
type TaskGeneration ¶
type TaskGeneration struct {
// contains filtered or unexported fields
}
func (*TaskGeneration) Assign ¶
func (g *TaskGeneration) Assign(assignment ...kafka.TopicPartition) TaskAssignment
func (*TaskGeneration) FindMappingByTP ¶
func (g *TaskGeneration) FindMappingByTP(partition kafka.TopicPartition) *TaskMapping
func (*TaskGeneration) Mappings ¶
func (g *TaskGeneration) Mappings() TaskAssignment
type TaskManager ¶
type TaskManager interface {
	NewTaskId(prefix string, tp kafka.TopicPartition) TaskID
	AddTask(ctx topology.BuilderContext, id TaskID, topology topology.SubTopologyBuilder, session kafka.GroupSession) (Task, error)
	AddGlobalTask(ctx topology.BuilderContext, id TaskID, topology topology.SubTopologyBuilder) (Task, error)
	RemoveTask(id TaskID) error
	Task(id TaskID) (Task, error)
	StoreInstances(name string) []topology.StateStore
}
func NewTaskManager ¶
func NewTaskManager( builderCtx topology.BuilderContext, logger log.Logger, partitionConsumer kafka.PartitionConsumer, topologies topology.SubTopologyBuilders, transactional bool, taskOpts ...TaskOpt, ) (TaskManager, error)
type TaskMapping ¶
type TaskMapping struct {
	TPs []kafka.TopicPartition
	// contains filtered or unexported fields
}
func (TaskMapping) SubTopologyBuilder ¶
func (a TaskMapping) SubTopologyBuilder() topology.SubTopologyBuilder
func (TaskMapping) TaskId ¶
func (a TaskMapping) TaskId() TaskID
type TaskOpt ¶
type TaskOpt func(*taskOptions)
func WithBufferFlushInterval ¶
func WithBufferSize ¶
func WithFailedMessageHandler ¶
func WithFailedMessageHandler(handler FailedMessageHandler) TaskOpt
Click to show internal directories.
Click to hide internal directories.