tasks

package
v2.0.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Buffer

// Buffer is the interface of a buffer that accumulates *Record values.
//
// NOTE(review): only the method set is visible here; the per-method notes
// below are inferred from names and signatures and should be confirmed
// against the implementation.
type Buffer interface {
	// Init presumably prepares the buffer for use; it reports failure via error.
	Init() error
	// Add hands a record to the buffer; it reports failure via error.
	Add(record *Record) error
	// Flush presumably writes out the buffered records; it reports failure via error.
	Flush() error
	// Close presumably releases the buffer; it reports failure via error.
	Close() error
	// Reset takes the error that triggered the reset (dueTo) and may itself fail.
	Reset(dueTo error) error
	// Records returns the records held by the buffer.
	Records() []*Record
	// Closing reports a boolean state — presumably whether MarkAsCosing has been called.
	Closing() bool
	// MarkAsCosing has no parameters and no return value.
	// NOTE(review): the name looks like a typo of "MarkAsClosing"; it is part of
	// the exported interface, so renaming it would break existing implementers.
	MarkAsCosing()
}

type BufferConfig

// BufferConfig holds the size and flush-interval settings of a buffer.
type BufferConfig struct {
	// Size defines the minimum number of records before the flush
	// starts (this includes messages in the state store changelogs).
	// Please note that this value has to be less than the
	// producer's queue.buffering.max.messages.
	//
	// Deprecated: no longer applicable.
	Size int
	// FlushInterval defines the minimum wait time before the flush starts.
	FlushInterval time.Duration
}

type FailedMessageHandler

// FailedMessageHandler is a callback that receives an error together with a
// kafka.Record and returns nothing.
// NOTE(review): presumably invoked for a record whose processing failed —
// confirm against the call site.
type FailedMessageHandler func(err error, record kafka.Record)

type Generator

// Generator is a field-less type whose Generate method produces a
// TaskGeneration from a set of topic partitions and a topology.
type Generator struct{}

func (*Generator) Generate

func (a *Generator) Generate(tps []kafka.TopicPartition, topologyBuilder topology.Topology) TaskGeneration

Generate generates the Task Assignment for a given TopicPartition combination by assigning partitions to SubTopologyBuilders.

type OnFlush

// OnFlush is a callback that receives a slice of records and may return an error.
// NOTE(review): presumably called with the buffered records when a flush
// happens — confirm against the buffer implementation.
type OnFlush func(records []*Record) error

type Record

// Record embeds a kafka.Record, so the embedded record's exported members are
// promoted onto Record, alongside additional unexported fields.
// Use NewTaskRecord to build one from a kafka.Record.
type Record struct {
	kafka.Record
	// contains filtered or unexported fields
}

func NewTaskRecord

func NewTaskRecord(record kafka.Record) *Record

type Task

// Task is the interface of a single task identified by a TaskID.
//
// NOTE(review): only signatures are visible here. The intended call order
// (presumably Init, Restore, Sync, Ready, Start, then Stop) is inferred from
// the method names — confirm against the implementation.
type Task interface {
	// ID returns the task's identifier.
	ID() TaskID
	// Init reports failure via error.
	Init() error
	// Restore reports failure via error.
	Restore() error
	// Sync reports failure via error.
	Sync() error
	// Ready reports failure via error.
	Ready() error
	// Start receives a context, a partition claim and a group session.
	// It has no return value, so errors are not surfaced to the caller here.
	Start(ctx context.Context, claim kafka.PartitionClaim, groupSession kafka.GroupSession)
	// Store looks up a state store by name.
	Store(name string) topology.StateStore
	// Stop reports failure via error.
	Stop() error
}

type TaskAssignment

// TaskAssignment is a slice of *TaskMapping values. It has a String method.
type TaskAssignment []*TaskMapping

func (TaskAssignment) String

func (a TaskAssignment) String() string

type TaskContext

// TaskContext embeds a context.Context, so it can be used wherever a
// context.Context is expected, and adds a TaskID accessor that returns a string.
type TaskContext struct {
	context.Context
}

func (TaskContext) TaskID

func (ctx TaskContext) TaskID() string

type TaskGeneration

// TaskGeneration is the value returned by Generator.Generate. Its state is
// unexported; it is accessed through the Assign, FindMappingByTP and Mappings
// methods.
type TaskGeneration struct {
	// contains filtered or unexported fields
}

func (*TaskGeneration) Assign

func (g *TaskGeneration) Assign(assignment ...kafka.TopicPartition) TaskAssignment

func (*TaskGeneration) FindMappingByTP

func (g *TaskGeneration) FindMappingByTP(partition kafka.TopicPartition) *TaskMapping

func (*TaskGeneration) Mappings

func (g *TaskGeneration) Mappings() TaskAssignment

type TaskID

// TaskID is the interface of a task identifier.
// NOTE(review): how String and UniqueID differ, and the format of Topics,
// are not visible here — confirm against the implementation.
type TaskID interface {
	// String returns a string form of the ID.
	String() string
	// UniqueID returns an identifier string.
	UniqueID() string
	// Partition returns a partition number as int32.
	Partition() int32
	// Topics returns topic information as a single string.
	Topics() string
}

type TaskManager

// TaskManager is the interface for creating, looking up and removing tasks.
// Use NewTaskManager to obtain one.
type TaskManager interface {
	// NewTaskId builds a TaskID from a prefix and a topic partition.
	NewTaskId(prefix string, tp kafka.TopicPartition) TaskID
	// AddTask creates a Task for the given ID and sub-topology builder, bound
	// to a group session; it returns the Task or an error.
	AddTask(ctx topology.BuilderContext, id TaskID, topology topology.SubTopologyBuilder, session kafka.GroupSession) (Task, error)
	// AddGlobalTask is like AddTask but takes no group session.
	AddGlobalTask(ctx topology.BuilderContext, id TaskID, topology topology.SubTopologyBuilder) (Task, error)
	// RemoveTask removes the task with the given ID; it reports failure via error.
	RemoveTask(id TaskID) error
	// Task looks up a task by ID; it returns the Task or an error.
	Task(id TaskID) (Task, error)
	// StoreInstances returns the state stores registered under name.
	// NOTE(review): presumably one instance per task — confirm.
	StoreInstances(name string) []topology.StateStore
}

func NewTaskManager

func NewTaskManager(
	builderCtx topology.BuilderContext,
	logger log.Logger,
	partitionConsumer kafka.PartitionConsumer,
	topologies topology.SubTopologyBuilders,
	transactional bool,
	taskOpts ...TaskOpt,
) (TaskManager, error)

type TaskMapping

// TaskMapping carries a set of topic partitions (TPs) plus unexported state
// reachable through the TaskId and SubTopologyBuilder methods.
type TaskMapping struct {
	// TPs is the list of topic partitions in this mapping.
	TPs []kafka.TopicPartition
	// contains filtered or unexported fields
}

func (TaskMapping) SubTopologyBuilder

func (a TaskMapping) SubTopologyBuilder() topology.SubTopologyBuilder

func (TaskMapping) TaskId

func (a TaskMapping) TaskId() TaskID

type TaskOpt

// TaskOpt is a functional option that mutates the unexported taskOptions.
// Values are produced by the With* constructors (WithBufferFlushInterval,
// WithBufferSize, WithFailedMessageHandler) and accepted by NewTaskManager.
type TaskOpt func(*taskOptions)

func WithBufferFlushInterval

func WithBufferFlushInterval(interval time.Duration) TaskOpt

func WithBufferSize

func WithBufferSize(size int) TaskOpt

func WithFailedMessageHandler

func WithFailedMessageHandler(handler FailedMessageHandler) TaskOpt

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL