writer

package
v0.15.14
Published: Sep 15, 2020 | License: Apache-2.0 | Imports: 26 | Imported by: 2

README

m3msg writer

Messages are written in the following manner:

  1. The caller writes to the public Writer in writer.go, which acquires a read lock on the writer, so writes can run concurrently.
  2. The Writer then writes to every registered consumerServiceWriter (one per downstream service) in a sequential loop.
  3. The consumerServiceWriter asks the message for its shard and writes immediately to that shard's shardWriter. No locks are taken in this step. (The shard is not bounds-checked; this should be added in future.)
  4. The shardWriter acquires a read lock and writes the message to a messageWriter.
  5. The messageWriter acquires a write lock on itself and pushes the message onto a queue. From that queue, the messageWriter appears to send messages periodically, in batches, to a single consumerWriter via writeBatch.
  6. The consumerWriter (one per downstream consumer instance) takes a write lock on the selected connection index for every write it receives. The messageWriter selects the connection index based on the shard ID, so that shards balance across the connections they ultimately use to send data downstream and IO is not blocked per downstream instance.
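The locking order in the steps above can be sketched with standard-library types only. This is a simplified illustration, not the package's code: every type and function below is a hypothetical stand-in named after the README's terms, the bounds check in step 3 is an addition the README says does not yet exist, and the modulo mapping in connIndex is an assumption about how a shard ID could be turned into a connection index.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a produced message.
type message struct {
	shard uint32
	data  string
}

// messageWriter queues messages under its own write lock (step 5).
type messageWriter struct {
	mu    sync.Mutex
	queue []message
}

func (w *messageWriter) write(m message) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.queue = append(w.queue, m)
}

// shardWriter forwards to its messageWriter under a read lock (step 4).
type shardWriter struct {
	mu sync.RWMutex
	mw *messageWriter
}

func (w *shardWriter) write(m message) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	w.mw.write(m)
}

// consumerServiceWriter picks the shardWriter without locking (step 3).
type consumerServiceWriter struct {
	shardWriters []*shardWriter
}

func newConsumerServiceWriter(numShards int) *consumerServiceWriter {
	sws := make([]*shardWriter, numShards)
	for i := range sws {
		sws[i] = &shardWriter{mw: &messageWriter{}}
	}
	return &consumerServiceWriter{shardWriters: sws}
}

func (w *consumerServiceWriter) write(m message) error {
	// Bounds check added for the sketch; the README notes it is missing.
	if int(m.shard) >= len(w.shardWriters) {
		return fmt.Errorf("shard %d out of range [0, %d)", m.shard, len(w.shardWriters))
	}
	w.shardWriters[m.shard].write(m)
	return nil
}

// writer fans out to every consumerServiceWriter under a read lock (steps 1-2).
type writer struct {
	mu      sync.RWMutex
	writers []*consumerServiceWriter
}

func (w *writer) write(m message) error {
	w.mu.RLock()
	defer w.mu.RUnlock()
	for _, csw := range w.writers {
		if err := csw.write(m); err != nil {
			return err
		}
	}
	return nil
}

// connIndex maps a shard to a connection (step 6). Modulo is an assumption;
// numConnections must be positive.
func connIndex(shard uint32, numConnections int) int {
	return int(shard % uint32(numConnections))
}

func main() {
	w := &writer{writers: []*consumerServiceWriter{
		newConsumerServiceWriter(4),
		newConsumerServiceWriter(4),
	}}
	var wg sync.WaitGroup
	for shard := uint32(0); shard < 4; shard++ {
		wg.Add(1)
		go func(s uint32) { // concurrent writers only share read locks up top
			defer wg.Done()
			if err := w.write(message{shard: s, data: "payload"}); err != nil {
				fmt.Println("write failed:", err)
			}
		}(shard)
	}
	wg.Wait()
	fmt.Println("queued on shard 2:", len(w.writers[0].shardWriters[2].mw.queue))
	fmt.Println("connection for shard 5 of 4:", connIndex(5, 4))
}
```

Only the messageWriter takes an exclusive lock in this sketch, so contention is limited to writers that target the same shard of the same downstream service.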

Documentation

Overview

Package writer is a generated GoMock package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewWriter

func NewWriter(opts Options) producer.Writer

NewWriter creates a new writer.

Types

type ConnectionOptions

type ConnectionOptions interface {
	// NumConnections returns the number of connections.
	NumConnections() int

	// SetNumConnections sets the number of connections.
	SetNumConnections(value int) ConnectionOptions

	// DialTimeout returns the dial timeout.
	DialTimeout() time.Duration

	// SetDialTimeout sets the dial timeout.
	SetDialTimeout(value time.Duration) ConnectionOptions

	// WriteTimeout returns the write timeout.
	WriteTimeout() time.Duration

	// SetWriteTimeout sets the write timeout.
	SetWriteTimeout(value time.Duration) ConnectionOptions

	// KeepAlivePeriod returns the keepAlivePeriod.
	KeepAlivePeriod() time.Duration

	// SetKeepAlivePeriod sets the keepAlivePeriod.
	SetKeepAlivePeriod(value time.Duration) ConnectionOptions

	// ResetDelay returns the delay before resetting connection.
	ResetDelay() time.Duration

	// SetResetDelay sets the delay before resetting connection.
	SetResetDelay(value time.Duration) ConnectionOptions

	// RetryOptions returns the options for connection retrier.
	RetryOptions() retry.Options

	// SetRetryOptions sets the options for connection retrier.
	SetRetryOptions(value retry.Options) ConnectionOptions

	// FlushInterval returns the interval for flushing the buffered bytes.
	FlushInterval() time.Duration

	// SetFlushInterval sets the interval for flushing the buffered bytes.
	SetFlushInterval(value time.Duration) ConnectionOptions

	// WriteBufferSize returns the buffer size for write.
	WriteBufferSize() int

	// SetWriteBufferSize sets the buffer size for write.
	SetWriteBufferSize(value int) ConnectionOptions

	// ReadBufferSize returns the buffer size for read.
	ReadBufferSize() int

	// SetReadBufferSize sets the buffer size for read.
	SetReadBufferSize(value int) ConnectionOptions

	// InstrumentOptions returns the instrument options.
	InstrumentOptions() instrument.Options

	// SetInstrumentOptions sets the instrument options.
	SetInstrumentOptions(value instrument.Options) ConnectionOptions
}

ConnectionOptions configures the connections.

func NewConnectionOptions

func NewConnectionOptions() ConnectionOptions

NewConnectionOptions creates ConnectionOptions.

type MockackRouter

type MockackRouter struct {
	// contains filtered or unexported fields
}

MockackRouter is a mock of ackRouter interface

func NewMockackRouter

func NewMockackRouter(ctrl *gomock.Controller) *MockackRouter

NewMockackRouter creates a new mock instance

func (*MockackRouter) Ack

func (m *MockackRouter) Ack(ack metadata) error

Ack mocks base method

func (*MockackRouter) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockackRouter) Register

func (m *MockackRouter) Register(replicatedShardID uint64, mw messageWriter)

Register mocks base method

func (*MockackRouter) Unregister

func (m *MockackRouter) Unregister(replicatedShardID uint64)

Unregister mocks base method

type MockackRouterMockRecorder

type MockackRouterMockRecorder struct {
	// contains filtered or unexported fields
}

MockackRouterMockRecorder is the mock recorder for MockackRouter

func (*MockackRouterMockRecorder) Ack

func (mr *MockackRouterMockRecorder) Ack(ack interface{}) *gomock.Call

Ack indicates an expected call of Ack

func (*MockackRouterMockRecorder) Register

func (mr *MockackRouterMockRecorder) Register(replicatedShardID, mw interface{}) *gomock.Call

Register indicates an expected call of Register

func (*MockackRouterMockRecorder) Unregister

func (mr *MockackRouterMockRecorder) Unregister(replicatedShardID interface{}) *gomock.Call

Unregister indicates an expected call of Unregister

type MockconsumerServiceWriter

type MockconsumerServiceWriter struct {
	// contains filtered or unexported fields
}

MockconsumerServiceWriter is a mock of consumerServiceWriter interface

func NewMockconsumerServiceWriter

func NewMockconsumerServiceWriter(ctrl *gomock.Controller) *MockconsumerServiceWriter

NewMockconsumerServiceWriter creates a new mock instance

func (*MockconsumerServiceWriter) Close

func (m *MockconsumerServiceWriter) Close()

Close mocks base method

func (*MockconsumerServiceWriter) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockconsumerServiceWriter) Init

func (m *MockconsumerServiceWriter) Init(arg0 initType) error

Init mocks base method

func (*MockconsumerServiceWriter) RegisterFilter

func (m *MockconsumerServiceWriter) RegisterFilter(fn producer.FilterFunc)

RegisterFilter mocks base method

func (*MockconsumerServiceWriter) SetMessageTTLNanos

func (m *MockconsumerServiceWriter) SetMessageTTLNanos(value int64)

SetMessageTTLNanos mocks base method

func (*MockconsumerServiceWriter) UnregisterFilter

func (m *MockconsumerServiceWriter) UnregisterFilter()

UnregisterFilter mocks base method

func (*MockconsumerServiceWriter) Write

Write mocks base method

type MockconsumerServiceWriterMockRecorder

type MockconsumerServiceWriterMockRecorder struct {
	// contains filtered or unexported fields
}

MockconsumerServiceWriterMockRecorder is the mock recorder for MockconsumerServiceWriter

func (*MockconsumerServiceWriterMockRecorder) Close

Close indicates an expected call of Close

func (*MockconsumerServiceWriterMockRecorder) Init

func (mr *MockconsumerServiceWriterMockRecorder) Init(arg0 interface{}) *gomock.Call

Init indicates an expected call of Init

func (*MockconsumerServiceWriterMockRecorder) RegisterFilter

func (mr *MockconsumerServiceWriterMockRecorder) RegisterFilter(fn interface{}) *gomock.Call

RegisterFilter indicates an expected call of RegisterFilter

func (*MockconsumerServiceWriterMockRecorder) SetMessageTTLNanos

func (mr *MockconsumerServiceWriterMockRecorder) SetMessageTTLNanos(value interface{}) *gomock.Call

SetMessageTTLNanos indicates an expected call of SetMessageTTLNanos

func (*MockconsumerServiceWriterMockRecorder) UnregisterFilter

func (mr *MockconsumerServiceWriterMockRecorder) UnregisterFilter() *gomock.Call

UnregisterFilter indicates an expected call of UnregisterFilter

func (*MockconsumerServiceWriterMockRecorder) Write

func (mr *MockconsumerServiceWriterMockRecorder) Write(rm interface{}) *gomock.Call

Write indicates an expected call of Write

type MockshardWriter

type MockshardWriter struct {
	// contains filtered or unexported fields
}

MockshardWriter is a mock of shardWriter interface

func NewMockshardWriter

func NewMockshardWriter(ctrl *gomock.Controller) *MockshardWriter

NewMockshardWriter creates a new mock instance

func (*MockshardWriter) Close

func (m *MockshardWriter) Close()

Close mocks base method

func (*MockshardWriter) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockshardWriter) QueueSize

func (m *MockshardWriter) QueueSize() int

QueueSize mocks base method

func (*MockshardWriter) SetMessageTTLNanos

func (m *MockshardWriter) SetMessageTTLNanos(value int64)

SetMessageTTLNanos mocks base method

func (*MockshardWriter) UpdateInstances

func (m *MockshardWriter) UpdateInstances(instances []placement.Instance, cws map[string]consumerWriter)

UpdateInstances mocks base method

func (*MockshardWriter) Write

Write mocks base method

type MockshardWriterMockRecorder

type MockshardWriterMockRecorder struct {
	// contains filtered or unexported fields
}

MockshardWriterMockRecorder is the mock recorder for MockshardWriter

func (*MockshardWriterMockRecorder) Close

Close indicates an expected call of Close

func (*MockshardWriterMockRecorder) QueueSize

func (mr *MockshardWriterMockRecorder) QueueSize() *gomock.Call

QueueSize indicates an expected call of QueueSize

func (*MockshardWriterMockRecorder) SetMessageTTLNanos

func (mr *MockshardWriterMockRecorder) SetMessageTTLNanos(value interface{}) *gomock.Call

SetMessageTTLNanos indicates an expected call of SetMessageTTLNanos

func (*MockshardWriterMockRecorder) UpdateInstances

func (mr *MockshardWriterMockRecorder) UpdateInstances(instances, cws interface{}) *gomock.Call

UpdateInstances indicates an expected call of UpdateInstances

func (*MockshardWriterMockRecorder) Write

func (mr *MockshardWriterMockRecorder) Write(rm interface{}) *gomock.Call

Write indicates an expected call of Write

type Options

type Options interface {
	// TopicName returns the topic name.
	TopicName() string

	// SetTopicName sets the topic name.
	SetTopicName(value string) Options

	// TopicService returns the topic service.
	TopicService() topic.Service

	// SetTopicService sets the topic service.
	SetTopicService(value topic.Service) Options

	// TopicWatchInitTimeout returns the timeout for topic watch initialization.
	TopicWatchInitTimeout() time.Duration

	// SetTopicWatchInitTimeout sets the timeout for topic watch initialization.
	SetTopicWatchInitTimeout(value time.Duration) Options

	// ServiceDiscovery returns the client to service discovery service.
	ServiceDiscovery() services.Services

	// SetServiceDiscovery sets the client to service discovery services.
	SetServiceDiscovery(value services.Services) Options

	// PlacementOptions returns the placement options.
	PlacementOptions() placement.Options

	// SetPlacementOptions sets the placement options.
	SetPlacementOptions(value placement.Options) Options

	// PlacementWatchInitTimeout returns the timeout for placement watch initialization.
	PlacementWatchInitTimeout() time.Duration

	// SetPlacementWatchInitTimeout sets the timeout for placement watch initialization.
	SetPlacementWatchInitTimeout(value time.Duration) Options

	// MessagePoolOptions returns the options of pool for messages.
	MessagePoolOptions() pool.ObjectPoolOptions

	// SetMessagePoolOptions sets the options of pool for messages.
	SetMessagePoolOptions(value pool.ObjectPoolOptions) Options

	// MessageRetryOptions returns the retry options for message retry.
	MessageRetryOptions() retry.Options

	// SetMessageRetryOptions sets the retry options for message retry.
	SetMessageRetryOptions(value retry.Options) Options

	// MessageQueueNewWritesScanInterval returns the interval between scanning
	// message queue for new writes.
	MessageQueueNewWritesScanInterval() time.Duration

	// SetMessageQueueNewWritesScanInterval sets the interval between scanning
	// message queue for new writes.
	SetMessageQueueNewWritesScanInterval(value time.Duration) Options

	// MessageQueueFullScanInterval returns the interval between scanning
	// message queue for retriable writes and cleanups.
	MessageQueueFullScanInterval() time.Duration

	// SetMessageQueueFullScanInterval sets the interval between scanning
	// message queue for retriable writes and cleanups.
	SetMessageQueueFullScanInterval(value time.Duration) Options

	// MessageQueueScanBatchSize returns the batch size for queue scan.
	MessageQueueScanBatchSize() int

	// SetMessageQueueScanBatchSize sets the batch size for queue scan.
	SetMessageQueueScanBatchSize(value int) Options

	// InitialAckMapSize returns the initial size of the ack map.
	InitialAckMapSize() int

	// SetInitialAckMapSize sets the initial size of the ack map.
	SetInitialAckMapSize(value int) Options

	// CloseCheckInterval returns the close check interval.
	CloseCheckInterval() time.Duration

	// SetCloseCheckInterval sets the close check interval.
	SetCloseCheckInterval(value time.Duration) Options

	// AckErrorRetryOptions returns the retrier for ack errors.
	AckErrorRetryOptions() retry.Options

	// SetAckErrorRetryOptions sets the retrier for ack errors.
	SetAckErrorRetryOptions(value retry.Options) Options

	// EncoderOptions returns the encoder's options.
	EncoderOptions() proto.Options

	// SetEncoderOptions sets the encoder's options.
	SetEncoderOptions(value proto.Options) Options

	// DecoderOptions returns the decoder's options.
	DecoderOptions() proto.Options

	// SetDecoderOptions sets the decoder's options.
	SetDecoderOptions(value proto.Options) Options

	// ConnectionOptions returns the options for connections.
	ConnectionOptions() ConnectionOptions

	// SetConnectionOptions sets the options for connections.
	SetConnectionOptions(value ConnectionOptions) Options

	// InstrumentOptions returns the instrument options.
	InstrumentOptions() instrument.Options

	// SetInstrumentOptions sets the instrument options.
	SetInstrumentOptions(value instrument.Options) Options
}

Options configures the writer.

func NewOptions

func NewOptions() Options

NewOptions creates Options.
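MessageQueueScanBatchSize suggests that the message queue is scanned in bounded chunks rather than in one pass. A minimal sketch of that idea follows. The scanInBatches helper is hypothetical and not part of this package, and the treatment of a non-positive batch size is an arbitrary choice made for the sketch.

```go
package main

import "fmt"

// scanInBatches walks queue in chunks of at most batchSize items and calls
// visit once per chunk. A real writer would presumably hold its lock only
// for the duration of each chunk; locking is left out of this sketch.
func scanInBatches(queue []string, batchSize int, visit func(batch []string)) {
	if batchSize <= 0 {
		batchSize = 1 // arbitrary fallback so the loop always advances
	}
	for start := 0; start < len(queue); start += batchSize {
		end := start + batchSize
		if end > len(queue) {
			end = len(queue)
		}
		visit(queue[start:end])
	}
}

func main() {
	queue := []string{"m1", "m2", "m3", "m4", "m5"}
	scanInBatches(queue, 2, func(batch []string) {
		fmt.Println(len(batch), batch)
	})
}
```

A smaller batch size shortens each uninterrupted pass over the queue at the cost of more passes per scan.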
