taskqueue

package
v1.21.1 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2024 | License: MIT | Imports: 9 | Imported by: 0

README

x/taskqueue - Golang task queue with transactional support

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Description

type Description struct {
	Change []string
	Entity string
	Filter string
}

type FunctionProcessor

type FunctionProcessor[T any] struct {
	F func(task Task[T])
}

func (*FunctionProcessor[T]) Process

func (proc *FunctionProcessor[T]) Process(task Task[T]) error

type Processor

type Processor[T any] interface {
	Process(task Task[T]) error
}

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

func NewInMemoryQueue

func NewInMemoryQueue[T any]() *Queue[T]

func (*Queue[T]) Close

func (q *Queue[T]) Close() error

func (*Queue[T]) Delete

func (*Queue[T]) Delete(ctx context.Context, tasks []Task[T]) error

func (*Queue[T]) Pop

func (q *Queue[T]) Pop(ctx context.Context) ([]Task[T], error)

func (*Queue[T]) Push

func (q *Queue[T]) Push(ctx context.Context, task Task[T]) error

type Queuer

type Queuer[T any] interface {
	Push(ctx context.Context, task Task[T]) error
	Pop(ctx context.Context) ([]Task[T], error)
	Delete(ctx context.Context, tasks []Task[T]) error
}

type SQSQueue

type SQSQueue[T any] struct {
	// contains filtered or unexported fields
}

SQSQueue is a queue that uses AWS SQS as a backend.

func NewSQSQueue

func NewSQSQueue(c *sqs.Client, queueURL string) *SQSQueue[schemaless.Record[schema.Schema]]

func (*SQSQueue[T]) Delete

func (queue *SQSQueue[T]) Delete(ctx context.Context, tasks []Task[T]) error

func (*SQSQueue[T]) Pop

func (queue *SQSQueue[T]) Pop(ctx context.Context) ([]Task[T], error)

func (*SQSQueue[T]) Push

func (queue *SQSQueue[T]) Push(ctx context.Context, task Task[T]) error

type Task

type Task[T any] struct {
	ID   string
	Data T
	Meta map[string]string
}

type TaskQueue

type TaskQueue[T any] struct {
	// contains filtered or unexported fields
}

func NewTaskQueue

func NewTaskQueue[T any](
	desc *Description,
	queue Queuer[schemaless.Record[T]],
	find schemaless.Repository[T],
	stream schemaless.AppendLoger[T],
	proc Processor[schemaless.Record[T]],
) *TaskQueue[T]

func (*TaskQueue[T]) RunCDC

func (q *TaskQueue[T]) RunCDC(ctx context.Context) error

func (*TaskQueue[T]) RunProcessor

func (q *TaskQueue[T]) RunProcessor(ctx context.Context) error

func (*TaskQueue[T]) RunSelector

func (q *TaskQueue[T]) RunSelector(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL