queue

package
v1.29.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2025 License: BSD-3-Clause Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	Tasks []Task
	Ctx   context.Context
	// contains filtered or unexported fields
}

func (*Batch) Cancel

func (b *Batch) Cancel()

func (*Batch) Done

func (b *Batch) Done()

type BeforeScheduleHook

type BeforeScheduleHook interface {
	BeforeSchedule() bool
}

type DiskQueue

type DiskQueue struct {
	// Logger for the queue. Wrappers of this queue should use this logger.
	Logger logrus.FieldLogger
	// contains filtered or unexported fields
}

func NewDiskQueue

func NewDiskQueue(opt DiskQueueOptions) (*DiskQueue, error)

func (*DiskQueue) Close

func (q *DiskQueue) Close() error

Close closes the queue, prevents further pushes, and unregisters it from the scheduler.

func (*DiskQueue) DequeueBatch

func (q *DiskQueue) DequeueBatch() (batch *Batch, err error)

func (*DiskQueue) Drop

func (q *DiskQueue) Drop() error

func (*DiskQueue) Flush

func (q *DiskQueue) Flush() error

func (*DiskQueue) ID

func (q *DiskQueue) ID() string

func (*DiskQueue) Init

func (q *DiskQueue) Init() error

func (*DiskQueue) Metrics

func (q *DiskQueue) Metrics() *Metrics

func (*DiskQueue) Pause

func (q *DiskQueue) Pause()

func (*DiskQueue) Push

func (q *DiskQueue) Push(record []byte) error

func (*DiskQueue) Resume

func (q *DiskQueue) Resume()

func (*DiskQueue) Scheduler

func (q *DiskQueue) Scheduler() *Scheduler

func (*DiskQueue) Size

func (q *DiskQueue) Size() int64

func (*DiskQueue) Wait

func (q *DiskQueue) Wait()

type DiskQueueOptions

type DiskQueueOptions struct {
	// Required
	ID          string
	Scheduler   *Scheduler
	Dir         string
	TaskDecoder TaskDecoder

	// Optional
	Logger           logrus.FieldLogger
	StaleTimeout     time.Duration
	ChunkSize        uint64
	OnBatchProcessed func()
	Metrics          *Metrics
}

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(
	logger logrus.FieldLogger,
	prom *monitoring.PrometheusMetrics,
	labels prometheus.Labels,
) *Metrics

func (*Metrics) DiskUsage

func (m *Metrics) DiskUsage(size int64)

func (*Metrics) Paused

func (m *Metrics) Paused(id string)

func (*Metrics) Registered

func (m *Metrics) Registered(id string)

func (*Metrics) Resumed

func (m *Metrics) Resumed(id string)

func (*Metrics) Size

func (m *Metrics) Size(size uint64)

func (*Metrics) TasksProcessed

func (m *Metrics) TasksProcessed(start time.Time, count int)

func (*Metrics) Unregistered

func (m *Metrics) Unregistered(id string)

type Queue

type Queue interface {
	ID() string
	Size() int64
	DequeueBatch() (batch *Batch, err error)
	Metrics() *Metrics
}

type Scheduler

type Scheduler struct {
	SchedulerOptions
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(opts SchedulerOptions) *Scheduler

func (*Scheduler) Close

func (s *Scheduler) Close() error

func (*Scheduler) PauseQueue

func (s *Scheduler) PauseQueue(id string)

func (*Scheduler) RegisterQueue

func (s *Scheduler) RegisterQueue(q Queue)

func (*Scheduler) ResumeQueue

func (s *Scheduler) ResumeQueue(id string)

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(ctx context.Context)

Schedule manually schedules the queues.

func (*Scheduler) Start

func (s *Scheduler) Start()

func (*Scheduler) UnregisterQueue

func (s *Scheduler) UnregisterQueue(id string)

func (*Scheduler) Wait

func (s *Scheduler) Wait(id string)

func (*Scheduler) WaitAll

func (s *Scheduler) WaitAll()

type SchedulerOptions

type SchedulerOptions struct {
	Logger logrus.FieldLogger
	// Number of workers to process tasks. Defaults to the number of CPUs - 1.
	Workers int
	// The interval at which the scheduler checks the queues for tasks. Defaults to 1 second.
	ScheduleInterval time.Duration
	// Function to be called when the scheduler is closed
	OnClose func()
}

type Task

type Task interface {
	Op() uint8
	Key() uint64
	Execute(ctx context.Context) error
}

type TaskDecoder

type TaskDecoder interface {
	DecodeTask([]byte) (Task, error)
}

type TaskGrouper

type TaskGrouper interface {
	NewGroup(op uint8, tasks ...Task) Task
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(logger logrus.FieldLogger, retryInterval time.Duration) (*Worker, chan *Batch)

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL