Documentation ¶
Index ¶
- Constants
- func NewTLSConfig(config TLSConfig) (*tls.Config, error)
- func RateLimit(limit int, messages <-chan Message) <-chan Message
- type Auth
- type Client
- func (c *Client) CreateChannel(topic string, channel string) error
- func (c *Client) CreateTopic(topic string) error
- func (c *Client) DeleteChannel(topic string, channel string) error
- func (c *Client) DeleteTopic(topic string) error
- func (c *Client) EmptyChannel(topic string, channel string) error
- func (c *Client) EmptyTopic(topic string) error
- func (c *Client) MutliPublish(topic string, messages ...[]byte) (err error)
- func (c *Client) PauseChannel(topic string, channel string) error
- func (c *Client) PauseTopic(topic string) error
- func (c *Client) Ping() error
- func (c *Client) Publish(topic string, message []byte) (err error)
- func (c *Client) UnpauseChannel(topic string, channel string) error
- func (c *Client) UnpauseTopic(topic string) error
- type Cls
- type Command
- type Conn
- func (c *Conn) Close() (err error)
- func (c *Conn) LocalAddr() net.Addr
- func (c *Conn) Read(b []byte) (n int, err error)
- func (c *Conn) ReadCommand() (cmd Command, err error)
- func (c *Conn) ReadFrame() (frame Frame, err error)
- func (c *Conn) RemoteAddr() net.Addr
- func (c *Conn) SetDeadline(t time.Time) (err error)
- func (c *Conn) SetReadDeadline(t time.Time) (err error)
- func (c *Conn) SetWriteDeadline(t time.Time) (err error)
- func (c *Conn) Write(b []byte) (n int, err error)
- func (c *Conn) WriteCommand(cmd Command) (err error)
- func (c *Conn) WriteFrame(frame Frame) (err error)
- type Consumer
- type ConsumerConfig
- type Error
- type Fin
- type Frame
- type FrameType
- type Identify
- type IdentityResponse
- type LookupClient
- type LookupResult
- type MPub
- type Message
- type MessageID
- type Nop
- type Producer
- type ProducerConfig
- type ProducerInfo
- type ProducerRequest
- type Pub
- type Rdy
- type Req
- type Response
- type Sub
- type TLSConfig
- type Touch
- type UnknownFrame
Constants ¶
const ( DefaultUserAgent = "github.com/xenking/nsq-go" DefaultMaxConcurrency = 1 DefaultMaxInFlight = 1 DefaultDialTimeout = 5 * time.Second DefaultReadTimeout = 1 * time.Minute DefaultWriteTimeout = 10 * time.Second DefaultLookupTimeout = 10 * time.Second DefaultMaxRetryTimeout = 10 * time.Second DefaultMinRetryTimeout = 10 * time.Millisecond DefaultDrainTimeout = 10 * time.Second NoTimeout = time.Duration(0) )
const CommandIdentify = "IDENTIFY"
Variables ¶
This section is empty.
Functions ¶
func RateLimit ¶
RateLimit consumes messages from the messages channel and limits the rate at which they are produced to the channel returned by this function.
The limit is the maximum number of messages per second that are produced. No rate limit is applied if limit is negative or zero.
The returned channel is closed when the messages channel is closed.
Types ¶
type Auth ¶
type Auth struct { // Secret set for authentication. Secret string }
Auth represents the AUTH command.
type Client ¶
func (*Client) CreateTopic ¶
func (*Client) DeleteTopic ¶
func (*Client) EmptyTopic ¶
func (*Client) MutliPublish ¶
func (*Client) PauseTopic ¶
func (*Client) UnpauseTopic ¶
type Cls ¶
type Cls struct { }
Cls represents the CLS command.
type Command ¶
type Command interface { // Name returns the name of the command. Name() string // Write serializes the command to the given buffered output. Write(*bufio.Writer) error }
The Command interface is implemented by types that represent the different commands of the NSQ protocol.
func ReadCommand ¶
ReadCommand reads a command from the buffered input r, returning it or an error if something went wrong.
if cmd, err := nsq.ReadCommand(r); err != nil { // handle the error ... } else { switch c := cmd.(type) { case nsq.Pub: ... } }
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
func DialTimeout ¶
func (*Conn) ReadCommand ¶
func (*Conn) RemoteAddr ¶
func (*Conn) WriteCommand ¶
func (*Conn) WriteFrame ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(config ConsumerConfig) (c *Consumer, err error)
NewConsumer configures a new consumer instance.
func StartConsumer ¶
func StartConsumer(config ConsumerConfig) (c *Consumer, err error)
StartConsumer creates and starts consuming from NSQ right away. This is the fastest way to get up and running.
type ConsumerConfig ¶
type Error ¶
type Error string
Error is a frame type representing error responses to commands.
const ( ErrInvalid Error = "E_INVALID" ErrBadBody Error = "E_BAD_BODY" ErrBadTopic Error = "E_BAD_TOPIC" ErrBadChannel Error = "E_BAD_CHANNEL" ErrBadMessage Error = "E_BAD_MESSAGE" ErrPubFailed Error = "E_PUB_FAILED" ErrMPubFailed Error = "E_MPUB_FAILED" ErrFinFailed Error = "E_FIN_FAILED" ErrReqFailed Error = "E_REQ_FAILED" ErrTouchFailed Error = "E_TOUCH_FAILED" ErrAuthFailed Error = "E_AUTH_FAILED" )
type Fin ¶
type Fin struct { // MessageID is the ID of the message to mark finished. MessageID MessageID }
Fin represents the FIN command.
type Frame ¶
type Frame interface { // FrameType returns the code representing the frame type. FrameType() FrameType // Write serializes the frame to the given buffered output. Write(*bufio.Writer) error }
The Frame interface is implemented by types that represent the different types of frames that a consumer may receive.
type FrameType ¶
type FrameType int
FrameType is used to represent the different types of frames that a consumer may receive.
const ( // FrameTypeResponse is the code for frames that carry success responses to // commands. FrameTypeResponse FrameType = 0 // FrameTypeError is the code for frames that carry error responses to // commands. FrameTypeError FrameType = 1 // FrameTypeMessage is the code for frames that carry messages. FrameTypeMessage FrameType = 2 )
type Identify ¶
type Identify struct { // ClientID should be set to a unique identifier representing the client. ClientID string // Hostname represents the hostname of the client, by default it is set to // the value returned by os.Hostname. Hostname string // UserAgent represents the type of the client, by default it is set to // nsq.DefaultUserAgent. UserAgent string // TLSV1 can be set to configure the secure tcp with TLSV1, by default it is set to // false. TLSV1 bool TLSConfig *tls.Config // Compression Settings Deflate bool DeflateLevel int Snappy bool // MessageTimeout can be set to configure the server-side message timeout // for messages delivered to this consumer. By default it is not sent to // the server. MessageTimeout time.Duration }
Identify represents the IDENTIFY command.
type IdentityResponse ¶ added in v1.2.8
type LookupClient ¶
func (*LookupClient) Lookup ¶
func (c *LookupClient) Lookup(topic string) (result LookupResult, err error)
type LookupResult ¶
type LookupResult struct { Channels []string `json:"channels"` Producers []ProducerInfo `json:"producers"` }
type MPub ¶
type MPub struct { // Topic must be set to the name of the topic to which the messages will be // published. Topic string // Messages is the list of raw messages to publish. Messages [][]byte }
MPub represents the MPUB command.
type Message ¶
type Message struct { // The ID of the message. ID MessageID // Attempts is set to the number of attempts made to deliver the message. Attempts uint16 // Body contains the raw data of the message frame. Body []byte // Timestamp is the time at which the message was published. Timestamp time.Time // contains filtered or unexported fields }
Message is a frame type representing an NSQ message.
func NewMessage ¶
NewMessage is a helper for creating Message instances directly. A common use case is writing tests; generally you won't use this directly.
If you do use this, the Command channel is used internally to communicate message commands, such as "Finish" or "Requeue". When using this for testing, you can make a channel and inspect any message sent along it for assertions.
func (*Message) Complete ¶
Complete will return a bool indicating whether Finish or Requeue has been called for this message.
func (*Message) Finish ¶
func (m *Message) Finish()
Finish must be called on every message received from a consumer to let the NSQ server know that the message was successfully processed.
One of Finish or Requeue should be called on every message, and the methods will panic if they are called more than once.
func (*Message) Requeue ¶
Requeue must be called on messages received from a consumer to let the NSQ server know that the message could not be processed and should be retried. The timeout is the amount of time the NSQ server waits before offering this message again to its consumers.
One of Finish or Requeue should be called on every message, and the methods will panic if they are called more than once.
type MessageID ¶
type MessageID uint64
MessageID is used to represent NSQ message IDs.
func ParseMessageID ¶
ParseMessageID attempts to parse s, which should be a hexadecimal representation of an 8 byte message ID.
type Nop ¶
type Nop struct { }
Nop represents the NOP command.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer provides an abstraction around using direct connections to nsqd nodes to send messages.
func NewProducer ¶
func NewProducer(config ProducerConfig) (p *Producer, err error)
NewProducer configures a new producer instance.
func StartProducer ¶
func StartProducer(config ProducerConfig) (p *Producer, err error)
StartProducer starts and returns a new producer p, configured with the variables from the config parameter, or returning a non-nil error if some of the configuration variables were invalid.
func (*Producer) Connected ¶
Connected returns true if the producer has successfully established a connection to nsqd, false otherwise.
func (*Producer) Publish ¶
Publish sends a message using the producer p, returning an error if it was already closed or if an error occurred while publishing the message.
Note that no retry is done internally, the producer will fail after the first unsuccessful attempt to publish the message. It is the responsibility of the caller to retry if necessary.
func (*Producer) PublishTo ¶
PublishTo sends a message to a specific topic using the producer p, returning an error if it was already closed or if an error occurred while publishing the message.
Note that no retry is done internally, the producer will fail after the first unsuccessful attempt to publish the message. It is the responsibility of the caller to retry if necessary.
func (*Producer) Requests ¶
func (p *Producer) Requests() chan<- ProducerRequest
Requests returns a write-only channel that can be used to submit requests to p.
This method is useful when the publish operation needs to be associated with other operations on channels in a select statement for example, or to publish in a non-blocking fashion.
func (*Producer) Start ¶
func (p *Producer) Start()
Start explicitly begins the producer in case it was initialized with NewProducer instead of StartProducer.
func (*Producer) Stop ¶
func (p *Producer) Stop()
Stop gracefully shuts down the producer, cancelling all inflight requests and waiting for all backend connections to be closed.
It is safe to call the method multiple times and from multiple goroutines, they will all block until the producer has been completely shut down.
type ProducerConfig ¶
type ProducerConfig struct { Address string Topic string MaxConcurrency int Identify Identify TLS TLSConfig DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration }
ProducerConfig carries the different variables to tune a newly started producer.
type ProducerInfo ¶
type ProducerRequest ¶
type ProducerRequest struct { Topic string Message []byte Response chan<- error Deadline time.Time }
ProducerRequest is used to represent operations that are submitted to producers.
type Pub ¶
type Pub struct { // Topic must be set to the name of the topic to which the message will be // published. Topic string // Message is the raw message to publish. Message []byte }
Pub represents the PUB command.
type Rdy ¶
type Rdy struct { // Count is the number of messages that a consumer is ready to accept. Count int }
Rdy represents the RDY command.
type Req ¶
type Req struct { // MessageID is the ID of the message to requeue. MessageID MessageID // Timeout is the duration NSQ will wait for before sending this message // again to a client. Timeout time.Duration }
Req represents the REQ command.
type Response ¶
type Response string
Response is a frame type representing success responses to commands.
type Sub ¶
type Sub struct { // Topic must be set to the name of the topic to subscribe to. Topic string // Channel must be set to the name of the channel to subscribe to. Channel string }
Sub represents the SUB command.
type Touch ¶
type Touch struct { // MessageID is the ID of the message whose timeout will be reset. MessageID MessageID }
Touch represents the TOUCH command.
type UnknownFrame ¶
type UnknownFrame struct { // Type is the type of the frame. Type FrameType // Data contains the raw data of the frame. Data []byte }
UnknownFrame is used to represent frames of unknown types for which the library has no special implementation.
func (UnknownFrame) FrameType ¶
func (f UnknownFrame) FrameType() FrameType
FrameType returns the code representing the frame type, satisfies the Frame interface.