Documentation ¶
Overview ¶
Package producer is a generated protocol buffer package.
It is generated from these files:
messages.proto
It has these top-level messages:
AggregatedRecord Tag Record
Package producer is a KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK and using the same aggregation format that KPL use.
Note: This project started as a fork of `tj/go-kinesis`. If you are not interested in the KPL aggregation logic, you will probably want to check it out.
Index ¶
- Variables
- type AggregatedRecord
- func (*AggregatedRecord) Descriptor() ([]byte, []int)
- func (m *AggregatedRecord) GetExplicitHashKeyTable() []string
- func (m *AggregatedRecord) GetPartitionKeyTable() []string
- func (m *AggregatedRecord) GetRecords() []*Record
- func (*AggregatedRecord) ProtoMessage()
- func (m *AggregatedRecord) Reset()
- func (m *AggregatedRecord) String() string
- type Aggregator
- type Config
- type FailureRecord
- type Producer
- type Putter
- type Record
- type Tag
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( ErrStoppedProducer = errors.New("unable to Put record, producer is already stopped") ErrIllegalPartitionKey = errors.New("invalid partition key, length must be at least 1 and at most 256") ErrRecordSizeExceeded = errors.New("data must be less than or equal to 1MB in size") )
Errors
Functions ¶
This section is empty.
Types ¶
type AggregatedRecord ¶
type AggregatedRecord struct { PartitionKeyTable []string `protobuf:"bytes,1,rep,name=partition_key_table" json:"partition_key_table,omitempty"` ExplicitHashKeyTable []string `protobuf:"bytes,2,rep,name=explicit_hash_key_table" json:"explicit_hash_key_table,omitempty"` Records []*Record `protobuf:"bytes,3,rep,name=records" json:"records,omitempty"` XXX_unrecognized []byte `json:"-"` }
func (*AggregatedRecord) Descriptor ¶
func (*AggregatedRecord) Descriptor() ([]byte, []int)
func (*AggregatedRecord) GetExplicitHashKeyTable ¶
func (m *AggregatedRecord) GetExplicitHashKeyTable() []string
func (*AggregatedRecord) GetPartitionKeyTable ¶
func (m *AggregatedRecord) GetPartitionKeyTable() []string
func (*AggregatedRecord) GetRecords ¶
func (m *AggregatedRecord) GetRecords() []*Record
func (*AggregatedRecord) ProtoMessage ¶
func (*AggregatedRecord) ProtoMessage()
func (*AggregatedRecord) Reset ¶
func (m *AggregatedRecord) Reset()
func (*AggregatedRecord) String ¶
func (m *AggregatedRecord) String() string
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
Aggregator used to aggregate records into kinesis.PutRecordsRequestEntry
func (*Aggregator) Count ¶
func (a *Aggregator) Count() int
Count return how many records stored in the aggregator.
func (*Aggregator) Drain ¶
func (a *Aggregator) Drain() (*k.PutRecordsRequestEntry, error)
Drain create an aggregated `kinesis.PutRecordsRequestEntry` that compatible with the KCL's deaggregation logic.
If you interested to know more about it. see: aggregation-format.md
func (*Aggregator) Put ¶
func (a *Aggregator) Put(data []byte, partitionKey string)
Put record using `data` and `partitionKey`. This method is not thread-safe.
func (*Aggregator) Size ¶
func (a *Aggregator) Size() int
Size return how many bytes stored in the aggregator. including partition keys.
type Config ¶
type Config struct { // StreamName is the Kinesis stream. StreamName string // FlushInterval is a regular interval for flushing the buffer. Defaults to 5s. FlushInterval time.Duration // BatchCount determine the maximum number of items to pack in batch. // Must not exceed length. Defaults to 500. BatchCount int // BatchSize determine the maximum number of bytes to send with a PutRecords request. // Must not exceed 5MiB; Default to 5MiB. BatchSize int // AggregateBatchCount determine the maximum number of items to pack into an aggregated record. AggregateBatchCount int // AggregationBatchSize determine the maximum number of bytes to pack into an aggregated record. AggregateBatchSize int // BacklogCount determines the channel capacity before Put() will begin blocking. Default to `BatchCount`. BacklogCount int // Number of requests to sent concurrently. Default to 24. MaxConnections int // Logger is the logger used. Default to zap.L(). Logger *zap.Logger // Enabling verbose logging. Default to false. Verbose bool // Client is the Putter interface implementation. Client Putter }
Config is the Producer configuration.
type FailureRecord ¶
type FailureRecord struct { Data []byte PartitionKey string // contains filtered or unexported fields }
FailureRecord type
type Producer ¶
Producer batches records.
Example ¶
log := zap.L() s, err := session.NewSession(aws.NewConfig()) if err != nil { log.Fatal(err.Error()) } client := kinesis.New(s) pr := New(&Config{ StreamName: "test", BacklogCount: 2000, Client: client, Logger: log, }) pr.Start() // Handle failures go func() { for r := range pr.NotifyFailures() { // r contains `Data`, `PartitionKey` and `Error()` log.Error(r.Error()) } }() go func() { for i := 0; i < 5000; i++ { err := pr.Put([]byte("foo"), "bar") if err != nil { log.With(zap.Error(err)).Fatal("error producing") } } }() time.Sleep(3 * time.Second) pr.Stop()
Output:
func (*Producer) NotifyFailures ¶
func (p *Producer) NotifyFailures() <-chan *FailureRecord
NotifyFailures registers and return listener to handle undeliverable messages. The incoming struct has a copy of the Data and the PartitionKey along with some error information about why the publishing failed.
func (*Producer) Put ¶
Put `data` using `partitionKey` asynchronously. This method is thread-safe.
Under the covers, the Producer will automatically re-attempt puts in case of transient errors. When unrecoverable error has detected(e.g: trying to put to in a stream that doesn't exist), the message will returned by the Producer. Add a listener with `Producer.NotifyFailures` to handle undeliverable messages.
type Putter ¶
type Putter interface {
PutRecords(*k.PutRecordsInput) (*k.PutRecordsOutput, error)
}
Putter is the interface that wraps the KinesisAPI.PutRecords method.
type Record ¶
type Record struct { PartitionKeyIndex *uint64 `protobuf:"varint,1,req,name=partition_key_index" json:"partition_key_index,omitempty"` ExplicitHashKeyIndex *uint64 `protobuf:"varint,2,opt,name=explicit_hash_key_index" json:"explicit_hash_key_index,omitempty"` Data []byte `protobuf:"bytes,3,req,name=data" json:"data,omitempty"` Tags []*Tag `protobuf:"bytes,4,rep,name=tags" json:"tags,omitempty"` XXX_unrecognized []byte `json:"-"` }
func (*Record) Descriptor ¶
func (*Record) GetExplicitHashKeyIndex ¶
func (*Record) GetPartitionKeyIndex ¶
func (*Record) ProtoMessage ¶
func (*Record) ProtoMessage()
type Tag ¶
type Tag struct { Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"` Value *string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` XXX_unrecognized []byte `json:"-"` }
func (*Tag) Descriptor ¶
func (*Tag) ProtoMessage ¶
func (*Tag) ProtoMessage()