queue

package
v2.1.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2019 License: BSD-3-Clause Imports: 9 Imported by: 0

README

Description

Queue library for glib. Redis is currently the only built-in driver.

Install

go get github.com/carltd/glib/v2/queue

Usage

Publisher
import (
    "github.com/carltd/glib/v2/queue/message"
    
    "github.com/carltd/glib/v2/queue"
    _ "github.com/carltd/glib/v2/queue/queue_redis"
)

pub, _ := queue.NewPublisher("redis", "redis://127.0.0.1:6379")
pub.Publish("subject", &message.Message{
    Body: util.MustMessageBody(nil, /* point to your protobuffer struct */ ),
})
pub.Close()
Consumer
import (
    "github.com/carltd/glib/v2/queue/message"
    
    "github.com/carltd/glib/v2/queue"
    _ "github.com/carltd/glib/v2/queue/queue_redis"
)

con, _ := queue.NewConsumer("redis", "redis://127.0.0.1:6379")
sub,_ := con.Subscribe("subject", "cluster-group")
msg, _ := sub.NextMessage(time.Second)
// logic for msg
con.Close()

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrTimeout = errors.New("consumer get message timeout")

Functions

func Drivers

func Drivers() []string

Drivers returns a sorted list of the names of the registered drivers.

func Register

func Register(name string, d driver)

Register makes a queue driver available by the provided name. If Register is called twice with the same name or if driver is nil, it panics.

Types

type Consumer

// Consumer is the receiving side of a queue, as returned by NewConsumer.
// It embeds io.Closer; call Close when the consumer is no longer needed.
type Consumer interface {
	// Dequeue is the unicast-mode receive. It takes a subject, a group and a
	// timeout, and returns the metadata of the received message.
	// NOTE(review): presumably the payload is decoded into msg and a timeout
	// error is returned when nothing arrives in time — confirm against the driver.
	Dequeue(subject, group string, timeout time.Duration, msg proto.Message) (*message.Meta, error)
	// Subscribe is the broadcast-mode receive. It returns a Subscriber for
	// the given subject and group.
	Subscribe(subject, group string) (Subscriber, error)
	io.Closer
}

func NewConsumer

func NewConsumer(driverName, queueAddrs string) (Consumer, error)
Example
package main

import (
	"log"
	"strings"
	"time"

	"github.com/carltd/glib/v2/queue"
	"github.com/carltd/glib/v2/queue/util"

	_ "github.com/carltd/glib/v2/queue/queue_redis"
)

// main shows the broadcast-mode consumer flow: connect, subscribe to a
// subject, wait up to one second for a message, then inspect it.
func main() {
	c, err := queue.NewConsumer("redis", "redis://:123456@localhost")
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	sub, err := c.Subscribe("subject", "appName")
	if err != nil {
		log.Fatal(err)
	}

	msg, err := sub.NextMessage(time.Second)
	if err != nil {
		if !strings.Contains(err.Error(), "timeout") {
			log.Fatal(err)
		}
		// Timed out: no message was delivered, so msg must not be used
		// (it is not valid when err is non-nil). A real consumer would
		// retry here instead of returning.
		return
	}

	// t is the message's publish time
	t, _ := util.TimestampFromMessageID(msg.MessageId)
	_ = t

	_ = msg
	// msg.MessageId
	// msg.Priority
	// msg.Options    some options for the msg
	// msg.Body
	// ptypes.UnmarshalAny(msg.Body, &YourProtoBufferStructPoint)
}
Output:

type Publisher

// Publisher is the sending side of a queue, as returned by NewPublisher.
// It embeds io.Closer; call Close when the publisher is no longer needed.
type Publisher interface {
	// Enqueue sends msg to subject in unicast mode.
	// NOTE(review): presumably the counterpart of Consumer.Dequeue — confirm.
	Enqueue(subject string, msg *message.Message) error
	// Publish sends msg to subject in broadcast mode.
	// NOTE(review): presumably delivered to Subscribers of subject — confirm.
	Publish(subject string, msg *message.Message) error
	io.Closer
}

func NewPublisher

func NewPublisher(driverName, queueAddrs string) (Publisher, error)
Example
package main

import (
	"log"

	"github.com/carltd/glib/v2/queue"
	"github.com/carltd/glib/v2/queue/message"

	_ "github.com/carltd/glib/v2/queue/queue_redis"
)

// main shows the broadcast-mode publisher flow: connect to the queue,
// publish one (empty) message to a subject, and close the publisher on exit.
func main() {
	publisher, err := queue.NewPublisher("redis", "redis://:123456@localhost")
	if err != nil {
		log.Fatal(err)
	}
	defer publisher.Close()

	// Scope the publish error to the check that handles it.
	if err := publisher.Publish("subject", &message.Message{}); err != nil {
		log.Fatal(err)
	}
}
Output:

type Subscriber

// Subscriber hands out messages for a subscription; Consumer.Subscribe
// returns one. It embeds io.Closer.
type Subscriber interface {
	// NextMessage returns the next message, taking a timeout for the wait.
	// NOTE(review): presumably returns a timeout error (see ErrTimeout) when
	// nothing arrives within timeout — confirm against the driver.
	NextMessage(timeout time.Duration) (*message.Message, error)

	// NextMessageWithContext returns the next message, with the wait bounded
	// by ctx (its deadline or timeout).
	NextMessageWithContext(ctx context.Context) (*message.Message, error)
	io.Closer
}

Directories

Path Synopsis
Package message is a generated protocol buffer package.
Package message is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL