Kinesis Pusher
This is intended to be used as a module, and the functions below are called to set it up.
Introduction
The module provides runner.Runner, which holds the AWS Kinesis client along with other details such as the AWS configuration, the Kinesis stream name, and the initial channel sizes.
runner.Runner has a few methods that need to be run as goroutines to start the tickers and consume the channels.
Prerequisites
This module is designed for AWS Kinesis. To use it, you will need the following:
- AWS Account
- Kinesis Stream
- AWS credentials, either through an IAM user or an IAM role (for example, via an instance profile)
- The credentials must allow the PutRecords and Describe* actions on the given Kinesis stream
Usage
Create runner.Runner
This creates a new runner.Runner with a channel buffer size of 1000 for incoming records and 10 for PutRecords API calls.
r := runner.New(1000, 10)
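Assuming RecordChan is an ordinary buffered Go channel of the size passed to runner.New, a plain send blocks once the buffer is full, until the runner drains it. If stalling the producer is worse than losing a record, a select with a default case turns the send into a non-blocking one. The sketch uses a local channel and a hypothetical record type standing in for runner.Record, so it runs without the package.

```go
package main

import "fmt"

// record is a hypothetical local stand-in for runner.Record.
type record struct {
	Data         []byte
	PartitionKey string
}

// trySend attempts a non-blocking send and reports whether the record
// was accepted. When the buffer is full, the record is dropped instead
// of blocking the caller.
func trySend(ch chan<- record, r record) bool {
	select {
	case ch <- r:
		return true
	default:
		return false
	}
}

func main() {
	// A buffer of 3 stands in for the 1000-record buffer; nothing reads
	// from the channel, so only the first 3 sends fit.
	ch := make(chan record, 3)
	accepted, dropped := 0, 0
	for i := 0; i < 5; i++ {
		r := record{Data: []byte("hello\n"), PartitionKey: fmt.Sprintf("pk-%d", i)}
		if trySend(ch, r) {
			accepted++
		} else {
			dropped++
		}
	}
	fmt.Println("accepted:", accepted, "dropped:", dropped) // accepted: 3 dropped: 2
}
```

Whether dropping is acceptable depends on the data; a larger buffer in runner.New makes it less likely to be needed.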
The Runner also supports a few builder-pattern options:
- WithIntervals sets the intervals for checking shards and for checking records.
- WithAsync enables the async flag, so data is sent asynchronously without waiting for the PutRecords response.
- WithDebug enables the debug flag, which writes metrics to the logger.
The AWS details needed are the region, the profile (when using a shared credentials profile), and the Kinesis stream name.
// configure the underlying kinesis client
r.Configure(
runner.NewAWSConfig("us-east-1", "default", "test-stream"),
)
Optionally, to enable metrics, use ConfigureWithMetrics() instead:
r.ConfigureWithMetrics(
runner.NewAWSConfig("us-east-1", "default", "test-stream"),
)
Refresh the number of open shards first
A Kinesis stream can have some shards open (for reading and writing) and some closed (for reading only). Because rate limiting depends heavily on the number of open shards, the runner checks how many shards are open before the first push cycle and applies its limits based on that count, so data is pushed efficiently from the start.
This call also returns an error if there is a problem with the credentials or if the stream does not exist. If it fails, there is no point in continuing.
if err := r.CheckShards(); err != nil {
log.Fatal(err)
}
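To make the open-shard check concrete: Kinesis documents a per-shard write limit of 1,000 records per second and 1 MiB per second, and in DescribeStream output a closed shard has an EndingSequenceNumber while an open shard does not. The sketch counts open shards using a hypothetical trimmed-down shard type and derives the aggregate per-second write budget. It illustrates the idea only; it is not the package's limiter, and the aggregate budget is reachable only when partition keys spread evenly across shards.

```go
package main

import "fmt"

// Documented per-shard write limits for Kinesis Data Streams.
const (
	recordsPerShardPerSec = 1000
	bytesPerShardPerSec   = 1 << 20 // 1 MiB
)

// shard is a hypothetical, trimmed-down view of a DescribeStream shard.
// Closed shards have an ending sequence number; open shards do not.
type shard struct {
	ID                   string
	EndingSequenceNumber *string
}

// openShards counts the shards that can still accept writes.
func openShards(shards []shard) int {
	n := 0
	for _, s := range shards {
		if s.EndingSequenceNumber == nil {
			n++
		}
	}
	return n
}

// writeLimits returns the aggregate records/s and bytes/s budget for a
// stream with the given number of open shards.
func writeLimits(open int) (records, bytes int) {
	return open * recordsPerShardPerSec, open * bytesPerShardPerSec
}

func main() {
	end := "placeholder-ending-sequence-number"
	shards := []shard{
		{ID: "shardId-000000000000", EndingSequenceNumber: &end}, // closed, e.g. after a reshard
		{ID: "shardId-000000000001"},
		{ID: "shardId-000000000002"},
	}
	open := openShards(shards)
	recs, bytes := writeLimits(open)
	fmt.Println(open, recs, bytes) // 2 2000 2097152
}
```

Closed shards still hold readable data, which is why they are skipped here rather than counted toward the write budget.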
Start the runner
This starts a long-running goroutine that batches incoming records and pushes each batch to Kinesis using the PutRecords API.
// start the runner so that incoming data can be pushed
go r.Run()
Sending data
runner.Runner has an exported channel, RecordChan, which accepts records of type runner.Record. Sending a record is as simple as this:
r.RecordChan <- runner.Record{
Data:         []byte("hello\n"),
PartitionKey: fmt.Sprintf("pk-%d", time.Now().UnixNano()%1000), // trailing comma is required in a multi-line literal
}
Metrics (Optional)
Setup
Metrics matter here because failed and recovered records are common. Setting up metrics is optional; it is most useful when the application already runs an HTTP server, since the metrics handler can be added to it as another route.
Enabling metrics is as simple as configuring the Runner via ConfigureWithMetrics().
Once enabled, register the handler with a r.Metrics.RegisterHandler(mux) call. In the future this will accept options such as Basic Auth.
r := runner.New(1000, 10)
r.ConfigureWithMetrics(
runner.NewAWSConfig("us-east-1", "default", "test-stream"),
)
r.Metrics.RegisterHandler(http.DefaultServeMux) // a mux from http.NewServeMux() also works; pass it to ListenAndServe instead of nil
go r.Run()
log.Fatal(http.ListenAndServe(":8001", nil))
Details
Two metrics are currently exposed via the /metrics endpoint:
- Kinesis Push Delay: the time between sending data to r.echan for a Kinesis PutRecords call and that call finishing.
- Kinesis Records Count: the number of records, broken down by success, failed, and recovered. Ideally the difference between failed and recovered is zero, which means every failed record was recovered.
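The relationship between the three counts can be shown with a retry loop. A PutRecords response returns one result entry per request record, in order, and a rejected record carries an ErrorCode. The sketch counts a record as failed once, on its first rejection, and as recovered when a retry succeeds, so failed minus recovered equals the records still lost after retries. The counter names, the resultEntry type and the fake put function are hypothetical, and the package may count differently. atomic.Int64 requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// counters tracks records by outcome; atomic so several pushers could share it.
type counters struct {
	success, failed, recovered atomic.Int64
}

// resultEntry is a hypothetical, trimmed-down PutRecords result entry;
// a non-empty ErrorCode marks a rejected record.
type resultEntry struct {
	ErrorCode string
}

// putFunc stands in for PutRecords: one result per input, same order.
type putFunc func(records []string) []resultEntry

// pushWithRetry sends records, re-sends only rejected ones up to maxRetries
// times, updates c, and returns how long the whole push took.
func pushWithRetry(put putFunc, records []string, maxRetries int, c *counters) time.Duration {
	start := time.Now()
	pending := records
	for attempt := 0; attempt <= maxRetries && len(pending) > 0; attempt++ {
		results := put(pending)
		var retry []string
		for i, res := range results {
			switch {
			case res.ErrorCode != "":
				if attempt == 0 {
					c.failed.Add(1) // count each record's failure once
				}
				retry = append(retry, pending[i])
			case attempt == 0:
				c.success.Add(1)
			default:
				c.recovered.Add(1) // succeeded on a retry
			}
		}
		pending = retry
	}
	return time.Since(start)
}

func main() {
	calls := 0
	// Fake PutRecords: on the first call every third record is throttled;
	// later calls succeed.
	put := func(records []string) []resultEntry {
		calls++
		out := make([]resultEntry, len(records))
		if calls == 1 {
			for i := range out {
				if i%3 == 0 {
					out[i].ErrorCode = "ProvisionedThroughputExceededException"
				}
			}
		}
		return out
	}
	var c counters
	delay := pushWithRetry(put, []string{"a", "b", "c", "d", "e", "f"}, 3, &c)
	fmt.Println(c.success.Load(), c.failed.Load(), c.recovered.Load()) // 4 2 2
	fmt.Println("push delay:", delay)
}
```

With this counting scheme, a nonzero failed minus recovered points directly at records that exhausted their retries.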
License
MIT