Documentation ¶
Overview ¶
Package nsqpb is a generated GoMock package.
Index ¶
- func ConvertLogLevel(level logrus.Level) nsq.LogLevel
- func LookupTopic(nsqdLookupHostPort string, topic string) bool
- type HandleMessage
- type MockProducer
- type MockProducerMockRecorder
- type NSQLogger
- type NsqConfig
- type PbProduce
- type Producer
- type ProtoConsume
- func (p *ProtoConsume) AddTopic(supportedTopic string)
- func (p *ProtoConsume) ConsumeMessages(topicName string, channelName string) error
- func (p *ProtoConsume) DeleteTopic(toRemove string)
- func (p *ProtoConsume) GetStats() []*nsq.ConsumerStats
- func (p *ProtoConsume) GetTopics() []string
- func (p *ProtoConsume) NSQProtoConsume(msg *nsq.Message) error
- func (p *ProtoConsume) Pause()
- func (p *ProtoConsume) UnPause()
- func (p *ProtoConsume) WaitThenConsume(topic, channel string, handler HandleMessage, waitInterval int)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConvertLogLevel ¶
func LookupTopic ¶
LookupTopics goes to nsqd and checks to see if topic is supported
Types ¶
type HandleMessage ¶
type HandleMessage interface {
UnmarshalAndProcess(msg []byte, done chan int, finish chan int) error
}
HandleMessage is an interface for unmarshalling your messages to a struct or protobuf message, then processing the object. Fulfilling this interface is how you would interact w/ the nsq channels
type MockProducer ¶
type MockProducer struct {
// contains filtered or unexported fields
}
MockProducer is a mock of Producer interface
func NewMockProducer ¶
func NewMockProducer(ctrl *gomock.Controller) *MockProducer
NewMockProducer creates a new mock instance
func (*MockProducer) EXPECT ¶
func (m *MockProducer) EXPECT() *MockProducerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockProducer) WriteProto ¶
func (m *MockProducer) WriteProto(message proto.Message, topicName string) error
WriteProto mocks base method
type MockProducerMockRecorder ¶
type MockProducerMockRecorder struct {
// contains filtered or unexported fields
}
MockProducerMockRecorder is the mock recorder for MockProducer
func (*MockProducerMockRecorder) WriteProto ¶
func (mr *MockProducerMockRecorder) WriteProto(message, topicName interface{}) *gomock.Call
WriteProto indicates an expected call of WriteProto
type NSQLogger ¶
type NSQLogger struct{}
func NewNSQLogger ¶
func NewNSQLogger() (logger NSQLogger, level nsq.LogLevel)
func NewNSQLoggerAtLevel ¶
type NsqConfig ¶
type NsqConfig struct { NsqLookupdIp string NsqdIp string NsqdPort string NsqLookupdPort string MaxInFlight int TouchInterval int Timeout int64 // seconds }
func DefaultNsqConf ¶
func DefaultNsqConf() *NsqConfig
DefaultNsqConf returns new NsqConfig struct with default values. Searches environment variables for nsqlookupd ip addr and nsqd ip addr. defaults to 127.0.0.1 if not found.
func (*NsqConfig) LookupDAddress ¶
LookupDAddress returns `<ip>:<port>` of configured nsqlookupd, the format nsq package takes
func (*NsqConfig) NsqDAddress ¶
NsqDAddress returns `<ip>:<port>` of configured nsqd, the format nsq package takes
type PbProduce ¶
type PbProduce struct { Producer *nsq.Producer // contains filtered or unexported fields }
func DefaultProducer ¶
DefaultProducer will create a nsq producer using default config settings (getting nsqd addresses from teh environment with LOCALHOST defaults) it will also ping its configured nsqd to ensure that the producer has been configured correctly.
func GetInitProducer ¶
func GetInitProducer() *PbProduce
use this to get a producer instance in your code, it will call only once. need to have global variable once and cachedProducer set in your service, then pass those to this. look into sync.Once if confused
func (*PbProduce) WriteProto ¶
Write Protobuf Message to an NSQ topic with name topicName Gets the ip of the NSQ daemon from either the environment variable
`NSQD_IP` or sets it to 127.0.0.1. the NSQ daemon should run alongside
any service that produces messages, so this will work usually.
type ProtoConsume ¶
type ProtoConsume struct { Handler HandleMessage DecodeConfig *nsq.Config Config *NsqConfig StopChan chan int ConsumerRecovery func() MessageRecovery func(message *nsq.Message) // contains filtered or unexported fields }
ProtoConsume wraps nsq.Message so that code outside the package can just add a UnmarshalProtoFunc that doesn't require messing with nsq fields. just write a function that unmarshals to your proto object and does work ...put in WORK.
func NewDefaultProtoConsume ¶
func NewDefaultProtoConsume() *ProtoConsume
NewDefaultProtoConsume returns a new ProtoConsume object with nsq configuration and nsqpb configuration. also sets default message recovery and consumer recovery functions
func (*ProtoConsume) AddTopic ¶
func (p *ProtoConsume) AddTopic(supportedTopic string)
Adds a supported topic to store on consumer
func (*ProtoConsume) ConsumeMessages ¶
func (p *ProtoConsume) ConsumeMessages(topicName string, channelName string) error
Consume messages on a given topic / channel in NSQ protoconsume's UnmarshalProtoFunc will be added with a wrapper as a handler for the consumer. The ip address of the NSQLookupd instance can be set by the environment variable NSQLOOKUPD_IP, but will default to 127.0.0.1
func (*ProtoConsume) DeleteTopic ¶
func (p *ProtoConsume) DeleteTopic(toRemove string)
TODO: does it matter to add a bool for pass/fail
func (*ProtoConsume) GetStats ¶
func (p *ProtoConsume) GetStats() []*nsq.ConsumerStats
func (*ProtoConsume) GetTopics ¶
func (p *ProtoConsume) GetTopics() []string
Retrieves all consumer supported topics
func (*ProtoConsume) NSQProtoConsume ¶
func (p *ProtoConsume) NSQProtoConsume(msg *nsq.Message) error
NSQProtoConsume is a wrapper for `p.Handler.UnmarshalAndProcess` --> `nsq.HandlerFunc`
func (*ProtoConsume) Pause ¶
func (p *ProtoConsume) Pause()
func (*ProtoConsume) UnPause ¶
func (p *ProtoConsume) UnPause()
func (*ProtoConsume) WaitThenConsume ¶ added in v0.4.2
func (p *ProtoConsume) WaitThenConsume(topic, channel string, handler HandleMessage, waitInterval int)
WaitThenConsume will first ensure that the given topic exists by checking in lookupd (configured in ProtoConsume). If it doesn't exist, then it will seep for waitInterval seconds. If it does exists, then it will set *ProtoConsume's Handler to handler and start (*ProtoConsume).ConsumeMessages on the given topic and channel. any errors encountered will be logged this function doesn't return anything.