beanq

package module
v3.2.0
Warning

This package is not in the latest version of its module.

Published: Jan 14, 2025 License: MIT Imports: 44 Imported by: 0

README

Beanq

Beanq is a message queue system built on the Redis Stream data type. It currently supports normal queues, delay queues, and sequence queues.

Notice

To ensure data safety, enabling AOF persistence is recommended. For the differences between AOF and RDB, refer to the official Redis documentation on persistence.

Process

  1. Normal Queue

    When a message is published to the queue, it is consumed immediately. At the same time, a child goroutine monitors for dead-letter messages; any that are detected are moved to a log queue for further handling or analysis.
    In this context:
    1.1 "published to the queue" means that the message is added to the queue for processing.
    1.2 "consumed immediately" means that the message is processed as soon as it becomes available.
    1.3 "child goroutine" refers to an auxiliary goroutine that runs concurrently with the main consumer.
    1.4 "dead-letter messages" are messages that could not be delivered or processed successfully and are therefore set aside for special handling.
    1.5 "log queue" is a designated queue where dead-letter messages are stored for later examination or retry.

  2. Delay Queue

    The delay queue supports message priorities. Each message is stored under a value of the form 1734399237.999, where:
    2.1 The integer part (e.g., 1734399237) is the Unix timestamp in seconds at which the message should be processed.
    2.2 The fractional part (e.g., .999) is the priority of the message.
    2.3 The maximum priority is 999. Messages with a higher priority value are consumed before those with lower values, so critical or time-sensitive messages are handled first.
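The `timestamp.priority` format above can be illustrated with a small encoder and parser. This is a sketch under stated assumptions, not beanq's code: the `delayKey` type and the three helper functions are hypothetical, the priority is assumed to be a zero-padded three-digit integer, and the ordering rule (earlier due time first, then higher priority first) is one reading of the description.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// delayKey holds the two segments of a value such as "1734399237.999".
type delayKey struct {
	RunAt    int64 // Unix timestamp in seconds
	Priority int   // 0..999, higher is consumed first
}

// formatDelayKey renders the key as "<unix seconds>.<3-digit priority>".
func formatDelayKey(k delayKey) string {
	return fmt.Sprintf("%d.%03d", k.RunAt, k.Priority)
}

// parseDelayKey splits the value on the dot and parses both segments.
func parseDelayKey(s string) (delayKey, error) {
	sec, prio, ok := strings.Cut(s, ".")
	if !ok {
		return delayKey{}, fmt.Errorf("missing priority segment in %q", s)
	}
	runAt, err := strconv.ParseInt(sec, 10, 64)
	if err != nil {
		return delayKey{}, err
	}
	p, err := strconv.Atoi(prio)
	if err != nil {
		return delayKey{}, err
	}
	if p < 0 || p > 999 {
		return delayKey{}, fmt.Errorf("priority %d out of range 0..999", p)
	}
	return delayKey{RunAt: runAt, Priority: p}, nil
}

// sortForConsumption orders keys by due time, then by priority descending.
func sortForConsumption(keys []delayKey) {
	sort.Slice(keys, func(i, j int) bool {
		if keys[i].RunAt != keys[j].RunAt {
			return keys[i].RunAt < keys[j].RunAt
		}
		return keys[i].Priority > keys[j].Priority
	})
}

func main() {
	keys := []delayKey{{1734399237, 10}, {1734399237, 999}, {1734399230, 1}}
	sortForConsumption(keys)
	for _, k := range keys {
		fmt.Println(formatDelayKey(k))
	}
}
```

Parsing the two segments as integers rather than as one float avoids rounding surprises in the priority digits.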

  3. Sequence Queue

    A sequence queue works synchronously: the status of a message is written to a Redis hash at every stage of the sending process (e.g., creation, sending, delivery).
    3.1 The client polls this Redis hash to retrieve the current status of the message and returns a result to the caller based on what it finds.
    3.2 This ensures that the client always sees the latest state of the message and can respond to it immediately.

Example Explanation

Start and enter the container.

docker compose up -d --build

docker exec -it beanq-example bash

delay example:

make delay

normal example:

make normal

sequential example:

make sequential

Before exiting the container, run the clean command so that env.json is restored.

make clean

Documentation

Index

Constants

const (
	ExecuteSuccess = iota + 1
	ExecuteFailed
	RollbackSuccess
	RollbackFailed
	RollbackResultProcessFailed
)
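Because the block starts at `iota + 1`, the constants take the values 1 through 5. The sketch below repeats the block and adds a hypothetical `statusName` helper (not part of beanq) to make the numbering visible.

```go
package main

import "fmt"

// Copied from the constant block above: iota starts at 0, so the first
// value is 1 and each following constant increments by one.
const (
	ExecuteSuccess = iota + 1
	ExecuteFailed
	RollbackSuccess
	RollbackFailed
	RollbackResultProcessFailed
)

// statusName is a hypothetical helper that maps a code to a readable label.
func statusName(code int) string {
	switch code {
	case ExecuteSuccess:
		return "execute success"
	case ExecuteFailed:
		return "execute failed"
	case RollbackSuccess:
		return "rollback success"
	case RollbackFailed:
		return "rollback failed"
	case RollbackResultProcessFailed:
		return "rollback result process failed"
	default:
		return "unknown"
	}
}

func main() {
	for code := ExecuteSuccess; code <= RollbackResultProcessFailed; code++ {
		fmt.Println(code, statusName(code))
	}
}
```

Starting at 1 leaves the zero value free to mean "not set".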

Variables

var (
	NilHandle = errors.New("beanq:handle is nil")
	NilCancel = errors.New("beanq:cancel is nil")
)

consumer...

var ErrExtendFailed = bstatus.BqError("failed to extend lock")

ErrExtendFailed is the error resulting if Redsync fails to extend the lock.

var ErrFailed = bstatus.BqError("failed to acquire lock")

ErrFailed is the error resulting if Redsync fails to acquire the lock after exhausting all retries.

var ErrLockAlreadyExpired = bstatus.BqError("failed to unlock, lock was already expired")

ErrLockAlreadyExpired is the error resulting if trying to unlock the lock which already expired.

Functions

func GetBrokerDriver

func GetBrokerDriver[T any]() T

func WfOptionMux

func WfOptionMux(mux WFMux) func(workflow *Workflow)

func WfOptionRecordErrorHandler

func WfOptionRecordErrorHandler(handler func(error)) func(workflow *Workflow)

Types

type BQClient

type BQClient struct {
	// contains filtered or unexported fields
}

func (*BQClient) Dynamic

func (b *BQClient) Dynamic(options ...DynamicOption) *BQClient

Dynamic only supports the Sequential type for now.

func (*BQClient) GetId

func (b *BQClient) GetId() string

func (*BQClient) Priority

func (b *BQClient) Priority(priority float64) *BQClient

func (BQClient) Publish

func (t BQClient) Publish(channel, topic string, payload []byte) error

func (BQClient) PublishAtTime

func (t BQClient) PublishAtTime(channel, topic string, payload []byte, atTime time.Time) error

func (*BQClient) PublishInSequential

func (b *BQClient) PublishInSequential(channel, topic string, payload []byte) *SequentialCmd

func (*BQClient) SetId

func (b *BQClient) SetId(id string) *BQClient

func (BQClient) Subscribe

func (t BQClient) Subscribe(channel, topic string, handle IConsumeHandle) (IBaseSubscribeCmd, error)

func (BQClient) SubscribeDelay

func (t BQClient) SubscribeDelay(channel, topic string, handle IConsumeHandle) (IBaseSubscribeCmd, error)

func (BQClient) SubscribeSequential

func (t BQClient) SubscribeSequential(channel, topic string, handle IConsumeHandle) (IBaseSubscribeCmd, error)

func (*BQClient) WithContext

func (b *BQClient) WithContext(ctx context.Context) *BQClient

type BaseTask

type BaseTask struct {
	// contains filtered or unexported fields
}

BaseTask ...

func (*BaseTask) Execute

func (t *BaseTask) Execute() error

func (*BaseTask) ID

func (t *BaseTask) ID() string

func (*BaseTask) OnExecute

func (t *BaseTask) OnExecute(fn func(task Task) error) *BaseTask

func (*BaseTask) OnRollback

func (t *BaseTask) OnRollback(fn func(task Task) error) *BaseTask

func (*BaseTask) Rollback

func (t *BaseTask) Rollback() error

func (*BaseTask) Statement

func (t *BaseTask) Statement() []byte

func (*BaseTask) WithRecordStatement

func (t *BaseTask) WithRecordStatement(statement []byte) *BaseTask

type BeanqConfig

type BeanqConfig struct {
	Health   Health `json:"health"`
	Broker   string `json:"broker"`
	UI       `json:"ui"`
	DebugLog `json:"debugLog"`
	Queue
	History                  `json:"history"`
	Redis                    Redis         `json:"redis"`
	DeadLetterIdleTime       time.Duration `json:"deadLetterIdle"`
	DeadLetterTicker         time.Duration `json:"deadLetterTicker"`
	KeepFailedJobsInHistory  time.Duration `json:"keepFailedJobsInHistory"`
	KeepSuccessJobsInHistory time.Duration `json:"keepSuccessJobsInHistory"`
	PublishTimeOut           time.Duration `json:"publishTimeOut"`
	ConsumeTimeOut           time.Duration `json:"consumeTimeOut"`
	MinConsumers             int64         `json:"minConsumers"`
	JobMaxRetries            int           `json:"jobMaxRetries"`
	ConsumerPoolSize         int           `json:"consumerPoolSize"`
}

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(config *BeanqConfig) *Broker

func (*Broker) AddConsumer

func (t *Broker) AddConsumer(moodType btype.MoodType, channel, topic string, subscribe IConsumeHandle) error

func (*Broker) Dequeue

func (t *Broker) Dequeue(ctx context.Context, channel, topic string, do public.CallBack)

func (*Broker) Enqueue

func (t *Broker) Enqueue(ctx context.Context, data map[string]any) error

func (*Broker) Migrate

func (t *Broker) Migrate(ctx context.Context, data []map[string]any) error

func (*Broker) Start

func (t *Broker) Start(ctx context.Context)

func (*Broker) Status

func (t *Broker) Status(ctx context.Context, channel, topic, id string) (map[string]string, error)

func (*Broker) WaitSignal

func (t *Broker) WaitSignal(cancel context.CancelFunc) <-chan bool

type Client

type Client struct {
	Topic     string        `json:"topic"`
	Channel   string        `json:"channel"`
	MaxLen    int64         `json:"maxLen"`
	Retry     int           `json:"retry"`
	Priority  float64       `json:"priority"`
	TimeToRun time.Duration `json:"timeToRun"`
	// contains filtered or unexported fields
}

Client beanq's client

func New

func New(config *BeanqConfig, options ...ClientOption) *Client

func (*Client) BQ

func (c *Client) BQ() *BQClient

func (*Client) CheckAckStatus

func (c *Client) CheckAckStatus(ctx context.Context, channel, topic, id string) (*Message, error)

func (*Client) Ping

func (c *Client) Ping()

Ping can be called by the user to check the status of the broker.

func (*Client) ServeHttp

func (c *Client) ServeHttp(ctx context.Context)

func (*Client) Wait

func (c *Client) Wait(ctx context.Context)

type ClientOption

type ClientOption func(client *Client)

func WithCaptureExceptionOption

func WithCaptureExceptionOption(handler func(ctx context.Context, err any)) ClientOption

type DebugLog

type DebugLog struct {
	Path string `json:"path"`
	On   bool   `json:"on"`
}

type DefaultHandle

type DefaultHandle struct {
	DoHandle func(ctx context.Context, message *Message) error
	DoCancel func(ctx context.Context, message *Message) error
	DoError  func(ctx context.Context, err error)
}

func (DefaultHandle) Cancel

func (c DefaultHandle) Cancel(ctx context.Context, message *Message) error

func (DefaultHandle) Error

func (c DefaultHandle) Error(ctx context.Context, err error)

func (DefaultHandle) Handle

func (c DefaultHandle) Handle(ctx context.Context, message *Message) error

type DelayFunc

type DelayFunc func(tries int) time.Duration

A DelayFunc is used to decide the amount of time to wait between retries.

type DynamicOption

type DynamicOption func(option *dynamicOption)

func DynamicKeyOpt

func DynamicKeyOpt(key string) DynamicOption

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

type Health

type Health struct {
	Port string `json:"port"`
	Host string `json:"host"`
}

type History

type History struct {
	Mongo struct {
		Database              string
		Collection            string
		UserName              string
		Password              string
		Host                  string
		Port                  string
		ConnectTimeOut        time.Duration
		MaxConnectionPoolSize uint64
		MaxConnectionLifeTime time.Duration
	}
	On bool
}

type IBaseCmd

type IBaseCmd interface {
	// contains filtered or unexported methods
}

IBaseCmd BaseCmd public method

type IBaseSubscribeCmd

type IBaseSubscribeCmd interface {
	IBaseCmd
	Run(ctx context.Context)
}

IBaseSubscribeCmd BaseSubscribeCmd subscribe method

type IConsumeCancel

type IConsumeCancel interface {
	Cancel(ctx context.Context, message *Message) error
}

type IConsumeError

type IConsumeError interface {
	Error(ctx context.Context, err error)
}

type IConsumeHandle

type IConsumeHandle interface {
	Handle(ctx context.Context, message *Message) error
}

type Message

type Message struct {
	ExecuteTime  time.Time        `json:"executeTime"`
	EndTime      time.Time        `json:"endTime"`
	BeginTime    time.Time        `json:"beginTime"`
	Response     any              `json:"response"`
	Info         bstatus.FlagInfo `json:"info"`
	Level        bstatus.LevelMsg `json:"level"`
	Topic        string           `json:"topic"`
	Channel      string           `json:"channel"`
	Payload      string           `json:"payload"`
	AddTime      string           `json:"addTime"`
	Consumer     string           `json:"consumer"`
	RunTime      string           `json:"runTime"`
	MoodType     btype.MoodType   `json:"moodType"`
	Status       bstatus.Status   `json:"status"`
	Id           string           `json:"id"`
	Retry        int              `json:"retry"`
	TimeToRun    time.Duration    `json:"timeToRun"`
	MaxLen       int64            `json:"maxLen"`
	Priority     float64          `json:"priority"`
	PendingRetry int64            `json:"pendingRetry"`
}

func (Message) MarshalBinary

func (m Message) MarshalBinary() (data []byte, err error)

func (Message) ToMap

func (m Message) ToMap() map[string]any

type MessageM

type MessageM map[string]any

func (MessageM) ToMessage

func (data MessageM) ToMessage() *Message

type MessageS

type MessageS map[string]string

func (MessageS) ToMessage

func (data MessageS) ToMessage() *Message

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

A Mutex is a distributed mutual exclusion lock.

func (*Mutex) Eval

func (m *Mutex) Eval(ctx context.Context, client redis.UniversalClient, script *Script, keysAndArgs ...interface{}) (interface{}, error)

func (*Mutex) ExtendContext

func (m *Mutex) ExtendContext(ctx context.Context) (bool, error)

ExtendContext resets the mutex's expiry and returns the status of expiry extension.

func (*Mutex) LockContext

func (m *Mutex) LockContext(ctx context.Context) error

LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.

func (*Mutex) Name

func (m *Mutex) Name() string

Name returns mutex name (i.e. the Redis key).

func (*Mutex) UnlockContext

func (m *Mutex) UnlockContext(ctx context.Context) (bool, error)

UnlockContext unlocks m and returns the status of unlock.

func (*Mutex) Until

func (m *Mutex) Until() time.Time

Until returns the time of validity of acquired lock. The value will be zero value until a lock is acquired.

func (*Mutex) Value

func (m *Mutex) Value() string

Value returns the current random value. The value will be empty until a lock is acquired (or WithValue option is used).

type MuxClient

type MuxClient struct {
	// contains filtered or unexported fields
}

func NewMuxClient

func NewMuxClient(client redis.UniversalClient) *MuxClient

func (*MuxClient) NewMutex

func (p *MuxClient) NewMutex(name string, options ...MuxOption) *Mutex

func (*MuxClient) SetExpireTime

func (p *MuxClient) SetExpireTime(expireTime time.Duration) *MuxClient

func (*MuxClient) SetPrefix

func (p *MuxClient) SetPrefix(prefix string) *MuxClient

type MuxOption

type MuxOption interface {
	Apply(*Mutex)
}

An MuxOption configures a mutex.

func WithDriftFactor

func WithDriftFactor(factor float64) MuxOption

WithDriftFactor can be used to set the clock drift factor. The default value is 0.01.

func WithExpiry

func WithExpiry(expiry time.Duration) MuxOption

WithExpiry can be used to set the expiry of a mutex to the given value. The default is 8s.

func WithFailFast

func WithFailFast(b bool) MuxOption

WithFailFast can be used to quickly acquire and release the lock. When some Redis servers are blocking, there is no need to wait for responses from all of them; as long as the quorum is met, the lock can be assumed to be acquired. This option lowers latency and prevents a blocked Redis server from causing Lock/Unlock to hang for a long time.

func WithGenValueFunc

func WithGenValueFunc(genValueFunc func() (string, error)) MuxOption

WithGenValueFunc can be used to set the custom value generator.

func WithRetryDelay

func WithRetryDelay(delay time.Duration) MuxOption

WithRetryDelay can be used to set the amount of time to wait between retries. The default value is rand(50ms, 250ms).

func WithRetryDelayFunc

func WithRetryDelayFunc(delayFunc DelayFunc) MuxOption

WithRetryDelayFunc can be used to override default delay behavior.

func WithSetNXOnExtend

func WithSetNXOnExtend() MuxOption

WithSetNXOnExtend improves the extend logic: it extends the key if it exists and otherwise tries to set a new key in Redis. This is useful if your Redis servers restart often and you want to reduce the chance of losing the lock.

func WithShufflePools

func WithShufflePools(b bool) MuxOption

WithShufflePools can be used to shuffle Redis pools to reduce centralized access in concurrent scenarios.

func WithTimeoutFactor

func WithTimeoutFactor(factor float64) MuxOption

WithTimeoutFactor can be used to set the timeout factor. The default value is 0.05.

func WithTries

func WithTries(tries int) MuxOption

WithTries can be used to set the number of times lock acquire is attempted. The default value is 32.

func WithValue

func WithValue(v string) MuxOption

WithValue can be used to assign the random value without having to call lock. This allows the ownership of a lock to be "transferred" and allows the lock to be unlocked from elsewhere.

type OptionFunc

type OptionFunc func(*Mutex)

OptionFunc is a function that configures a mutex.

func (OptionFunc) Apply

func (f OptionFunc) Apply(mutex *Mutex)

Apply calls f(mutex)

type Publish

type Publish struct {
	// contains filtered or unexported fields
}

Publish command:publish

type Queue

type Queue struct {
	Topic        string
	DelayChannel string
	DelayTopic   string
	Channel      string
	MaxLen       int64
	Priority     float64
	TimeToRun    time.Duration
}

type Redis

type Redis struct {
	Host               string        `json:"host"`
	Port               string        `json:"port"`
	Password           string        `json:"password"`
	Prefix             string        `json:"prefix"`
	Database           int           `json:"database"`
	MaxLen             int64         `json:"maxLen"`
	MinIdleConnections int           `json:"minIdleConnections"`
	DialTimeout        time.Duration `json:"dialTimeout"`
	ReadTimeout        time.Duration `json:"readTimeout"`
	WriteTimeout       time.Duration `json:"writeTimeout"`
	PoolTimeout        time.Duration `json:"poolTimeout"`
	MaxRetries         int           `json:"maxRetries"`
	PoolSize           int           `json:"poolSize"`
}

type Script

type Script struct {
	Src      string
	Hash     string
	KeyCount int
}

Script encapsulates the source, hash and key count for a Lua script.

func NewScript

func NewScript(keyCount int, src string) *Script

NewScript returns a new script object. If keyCount is greater than or equal to zero, then the count is automatically inserted in the EVAL command argument list. If keyCount is less than zero, then the application supplies the count as the first value in the keysAndArgs argument to the Do, Send and SendHash methods.
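Redis identifies a cached Lua script by the SHA-1 digest of its source, which is the value EVALSHA expects. The sketch below assumes that the `Hash` field holds that digest; the lower-case `script` type and `newScript` function are hypothetical stand-ins, and the Lua source is the usual compare-and-delete unlock script, shown only as sample input.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// script mirrors the three exported fields of the Script type above.
type script struct {
	Src      string
	Hash     string
	KeyCount int
}

// newScript stores the source and pre-computes its SHA-1 hex digest, so a
// caller can try EVALSHA first and fall back to EVAL if the script is not cached.
func newScript(keyCount int, src string) *script {
	sum := sha1.Sum([]byte(src))
	return &script{Src: src, Hash: hex.EncodeToString(sum[:]), KeyCount: keyCount}
}

func main() {
	// Delete the key only if it still holds the caller's value.
	unlock := newScript(1, `if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
else
	return 0
end`)
	fmt.Println(unlock.KeyCount, unlock.Hash)
}
```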

type SequentialCmd

type SequentialCmd struct {
	// contains filtered or unexported fields
}

func (*SequentialCmd) Error

func (s *SequentialCmd) Error() error

func (*SequentialCmd) WaitingAck

func (s *SequentialCmd) WaitingAck() (*Message, error)

WaitingAck ...

type Subscribe

type Subscribe struct {
	// contains filtered or unexported fields
}

Subscribe command:subscribe

func (*Subscribe) Run

func (t *Subscribe) Run(ctx context.Context)

Run is yet to be implemented.

type Task

type Task interface {
	ID() string
	Execute() error
	Rollback() error
	Statement() []byte
}

type TaskStatus

type TaskStatus struct {
	// contains filtered or unexported fields
}

func (*TaskStatus) Error

func (t *TaskStatus) Error() error

func (*TaskStatus) Statement

func (t *TaskStatus) Statement() string

func (*TaskStatus) Status

func (t *TaskStatus) Status() string

func (*TaskStatus) String

func (t *TaskStatus) String() string

type UI

type UI struct {
	Stmt struct {
		Host     string `json:"host"`
		Port     string `json:"port"`
		User     string `json:"user"`
		Password string `json:"password"`
	}
	GoogleAuth struct {
		ClientId     string
		ClientSecret string
		CallbackUrl  string
	}
	SendGrid struct {
		Key         string
		FromName    string
		FromAddress string
	}
	Root struct {
		UserName string `json:"username"`
		Password string `json:"password"`
	} `json:"root"`
	Issuer    string        `json:"issuer"`
	Subject   string        `json:"subject"`
	JwtKey    string        `json:"jwtKey"`
	Port      string        `json:"port"`
	ExpiresAt time.Duration `json:"expiresAt"`
}

type WFMux

type WFMux interface {
	Name() string
	Value() string
	Until() time.Time
	LockContext(ctx context.Context) error
	UnlockContext(ctx context.Context) (bool, error)
	ExtendContext(ctx context.Context) (bool, error)
}

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

func NewWorkflow

func NewWorkflow(ctx context.Context, message *Message) *Workflow

func (*Workflow) CurrentTask

func (w *Workflow) CurrentTask() Task

func (*Workflow) GetGid

func (w *Workflow) GetGid() string

func (*Workflow) Init

func (w *Workflow) Init(opts ...func(workflow *Workflow))

func (*Workflow) Message

func (w *Workflow) Message() *Message

func (*Workflow) NewTask

func (w *Workflow) NewTask(ids ...string) *BaseTask

func (*Workflow) OnRollbackResult

func (w *Workflow) OnRollbackResult(handler func(taskID string, err error) error) *Workflow

OnRollbackResult handles rollback errors.

func (*Workflow) Results

func (w *Workflow) Results() []error

func (*Workflow) Run

func (w *Workflow) Run() (err error)

func (*Workflow) SetGid

func (w *Workflow) SetGid(gid string)

func (*Workflow) TrackRecord

func (w *Workflow) TrackRecord(taskID string, status TaskStatus)

type WorkflowHandler

type WorkflowHandler func(ctx context.Context, wf *Workflow) error

func (WorkflowHandler) Handle

func (c WorkflowHandler) Handle(ctx context.Context, message *Message) error

type WorkflowRecord

type WorkflowRecord struct {
	// contains filtered or unexported fields
}

func NewWorkflowRecord

func NewWorkflowRecord() *WorkflowRecord

func (*WorkflowRecord) SyncWrite

func (w *WorkflowRecord) SyncWrite(ctx context.Context, data any) error

func (*WorkflowRecord) Write

func (w *WorkflowRecord) Write(ctx context.Context, data any)
