pipeline

package
v0.15.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2024 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Sizing constants for queue workers.
// NOTE(review): where these are consumed is not visible on this page —
// confirm against the queue driver implementation.
const (
	// QueueWorkerCount is the worker count that the buffer size below is
	// derived from.
	QueueWorkerCount = 20

	// QueueWorkerBufferSize allots 1,000 jobs per worker, i.e. 20,000 jobs
	// in total for the current QueueWorkerCount.
	QueueWorkerBufferSize = 1_000 * QueueWorkerCount
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DriverFactory added in v0.14.5

type DriverFactory[T any] struct {
	// contains filtered or unexported fields
}

func NewDriverFactory added in v0.14.5

func NewDriverFactory[T any](natsConn *nats.Conn) DriverFactory[T]

func (DriverFactory[T]) NewDriver added in v0.14.5

func (df DriverFactory[T]) NewDriver(channelName string) WorkerDriver[T]

type Enqueuer

// Enqueuer is a single-method interface for handing an item of type T,
// together with a context, to some receiver.
type Enqueuer[T any] interface {
	// Enqueue accepts item along with ctx. The method has no return value,
	// so callers get no error or acknowledgement back from it.
	Enqueue(ctx context.Context, item T)
}

type InMemoryQueueDriver

type InMemoryQueueDriver[T any] struct {
	// contains filtered or unexported fields
}

func NewInMemoryQueueDriver

func NewInMemoryQueueDriver[T any](name string) *InMemoryQueueDriver[T]

func (InMemoryQueueDriver[T]) Enqueue

func (qd InMemoryQueueDriver[T]) Enqueue(item T)

func (*InMemoryQueueDriver[T]) SetListener

func (qd *InMemoryQueueDriver[T]) SetListener(l Listener[T])

func (InMemoryQueueDriver[T]) Start

func (qd InMemoryQueueDriver[T]) Start()

func (InMemoryQueueDriver[T]) Stop

func (qd InMemoryQueueDriver[T]) Stop()

type InputQueueSetter

// InputQueueSetter is implemented by types that can be handed an
// Enqueuer[T] through SetInputQueue.
type InputQueueSetter[T any] interface {
	// SetInputQueue passes queue to the implementation.
	// NOTE(review): presumably this is the queue that feeds the
	// implementation — confirm against the implementers.
	SetInputQueue(queue Enqueuer[T])
}

type Listener

// Listener is a single-method interface for receiving items of type T.
// It is the argument type of QueueDriver.SetListener.
type Listener[T any] interface {
	// Listen is called with one item. It takes no context and returns
	// nothing.
	Listen(item T)
}

type NatsDriver added in v0.14.5

type NatsDriver[T any] struct {
	// contains filtered or unexported fields
}

func NewNatsDriver added in v0.14.5

func NewNatsDriver[T any](conn *nats.Conn, topic string) *NatsDriver[T]

func (*NatsDriver[T]) Enqueue added in v0.14.5

func (d *NatsDriver[T]) Enqueue(msg T)

func (*NatsDriver[T]) SetListener added in v0.14.5

func (d *NatsDriver[T]) SetListener(listener Listener[T])

SetListener implements QueueDriver.

func (*NatsDriver[T]) Start added in v0.14.5

func (d *NatsDriver[T]) Start()

func (*NatsDriver[T]) Stop added in v0.14.5

func (d *NatsDriver[T]) Stop()

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

func New

func New[T any](cfg queueConfigurer[T], steps ...Step[T]) *Pipeline[T]

func (*Pipeline[T]) Begin

func (p *Pipeline[T]) Begin(ctx context.Context, item T)

func (*Pipeline[T]) GetQueueForStep

func (p *Pipeline[T]) GetQueueForStep(i int) *Queue[T]

func (*Pipeline[T]) Start

func (p *Pipeline[T]) Start()

func (*Pipeline[T]) Stop

func (p *Pipeline[T]) Stop()

type Queue

type Queue[T any] struct {
	EnqueuePreprocessorFn func(context.Context, T) T
	ListenPreprocessorFn  func(context.Context, T) (context.Context, T)
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue[T any](driver QueueDriver[T], itemProcessor QueueItemProcessor[T]) *Queue[T]

func (Queue[T]) Enqueue

func (q Queue[T]) Enqueue(ctx context.Context, item T)

func (*Queue[T]) InitializeMetrics

func (q *Queue[T]) InitializeMetrics(meter metric.Meter)

func (Queue[T]) Listen

func (q Queue[T]) Listen(item T)

func (*Queue[T]) SetDriver

func (q *Queue[T]) SetDriver(driver QueueDriver[T])

func (*Queue[T]) Stop

func (q *Queue[T]) Stop()

type QueueDriver

// QueueDriver is the interface a queue backend exposes: it accepts items
// through Enqueue and is given a Listener through SetListener.
// NOTE(review): presumably enqueued items are later delivered to the
// registered listener — the delivery path is not visible here; confirm.
type QueueDriver[T any] interface {
	// Enqueue hands item to the driver. No context is passed and nothing
	// is returned.
	Enqueue(item T)

	// SetListener gives the driver the Listener it should use.
	SetListener(listener Listener[T])
}

type QueueItemProcessor

// QueueItemProcessor is a single-method interface for processing one item
// of type T under a context. NewQueue takes a value of this type.
type QueueItemProcessor[T any] interface {
	// ProcessItem handles item with ctx. It has no return value, so
	// callers get no error back from it.
	ProcessItem(ctx context.Context, item T)
}

type Step

// Step describes one stage of a Pipeline; New accepts a variadic list of
// Step values.
type Step[T any] struct {
	// Driver is the WorkerDriver (a QueueDriver with Start and Stop) for
	// this step.
	Driver           WorkerDriver[T]
	// Processor is the StepProcessor for this step: it processes items and
	// can be given an output queue.
	Processor        StepProcessor[T]
	// InputQueueOffset is an integer offset.
	// NOTE(review): presumably relative to this step's position when
	// choosing its input queue — semantics are not visible here; confirm.
	InputQueueOffset int
}

type StepProcessor

// StepProcessor is a QueueItemProcessor that can additionally be given an
// Enqueuer through SetOutputQueue. It is the type of Step.Processor.
type StepProcessor[T any] interface {
	QueueItemProcessor[T]

	// SetOutputQueue passes queue to the processor.
	// NOTE(review): presumably where the processor forwards its results —
	// confirm against the implementers.
	SetOutputQueue(queue Enqueuer[T])
}

type WorkerDriver added in v0.14.5

// WorkerDriver is a QueueDriver that additionally exposes Start and Stop.
// It is the return type of DriverFactory.NewDriver and the type of
// Step.Driver.
type WorkerDriver[T any] interface {
	QueueDriver[T]
	// Start takes no arguments and returns nothing.
	// NOTE(review): presumably begins the driver's workers — confirm.
	Start()
	// Stop takes no arguments and returns nothing.
	// NOTE(review): presumably shuts the driver's workers down — confirm.
	Stop()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL