Documentation ¶
Index ¶
Constants ¶
View Source
const (
	QueueWorkerCount      = 20
	QueueWorkerBufferSize = QueueWorkerCount * 1_000 // 1k jobs per worker
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DriverFactory ¶ added in v0.14.5
type DriverFactory[T any] struct {
	// contains filtered or unexported fields
}
func NewDriverFactory ¶ added in v0.14.5
func NewDriverFactory[T any](natsConn *nats.Conn) DriverFactory[T]
func (DriverFactory[T]) NewDriver ¶ added in v0.14.5
func (df DriverFactory[T]) NewDriver(channelName string) WorkerDriver[T]
type InMemoryQueueDriver ¶
type InMemoryQueueDriver[T any] struct {
	// contains filtered or unexported fields
}
func NewInMemoryQueueDriver ¶
func NewInMemoryQueueDriver[T any](name string) *InMemoryQueueDriver[T]
func (InMemoryQueueDriver[T]) Enqueue ¶
func (qd InMemoryQueueDriver[T]) Enqueue(item T)
func (*InMemoryQueueDriver[T]) SetListener ¶
func (qd *InMemoryQueueDriver[T]) SetListener(l Listener[T])
func (InMemoryQueueDriver[T]) Start ¶
func (qd InMemoryQueueDriver[T]) Start()
func (InMemoryQueueDriver[T]) Stop ¶
func (qd InMemoryQueueDriver[T]) Stop()
type InputQueueSetter ¶
type NatsDriver ¶ added in v0.14.5
type NatsDriver[T any] struct {
	// contains filtered or unexported fields
}
func NewNatsDriver ¶ added in v0.14.5
func NewNatsDriver[T any](conn *nats.Conn, topic string) *NatsDriver[T]
func (*NatsDriver[T]) Enqueue ¶ added in v0.14.5
func (d *NatsDriver[T]) Enqueue(msg T)
func (*NatsDriver[T]) SetListener ¶ added in v0.14.5
func (d *NatsDriver[T]) SetListener(listener Listener[T])
SetListener implements QueueDriver.
func (*NatsDriver[T]) Start ¶ added in v0.14.5
func (d *NatsDriver[T]) Start()
func (*NatsDriver[T]) Stop ¶ added in v0.14.5
func (d *NatsDriver[T]) Stop()
type Pipeline ¶
type Pipeline[T any] struct {
	// contains filtered or unexported fields
}
func (*Pipeline[T]) GetQueueForStep ¶
type Queue ¶
type Queue[T any] struct {
	EnqueuePreprocessorFn func(context.Context, T) T
	ListenPreprocessorFn  func(context.Context, T) (context.Context, T)
	// contains filtered or unexported fields
}
func NewQueue ¶
func NewQueue[T any](driver QueueDriver[T], itemProcessor QueueItemProcessor[T]) *Queue[T]
func (*Queue[T]) InitializeMetrics ¶
func (*Queue[T]) SetDriver ¶
func (q *Queue[T]) SetDriver(driver QueueDriver[T])
type QueueDriver ¶
type QueueItemProcessor ¶
type Step ¶
type Step[T any] struct {
	Driver           WorkerDriver[T]
	Processor        StepProcessor[T]
	InputQueueOffset int
}
type StepProcessor ¶
type StepProcessor[T any] interface {
	QueueItemProcessor[T]
	SetOutputQueue(Enqueuer[T])
}
type WorkerDriver ¶ added in v0.14.5
type WorkerDriver[T any] interface {
	QueueDriver[T]
	Start()
	Stop()
}
Click to show internal directories.
Click to hide internal directories.