Documentation ¶
Index ¶
- Constants
- Variables
- func ChannelPaused(topic, channel string) healthcheck.Checker
- func HealthCheck() healthcheck.Checker
- func HighWatermark(topic, channel string, mark int) healthcheck.Checker
- func MaxNsqdConnHealthCheck(maxconns int) healthcheck.Checker
- func MultiPublish(topic string, body [][]byte) error
- func Publish(topic string, body []byte) error
- func PublishDeadLetter(topic, channel string, body []byte) error
- type DefaultGlobalSubscriber
- func (s *DefaultGlobalSubscriber) AddHandler(handler nsqlib.Handler)
- func (s *DefaultGlobalSubscriber) Connect() error
- func (s *DefaultGlobalSubscriber) Disconnect()
- func (s *DefaultGlobalSubscriber) IsStarved() bool
- func (s *DefaultGlobalSubscriber) SetConfig(option string, value interface{}) error
- func (s *DefaultGlobalSubscriber) SetMaxInFlight(v int)
- type DefaultSubscriber
- func (s *DefaultSubscriber) AddHandler(handler nsqlib.Handler)
- func (s *DefaultSubscriber) Connect() error
- func (s *DefaultSubscriber) Disconnect()
- func (s *DefaultSubscriber) IsStarved() bool
- func (s *DefaultSubscriber) SetConfig(option string, value interface{}) error
- func (s *DefaultSubscriber) SetMaxInFlight(v int)
- type HostpoolPublisher
- type MockPublisher
- type Publisher
- type Subscriber
Constants ¶
const ( HealthCheckId = "com.hailocab.service.nsq" HighWatermarkId = "com.hailocab.service.nsq.highwatermark" MaxConnCheckId = "com.hailocab.service.nsq.maxconns" )
Variables ¶
var (
ErrEmptyBody = fmt.Errorf("Attempted to publish empty body")
)
Functions ¶
func ChannelPaused ¶
func ChannelPaused(topic, channel string) healthcheck.Checker
ChannelPaused asserts that the channel is not paused
func HighWatermark ¶
func HighWatermark(topic, channel string, mark int) healthcheck.Checker
HighWatermark asserts that no individual nsqd has greater than N messages for a channel Will fail if the channel doesn't exist on at least one NSQ
func MaxNsqdConnHealthCheck ¶
func MaxNsqdConnHealthCheck(maxconns int) healthcheck.Checker
MaxNsqdConnHealthCheck asserts that the total number of established tcp connections to all nsqd's fall below a given max threshold.
func MultiPublish ¶
MultiPublish wraps DefaultPublisher.MultiPublish
func PublishDeadLetter ¶
PublishDeadLetter puts messages on the deadletter queue for a topic/channel.
Types ¶
type DefaultGlobalSubscriber ¶
type DefaultGlobalSubscriber struct {
// contains filtered or unexported fields
}
DefaultGlobalSubscriber is the default global subscriber which encapsulates local and federated DefaultSubscribers.
func (*DefaultGlobalSubscriber) AddHandler ¶
func (s *DefaultGlobalSubscriber) AddHandler(handler nsqlib.Handler)
func (*DefaultGlobalSubscriber) Connect ¶
func (s *DefaultGlobalSubscriber) Connect() error
func (*DefaultGlobalSubscriber) Disconnect ¶
func (s *DefaultGlobalSubscriber) Disconnect()
func (*DefaultGlobalSubscriber) IsStarved ¶
func (s *DefaultGlobalSubscriber) IsStarved() bool
func (*DefaultGlobalSubscriber) SetConfig ¶
func (s *DefaultGlobalSubscriber) SetConfig(option string, value interface{}) error
func (*DefaultGlobalSubscriber) SetMaxInFlight ¶
func (s *DefaultGlobalSubscriber) SetMaxInFlight(v int)
type DefaultSubscriber ¶
type DefaultSubscriber struct {
// contains filtered or unexported fields
}
func (*DefaultSubscriber) AddHandler ¶
func (s *DefaultSubscriber) AddHandler(handler nsqlib.Handler)
func (*DefaultSubscriber) Connect ¶
func (s *DefaultSubscriber) Connect() error
func (*DefaultSubscriber) Disconnect ¶
func (s *DefaultSubscriber) Disconnect()
func (*DefaultSubscriber) IsStarved ¶
func (s *DefaultSubscriber) IsStarved() bool
func (*DefaultSubscriber) SetConfig ¶
func (s *DefaultSubscriber) SetConfig(option string, value interface{}) error
func (*DefaultSubscriber) SetMaxInFlight ¶
func (s *DefaultSubscriber) SetMaxInFlight(v int)
type HostpoolPublisher ¶
HostpoolPublisher is our default publisher which gets N hosts from config and then allows us to PUB to M of them
func (*HostpoolPublisher) MultiPublish ¶
func (p *HostpoolPublisher) MultiPublish(topic string, body [][]byte) error
MultiPublish pubs X messages at once, synchronously, to N of M NSQs
type MockPublisher ¶
func (*MockPublisher) MultiPublish ¶
func (p *MockPublisher) MultiPublish(topic string, body [][]byte) error
type Publisher ¶
type Publisher interface { MultiPublish(topic string, body [][]byte) error Publish(topic string, body []byte) error }
Publisher is our wrapper round NSQ PUB for auto-config
type Subscriber ¶
type Subscriber interface { // AddHandler registers something to handle inbound messages AddHandler(handler nsqlib.Handler) // SetMaxInFlight defines how many messages NSQ should punt our way at a time SetMaxInFlight(int) // IsStarved indicates whether any connection will reach max in flight IsStarved() bool // Connect initiates the NSQ config-driven connection loop // NOTE: this should be the last thing you do Connect() error // Disconnect stops the consumer and the config loop Disconnect() // SetConfig sets a config value on the underlying NSQ consumer SetConfig(option string, value interface{}) error }
func NewDefaultGlobalSubscriber ¶
func NewDefaultGlobalSubscriber(topic, channel string) (Subscriber, error)
NewDefaultGlobalSubscriber yields a Subscriber which automatically connects to the configured (via config service) nodes providing the messages for the given topic and its federated counterpart. This allows a client to receive messages pubbed within a local region and those federated globally.
func NewDefaultSubscriber ¶
func NewDefaultSubscriber(topic string, channel string) (Subscriber, error)
NewDefaultSubscriber yields a DefaultSubscriber that automatically connects to the configured (via config service) nsqlookupds to find nodes hosting the messages for the given topic