Documentation ¶
Index ¶
Constants ¶
View Source
const ( ErrBatchTooLarge = "BatchTooLarge" ErrMessageTooLarge = "MessageTooLarge" )
View Source
const InfluxMaxBatchSize = 500
View Source
const KinesisMaxNumberOfRecords = 500
View Source
const KinesisMaxSizeInBytes = 5 * MEGABYTE
View Source
const KinesisPartitionKeyMaxSize = 256
View Source
const MEGABYTE = 1024 * 1024
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher interface { // accepts a message for dispatching Put([]byte) bool // the dispatching worker Dispatch() }
TODO: Remove this interface (we're covered by service now)
type EchoService ¶
type EchoService struct{}
Used for local testing / debugging
func (*EchoService) CreateBatch ¶
func (svc *EchoService) CreateBatch() Batch
prints the message to stdout
func (*EchoService) Send ¶
func (svc *EchoService) Send(batch Batch) error
type InfluxBatch ¶
type InfluxBatch struct {
// contains filtered or unexported fields
}
func NewInfluxBatch ¶
func NewInfluxBatch() *InfluxBatch
func (*InfluxBatch) Add ¶
func (batch *InfluxBatch) Add(message []byte) error
func (*InfluxBatch) CanAdd ¶
func (batch *InfluxBatch) CanAdd(message []byte) bool
func (*InfluxBatch) Len ¶
func (batch *InfluxBatch) Len() int
type InfluxService ¶
func NewInfluxService ¶
func NewInfluxService(host string, database string) *InfluxService
func (*InfluxService) CreateBatch ¶
func (svc *InfluxService) CreateBatch() Batch
func (*InfluxService) Send ¶
func (svc *InfluxService) Send(batch Batch) error
type KinesisBatch ¶
type KinesisBatch struct {
// contains filtered or unexported fields
}
func NewKinesisBatch ¶
func NewKinesisBatch(streamName string) *KinesisBatch
func (*KinesisBatch) Add ¶
func (batch *KinesisBatch) Add(message []byte) error
inserts message into batch; if not possible returns an error
func (*KinesisBatch) CanAdd ¶
func (batch *KinesisBatch) CanAdd(message []byte) bool
func (*KinesisBatch) IsEmpty ¶
func (batch *KinesisBatch) IsEmpty() bool
func (*KinesisBatch) IsReady ¶
func (batch *KinesisBatch) IsReady(message []byte) bool
func (*KinesisBatch) Len ¶
func (batch *KinesisBatch) Len() int
type KinesisService ¶
type KinesisService struct {
// contains filtered or unexported fields
}
func NewKinesisService ¶
func NewKinesisService(streamName string, awsRegion string) *KinesisService
func (*KinesisService) CreateBatch ¶
func (svc *KinesisService) CreateBatch() Batch
func (*KinesisService) Send ¶
func (svc *KinesisService) Send(batch Batch) error
type MessageDispatcher ¶
type MessageDispatcher struct { Service Service // contains filtered or unexported fields }
func NewMessageDispatcher ¶
func NewMessageDispatcher(service Service, bufferSize int) *MessageDispatcher
Creates and returns a new dispatcher
func (*MessageDispatcher) Dispatch ¶
func (dispatcher *MessageDispatcher) Dispatch()
func (*MessageDispatcher) Put ¶
func (dispatcher *MessageDispatcher) Put(message []byte) bool
inserts the message into the buffer for dispatching returns true for successful insertion, false if message was dropped will never block; if the queue is full, the message will be dropped
func (*MessageDispatcher) SetBatchFrequency ¶
func (dispatcher *MessageDispatcher) SetBatchFrequency(freq time.Duration)
type MockDispatcher ¶
type MockDispatcher struct {
Messages chan string
}
The following is a mock type for use in tests
func (*MockDispatcher) Dispatch ¶
func (dispatcher *MockDispatcher) Dispatch()
func (*MockDispatcher) Put ¶
func (dispatcher *MockDispatcher) Put(message []byte) bool
Click to show internal directories.
Click to hide internal directories.