nsq

package
v0.0.0-...-cbea63e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2021 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	HealthCheckId   = "com.hailocab.service.nsq"
	HighWatermarkId = "com.hailocab.service.nsq.highwatermark"
	MaxConnCheckId  = "com.hailocab.service.nsq.maxconns"
)

Variables

View Source
var (
	ErrEmptyBody = fmt.Errorf("Attempted to publish empty body")
)

Functions

func ChannelPaused

func ChannelPaused(topic, channel string) healthcheck.Checker

ChannelPaused asserts that the channel is not paused

func HealthCheck

func HealthCheck() healthcheck.Checker

HealthCheck asserts we can PUB to NSQ

func HighWatermark

func HighWatermark(topic, channel string, mark int) healthcheck.Checker

HighWatermark asserts that no individual nsqd has greater than N messages for a channel Will fail if the channel doesn't exist on at least one NSQ

func MaxNsqdConnHealthCheck

func MaxNsqdConnHealthCheck(maxconns int) healthcheck.Checker

MaxNsqdConnHealthCheck asserts that the total number of established tcp connections to all nsqd's fall below a given max threshold.

func MultiPublish

func MultiPublish(topic string, body [][]byte) error

MultiPublish wraps DefaultPublisher.MultiPublish

func Publish

func Publish(topic string, body []byte) error

Publish wraps DefaultPublisher.Publish

func PublishDeadLetter

func PublishDeadLetter(topic, channel string, body []byte) error

PublishDeadLetter puts messages on the deadletter queue for a topic/channel.

Types

type DefaultGlobalSubscriber

type DefaultGlobalSubscriber struct {
	// contains filtered or unexported fields
}

DefaultGlobalSubscriber is the default global subscriber which encapsulates local and federated DefaultSubscribers.

func (*DefaultGlobalSubscriber) AddHandler

func (s *DefaultGlobalSubscriber) AddHandler(handler nsqlib.Handler)

func (*DefaultGlobalSubscriber) AddHandlers

func (s *DefaultGlobalSubscriber) AddHandlers(handler nsqlib.Handler)

func (*DefaultGlobalSubscriber) Connect

func (s *DefaultGlobalSubscriber) Connect() error

func (*DefaultGlobalSubscriber) Disconnect

func (s *DefaultGlobalSubscriber) Disconnect()

func (*DefaultGlobalSubscriber) IsStarved

func (s *DefaultGlobalSubscriber) IsStarved() bool

func (*DefaultGlobalSubscriber) SetConfig

func (s *DefaultGlobalSubscriber) SetConfig(option string, value interface{}) error

func (*DefaultGlobalSubscriber) SetMaxInFlight

func (s *DefaultGlobalSubscriber) SetMaxInFlight(v int)

type DefaultSubscriber

type DefaultSubscriber struct {
	// contains filtered or unexported fields
}

func (*DefaultSubscriber) AddHandler

func (s *DefaultSubscriber) AddHandler(handler nsqlib.Handler)

func (*DefaultSubscriber) AddHandlers

func (s *DefaultSubscriber) AddHandlers(handler nsqlib.Handler)

func (*DefaultSubscriber) Connect

func (s *DefaultSubscriber) Connect() error

func (*DefaultSubscriber) Disconnect

func (s *DefaultSubscriber) Disconnect()

func (*DefaultSubscriber) IsStarved

func (s *DefaultSubscriber) IsStarved() bool

func (*DefaultSubscriber) SetConfig

func (s *DefaultSubscriber) SetConfig(option string, value interface{}) error

func (*DefaultSubscriber) SetMaxInFlight

func (s *DefaultSubscriber) SetMaxInFlight(v int)

type HostpoolPublisher

type HostpoolPublisher struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

HostpoolPublisher is our default publisher which gets N hosts from config and then allows us to PUB to M of them

func (*HostpoolPublisher) MultiPublish

func (p *HostpoolPublisher) MultiPublish(topic string, body [][]byte) error

MultiPublish pubs X messages at once, synchronously, to N of M NSQs

func (*HostpoolPublisher) Publish

func (publisher *HostpoolPublisher) Publish(topic string, body []byte) error

Publish will PUB a message to N of M NSQs

type MockPublisher

type MockPublisher struct {
	mock.Mock
}

func (*MockPublisher) MultiPublish

func (p *MockPublisher) MultiPublish(topic string, body [][]byte) error

func (*MockPublisher) Publish

func (p *MockPublisher) Publish(topic string, body []byte) error

type Publisher

type Publisher interface {
	MultiPublish(topic string, body [][]byte) error
	Publish(topic string, body []byte) error
}

Publisher is our wrapper round NSQ PUB for auto-config

var DefaultPublisher Publisher = &HostpoolPublisher{
	producers: make(map[string]*nsqlib.Producer),
	hostpool:  hostpool.New([]string{}),
}

DefaultPublisher is a default implementation of the Publisher interface. It is an instance of a HostpoolPublisher

type Subscriber

type Subscriber interface {
	// AddHandler registers something to handle inbound messages
	AddHandler(handler nsqlib.Handler)

	// AddHandlers registers something to handle inbound messages. This function uses
	// config to determine how many handlers should be started.
	AddHandlers(handler nsqlib.Handler)

	// SetMaxInFlight defines how many messages NSQ should punt our way at a time
	SetMaxInFlight(int)

	// IsStarved indicates whether any connection will reach max in flight
	IsStarved() bool

	// Connect initiates the NSQ config-driven connection loop
	// NOTE: this should be the last thing you do
	Connect() error

	// Disconnect stops the consumer and the config loop
	Disconnect()

	// SetConfig sets a config value on the underlying NSQ consumer
	SetConfig(option string, value interface{}) error
}

func NewDefaultGlobalSubscriber

func NewDefaultGlobalSubscriber(topic, channel string) (Subscriber, error)

NewDefaultGlobalSubscriber yields a Subscriber which automatically connects to the configured (via config service) nodes providing the messages for the given topic and its federated counterpart. This allows a client to receive messages pubbed within a local region and those federated globally.

func NewDefaultSubscriber

func NewDefaultSubscriber(topic string, channel string) (Subscriber, error)

NewDefaultSubscriber yields a DefaultSubscriber that automatically connects to the configured (via config service) nsqlookupds to find nodes hosting the messages for the given topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL