dispatch

package
v0.0.0-...-3d980af Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	BucketQueue = make(chan string)
)

Functions

func RunTask

RunTask 启动任务

func StopTask

func StopTask(
	p2r []TransWorker,
	r2r []TransWorker,
	r2p []TransWorker,
	consumers []HandleWorker,
)

Types

type AsyncQueue

type AsyncQueue struct {
	// contains filtered or unexported fields
}

func NewAsyncQueue

func NewAsyncQueue() *AsyncQueue

func (*AsyncQueue) DeleteJob

func (q *AsyncQueue) DeleteJob(id string) error

DeleteJob 删除任务,终止+删除id信息

func (*AsyncQueue) GetJob

func (q *AsyncQueue) GetJob(id string) (*Job, error)

func (*AsyncQueue) RecoverJob

func (q *AsyncQueue) RecoverJob(id string) error

RecoverJob 恢复任务

func (*AsyncQueue) SendScheduleMsg

func (q *AsyncQueue) SendScheduleMsg(job Job, idStr string) error

SendScheduleMsg .

func (*AsyncQueue) StopJob

func (q *AsyncQueue) StopJob(id string) error

StopJob 终止任务

type HandleWorker

type HandleWorker struct {
	Wg *sync.WaitGroup
	// contains filtered or unexported fields
}

func NewHandleWorker

func NewHandleWorker(q *AsyncQueue) *HandleWorker

func (*HandleWorker) HandlerWorker

func (w *HandleWorker) HandlerWorker() []HandleWorker

func (*HandleWorker) Start

func (w *HandleWorker) Start(ctx context.Context, bucketName string)

Start consumer

func (*HandleWorker) Stop

func (w *HandleWorker) Stop()

type Job

type Job struct {
	Type     string         `json:"type" msgpack:"1"`  // 任务类型
	Delay    *time.Duration `json:"delay" msgpack:"2"` // 延迟时间, unix时间戳
	Body     interface{}    `json:"body" msgpack:"3"`
	TTL      int64          `json:"ttl" msgpack:"4"` // 任务超时时间, 秒
	Retry    int            `json:"retryCount" msgpack:"5"`
	CronSpec string         `json:"cronSpec" msgpack:"6"`
}

Job . 数据先存db,再存redis

type TransWorker

type TransWorker struct {
	Wg *sync.WaitGroup
	// contains filtered or unexported fields
}

func NewTransWorker

func NewTransWorker(q *AsyncQueue) *TransWorker

func (*TransWorker) Pending2ReadyTimer

func (p *TransWorker) Pending2ReadyTimer(ctx context.Context, bucketName string)

Pending2ReadyTimer 生产者,多协程完成对应的流程

func (*TransWorker) Pending2ReadyWorker

func (p *TransWorker) Pending2ReadyWorker() []TransWorker

func (*TransWorker) Running2PendingTimer

func (p *TransWorker) Running2PendingTimer(ctx context.Context, bucketName string)

func (*TransWorker) Running2PendingWorker

func (p *TransWorker) Running2PendingWorker() []TransWorker

func (*TransWorker) Running2ReadyTimer

func (p *TransWorker) Running2ReadyTimer(ctx context.Context, bucketName string)

Running2ReadyTimer 生产者,多协程完成对应的流程

func (*TransWorker) Running2ReadyWorker

func (p *TransWorker) Running2ReadyWorker() []TransWorker

func (*TransWorker) Stop

func (p *TransWorker) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL