Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrTopicWithChannelNotFound for error when channel and topic is not found ErrTopicWithChannelNotFound = errors.New("nsq: topic and channel not found") )
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer for nsq
func WrapConsumers ¶
func WrapConsumers(config ConsumerConfig, backends ...ConsumerBackend) (*Consumer, error)
WrapConsumers of gonsq
func (*Consumer) Handle ¶
func (c *Consumer) Handle(topic, channel string, handler HandlerFunc)
Handle the consumer
func (*Consumer) Use ¶
func (c *Consumer) Use(middleware ...MiddlewareFunc)
Use the middleware use should be called before handle function this function will avoid to add the same middleware twice if the same middleware is used, it will skip the addition
type ConsumerBackend ¶
type ConsumerBackend interface { Topic() string Channel() string Stop() AddHandler(handler gonsq.Handler) AddConcurrentHandlers(handler gonsq.Handler, concurrency int) ConnectToNSQLookupds(addresses []string) error ChangeMaxInFlight(n int) Concurrency() int BufferMultiplier() int }
ConsumerBackend for NSQ
type ConsumerConfig ¶
type ConsumerConfig struct { LookupdsAddr []string // Concurrency is the number of worker intended for handling message from nsq Concurrency int // BufferMultiplier is the multiplier factor of concurrency to set the size of buffer when consuming message // the size of buffer multiplier is number of message being consumed before the buffer will be half full. // For example, 20(default value) buffer multiplier means the worker is able to consume more than 10 message, // before the buffer is half full from the nsqd message consumption. // To fill this configuration correctly, it is needed to observe the consumption rate of the message and the handling rate of the worker. BufferMultiplier int }
ConsumerConfig to supply WrapConsumers
func (*ConsumerConfig) Validate ¶
func (cg *ConsumerConfig) Validate() error
Validate consumer configuration
type HandlerFunc ¶
HandlerFunc for nsq
func Metrics ¶
func Metrics(handler HandlerFunc) HandlerFunc
Metrics middleware for nsq metrics that might be missleading: - throttled - message_in_buffer The metrics might be missleading because the message is not processed in ordered manner.
type Info ¶
type Info struct { WorkerTotal int WorkerCurrent int MessageInBuffer int ThrottleFlag int Throttled int }
Info for message
type Message ¶
Message for nsq
func (*Message) RequeueWithoutBackoff ¶
RequeueWithoutBackoff call gonsq message requeueWithoutBackoff
type MiddlewareFunc ¶
type MiddlewareFunc func(handler HandlerFunc) HandlerFunc
MiddlewareFunc for nsq middleware
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer for nsq
func WrapProducer ¶
func WrapProducer(backend ProducerBackend, topics ...string) *Producer
WrapProducer is a function to wrap the nsq producer
func (*Producer) MultiPublish ¶
MultiPublish message to nsqd
type ProducerBackend ¶
type ProducerBackend interface { Ping() error Publish(topic string, body []byte) error MultiPublish(topic string, body [][]byte) error Stop() }
ProducerBackend for NSQ
type ThrottleMiddleware ¶
type ThrottleMiddleware struct { // TimeDelay means the duration of time to pause message consumption TimeDelay time.Duration }
ThrottleMiddleware implement MiddlewareFunc
func (*ThrottleMiddleware) Throttle ¶
func (tm *ThrottleMiddleware) Throttle(handler HandlerFunc) HandlerFunc
Throttle middleware for nsq. This middleware check whether there is some information about throttling in the message.