Documentation ¶
Index ¶
- Constants
- Variables
- func ApiRequest(endpoint string) (*simplejson.Json, error)
- func Frame(w io.Writer, frameType int32, data []byte) error
- func IsValidChannelName(name string) bool
- func IsValidTopicName(name string) bool
- func ReadMagic(r io.Reader) (int32, error)
- func ReadResponse(r io.Reader) ([]byte, error)
- func SendCommand(w io.Writer, cmd *Command) error
- func SendResponse(w io.Writer, data []byte) (int, error)
- func UnpackResponse(response []byte) (int32, []byte, error)
- type AsyncHandler
- type ClientErr
- type Command
- func Announce(topic string, channel string, port int, ips []string) *Command
- func Finish(id []byte) *Command
- func Identify(version string, tcpPort int, httpPort int, address string) *Command
- func Nop() *Command
- func Ping() *Command
- func Publish(topic string, body []byte) *Command
- func Ready(count int) *Command
- func Register(topic string, channel string) *Command
- func Requeue(id []byte, timeoutMs int) *Command
- func StartClose() *Command
- func Subscribe(topic string, channel string, shortIdentifier string, longIdentifier string) *Command
- func UnRegister(topic string, channel string) *Command
- type FailedMessageLogger
- type FinishedMessage
- type Handler
- type LookupPeer
- type Message
- type PeerInfo
- type Protocol
- type Reader
- func (q *Reader) AddAsyncHandler(handler AsyncHandler)
- func (q *Reader) AddHandler(handler Handler)
- func (q *Reader) ConnectToLookupd(addr string) error
- func (q *Reader) ConnectToNSQ(addr string) error
- func (q *Reader) ConnectionMaxInFlight() int
- func (q *Reader) IsStarved() bool
- func (q *Reader) MaxInFlight() int
- func (q *Reader) SetMaxInFlight(maxInFlight int)
- func (q *Reader) Stop()
Constants ¶
const ( FrameTypeResponse int32 = 0 FrameTypeError int32 = 1 FrameTypeMessage int32 = 2 )
const ( StateInit = iota StateDisconnected StateConnected StateSubscribed StateClosing // close has started. responses are ok, but no new messages will be sent )
const DefaultClientTimeout = 60 * time.Second
const MaxReadyCount = 2500
const MsgIdLength = 16
const VERSION = "0.2.4"
Variables ¶
var ErrAlreadyConnected = errors.New("already connected")
var MagicV1 = []byte("  V1")
var MagicV2 = []byte("  V2")
Functions ¶
func ApiRequest ¶
func IsValidChannelName ¶
func IsValidTopicName ¶
Types ¶
type AsyncHandler ¶
type AsyncHandler interface {
HandleMessage(message *Message, responseChannel chan *FinishedMessage)
}
AsyncHandler is an async handler that must send a &FinishedMessage{messageID, requeueDelay, true|false} onto responseChannel to indicate that a message has been finished. This is useful if you want to batch work together and delay the response that indicates processing is complete.
type Command ¶
func Announce ¶
Announce creates a new Command to announce the existence of a given topic and/or channel. NOTE: if channel == "." then it is considered n/a
func Finish ¶
Finish creates a new Command to indicate that a given message (by id) has been processed successfully
func Identify ¶
Identify is the first message sent to the Lookupd and provides information about the client
func Ping ¶
func Ping() *Command
Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client
func Ready ¶
Ready creates a new Command to specify the number of messages a client is willing to receive
func Requeue ¶
Requeue creates a new Command to indicate that a given message (by id) should be requeued after the given timeout (in ms). NOTE: a timeout of 0 indicates immediate requeue
func StartClose ¶
func StartClose() *Command
StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state, and the client is expected to ACK, after which it can finish pending messages and close the connection.
func Subscribe ¶
func Subscribe(topic string, channel string, shortIdentifier string, longIdentifier string) *Command
Subscribe creates a new Command to subscribe to the given topic/channel
func UnRegister ¶
UnRegister creates a new Command to remove a topic/channel from this nsqd
type FailedMessageLogger ¶
type FailedMessageLogger interface {
LogFailedMessage(message *Message)
}
type FinishedMessage ¶
type LookupPeer ¶
type LookupPeer struct { PeerInfo PeerInfo // contains filtered or unexported fields }
LookupPeer is a low-level type for connecting/reading/writing to nsqlookupd
func NewLookupPeer ¶
func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer
NewLookupPeer creates a new LookupPeer instance
func (*LookupPeer) Close ¶
func (lp *LookupPeer) Close() error
func (*LookupPeer) Connect ¶
func (lp *LookupPeer) Connect() error
func (*LookupPeer) String ¶
func (lp *LookupPeer) String() string
type Message ¶
Message is the fundamental data type containing the id, body, and meta-data
func DecodeMessage ¶
DecodeMessage deserializes data (as []byte) and creates/returns a pointer to a new Message
func NewMessage ¶
NewMessage creates a Message, initializes some meta-data, and returns a pointer
func (*Message) EncodeBytes ¶
EncodeBytes serializes the message into a new []byte
type Reader ¶
type Reader struct { TopicName string // name of topic to subscribe to ChannelName string // name of channel to subscribe to LookupdPollInterval time.Duration // seconds between polling lookupd's (+/- random 1/10th this value) MaxAttemptCount uint16 DefaultRequeueDelay time.Duration MaxRequeueDelay time.Duration VerboseLogging bool ShortIdentifier string // an identifier to send to nsqd when connecting (defaults: short hostname) LongIdentifier string // an identifier to send to nsqd when connecting (defaults: long hostname) ReadTimeout time.Duration WriteTimeout time.Duration MessagesReceived uint64 MessagesFinished uint64 MessagesRequeued uint64 ExitChan chan int // contains filtered or unexported fields }
func (*Reader) AddAsyncHandler ¶
func (q *Reader) AddAsyncHandler(handler AsyncHandler)
AddAsyncHandler starts an async handler on the Reader. It's ok to start more than one handler simultaneously.
func (*Reader) AddHandler ¶
AddHandler starts a handler on the Reader. It's ok to start more than one handler simultaneously.
func (*Reader) ConnectToLookupd ¶
func (*Reader) ConnectToNSQ ¶
func (*Reader) ConnectionMaxInFlight ¶
ConnectionMaxInFlight calculates the max in-flight count per connection. This may change dynamically based on the number of connections.
func (*Reader) IsStarved ¶
IsStarved indicates whether any connections for this reader are blocked on processing before being able to receive more messages (i.e. RDY count of 0 and not exiting)
func (*Reader) MaxInFlight ¶
max number of messages to allow in-flight at a time
func (*Reader) SetMaxInFlight ¶
update the reader ready state, updating each connection as appropriate