delayqueue

package
v0.0.0-...-1be4ecf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2024 | License: Apache-2.0 | Imports: 9 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Marshal

func Marshal(body interface{}) (out []byte, err error)

func PublishDelayTask

func PublishDelayTask(queue string, payload any, delaySecond int) (err error)

func Unmarshal

func Unmarshal(in []byte, body interface{}) error

Types

type DelayQueue

// DelayQueue is the interface returned by NewQueue. It exposes a worker
// entry point and a method for publishing delayed tasks.
type DelayQueue interface {
	// StartWorker starts the queue worker with the given context.
	// NOTE(review): presumably it consumes due tasks until ctx is cancelled;
	// whether it blocks is not visible here — confirm against the implementation.
	StartWorker(ctx context.Context)
	// PublishDelayTask publishes a task to the delay queue.
	// Unlike the package-level PublishDelayTask function, this method has no
	// error return, so callers cannot observe a publish failure through it.
	// NOTE(review): delaySecond is presumably the delay in seconds — confirm.
	PublishDelayTask(queue string, payload any, delaySecond int)
}

func NewQueue

func NewQueue(ctx context.Context, queueName string, redisCli redis.Cmdable, callback QueueCallback, opts ...Option) (DelayQueue, error)

type Job

// Job is the serializable record for a single delay-queue task. Every field
// carries the same camelCase key in its json and msgpack tags, so a job is
// encoded under identical field names whichever serializer is used.
//
// NOTE(review): the meaning of individual fields (timestamps, status codes,
// retry/timeout units) is not visible here — confirm against the worker
// implementation before relying on them.
type Job struct {
	// DeliveryTs formerly used the json key "delivery-ts", the only tag that
	// disagreed with its msgpack key; it now matches as "deliveryTs". JSON
	// documents written with the old key will no longer populate this field.
	DeliveryTs    int    `json:"deliveryTs" msgpack:"deliveryTs"`
	Retry         int    `json:"retry" msgpack:"retry"`
	ExecEndTime   string `json:"execEndTime" msgpack:"execEndTime"`
	Msg           string `json:"msg" msgpack:"msg"`
	ExecTs        int    `json:"execTs" msgpack:"execTs"`
	ExecStartTime string `json:"execStartTime" msgpack:"execStartTime"`
	Status        int    `json:"status" msgpack:"status"`
	Timeout       int    `json:"timeout" msgpack:"timeout"`
	ExecNum       int    `json:"execNum" msgpack:"execNum"`
	Payload       []byte `json:"payload" msgpack:"payload"`
	PublishTime   string `json:"publishTime" msgpack:"publishTime"`
	DeliveryTime  string `json:"deliveryTime" msgpack:"deliveryTime"`
}

type MsgPackSerializer

// MsgPackSerializer is a field-less type whose Marshal and Unmarshal methods
// have the same signatures as the Serializer interface's methods.
// NOTE(review): presumably it encodes with msgpack, as the name suggests —
// the method bodies are not visible here.
type MsgPackSerializer struct{}

func (MsgPackSerializer) Marshal

func (s MsgPackSerializer) Marshal(body interface{}) (out []byte, err error)

func (MsgPackSerializer) Unmarshal

func (s MsgPackSerializer) Unmarshal(in []byte, body interface{}) error

type Option

// Option is a function applied to the unexported *delayQueue. Values are
// produced by WithConcurrent and WithSerializer and passed to NewQueue
// through its variadic opts parameter.
type Option func(*delayQueue)

func WithConcurrent

func WithConcurrent(concurrent int) Option

func WithSerializer

func WithSerializer(serializer Serializer) Option

type QueueCallback

// QueueCallback is the handler type supplied to NewQueue. It receives a
// task payload as raw bytes and returns a bool.
// NOTE(review): presumably the bool reports whether the task was handled
// successfully (and so whether it should be retried) — confirm.
type QueueCallback func(payload []byte) bool

type Serializer

// Serializer is the pluggable encoding contract for message bodies: one
// method turns a value into bytes, the other fills a value from bytes.
type Serializer interface {
	// Marshal encodes body and returns the resulting bytes, or an error.
	Marshal(body any) (out []byte, err error)

	// Unmarshal decodes the bytes in in into body.
	Unmarshal(in []byte, body any) error
}

Serializer defines the interface for body serialization.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL