Documentation ¶
Overview ¶
Package triton provides an opinionated interface with Kinesis
Index ¶
- Constants
- func GetCheckpointStats(clientName string, db *sql.DB) (stat map[string]int64, err error)
- func MarshalRecord(r Record) ([]byte, error)
- type BatchWriter
- type Checkpointer
- type Config
- type KinesisService
- type Reader
- type Record
- type S3Service
- type Store
- type Store2
- type Stream
- type StreamConfig
- type Writer
Constants ¶
const BUFFER_SIZE int = 1024 * 1024
const CREATE_TABLE_STMT = `` /* 237-byte string literal not displayed */
const MaxBatchSize = 500
MaxBatchSize is the limit Kinesis has on a PutRecords call
Variables ¶
This section is empty.
Functions ¶
func GetCheckpointStats ¶
func MarshalRecord ¶
Types ¶
type BatchWriter ¶
type BatchWriter struct {
// contains filtered or unexported fields
}
BatchWriter implements an asyncronous writer that writes records in batches. A batch is written when either the buffer size is exceeded or the time interval since the last write has been exceeded.
Write errors are written to the Errors() channel. It is highly recommended that you actively monitor this channel because the writer will not stop after an error.
func NewBatchWriter ¶
func NewBatchWriter(w Writer) *BatchWriter
NewBatchWriter creates a batch version of an existing Writer using default values for size and interval.
func NewBatchWriterSize ¶
func NewBatchWriterSize(w Writer, size int, intr time.Duration) *BatchWriter
NewBatchWriterSize creates a batch writer using the given size and interval.
func (*BatchWriter) Close ¶
func (bw *BatchWriter) Close()
Close prevents future writes and flushes all currently buffered records. If there is an error it will have been written to the Errors chan.
func (*BatchWriter) Errors ¶
func (bw *BatchWriter) Errors() <-chan error
Errors returns the channel that errors will be returned on. It is highly recommended that you monitor this channel.
func (*BatchWriter) Flush ¶
func (bw *BatchWriter) Flush()
Flush forces all buffered records to be sent. If there is an error it will have been written to the Errors chan.
func (*BatchWriter) WriteRecords ¶
func (bw *BatchWriter) WriteRecords(rs ...Record) error
WriteRecords performs an asyncronous write to Kinesis.
The returned error will always be nil in the current implementation. It is recommended you read errors from Errors().
type Checkpointer ¶
type Checkpointer interface { Checkpoint(string, string) error LastSequenceNumber(string) (string, error) }
func NewCheckpointer ¶
Create a new Checkpointer. May return an error if the database is not usable.
type Config ¶
type Config struct {
Streams map[string]StreamConfig
}
func (*Config) ConfigForName ¶
func (c *Config) ConfigForName(n string) (sc *StreamConfig, err error)
type KinesisService ¶
type KinesisService interface { DescribeStream(*kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) GetShardIterator(*kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) GetRecords(*kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) }
type Reader ¶
type Reader interface { // Read reads up to len(r) bytes into r. It returns the number of records // read (0 <= n <= len(r)), the offset after the read, and any error // encountered. Read(r []Record) (n int, off string, err error) }
Reader is an interface for a basic read
type S3Service ¶
type S3Service interface { ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput, error) Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*s3manager.Downloader)) (n int64, err error) Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) }
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
A store manages buffering records together into files, and uploading them somewhere.
type Store2 ¶
type Store2 struct {
// contains filtered or unexported fields
}
func NewStoreService ¶
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
func NewStream ¶
NewStream returns a stream configured for all shards using env variables to determine the Kinesis Service.
func NewStreamShardsService ¶
func NewStreamShardsService(svc KinesisService, region, name string, shards []string) (*Stream, error)
NewStreamShardsService returns a Stream configured to the specified shards using the specificed Kinesis service.
If no shards are given, the stream will be configured to return from all shards for a given stream.