redisdb

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2023 License: MIT Imports: 10 Imported by: 1

README

redis

Run Testing codecov Go Report Card

Redis Pub/Sub as backend for Queue package

Setup

start the redis server

redis-server

screen

start the redis cluster, see the config

# server 01
mkdir server01 && cd server01 && redis-server redis.conf --port 6379
# server 02
mkdir server02 && cd server02 && redis-server redis.conf --port 6380

Example

For single server

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
  "github.com/golang-queue/redisdb"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := redisdb.NewWorker(
    redisdb.WithAddr("127.0.0.1:6379"),
    redisdb.WithChannel("foobar"),
    redisdb.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
      v, ok := m.(*job)
      if !ok {
        if err := json.Unmarshal(m.Bytes(), &v); err != nil {
          return err
        }
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q := queue.NewPool(
    5,
    queue.WithWorker(w),
  )

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Fatal(err)
      }
    }(i)
  }

  // wait until all tasks done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shutdown the service and notify all the worker
  q.Release()
}

Testing

go test -v ./...

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*options)

Option for queue system

func WithAddr

func WithAddr(addr string) Option

WithAddr setup the addr of redis

func WithChannel

func WithChannel(channel string) Option

WithChannel setup the channel of redis

func WithChannelSize

func WithChannelSize(size int) Option

WithChannelSize redis channel size

func WithCluster added in v0.0.4

func WithCluster(enable bool) Option

WithCluster redis cluster

func WithConnectionString

func WithConnectionString(connectionString string) Option

WithConnectionString redis connection string

func WithDB

func WithDB(db int) Option

WithPassword redis password

func WithLogger

func WithLogger(l queue.Logger) Option

WithLogger set custom logger

func WithMasterName added in v0.1.0

func WithMasterName(masterName string) Option

WithMasterName sentinel master name

func WithPassword

func WithPassword(passwd string) Option

WithPassword redis password

func WithRunFunc

func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option

WithRunFunc setup the run func of queue

func WithSentinel added in v0.1.0

func WithSentinel(enable bool) Option

WithSentinel redis sentinel

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker for Redis

func NewWorker

func NewWorker(opts ...Option) *Worker

NewWorker creates a new Worker instance with the provided options. It initializes a Redis client based on the options and establishes a connection to the Redis server. The Worker is responsible for subscribing to a Redis channel and receiving messages from it. It returns the created Worker instance.

func (*Worker) Queue

func (w *Worker) Queue(job core.QueuedMessage) error

Queue send notification to queue

func (*Worker) Request added in v0.0.8

func (w *Worker) Request() (core.QueuedMessage, error)

Request a new task

func (*Worker) Run

func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error

Run to execute new task

func (*Worker) Shutdown

func (w *Worker) Shutdown() error

Shutdown worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL