Documentation ¶
Overview ¶
Package consumer is a generated GoMock package.
Index ¶
- func NewMessageHandler(newMessageProcessorFn NewMessageProcessorFn, opts Options) server.Handler
- type Configuration
- type ConsumeFn
- type Consumer
- type Listener
- type Message
- type MessagePoolConfiguration
- type MessagePoolOptions
- type MessageProcessor
- type MockMessage
- type MockMessageMockRecorder
- type MockMessageProcessor
- type MockMessageProcessorMockRecorder
- type NewMessageProcessorFn
- type Options
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewMessageHandler ¶ added in v0.5.0
func NewMessageHandler(newMessageProcessorFn NewMessageProcessorFn, opts Options) server.Handler
NewMessageHandler creates a new server handler with messageFn.
Types ¶
type Configuration ¶
type Configuration struct { Encoder *proto.Configuration `yaml:"encoder"` Decoder *proto.Configuration `yaml:"decoder"` MessagePool *MessagePoolConfiguration `yaml:"messagePool"` AckFlushInterval *time.Duration `yaml:"ackFlushInterval"` AckBufferSize *int `yaml:"ackBufferSize"` ConnectionWriteBufferSize *int `yaml:"connectionWriteBufferSize"` ConnectionReadBufferSize *int `yaml:"connectionReadBufferSize"` ConnectionWriteTimeout *time.Duration `yaml:"connectionWriteTimeout"` }
Configuration configs the consumer options.
func (*Configuration) NewOptions ¶
func (c *Configuration) NewOptions(iOpts instrument.Options) Options
NewOptions creates consumer options.
type ConsumeFn ¶
type ConsumeFn func(c Consumer)
ConsumeFn processes the consumer. This is useful when user want to reuse resource across messages received on the same consumer or have finer level control on how to read messages from consumer.
type Consumer ¶
type Consumer interface { // Message waits for and returns the next message received. Message() (Message, error) // Init initializes the consumer. Init() // Close closes the consumer. Close() }
Consumer receives messages from a connection.
type Listener ¶
type Listener interface { // Accept waits for and returns the next connection based consumer. Accept() (Consumer, error) // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. Close() error // Addr returns the listener's network address. Addr() net.Addr }
Listener is a consumer listener based on a network address.
type Message ¶
type Message interface { // Bytes returns the bytes. Bytes() []byte // Ack acks the message. Ack() // ShardID returns shard ID of the Message. ShardID() uint64 // SentAtNanos returns when the producer sent the Message. SentAtNanos() uint64 }
Message carries the data that needs to be processed.
type MessagePoolConfiguration ¶ added in v0.5.0
type MessagePoolConfiguration struct { // Size is the size of the pool. Size pool.Size `yaml:"size"` // Watermark is the object pool watermark configuration. Watermark pool.WatermarkConfiguration `yaml:"watermark"` // MaxBufferReuseSize specifies the maximum buffer which can // be reused and pooled, if a buffer greater than this // is used then it is discarded. Zero specifies no limit. MaxBufferReuseSize int `yaml:"maxBufferReuseSize"` }
MessagePoolConfiguration is the message pool configuration options, which extends the default object pool configuration.
func (MessagePoolConfiguration) NewOptions ¶ added in v0.5.0
func (c MessagePoolConfiguration) NewOptions( iopts instrument.Options, ) MessagePoolOptions
NewOptions creates message pool options.
type MessagePoolOptions ¶ added in v0.5.0
type MessagePoolOptions struct { PoolOptions pool.ObjectPoolOptions // MaxBufferReuseSize specifies the maximum buffer which can // be reused and pooled, if a buffer greater than this // is used then it is discarded. Zero specifies no limit. MaxBufferReuseSize int }
MessagePoolOptions are options to use when creating the message pool.
type MessageProcessor ¶ added in v0.5.0
type MessageProcessor interface { Process(m Message) Close() }
MessageProcessor processes the message. When a MessageProcessor was set in the server, it will be called to process every message received.
func NewNoOpMessageProcessor ¶ added in v1.4.0
func NewNoOpMessageProcessor() MessageProcessor
NewNoOpMessageProcessor creates a new MessageProcessor that does nothing.
type MockMessage ¶
type MockMessage struct {
// contains filtered or unexported fields
}
MockMessage is a mock of Message interface.
func NewMockMessage ¶
func NewMockMessage(ctrl *gomock.Controller) *MockMessage
NewMockMessage creates a new mock instance.
func (*MockMessage) EXPECT ¶
func (m *MockMessage) EXPECT() *MockMessageMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockMessage) SentAtNanos ¶ added in v1.4.0
func (m *MockMessage) SentAtNanos() uint64
SentAtNanos mocks base method.
func (*MockMessage) ShardID ¶ added in v1.2.0
func (m *MockMessage) ShardID() uint64
ShardID mocks base method.
type MockMessageMockRecorder ¶
type MockMessageMockRecorder struct {
// contains filtered or unexported fields
}
MockMessageMockRecorder is the mock recorder for MockMessage.
func (*MockMessageMockRecorder) Ack ¶
func (mr *MockMessageMockRecorder) Ack() *gomock.Call
Ack indicates an expected call of Ack.
func (*MockMessageMockRecorder) Bytes ¶
func (mr *MockMessageMockRecorder) Bytes() *gomock.Call
Bytes indicates an expected call of Bytes.
func (*MockMessageMockRecorder) SentAtNanos ¶ added in v1.4.0
func (mr *MockMessageMockRecorder) SentAtNanos() *gomock.Call
SentAtNanos indicates an expected call of SentAtNanos.
func (*MockMessageMockRecorder) ShardID ¶ added in v1.2.0
func (mr *MockMessageMockRecorder) ShardID() *gomock.Call
ShardID indicates an expected call of ShardID.
type MockMessageProcessor ¶ added in v0.5.0
type MockMessageProcessor struct {
// contains filtered or unexported fields
}
MockMessageProcessor is a mock of MessageProcessor interface.
func NewMockMessageProcessor ¶ added in v0.5.0
func NewMockMessageProcessor(ctrl *gomock.Controller) *MockMessageProcessor
NewMockMessageProcessor creates a new mock instance.
func (*MockMessageProcessor) Close ¶ added in v0.5.0
func (m *MockMessageProcessor) Close()
Close mocks base method.
func (*MockMessageProcessor) EXPECT ¶ added in v0.5.0
func (m *MockMessageProcessor) EXPECT() *MockMessageProcessorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockMessageProcessor) Process ¶ added in v0.5.0
func (m *MockMessageProcessor) Process(arg0 Message)
Process mocks base method.
type MockMessageProcessorMockRecorder ¶ added in v0.5.0
type MockMessageProcessorMockRecorder struct {
// contains filtered or unexported fields
}
MockMessageProcessorMockRecorder is the mock recorder for MockMessageProcessor.
func (*MockMessageProcessorMockRecorder) Close ¶ added in v0.5.0
func (mr *MockMessageProcessorMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close.
func (*MockMessageProcessorMockRecorder) Process ¶ added in v0.5.0
func (mr *MockMessageProcessorMockRecorder) Process(arg0 interface{}) *gomock.Call
Process indicates an expected call of Process.
type NewMessageProcessorFn ¶ added in v1.4.0
type NewMessageProcessorFn func() MessageProcessor
NewMessageProcessorFn creates a new MessageProcessor scoped to a single connection. Messages are processed serially in a connection.
func SingletonMessageProcessor ¶ added in v1.4.0
func SingletonMessageProcessor(p MessageProcessor) NewMessageProcessorFn
SingletonMessageProcessor uses the same MessageProcessor for all connections.
type Options ¶
type Options interface { // EncoderOptions returns the options for Encoder. EncoderOptions() proto.Options // SetEncoderOptions sets the options for Encoder. SetEncoderOptions(value proto.Options) Options // DecoderOptions returns the options for Decoder. DecoderOptions() proto.Options // SetDecoderOptions sets the options for Decoder. SetDecoderOptions(value proto.Options) Options // MessagePoolOptions returns the options for message pool. MessagePoolOptions() MessagePoolOptions // SetMessagePoolOptions sets the options for message pool. SetMessagePoolOptions(value MessagePoolOptions) Options // AckFlushInterval returns the ack flush interval. AckFlushInterval() time.Duration // SetAckFlushInterval sets the ack flush interval. SetAckFlushInterval(value time.Duration) Options // AckBufferSize returns the ack buffer size. AckBufferSize() int // SetAckBufferSize sets the ack buffer size. SetAckBufferSize(value int) Options // ConnectionWriteBufferSize returns the size of buffer before a write or a read. ConnectionWriteBufferSize() int // SetConnectionWriteBufferSize sets the buffer size. SetConnectionWriteBufferSize(value int) Options // ConnectionReadBufferSize returns the size of buffer before a write or a read. ConnectionReadBufferSize() int // SetConnectionWriteBufferSize sets the buffer size. SetConnectionReadBufferSize(value int) Options // ConnectionWriteTimeout returns the timeout for writing to the connection. ConnectionWriteTimeout() time.Duration // SetConnectionWriteTimeout sets the write timeout for the connection. SetConnectionWriteTimeout(value time.Duration) Options // InstrumentOptions returns the instrument options. InstrumentOptions() instrument.Options // SetInstrumentOptions sets the instrument options. SetInstrumentOptions(value instrument.Options) Options }
Options configs the consumer listener.