consumer

package
v0.0.0-...-9649366 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2019 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package consumer is a generated GoMock package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewConsumerHandler

func NewConsumerHandler(consumeFn ConsumeFn, opts Options) server.Handler

NewConsumerHandler creates a new server handler that uses consumeFn.

func NewMessageHandler

func NewMessageHandler(mp MessageProcessor, opts Options) server.Handler

NewMessageHandler creates a new server handler that processes messages with the given MessageProcessor.

Types

type Configuration

// Configuration holds the consumer settings that can be loaded from YAML.
// Every field is a pointer; presumably a nil field means "not configured"
// and NewOptions keeps the default for it — TODO confirm against NewOptions.
type Configuration struct {
	// Encoder and Decoder are the proto configurations for the encoder and
	// the decoder (YAML keys "encoder" and "decoder").
	Encoder                   *proto.Configuration      `yaml:"encoder"`
	Decoder                   *proto.Configuration      `yaml:"decoder"`
	// MessagePool configures the message pool (YAML key "messagePool").
	MessagePool               *MessagePoolConfiguration `yaml:"messagePool"`
	// AckFlushInterval and AckBufferSize configure the ack flush interval
	// and the ack buffer size.
	AckFlushInterval          *time.Duration            `yaml:"ackFlushInterval"`
	AckBufferSize             *int                      `yaml:"ackBufferSize"`
	// ConnectionWriteBufferSize and ConnectionReadBufferSize configure the
	// connection write and read buffer sizes.
	// NOTE(review): units are not visible here; presumably bytes — confirm.
	ConnectionWriteBufferSize *int                      `yaml:"connectionWriteBufferSize"`
	ConnectionReadBufferSize  *int                      `yaml:"connectionReadBufferSize"`
}

Configuration configures the consumer options.

func (*Configuration) NewOptions

func (c *Configuration) NewOptions(iOpts instrument.Options) Options

NewOptions creates consumer options.

type ConsumeFn

type ConsumeFn func(c Consumer)

ConsumeFn processes the consumer. This is useful when the user wants to reuse resources across messages received on the same consumer, or wants finer-grained control over how messages are read from the consumer.

type Consumer

// Consumer receives messages from a connection.
type Consumer interface {
	// Message waits for and returns the next message received.
	Message() (Message, error)

	// Init initializes the consumer.
	// NOTE(review): whether Init must be called before Message is not
	// visible in this declaration — confirm against the implementation.
	Init()

	// Close closes the consumer.
	Close()
}

Consumer receives messages from a connection.

type Listener

// Listener accepts connection based consumers on a network address.
type Listener interface {
	// Accept waits for and returns the next connection based consumer.
	Accept() (Consumer, error)

	// Close closes the listener.
	// Any blocked Accept operations will be unblocked and return errors.
	Close() error

	// Addr returns the listener's network address.
	Addr() net.Addr
}

Listener is a consumer listener based on a network address.

func NewListener

func NewListener(addr string, opts Options) (Listener, error)

NewListener creates a consumer listener.

type Message

// Message carries the data that needs to be processed.
type Message interface {
	// Bytes returns the bytes of the message.
	// NOTE(review): presumably this is the payload to process — confirm.
	Bytes() []byte

	// Ack acks the message.
	// NOTE(review): whether Bytes remains valid after Ack is not visible
	// here — confirm before retaining the slice.
	Ack()
}

Message carries the data that needs to be processed.

type MessagePoolConfiguration

// MessagePoolConfiguration is the YAML configuration for the message pool.
// It carries the object pool settings (Size, Watermark) plus a limit on
// which buffers are reused.
type MessagePoolConfiguration struct {
	// Size is the size of the pool.
	Size int `yaml:"size"`

	// Watermark is the object pool watermark configuration.
	Watermark pool.WatermarkConfiguration `yaml:"watermark"`

	// MaxBufferReuseSize specifies the maximum buffer which can
	// be reused and pooled, if a buffer greater than this
	// is used then it is discarded. Zero specifies no limit.
	MaxBufferReuseSize int `yaml:"maxBufferReuseSize"`
}

MessagePoolConfiguration is the message pool configuration options, which extends the default object pool configuration.

func (MessagePoolConfiguration) NewOptions

NewOptions creates message pool options.

type MessagePoolOptions

// MessagePoolOptions are the options used when creating the message pool.
type MessagePoolOptions struct {
	// PoolOptions are the object pool options for the message pool.
	PoolOptions pool.ObjectPoolOptions

	// MaxBufferReuseSize specifies the maximum buffer which can
	// be reused and pooled, if a buffer greater than this
	// is used then it is discarded. Zero specifies no limit.
	MaxBufferReuseSize int
}

MessagePoolOptions are options to use when creating the message pool.

type MessageProcessor

// MessageProcessor processes messages received by the server.
type MessageProcessor interface {
	// Process processes a single message.
	// NOTE(review): whether Process is responsible for calling m.Ack is
	// not visible here — confirm against the handler implementation.
	Process(m Message)

	// Close closes the processor.
	// NOTE(review): when the server calls Close is not shown here — confirm.
	Close()
}

MessageProcessor processes the message. When a MessageProcessor is set in the server, it is called to process every message received.

type MockMessage

// MockMessage is a GoMock mock of the Message interface.
// Create one with NewMockMessage.
type MockMessage struct {
	// contains filtered or unexported fields
}

MockMessage is a mock of Message interface

func NewMockMessage

func NewMockMessage(ctrl *gomock.Controller) *MockMessage

NewMockMessage creates a new mock instance

func (*MockMessage) Ack

func (m *MockMessage) Ack()

Ack mocks base method

func (*MockMessage) Bytes

func (m *MockMessage) Bytes() []byte

Bytes mocks base method

func (*MockMessage) EXPECT

func (m *MockMessage) EXPECT() *MockMessageMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

type MockMessageMockRecorder

// MockMessageMockRecorder is the mock recorder for MockMessage.
// It is returned by MockMessage.EXPECT and records expected calls.
type MockMessageMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageMockRecorder is the mock recorder for MockMessage

func (*MockMessageMockRecorder) Ack

func (mr *MockMessageMockRecorder) Ack() *gomock.Call

Ack indicates an expected call of Ack

func (*MockMessageMockRecorder) Bytes

func (mr *MockMessageMockRecorder) Bytes() *gomock.Call

Bytes indicates an expected call of Bytes

type MockMessageProcessor

// MockMessageProcessor is a GoMock mock of the MessageProcessor interface.
// Create one with NewMockMessageProcessor.
type MockMessageProcessor struct {
	// contains filtered or unexported fields
}

MockMessageProcessor is a mock of MessageProcessor interface

func NewMockMessageProcessor

func NewMockMessageProcessor(ctrl *gomock.Controller) *MockMessageProcessor

NewMockMessageProcessor creates a new mock instance

func (*MockMessageProcessor) Close

func (m *MockMessageProcessor) Close()

Close mocks base method

func (*MockMessageProcessor) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockMessageProcessor) Process

func (m *MockMessageProcessor) Process(arg0 Message)

Process mocks base method

type MockMessageProcessorMockRecorder

// MockMessageProcessorMockRecorder is the mock recorder for
// MockMessageProcessor; its methods record expected calls.
type MockMessageProcessorMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageProcessorMockRecorder is the mock recorder for MockMessageProcessor

func (*MockMessageProcessorMockRecorder) Close

Close indicates an expected call of Close

func (*MockMessageProcessorMockRecorder) Process

func (mr *MockMessageProcessorMockRecorder) Process(arg0 interface{}) *gomock.Call

Process indicates an expected call of Process

type Options

// Options configures the consumer listener. Each setting has a getter and
// a matching Set* method that returns an Options value.
// NOTE(review): whether Set* returns a copy or mutates the receiver is not
// visible in this declaration — confirm against the implementation.
type Options interface {
	// EncoderOptions returns the options for Encoder.
	EncoderOptions() proto.Options

	// SetEncoderOptions sets the options for Encoder.
	SetEncoderOptions(value proto.Options) Options

	// DecoderOptions returns the options for Decoder.
	DecoderOptions() proto.Options

	// SetDecoderOptions sets the options for Decoder.
	SetDecoderOptions(value proto.Options) Options

	// MessagePoolOptions returns the options for message pool.
	MessagePoolOptions() MessagePoolOptions

	// SetMessagePoolOptions sets the options for message pool.
	SetMessagePoolOptions(value MessagePoolOptions) Options

	// AckFlushInterval returns the ack flush interval.
	AckFlushInterval() time.Duration

	// SetAckFlushInterval sets the ack flush interval.
	SetAckFlushInterval(value time.Duration) Options

	// AckBufferSize returns the ack buffer size.
	AckBufferSize() int

	// SetAckBufferSize sets the ack buffer size.
	SetAckBufferSize(value int) Options

	// ConnectionWriteBufferSize returns the connection write buffer size.
	ConnectionWriteBufferSize() int

	// SetConnectionWriteBufferSize sets the connection write buffer size.
	SetConnectionWriteBufferSize(value int) Options

	// ConnectionReadBufferSize returns the connection read buffer size.
	ConnectionReadBufferSize() int

	// SetConnectionReadBufferSize sets the connection read buffer size.
	SetConnectionReadBufferSize(value int) Options

	// InstrumentOptions returns the instrument options.
	InstrumentOptions() instrument.Options

	// SetInstrumentOptions sets the instrument options.
	SetInstrumentOptions(value instrument.Options) Options
}

Options configures the consumer listener.

func NewOptions

func NewOptions() Options

NewOptions creates a new options.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL