fifo

package
v0.0.0-...-4cfca81 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2020 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrEmptyQueue is returned when the queue is empty
	ErrEmptyQueue = errors.New("the queue is empty or does not exists")

	TopicJobsNew       = "darko:jobs:new"
	TopicJobsProcessed = "darko:jobs:processed"
	TopicJobsFailed    = "darko:jobs:failed"
)

Functions

func Pack

func Pack(job Job) (string, error)

Pack the job as base64 string to be enqueued

func Unpack

func Unpack(pack string, job *Job) error

Unpack the job from the queue after dequeue.

Types

type Job

type Job struct {
	// ID is the job unique id. Set by the server.
	ID string
	// PartitionKey is the partition id this job belongs to. Set by the server.
	PartitionKey int
	// PrimaryKey is the primary key.
	// It is used to guarantee the order of the requests and balance the work load.
	PrimaryKey string `json:"primary_key"`
	// CallbackURL is the service endpoint to send the payload back to.
	CallbackURL string `json:"callback_url"`
	// CorrelationID is a unique identifier attached to the request by the client that allow
	// reference to a particular transaction or event chain.
	CorrelationID string `json:"correlation_id"`
	// Payload with the content encoded as base64 that will be forwarded on the client on the callback.
	Payload string `json:"payload"`
}

Job holds data for creating jobs. A job defines something that will be queued and dequeued. In HA mode, a job will also be uniform partitioned on shards.

func (Job) Hash

func (j Job) Hash() int

Hash returns hash code for the primary key

func (Job) NewPayloadReader

func (j Job) NewPayloadReader() io.Reader

type Queue

type Queue interface {
	// Pop dequeue the next item of the topic.
	Pop(topic string) (string, error)
	// Push enqueues the item,
	Push(topic string, pack string) error
}

Queue defines a FIFO queue interface

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue implements the Queue interface using Redis database as queue.

func NewRedisQueue

func NewRedisQueue(cli *redis.Client, blockTimeout time.Duration) *RedisQueue

NewRedisQueue returns a RedisQueue.

func (*RedisQueue) Pop

func (r *RedisQueue) Pop(topic string) (string, error)

Pop returns the left most item of the list. The command BLPop is used to block the queue when it is empty until the timeout. The item must be added by the producer with RPUSH.

func (*RedisQueue) Push

func (r *RedisQueue) Push(topic string, pack string) error

Push enqueues the item pushing it on the right side of the list. This is it can be dequeue my LPOP or BLPop in FIFO order.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL