Documentation ¶
Constants ¶
const (
	IteratorTypeAtSequenceNumber    = "AT_SEQUENCE_NUMBER"
	IteratorTypeAfterSequenceNumber = "AFTER_SEQUENCE_NUMBER"
	IteratorTypeAtTimestamp         = "AT_TIMESTAMP"
	IteratorTypeTrimHorizon         = "TRIM_HORIZON"
	IteratorTypeLatest              = "LATEST"
)
These constants define the supported iterator types. See https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#GetShardIteratorInput for more detail.
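Because IteratorType is passed around as a plain string, a misspelled value would only surface when Kinesis rejects the request. The sketch below shows one way a caller might check a value up front. The constants are local copies that mirror the Kinesis ShardIteratorType values, and validIteratorType is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Local copies of the iterator type values, mirroring the Kinesis
// ShardIteratorType enum. They are redeclared here only so that the
// example compiles on its own.
const (
	IteratorTypeAtSequenceNumber    = "AT_SEQUENCE_NUMBER"
	IteratorTypeAfterSequenceNumber = "AFTER_SEQUENCE_NUMBER"
	IteratorTypeAtTimestamp         = "AT_TIMESTAMP"
	IteratorTypeTrimHorizon         = "TRIM_HORIZON"
	IteratorTypeLatest              = "LATEST"
)

// validIteratorType is a hypothetical helper (an assumption, not part of
// the package). It reports whether t is one of the five known values.
func validIteratorType(t string) bool {
	switch t {
	case IteratorTypeAtSequenceNumber,
		IteratorTypeAfterSequenceNumber,
		IteratorTypeAtTimestamp,
		IteratorTypeTrimHorizon,
		IteratorTypeLatest:
		return true
	}
	return false
}

func main() {
	fmt.Println(validIteratorType(IteratorTypeLatest)) // true
	fmt.Println(validIteratorType("NEWEST"))           // false
}
```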
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// The amount of time in between each GetRecords request. In order to not
	// exceed your ReadThroughput, you should consider the number of concurrent
	// consumers you have running.
	Interval time.Duration

	// IteratorType is the type of iterator that we want to use to read from the stream.
	// This denotes our starting position in the stream.
	IteratorType string

	// The maximum amount of records we will get on one GetRecords request.
	// In order to not run into Kinesis limits, you should consider the size
	// of your records.
	Limit int64
}
Config sets some properties that affect how we interact with the Kinesis stream.
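As a rough illustration of how Interval relates to the number of concurrent consumers, the sketch below derives a lower bound for the polling interval. It assumes the Kinesis quota of five read transactions per second per shard and that every consumer polls the same shard. The Config type is a local copy of the struct above, minInterval is a hypothetical helper, and the Limit of 500 is an arbitrary illustrative value (GetRecords accepts up to 10,000 records per call).

```go
package main

import (
	"fmt"
	"time"
)

// Config is a local copy of the documented struct so the example is
// self-contained.
type Config struct {
	Interval     time.Duration
	IteratorType string
	Limit        int64
}

// readsPerSecondPerShard is the Kinesis quota this sketch assumes:
// five read transactions per second for each shard.
const readsPerSecondPerShard = 5

// minInterval is a hypothetical helper (not part of the package). It
// returns the shortest polling interval that keeps `consumers` readers of
// the same shard within the assumed quota.
func minInterval(consumers int) time.Duration {
	if consumers < 1 {
		consumers = 1
	}
	return time.Duration(consumers) * time.Second / readsPerSecondPerShard
}

func main() {
	cfg := Config{
		Interval:     minInterval(3), // three consumers share each shard
		IteratorType: "TRIM_HORIZON",
		Limit:        500, // illustrative; size this against your record size
	}
	fmt.Printf("%+v\n", cfg)
}
```

In practice you would likely pick an interval above this bound to leave headroom for retries.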
type HandlerFunc ¶
HandlerFunc is the argument to the Listen function. For every batch of records that comes off the Kinesis stream, the HandlerFunc is called once.
type LocalStore ¶
type LocalStore struct {
// contains filtered or unexported fields
}
LocalStore implements Store using a local map. This store is not usable if your application is running in multiple containers.
func NewLocalStore ¶
func NewLocalStore() *LocalStore
NewLocalStore will create a pointer to a local store that can keep track of our shard iterators.
func (*LocalStore) GetShardIterator ¶
func (s *LocalStore) GetShardIterator(stream, shard string) (string, error)
GetShardIterator will get the shard iterator that corresponds to the stream-shard combination. We do not require a lock here, because we are simply reading.
func (*LocalStore) UpdateShardIterator ¶
func (s *LocalStore) UpdateShardIterator(stream, shard, iterator string) error
UpdateShardIterator will use the stream-shard combination as the key, and store the iterator that corresponds to it. Updates require a mutex lock so that two goroutines are not trying to update it at the same time.
type Logger ¶
type Logger interface {
Log(keyvals ...interface{}) error
}
Logger is an interface that helps the user log what is happening inside this package. This interface is fairly common and is used by https://github.com/go-kit/kit.
type Shard ¶
type Shard struct {
	// The identifier of the shard inside the stream
	ID string

	// The sequence number to start at
	StartAt string
}
Shard is a shard on the Kinesis stream.
type Store ¶
type Store interface {
	// GetShardIterator will get the current iterator for the shard. This
	// tells Amazon where we want to start reading records from.
	GetShardIterator(stream, shard string) (string, error)

	// UpdateShardIterator will update the position in the shard so that
	// on the next tick of our listener, we read records from the latest
	// position.
	UpdateShardIterator(stream, shard, iterator string) error
}
Store is an interface that defines how we will persist and retrieve the shard iterator. It is important to keep track of the shard iterator so that we know our position in the stream. The implementation of Store must be safe for concurrent use.
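To make the concurrency requirement concrete, here is a minimal sketch of a custom in-memory Store. It is not the LocalStore implementation. The memStore type, the "stream/shard" key format, and the choice to return an error for an unknown shard are all assumptions. In Go, a map read that runs concurrently with a write is a data race, so this sketch takes a read lock in GetShardIterator as well as a write lock in UpdateShardIterator.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Store is a local copy of the documented interface.
type Store interface {
	GetShardIterator(stream, shard string) (string, error)
	UpdateShardIterator(stream, shard, iterator string) error
}

// errIteratorNotFound is a hypothetical sentinel error for unknown shards.
var errIteratorNotFound = errors.New("shard iterator not found")

// memStore is a hypothetical Store backed by a map and an RWMutex.
type memStore struct {
	mu        sync.RWMutex
	iterators map[string]string
}

func newMemStore() *memStore {
	return &memStore{iterators: make(map[string]string)}
}

// key joins stream and shard into a single map key (format is an assumption).
func key(stream, shard string) string {
	return stream + "/" + shard
}

// GetShardIterator takes a read lock so many readers can proceed together.
func (s *memStore) GetShardIterator(stream, shard string) (string, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	it, ok := s.iterators[key(stream, shard)]
	if !ok {
		return "", errIteratorNotFound
	}
	return it, nil
}

// UpdateShardIterator takes the write lock, excluding readers and writers.
func (s *memStore) UpdateShardIterator(stream, shard, iterator string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.iterators[key(stream, shard)] = iterator
	return nil
}

// Compile-time check that memStore satisfies Store.
var _ Store = (*memStore)(nil)

func main() {
	store := newMemStore()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			shard := fmt.Sprintf("shardId-%012d", n)
			_ = store.UpdateShardIterator("events", shard, fmt.Sprintf("iterator-%d", n))
		}(i)
	}
	wg.Wait()
	it, err := store.GetShardIterator("events", "shardId-000000000002")
	fmt.Println(it, err)
}
```

An application running in several containers would need the same interface backed by shared storage instead of a process-local map.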
type Stream ¶
type Stream struct {
	// Shards are all the shards that belong to the stream
	Shards []Shard

	// Logger is an interface that can be used to debug your stream
	Logger Logger

	// Name is the name of the stream
	Name string
	// contains filtered or unexported fields
}
Stream keeps track of our position in the stream for each shard.
func NewStream ¶
func NewStream(sess *session.Session, kinesisEndpoint string, stream string, store Store, config Config) (*Stream, error)
NewStream will return a pointer to a stream that you can listen on. Stream is capable of managing multiple shards, printing out log statements, and polling Kinesis at a regular interval.
func (*Stream) Listen ¶
func (s *Stream) Listen(handler HandlerFunc) error
Listen calls the HandlerFunc for each batch of records that comes off the Kinesis stream. Listen polls the Kinesis stream once every interval and handles any new records. We use the store to keep track of our position in the stream so that we avoid reading records twice or failing to progress in the stream.
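The polling behavior described above can be sketched as a small loop. This is not the package's implementation: the HandlerFunc signature and the internals of Listen are not shown on this page, so the sketch substitutes a plain func([]string) error for the handler and a hypothetical fetchFunc for the GetRecords call. The names pollOnce, fakeFetch, and mapStore are also assumptions, and mapStore is deliberately unsynchronized because the example uses a single goroutine.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// Store is a local copy of the documented interface.
type Store interface {
	GetShardIterator(stream, shard string) (string, error)
	UpdateShardIterator(stream, shard, iterator string) error
}

// mapStore is a single-goroutine stand-in used to keep the sketch short.
type mapStore map[string]string

func (m mapStore) GetShardIterator(stream, shard string) (string, error) {
	return m[stream+"/"+shard], nil
}

func (m mapStore) UpdateShardIterator(stream, shard, iterator string) error {
	m[stream+"/"+shard] = iterator
	return nil
}

// fetchFunc is a hypothetical stand-in for a GetRecords call: given an
// iterator, it returns a batch of records and the next iterator.
type fetchFunc func(iterator string) (records []string, next string, err error)

// pollOnce performs one tick: read the stored position, fetch a batch, hand
// it to the handler, then persist the new position. If the handler fails,
// the position is left unchanged so the batch is fetched again next tick.
func pollOnce(store Store, stream, shard string, fetch fetchFunc, handle func([]string) error) error {
	it, err := store.GetShardIterator(stream, shard)
	if err != nil {
		return err
	}
	records, next, err := fetch(it)
	if err != nil {
		return err
	}
	if len(records) > 0 {
		if err := handle(records); err != nil {
			return err
		}
	}
	return store.UpdateShardIterator(stream, shard, next)
}

// fakeFetch serves records from an in-memory slice. The iterator is simply
// a decimal offset into the slice; an empty iterator means offset 0.
func fakeFetch(data []string, limit int) fetchFunc {
	return func(iterator string) ([]string, string, error) {
		offset := 0
		if iterator != "" {
			n, err := strconv.Atoi(iterator)
			if err != nil {
				return nil, "", err
			}
			offset = n
		}
		end := offset + limit
		if end > len(data) {
			end = len(data)
		}
		return data[offset:end], strconv.Itoa(end), nil
	}
}

func main() {
	store := mapStore{}
	fetch := fakeFetch([]string{"r1", "r2", "r3"}, 2)
	handle := func(batch []string) error {
		fmt.Println("batch:", batch)
		return nil
	}
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < 3; i++ { // three ticks; the third finds no new records
		<-ticker.C
		if err := pollOnce(store, "events", "shardId-000000000000", fetch, handle); err != nil {
			fmt.Println("poll failed:", err)
		}
	}
}
```

Persisting the iterator only after the handler succeeds favors reprocessing a batch over skipping it; a real consumer may choose differently.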