inskinesis

package
v0.10.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

README

Kinesis Package

The inskinesis package is a Go library designed to facilitate streaming data to Amazon Kinesis streams. This README provides an overview of the package's functionality and usage.

Table of Contents

Introduction

The inskinesis package is designed to make it easier to stream data to Amazon Kinesis streams in your Go applications. It provides a simple interface for sending records to a Kinesis stream while handling batching, partitioning, and retries. This can be especially useful for applications that generate a high volume of data and need to send it to Kinesis efficiently.

Installation

To use the inskinesis package in your Go project, you can install it using Go modules. Run the following command in your project directory:

go get github.com/useinsider/go-pkg/inskinesis

Getting Started

Here's a quick guide on how to get started with the inskinesis package:

  1. Import the package in your Go code:

    import "github.com/useinsider/go-pkg/inskinesis"
    
  2. Create a configuration for your Kinesis stream:

    config := inskinesis.Config{
        Region:                 "your-aws-region",
        StreamName:             "your-kinesis-stream-name",
        Partitioner:            nil, // Optionally provide a partitioner function
        MaxStreamBatchSize:     100, // Maximum size of each batch of records
        MaxStreamBatchByteSize: 1024 * 1024, // Maximum size in bytes for each batch
        MaxBatchSize:           500, // Maximum size of the log buffer
        MaxGroup:               10, // Maximum number of concurrent groups for sending records
    }
    
Field Default Value Description
Region N/A The AWS region where the Kinesis stream is located. Required
StreamName N/A The name of the Kinesis stream. Required
Partitioner UUID An optional partitioner function used to determine the partition key for records. If not provided, a default UUID-based partitioner is used.
MaxStreamBatchSize 100 The maximum size of each batch of records to be sent to the stream.
MaxStreamBatchByteSize 256 KB (2^18 byte) The maximum size (in bytes) of each batch of records.
MaxBatchSize 500 The maximum size of the log buffer for accumulating log records before batching.
MaxGroup 1 The maximum number of concurrent groups for sending records. If you want to send records concurrently, set this value to a number greater than 1.
RetryCount 3 The number of times to retry sending a batch of records to the stream.
RetryInterval 100 ms The interval between retries.
Verbose false Whether to enable verbose logging.

Please note that N/A in the Default Value column indicates that these fields are required and do not have default values.

  1. Create a Kinesis stream instance:

    stream, err := inskinesis.NewKinesis(config)
    if err != nil {
        // Handle the error
    }
    
  2. Send records to the Kinesis stream:

    // Send a single record
    stream.Put(yourRecord)
    
    // Send multiple records
    records := []interface{}{record1, record2, record3}
    for _, record := range records {
        stream.Put(record)
    }
    
  3. To ensure all records are sent and clean up resources, flush and stop streaming:

    stream.FlushAndStopStreaming()
    

Package Structure

The inskinesis package is organized as follows:

  • inskinesis package: The main package containing the StreamInterface, stream, and related functionality for streaming records to Kinesis.
  • PartitionerFunction: A customizable partitioning function for determining the partition key of records.
  • Various error handling and logging functionality.

Usage

The package provides a simple interface for streaming records to a Kinesis stream. You can customize the configuration based on your needs, including the region, stream name, batch sizes, and partitioning function.

Here's an example of how to use the package:

import "github.com/useinsider/go-pkg/inskinesis"

config := inskinesis.Config{
    Region:                 "your-aws-region",
    StreamName:             "your-kinesis-stream-name",
    Partitioner:            nil, // Optionally provide a partitioner function
    MaxStreamBatchSize:     100, // Maximum size of each batch of records
    MaxStreamBatchByteSize: 1024 * 1024, // Maximum size in bytes for each batch
    MaxBatchSize:           500,         // Maximum size of the log buffer
    MaxGroup:               10, // Maximum number of concurrent groups for sending records
}

stream, err := inskinesis.NewKinesis(config)
if err != nil {
// Handle the error
}

// Send records to the stream
stream.Put(yourRecord)

// To ensure all records are sent and clean up resources, flush and stop streaming
stream.FlushAndStopStreaming()

Error Handling

The inskinesis package provides error channels for receiving errors during streaming. You can use these channels to handle errors in your application gracefully. It's important to monitor the error channels to ensure the robustness of your data streaming process.

Here's an example of how to use the error channels:

go func () {
    for {
        select {
        case err := <-stream.Error():
            sentry.Error(err)
        }
    }
}()

Contributing

If you would like to contribute to the inskinesis package, please follow standard Go community guidelines for contributions. You can create issues, submit pull requests, and help improve the package for everyone.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Partitioners = partitionersCollection{}

Functions

func TakeSliceArg

func TakeSliceArg(arg interface{}) (out []interface{}, ok bool)

Types

type Config

type Config struct {
	Region                 string
	StreamName             string
	Partitioner            *PartitionerFunction
	MaxStreamBatchSize     int
	MaxStreamBatchByteSize int
	MaxBatchSize           int
	MaxGroup               int
	RetryCount             int
	RetryWaitTime          time.Duration
	Verbose                bool
}

type CustomRetryer added in v0.10.4

type CustomRetryer struct {
	request.Retryer
}

CustomRetryer retries on "connection reset by peer"

func (CustomRetryer) ShouldRetry added in v0.10.4

func (r CustomRetryer) ShouldRetry(req *request.Request) bool

type FakeStream

type FakeStream struct {
	StreamInterface
	Data        []string
	Stream      StreamInterface
	Partitioner *PartitionerFunction
}

func (*FakeStream) Datum

func (s *FakeStream) Datum(i int, r interface{}) string

Datum gets item at the index i, and converts JSON at the index i to interface pointer r. Returns JSON string. Example:

t := MyStruct{}
js := s.Datum(-1, &t)

func (*FakeStream) Get

func (s *FakeStream) Get()

func (*FakeStream) Put

func (s *FakeStream) Put(v interface{})

type KinesisInterface

type KinesisInterface interface {
	PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
}

type MockKinesisInterface

type MockKinesisInterface struct {
	// contains filtered or unexported fields
}

MockKinesisInterface is a mock of KinesisInterface interface.

func NewMockKinesisInterface

func NewMockKinesisInterface(ctrl *gomock.Controller) *MockKinesisInterface

NewMockKinesisInterface creates a new mock instance.

func (*MockKinesisInterface) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockKinesisInterface) PutRecords

PutRecords mocks base method.

type MockKinesisInterfaceMockRecorder

type MockKinesisInterfaceMockRecorder struct {
	// contains filtered or unexported fields
}

MockKinesisInterfaceMockRecorder is the mock recorder for MockKinesisInterface.

func (*MockKinesisInterfaceMockRecorder) PutRecords

func (mr *MockKinesisInterfaceMockRecorder) PutRecords(input any) *gomock.Call

PutRecords indicates an expected call of PutRecords.

type MockStreamInterface

type MockStreamInterface struct {
	// contains filtered or unexported fields
}

MockStreamInterface is a mock of StreamInterface interface.

func NewMockStreamInterface

func NewMockStreamInterface(ctrl *gomock.Controller) *MockStreamInterface

NewMockStreamInterface creates a new mock instance.

func (*MockStreamInterface) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockStreamInterface) FlushAndStopStreaming

func (m *MockStreamInterface) FlushAndStopStreaming()

FlushAndStopStreaming mocks base method.

func (*MockStreamInterface) Put

func (m *MockStreamInterface) Put(record any) error

Put mocks base method.

type MockStreamInterfaceMockRecorder

type MockStreamInterfaceMockRecorder struct {
	// contains filtered or unexported fields
}

MockStreamInterfaceMockRecorder is the mock recorder for MockStreamInterface.

func (*MockStreamInterfaceMockRecorder) FlushAndStopStreaming

func (mr *MockStreamInterfaceMockRecorder) FlushAndStopStreaming() *gomock.Call

FlushAndStopStreaming indicates an expected call of FlushAndStopStreaming.

func (*MockStreamInterfaceMockRecorder) Put

Put indicates an expected call of Put.

type MockSubscribeToShardEventStreamEvent

type MockSubscribeToShardEventStreamEvent struct {
	// contains filtered or unexported fields
}

MockSubscribeToShardEventStreamEvent is a mock of SubscribeToShardEventStreamEvent interface.

func NewMockSubscribeToShardEventStreamEvent

func NewMockSubscribeToShardEventStreamEvent(ctrl *gomock.Controller) *MockSubscribeToShardEventStreamEvent

NewMockSubscribeToShardEventStreamEvent creates a new mock instance.

func (*MockSubscribeToShardEventStreamEvent) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSubscribeToShardEventStreamEvent) MarshalEvent

MarshalEvent mocks base method.

func (*MockSubscribeToShardEventStreamEvent) UnmarshalEvent

UnmarshalEvent mocks base method.

type MockSubscribeToShardEventStreamEventMockRecorder

type MockSubscribeToShardEventStreamEventMockRecorder struct {
	// contains filtered or unexported fields
}

MockSubscribeToShardEventStreamEventMockRecorder is the mock recorder for MockSubscribeToShardEventStreamEvent.

func (*MockSubscribeToShardEventStreamEventMockRecorder) MarshalEvent

MarshalEvent indicates an expected call of MarshalEvent.

func (*MockSubscribeToShardEventStreamEventMockRecorder) UnmarshalEvent

func (mr *MockSubscribeToShardEventStreamEventMockRecorder) UnmarshalEvent(arg0, arg1 any) *gomock.Call

UnmarshalEvent indicates an expected call of UnmarshalEvent.

type MockSubscribeToShardEventStreamReader

type MockSubscribeToShardEventStreamReader struct {
	// contains filtered or unexported fields
}

MockSubscribeToShardEventStreamReader is a mock of SubscribeToShardEventStreamReader interface.

func NewMockSubscribeToShardEventStreamReader

func NewMockSubscribeToShardEventStreamReader(ctrl *gomock.Controller) *MockSubscribeToShardEventStreamReader

NewMockSubscribeToShardEventStreamReader creates a new mock instance.

func (*MockSubscribeToShardEventStreamReader) Close

Close mocks base method.

func (*MockSubscribeToShardEventStreamReader) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSubscribeToShardEventStreamReader) Err

Err mocks base method.

func (*MockSubscribeToShardEventStreamReader) Events

Events mocks base method.

type MockSubscribeToShardEventStreamReaderMockRecorder

type MockSubscribeToShardEventStreamReaderMockRecorder struct {
	// contains filtered or unexported fields
}

MockSubscribeToShardEventStreamReaderMockRecorder is the mock recorder for MockSubscribeToShardEventStreamReader.

func (*MockSubscribeToShardEventStreamReaderMockRecorder) Close

Close indicates an expected call of Close.

func (*MockSubscribeToShardEventStreamReaderMockRecorder) Err

Err indicates an expected call of Err.

func (*MockSubscribeToShardEventStreamReaderMockRecorder) Events

Events indicates an expected call of Events.

type PartitionerFunction

type PartitionerFunction func(record interface{}) string

PartitionerFunction is the common signature of all partitioners, it maps a record to a partition key.

func PartitionerPointer

func PartitionerPointer(function PartitionerFunction) *PartitionerFunction

PartitionerPointer returns a pointer to the PartitionerPointer value passed in.

type StreamInterface

type StreamInterface interface {
	Put(record interface{})
	Error() <-chan error
	FlushAndStopStreaming()
}

StreamInterface defines the interface for a Kinesis stream.

func NewKinesis

func NewKinesis(config Config) (StreamInterface, error)

NewKinesis creates a new Kinesis stream.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL