Documentation ¶
Index ¶
- func MockChannel() (Publisher, ConsumerChannel)
- func MockFinish(channel ConsumerChannel, count uint) error
- type Consumer
- type ConsumerChannel
- type DelayRelay
- type DelayRelayFilter
- type Delivery
- type IncludeAll
- type InvertFilter
- type MockURITranslator
- type MultiPublisher
- type Publisher
- type RedisURITranslator
- type Relay
- type RelayConsumer
- type RelayFilter
- type URITranslator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MockChannel ¶
func MockChannel() (Publisher, ConsumerChannel)
func MockFinish ¶
func MockFinish(channel ConsumerChannel, count uint) error
Types ¶
type ConsumerChannel ¶
type ConsumerChannel interface { AddConsumer(Consumer) bool StartConsuming() bool StopConsuming() bool ReturnAllUnacked() int PurgeRejected() int Publisher() Publisher }
func ConsumerFromURI ¶
func ConsumerFromURI(uri string, redisClient *redis.Client) (ConsumerChannel, error)
func NewQueueConsumerChannel ¶
func NewQueueConsumerChannel(channelName string, redisClient *redis.Client) ConsumerChannel
NewQueueConsumerChannel returns a ConsumerChannel that uses Redis queues for communication. Each message delivered through this ConsumerChannel will be delivered to only one consumer, assuming the consumer Acks the message.
func NewTopicConsumerChannel ¶
func NewTopicConsumerChannel(channelName string, redisClient *redis.Client) ConsumerChannel
NewTopicConsumerChannel returns a ConsumerChannel that uses Redis PubSub for communication. Each message delivered through this consumer channel will be delivered once to each consumer. Note, however, that network issues that prevent delivery of a message may lead to messages going completely undelivered. Consumers may Ack or Reject the messages, but this is a no-op.
type DelayRelay ¶
type DelayRelay struct { *Relay // contains filtered or unexported fields }
func NewDelayRelay ¶
func NewDelayRelay(sourcePublisher Publisher, channel ConsumerChannel, publisher Publisher, sentinel string) DelayRelay
NewDelayRelay creates a DelayRelay that consumes from `channel` and relays messages to `publisher`. It also requires `sourcePublisher`, which must be able to publish messages to `channel`, and `sentinel` which should be a string that would never be a normal message from the consumer channel. Messages will pile up on the DelayRelay until DelayRelay.Flush() is called, at which point any queued messages will flush to `publisher`.
Note: `channel` should be a queue based channel, not a topic based channel. Topic based channels process messages in parallel, and thus won't block until Flush() is called
func (*DelayRelay) Flush ¶
func (relay *DelayRelay) Flush()
type DelayRelayFilter ¶
type DelayRelayFilter struct {
// contains filtered or unexported fields
}
func (*DelayRelayFilter) Filter ¶
func (filter *DelayRelayFilter) Filter(delivery Delivery) bool
type IncludeAll ¶
type IncludeAll struct {
// contains filtered or unexported fields
}
func (*IncludeAll) Filter ¶
func (filter *IncludeAll) Filter(delivery Delivery) bool
type InvertFilter ¶
type InvertFilter struct {
Subfilter RelayFilter
}
func (*InvertFilter) Filter ¶
func (filter *InvertFilter) Filter(delivery Delivery) bool
type MockURITranslator ¶
type MockURITranslator struct {
// contains filtered or unexported fields
}
func (*MockURITranslator) ConsumerFromURI ¶
func (rut *MockURITranslator) ConsumerFromURI(uri string) (ConsumerChannel, error)
func (*MockURITranslator) PublisherFromURI ¶
func (rut *MockURITranslator) PublisherFromURI(uri string) (Publisher, error)
type MultiPublisher ¶
type MultiPublisher []Publisher
func (MultiPublisher) Publish ¶
func (mp MultiPublisher) Publish(payload string) bool
type Publisher ¶
func MockPublisher ¶
func NewRedisQueuePublisher ¶
func NewRedisTopicPublisher ¶
func PublisherFromURI ¶
type RedisURITranslator ¶
type RedisURITranslator struct {
// contains filtered or unexported fields
}
func (*RedisURITranslator) ConsumerFromURI ¶
func (rut *RedisURITranslator) ConsumerFromURI(uri string) (ConsumerChannel, error)
func (*RedisURITranslator) PublisherFromURI ¶
func (rut *RedisURITranslator) PublisherFromURI(uri string) (Publisher, error)
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
func NewRelay ¶
func NewRelay(channel ConsumerChannel, publishers []Publisher, filter RelayFilter, concurrency int) Relay
type RelayConsumer ¶
type RelayConsumer struct {
// contains filtered or unexported fields
}
func (*RelayConsumer) Consume ¶
func (consumer *RelayConsumer) Consume(delivery Delivery)
type RelayFilter ¶
RelayFilter objects provide a predicate function to determine whether a message should be passed to the next stage
type URITranslator ¶
type URITranslator interface { ConsumerFromURI(string) (ConsumerChannel, error) PublisherFromURI(string) (Publisher, error) }
func NewMockURITranslator ¶
func NewMockURITranslator(redisClient *redis.Client) URITranslator
func NewRedisURITranslator ¶
func NewRedisURITranslator(redisClient *redis.Client) URITranslator