kinetic

package module
v0.0.0-...-13939be Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: MIT Imports: 21 Imported by: 1

README

GoDoc Circle CI

kinetic

Kinetic is an MIT-licensed high-performance AWS Kinesis Client for Go

Kinetic wraps sendgridlabs go-kinesis library to provide maximum throughput for AWS Kinesis producers and consumers. An instance of a Kinetic listener/producer is meant to be used for each shard, so please use it accordingly. If you use more than one instance per-shard then you will hit the AWS Kinesis throughput limits.

Getting Started

Before using kinetic, you should make sure you have a created a Kinesis stream and your configuration file has the credentails necessary to read and write to the stream. Once this stream exists in AWS, kinetic will ensure it is in the "ACTIVE" state before running.

Testing

Tests are written using goconvey and kinesalite. Make sure you have kinesalite running locally before attempting to run the tests. They can be run either via the comamnd line:

$ go test -v -cover -race

or via web interface:

$ goconvey

Running

Kinetic can be used to interface with kinesis like so:

import "github.com/rewardStyle/kinetic"

// Use configuration in /etc/kinetic.conf
listener, _ := new(kinetic.Listener).Init()

// Use custom configuration
producer, _ := new(kinetic.Producer).InitC("your-stream", "0", "shard-type", "accesskey", "secretkey", "region", 10)

producer.Send(new(kinetic.Message).Init([]byte(`{"foo":"bar"}`), "test"))

// Using Retrieve
msg, err := listener.Retrieve()
if err != nil {
    println(err)
}

println(string(msg))

// Using Listen - will block unless sent in goroutine
go listener.Listen(func(msg []byte, wg *sync.WaitGroup) {
    println(string(msg))
    wg.Done()
})

producer.Send(new(KinesisMessage).Init([]byte(`{"foo":"bar"}`), "test"))

listener.Close()
producer.Close()

// Or with Kinesis Firehose
firehose, err := new(kinetic.Producer).Firehose()
if err != nil {
    println(err)
}

firehose.Send(new(KinesisMessage).Init([]byte(`{"foo":"bar"}`), "test"))

firehose.Close()

For more examples take a look at the tests. API documentation can be found here.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// HTTPTimeout is the timeout for http for requests.
	HTTPTimeout = 60 * time.Second
	// DialTimeout is the timeout for the tpc dial.
	DialTimeout = 5 * time.Second
	// HandShakeTimeout is the timeout for the tpc handshake.
	HandShakeTimeout = 5 * time.Second
)
View Source
var (
	// ErrNullStream represents an error where the stream was not specified
	ErrNullStream = errors.New("A stream must be specified")
	// ErrNotActive represents an error where the stream is not ready for processing
	ErrNotActive = errors.New("The Stream is not yet active")
)
View Source
var (
	// ErrThroughputExceeded represents an error when the Kinesis throughput has been exceeded
	ErrThroughputExceeded = errors.New("Configured AWS Kinesis throughput has been exceeded")
	// ErrKinesisFailure represents a generic internal AWS Kinesis error
	ErrKinesisFailure = errors.New("AWS Kinesis internal failure")
	// ErrBadConcurrency represents an error when the provided concurrency value is invalid
	ErrBadConcurrency = errors.New("Concurrency must be greater than zero")
	// ErrDroppedMessage represents an error the channel is full and messages are being dropped
	ErrDroppedMessage = errors.New("Channel is full, dropped message")
)
View Source
var ErrMetaAuthentication = errors.New("Authentication error: failed to auth from meta.  Your IAM roles are bad, or you need to specify an AccessKey and SecretKey")

ErrMetaAuthentication represents an error that occurred on authentication from meta

View Source
var ErrNilShardIterator = errors.New("Nil shard iterator")

ErrNilShardIterator is an error for when we get back a nil shard iterator

View Source
var ErrNilShardStatus = errors.New("Nil stream status")

ErrNilShardStatus is an error for when we get back a nil stream status

View Source
var (

	// ShardIterTypes are the types of iterators to use within Kinesis
	ShardIterTypes shardIteratorTypes = map[int]string{
		// contains filtered or unexported fields
	}
)

