gokogeri

package module
v0.0.0-...-7408474 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2023 License: MIT Imports: 12 Imported by: 0

README

gokogeri

gokogeri is a Go package for asynchronous job processing that tries to implement some of the most useful parts of Sidekiq in a compatible way.

It is still in the early stages of development and is not ready to be used.

Quick Start

You can find a complete example in example/example.go.

Configuration

Start with the default configuration and adjust it according to your needs.

cfg := redis.NewDefaultConfig()
cfg.URL = "redis://localhost/4"
Logging

Use the default logger

logger := gokogeri.DefaultLogger()

or disable logging

logger := zerolog.Nop()
Connecting to Redis
cm := redis.NewConnManager(cfg)
defer cm.Close()
Adding jobs
var job gokogeri.Job
job.SetQueue("critical")
job.SetClass("CriticalJob")

enqueuer := gokogeri.NewEnqueuer(cm)
enqueuer.Enqueue(ctx, &job)
Processing jobs

Create a node, which represents an instance of a server that is processing jobs.

node := gokogeri.NewNode(logger, cm, cfg.LongPollTimeout)

Define the queues that you want to process, along with the workers and the number of instances (goroutines) of those workers.

You should probably implement the Worker interface. The examples use WorkerFunc.

// 1 dedicated goroutine for processing critical jobs

instances := 1
node.ProcessQueues(
    gokogeri.OrderedQueueSet{"critical"},
    gokogeri.WorkerFunc(func(ctx context.Context, j *gokogeri.Job) error {
        var err error

        // do the work

        return err
    }),
    instances,
)

Use an OrderedQueueSet to process queues in a strictly defined order of priorities.

Use a RandomQueueSet to process queues in random order, with the likelihood of each queue being checked first based on their relative weights.

// 5 instances processing jobs from two queues, with weight ratios 3 to 1.
// The high_priority queue has a 75% chance of being checked first.
// The low_priority queue has a 25% chance of being checked first.

qs := gokogeri.NewRandomQueueSet()
qs.Add("low_priority", 1)
qs.Add("high_priority", 3)

instances := 5
node.ProcessQueues(
    qs,
    gokogeri.WorkerFunc(func(ctx context.Context, j *gokogeri.Job) error {
        var err error

        // do the work

        return err
    }),
    instances,
)

Run the node. This will block until the node is stopped, so you should probably run it in another gorutine.

node.Run()

When you want to stop the node, call Stop, and pass a shutdown context as a timeout.

shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

node.Stop(shutdownCtx)

New jobs will not be taken from queues any more. Workers that are currently processing jobs will be given a grace period and allowed to finish. Once the Context provided to Stop expires, the Context passed to every Worker will be cancelled.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultLogger

func DefaultLogger() zerolog.Logger

DefaultLogger returns a JSON logger, writing to stdout, with a timestamp and a minimum level of info.

Types

type ConnProvider

type ConnProvider interface {
	// Conn returns a connection, which can come from a shared pool. The caller will call Close on the connection when
	// it is done with it.
	Conn(context.Context) (redis.Conn, error)

	// DialLongPoll returns a new, dedicated connection, with a long read timeout and a normal write timeout. The caller
	// will close the connection.
	DialLongPoll(context.Context) (redis.Conn, error)
}

ConnProvider provides Redis connections, while encapsulating the process of establishing and configuring the connections. It is safe for concurrent use.

The provided Context should only affect the process of establishing a connection. If the context expires afterwards, it should not affect the use of the connection.

type Enqueuer

type Enqueuer struct {
	// contains filtered or unexported fields
}

Enqueuer puts jobs in queues.

func NewEnqueuer

func NewEnqueuer(cp ConnProvider) *Enqueuer

NewEnqueuer returns a new instance.

func (*Enqueuer) Enqueue

func (e *Enqueuer) Enqueue(ctx context.Context, j *Job) error

Enqueue adds the job to the queue configured in the job, or the default one, if no queue is configured.

type Job

type Job struct {
	// contains filtered or unexported fields
}

func (*Job) Args

func (j *Job) Args() []interface{}

func (*Job) Class

func (j *Job) Class() string

Class returns the Ruby class that implements the job.

func (*Job) CreatedAt

func (j *Job) CreatedAt() time.Time

func (*Job) EnqueuedAt

func (j *Job) EnqueuedAt() time.Time

func (*Job) ID

func (j *Job) ID() string

func (*Job) Queue

func (j *Job) Queue() string

func (*Job) Retry

func (j *Job) Retry() bool

Retry reports whether the job should be retried if it fails.

func (*Job) RetryTimes

func (j *Job) RetryTimes() int

RetryTimes returns the number of times the job should be retried, or 0 if the default value should be used.

func (*Job) SetArgs

func (j *Job) SetArgs(args []interface{}) *Job

func (*Job) SetClass

func (j *Job) SetClass(c string) *Job

func (*Job) SetCreatedAt

func (j *Job) SetCreatedAt(t time.Time) *Job

func (*Job) SetID

func (j *Job) SetID(id string) *Job

func (*Job) SetQueue

func (j *Job) SetQueue(q string) *Job

func (*Job) SetRetry

func (j *Job) SetRetry(retry bool) *Job

SetRetry configures whether the job should be retried if it fails.

func (*Job) SetRetryTimes

func (j *Job) SetRetryTimes(n int) *Job

SetRetryTimes configures the number of times the job should be retried. The minimum allowed value is 0 and the maximum 100. Values outside of that range will be ignored.

Calling this function always enables retries, because 0 represents the default value for the number of retries. To disable retries, use SetRetry(false).

type Node

type Node struct {
	// contains filtered or unexported fields
}

A Node represents a single server instance processing as many queues with as many Worker instances as are needed.

func NewNode

func NewNode(log zerolog.Logger, cp ConnProvider, longPollTimeout int) *Node

NewNode returns a new instance.

func (*Node) ProcessQueues

func (n *Node) ProcessQueues(qs QueueSet, w Worker, instances int)

ProcessQueues configures the Node to process the given set of queues using the desired number of Worker instances.

You can call ProcessQueues many times with different sets of queues and Workers. Do not call it any more after calling Run.

func (*Node) Run

func (n *Node) Run()

Run starts the process of getting jobs from queues and passing them to Workers. It blocks until the Node is shut down. See Stop for more.

func (*Node) Stop

func (n *Node) Stop(ctx context.Context)

Stop initiates worker shutdown. Once the shutdown process is complete, the call to Run will return.

New jobs will not be taken from queues any more.

Workers that are currently processing jobs will be given a grace period and allowed to finish. Once the Context provided to Stop expires, the Context passed to every Worker will be cancelled.

Stop blocks until the shutdown process has completed.

type OrderedQueueSet

type OrderedQueueSet []string

An OrderedQueueSet always returns the queues in the desired order.

func (OrderedQueueSet) GetQueues

func (qs OrderedQueueSet) GetQueues() []string

GetQueues implements QueueSet.

func (OrderedQueueSet) Names

func (qs OrderedQueueSet) Names() []string

Names implements QueueSet.

type QueueSet

type QueueSet interface {
	// GetQueues returns the queues sorted by the desired priority.
	GetQueues() []string

	// Names returns a list of queue names in the same order as they were configured, ignoring the strategy of the set,
	// such as randomization, for example. This is currently used for logging.
	Names() []string
}

A QueueSet implements a strategy for deciding which queues should be checked first by a group of workers. The set will be consulted every time there is a need to get the next job, so it is OK to return a different slice on every call to GetQueues.

type RandomQueueSet

type RandomQueueSet struct {
	// contains filtered or unexported fields
}

A RandomQueueSet returns the queues in random order, with the likelihood of each queue being first based on their relative weights.

func NewRandomQueueSet

func NewRandomQueueSet() *RandomQueueSet

NewRandomQueueSet returns a new instance.

func (*RandomQueueSet) Add

func (qs *RandomQueueSet) Add(q string, weight int)

Add adds a queue with the given relative weight.

Example:

qs.Add("low_priority", 1)
qs.Add("high_priority", 3)

The "low_priority" queue has a 25% chance of being checked first: 1 / (1 + 3).

The "high_priority" queue has a 75% chance of being checked first: 3 / (1 + 3).

func (*RandomQueueSet) GetQueues

func (qs *RandomQueueSet) GetQueues() []string

GetQueues implements QueueSet.

func (*RandomQueueSet) Names

func (qs *RandomQueueSet) Names() []string

Names implements QueueSet.

type Worker

type Worker interface {
	// Work is safe for concurrent use.
	Work(context.Context, *Job) error
}

A Worker processes jobs from one or more queues.

type WorkerFunc

type WorkerFunc func(context.Context, *Job) error

WorkerFunc is an adapter to allow the use of functions as Workers.

func (WorkerFunc) Work

func (w WorkerFunc) Work(ctx context.Context, j *Job) error

Work implements Worker by delegating to the wrapped function.

Directories

Path Synopsis
internal
sidekiq
Package sidekiq contains helper functions for compatibility with Sidekiq.
Package sidekiq contains helper functions for compatibility with Sidekiq.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL