redis

package
v1.13.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 3, 2017 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package redis provides an simple, EXPERIMENTAL queuing transport backed by a redis list. Use at your own risk.

Current behavior (expected to change):

  • the outbound uses `LPUSH` to place an RPC at rest onto the redis list that's acting as a queue
  • the inbound uses the atomic `BRPOPLPUSH` operation to dequeue items and place them in a processing list
  • processing failure/success cause a permanent removal of an item

Sample usage:

client-side:
   redisOutbound := redis.NewOutbound(
       redis.NewRedis5Client(redisAddr),
       "my-queue-key",   // where to enqueue items
   )
   ...
   dispatcher := yarpc.NewDispatcher(Config{
                   ...
                   Outbounds: yarpc.Outbounds{
                     "some-service": { Oneway: redisOutbound },
                   }
               })

server-side:
   redisInbound := redis.NewInbound(
       redis.NewRedis5Client(redisAddr),
       "my-queue-key",       // where to dequeue items from
       "my-processing-key",  // where to put items while processing
       time.Second,          // wait for up to timeout, when reading queue
   )
   ...
   dispatcher := yarpc.NewDispatcher(Config{
                   Name: "some-service",
                   Inbounds: yarpc.Inbounds{ redisInbound },
                   ...
               })

From here, standard Oneway RPCs made from the client to 'some-service' will be transported to the server through a Redis queue.

USE OF THIS PACKAGE SHOULD BE FOR EXPERIMENTAL PURPOSES ONLY. BEHAVIOR IS EXPECTED TO CHANGE.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	transport.Lifecycle

	// LPush adds item to the queue
	LPush(queue string, item []byte) error
	// This MUST return an error if the blocking call does not receive an item
	// BRPopLPush moves an item from the primary queue into a processing list.
	// within the timeout.
	BRPopLPush(from, to string, timeout time.Duration) ([]byte, error)
	// LRem removes one item from the queue key
	LRem(queue string, item []byte) error

	// Endpoint returns the enpoint configured for this client.
	Endpoint() string

	// ConnectionState returns the status of the connection(s).
	ConnectionState() string
}

Client is a subset of redis commands used to manage a queue

func NewRedis5Client

func NewRedis5Client(addr string) Client

NewRedis5Client creates a new Client implementation using gopkg.in/redis.v5

type Inbound

type Inbound struct {
	// contains filtered or unexported fields
}

Inbound is a redis inbound that reads from the given queueKey. This will wait for an item in the queue or until the timout is reached before trying to read again.

func NewInbound

func NewInbound(client Client, queueKey, processingKey string, timeout time.Duration) *Inbound

NewInbound creates a redis Inbound that satisfies transport.Inbound.

queueKey - key for the queue in redis processingKey - key for the list we'll store items we've popped from the queue timeout - how long the inbound will block on reading from redis

func (*Inbound) Introspect

func (i *Inbound) Introspect() introspection.InboundStatus

Introspect returns the state of the inbound for introspection purposes.

func (*Inbound) IsRunning

func (i *Inbound) IsRunning() bool

IsRunning returns whether the inbound is still processing requests.

func (*Inbound) SetRouter

func (i *Inbound) SetRouter(router transport.Router)

SetRouter configures a router to handle incoming requests. This satisfies the transport.Inbound interface, and would be called by a dispatcher when it starts.

func (*Inbound) Start

func (i *Inbound) Start() error

Start starts the inbound, reading from the queueKey

func (*Inbound) Stop

func (i *Inbound) Stop() error

Stop ends the connection to redis

func (*Inbound) Transports

func (i *Inbound) Transports() []transport.Transport

Transports returns nil for now

func (*Inbound) WithRouter

func (i *Inbound) WithRouter(router transport.Router) *Inbound

WithRouter configures a router to handle incoming requests, as a chained method for convenience.

func (*Inbound) WithTracer

func (i *Inbound) WithTracer(tracer opentracing.Tracer) *Inbound

WithTracer configures a tracer on this inbound.

type Outbound

type Outbound struct {
	// contains filtered or unexported fields
}

Outbound is a redis OnewayOutbound that puts an RPC into the given queue key

func NewOnewayOutbound

func NewOnewayOutbound(client Client, queueKey string) *Outbound

NewOnewayOutbound creates a redis Outbound that satisfies transport.OnewayOutbound queueKey - key for the queue in redis

func (*Outbound) CallOneway

func (o *Outbound) CallOneway(ctx context.Context, req *transport.Request) (transport.Ack, error)

CallOneway makes a oneway request using redis

func (*Outbound) Introspect

func (o *Outbound) Introspect() introspection.OutboundStatus

Introspect returns basic status about this outbound.

func (*Outbound) IsRunning

func (o *Outbound) IsRunning() bool

IsRunning returns whether the Outbound is running.

func (*Outbound) Start

func (o *Outbound) Start() error

Start creates connection to the redis instance

func (*Outbound) Stop

func (o *Outbound) Stop() error

Stop stops the redis connection

func (*Outbound) Transports

func (o *Outbound) Transports() []transport.Transport

Transports returns nil for now

func (*Outbound) WithTracer

func (o *Outbound) WithTracer(tracer opentracing.Tracer) *Outbound

WithTracer configures a tracer for the outbound

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL