Documentation ¶
Overview ¶
nsq is the official Go package for https://github.com/bitly/nsq
It provides the building blocks for developing applications on the NSQ platform in Go.
Low-level functions and types are provided to communicate over the NSQ protocol as well as a high-level Reader library to implement robust consumers.
Index ¶
- Constants
- Variables
- func ApiRequest(endpoint string) (*simplejson.Json, error)
- func IsValidChannelName(name string) bool
- func IsValidTopicName(name string) bool
- func NewDeadlineTransport(timeout time.Duration) *http.Transport
- func ReadResponse(r io.Reader) ([]byte, error)
- func SendFramedResponse(w io.Writer, frameType int32, data []byte) (int, error)
- func SendResponse(w io.Writer, data []byte) (int, error)
- func UnpackResponse(response []byte) (int32, []byte, error)
- type AsyncHandler
- type ChildError
- type ClientErr
- type Command
- func Finish(id MessageID) *Command
- func Identify(js map[string]interface{}) (*Command, error)
- func MultiPublish(topic string, bodies [][]byte) (*Command, error)
- func Nop() *Command
- func Ping() *Command
- func Publish(topic string, body []byte) *Command
- func Ready(count int) *Command
- func Register(topic string, channel string) *Command
- func Requeue(id MessageID, timeoutMs int) *Command
- func StartClose() *Command
- func Subscribe(topic string, channel string) *Command
- func Touch(id MessageID) *Command
- func UnRegister(topic string, channel string) *Command
- type FailedMessageLogger
- type FatalClientErr
- type FinishedMessage
- type Handler
- type LookupPeer
- type Message
- type MessageID
- type PeerInfo
- type Protocol
- type Reader
- func (q *Reader) AddAsyncHandler(handler AsyncHandler)
- func (q *Reader) AddHandler(handler Handler)
- func (q *Reader) ConnectToLookupd(addr string) error
- func (q *Reader) ConnectToNSQ(addr string) error
- func (q *Reader) ConnectionMaxInFlight() int64
- func (q *Reader) IsStarved() bool
- func (q *Reader) MaxInFlight() int
- func (q *Reader) SetMaxBackoffDuration(duration time.Duration)
- func (q *Reader) SetMaxInFlight(maxInFlight int)
- func (q *Reader) Stop()
- type Writer
- func (w *Writer) MultiPublish(topic string, body [][]byte) (int32, []byte, error)
- func (w *Writer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *WriterTransaction, ...) error
- func (w *Writer) Publish(topic string, body []byte) (int32, []byte, error)
- func (w *Writer) PublishAsync(topic string, body []byte, doneChan chan *WriterTransaction, ...) error
- func (w *Writer) Stop()
- func (w *Writer) String() string
- type WriterTransaction
Constants ¶
const (
	// when successful
	FrameTypeResponse int32 = 0
	// when an error occurred
	FrameTypeError int32 = 1
	// when it's a serialized message
	FrameTypeMessage int32 = 2
)
const (
	StateInit = iota
	StateDisconnected
	StateConnected
	StateSubscribed
	// close has started. responses are ok, but no new messages will be sent
	StateClosing
)
const DefaultClientTimeout = 60 * time.Second
The amount of time nsqd will allow a client to idle; can be overridden
const MsgIdLength = 16
The number of bytes for a Message.Id
const VERSION = "0.3.2"
Variables ¶
var ErrAlreadyConnected = errors.New("already connected")
returned from ConnectToNSQ() when already connected
var ErrNotConnected = errors.New("not connected")
var ErrOverMaxInFlight = errors.New("over configure max-inflight")
returned from updateRdy if over max-in-flight
var ErrStopped = errors.New("stopped")
var MagicV1 = []byte("  V1")
var MagicV2 = []byte("  V2")
Functions ¶
func ApiRequest ¶
ApiRequest is a helper function to perform an HTTP request and parse our NSQ daemon's expected response format, with deadlines.
{"status_code":200, "status_txt":"OK", "data":{...}}
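As a rough illustration of the envelope above, the following sketch decodes it with the standard library only. The `envelope` type and `parseEnvelope` function are hypothetical names for this example and are not part of the package (ApiRequest itself returns a *simplejson.Json). Treating any status_code other than 200 as an error is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope mirrors the {"status_code", "status_txt", "data"} shape.
// Hypothetical type, for illustration only.
type envelope struct {
	StatusCode int             `json:"status_code"`
	StatusTxt  string          `json:"status_txt"`
	Data       json.RawMessage `json:"data"`
}

// parseEnvelope decodes body and returns the raw "data" payload.
// Assumption: a status_code other than 200 is reported as an error.
func parseEnvelope(body []byte) (json.RawMessage, error) {
	var e envelope
	if err := json.Unmarshal(body, &e); err != nil {
		return nil, err
	}
	if e.StatusCode != 200 {
		return nil, fmt.Errorf("status_code=%d status_txt=%q", e.StatusCode, e.StatusTxt)
	}
	return e.Data, nil
}

func main() {
	body := []byte(`{"status_code":200,"status_txt":"OK","data":{"channels":["a"]}}`)
	data, err := parseEnvelope(body)
	fmt.Println(string(data), err)
}
```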
func IsValidChannelName ¶
IsValidChannelName checks a channel name for correctness
func IsValidTopicName ¶
IsValidTopicName checks a topic name for correctness
func NewDeadlineTransport ¶ added in v0.2.16
A custom http.Transport with support for deadline timeouts
func ReadResponse ¶
ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:
[x][x][x][x][x][x][x][x]...
|  (int32) ||  (binary)
|  4-byte  ||  N-byte
------------------------...
    size       data
func SendFramedResponse ¶ added in v0.2.16
SendFramedResponse is a server side utility function to prefix data with a length header and frame header and write to the supplied Writer
func SendResponse ¶
SendResponse is a server side utility function to prefix data with a length header and write to the supplied Writer
func UnpackResponse ¶
UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:
[x][x][x][x][x][x][x][x]...
|  (int32) ||  (binary)
|  4-byte  ||  N-byte
------------------------...
  frame ID     data
Returns a triple of: frame type, data ([]byte), error
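A hedged sketch of the unpacking step: the first 4 bytes are taken as the frame type and the remainder as data. `unpack` is a hypothetical name, and big-endian byte order is assumed.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// unpack splits a response into its frame type and data.
// Assumption: the frame type is a big-endian int32 in the first 4 bytes.
func unpack(response []byte) (int32, []byte, error) {
	if len(response) < 4 {
		return -1, nil, errors.New("response shorter than the 4-byte frame header")
	}
	frameType := int32(binary.BigEndian.Uint32(response[:4]))
	return frameType, response[4:], nil
}

func main() {
	frameType, data, err := unpack([]byte{0, 0, 0, 2, 'h', 'i'})
	fmt.Println(frameType, string(data), err)
}
```

With the constants listed earlier, a frame type of 2 would correspond to FrameTypeMessage.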
Types ¶
type AsyncHandler ¶
type AsyncHandler interface {
HandleMessage(message *Message, responseChannel chan *FinishedMessage)
}
AsyncHandler is the asynchronous interface to Reader.
Implement this interface for handlers that wish to defer responding until later. This is particularly useful if you want to batch work together.
An AsyncHandler must send:
&FinishedMessage{messageID, requeueDelay, true|false}
To the supplied responseChannel to indicate that a message is processed.
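The batching use case can be sketched as follows. To keep the example self-contained it declares local stand-ins for MessageID, Message and FinishedMessage; their field names are assumptions, and only the field order (message ID, requeue delay, success flag) comes from the text above. `batchHandler` is a hypothetical type.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins so the sketch compiles on its own. Field names are assumed.
type MessageID [16]byte

type Message struct {
	Id   MessageID
	Body []byte
}

type FinishedMessage struct {
	Id             MessageID
	RequeueDelayMs int
	Success        bool
}

// batchHandler defers responding until batchSize messages have arrived,
// then reports every message in the batch as successfully processed.
type batchHandler struct {
	mu        sync.Mutex
	batchSize int
	pending   []*Message
	responses []chan *FinishedMessage
}

func (h *batchHandler) HandleMessage(m *Message, responseChannel chan *FinishedMessage) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.pending = append(h.pending, m)
	h.responses = append(h.responses, responseChannel)
	if len(h.pending) < h.batchSize {
		return // respond later, once the batch is full
	}
	// ... process the whole batch here ...
	for i, msg := range h.pending {
		h.responses[i] <- &FinishedMessage{msg.Id, 0, true}
	}
	h.pending, h.responses = nil, nil
}

func main() {
	ch := make(chan *FinishedMessage, 2)
	h := &batchHandler{batchSize: 2}
	h.HandleMessage(&Message{Id: MessageID{1}}, ch)
	h.HandleMessage(&Message{Id: MessageID{2}}, ch)
	fmt.Println(len(ch), "responses sent")
}
```

A real handler would probably also flush on a timer, so that a partially filled batch does not hold messages until they time out.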
type ChildError ¶ added in v0.2.17
type ChildError interface {
Parent() error
}
type ClientErr ¶
ClientErr provides a way for NSQ daemons to log a human readable error string and return a machine readable string to the client.
see docs/protocol.md for error codes by command
func NewClientErr ¶
NewClientErr creates a ClientErr with the supplied human and machine readable strings
type Command ¶
Command represents a command from a client to an NSQ daemon
func Finish ¶
Finish creates a new Command to indicate that a given message (by id) has been processed successfully
func Identify ¶
Identify creates a new Command to provide information about the client. After connecting, it is generally the first message sent.
The supplied map is marshaled into JSON to provide some flexibility for this command to evolve over time.
nsqd currently supports the following keys:
short_id - short identifier, typically client's short hostname
long_id - long identifier, typically client's long hostname
buffer_size - size in bytes for nsqd to buffer before writing to the wire for this client
nsqlookupd currently supports the following keys:
version - the version of the nsqd peer
tcp_port - the nsqd port where TCP clients can connect
http_port - the nsqd port where HTTP clients can connect
address - the address where clients can connect (generally DNS resolvable hostname)
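To make the JSON marshaling concrete, this sketch builds the map for the nsqd keys listed above and encodes it with encoding/json. `identifyBody` is a hypothetical helper, the hostname and buffer size are made-up values, and deriving the short identifier from the text before the first dot is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// identifyBody marshals the nsqd keys (short_id, long_id, buffer_size).
// Assumption: the short identifier is the hostname up to the first dot.
func identifyBody(hostname string, bufferSize int) ([]byte, error) {
	js := map[string]interface{}{
		"short_id":    strings.Split(hostname, ".")[0],
		"long_id":     hostname,
		"buffer_size": bufferSize,
	}
	// json.Marshal emits map keys in sorted order.
	return json.Marshal(js)
}

func main() {
	body, err := identifyBody("worker1.example.com", 16384)
	fmt.Println(string(body), err)
}
```

Because the payload is a plain map, new keys can be added later without changing the function signature, which is the flexibility the description refers to.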
func MultiPublish ¶ added in v0.2.16
MultiPublish creates a new Command to write more than one message to a given topic. This is useful for high-throughput situations to avoid roundtrips and saturate the pipe.
func Nop ¶
func Nop() *Command
Nop creates a new Command that has no effect server side. Commonly used to respond to heartbeats
func Ping ¶
func Ping() *Command
Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client
func Ready ¶
Ready creates a new Command to specify the number of messages a client is willing to receive
func Requeue ¶
Requeue creates a new Command to indicate that a given message (by id) should be requeued after the given timeout (in ms).
NOTE: a timeout of 0 indicates immediate requeue
func StartClose ¶
func StartClose() *Command
StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state, and the client is expected to finish pending messages and close the connection
func Touch ¶ added in v0.2.17
Touch creates a new Command to reset the timeout for a given message (by id)
func UnRegister ¶
UnRegister creates a new Command to remove a topic/channel for the connected nsqd
type FailedMessageLogger ¶
type FailedMessageLogger interface {
LogFailedMessage(message *Message)
}
FailedMessageLogger is an interface that can be implemented by handlers that wish to receive a callback when a message is deemed "failed" (i.e. the number of attempts exceeded the Reader specified MaxAttemptCount)
type FatalClientErr ¶ added in v0.2.17
func NewFatalClientErr ¶ added in v0.2.17
func NewFatalClientErr(parent error, code string, description string) *FatalClientErr
NewFatalClientErr creates a FatalClientErr with the supplied parent error and human and machine readable strings
func (*FatalClientErr) Error ¶ added in v0.2.17
func (e *FatalClientErr) Error() string
Error returns the machine readable form
func (*FatalClientErr) Parent ¶ added in v0.2.17
func (e *FatalClientErr) Parent() error
Parent returns the parent error
type FinishedMessage ¶
FinishedMessage is the data type used over responseChannel in AsyncHandlers
type Handler ¶
Handler is the synchronous interface to Reader.
Implement this interface for handlers that return whether or not message processing completed successfully.
When the return value is nil Reader will automatically handle FINishing.
When the returned value is non-nil Reader will automatically handle REQueueing.
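The nil versus non-nil contract can be illustrated with a small stand-alone sketch. The Message and Handler declarations below are local stand-ins, and the method signature `HandleMessage(*Message) error` is an assumption since this page does not show it. `nonEmptyHandler` and `outcome` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins so the sketch compiles on its own.
type Message struct{ Body []byte }

// Assumed shape of the synchronous handler interface.
type Handler interface {
	HandleMessage(message *Message) error
}

// nonEmptyHandler succeeds only for messages with a non-empty body.
type nonEmptyHandler struct{}

func (nonEmptyHandler) HandleMessage(m *Message) error {
	if len(m.Body) == 0 {
		return errors.New("empty body")
	}
	return nil
}

// outcome mimics the decision described above: nil means FIN, non-nil means REQ.
func outcome(h Handler, m *Message) string {
	if err := h.HandleMessage(m); err != nil {
		return "REQ"
	}
	return "FIN"
}

func main() {
	fmt.Println(outcome(nonEmptyHandler{}, &Message{Body: []byte("job")}))
	fmt.Println(outcome(nonEmptyHandler{}, &Message{}))
}
```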
type LookupPeer ¶
type LookupPeer struct {
	Info PeerInfo
	// contains filtered or unexported fields
}
LookupPeer is a low-level type for connecting/reading/writing to nsqlookupd
A LookupPeer instance is designed to connect lazily to nsqlookupd and reconnect gracefully (i.e. it is all handled by the library). Clients can simply use the Command interface to perform a round-trip.
func NewLookupPeer ¶
func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer
NewLookupPeer creates a new LookupPeer instance connecting to the supplied address.
The supplied connectCallback will be called *every* time the instance connects.
func (*LookupPeer) Close ¶
func (lp *LookupPeer) Close() error
Close implements the io.Closer interface
func (*LookupPeer) Command ¶
func (lp *LookupPeer) Command(cmd *Command) ([]byte, error)
Command performs a round-trip for the specified Command.
It will lazily connect to nsqlookupd and gracefully handle reconnecting in the event of a failure.
It returns the response from nsqlookupd as []byte
func (*LookupPeer) Connect ¶
func (lp *LookupPeer) Connect() error
Connect will Dial the specified address, with timeouts
func (*LookupPeer) Read ¶
func (lp *LookupPeer) Read(data []byte) (int, error)
Read implements the io.Reader interface, adding deadlines
func (*LookupPeer) String ¶
func (lp *LookupPeer) String() string
String returns the specified address
type Message ¶
Message is the fundamental data type containing the id, body, and metadata
func DecodeMessage ¶
DecodeMessage deserializes data (as []byte) and creates a new Message
func NewMessage ¶
NewMessage creates a Message, initializes some metadata, and returns a pointer
func (*Message) EncodeBytes ¶
EncodeBytes serializes the message into a new, returned, []byte
type MessageID ¶ added in v0.2.16
type MessageID [MsgIdLength]byte
type PeerInfo ¶
type PeerInfo struct {
	TcpPort          int    `json:"tcp_port"`
	HttpPort         int    `json:"http_port"`
	Version          string `json:"version"`
	Address          string `json:"address"` //TODO: remove for 1.0
	BroadcastAddress string `json:"broadcast_address"`
}
PeerInfo contains metadata for a LookupPeer instance (and is JSON marshalable)
type Reader ¶
type Reader struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	MessagesReceived uint64 // an atomic counter - # of messages received
	MessagesFinished uint64 // an atomic counter - # of messages FINished
	MessagesRequeued uint64 // an atomic counter - # of messages REQueued

	sync.RWMutex

	TopicName           string        // name of topic to subscribe to
	ChannelName         string        // name of channel to subscribe to
	LookupdPollInterval time.Duration // duration between polling lookupd for new connections
	LookupdPollJitter   float64       // maximum fractional amount of jitter to add to the lookupd poll loop. This helps evenly distribute requests even if multiple consumers restart at the same time.
	MaxAttemptCount     uint16        // maximum number of times this reader will attempt to process a message
	DefaultRequeueDelay time.Duration // the default duration when REQueueing
	MaxRequeueDelay     time.Duration // the maximum duration when REQueueing (for doubling backoff)
	LowRdyIdleTimeout   time.Duration // the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (i.e. max_in_flight < num_producers)
	VerboseLogging      bool          // enable verbose logging
	ShortIdentifier     string        // an identifier to send to nsqd when connecting (defaults: short hostname)
	LongIdentifier      string        // an identifier to send to nsqd when connecting (defaults: long hostname)
	ReadTimeout         time.Duration // the deadline set for network reads
	WriteTimeout        time.Duration // the deadline set for network writes
	ExitChan            chan int      // read from this channel to block your main loop
	TLSv1               bool          // negotiate enabling TLS
	TLSConfig           *tls.Config   // client TLS configuration
	// contains filtered or unexported fields
}
Reader is a high-level type to consume from NSQ.
A Reader instance is supplied handler(s) that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel. See: AsyncHandler and Handler for details on implementing those interfaces to create handlers.
If configured, it will poll nsqlookupd instances and handle connection (and reconnection) to any discovered nsqds.
func NewReader ¶
NewReader creates a new instance of Reader for the specified topic/channel
The returned Reader instance is set up with sane default values. To modify configuration, update the values on the returned instance before connecting.
func (*Reader) AddAsyncHandler ¶
func (q *Reader) AddAsyncHandler(handler AsyncHandler)
AddAsyncHandler adds an AsyncHandler for messages received by this Reader.
See AsyncHandler for details on implementing this interface.
It's ok to start more than one handler simultaneously; they are concurrently executed in goroutines.
func (*Reader) AddHandler ¶
AddHandler adds a Handler for messages received by this Reader.
See Handler for details on implementing this interface.
It's ok to start more than one handler simultaneously; they are concurrently executed in goroutines.
func (*Reader) ConnectToLookupd ¶
ConnectToLookupd adds a nsqlookupd address to the list for this Reader instance.
If it is the first to be added, it initiates an HTTP request to discover nsqd producers for the configured topic.
A goroutine is spawned to handle continual polling.
func (*Reader) ConnectToNSQ ¶
ConnectToNSQ takes a nsqd address to connect directly to.
It is recommended to use ConnectToLookupd so that topics are discovered automatically. This method is useful when you want to connect to a single, local, instance.
func (*Reader) ConnectionMaxInFlight ¶
ConnectionMaxInFlight calculates the per-connection max-in-flight count.
This may change dynamically based on the number of connections to nsqd the Reader is responsible for.
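One plausible way to compute such a per-connection value is an even split of the configured maximum across connections, clamped so that each connection gets at least 1 but never more than the total. This is an assumption about the calculation, not a statement of what the package does, and `perConnection` is a hypothetical function.

```go
package main

import (
	"fmt"
	"math"
)

// perConnection splits maxInFlight evenly across numConns connections.
// Assumption: the share is clamped to the range [1, maxInFlight].
func perConnection(maxInFlight, numConns int) int64 {
	if numConns <= 0 {
		return int64(maxInFlight) // nothing to share with yet
	}
	share := float64(maxInFlight) / float64(numConns)
	return int64(math.Min(math.Max(1, share), float64(maxInFlight)))
}

func main() {
	for _, conns := range []int{1, 4, 20} {
		fmt.Println(conns, "connections:", perConnection(10, conns))
	}
}
```

Under this assumption the value shrinks as more nsqd connections are added, which matches the note that it may change dynamically.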
func (*Reader) IsStarved ¶
IsStarved indicates whether any connections for this reader are blocked on processing before being able to receive more messages (ie. RDY count of 0 and not exiting)
func (*Reader) MaxInFlight ¶
MaxInFlight returns the configured maximum number of messages to allow in-flight.
func (*Reader) SetMaxBackoffDuration ¶ added in v0.2.17
SetMaxBackoffDuration sets the maximum duration a connection will backoff from message processing
func (*Reader) SetMaxInFlight ¶
SetMaxInFlight sets the maximum number of messages this reader instance will allow in-flight.
If already connected, it updates the reader RDY state for each connection.
type Writer ¶ added in v0.2.20
type Writer struct {
	net.Conn
	WriteTimeout      time.Duration
	Addr              string
	HeartbeatInterval time.Duration
	ShortIdentifier   string
	LongIdentifier    string
	// contains filtered or unexported fields
}
func (*Writer) MultiPublish ¶ added in v0.2.20
func (*Writer) MultiPublishAsync ¶ added in v0.2.22
func (w *Writer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *WriterTransaction, args ...interface{}) error
func (*Writer) PublishAsync ¶ added in v0.2.22
func (w *Writer) PublishAsync(topic string, body []byte, doneChan chan *WriterTransaction, args ...interface{}) error