Documentation ¶
Overview ¶
Package nsq is the official Go package for NSQ (http://nsq.io/)
It provides high-level Consumer and Producer types as well as low-level functions to communicate over the NSQ protocol
Index ¶
- Constants
- Variables
- func IsValidChannelName(name string) bool
- func IsValidTopicName(name string) bool
- func ReadResponse(r io.Reader) ([]byte, error)
- func ReadUnpackedResponse(r io.Reader) (int32, []byte, error)
- func UnpackResponse(response []byte) (int32, []byte, error)
- type AuthResponse
- type BackoffStrategy
- type Command
- func Auth(secret string) (*Command, error)
- func DeferredPublish(topic string, delay time.Duration, body []byte) *Command
- func Finish(id MessageID) *Command
- func Identify(js map[string]interface{}) (*Command, error)
- func MultiPublish(topic string, bodies [][]byte) (*Command, error)
- func Nop() *Command
- func Ping() *Command
- func Publish(topic string, body []byte) *Command
- func Ready(count int) *Command
- func Register(topic string, channel string) *Command
- func Requeue(id MessageID, delay time.Duration) *Command
- func StartClose() *Command
- func Subscribe(topic string, channel string) *Command
- func Touch(id MessageID) *Command
- func UnRegister(topic string, channel string) *Command
- type Config
- type ConfigFlag
- type Conn
- func (c *Conn) Close() error
- func (c *Conn) Connect() (*IdentifyResponse, error)
- func (c *Conn) Flush() error
- func (c *Conn) IsClosing() bool
- func (c *Conn) LastMessageTime() time.Time
- func (c *Conn) LastRDY() int64
- func (c *Conn) MaxRDY() int64
- func (c *Conn) RDY() int64
- func (c *Conn) Read(p []byte) (int, error)
- func (c *Conn) RemoteAddr() net.Addr
- func (c *Conn) SetLogger(l logger, lvl LogLevel, format string)
- func (c *Conn) SetRDY(rdy int64)
- func (c *Conn) String() string
- func (c *Conn) Write(p []byte) (int, error)
- func (c *Conn) WriteCommand(cmd *Command) error
- type ConnDelegate
- type Consumer
- func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int)
- func (r *Consumer) AddHandler(handler Handler)
- func (r *Consumer) ChangeMaxInFlight(maxInFlight int)
- func (r *Consumer) ConnectToNSQD(addr string) error
- func (r *Consumer) ConnectToNSQDs(addresses []string) error
- func (r *Consumer) ConnectToNSQLookupd(addr string) error
- func (r *Consumer) ConnectToNSQLookupds(addresses []string) error
- func (r *Consumer) DisconnectFromNSQD(addr string) error
- func (r *Consumer) DisconnectFromNSQLookupd(addr string) error
- func (r *Consumer) GetMaxInFlight() int32
- func (r *Consumer) IsStarved() bool
- func (r *Consumer) SetBehaviorDelegate(cb interface{})
- func (r *Consumer) SetLogger(l logger, lvl LogLevel)
- func (r *Consumer) Stats() *ConsumerStats
- func (r *Consumer) Stop()
- type ConsumerStats
- type DiscoveryFilter
- type ErrIdentify
- type ErrProtocol
- type ExponentialStrategy
- type FailedMessageLogger
- type FullJitterStrategy
- type Handler
- type HandlerFunc
- type IdentifyResponse
- type LogLevel
- type Message
- func (m *Message) DisableAutoResponse()
- func (m *Message) Finish()
- func (m *Message) HasResponded() bool
- func (m *Message) IsAutoResponseDisabled() bool
- func (m *Message) Requeue(delay time.Duration)
- func (m *Message) RequeueWithoutBackoff(delay time.Duration)
- func (m *Message) Touch()
- func (m *Message) WriteTo(w io.Writer) (int64, error)
- type MessageDelegate
- type MessageID
- type Producer
- func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error
- func (w *Producer) DeferredPublishAsync(topic string, delay time.Duration, body []byte, ...) error
- func (w *Producer) MultiPublish(topic string, body [][]byte) error
- func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction, ...) error
- func (w *Producer) Ping() error
- func (w *Producer) Publish(topic string, body []byte) error
- func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction, ...) error
- func (w *Producer) SetLogger(l logger, lvl LogLevel)
- func (w *Producer) Stop()
- func (w *Producer) String() string
- type ProducerTransaction
Examples ¶
Constants ¶
const ( FrameTypeResponse int32 = 0 FrameTypeError int32 = 1 FrameTypeMessage int32 = 2 )
frame types
const ( StateInit = iota StateDisconnected StateConnected StateSubscribed // StateClosing means CLOSE has started... // (responses are ok, but no new messages will be sent) StateClosing )
states
const MsgIDLength = 16
The number of bytes for a Message.ID
const VERSION = "1.0.6"
VERSION
Variables ¶
var ErrAlreadyConnected = errors.New("already connected")
ErrAlreadyConnected is returned from ConnectToNSQD when already connected
var ErrClosing = errors.New("closing")
ErrClosing is returned when a connection is closing
var ErrNotConnected = errors.New("not connected")
ErrNotConnected is returned when a publish command is made against a Producer that is not connected
var ErrOverMaxInFlight = errors.New("over configure max-inflight")
ErrOverMaxInFlight is returned from Consumer if over max-in-flight
var ErrStopped = errors.New("stopped")
ErrStopped is returned when a publish command is made against a Producer that has been stopped
var MagicV1 = []byte(" V1")
MagicV1 is the initial identifier sent when connecting for V1 clients
var MagicV2 = []byte(" V2")
MagicV2 is the initial identifier sent when connecting for V2 clients
Functions ¶
func IsValidChannelName ¶
IsValidChannelName checks a channel name for correctness
func IsValidTopicName ¶
IsValidTopicName checks a topic name for correctness
func ReadResponse ¶
ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:
[x][x][x][x][x][x][x][x]... | (int32) || (binary) | 4-byte || N-byte ------------------------... size data
func ReadUnpackedResponse ¶
ReadUnpackedResponse reads and parses data from the underlying TCP connection according to the NSQ TCP protocol spec and returns the frameType, data or error
func UnpackResponse ¶
UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:
[x][x][x][x][x][x][x][x]... | (int32) || (binary) | 4-byte || N-byte ------------------------... frame ID data
Returns a triplicate of: frame type, data ([]byte), error
Types ¶
type AuthResponse ¶
type AuthResponse struct { Identity string `json:"identity"` IdentityUrl string `json:"identity_url"` PermissionCount int64 `json:"permission_count"` }
AuthResponse represents the metadata returned from an AUTH command to nsqd
type BackoffStrategy ¶ added in v1.0.4
BackoffStrategy defines a strategy for calculating the duration of time a consumer should backoff for a given attempt
type Command ¶
Command represents a command from a client to an NSQ daemon
func Auth ¶
Auth sends credentials for authentication
After `Identify`, this is usually the first message sent, if auth is used.
func DeferredPublish ¶ added in v1.1.0
DeferredPublish creates a new Command to write a message to a given topic where the message will queue at the channel level until the timeout expires
func Finish ¶
Finish creates a new Command to indiciate that a given message (by id) has been processed successfully
func Identify ¶
Identify creates a new Command to provide information about the client. After connecting, it is generally the first message sent.
The supplied map is marshaled into JSON to provide some flexibility for this command to evolve over time.
See http://nsq.io/clients/tcp_protocol_spec.html#identify for information on the supported options
func MultiPublish ¶
MultiPublish creates a new Command to write more than one message to a given topic (useful for high-throughput situations to avoid roundtrips and saturate the pipe)
func Nop ¶
func Nop() *Command
Nop creates a new Command that has no effect server side. Commonly used to respond to heartbeats
func Ping ¶
func Ping() *Command
Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client
func Ready ¶
Ready creates a new Command to specify the number of messages a client is willing to receive
func Requeue ¶
Requeue creates a new Command to indicate that a given message (by id) should be requeued after the given delay NOTE: a delay of 0 indicates immediate requeue
func StartClose ¶
func StartClose() *Command
StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state and the client is expected finish pending messages and close the connection
func UnRegister ¶
UnRegister creates a new Command to remove a topic/channel for the connected nsqd
type Config ¶
type Config struct { DialTimeout time.Duration `opt:"dial_timeout" default:"1s"` // Deadlines for network reads and writes ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"` WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"` // LocalAddr is the local address to use when dialing an nsqd. // If empty, a local address is automatically chosen. LocalAddr net.Addr `opt:"local_addr"` // Duration between polling lookupd for new producers, and fractional jitter to add to // the lookupd pool loop. this helps evenly distribute requests even if multiple consumers // restart at the same time // // NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between // reconnection attempts LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"` LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"` // Maximum duration when REQueueing (for doubling of deferred requeue) MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"` DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"` // Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algrithms. BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"` // Maximum amount of time to backoff when processing fails 0 == no backoff MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"` // Unit of time for calculating consumer backoff BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"` // Maximum number of times this consumer will attempt to process a message before giving up MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"` // Duration to wait for a message from a producer when in a state where RDY // counts are re-distributed (ie. max_in_flight < num_producers) LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"` // Duration between redistributing max-in-flight to connections RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"` // Identifiers sent to nsqd representing this client // UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>") ClientID string `opt:"client_id"` // (defaults: short hostname) Hostname string `opt:"hostname"` UserAgent string `opt:"user_agent"` // Duration of time between heartbeats. This must be less than ReadTimeout HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"` // Integer percentage to sample the channel (requires nsqd 0.2.25+) SampleRate int32 `opt:"sample_rate" min:"0" max:"99"` // To set TLS config, use the following options: // // tls_v1 - Bool enable TLS negotiation // tls_root_ca_file - String path to file containing root CA // tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates // tls_cert - String path to file containing public key for certificate // tls_key - String path to file containing private key for certificate // tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2') // TlsV1 bool `opt:"tls_v1"` TlsConfig *tls.Config `opt:"tls_config"` // Compression Settings Deflate bool `opt:"deflate"` DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"` Snappy bool `opt:"snappy"` // Size of the buffer (in bytes) used by nsqd for buffering writes to this connection OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"` // Timeout used by nsqd before flushing buffered writes (set to 0 to disable). // // WARNING: configuring clients with an extremely low // (< 25ms) output_buffer_timeout has a significant effect // on nsqd CPU usage (particularly with > 50 clients connected). OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"` // Maximum number of messages to allow in flight (concurrency knob) MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"` // The server-side message timeout for messages delivered to this client MsgTimeout time.Duration `opt:"msg_timeout" min:"0"` // secret for nsqd authentication (requires nsqd 0.2.29+) AuthSecret string `opt:"auth_secret"` // contains filtered or unexported fields }
Config is a struct of NSQ options
The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no longer mutable (they are copied).
Use Set(option string, value interface{}) as an alternate way to set parameters
func NewConfig ¶
func NewConfig() *Config
NewConfig returns a new default nsq configuration.
This must be used to initialize Config structs. Values can be set directly, or through Config.Set()
func (*Config) Set ¶
Set takes an option as a string and a value as an interface and attempts to set the appropriate configuration option.
It attempts to coerce the value into the right format depending on the named option and the underlying type of the value passed in.
Calls to Set() that take a time.Duration as an argument can be input as:
"1000ms" (a string parsed by time.ParseDuration()) 1000 (an integer interpreted as milliseconds) 1000*time.Millisecond (a literal time.Duration value)
Calls to Set() that take bool can be input as:
"true" (a string parsed by strconv.ParseBool()) true (a boolean) 1 (an int where 1 == true and 0 == false)
It returns an error for an invalid option or value.
type ConfigFlag ¶ added in v1.1.0
type ConfigFlag struct {
*Config
}
ConfigFlag wraps a Config and implements the flag.Value interface
Example ¶
cfg := nsq.NewConfig() flagSet := flag.NewFlagSet("", flag.ExitOnError) flagSet.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to pass through to nsq.Consumer (may be given multiple times)") flagSet.PrintDefaults() err := flagSet.Parse([]string{ "--consumer-opt=heartbeat_interval,1s", "--consumer-opt=max_attempts,10", }) if err != nil { panic(err.Error()) } println("HeartbeatInterval", cfg.HeartbeatInterval) println("MaxAttempts", cfg.MaxAttempts)
Output:
func (*ConfigFlag) Set ¶ added in v1.1.0
func (c *ConfigFlag) Set(opt string) (err error)
Set takes a comma separated value and follows the rules in Config.Set using the first field as the option key, and the second (if present) as the value
func (*ConfigFlag) String ¶ added in v1.1.0
func (c *ConfigFlag) String() string
String implements the flag.Value interface
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn represents a connection to nsqd
Conn exposes a set of callbacks for the various events that occur on a connection
func NewConn ¶
func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn
NewConn returns a new Conn instance
func (*Conn) Connect ¶
func (c *Conn) Connect() (*IdentifyResponse, error)
Connect dials and bootstraps the nsqd connection (including IDENTIFY) and returns the IdentifyResponse
func (*Conn) IsClosing ¶
IsClosing indicates whether or not the connection is currently in the processing of gracefully closing
func (*Conn) LastMessageTime ¶
LastMessageTime returns a time.Time representing the time at which the last message was received
func (*Conn) MaxRDY ¶
MaxRDY returns the nsqd negotiated maximum RDY count that it will accept for this connection
func (*Conn) RemoteAddr ¶
RemoteAddr returns the configured destination nsqd address
func (*Conn) SetLogger ¶
SetLogger assigns the logger to use as well as a level.
The format parameter is expected to be a printf compatible string with a single %s argument. This is useful if you want to provide additional context to the log messages that the connection will print, the default is '(%s)'.
The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):
Output(calldepth int, s string)
func (*Conn) WriteCommand ¶
WriteCommand is a goroutine safe method to write a Command to this connection, and flush.
type ConnDelegate ¶
type ConnDelegate interface { // OnResponse is called when the connection // receives a FrameTypeResponse from nsqd OnResponse(*Conn, []byte) // OnError is called when the connection // receives a FrameTypeError from nsqd OnError(*Conn, []byte) // OnMessage is called when the connection // receives a FrameTypeMessage from nsqd OnMessage(*Conn, *Message) // OnMessageFinished is called when the connection // handles a FIN command from a message handler OnMessageFinished(*Conn, *Message) // OnMessageRequeued is called when the connection // handles a REQ command from a message handler OnMessageRequeued(*Conn, *Message) // OnBackoff is called when the connection triggers a backoff state OnBackoff(*Conn) // OnContinue is called when the connection finishes a message without adjusting backoff state OnContinue(*Conn) // OnResume is called when the connection triggers a resume state OnResume(*Conn) // OnIOError is called when the connection experiences // a low-level TCP transport error OnIOError(*Conn, error) // OnHeartbeat is called when the connection // receives a heartbeat from nsqd OnHeartbeat(*Conn) // OnClose is called when the connection // closes, after all cleanup OnClose(*Conn) }
ConnDelegate is an interface of methods that are used as callbacks in Conn
type Consumer ¶
type Consumer struct { // read from this channel to block until consumer is cleanly stopped StopChan chan int // contains filtered or unexported fields }
Consumer is a high-level type to consume from NSQ.
A Consumer instance is supplied a Handler that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel. See: Handler/HandlerFunc for details on implementing the interface to create handlers.
If configured, it will poll nsqlookupd instances and handle connection (and reconnection) to any discovered nsqds.
func NewConsumer ¶
NewConsumer creates a new instance of Consumer for the specified topic/channel
The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into NewConsumer the values are no longer mutable (they are copied).
func (*Consumer) AddConcurrentHandlers ¶
AddConcurrentHandlers sets the Handler for messages received by this Consumer. It takes a second argument which indicates the number of goroutines to spawn for message handling.
(see Handler or HandlerFunc for details on implementing this interface)
func (*Consumer) AddHandler ¶
AddHandler sets the Handler for messages received by this Consumer. This can be called multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
This panics if called after connecting to NSQD or NSQ Lookupd ¶
(see Handler or HandlerFunc for details on implementing this interface)
func (*Consumer) ChangeMaxInFlight ¶
ChangeMaxInFlight sets a new maximum number of messages this consumer instance will allow in-flight, and updates all existing connections as appropriate.
For example, ChangeMaxInFlight(0) would pause message flow ¶
If already connected, it updates the reader RDY state for each connection.
func (*Consumer) ConnectToNSQD ¶
ConnectToNSQD takes a nsqd address to connect directly to.
It is recommended to use ConnectToNSQLookupd so that topics are discovered automatically. This method is useful when you want to connect to a single, local, instance.
func (*Consumer) ConnectToNSQDs ¶
ConnectToNSQDs takes multiple nsqd addresses to connect directly to.
It is recommended to use ConnectToNSQLookupd so that topics are discovered automatically. This method is useful when you want to connect to local instance.
func (*Consumer) ConnectToNSQLookupd ¶
ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
If it is the first to be added, it initiates an HTTP request to discover nsqd producers for the configured topic.
A goroutine is spawned to handle continual polling.
func (*Consumer) ConnectToNSQLookupds ¶
ConnectToNSQLookupds adds multiple nsqlookupd address to the list for this Consumer instance.
If adding the first address it initiates an HTTP request to discover nsqd producers for the configured topic.
A goroutine is spawned to handle continual polling.
func (*Consumer) DisconnectFromNSQD ¶
DisconnectFromNSQD closes the connection to and removes the specified `nsqd` address from the list
func (*Consumer) DisconnectFromNSQLookupd ¶
DisconnectFromNSQLookupd removes the specified `nsqlookupd` address from the list used for periodic discovery.
func (*Consumer) GetMaxInFlight ¶ added in v1.1.0
func (*Consumer) IsStarved ¶
IsStarved indicates whether any connections for this consumer are blocked on processing before being able to receive more messages (ie. RDY count of 0 and not exiting)
func (*Consumer) SetBehaviorDelegate ¶
func (r *Consumer) SetBehaviorDelegate(cb interface{})
SetBehaviorDelegate takes a type implementing one or more of the following interfaces that modify the behavior of the `Consumer`:
DiscoveryFilter
func (*Consumer) SetLogger ¶
SetLogger assigns the logger to use as well as a level
The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):
Output(calldepth int, s string)
func (*Consumer) Stats ¶ added in v1.0.2
func (r *Consumer) Stats() *ConsumerStats
Stats retrieves the current connection and message statistics for a Consumer
type ConsumerStats ¶ added in v1.0.2
type ConsumerStats struct { MessagesReceived uint64 MessagesFinished uint64 MessagesRequeued uint64 Connections int }
ConsumerStats represents a snapshot of the state of a Consumer's connections and the messages it has seen
type DiscoveryFilter ¶
DiscoveryFilter is an interface accepted by `SetBehaviorDelegate()` for filtering the nsqds returned from discovery via nsqlookupd
type ErrIdentify ¶
type ErrIdentify struct {
Reason string
}
ErrIdentify is returned from Conn as part of the IDENTIFY handshake
type ErrProtocol ¶
type ErrProtocol struct {
Reason string
}
ErrProtocol is returned from Producer when encountering an NSQ protocol level error
type ExponentialStrategy ¶ added in v1.0.4
type ExponentialStrategy struct {
// contains filtered or unexported fields
}
ExponentialStrategy implements an exponential backoff strategy (default)
type FailedMessageLogger ¶
type FailedMessageLogger interface {
LogFailedMessage(message *Message)
}
FailedMessageLogger is an interface that can be implemented by handlers that wish to receive a callback when a message is deemed "failed" (i.e. the number of attempts exceeded the Consumer specified MaxAttemptCount)
type FullJitterStrategy ¶ added in v1.0.4
type FullJitterStrategy struct {
// contains filtered or unexported fields
}
FullJitterStrategy implements http://www.awsarchitectureblog.com/2015/03/backoff.html
type Handler ¶
Handler is the message processing interface for Consumer
Implement this interface for handlers that return whether or not message processing completed successfully.
When the return value is nil Consumer will automatically handle FINishing.
When the returned value is non-nil Consumer will automatically handle REQueing.
type HandlerFunc ¶
HandlerFunc is a convenience type to avoid having to declare a struct to implement the Handler interface, it can be used like this:
consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error { // handle the message }))
func (HandlerFunc) HandleMessage ¶
func (h HandlerFunc) HandleMessage(m *Message) error
HandleMessage implements the Handler interface
type IdentifyResponse ¶
type IdentifyResponse struct { MaxRdyCount int64 `json:"max_rdy_count"` TLSv1 bool `json:"tls_v1"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` AuthRequired bool `json:"auth_required"` }
IdentifyResponse represents the metadata returned from an IDENTIFY command to nsqd
type Message ¶
type Message struct { ID MessageID Body []byte Timestamp int64 Attempts uint16 NSQDAddress string Delegate MessageDelegate // contains filtered or unexported fields }
Message is the fundamental data type containing the id, body, and metadata
func DecodeMessage ¶
DecodeMessage deserializes data (as []byte) and creates a new Message message format: [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... | (int64) || || (hex string encoded in ASCII) || (binary) | 8-byte || || 16-byte || N-byte ------------------------------------------------------------------------------------------...
nanosecond timestamp ^^ message ID message body (uint16) 2-byte attempts
func NewMessage ¶
NewMessage creates a Message, initializes some metadata, and returns a pointer
func (*Message) DisableAutoResponse ¶
func (m *Message) DisableAutoResponse()
DisableAutoResponse disables the automatic response that would normally be sent when a handler.HandleMessage returns (FIN/REQ based on the error value returned).
This is useful if you want to batch, buffer, or asynchronously respond to messages.
func (*Message) Finish ¶
func (m *Message) Finish()
Finish sends a FIN command to the nsqd which sent this message
func (*Message) HasResponded ¶
HasResponded indicates whether or not this message has been responded to
func (*Message) IsAutoResponseDisabled ¶
IsAutoResponseDisabled indicates whether or not this message will be responded to automatically
func (*Message) Requeue ¶
Requeue sends a REQ command to the nsqd which sent this message, using the supplied delay.
A delay of -1 will automatically calculate based on the number of attempts and the configured default_requeue_delay
func (*Message) RequeueWithoutBackoff ¶
RequeueWithoutBackoff sends a REQ command to the nsqd which sent this message, using the supplied delay.
Notably, using this method to respond does not trigger a backoff event on the configured Delegate.
type MessageDelegate ¶
type MessageDelegate interface { // OnFinish is called when the Finish() method // is triggered on the Message OnFinish(*Message) // OnRequeue is called when the Requeue() method // is triggered on the Message OnRequeue(m *Message, delay time.Duration, backoff bool) // OnTouch is called when the Touch() method // is triggered on the Message OnTouch(*Message) }
MessageDelegate is an interface of methods that are used as callbacks in Message
type MessageID ¶
type MessageID [MsgIDLength]byte
MessageID is the ASCII encoded hexadecimal message ID
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a high-level type to publish to NSQ.
A Producer instance is 1:1 with a destination `nsqd` and will lazily connect to that instance (and re-connect) when Publish commands are executed.
func NewProducer ¶
NewProducer returns an instance of Producer for the specified address
The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into NewProducer the values are no longer mutable (they are copied).
func (*Producer) DeferredPublish ¶ added in v1.1.0
DeferredPublish synchronously publishes a message body to the specified topic where the message will queue at the channel level until the timeout expires, returning an error if publish failed
func (*Producer) DeferredPublishAsync ¶ added in v1.1.0
func (w *Producer) DeferredPublishAsync(topic string, delay time.Duration, body []byte, doneChan chan *ProducerTransaction, args ...interface{}) error
DeferredPublishAsync publishes a message body to the specified topic where the message will queue at the channel level until the timeout expires but does not wait for the response from `nsqd`.
When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present
func (*Producer) MultiPublish ¶
MultiPublish synchronously publishes a slice of message bodies to the specified topic, returning an error if publish failed
func (*Producer) MultiPublishAsync ¶
func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction, args ...interface{}) error
MultiPublishAsync publishes a slice of message bodies to the specified topic but does not wait for the response from `nsqd`.
When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present
func (*Producer) Ping ¶ added in v1.0.4
Ping causes the Producer to connect to it's configured nsqd (if not already connected) and send a `Nop` command, returning any error that might occur.
This method can be used to verify that a newly-created Producer instance is configured correctly, rather than relying on the lazy "connect on Publish" behavior of a Producer.
func (*Producer) Publish ¶
Publish synchronously publishes a message body to the specified topic, returning an error if publish failed
func (*Producer) PublishAsync ¶
func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction, args ...interface{}) error
PublishAsync publishes a message body to the specified topic but does not wait for the response from `nsqd`.
When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present
func (*Producer) SetLogger ¶
SetLogger assigns the logger to use as well as a level
The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):
Output(calldepth int, s string)
type ProducerTransaction ¶
type ProducerTransaction struct { Error error // the error (or nil) of the publish command Args []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync // contains filtered or unexported fields }
ProducerTransaction is returned by the async publish methods to retrieve metadata about the command after the response is received.