sink

package v0.5.0

Published: Oct 10, 2022 License: MIT Imports: 20 Imported by: 0

README

sink

Package sink contains interfaces and methods for sinking data to external services. As a general rule, sinks should support all data formats where possible.

Each sink must use a select statement to read data from its channel and check whether its context has been cancelled; without the cancellation check, the sink's goroutine can leak.

Information for each sink is available in the GoDoc.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DynamoDB

type DynamoDB struct {
	Table    string `json:"table"`
	ItemsKey string `json:"items_key"`
}

DynamoDB sinks JSON data to an AWS DynamoDB table. This sink supports writing multiple items from the same event to a table.

The sink has these settings:

Table:
	DynamoDB table that data is written to
ItemsKey:
	JSON key-value that contains maps that represent items to be stored in the DynamoDB table
	This key can be a single map or an array of maps:
		[
			{
				"PK": "foo",
				"SK": "bar"
			},
			{
				"PK": "baz",
				"SK": "qux"
			}
		]

When loaded with a factory, the sink uses this JSON configuration:

{
	"type": "dynamodb",
	"settings": {
		"table": "foo-table",
		"items_key": "foo"
	}
}

func (*DynamoDB) Send

func (sink *DynamoDB) Send(ctx context.Context, ch *config.Channel) error

Send sinks a channel of encapsulated data with the DynamoDB sink.

type Firehose added in v0.4.0

type Firehose struct {
	Stream string `json:"stream"`
}

Firehose sinks data to an AWS Kinesis Firehose Delivery Stream. This sink uploads data in batches of records and will automatically retry any failed put record attempts.
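The batching behavior can be illustrated with a small helper that splits records into groups. This is a sketch; the limit of 500 records comes from Firehose's PutRecordBatch API, not from a setting of this sink:

```go
package main

import "fmt"

// batches splits records into groups of at most n, mirroring how the
// Firehose sink uploads data in batches of records. Firehose's
// PutRecordBatch call accepts up to 500 records per request.
func batches(records []string, n int) [][]string {
	var out [][]string
	for len(records) > n {
		out = append(out, records[:n])
		records = records[n:]
	}
	if len(records) > 0 {
		out = append(out, records)
	}
	return out
}

func main() {
	fmt.Println(len(batches([]string{"a", "b", "c"}, 2)))
}
```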

The sink has these settings:

Stream:
	Kinesis Firehose Delivery Stream that data is sent to

When loaded with a factory, the sink uses this JSON configuration:

{
	"type": "firehose",
	"settings": {
		"stream": "foo"
	}
}

func (*Firehose) Send added in v0.4.0

func (sink *Firehose) Send(ctx context.Context, ch *config.Channel) error

Send sinks a channel of encapsulated data with the Firehose sink.

type HTTP

type HTTP struct {
	URL     string `json:"url"`
	Headers []struct {
		Key   string `json:"key"`
		Value string `json:"value"`
	} `json:"headers"`
	HeadersKey string `json:"headers_key"`
}

HTTP sinks JSON data to an HTTP(S) endpoint.

The sink has these settings:

URL:
	HTTP(S) endpoint that data is sent to
Headers (optional):
	contains configured maps that represent HTTP headers to be sent in the HTTP request
	defaults to no headers
HeadersKey (optional):
	JSON key-value that contains maps that represent HTTP headers to be sent in the HTTP request
	This key can be a single map or an array of maps:
		[
			{
				"FOO": "bar"
			},
			{
				"BAZ": "qux"
			}
		]
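Applying configured headers to an outgoing request might look like this. This is a self-contained sketch; the Header type, helper name, and the use of POST are assumptions, as only the URL and the header key-value pairs are documented:

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// Header is a configured key-value pair, mirroring the shape of the
// sink's Headers setting.
type Header struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

// newRequest builds a request to the configured URL and applies each
// configured header before the request is sent.
func newRequest(url string, headers []Header, body string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(body))
	if err != nil {
		return nil, err
	}
	for _, h := range headers {
		req.Header.Set(h.Key, h.Value)
	}
	return req, nil
}

func main() {
	req, _ := newRequest("https://foo.com/bar", []Header{{Key: "FOO", Value: "bar"}}, `{"k":"v"}`)
	fmt.Println(req.Header.Get("FOO"))
}
```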

When loaded with a factory, the sink uses this JSON configuration:

{
	"type": "http",
	"settings": {
		"url": "foo.com/bar"
	}
}

func (*HTTP) Send

func (sink *HTTP) Send(ctx context.Context, ch *config.Channel) error

Send sinks a channel of encapsulated data with the HTTP sink.

type Kinesis

type Kinesis struct {
	Stream              string `json:"stream"`
	Partition           string `json:"partition"`
	PartitionKey        string `json:"partition_key"`
	ShardRedistribution bool   `json:"shard_redistribution"`
}

Kinesis sinks data to an AWS Kinesis Data Stream using Kinesis Producer Library (KPL) compliant aggregated records. This sink can automatically redistribute data across shards by retrieving partition keys from JSON data; by default, it uses random strings to avoid hot shards. More information about the KPL and its schema is available here: https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html.

The sink has these settings:

Stream:
	Kinesis Data Stream that data is sent to
Partition (optional):
	string that is used as the partition key for the aggregated record
PartitionKey (optional):
	JSON key-value that is used as the partition key for the aggregated record
ShardRedistribution (optional):
	determines if data should be redistributed across shards based on the partition key
	if enabled with an empty partition key, then data aggregation is disabled
	defaults to false (data is randomly distributed across shards)
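Choosing between a configured partition and a random one can be sketched like this. The helper and its name are hypothetical, and reading the key from JSON data via PartitionKey is omitted; only the fallback to random strings is documented:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// partitionKey returns the configured partition string if one is set,
// otherwise a random string so records spread evenly across shards
// and avoid hot-shard behavior.
func partitionKey(partition string) string {
	if partition != "" {
		return partition
	}
	b := make([]byte, 8)
	rand.Read(b)
	return hex.EncodeToString(b)
}

func main() {
	fmt.Println(partitionKey("foo"))
	fmt.Println(len(partitionKey("")))
}
```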

When loaded with a factory, the sink uses this JSON configuration:

{
	"type": "kinesis",
	"settings": {
		"stream": "foo"
	}
}

func (*Kinesis) Send

func (sink *Kinesis) Send(ctx context.Context, ch *config.Channel) error

Send sinks a channel of encapsulated data with the Kinesis sink.

type S3

type S3 struct {
	Bucket    string `json:"bucket"`
	Prefix    string `json:"prefix"`
	PrefixKey string `json:"prefix_key"`
}

S3 sinks data as gzip compressed objects to an AWS S3 bucket. Object names contain the year, month, and day the data was processed by the sink; they can be optionally prefixed with a custom string.

The sink has these settings:

Bucket:
	S3 bucket that data is written to
Prefix (optional):
	prefix prepended to the S3 object name
	defaults to no prefix
PrefixKey (optional):
	JSON key-value that is used as the prefix prepended to the S3 object name, overrides Prefix
	defaults to no prefix

When loaded with a factory, the sink uses this JSON configuration:

{
	"type": "s3",
	"settings": {
		"bucket": "foo-bucket"
	}
}

func (*S3) Send

func (sink *S3) Send(ctx context.Context, ch *config.Channel) error

Send sinks a channel of encapsulated data with the S3 sink.

type SQS added in v0.4.0

type SQS struct {
	Queue string `json:"queue"`
}

SQS sinks data to an AWS SQS queue.

The sink has these settings:

Queue:
	SQS queue name that data is sent to

When loaded with a factory, the sink uses this JSON configuration:

{
	"type": "sqs",
	"settings": {
		"queue": "foo"
	}
}

func (*SQS) Send added in v0.4.0

func (sink *SQS) Send(ctx context.Context, ch *config.Channel) error

Send sinks a channel of encapsulated data with the SQS sink.

type Sink

type Sink interface {
	Send(context.Context, *config.Channel) error
}

Sink is an interface for sending data to external services. Sinks read channels of capsules and are interruptible.

func Factory

func Factory(cfg config.Config) (Sink, error)

Factory returns a configured Sink from a config. This is the recommended method for retrieving ready-to-use Sinks.
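The dispatch pattern behind such a factory can be sketched with stand-in types. Config and Sink here are simplified placeholders, not the package's config.Config or its Sink interface:

```go
package main

import (
	"context"
	"fmt"
)

// Config is a stand-in for the package's config.Config.
type Config struct {
	Type string `json:"type"`
}

// Sink mirrors the package's Sink interface, simplified to omit the
// channel argument.
type Sink interface {
	Send(ctx context.Context) error
}

// Stdout is a minimal sink that writes to stdout.
type Stdout struct{}

func (s *Stdout) Send(ctx context.Context) error {
	fmt.Println("sinking to stdout")
	return nil
}

// Factory returns a configured Sink from a config, dispatching on the
// "type" field the JSON configurations above set.
func Factory(cfg Config) (Sink, error) {
	switch cfg.Type {
	case "stdout":
		return &Stdout{}, nil
	default:
		return nil, fmt.Errorf("factory: unsupported sink type %q", cfg.Type)
	}
}

func main() {
	sink, err := Factory(Config{Type: "stdout"})
	if err != nil {
		panic(err)
	}
	_ = sink.Send(context.Background())
}
```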

type Stdout

type Stdout struct{}

Stdout sinks data to stdout.

When loaded with a factory, the sink uses this JSON configuration:

{
	"type": "stdout"
}

func (*Stdout) Send

func (sink *Stdout) Send(ctx context.Context, ch *config.Channel) error

Send sinks a channel of encapsulated data with the Stdout sink.

type SumoLogic

type SumoLogic struct {
	URL         string `json:"url"`
	Category    string `json:"category"`
	CategoryKey string `json:"category_key"`
}

SumoLogic sinks JSON data to Sumo Logic using an HTTP collector. More information about Sumo Logic HTTP collectors is available here: https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Data-to-an-HTTP-Source.

The sink has these settings:

URL:
	HTTP(S) endpoint that data is sent to
Category (optional):
	configured Sumo Logic source category
	defaults to no source category, which sends data to the source category configured for URL
CategoryKey (optional):
	JSON key-value that is used as the Sumo Logic source category, overrides Category
	defaults to no source category, which sends data to the source category configured for URL

When loaded with a factory, the sink uses this JSON configuration:

{
	"type": "sumologic",
	"settings": {
		"url": "foo.com/bar"
	}
}

func (*SumoLogic) Send

func (sink *SumoLogic) Send(ctx context.Context, ch *config.Channel) error

Send sinks a channel of encapsulated data with the SumoLogic sink.