Functions

This section is empty.

Types

type Empty

type Empty struct{}

Empty is an empty struct. It is mostly used for counting semaphore purposes

type Firehose

type Firehose struct {
	// contains filtered or unexported fields
}

Firehose is a Producer

func (*Firehose) Close

func (p *Firehose) Close() error

Close stops queuing and producing and waits for all tasks to finish

func (*Firehose) CloseSync

func (p *Firehose) CloseSync() error

CloseSync closes the Firehose producer in a syncronous manner.

func (*Firehose) Errors

func (p *Firehose) Errors() <-chan error

Errors gets the current number of errors on the Producer

func (*Firehose) Init

func (p *Firehose) Init() (Producer, error)

Init initalizes a firehose producer with the config file defaults

func (*Firehose) InitC

func (p *Firehose) InitC(stream, _, _, accessKey, secretKey, region string, concurrency int) (Producer, error)

InitC initializes a producer for Kinesis Firehose with the specified params

func (*Firehose) InitCWithEndpoint

func (p *Firehose) InitCWithEndpoint(stream, _, _, accessKey, secretKey, region string, concurrency int, endpoint string) (Producer, error)

InitCWithEndpoint initializes a producer for Kinesis Firehose with the specified params

func (*Firehose) IsProducing

func (p *Firehose) IsProducing() (isProducing bool)

IsProducing returns true if Firehose is producing otherwise false

func (*Firehose) Messages

func (p *Firehose) Messages() <-chan *Message

Messages gets the current message channel from the producer

func (*Firehose) NewEndpoint

func (p *Firehose) NewEndpoint(endpoint, stream string) (err error)

NewEndpoint switches the endpoint of the firehose stream. This is useful for testing.

func (*Firehose) ReInit

func (p *Firehose) ReInit()

ReInit re-initializes the shard iterator. Used with conjucntion with NewEndpoint

func (*Firehose) Send

func (p *Firehose) Send(msg *Message)

Send a message to Firehose asyncronously

func (*Firehose) SendSync

func (p *Firehose) SendSync(msg *Message)

SendSync - blocking send. TODO: we might want a version of this that can be fed a timeout value and listen for cancellation channels. This version is for simplicity.

func (*Firehose) TryToSend

func (p *Firehose) TryToSend(msg *Message) error

TryToSend tries to send the message, but if the channel is full it drops the message, and returns an error.

type KinesisProducer

type KinesisProducer struct {
	// contains filtered or unexported fields
}

KinesisProducer keeps a queue of messages on a channel and continually attempts to POST the records using the PutRecords method. If the messages were not sent successfully they are placed back on the queue to retry

func (*KinesisProducer) Close

func (p *KinesisProducer) Close() error

Close stops queuing and producing and waits for all tasks to finish

func (*KinesisProducer) CloseSync

func (p *KinesisProducer) CloseSync() error

CloseSync closes the Producer in a syncronous manner.

func (*KinesisProducer) Errors

func (p *KinesisProducer) Errors() <-chan error

Errors gets the current number of errors on the Producer

func (*KinesisProducer) Init

func (p *KinesisProducer) Init() (Producer, error)

Init initializes a producer with the params specified in the configuration file

func (*KinesisProducer) InitC

func (p *KinesisProducer) InitC(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (Producer, error)

InitC initializes a producer with the specified configuration: stream, shard, shard-iter-type, access-key, secret-key, and region

func (*KinesisProducer) InitCWithEndpoint

func (p *KinesisProducer) InitCWithEndpoint(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int, endpoint string) (Producer, error)

InitCWithEndpoint initializes a producer with the specified configuration: stream, shard, shard-iter-type, access-key, secret-key, and region

func (*KinesisProducer) IsProducing

func (p *KinesisProducer) IsProducing() bool

IsProducing identifies whether or not the messages are queued for POSTing to the stream

func (*KinesisProducer) Messages

func (p *KinesisProducer) Messages() <-chan *Message

Messages gets the current message channel from the producer

func (*KinesisProducer) NewEndpoint

func (p *KinesisProducer) NewEndpoint(endpoint, stream string) (err error)

NewEndpoint re-initializes kinesis client with new endpoint. Used for testing with kinesalite

func (*KinesisProducer) ReInit

func (p *KinesisProducer) ReInit()

ReInit re-initializes the shard iterator. Used with conjucntion with NewEndpoint

func (*KinesisProducer) Send

func (p *KinesisProducer) Send(msg *Message)

Send a message to the queue for POSTing

func (*KinesisProducer) SendSync

func (p *KinesisProducer) SendSync(msg *Message)

SendSync - blocking send. TODO: we might want a version of this that can be fed a timeout value and listen for cancellation channels. This version is for simplicity.

func (*KinesisProducer) TryToSend

func (p *KinesisProducer) TryToSend(msg *Message) error

TryToSend tries to send the message, but if the channel is full it drops the message, and returns an error.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener represents a kinesis listener

func (*Listener) Close

func (l *Listener) Close() error

Close stops consuming and listening and waits for all tasks to finish

func (*Listener) CloseSync

func (l *Listener) CloseSync() error

CloseSync closes the Listener in a syncronous manner.

func (*Listener) Errors

func (l *Listener) Errors() <-chan error

Errors gets the current number of errors on the Listener

func (*Listener) GetClient

func (l *Listener) GetClient() *kinesisiface.KinesisAPI

GetClient returns the client

func (*Listener) Init

func (l *Listener) Init() (*Listener, error)

Init initializes a listener with the params specified in the configuration file

func (*Listener) InitC

func (l *Listener) InitC(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (*Listener, error)

InitC initialize a listener with the supplied params

func (*Listener) InitCWithEndpoint

func (l *Listener) InitCWithEndpoint(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int, endpoint string) (*Listener, error)

InitCWithEndpoint initialize a listener with the supplied params

func (*Listener) InitWithStartTime

func (l *Listener) InitWithStartTime(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int, startTime *time.Time) (*Listener, error)

InitWithStartTime initializes a listener with start time

func (*Listener) IsConsuming

func (l *Listener) IsConsuming() bool

IsConsuming identifies whether or not the kinesis stream is being polled

func (*Listener) IsListening

func (l *Listener) IsListening() bool

IsListening identifies whether or not messages and errors are being handled after consumption

func (*Listener) Listen

func (l *Listener) Listen(fn msgFn)

Listen handles the consumed messages, errors and interrupts

func (*Listener) NewEndpoint

func (l *Listener) NewEndpoint(endpoint, stream string) (err error)

NewEndpoint re-initializes kinesis client with new endpoint. Used for testing with kinesalite

func (*Listener) ReInit

func (l *Listener) ReInit()

ReInit re-initializes the shard iterator. Used with conjucntion with NewEndpoint

func (*Listener) Retrieve

func (l *Listener) Retrieve() (*Message, error)

Retrieve a message from the stream and return the value

func (*Listener) RetrieveFn

func (l *Listener) RetrieveFn(fn msgFn)

RetrieveFn retrieves a message from the queue and apply the supplied function to the message

type Message

type Message struct {
	awsKinesis.Record
}

Message represents an item on the Kinesis stream

func (*Message) Init

func (k *Message) Init(msg []byte, key string) *Message

Init initializes a Message. Currently we are ignoring sequenceNumber.

func (*Message) Key

func (k *Message) Key() []byte

Key gets the partion key of the message

func (*Message) SetValue

func (k *Message) SetValue(value []byte)

SetValue sets the underlying value of the underlying record

func (*Message) Value

func (k *Message) Value() []byte

Value gets the underlying value of the underlying record

type Producer

type Producer interface {
	Init() (Producer, error)
	InitC(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (Producer, error)
	NewEndpoint(endpoint, stream string) (err error)
	ReInit()
	IsProducing() bool
	Send(msg *Message)
	SendSync(msg *Message)
	TryToSend(msg *Message) error
	Close() error
	CloseSync() error
}

Producer is an interface for sending messages to a stream of some sort.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL