frinesis

An AWS Kinesis implementation of a Frizzle Sink.
In addition to the AWS Kinesis SDK for Go, Frinesis uses a modified version of
sendgridlabs/go-kinesis/batchproducer (under separate MIT license).
Frizzle is a magic message (Msg
) bus designed for parallel processing w many goroutines.
Receive()
messages from a configured Source
- Do your processing, possibly
Send()
each Msg
on to one or more Sink
destinations
Ack()
(or Fail()
) the Msg
to notify the Source
that processing completed
Prereqs / Build instructions
Go mod
As of Go 1.11, frinesis uses go mod for dependency management.
Running the tests
Frinesis has integration tests which require a kinesis endpoint to test against. KINESIS_ENDPOINT
environment variable is
used by tests. We test with a localstack instance (docker-compose.yml
provided) but other
tools like kinesalite
could also work.
$ docker-compose up -d
# takes a few seconds to initialize; can use a tool like wait-for-it.sh in scripting
$ export KINESIS_ENDPOINT=localhost:4568
$ go test -v --cover ./...
Configuration
Frinesis Sinks are configured using Viper.
func InitSink(config *viper.Viper) (*Sink, error)
InitSinkWithLogger(config *viper.Viper, logger *zap.Logger)
We typically initialize Viper through environment variables (but client can do whatever it wants,
just needs to provide the configured Viper object with relevant values). The application might
use a prefix before the below values.
Variable |
Required |
Description |
Default |
AWS_REGION_NAME |
required |
region being used e.g. us-east-1 |
|
KINESIS_ENDPOINT |
optional |
if using a custom endpoint e.g. for local testing. Defaults to AWS standard internal and retrieving credentials from IAM if not set. http:// prefixed if no scheme set |
|
KINESIS_FLUSH_TIMEOUT |
sink (optional) |
how long to wait for Kinesis Sink to flush remaining messages on close (use duration) |
30s |
Async Error Handling
Since records are sent in batch fashion, Kinesis may report errors asynchronously.
Errors can be recovered via channel returned by the Sink.Events()
method.
In addition to the String()
method required by frizzle, currently only errors are
returned by frinesis (no other event types) so all Events recovered will also conform
to error
interface.
Contributing
Contributions welcome! Take a look at open issues.