queue

package module
v0.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2021 License: MIT Imports: 10 Imported by: 45

README

Queue

Run Tests codecov

Queue is a Golang library for spawning and managing a Goroutine pool, Allowing you to create multiple worker according to limit CPU number of machine.

Features

  • Support buffered channel queue.
  • Support NSQ (A realtime distributed messaging platform) as backend.
  • Support NATS (Connective Technology for Adaptive Edge & Distributed Systems) as backend.
  • Support Redis Pub/Sub as backend.

Queue Scenario

In Single Container or VM

queue01

Multile Container with Queue service like NSQ, NATs or Redis

queue02

Installation

Install the stable version:

go get github.com/golang-queue/queue

Install the latest verison:

go get github.com/golang-queue/queue@master

Usage

Basic usage of Pool (use Task function)

By calling QueueTask() method, it schedules the task executed by worker (goroutines) in the Pool.

package main

import (
  "context"
  "fmt"
  "time"

  "github.com/golang-queue/queue"
)

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // initial queue pool
  q := queue.NewPool(5)
  // shutdown the service and notify all the worker
  // wait all jobs are complete.
  defer q.Release()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.QueueTask(func(ctx context.Context) error {
        rets <- fmt.Sprintf("Hi Gopher, handle the job: %02d", +i)
        return nil
      }); err != nil {
        panic(err)
      }
    }(i)
  }

  // wait until all tasks done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(20 * time.Millisecond)
  }
}
Basic usage of Pool (use message queue)

Define the new message struct and implement the Bytes() func to encode message. Give the WithFn func to handle the message from Queue.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
)

type job struct {
  Name    string
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // initial queue pool
  q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m queue.QueuedMessage) error {
    v, ok := m.(*job)
    if !ok {
      if err := json.Unmarshal(m.Bytes(), &v); err != nil {
        return err
      }
    }

    rets <- "Hi, " + v.Name + ", " + v.Message
    return nil
  }))
  // shutdown the service and notify all the worker
  // wait all jobs are complete.
  defer q.Release()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Name:    "Gopher",
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }
}

Using NSQ as Queue

See the NSQ documentation.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nsq"
  "github.com/golang-queue/queue"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := nsq.NewWorker(
    nsq.WithAddr("127.0.0.1:4150"),
    nsq.WithTopic("example"),
    nsq.WithChannel("foobar"),
    // concurrent job number
    nsq.WithMaxInFlight(10),
    nsq.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
      v, ok := m.(*job)
      if !ok {
        if err := json.Unmarshal(m.Bytes(), &v); err != nil {
          return err
        }
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q := queue.NewPool(
    5,
    queue.WithWorker(w),
  )

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      })
    }(i)
  }

  // wait until all tasks done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shutdown the service and notify all the worker
  q.Release()
}

Using NATs as Queue

See the NATs documentation

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nats"
  "github.com/golang-queue/queue"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := nats.NewWorker(
    nats.WithAddr("127.0.0.1:4222"),
    nats.WithSubj("example"),
    nats.WithQueue("foobar"),
    nats.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
      v, ok := m.(*job)
      if !ok {
        if err := json.Unmarshal(m.Bytes(), &v); err != nil {
          return err
        }
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q, err := queue.NewQueue(
    queue.WithWorkerCount(10),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the five worker
  q.Start()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      })
    }(i)
  }

  // wait until all tasks done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shutdown the service and notify all the worker
  q.Release()
}

Using Redis(Pub/Sub) as Queue

See the redis documentation

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
  "github.com/golang-queue/redisdb"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := redisdb.NewWorker(
    redisdb.WithAddr("127.0.0.1:6379"),
    redisdb.WithChannel("foobar"),
    redisdb.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
      v, ok := m.(*job)
      if !ok {
        if err := json.Unmarshal(m.Bytes(), &v); err != nil {
          return err
        }
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q, err := queue.NewQueue(
    queue.WithWorkerCount(10),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the five worker
  q.Start()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      })
    }(i)
  }

  // wait until all tasks done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shutdown the service and notify all the worker
  q.Release()
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrMissingWorker = errors.New("missing worker module")

ErrMissingWorker missing define worker

View Source
var ErrQueueShutdown = errors.New("queue has been closed and released")

ErrQueueShutdown the queue is released and closed.

Functions

This section is empty.

Types

type Consumer added in v0.0.7

type Consumer struct {
	// contains filtered or unexported fields
}

Worker for simple queue using channel

func NewConsumer added in v0.0.7

func NewConsumer(opts ...Option) *Consumer

NewConsumer for struc

func (*Consumer) AfterRun added in v0.0.7

func (s *Consumer) AfterRun() error

AfterRun run script after start worker

func (*Consumer) BeforeRun added in v0.0.7

func (s *Consumer) BeforeRun() error

BeforeRun run script before start worker

func (*Consumer) BusyWorkers added in v0.0.8

func (s *Consumer) BusyWorkers() uint64

func (*Consumer) Capacity added in v0.0.7

func (s *Consumer) Capacity() int

Capacity for channel

func (*Consumer) Queue added in v0.0.7

func (s *Consumer) Queue(job QueuedMessage) error

Queue send notification to queue

func (*Consumer) Run added in v0.0.7

func (s *Consumer) Run() error

Run start the worker

func (*Consumer) Shutdown added in v0.0.7

func (s *Consumer) Shutdown() error

Shutdown worker

func (*Consumer) Usage added in v0.0.7

func (s *Consumer) Usage() int

Usage for count of channel usage

type Job

type Job struct {
	Task    TaskFunc      `json:"-"`
	Timeout time.Duration `json:"timeout"`
	Body    []byte        `json:"body"`
}

Job with Timeout

func (Job) Bytes

func (j Job) Bytes() []byte

Bytes get string body

func (Job) Encode

func (j Job) Encode() []byte

type Logger

type Logger interface {
	Infof(format string, args ...interface{})
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Info(args ...interface{})
	Error(args ...interface{})
	Fatal(args ...interface{})
}

Logger interface is used throughout gorush

func NewEmptyLogger

func NewEmptyLogger() Logger

NewEmptyLogger for simple logger.

Example
l := NewEmptyLogger()
l.Info("test")
l.Infof("test")
l.Error("test")
l.Errorf("test")
l.Fatal("test")
l.Fatalf("test")
Output:

func NewLogger

func NewLogger() Logger

NewLogger for simple logger.

type Metric added in v0.0.10

type Metric interface {
	IncBusyWorker()
	DecBusyWorker()
	BusyWorkers() uint64
}

Metric interface

func NewMetric added in v0.0.11

func NewMetric() Metric

NewMetric for default metric structure

type Option

type Option func(*Options)

Option for queue system

func WithFn added in v0.0.7

func WithFn(fn func(context.Context, QueuedMessage) error) Option

WithFn set custom job function

func WithLogger

func WithLogger(l Logger) Option

WithLogger set custom logger

func WithMetric added in v0.0.10

func WithMetric(m Metric) Option

WithMetric set custom Metric

func WithQueueSize added in v0.0.7

func WithQueueSize(num int) Option

WithQueueSize set worker count

func WithTimeOut added in v0.0.9

func WithTimeOut(t time.Duration) Option

WithTimeOut set custom timeout

func WithWorker

func WithWorker(w Worker) Option

WithWorker set custom worker

func WithWorkerCount

func WithWorkerCount(num int) Option

WithWorkerCount set worker count

type Options added in v0.0.7

type Options struct {
	// contains filtered or unexported fields
}

func NewOptions added in v0.0.7

func NewOptions(opts ...Option) *Options

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

A Queue is a message queue.

func NewPool added in v0.0.7

func NewPool(size int, opts ...Option) *Queue

func NewQueue

func NewQueue(opts ...Option) (*Queue, error)

NewQueue returns a Queue.

func (*Queue) Capacity

func (q *Queue) Capacity() int

Capacity for queue max size

func (*Queue) Queue

func (q *Queue) Queue(job QueuedMessage) error

Queue to queue all job

func (*Queue) QueueTask

func (q *Queue) QueueTask(task TaskFunc) error

QueueTask to queue job task

func (*Queue) QueueTaskWithTimeout

func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task TaskFunc) error

QueueTaskWithTimeout to queue job task with timeout

func (*Queue) QueueWithTimeout

func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error

Queue to queue all job

func (*Queue) Release added in v0.0.7

func (q *Queue) Release()

Workers returns the numbers of workers has been created.

func (*Queue) Shutdown

func (q *Queue) Shutdown()

Shutdown stops all queues.

func (*Queue) Start

func (q *Queue) Start()

Start to enable all worker

func (*Queue) Usage

func (q *Queue) Usage() int

Usage for count of queue usage

func (*Queue) Wait

func (q *Queue) Wait()

Wait all process

func (*Queue) Workers

func (q *Queue) Workers() int

Workers returns the numbers of workers has been created.

type QueuedMessage

type QueuedMessage interface {
	Bytes() []byte
}

QueuedMessage ...

type TaskFunc

type TaskFunc func(context.Context) error

TaskFunc is the task function

type Worker

type Worker interface {
	// BeforeRun is called before starting the worker
	BeforeRun() error
	// Run is called to start the worker
	Run() error
	// BeforeRun is called after starting the worker
	AfterRun() error
	// Shutdown is called if stop all worker
	Shutdown() error
	// Queue to send message in Queue (single channel, NSQ or AWS SQS)
	Queue(job QueuedMessage) error
	// Capacity queue capacity = cap(channel name)
	Capacity() int
	// Usage is how many message in queue
	Usage() int
	// BusyWorkers return count of busy worker currently
	BusyWorkers() uint64
}

Worker interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL