pipeline

package
v0.0.0-...-ed06964 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	QueueWorkerCount      = 20
	QueueWorkerBufferSize = QueueWorkerCount * 1_000 // 1k jobs per worker
)

Variables

This section is empty.

Functions

func NewPostgresQueueDriver

func NewPostgresQueueDriver[T any](pool *pgxpool.Pool, channelName string) *postgresQueueDriver[T]

Types

type Enqueuer

type Enqueuer[T any] interface {
	Enqueue(context.Context, T)
}

type InMemoryQueueDriver

type InMemoryQueueDriver[T any] struct {
	// contains filtered or unexported fields
}

func NewInMemoryQueueDriver

func NewInMemoryQueueDriver[T any](name string) *InMemoryQueueDriver[T]

func (InMemoryQueueDriver[T]) Enqueue

func (qd InMemoryQueueDriver[T]) Enqueue(item T)

func (*InMemoryQueueDriver[T]) SetListener

func (qd *InMemoryQueueDriver[T]) SetListener(l Listener[T])

func (InMemoryQueueDriver[T]) Start

func (qd InMemoryQueueDriver[T]) Start()

func (InMemoryQueueDriver[T]) Stop

func (qd InMemoryQueueDriver[T]) Stop()

type InputQueueSetter

type InputQueueSetter[T any] interface {
	SetInputQueue(Enqueuer[T])
}

type Listener

type Listener[T any] interface {
	Listen(T)
}

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

func New

func New[T any](cfg queueConfigurer[T], steps ...Step[T]) *Pipeline[T]

func (*Pipeline[T]) Begin

func (p *Pipeline[T]) Begin(ctx context.Context, item T)

func (*Pipeline[T]) GetQueueForStep

func (p *Pipeline[T]) GetQueueForStep(i int) *Queue[T]

func (*Pipeline[T]) Start

func (p *Pipeline[T]) Start()

func (*Pipeline[T]) Stop

func (p *Pipeline[T]) Stop()

type Queue

type Queue[T any] struct {
	EnqueuePreprocessorFn func(context.Context, T) T
	ListenPreprocessorFn  func(context.Context, T) (context.Context, T)
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue[T any](driver QueueDriver[T], itemProcessor QueueItemProcessor[T]) *Queue[T]

func (Queue[T]) Enqueue

func (q Queue[T]) Enqueue(ctx context.Context, item T)

func (*Queue[T]) InitializeMetrics

func (q *Queue[T]) InitializeMetrics(meter metric.Meter)

func (Queue[T]) Listen

func (q Queue[T]) Listen(item T)

func (*Queue[T]) SetDriver

func (q *Queue[T]) SetDriver(driver QueueDriver[T])

func (*Queue[T]) Stop

func (q *Queue[T]) Stop()

type QueueDriver

type QueueDriver[T any] interface {
	Enqueue(T)
	SetListener(Listener[T])
}

type QueueItemProcessor

type QueueItemProcessor[T any] interface {
	ProcessItem(context.Context, T)
}

type Step

type Step[T any] struct {
	Driver           workerDriver[T]
	Processor        StepProcessor[T]
	InputQueueOffset int
}

type StepProcessor

type StepProcessor[T any] interface {
	QueueItemProcessor[T]
	SetOutputQueue(Enqueuer[T])
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL