Documentation ¶
Overview ¶
Package redis provides a simple, EXPERIMENTAL queuing transport backed by a redis list. Use at your own risk.
Current behavior (expected to change):
- the outbound uses `LPUSH` to place an RPC at rest onto the redis list that's acting as a queue
- the inbound uses the atomic `BRPOPLPUSH` operation to dequeue items and place them in a processing list
- an item is permanently removed once processing finishes, whether it succeeded or failed
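The list operations above can be illustrated without a redis server. The following is a minimal sketch, not the package's implementation: the `lists` type and its methods are hypothetical in-memory stand-ins that mimic `LPUSH`, the non-blocking core of `BRPOPLPUSH`, and `LREM` with a count of 1.

```go
package main

import "fmt"

// lists is a hypothetical in-memory stand-in for redis lists, keyed by name.
// Index 0 is the head (left end) of each list, as in redis.
type lists map[string][]string

// lpush prepends item to the list at key, like redis LPUSH.
func (l lists) lpush(key, item string) {
	l[key] = append([]string{item}, l[key]...)
}

// rpoplpush pops the tail of from and pushes it onto the head of to,
// like the non-blocking core of BRPOPLPUSH. ok is false when from is empty.
func (l lists) rpoplpush(from, to string) (item string, ok bool) {
	src := l[from]
	if len(src) == 0 {
		return "", false
	}
	item = src[len(src)-1]
	l[from] = src[:len(src)-1]
	l.lpush(to, item)
	return item, true
}

// lrem removes the first occurrence of item from the list at key,
// like redis LREM with a count of 1.
func (l lists) lrem(key, item string) {
	for i, v := range l[key] {
		if v == item {
			l[key] = append(l[key][:i], l[key][i+1:]...)
			return
		}
	}
}

func main() {
	l := lists{}
	l.lpush("my-queue-key", "rpc-1")
	l.lpush("my-queue-key", "rpc-2")

	// The oldest item is dequeued first, so the list behaves as a FIFO queue.
	item, _ := l.rpoplpush("my-queue-key", "my-processing-key")
	fmt.Println(item, l["my-queue-key"], l["my-processing-key"])

	// Once handling finishes (success or failure), the item is dropped for good.
	l.lrem("my-processing-key", item)
	fmt.Println(len(l["my-processing-key"]))
}
```

Pushing on the left and popping on the right is what makes the list act as a first-in, first-out queue; the processing list only holds items that are currently being handled.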
Sample usage:
client-side:

	redisOutbound := redis.NewOnewayOutbound(
		redis.NewRedis5Client(redisAddr),
		"my-queue-key", // where to enqueue items
	)
	...
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		...
		Outbounds: yarpc.Outbounds{
			"some-service": {Oneway: redisOutbound},
		},
	})

server-side:

	redisInbound := redis.NewInbound(
		redis.NewRedis5Client(redisAddr),
		"my-queue-key",      // where to dequeue items from
		"my-processing-key", // where to put items while processing
		time.Second,         // how long to wait when reading the queue
	)
	...
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name:     "some-service",
		Inbounds: yarpc.Inbounds{redisInbound},
		...
	})
From here, standard Oneway RPCs made from the client to 'some-service' will be transported to the server through a Redis queue.
USE OF THIS PACKAGE SHOULD BE FOR EXPERIMENTAL PURPOSES ONLY. BEHAVIOR IS EXPECTED TO CHANGE.
Index ¶
- type Client
- type Inbound
- func (i *Inbound) Introspect() introspection.InboundStatus
- func (i *Inbound) IsRunning() bool
- func (i *Inbound) SetRouter(router transport.Router)
- func (i *Inbound) Start() error
- func (i *Inbound) Stop() error
- func (i *Inbound) Transports() []transport.Transport
- func (i *Inbound) WithRouter(router transport.Router) *Inbound
- func (i *Inbound) WithTracer(tracer opentracing.Tracer) *Inbound
- type Outbound
- func (o *Outbound) CallOneway(ctx context.Context, req *transport.Request) (transport.Ack, error)
- func (o *Outbound) Introspect() introspection.OutboundStatus
- func (o *Outbound) IsRunning() bool
- func (o *Outbound) Start() error
- func (o *Outbound) Stop() error
- func (o *Outbound) Transports() []transport.Transport
- func (o *Outbound) WithTracer(tracer opentracing.Tracer) *Outbound
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface {
	transport.Lifecycle

	// LPush adds item to the queue
	LPush(queue string, item []byte) error

	// BRPopLPush moves an item from the primary queue into a processing list.
	// This MUST return an error if the blocking call does not receive an item
	// within the timeout.
	BRPopLPush(from, to string, timeout time.Duration) ([]byte, error)

	// LRem removes one item from the queue key
	LRem(queue string, item []byte) error

	// Endpoint returns the endpoint configured for this client.
	Endpoint() string

	// ConnectionState returns the status of the connection(s).
	ConnectionState() string
}
Client is a subset of redis commands used to manage a queue.
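Because Client is an interface, a test double can stand in for a real redis connection. The sketch below is an assumption-laden illustration, not part of the package: `fakeClient`, `newFakeClient`, and `errTimeout` are hypothetical names, and the type covers only the three queue methods (it omits transport.Lifecycle, Endpoint, and ConnectionState to stay self-contained). It polls instead of truly blocking, but it does honor the requirement that BRPopLPush return an error when no item arrives within the timeout.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errTimeout is a hypothetical error returned when no item arrives in time.
var errTimeout = errors.New("timed out waiting for item")

// fakeClient is a hypothetical in-memory stand-in for the queue methods of
// Client. Index 0 of each slice is the head of the list.
type fakeClient struct {
	mu    sync.Mutex
	lists map[string][][]byte
}

func newFakeClient() *fakeClient {
	return &fakeClient{lists: make(map[string][][]byte)}
}

// LPush adds item to the head of the queue.
func (c *fakeClient) LPush(queue string, item []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lists[queue] = append([][]byte{item}, c.lists[queue]...)
	return nil
}

// BRPopLPush polls until an item is available or the timeout elapses.
// An empty queue at the deadline is reported as an error.
func (c *fakeClient) BRPopLPush(from, to string, timeout time.Duration) ([]byte, error) {
	deadline := time.Now().Add(timeout)
	for {
		c.mu.Lock()
		if src := c.lists[from]; len(src) > 0 {
			item := src[len(src)-1]
			c.lists[from] = src[:len(src)-1]
			c.lists[to] = append([][]byte{item}, c.lists[to]...)
			c.mu.Unlock()
			return item, nil
		}
		c.mu.Unlock()
		if !time.Now().Before(deadline) {
			return nil, errTimeout
		}
		time.Sleep(time.Millisecond)
	}
}

// LRem removes one occurrence of item from the queue.
func (c *fakeClient) LRem(queue string, item []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for i, v := range c.lists[queue] {
		if bytes.Equal(v, item) {
			c.lists[queue] = append(c.lists[queue][:i], c.lists[queue][i+1:]...)
			break
		}
	}
	return nil
}

func main() {
	c := newFakeClient()
	_ = c.LPush("my-queue-key", []byte("hello"))

	item, err := c.BRPopLPush("my-queue-key", "my-processing-key", 10*time.Millisecond)
	fmt.Println(string(item), err)
	_ = c.LRem("my-processing-key", item)

	// The queue is now empty, so the call fails once the timeout elapses.
	_, err = c.BRPopLPush("my-queue-key", "my-processing-key", 10*time.Millisecond)
	fmt.Println(err)
}
```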
func NewRedis5Client ¶
NewRedis5Client creates a new Client implementation using gopkg.in/redis.v5
type Inbound ¶
type Inbound struct {
// contains filtered or unexported fields
}
Inbound is a redis inbound that reads from the given queueKey. This will wait for an item in the queue or until the timeout is reached before trying to read again.
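The read-and-retry behavior described here can be pictured as a simple loop. This is a sketch under stated assumptions rather than the Inbound's actual code: `queue`, `sliceQueue`, and `poll` are hypothetical names, the stub fails immediately instead of blocking for the timeout, and the loop is bounded by an attempt count so the example terminates.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// queue is the hypothetical subset of Client that the loop needs.
type queue interface {
	BRPopLPush(from, to string, timeout time.Duration) ([]byte, error)
	LRem(queue string, item []byte) error
}

// sliceQueue is a hypothetical stub: it serves preloaded items in order and
// then returns an error right away instead of blocking for the timeout.
type sliceQueue struct {
	pending [][]byte
	removed int
}

func (q *sliceQueue) BRPopLPush(from, to string, timeout time.Duration) ([]byte, error) {
	if len(q.pending) == 0 {
		return nil, errors.New("no item within timeout")
	}
	item := q.pending[0]
	q.pending = q.pending[1:]
	return item, nil
}

func (q *sliceQueue) LRem(queue string, item []byte) error {
	q.removed++
	return nil
}

// poll makes up to attempts reads and returns how many items it handled.
// A read error (such as a timeout) simply leads to the next read. A received
// item is handled and then removed from the processing list whether or not
// the handler succeeded.
func poll(q queue, queueKey, processingKey string, timeout time.Duration, attempts int, handle func([]byte) error) int {
	handled := 0
	for i := 0; i < attempts; i++ {
		item, err := q.BRPopLPush(queueKey, processingKey, timeout)
		if err != nil {
			continue // nothing arrived in time; try again
		}
		_ = handle(item) // the outcome does not affect removal
		_ = q.LRem(processingKey, item)
		handled++
	}
	return handled
}

func main() {
	q := &sliceQueue{pending: [][]byte{[]byte("a"), []byte("b")}}
	n := poll(q, "my-queue-key", "my-processing-key", time.Second, 5,
		func(b []byte) error {
			fmt.Println("handling", string(b))
			return nil
		})
	fmt.Println(n, q.removed)
}
```

The timeout bounds how long a single read blocks, so a stop request can be noticed between reads even when the queue stays empty.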
func NewInbound ¶
NewInbound creates a redis Inbound that satisfies transport.Inbound.
queueKey - key for the queue in redis
processingKey - key for the list that holds items popped from the queue while they are processed
timeout - how long the inbound will block on reading from redis
func (*Inbound) Introspect ¶
func (i *Inbound) Introspect() introspection.InboundStatus
Introspect returns the state of the inbound for introspection purposes.
func (*Inbound) SetRouter ¶
SetRouter configures a router to handle incoming requests. This satisfies the transport.Inbound interface, and would be called by a dispatcher when it starts.
func (*Inbound) Transports ¶
Transports returns nil for now
func (*Inbound) WithRouter ¶
WithRouter configures a router to handle incoming requests, as a chained method for convenience.
func (*Inbound) WithTracer ¶
WithTracer configures a tracer on this inbound.
type Outbound ¶
type Outbound struct {
// contains filtered or unexported fields
}
Outbound is a redis OnewayOutbound that puts an RPC into the given queue key.
func NewOnewayOutbound ¶
NewOnewayOutbound creates a redis Outbound that satisfies transport.OnewayOutbound.
queueKey - key for the queue in redis
func (*Outbound) CallOneway ¶
CallOneway makes a oneway request using redis
func (*Outbound) Introspect ¶
func (o *Outbound) Introspect() introspection.OutboundStatus
Introspect returns basic status about this outbound.
func (*Outbound) Transports ¶
Transports returns nil for now
func (*Outbound) WithTracer ¶
WithTracer configures a tracer for the outbound