Documentation ¶
Overview ¶
Package producer is a generated protocol buffer package.
It is generated from these files:
messages.proto
It has these top-level messages:
AggregatedRecord
Tag
Record
Amazon kinesis producer: a KPL-like batch producer for Amazon Kinesis, built on top of the official Go AWS SDK and using the same aggregation format that the KPL uses.
Note: this project started as a fork of `tj/go-kinesis`. If you are not interested in the KPL aggregation logic, you probably want to check it out.
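For orientation, a minimal usage sketch. The import path and the `New` constructor are assumptions, not part of this listing (the Functions section below is empty), so treat them as placeholders rather than the package's confirmed API:

package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	producer "github.com/a8m/kinesis-producer" // assumed import path
)

func main() {
	client := kinesis.New(session.Must(session.NewSession()))
	// New is a hypothetical constructor, not shown in this listing.
	p := producer.New(&producer.Config{
		StreamName: "test",
		Client:     client,
	})
	p.Start()
	if err := p.Put([]byte("hello"), "pk-1"); err != nil {
		log.Fatal(err)
	}
	p.Stop()
}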
Index ¶
- Variables
- type AggregatedRecord
- func (*AggregatedRecord) Descriptor() ([]byte, []int)
- func (m *AggregatedRecord) GetExplicitHashKeyTable() []string
- func (m *AggregatedRecord) GetPartitionKeyTable() []string
- func (m *AggregatedRecord) GetRecords() []*Record
- func (*AggregatedRecord) ProtoMessage()
- func (m *AggregatedRecord) Reset()
- func (m *AggregatedRecord) String() string
- type Aggregator
- type Config
- type FailureRecord
- type Instance
- func (p *Instance) NotifyFailures() <-chan *FailureRecord
- func (p *Instance) Put(data []byte, partitionKey string) error
- func (p *Instance) SetBatchCount(c int)
- func (p *Instance) SetBatchSize(s int)
- func (p *Instance) SetFlushInterval(t time.Duration)
- func (p *Instance) SetLogger(l Logger)
- func (p *Instance) Start()
- func (p *Instance) Stop()
- type LogValue
- type Logger
- type Producer
- type Putter
- type Record
- type StdLogger
- type Tag
Constants ¶
This section is empty.
Variables ¶
var (
	ErrStoppedProducer     = errors.New("Unable to Put record. Producer is already stopped")
	ErrIllegalPartitionKey = errors.New("Invalid parition key. Length must be at least 1 and at most 256")
	ErrRecordSizeExceeded  = errors.New("Data must be less than or equal to 1MB in size")
)
Errors
Functions ¶
This section is empty.
Types ¶
type AggregatedRecord ¶
type AggregatedRecord struct {
	PartitionKeyTable    []string  `protobuf:"bytes,1,rep,name=partition_key_table" json:"partition_key_table,omitempty"`
	ExplicitHashKeyTable []string  `protobuf:"bytes,2,rep,name=explicit_hash_key_table" json:"explicit_hash_key_table,omitempty"`
	Records              []*Record `protobuf:"bytes,3,rep,name=records" json:"records,omitempty"`
	XXX_unrecognized     []byte    `json:"-"`
}
func (*AggregatedRecord) Descriptor ¶
func (*AggregatedRecord) Descriptor() ([]byte, []int)
func (*AggregatedRecord) GetExplicitHashKeyTable ¶
func (m *AggregatedRecord) GetExplicitHashKeyTable() []string
func (*AggregatedRecord) GetPartitionKeyTable ¶
func (m *AggregatedRecord) GetPartitionKeyTable() []string
func (*AggregatedRecord) GetRecords ¶
func (m *AggregatedRecord) GetRecords() []*Record
func (*AggregatedRecord) ProtoMessage ¶
func (*AggregatedRecord) ProtoMessage()
func (*AggregatedRecord) Reset ¶
func (m *AggregatedRecord) Reset()
func (*AggregatedRecord) String ¶
func (m *AggregatedRecord) String() string
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
func (*Aggregator) Count ¶
func (a *Aggregator) Count() int
Count returns how many records are stored in the aggregator.
func (*Aggregator) Drain ¶
func (a *Aggregator) Drain() (*k.PutRecordsRequestEntry, error)
Drain creates an aggregated `kinesis.PutRecordsRequestEntry` that is compatible with the KCL's deaggregation logic.
If you are interested in knowing more about it, see: aggregation-format.md
func (*Aggregator) Put ¶
func (a *Aggregator) Put(data []byte, partitionKey string)
Put adds a record with the given `data` and `partitionKey` to the aggregator. This method is thread-safe.
func (*Aggregator) Size ¶
func (a *Aggregator) Size() int
Size returns how many bytes are stored in the aggregator, including partition keys.
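A short sketch of using the Aggregator directly. The import path is an assumption, and the zero value is assumed usable since no constructor for Aggregator appears in this listing:

package main

import (
	"fmt"

	producer "github.com/a8m/kinesis-producer" // assumed import path
)

func main() {
	// Assumption: the zero value is ready to use; nothing in this
	// listing shows a constructor for Aggregator.
	var agg producer.Aggregator
	agg.Put([]byte("hello"), "pk-1")
	agg.Put([]byte("world"), "pk-2")
	fmt.Println(agg.Count(), agg.Size()) // records and bytes buffered

	// Drain produces a single PutRecordsRequestEntry holding both records
	// in the KPL aggregation format.
	entry, err := agg.Drain()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(entry.Data))
}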
type Config ¶
type Config struct {
	// StreamName is the Kinesis stream.
	StreamName string

	// FlushInterval is a regular interval for flushing the buffer. Defaults to 5s.
	FlushInterval time.Duration

	// BatchCount determines the maximum number of items to pack in a batch.
	// Must not exceed 500, the PutRecords request limit. Defaults to 500.
	BatchCount int

	// BatchSize determines the maximum number of bytes to send with a PutRecords request.
	// Must not exceed 5MiB. Defaults to 5MiB.
	BatchSize int

	// AggregateBatchCount determines the maximum number of items to pack into an aggregated record.
	AggregateBatchCount int

	// AggregateBatchSize determines the maximum number of bytes to pack into an aggregated record.
	// User records larger than this will bypass aggregation.
	AggregateBatchSize int

	// BacklogCount determines the channel capacity before Put() begins blocking. Defaults to BatchCount.
	BacklogCount int

	// MaxConnections is the number of requests to send concurrently. Defaults to 24.
	MaxConnections int

	// Logger is the logger used. Defaults to producer.Logger.
	Logger Logger

	// Verbose enables verbose logging. Defaults to false.
	Verbose bool

	// Client is the Putter interface implementation.
	Client Putter
}
Config is the Producer configuration.
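A minimal sketch of building a Config. The import path, helper name, and stream name are assumptions; the `*kinesis.Kinesis` client from aws-sdk-go satisfies the Putter interface because it provides PutRecords:

package main

import (
	"time"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	producer "github.com/a8m/kinesis-producer" // assumed import path
)

// newConfig is a hypothetical helper showing the documented fields.
func newConfig() *producer.Config {
	client := kinesis.New(session.Must(session.NewSession()))
	return &producer.Config{
		StreamName:    "events",        // hypothetical stream name
		FlushInterval: 5 * time.Second, // the documented default
		BatchCount:    500,             // the documented PutRecords limit
		Client:        client,
	}
}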
type FailureRecord ¶
type FailureRecord struct {
	Data         []byte
	PartitionKey string
	// contains filtered or unexported fields
}
FailureRecord holds a record that could not be published, together with error information about why the publishing failed.
type Instance ¶ added in v0.2.5
type Instance struct {
	// contains filtered or unexported fields
}
Instance batches records.
func (*Instance) NotifyFailures ¶ added in v0.2.5
func (p *Instance) NotifyFailures() <-chan *FailureRecord
NotifyFailures registers and returns a listener for handling undeliverable messages. The incoming struct has a copy of the Data and the PartitionKey, along with some error information about why the publishing failed.
func (*Instance) Put ¶ added in v0.2.5
func (p *Instance) Put(data []byte, partitionKey string) error
Put `data` using `partitionKey` asynchronously. This method is thread-safe.
Under the covers, the Producer will automatically retry puts in case of transient errors. When an unrecoverable error is detected (e.g. trying to put into a stream that doesn't exist), the message will be returned by the Producer. Add a listener with `Producer.NotifyFailures` to handle undeliverable messages.
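A sketch of the intended pattern, assuming `p` is an already-started Instance. The helper name and import path are hypothetical:

package main

import (
	"log"

	producer "github.com/a8m/kinesis-producer" // assumed import path
)

// putWithFailureHandling is a hypothetical helper demonstrating the
// Put + NotifyFailures pattern on an already-started Instance.
func putWithFailureHandling(p *producer.Instance) {
	// Drain failures in the background; each FailureRecord carries a
	// copy of the Data and PartitionKey of the undeliverable message.
	go func() {
		for r := range p.NotifyFailures() {
			log.Printf("delivery failed for key %q", r.PartitionKey)
		}
	}()

	// Put returns immediately. ErrStoppedProducer, ErrIllegalPartitionKey
	// and ErrRecordSizeExceeded signal misuse rather than delivery failure.
	if err := p.Put([]byte(`{"event":"click"}`), "user-42"); err != nil {
		log.Println("put rejected:", err)
	}
}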
func (*Instance) SetBatchCount ¶ added in v0.2.5
func (p *Instance) SetBatchCount(c int)
func (*Instance) SetBatchSize ¶ added in v0.2.5
func (p *Instance) SetBatchSize(s int)
func (*Instance) SetFlushInterval ¶ added in v0.2.5
func (p *Instance) SetFlushInterval(t time.Duration)
func (*Instance) SetLogger ¶ added in v0.2.5
func (p *Instance) SetLogger(l Logger)
func (*Instance) Start ¶ added in v0.2.5
func (p *Instance) Start()
func (*Instance) Stop ¶ added in v0.2.5
func (p *Instance) Stop()
type LogValue ¶
type LogValue struct {
	Name  string
	Value interface{}
}
LogValue represents a key:value pair used by the Logger interface
type Logger ¶
type Logger interface {
	Info(msg string, values ...LogValue)
	Error(msg string, err error, values ...LogValue)
}
Logger represents a simple interface used by kinesis-producer to handle logging
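For illustration, a minimal Logger implementation on top of the standard library. The type name and import path are hypothetical; the package's own StdLogger below serves the same purpose:

package main

import (
	"log"
	"os"

	producer "github.com/a8m/kinesis-producer" // assumed import path
)

// stdoutLogger is a hypothetical Logger implementation backed by a
// standard library *log.Logger.
type stdoutLogger struct {
	l *log.Logger
}

func (s *stdoutLogger) Info(msg string, values ...producer.LogValue) {
	s.l.Printf("INFO %s %v", msg, values)
}

func (s *stdoutLogger) Error(msg string, err error, values ...producer.LogValue) {
	s.l.Printf("ERROR %s: %v %v", msg, err, values)
}

// Compile-time check that the interface is satisfied.
var _ producer.Logger = &stdoutLogger{l: log.New(os.Stderr, "producer: ", log.LstdFlags)}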
type Putter ¶
type Putter interface {
PutRecords(*k.PutRecordsInput) (*k.PutRecordsOutput, error)
}
Putter is the interface that wraps the KinesisAPI.PutRecords method.
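Because Putter is a single-method interface, it is easy to stub in tests. A sketch with a hypothetical type name (`k` in the signature above is the aws-sdk-go kinesis package):

package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

// mockPutter is a hypothetical in-memory Putter for tests.
type mockPutter struct {
	inputs []*kinesis.PutRecordsInput
}

// PutRecords records the input and reports every record as delivered.
func (m *mockPutter) PutRecords(in *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
	m.inputs = append(m.inputs, in)
	out := &kinesis.PutRecordsOutput{
		FailedRecordCount: aws.Int64(0),
		Records:           make([]*kinesis.PutRecordsResultEntry, len(in.Records)),
	}
	for i := range out.Records {
		out.Records[i] = &kinesis.PutRecordsResultEntry{}
	}
	return out, nil
}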
type Record ¶
type Record struct {
	PartitionKeyIndex    *uint64 `protobuf:"varint,1,req,name=partition_key_index" json:"partition_key_index,omitempty"`
	ExplicitHashKeyIndex *uint64 `protobuf:"varint,2,opt,name=explicit_hash_key_index" json:"explicit_hash_key_index,omitempty"`
	Data                 []byte  `protobuf:"bytes,3,req,name=data" json:"data,omitempty"`
	Tags                 []*Tag  `protobuf:"bytes,4,rep,name=tags" json:"tags,omitempty"`
	XXX_unrecognized     []byte  `json:"-"`
}
func (*Record) Descriptor ¶
func (*Record) Descriptor() ([]byte, []int)
func (*Record) GetExplicitHashKeyIndex ¶
func (m *Record) GetExplicitHashKeyIndex() uint64
func (*Record) GetPartitionKeyIndex ¶
func (m *Record) GetPartitionKeyIndex() uint64
func (*Record) ProtoMessage ¶
func (*Record) ProtoMessage()
type StdLogger ¶
StdLogger implements the Logger interface using standard library loggers
type Tag ¶
type Tag struct {
	Key              *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
	Value            *string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}
func (*Tag) Descriptor ¶
func (*Tag) Descriptor() ([]byte, []int)
func (*Tag) ProtoMessage ¶
func (*Tag) ProtoMessage()