README
¶
Kinesis Consumer Input Plugin
The Kinesis consumer plugin reads from a Kinesis data stream and creates metrics using one of the supported input data formats.
Configuration
[[inputs.kinesis_consumer]]
## Amazon REGION of kinesis endpoint.
region = "ap-southeast-2"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Assumed credentials via STS if role_arn is specified
## 2) explicit credentials from 'access_key' and 'secret_key'
## 3) shared profile from 'profile'
## 4) environment variables
## 5) shared credentials file
## 6) EC2 Instance Profile
# access_key = ""
# secret_key = ""
# token = ""
# role_arn = ""
# profile = ""
# shared_credential_file = ""
## Endpoint to make request against, the correct endpoint is automatically
## determined and this option should only be set if you wish to override the
## default.
## ex: endpoint_url = "http://localhost:8000"
# endpoint_url = ""
## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
# shard_iterator_type = "TRIM_HORIZON"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
## unique name for this consumer
app_name = "default"
table_name = "default"
Required AWS IAM permissions
Kinesis:
- DescribeStream
- GetRecords
- GetShardIterator
DynamoDB:
- GetItem
- PutItem
DynamoDB Checkpoint
The DynamoDB checkpoint stores the last processed record in a DynamoDB. To leverage this functionality, create a table with the folowing string type keys:
Partition key: namespace
Sort key: shard_id
Documentation
¶
Index ¶
- type DynamoDB
- type KinesisConsumer
- func (k *KinesisConsumer) Description() string
- func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error
- func (k *KinesisConsumer) Get(streamName, shardID string) (string, error)
- func (k *KinesisConsumer) SampleConfig() string
- func (k *KinesisConsumer) Set(streamName, shardID, sequenceNumber string) error
- func (k *KinesisConsumer) SetParser(parser parsers.Parser)
- func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error
- func (k *KinesisConsumer) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KinesisConsumer ¶
type KinesisConsumer struct { Region string `toml:"region"` AccessKey string `toml:"access_key"` SecretKey string `toml:"secret_key"` RoleARN string `toml:"role_arn"` Profile string `toml:"profile"` Filename string `toml:"shared_credential_file"` Token string `toml:"token"` EndpointURL string `toml:"endpoint_url"` StreamName string `toml:"streamname"` ShardIteratorType string `toml:"shard_iterator_type"` DynamoDB *DynamoDB `toml:"checkpoint_dynamodb"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` Log telegraf.Logger // contains filtered or unexported fields }
func (*KinesisConsumer) Description ¶
func (k *KinesisConsumer) Description() string
func (*KinesisConsumer) Gather ¶
func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error
func (*KinesisConsumer) Get ¶
func (k *KinesisConsumer) Get(streamName, shardID string) (string, error)
Get wraps the checkpoint's Get function (called by consumer library)
func (*KinesisConsumer) SampleConfig ¶
func (k *KinesisConsumer) SampleConfig() string
func (*KinesisConsumer) Set ¶
func (k *KinesisConsumer) Set(streamName, shardID, sequenceNumber string) error
Set wraps the checkpoint's Set function (called by consumer library)
func (*KinesisConsumer) SetParser ¶
func (k *KinesisConsumer) SetParser(parser parsers.Parser)
func (*KinesisConsumer) Start ¶
func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error
func (*KinesisConsumer) Stop ¶
func (k *KinesisConsumer) Stop()
Click to show internal directories.
Click to hide internal directories.