Documentation ¶
Index ¶
- Variables
- type Consumer
- type Job
- type Logger
- type Metric
- type Option
- type OptionFunc
- type Options
- type Queue
- func (q *Queue) BusyWorkers() int
- func (q *Queue) FailureTasks() int
- func (q *Queue) Queue(job core.QueuedMessage) error
- func (q *Queue) QueueTask(task TaskFunc) error
- func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task TaskFunc) error
- func (q *Queue) QueueWithTimeout(timeout time.Duration, job core.QueuedMessage) error
- func (q *Queue) Release()
- func (q *Queue) Shutdown()
- func (q *Queue) Start()
- func (q *Queue) SubmittedTasks() int
- func (q *Queue) SuccessTasks() int
- func (q *Queue) UpdateWorkerCount(num int)
- func (q *Queue) Wait()
- type TaskFunc
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoTaskInQueue there is nothing in the queue ErrNoTaskInQueue = errors.New("golang-queue: no task in queue") // ErrQueueHasBeenClosed the current queue is closed ErrQueueHasBeenClosed = errors.New("golang-queue: queue has been closed") )
var ErrMissingWorker = errors.New("missing worker module")
ErrMissingWorker missing define worker
var ErrQueueShutdown = errors.New("queue has been closed and released")
ErrQueueShutdown the queue is released and closed.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶ added in v0.0.7
type Consumer struct {
// contains filtered or unexported fields
}
Consumer for simple queue using buffer channel
func NewConsumer ¶ added in v0.0.7
NewConsumer for create new consumer instance
func (*Consumer) Queue ¶ added in v0.0.7
func (s *Consumer) Queue(task core.QueuedMessage) error
Queue send task to the buffer channel
func (*Consumer) Request ¶ added in v0.1.0
func (s *Consumer) Request() (core.QueuedMessage, error)
Request a new task from channel
type Job ¶
type Job struct { Task TaskFunc `json:"-"` // Timeout is the duration the task can be processed by Handler. // zero if not specified Timeout time.Duration `json:"timeout"` // Payload is the payload data of the task. Payload []byte `json:"body"` }
Job describes a task and its metadata.
type Logger ¶
type Logger interface { Infof(format string, args ...interface{}) Errorf(format string, args ...interface{}) Fatalf(format string, args ...interface{}) Info(args ...interface{}) Error(args ...interface{}) Fatal(args ...interface{}) }
Logger interface is used throughout gorush
func NewEmptyLogger ¶
func NewEmptyLogger() Logger
NewEmptyLogger for simple logger.
Example ¶
l := NewEmptyLogger() l.Info("test") l.Infof("test") l.Error("test") l.Errorf("test") l.Fatal("test") l.Fatalf("test")
Output:
type Metric ¶ added in v0.0.10
type Metric interface { IncBusyWorker() DecBusyWorker() BusyWorkers() uint64 SuccessTasks() uint64 FailureTasks() uint64 SubmittedTasks() uint64 IncSuccessTask() IncFailureTask() IncSubmittedTask() }
Metric interface
type Option ¶
type Option interface {
Apply(*Options)
}
An Option configures a mutex.
func WithQueueSize ¶ added in v0.0.7
WithQueueSize set worker count
func WithTimeOut ¶ added in v0.0.9
WithTimeOut set custom timeout
type OptionFunc ¶ added in v0.1.0
type OptionFunc func(*Options)
OptionFunc is a function that configures a queue.
func (OptionFunc) Apply ¶ added in v0.1.0
func (f OptionFunc) Apply(option *Options)
Apply calls f(option)
type Options ¶ added in v0.0.7
type Options struct {
// contains filtered or unexported fields
}
Options for custom args in Queue
func NewOptions ¶ added in v0.0.7
NewOptions initialize the default value for the options
type Queue ¶
A Queue is a message queue.
func NewPool ¶ added in v0.0.7
NewPool initializes a new pool
Example (QueueTask) ¶
package main import ( "context" "fmt" "log" "time" "github.com/golang-queue/queue" ) func main() { taskN := 7 rets := make(chan int, taskN) // allocate a pool with 5 goroutines to deal with those tasks p := queue.NewPool(5) // don't forget to release the pool in the end defer p.Release() // assign tasks to asynchronous goroutine pool for i := 0; i < taskN; i++ { idx := i if err := p.QueueTask(func(context.Context) error { // sleep and return the index time.Sleep(20 * time.Millisecond) rets <- idx return nil }); err != nil { log.Println(err) } } // wait until all tasks done for i := 0; i < taskN; i++ { fmt.Println("index:", <-rets) } }
Output: index: 3 index: 0 index: 2 index: 4 index: 5 index: 6 index: 1
Example (QueueTaskTimeout) ¶
package main import ( "context" "fmt" "log" "time" "github.com/golang-queue/queue" ) func main() { taskN := 7 rets := make(chan int, taskN) resps := make(chan error, 1) // allocate a pool with 5 goroutines to deal with those tasks q := queue.NewPool(5) // don't forget to release the pool in the end defer q.Release() // assign tasks to asynchronous goroutine pool for i := 0; i < taskN; i++ { idx := i if err := q.QueueTaskWithTimeout(100*time.Millisecond, func(ctx context.Context) error { // panic job if idx == 5 { panic("system error") } // timeout job if idx == 6 { time.Sleep(105 * time.Millisecond) } select { case <-ctx.Done(): resps <- ctx.Err() default: } rets <- idx return nil }); err != nil { log.Println(err) } } // wait until all tasks done for i := 0; i < taskN-1; i++ { fmt.Println("index:", <-rets) } close(resps) for e := range resps { fmt.Println(e.Error()) } fmt.Println("success task count:", q.SuccessTasks()) fmt.Println("failure task count:", q.FailureTasks()) fmt.Println("submitted task count:", q.SubmittedTasks()) }
Output: index: 3 index: 0 index: 2 index: 4 index: 6 index: 1 context deadline exceeded success task count: 5 failure task count: 2 submitted task count: 7
func (*Queue) BusyWorkers ¶ added in v0.1.0
BusyWorkers returns the numbers of workers in the running process.
func (*Queue) FailureTasks ¶ added in v0.1.0
BusyWorkers returns the numbers of failure tasks.
func (*Queue) QueueTaskWithTimeout ¶
QueueTaskWithTimeout to queue job task with timeout
func (*Queue) QueueWithTimeout ¶
QueueWithTimeout to queue all job with specified timeout.
func (*Queue) SubmittedTasks ¶ added in v0.1.0
BusyWorkers returns the numbers of submitted tasks.
func (*Queue) SuccessTasks ¶ added in v0.1.0
BusyWorkers returns the numbers of success tasks.
func (*Queue) UpdateWorkerCount ¶ added in v0.1.0
UpdateWorkerCount to update worker number dynamically.