Documentation ¶
Overview ¶
Mock Kinesis Service
Index ¶
- Constants
- Variables
- func GetCheckpointStats(clientName string, db *sql.DB) (stat map[string]int64, err error)
- type ArchiveReader
- type CheckpointService
- type Checkpointer
- type Config
- type DynamoDBService
- type KinesisService
- type Reader
- type S3Service
- type S3Uploader
- type S3UploaderService
- type SequenceNumber
- type SerialReader
- type ShardID
- type ShardStreamReader
- func NewShardStreamReader(svc KinesisService, streamName string, sid ShardID) (s *ShardStreamReader)
- func NewShardStreamReaderFromSequence(svc KinesisService, streamName string, sid ShardID, sn SequenceNumber) (s *ShardStreamReader)
- func NewShardStreamReaderTrimHorizon(svc KinesisService, streamName string, sid ShardID) (s *ShardStreamReader)
- type Store
- type StoreArchive
- type StoreArchiveList
- type StreamConfig
- type StreamReader
- func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error)
- func NewStreamReaderDefaultLatest(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error)
- func NewStreamReaderDefaultTrimHorizon(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error)
Constants ¶
const BUFFER_SIZE int = 1024 * 1024
const CREATE_TABLE_STMT = `` /* 237-byte string literal not displayed */
Variables ¶
var MinPollInterval = 1.0 * time.Second
Recommended minimum polling interval to keep from overloading a Kinesis shard.
var RequestLimit int64 = 1000
Functions ¶
Types ¶
type ArchiveReader ¶
type ArchiveReader struct {
// contains filtered or unexported fields
}
An ArchiveReader understands how to translate our archive data store format into individual records.
func (*ArchiveReader) ReadRecord ¶
func (r *ArchiveReader) ReadRecord() (rec map[string]interface{}, err error)
type CheckpointService ¶
type Checkpointer ¶
type Checkpointer interface { Checkpoint(ShardID, SequenceNumber) error LastSequenceNumber(ShardID) (SequenceNumber, error) }
func NewCheckpointer ¶
Create a new Checkpointer. May return an error if the database is not usable.
type Config ¶
type Config struct {
Streams map[string]StreamConfig
}
func (*Config) ConfigForName ¶
func (c *Config) ConfigForName(n string) (sc *StreamConfig, err error)
type DynamoDBService ¶
type DynamoDBService interface {
UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error)
}
type KinesisService ¶
type KinesisService interface { DescribeStream(*kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) GetShardIterator(*kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) GetRecords(*kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) }
type S3Service ¶
type S3Service interface { GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput, error) }
type S3Uploader ¶
type S3Uploader struct {
// contains filtered or unexported fields
}
An Uploader is just a simple wrapper around an S3manager. It just assumes default options, and that we will want to upload from some local file name to a remote file name.
func NewUploader ¶
func NewUploader(c client.ConfigProvider, bucketName string) *S3Uploader
func (*S3Uploader) Upload ¶
func (s *S3Uploader) Upload(fileName, keyName string) (err error)
type S3UploaderService ¶
type S3UploaderService interface {
Upload(input *s3manager.UploadInput) (*s3manager.UploadOutput, error)
}
type SequenceNumber ¶
type SequenceNumber string
type SerialReader ¶
type SerialReader struct {
// contains filtered or unexported fields
}
A SerialReader lets us read from multiple readers, in sequence.
func (*SerialReader) ReadRecord ¶
func (sr *SerialReader) ReadRecord() (rec map[string]interface{}, err error)
type ShardID ¶
type ShardID string
Some types to make sure our lists of func args don't get confused
func ListShards ¶
func ListShards(svc KinesisService, streamName string) (shards []ShardID, err error)
func PickShardID ¶
func PickShardID(svc KinesisService, streamName string, shardNum int) (sid ShardID, err error)
Utility function to pick a shard id given an integer shard number. Use this if you want the 2nd shard, but don't know what the id would be.
type ShardStreamReader ¶
type ShardStreamReader struct { StreamName string ShardID ShardID ShardIteratorType string NextIteratorValue *string LastSequenceNumber *SequenceNumber // contains filtered or unexported fields }
A ShardStreamReader provides records from a Kinesis stream. It's specific to a single shard. A Stream is blocking, and will avoid overloading a shard by limiting how often it attempts to consume records.
func NewShardStreamReader ¶
func NewShardStreamReader(svc KinesisService, streamName string, sid ShardID) (s *ShardStreamReader)
Create a new stream starting at the latest position
This uses the Kinesis LATEST iterator type and assumes the caller only wants new data.
func NewShardStreamReaderFromSequence ¶
func NewShardStreamReaderFromSequence(svc KinesisService, streamName string, sid ShardID, sn SequenceNumber) (s *ShardStreamReader)
Create a new stream given a specific sequence number
This uses the Kinesis AFTER_SEQUENCE_NUMBER iterator type, so this assumes the provided sequenceNumber has already been processed, and the caller wants records produced since.
func NewShardStreamReaderTrimHorizon ¶
func NewShardStreamReaderTrimHorizon(svc KinesisService, streamName string, sid ShardID) (s *ShardStreamReader)
Create a new stream starting at the oldest position
This uses the Kinesis TRIM_HORIZON iterator type and assumes the caller wants all available data.
func (*ShardStreamReader) Get ¶
func (s *ShardStreamReader) Get() (r *kinesis.Record, err error)
Get the next record from the Shard Stream
If records are already loaded, this returns the next record quickly.
If not, it may block fetching them from the underlying API. In the event the API doesn't have any records prepared either, this method will return a nil record. This allows the caller to do other things rather than just blocking in this call forever or needing to pass in other flow control signals.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
A store manages buffering records together into files, and uploading them somewhere.
func NewStore ¶
func NewStore(name string, r StreamReader, up *S3Uploader) (s *Store)
type StoreArchive ¶
type StoreArchive struct { StreamName string Bucket string Key string ClientName string T time.Time SortValue int // contains filtered or unexported fields }
A StoreArchive represents an instance of a data file stored, usually, in S3.
func NewStoreArchive ¶
func NewStoreArchive(bucketName, keyName string, svc S3Service) (sa StoreArchive, err error)
func (*StoreArchive) ReadRecord ¶
func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error)
type StoreArchiveList ¶
type StoreArchiveList []StoreArchive
Sortable list of store archives.
Though archives, when they come out of S3, are lexicographically sorted, we want to just be sure that we're really handling our dates and times correctly.
func (StoreArchiveList) Len ¶
func (l StoreArchiveList) Len() int
func (StoreArchiveList) Less ¶
func (l StoreArchiveList) Less(i, j int) bool
func (StoreArchiveList) Swap ¶
func (l StoreArchiveList) Swap(i, j int)
type StreamConfig ¶
type StreamReader ¶
A StreamReader is a higher-level interface for reading data from a live Triton stream.
By implementing a Reader interface, we can deliver processed Triton data to the client. In addition, we provide a checkpointing service.
func NewStreamReader ¶
func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error)
func NewStreamReaderDefaultLatest ¶
func NewStreamReaderDefaultLatest(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error)
func NewStreamReaderDefaultTrimHorizon ¶
func NewStreamReaderDefaultTrimHorizon(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error)