nsq

package
v0.2.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2013 License: MIT Imports: 23 Imported by: 0

README

go-nsq

go-nsq is the official Go package for NSQ.

It provides the building blocks for developing applications on the NSQ platform in Go.

Low-level functions and types are provided to communicate over the NSQ protocol as well as a high-level Reader and Writer library to implement consumers and producers.

See the examples directory for utilities built using this package that provide support for common tasks.

Installing

$ go get github.com/bitly/nsq/nsq

Importing

import "github.com/bitly/nsq/nsq"

Docs

See godoc for pretty documentation or:

# in the nsq package directory
$ go doc

Documentation

Overview

nsq is the official Go package for https://github.com/bitly/nsq

It provides the building blocks for developing applications on the NSQ platform in Go.

Low-level functions and types are provided to communicate over the NSQ protocol as well as a high-level Reader library to implement robust consumers.

Index

Constants

View Source
const (
	// when successful
	FrameTypeResponse int32 = 0
	// when an error occurred
	FrameTypeError int32 = 1
	// when it's a serialized message
	FrameTypeMessage int32 = 2
)
View Source
const (
	StateInit = iota
	StateDisconnected
	StateConnected
	StateSubscribed
	// close has started. responses are ok, but no new messages will be sent
	StateClosing
)
View Source
const DefaultClientTimeout = 60 * time.Second

The amount of time nsqd will allow a client to idle, can be overriden

View Source
const MsgIdLength = 16

The number of bytes for a Message.Id

View Source
const VERSION = "0.3.2"

Variables

View Source
var ErrAlreadyConnected = errors.New("already connected")

returned from ConnectToNSQ() when already connected

View Source
var ErrNotConnected = errors.New("not connected")
View Source
var ErrOverMaxInFlight = errors.New("over configure max-inflight")

return from updateRdy if over max-in-flight

View Source
var ErrStopped = errors.New("stopped")
View Source
var MagicV1 = []byte("  V1")
View Source
var MagicV2 = []byte("  V2")

Functions

func ApiRequest

func ApiRequest(endpoint string) (*simplejson.Json, error)

ApiRequest is a helper function to perform an HTTP request and parse our NSQ daemon's expected response format, with deadlines.

{"status_code":200, "status_txt":"OK", "data":{...}}

func IsValidChannelName

func IsValidChannelName(name string) bool

IsValidChannelName checks a channel name for correctness

func IsValidTopicName

func IsValidTopicName(name string) bool

IsValidTopicName checks a topic name for correctness

func NewDeadlineTransport added in v0.2.16

func NewDeadlineTransport(timeout time.Duration) *http.Transport

A custom http.Transport with support for deadline timeouts

func ReadResponse

func ReadResponse(r io.Reader) ([]byte, error)

ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
    size       data

func SendFramedResponse added in v0.2.16

func SendFramedResponse(w io.Writer, frameType int32, data []byte) (int, error)

SendFramedResponse is a server side utility function to prefix data with a length header and frame header and write to the supplied Writer

func SendResponse

func SendResponse(w io.Writer, data []byte) (int, error)

SendResponse is a server side utility function to prefix data with a length header and write to the supplied Writer

func UnpackResponse

func UnpackResponse(response []byte) (int32, []byte, error)

UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:

[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
  frame ID     data

Returns a triplicate of: frame type, data ([]byte), error

Types

type AsyncHandler

type AsyncHandler interface {
	HandleMessage(message *Message, responseChannel chan *FinishedMessage)
}

AsyncHandler is the asynchronous interface to Reader.

Implement this interface for handlers that wish to defer responding until later. This is particularly useful if you want to batch work together.

An AsyncHandler must send:

&FinishedMessage{messageID, requeueDelay, true|false}

To the supplied responseChannel to indicate that a message is processed.

type ChildError added in v0.2.17

type ChildError interface {
	Parent() error
}

type ClientErr

type ClientErr struct {
	ParentErr error
	Code      string
	Desc      string
}

ClientErr provides a way for NSQ daemons to log a human reabable error string and return a machine readable string to the client.

see docs/protocol.md for error codes by command

func NewClientErr

func NewClientErr(parent error, code string, description string) *ClientErr

NewClientErr creates a ClientErr with the supplied human and machine readable strings

func (*ClientErr) Error

func (e *ClientErr) Error() string

Error returns the machine readable form

func (*ClientErr) Parent added in v0.2.17

func (e *ClientErr) Parent() error

Parent returns the parent error

type Command

type Command struct {
	Name   []byte
	Params [][]byte
	Body   []byte
}

Command represents a command from a client to an NSQ daemon

func Finish

func Finish(id MessageID) *Command

Finish creates a new Command to indiciate that a given message (by id) has been processed successfully

func Identify

func Identify(js map[string]interface{}) (*Command, error)

Identify creates a new Command to provide information about the client. After connecting, it is generally the first message sent.

The supplied map is marshaled into JSON to provide some flexibility for this command to evolve over time.

nsqd currently supports the following keys:

short_id - short identifier, typically client's short hosname
long_id - long identifier, typically client's long hostname
buffer_size - size in bytes for nsqd to buffer before writing to the wire for this client

nsqlookupd currently supports the following keys:

version - the version of the nsqd peer
tcp_port - the nsqd port where TCP clients can connect
http_port - the nsqd port where HTTP clients can connect
address - the address where clients can connect (generally DNS resolvable hostname)

func MultiPublish added in v0.2.16

func MultiPublish(topic string, bodies [][]byte) (*Command, error)

MultiPublish creates a new Command to write more than one message to a given topic. This is useful for high-throughput situations to avoid roundtrips and saturate the pipe.

func Nop

func Nop() *Command

Nop creates a new Command that has no effect server side. Commonly used to respond to heartbeats

func Ping

func Ping() *Command

Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client

func Publish

func Publish(topic string, body []byte) *Command

Publish creates a new Command to write a message to a given topic

func Ready

func Ready(count int) *Command

Ready creates a new Command to specify the number of messages a client is willing to receive

func Register

func Register(topic string, channel string) *Command

Register creates a new Command to add a topic/channel for the connected nsqd

func Requeue

func Requeue(id MessageID, timeoutMs int) *Command

Requeue creates a new Command to indicate that a given message (by id) should be requeued after the given timeout (in ms) NOTE: a timeout of 0 indicates immediate requeue

func StartClose

func StartClose() *Command

StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state and the client is expected finish pending messages and close the connection

func Subscribe

func Subscribe(topic string, channel string) *Command

Subscribe creates a new Command to subscribe to the given topic/channel

func Touch added in v0.2.17

func Touch(id MessageID) *Command

Touch creates a new Command to reset the timeout for a given message (by id)

func UnRegister

func UnRegister(topic string, channel string) *Command

Unregister creates a new Command to remove a topic/channel for the connected nsqd

func (*Command) String

func (c *Command) String() string

String returns the name and parameters of the Command

func (*Command) Write added in v0.2.16

func (c *Command) Write(w io.Writer) error

Write serializes the Command to the supplied Writer.

It is suggested that the target Writer is buffered to avoid performing many system calls.

type FailedMessageLogger

type FailedMessageLogger interface {
	LogFailedMessage(message *Message)
}

FailedMessageLogger is an interface that can be implemented by handlers that wish to receive a callback when a message is deemed "failed" (i.e. the number of attempts exceeded the Reader specified MaxAttemptCount)

type FatalClientErr added in v0.2.17

type FatalClientErr struct {
	ParentErr error
	Code      string
	Desc      string
}

func NewFatalClientErr added in v0.2.17

func NewFatalClientErr(parent error, code string, description string) *FatalClientErr

NewClientErr creates a ClientErr with the supplied human and machine readable strings

func (*FatalClientErr) Error added in v0.2.17

func (e *FatalClientErr) Error() string

Error returns the machine readable form

func (*FatalClientErr) Parent added in v0.2.17

func (e *FatalClientErr) Parent() error

Parent returns the parent error

type FinishedMessage

type FinishedMessage struct {
	Id             MessageID
	RequeueDelayMs int
	Success        bool
}

FinishedMessage is the data type used over responseChannel in AsyncHandlers

type Handler

type Handler interface {
	HandleMessage(message *Message) error
}

Handler is the synchronous interface to Reader.

Implement this interface for handlers that return whether or not message processing completed successfully.

When the return value is nil Reader will automatically handle FINishing.

When the returned value is non-nil Reader will automatically handle REQueing.

type LookupPeer

type LookupPeer struct {
	Info PeerInfo
	// contains filtered or unexported fields
}

LookupPeer is a low-level type for connecting/reading/writing to nsqlookupd

A LookupPeer instance is designed to connect lazily to nsqlookupd and reconnect gracefully (i.e. it is all handled by the library). Clients can simply use the Command interface to perform a round-trip.

func NewLookupPeer

func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer

NewLookupPeer creates a new LookupPeer instance connecting to the supplied address.

The supplied connectCallback will be called *every* time the instance connects.

func (*LookupPeer) Close

func (lp *LookupPeer) Close() error

Close implements the io.Closer interface

func (*LookupPeer) Command

func (lp *LookupPeer) Command(cmd *Command) ([]byte, error)

Command performs a round-trip for the specified Command.

It will lazily connect to nsqlookupd and gracefully handle reconnecting in the event of a failure.

It returns the response from nsqlookupd as []byte

func (*LookupPeer) Connect

func (lp *LookupPeer) Connect() error

Connect will Dial the specified address, with timeouts

func (*LookupPeer) Read

func (lp *LookupPeer) Read(data []byte) (int, error)

Read implements the io.Reader interface, adding deadlines

func (*LookupPeer) String

func (lp *LookupPeer) String() string

String returns the specified address

func (*LookupPeer) Write

func (lp *LookupPeer) Write(data []byte) (int, error)

Write implements the io.Writer interface, adding deadlines

type Message

type Message struct {
	Id        MessageID
	Body      []byte
	Timestamp int64
	Attempts  uint16
}

Message is the fundamental data type containing the id, body, and metadata

func DecodeMessage

func DecodeMessage(byteBuf []byte) (*Message, error)

DecodeMessage deseralizes data (as []byte) and creates a new Message

func NewMessage

func NewMessage(id MessageID, body []byte) *Message

NewMessage creates a Message, initializes some metadata, and returns a pointer

func (*Message) EncodeBytes

func (m *Message) EncodeBytes() ([]byte, error)

EncodeBytes serializes the message into a new, returned, []byte

func (*Message) Write added in v0.2.16

func (m *Message) Write(w io.Writer) error

Write serializes the message into the supplied writer.

It is suggested that the target Writer is buffered to avoid performing many system calls.

type MessageID added in v0.2.16

type MessageID [MsgIdLength]byte

type PeerInfo

type PeerInfo struct {
	TcpPort          int    `json:"tcp_port"`
	HttpPort         int    `json:"http_port"`
	Version          string `json:"version"`
	Address          string `json:"address"` //TODO: remove for 1.0
	BroadcastAddress string `json:"broadcast_address"`
}

PeerInfo contains metadata for a LookupPeer instance (and is JSON marshalable)

type Protocol

type Protocol interface {
	IOLoop(conn net.Conn) error
}

Protocol describes the basic behavior of any protocol in the system

type Reader

type Reader struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	MessagesReceived uint64 // an atomic counter - # of messages received
	MessagesFinished uint64 // an atomic counter - # of messages FINished
	MessagesRequeued uint64 // an atomic counter - # of messages REQueued

	sync.RWMutex

	TopicName           string        // name of topic to subscribe to
	ChannelName         string        // name of channel to subscribe to
	LookupdPollInterval time.Duration // duration between polling lookupd for new connections
	LookupdPollJitter   float64       // Maximum fractional amount of jitter to add to the lookupd pool loop. This helps evenly distribute requests even if multiple consumers restart at the same time.
	MaxAttemptCount     uint16        // maximum number of times this reader will attempt to process a message
	DefaultRequeueDelay time.Duration // the default duration when REQueueing
	MaxRequeueDelay     time.Duration // the maximum duration when REQueueing (for doubling backoff)
	LowRdyIdleTimeout   time.Duration // the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (ie. max_in_flight < num_producers)
	VerboseLogging      bool          // enable verbose logging
	ShortIdentifier     string        // an identifier to send to nsqd when connecting (defaults: short hostname)
	LongIdentifier      string        // an identifier to send to nsqd when connecting (defaults: long hostname)
	ReadTimeout         time.Duration // the deadline set for network reads
	WriteTimeout        time.Duration // the deadline set for network writes
	ExitChan            chan int      // read from this channel to block your main loop
	TLSv1               bool          // negotiate enabling TLS
	TLSConfig           *tls.Config   // client TLS configuration
	// contains filtered or unexported fields
}

Reader is a high-level type to consume from NSQ.

A Reader instance is supplied handler(s) that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel. See: AsyncHandler and Handler for details on implementing those interfaces to create handlers.

If configured, it will poll nsqlookupd instances and handle connection (and reconnection) to any discovered nsqds.

func NewReader

func NewReader(topic string, channel string) (*Reader, error)

NewReader creates a new instance of Reader for the specified topic/channel

The returned Reader instance is setup with sane default values. To modify configuration, update the values on the returned instance before connecting.

func (*Reader) AddAsyncHandler

func (q *Reader) AddAsyncHandler(handler AsyncHandler)

AddAsyncHandler adds an AsyncHandler for messages received by this Reader.

See AsyncHandler for details on implementing this interface.

It's ok to start more than one handler simultaneously, they are concurrently executed in goroutines.

func (*Reader) AddHandler

func (q *Reader) AddHandler(handler Handler)

AddHandler adds a Handler for messages received by this Reader.

See Handler for details on implementing this interface.

It's ok to start more than one handler simultaneously, they are concurrently executed in goroutines.

func (*Reader) ConnectToLookupd

func (q *Reader) ConnectToLookupd(addr string) error

ConnectToLookupd adds a nsqlookupd address to the list for this Reader instance.

If it is the first to be added, it initiates an HTTP request to discover nsqd producers for the configured topic.

A goroutine is spawned to handle continual polling.

func (*Reader) ConnectToNSQ

func (q *Reader) ConnectToNSQ(addr string) error

ConnectToNSQ takes a nsqd address to connect directly to.

It is recommended to use ConnectToLookupd so that topics are discovered automatically. This method is useful when you want to connect to a single, local, instance.

func (*Reader) ConnectionMaxInFlight

func (q *Reader) ConnectionMaxInFlight() int64

ConnectionMaxInFlight calculates the per-connection max-in-flight count.

This may change dynamically based on the number of connections to nsqd the Reader is responsible for.

func (*Reader) IsStarved

func (q *Reader) IsStarved() bool

IsStarved indicates whether any connections for this reader are blocked on processing before being able to receive more messages (ie. RDY count of 0 and not exiting)

func (*Reader) MaxInFlight

func (q *Reader) MaxInFlight() int

MaxInFlight returns the configured maximum number of messages to allow in-flight.

func (*Reader) SetMaxBackoffDuration added in v0.2.17

func (q *Reader) SetMaxBackoffDuration(duration time.Duration)

SetMaxBackoffDuration sets the maximum duration a connection will backoff from message processing

func (*Reader) SetMaxInFlight

func (q *Reader) SetMaxInFlight(maxInFlight int)

SetMaxInFlight sets the maximum number of messages this reader instance will allow in-flight.

If already connected, it updates the reader RDY state for each connection.

func (*Reader) Stop

func (q *Reader) Stop()

Stop will gracefully stop the Reader

type Writer added in v0.2.20

type Writer struct {
	net.Conn

	WriteTimeout      time.Duration
	Addr              string
	HeartbeatInterval time.Duration
	ShortIdentifier   string
	LongIdentifier    string
	// contains filtered or unexported fields
}

func NewWriter added in v0.2.20

func NewWriter(addr string) *Writer

func (*Writer) MultiPublish added in v0.2.20

func (w *Writer) MultiPublish(topic string, body [][]byte) (int32, []byte, error)

func (*Writer) MultiPublishAsync added in v0.2.22

func (w *Writer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *WriterTransaction, args ...interface{}) error

func (*Writer) Publish added in v0.2.20

func (w *Writer) Publish(topic string, body []byte) (int32, []byte, error)

func (*Writer) PublishAsync added in v0.2.22

func (w *Writer) PublishAsync(topic string, body []byte, doneChan chan *WriterTransaction, args ...interface{}) error

func (*Writer) Stop added in v0.2.20

func (w *Writer) Stop()

func (*Writer) String added in v0.2.22

func (w *Writer) String() string

type WriterTransaction added in v0.2.22

type WriterTransaction struct {
	FrameType int32
	Data      []byte
	Error     error
	Args      []interface{}
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL