Documentation ¶
Index ¶
- func CreateTopic(port, topic string) error
- func GetLogger() zerolog.Logger
- func InitKafkaWriter(ctx context.Context, kafkaBrokers, kafkaTopic string) *kafka.Writer
- func ProcessFnWithRetryPersistence(ctx context.Context, kMsg kafka.Message, partFn TaskIdPartIdTaskDetailsFn, ...) error
- func WrapError(err error, wrap string) error
- type AttemptInf
- type CleanupFunc
- type CommitterInf
- type ErrorFn
- type ExecutionInf
- type KafkaMultiTasker
- type LogConsumer
- type MultiTaskerConfig
- type NoopCommitter
- type ProcessFn
- type SimpleCommitter
- type TaskAttempt
- type TaskIdPartIdTaskDetailsFn
- type TaskStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateTopic ¶
func InitKafkaWriter ¶
func ProcessFnWithRetryPersistence ¶
func ProcessFnWithRetryPersistence(ctx context.Context, kMsg kafka.Message, partFn TaskIdPartIdTaskDetailsFn, dao AttemptInf, executor ExecutionInf, retryUnknown bool, maxTries int) error
Types ¶
type AttemptInf ¶
type AttemptInf interface { FindPrevAttempts(ctx context.Context, taskId string) ([]TaskAttempt, error) CreateNewAttempt(ctx context.Context, att TaskAttempt) error UpdateAttemptStatus(ctx context.Context, att TaskAttempt, status TaskStatus, msg string) error }
type CleanupFunc ¶
type CleanupFunc func()
func InitKafkaForTesting ¶
func InitKafkaForTesting() (CleanupFunc, string, error)
Inspired by https://franklinlindemberg.medium.com/how-to-use-kafka-with-testcontainers-in-golang-applications-9266c738c879 — an ingenious solution: start the Kafka container with a custom cmd, read the exposed port, set up Kafka using the exposed port, and then start Kafka.
type CommitterInf ¶
type ExecutionInf ¶
type KafkaMultiTasker ¶
type KafkaMultiTasker struct {
// contains filtered or unexported fields
}
func NewKafkaMultiTasker ¶
func NewKafkaMultiTasker(ctx context.Context, config MultiTaskerConfig) (KafkaMultiTasker, error)
func (*KafkaMultiTasker) Start ¶
func (c *KafkaMultiTasker) Start(ctx context.Context)
type LogConsumer ¶
type LogConsumer struct {
// contains filtered or unexported fields
}
func (*LogConsumer) Accept ¶
func (lc *LogConsumer) Accept(l testcontainers.Log)
Accept prints the log to stdout.
type MultiTaskerConfig ¶
type SimpleCommitter ¶
func (*SimpleCommitter) CommitMessages ¶
type TaskAttempt ¶
type TaskAttempt struct { TaskId string `json:"taskId,omitempty"` AttemptId int `json:"attemptId,omitempty"` TimeMs int64 `json:"timeMs,omitempty"` Status TaskStatus `json:"status,omitempty"` Msg string `json:"msg,omitempty"` }
type TaskStatus ¶
type TaskStatus int
const ( TASK_STATUS_PRE TaskStatus = 1 TASK_STATUS_ATTEMPT TaskStatus = 2 TASK_STATUS_SUCCESS TaskStatus = 3 TASK_STATUS_FAILURE TaskStatus = 4 TASK_STATUS_CANCELLED TaskStatus = 5 )
Click to show internal directories.
Click to hide internal directories.