Documentation ¶
Index ¶
- type CompressionConfig
- type Config
- type ConsumerConfig
- type LookupdConfig
- type NSQConsumer
- func (c *NSQConsumer) AddConcurrentHandlers(handler nsqio.Handler, concurrency int)
- func (c *NSQConsumer) AddHandler(handler nsqio.Handler)
- func (c *NSQConsumer) BufferMultiplier() int
- func (c *NSQConsumer) ChangeMaxInFlight(n int)
- func (c *NSQConsumer) Channel() string
- func (c *NSQConsumer) Concurrency() int
- func (c *NSQConsumer) ConnectToNSQLookupds(addresses []string) error
- func (c *NSQConsumer) Stop()
- func (c *NSQConsumer) Topic() string
- type NSQProducer
- type ProducerConfig
- type QueueConfig
- type TimeoutConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CompressionConfig ¶
type CompressionConfig struct { Deflate bool `toml:"deflate" yaml:"deflate"` DeflateLevel int `toml:"deflate_level" yaml:"deflate_level"` Snappy bool `toml:"snappy" yaml:"snappy"` }
CompressionConfig to support compression
type Config ¶
type Config struct { Hostname string Lookupd LookupdConfig Timeout TimeoutConfig Queue QueueConfig Compression CompressionConfig }
Config of nsqio
type ConsumerConfig ¶
type ConsumerConfig struct { Hostname string Topic string Channel string Lookupd LookupdConfig Timeout TimeoutConfig Queue QueueConfig Compression CompressionConfig Concurrency int // BufferMultiplier means the length of the buffer per concurrent worker // is the multiplier factor of concurrency to set the size of buffer when consuming message // the size of buffer multiplier is number of message being consumed before the buffer will be half full // for example, 20(default value) buffer multiplier means the worker is able to consume more than 10 message // before the buffer is half full from the nsqd message consumption. // To fill this configuration correctly, it is needed to observe the consumption rate of the message and the handling rate of the worker. BufferMultiplier int }
ConsumerConfig for nsq consumer
func (*ConsumerConfig) Validate ¶
func (cf *ConsumerConfig) Validate() error
Validate consumer configuration
type LookupdConfig ¶
type LookupdConfig struct { PoolInterval time.Duration `toml:"pool_interval" yaml:"pool_interval"` PollJitter float64 `toml:"pool_jitter" yaml:"pool_jitter"` }
LookupdConfig for lookupd configuration
type NSQConsumer ¶
type NSQConsumer struct {
// contains filtered or unexported fields
}
NSQConsumer backend
func NewConsumer ¶
func NewConsumer(ctx context.Context, config ConsumerConfig) (*NSQConsumer, error)
NewConsumer for nsq
func (*NSQConsumer) AddConcurrentHandlers ¶
func (c *NSQConsumer) AddConcurrentHandlers(handler nsqio.Handler, concurrency int)
AddConcurrentHandlers add concurrent handler to nsq
func (*NSQConsumer) AddHandler ¶
func (c *NSQConsumer) AddHandler(handler nsqio.Handler)
AddHandler to nsq
func (*NSQConsumer) BufferMultiplier ¶
func (c *NSQConsumer) BufferMultiplier() int
BufferMultiplier return the buffer multiplier number for a given consumer
func (*NSQConsumer) ChangeMaxInFlight ¶
func (c *NSQConsumer) ChangeMaxInFlight(n int)
ChangeMaxInFlight will change max in flight number in nsq consumer
func (*NSQConsumer) Channel ¶
func (c *NSQConsumer) Channel() string
Channel return the channel of consumer
func (*NSQConsumer) Concurrency ¶
func (c *NSQConsumer) Concurrency() int
Concurrency return the concurrency number for a given consumer
func (*NSQConsumer) ConnectToNSQLookupds ¶
func (c *NSQConsumer) ConnectToNSQLookupds(addresses []string) error
ConnectToNSQLookupds connecting to several nsq lookupd
type NSQProducer ¶
type NSQProducer struct {
// contains filtered or unexported fields
}
NSQProducer backend
func NewProducer ¶
func NewProducer(ctx context.Context, config ProducerConfig) (*NSQProducer, error)
NewProducer return a new producer
func (*NSQProducer) MultiPublish ¶
func (np *NSQProducer) MultiPublish(topic string, body [][]byte) error
MultiPublish to nsqd
type ProducerConfig ¶
type ProducerConfig struct { Hostname string Address string Compression CompressionConfig Timeout TimeoutConfig }
ProducerConfig struct
type QueueConfig ¶
type QueueConfig struct { MaxInFlight int `toml:"max_in_flight" yaml:"max_in_flight"` MsgTimeout time.Duration `toml:"message_timeout" yaml:"message_timeout"` MaxRequeueDelay time.Duration `toml:"max_requeue_delay" yaml:"max_requeue_delay"` DefaultRequeueDelay time.Duration `toml:"default_requeue_delay" yaml:"default_requeue_delay"` }
QueueConfig for message configuration