README
¶
Kinesis Package
The inskinesis
package is a Go library designed to facilitate streaming data to Amazon Kinesis streams. This README
provides an overview of the package's functionality and usage.
Table of Contents
Introduction
The inskinesis
package is designed to make it easier to stream data to Amazon Kinesis streams in your Go applications.
It provides a simple interface for sending records to a Kinesis stream while handling batching, partitioning, and
retries. This can be especially useful for applications that generate a high volume of data and need to send it to
Kinesis efficiently.
Installation
To use the inskinesis
package in your Go project, you can install it using Go modules. Run the following command in
your project directory:
go get github.com/useinsider/go-pkg/inskinesis
Getting Started
Here's a quick guide on how to get started with the inskinesis
package:
-
Import the package in your Go code:
import "github.com/useinsider/go-pkg/inskinesis"
-
Create a configuration for your Kinesis stream:
config := inskinesis.Config{ Region: "your-aws-region", StreamName: "your-kinesis-stream-name", Partitioner: nil, // Optionally provide a partitioner function MaxStreamBatchSize: 100, // Maximum size of each batch of records MaxStreamBatchByteSize: 1024 * 1024, // Maximum size in bytes for each batch MaxBatchSize: 500, // Maximum size of the log buffer MaxGroup: 10, // Maximum number of concurrent groups for sending records }
Field | Default Value | Description |
---|---|---|
Region | N/A | The AWS region where the Kinesis stream is located. Required |
StreamName | N/A | The name of the Kinesis stream. Required |
Partitioner | UUID | An optional partitioner function used to determine the partition key for records. If not provided, a default UUID-based partitioner is used. |
MaxStreamBatchSize | 100 | The maximum size of each batch of records to be sent to the stream. |
MaxStreamBatchByteSize | 256 KB (2^18 byte) | The maximum size (in bytes) of each batch of records. |
MaxBatchSize | 500 | The maximum size of the log buffer for accumulating log records before batching. |
MaxGroup | 1 | The maximum number of concurrent groups for sending records. If you want to send records concurrently, set this value to a number greater than 1. |
RetryCount | 3 | The number of times to retry sending a batch of records to the stream. |
RetryInterval | 100 ms | The interval between retries. |
Verbose | false | Whether to enable verbose logging. |
Please note that N/A
in the Default Value column indicates that these fields are required and do not have default
values.
-
Create a Kinesis stream instance:
stream, err := inskinesis.NewKinesis(config) if err != nil { // Handle the error }
-
Send records to the Kinesis stream:
// Send a single record stream.Put(yourRecord) // Send multiple records records := []interface{}{record1, record2, record3} for _, record := range records { stream.Put(record) }
-
To ensure all records are sent and clean up resources, flush and stop streaming:
stream.FlushAndStopStreaming()
Package Structure
The inskinesis
package is organized as follows:
inskinesis
package: The main package containing theStreamInterface
,stream
, and related functionality for streaming records to Kinesis.PartitionerFunction
: A customizable partitioning function for determining the partition key of records.- Various error handling and logging functionality.
Usage
The package provides a simple interface for streaming records to a Kinesis stream. You can customize the configuration based on your needs, including the region, stream name, batch sizes, and partitioning function.
Here's an example of how to use the package:
import "github.com/useinsider/go-pkg/inskinesis"
config := inskinesis.Config{
Region: "your-aws-region",
StreamName: "your-kinesis-stream-name",
Partitioner: nil, // Optionally provide a partitioner function
MaxStreamBatchSize: 100, // Maximum size of each batch of records
MaxStreamBatchByteSize: 1024 * 1024, // Maximum size in bytes for each batch
MaxBatchSize: 500, // Maximum size of the log buffer
MaxGroup: 10, // Maximum number of concurrent groups for sending records
}
stream, err := inskinesis.NewKinesis(config)
if err != nil {
// Handle the error
}
// Send records to the stream
stream.Put(yourRecord)
// To ensure all records are sent and clean up resources, flush and stop streaming
stream.FlushAndStopStreaming()
Error Handling
The inskinesis
package provides error channels for receiving errors during streaming. You can use these channels to
handle errors in your application gracefully. It's important to monitor the error channels to ensure the robustness of
your data streaming process.
Here's an example of how to use the error channels:
go func () {
for {
select {
case err := <-stream.Error():
sentry.Error(err)
}
}
}()
Contributing
If you would like to contribute to the inskinesis
package, please follow standard Go community guidelines for
contributions. You can create issues, submit pull requests, and help improve the package for everyone.
Documentation
¶
Index ¶
- Variables
- func TakeSliceArg(arg interface{}) (out []interface{}, ok bool)
- type Config
- type CustomRetryer
- type FakeStream
- type KinesisInterface
- type MockKinesisInterface
- type MockKinesisInterfaceMockRecorder
- type MockStreamInterface
- type MockStreamInterfaceMockRecorder
- type MockSubscribeToShardEventStreamEvent
- func (m *MockSubscribeToShardEventStreamEvent) EXPECT() *MockSubscribeToShardEventStreamEventMockRecorder
- func (m *MockSubscribeToShardEventStreamEvent) MarshalEvent(arg0 protocol.PayloadMarshaler) (eventstream.Message, error)
- func (m *MockSubscribeToShardEventStreamEvent) UnmarshalEvent(arg0 protocol.PayloadUnmarshaler, arg1 eventstream.Message) error
- type MockSubscribeToShardEventStreamEventMockRecorder
- type MockSubscribeToShardEventStreamReader
- func (m *MockSubscribeToShardEventStreamReader) Close() error
- func (m *MockSubscribeToShardEventStreamReader) EXPECT() *MockSubscribeToShardEventStreamReaderMockRecorder
- func (m *MockSubscribeToShardEventStreamReader) Err() error
- func (m *MockSubscribeToShardEventStreamReader) Events() <-chan kinesis.SubscribeToShardEventStreamEvent
- type MockSubscribeToShardEventStreamReaderMockRecorder
- type PartitionerFunction
- type StreamInterface
Constants ¶
This section is empty.
Variables ¶
var Partitioners = partitionersCollection{}
Functions ¶
func TakeSliceArg ¶
func TakeSliceArg(arg interface{}) (out []interface{}, ok bool)
Types ¶
type CustomRetryer ¶ added in v0.10.4
CustomRetryer retries on "connection reset by peer"
func (CustomRetryer) ShouldRetry ¶ added in v0.10.4
func (r CustomRetryer) ShouldRetry(req *request.Request) bool
type FakeStream ¶
type FakeStream struct { StreamInterface Data []string Stream StreamInterface Partitioner *PartitionerFunction }
func (*FakeStream) Datum ¶
func (s *FakeStream) Datum(i int, r interface{}) string
Datum gets item at the index i, and converts JSON at the index i to interface pointer r. Returns JSON string. Example:
t := MyStruct{} js := s.Datum(-1, &t)
func (*FakeStream) Get ¶
func (s *FakeStream) Get()
func (*FakeStream) Put ¶
func (s *FakeStream) Put(v interface{})
type KinesisInterface ¶
type KinesisInterface interface {
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
}
type MockKinesisInterface ¶
type MockKinesisInterface struct {
// contains filtered or unexported fields
}
MockKinesisInterface is a mock of KinesisInterface interface.
func NewMockKinesisInterface ¶
func NewMockKinesisInterface(ctrl *gomock.Controller) *MockKinesisInterface
NewMockKinesisInterface creates a new mock instance.
func (*MockKinesisInterface) EXPECT ¶
func (m *MockKinesisInterface) EXPECT() *MockKinesisInterfaceMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockKinesisInterface) PutRecords ¶
func (m *MockKinesisInterface) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
PutRecords mocks base method.
type MockKinesisInterfaceMockRecorder ¶
type MockKinesisInterfaceMockRecorder struct {
// contains filtered or unexported fields
}
MockKinesisInterfaceMockRecorder is the mock recorder for MockKinesisInterface.
func (*MockKinesisInterfaceMockRecorder) PutRecords ¶
func (mr *MockKinesisInterfaceMockRecorder) PutRecords(input any) *gomock.Call
PutRecords indicates an expected call of PutRecords.
type MockStreamInterface ¶
type MockStreamInterface struct {
// contains filtered or unexported fields
}
MockStreamInterface is a mock of StreamInterface interface.
func NewMockStreamInterface ¶
func NewMockStreamInterface(ctrl *gomock.Controller) *MockStreamInterface
NewMockStreamInterface creates a new mock instance.
func (*MockStreamInterface) EXPECT ¶
func (m *MockStreamInterface) EXPECT() *MockStreamInterfaceMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockStreamInterface) FlushAndStopStreaming ¶
func (m *MockStreamInterface) FlushAndStopStreaming()
FlushAndStopStreaming mocks base method.
func (*MockStreamInterface) Put ¶
func (m *MockStreamInterface) Put(record any) error
Put mocks base method.
type MockStreamInterfaceMockRecorder ¶
type MockStreamInterfaceMockRecorder struct {
// contains filtered or unexported fields
}
MockStreamInterfaceMockRecorder is the mock recorder for MockStreamInterface.
func (*MockStreamInterfaceMockRecorder) FlushAndStopStreaming ¶
func (mr *MockStreamInterfaceMockRecorder) FlushAndStopStreaming() *gomock.Call
FlushAndStopStreaming indicates an expected call of FlushAndStopStreaming.
type MockSubscribeToShardEventStreamEvent ¶
type MockSubscribeToShardEventStreamEvent struct {
// contains filtered or unexported fields
}
MockSubscribeToShardEventStreamEvent is a mock of SubscribeToShardEventStreamEvent interface.
func NewMockSubscribeToShardEventStreamEvent ¶
func NewMockSubscribeToShardEventStreamEvent(ctrl *gomock.Controller) *MockSubscribeToShardEventStreamEvent
NewMockSubscribeToShardEventStreamEvent creates a new mock instance.
func (*MockSubscribeToShardEventStreamEvent) EXPECT ¶
func (m *MockSubscribeToShardEventStreamEvent) EXPECT() *MockSubscribeToShardEventStreamEventMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockSubscribeToShardEventStreamEvent) MarshalEvent ¶
func (m *MockSubscribeToShardEventStreamEvent) MarshalEvent(arg0 protocol.PayloadMarshaler) (eventstream.Message, error)
MarshalEvent mocks base method.
func (*MockSubscribeToShardEventStreamEvent) UnmarshalEvent ¶
func (m *MockSubscribeToShardEventStreamEvent) UnmarshalEvent(arg0 protocol.PayloadUnmarshaler, arg1 eventstream.Message) error
UnmarshalEvent mocks base method.
type MockSubscribeToShardEventStreamEventMockRecorder ¶
type MockSubscribeToShardEventStreamEventMockRecorder struct {
// contains filtered or unexported fields
}
MockSubscribeToShardEventStreamEventMockRecorder is the mock recorder for MockSubscribeToShardEventStreamEvent.
func (*MockSubscribeToShardEventStreamEventMockRecorder) MarshalEvent ¶
func (mr *MockSubscribeToShardEventStreamEventMockRecorder) MarshalEvent(arg0 any) *gomock.Call
MarshalEvent indicates an expected call of MarshalEvent.
func (*MockSubscribeToShardEventStreamEventMockRecorder) UnmarshalEvent ¶
func (mr *MockSubscribeToShardEventStreamEventMockRecorder) UnmarshalEvent(arg0, arg1 any) *gomock.Call
UnmarshalEvent indicates an expected call of UnmarshalEvent.
type MockSubscribeToShardEventStreamReader ¶
type MockSubscribeToShardEventStreamReader struct {
// contains filtered or unexported fields
}
MockSubscribeToShardEventStreamReader is a mock of SubscribeToShardEventStreamReader interface.
func NewMockSubscribeToShardEventStreamReader ¶
func NewMockSubscribeToShardEventStreamReader(ctrl *gomock.Controller) *MockSubscribeToShardEventStreamReader
NewMockSubscribeToShardEventStreamReader creates a new mock instance.
func (*MockSubscribeToShardEventStreamReader) Close ¶
func (m *MockSubscribeToShardEventStreamReader) Close() error
Close mocks base method.
func (*MockSubscribeToShardEventStreamReader) EXPECT ¶
func (m *MockSubscribeToShardEventStreamReader) EXPECT() *MockSubscribeToShardEventStreamReaderMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockSubscribeToShardEventStreamReader) Err ¶
func (m *MockSubscribeToShardEventStreamReader) Err() error
Err mocks base method.
func (*MockSubscribeToShardEventStreamReader) Events ¶
func (m *MockSubscribeToShardEventStreamReader) Events() <-chan kinesis.SubscribeToShardEventStreamEvent
Events mocks base method.
type MockSubscribeToShardEventStreamReaderMockRecorder ¶
type MockSubscribeToShardEventStreamReaderMockRecorder struct {
// contains filtered or unexported fields
}
MockSubscribeToShardEventStreamReaderMockRecorder is the mock recorder for MockSubscribeToShardEventStreamReader.
func (*MockSubscribeToShardEventStreamReaderMockRecorder) Close ¶
func (mr *MockSubscribeToShardEventStreamReaderMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close.
func (*MockSubscribeToShardEventStreamReaderMockRecorder) Err ¶
func (mr *MockSubscribeToShardEventStreamReaderMockRecorder) Err() *gomock.Call
Err indicates an expected call of Err.
func (*MockSubscribeToShardEventStreamReaderMockRecorder) Events ¶
func (mr *MockSubscribeToShardEventStreamReaderMockRecorder) Events() *gomock.Call
Events indicates an expected call of Events.
type PartitionerFunction ¶
type PartitionerFunction func(record interface{}) string
PartitionerFunction is the common signature of all partitioners, it maps a record to a partition key.
func PartitionerPointer ¶
func PartitionerPointer(function PartitionerFunction) *PartitionerFunction
PartitionerPointer returns a pointer to the PartitionerPointer value passed in.
type StreamInterface ¶
type StreamInterface interface { Put(record interface{}) Error() <-chan error FlushAndStopStreaming() }
StreamInterface defines the interface for a Kinesis stream.
func NewKinesis ¶
func NewKinesis(config Config) (StreamInterface, error)
NewKinesis creates a new Kinesis stream.