tasker

package module
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2024 License: GPL-3.0 Imports: 16 Imported by: 0

README

kafka-tasker

A fault-tolerant tasker using Kafka

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateTopic

func CreateTopic(port, topic string) error

func GetLogger

func GetLogger() zerolog.Logger

func InitKafkaWriter

func InitKafkaWriter(ctx context.Context, kafkaBrokers, kafkaTopic string) *kafka.Writer

func ProcessFnWithRetryPersistence

func ProcessFnWithRetryPersistence(ctx context.Context, kMsg kafka.Message, partFn TaskIdPartIdTaskDetailsFn, dao AttemptInf, executor ExecutionInf, retryUnknown bool, maxTries int) error

func WrapError

func WrapError(err error, wrap string) error

Types

type AttemptInf

// AttemptInf is the interface through which task attempts are looked up,
// created and updated. It is the type of the dao parameter of
// ProcessFnWithRetryPersistence.
type AttemptInf interface {
	// FindPrevAttempts takes a task ID and returns a slice of TaskAttempt
	// together with an error.
	// NOTE(review): presumably the slice holds the earlier attempts recorded
	// for that task — confirm against an implementation.
	FindPrevAttempts(ctx context.Context, taskId string) ([]TaskAttempt, error)
	// CreateNewAttempt takes a TaskAttempt and returns an error.
	// NOTE(review): presumably persists att as a new record — confirm.
	CreateNewAttempt(ctx context.Context, att TaskAttempt) error
	// UpdateAttemptStatus takes a TaskAttempt, a TaskStatus and a message
	// string, and returns an error.
	// NOTE(review): presumably stores status and msg on the given attempt — confirm.
	UpdateAttemptStatus(ctx context.Context, att TaskAttempt, status TaskStatus, msg string) error
}

type CleanupFunc

// CleanupFunc is a function that takes no arguments and returns nothing.
// InitKafkaForTesting returns one as its first result.
// NOTE(review): presumably calling it tears down the test Kafka container — confirm.
type CleanupFunc func()

func InitKafkaForTesting

func InitKafkaForTesting() (CleanupFunc, string, error)

Inspired by https://franklinlindemberg.medium.com/how-to-use-kafka-with-testcontainers-in-golang-applications-9266c738c879, which describes an ingenious solution: start the Kafka container with a custom command, read the exposed port, configure Kafka using that port, and then start Kafka.

type CommitterInf

// CommitterInf is a single-method interface for committing Kafka messages.
// Both NoopCommitter and SimpleCommitter declare a CommitMessages method with
// this signature on their pointer receivers.
type CommitterInf interface {
	// CommitMessages takes zero or more kafka.Message values and returns an error.
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
}

type ErrorFn

// ErrorFn is a callback that receives a Kafka message and returns an error.
// It is the type of MultiTaskerConfig.ErrorHandler.
// NOTE(review): presumably invoked for messages whose processing failed — confirm.
type ErrorFn func(ctx context.Context, kMsg kafka.Message) error

type ExecutionInf

// ExecutionInf is a single-method interface for executing a task. It is the
// type of the executor parameter of ProcessFnWithRetryPersistence.
type ExecutionInf interface {
	// Execute receives the task ID, an attempt ID, a slice of TaskAttempt and
	// an untyped taskDetails value, and returns (bool, string, error).
	// NOTE(review): the meaning of the bool and string results is not visible
	// here (possibly a retry/success flag and a status message) — confirm.
	Execute(ctx context.Context, taskId string, attemptId int, prevAttempts []TaskAttempt, taskDetails any) (bool, string, error)
}

type KafkaMultiTasker

// KafkaMultiTasker is returned by NewKafkaMultiTasker, which builds it from a
// MultiTaskerConfig. It exposes a Start method on its pointer receiver.
// All of its fields are unexported.
type KafkaMultiTasker struct {
	// contains filtered or unexported fields
}

func NewKafkaMultiTasker

func NewKafkaMultiTasker(ctx context.Context, config MultiTaskerConfig) (KafkaMultiTasker, error)

func (*KafkaMultiTasker) Start

func (c *KafkaMultiTasker) Start(ctx context.Context)

type LogConsumer

// LogConsumer has an Accept method that takes a testcontainers.Log; according
// to the package documentation, Accept prints the log to stdout.
// All of its fields are unexported.
type LogConsumer struct {
	// contains filtered or unexported fields
}

func (*LogConsumer) Accept

func (lc *LogConsumer) Accept(l testcontainers.Log)

Accept prints the log to stdout

type MultiTaskerConfig

// MultiTaskerConfig is the configuration value passed to NewKafkaMultiTasker.
type MultiTaskerConfig struct {
	// Reader is a pointer to the kafka.Reader supplied by the caller.
	// NOTE(review): presumably the source the tasker consumes messages from — confirm.
	Reader          *kafka.Reader
	// PartFn maps a Kafka message to two strings and an untyped value
	// (see TaskIdPartIdTaskDetailsFn).
	PartFn          TaskIdPartIdTaskDetailsFn
	// ProcessFn is the per-message callback returning (bool, error).
	ProcessFn       ProcessFn
	// ErrorHandler is the per-message callback returning an error.
	ErrorHandler    ErrorFn
	// NumTasksPerPart is an integer setting.
	// NOTE(review): presumably the number of concurrent tasks per partition — confirm.
	NumTasksPerPart int
}

type NoopCommitter

// NoopCommitter is a field-less struct whose pointer receiver declares a
// CommitMessages method with the same signature as CommitterInf.
// NOTE(review): per its name, CommitMessages presumably does nothing — confirm.
type NoopCommitter struct {
}

func (*NoopCommitter) CommitMessages

func (c *NoopCommitter) CommitMessages(ctx context.Context, msgs ...kafka.Message) error

type ProcessFn

// ProcessFn is a callback that receives a Kafka message and returns
// (bool, error). It is the type of MultiTaskerConfig.ProcessFn.
// NOTE(review): the meaning of the bool result is not visible here — confirm.
type ProcessFn func(ctx context.Context, kMsg kafka.Message) (bool, error)

type SimpleCommitter

// SimpleCommitter holds a kafka.Reader; its pointer receiver declares a
// CommitMessages method with the same signature as CommitterInf.
// NOTE(review): CommitMessages presumably delegates to Reader — confirm.
type SimpleCommitter struct {
	// Reader is the kafka.Reader held by this committer.
	Reader *kafka.Reader
}

func (*SimpleCommitter) CommitMessages

func (c *SimpleCommitter) CommitMessages(ctx context.Context, msgs ...kafka.Message) error

type TaskAttempt

// TaskAttempt is the record type exchanged through AttemptInf and handed to
// ExecutionInf.Execute. Every field is tagged omitempty, so encoding/json
// omits a field whose value is the zero value (empty string or 0).
type TaskAttempt struct {
	// TaskId is the task identifier string (JSON key "taskId").
	TaskId    string     `json:"taskId,omitempty"`
	// AttemptId is the attempt number (JSON key "attemptId"); a value of 0 is
	// omitted from JSON output because of omitempty.
	AttemptId int        `json:"attemptId,omitempty"`
	// TimeMs is a 64-bit integer (JSON key "timeMs").
	// NOTE(review): presumably a timestamp in milliseconds — confirm.
	TimeMs    int64      `json:"timeMs,omitempty"`
	// Status is the TaskStatus code of this attempt (JSON key "status").
	Status    TaskStatus `json:"status,omitempty"`
	// Msg is a free-form message string (JSON key "msg").
	Msg       string     `json:"msg,omitempty"`
}

type TaskIdPartIdTaskDetailsFn

// TaskIdPartIdTaskDetailsFn is a callback that receives a Kafka message and
// returns two strings and an untyped value. It is the type of
// MultiTaskerConfig.PartFn and of the partFn parameter of
// ProcessFnWithRetryPersistence.
// NOTE(review): per the type name, the results are presumably the task ID, a
// partition ID and the task details, in that order — confirm.
type TaskIdPartIdTaskDetailsFn func(ctx context.Context, kMsg kafka.Message) (string, string, any)

type TaskStatus

// TaskStatus is an integer status code. It is the type of TaskAttempt.Status
// and of the status parameter of AttemptInf.UpdateAttemptStatus.
type TaskStatus int

// Named TaskStatus values. They run from 1 to 5, so the zero value of
// TaskStatus does not correspond to any named status.
// NOTE(review): the meanings below are inferred from the constant names only —
// confirm against the code that sets them.
const (
	// TASK_STATUS_PRE presumably marks an attempt that has not started yet.
	TASK_STATUS_PRE       TaskStatus = 1
	// TASK_STATUS_ATTEMPT presumably marks an attempt in progress.
	TASK_STATUS_ATTEMPT   TaskStatus = 2
	// TASK_STATUS_SUCCESS presumably marks an attempt that succeeded.
	TASK_STATUS_SUCCESS   TaskStatus = 3
	// TASK_STATUS_FAILURE presumably marks an attempt that failed.
	TASK_STATUS_FAILURE   TaskStatus = 4
	// TASK_STATUS_CANCELLED presumably marks an attempt that was cancelled.
	TASK_STATUS_CANCELLED TaskStatus = 5
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL