kafka

package
v14.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2021 License: MIT Imports: 20 Imported by: 0

README

Kafka Sink

The Kafka sink allows flushing of metrics or spans to to a Kafka topic.

Configuration

See the various kafka_* keys in example.yaml for all available configuration options. This sink supports the following features:

Status

This sink is stable. Some some encoding or options may change. This sink is in active development.

TODO

  • Uses the async client, but doesn't currently do anything on failure.

  • Does not currently handle writes of events or checks

  • batching

  • ack requirements

  • publishing of Protobuf or JSON formatted messages

Span Sampling

The Kafka sink supports span sampling! By default, setting kafka_span_sample_rate_percent less than 100 will sample based off of traceId (meaning that if one span with a particular traceId is selected, all spans with that traceId will be selected), but that behavior can be configured to use a tag instead via kafka_span_sample_tag. For example,

kafka_span_sample_tag: "request_id"
kafka_span_sample_rate_percent: 75

With this configuration, spans without the "request_id" tag will be rejected, and spans with the "request_id" will be sampled at 75%, based off of a hash of their "request_id" value; in this way, you can sample all values relevant to a particular tag value.

Format

Metrics are published in JSON in the form of:

{
  "name": "some.metric",
  "timestamp": 1234567, // unix time
  "value": 1.0,
  "tags": [ "tag:value", … ],
  "type": "gauge" // counter, etc
}

Spans are published in one of JSON or Protobuf. The form is defined in SSF's protobuf and codegen output. Note that it has a version field for compatibility in the future.

Documentation

Index

Constants

View Source
const IngestTimeout = 5 * time.Second

Variables

View Source
var IngestTimeoutError = errors.New("Timed out writing to Kafka producer")

Functions

This section is empty.

Types

type KafkaMetricSink

type KafkaMetricSink struct {
	// contains filtered or unexported fields
}

func NewKafkaMetricSink

func NewKafkaMetricSink(logger *logrus.Logger, cl *trace.Client, brokers string, checkTopic string, eventTopic string, metricTopic string, ackRequirement string, partitioner string, retries int, bufferBytes int, bufferMessages int, bufferDuration string) (*KafkaMetricSink, error)

NewKafkaMetricSink creates a new Kafka Plugin.

func (*KafkaMetricSink) Flush

func (k *KafkaMetricSink) Flush(ctx context.Context, interMetrics []samplers.InterMetric) error

Flush sends a slice of metrics to Kafka

func (*KafkaMetricSink) FlushOtherSamples

func (k *KafkaMetricSink) FlushOtherSamples(ctx context.Context, samples []ssf.SSFSample)

FlushOtherSamples flushes non-metric, non-span samples

func (*KafkaMetricSink) Name

func (k *KafkaMetricSink) Name() string

Name returns the name of this sink.

func (*KafkaMetricSink) Start

func (k *KafkaMetricSink) Start(cl *trace.Client) error

Start performs final adjustments on the sink.

type KafkaSpanSink

type KafkaSpanSink struct {
	// contains filtered or unexported fields
}

func NewKafkaSpanSink

func NewKafkaSpanSink(logger *logrus.Logger, cl *trace.Client, brokers string, topic string, partitioner string, ackRequirement string, retries int, bufferBytes int, bufferMessages int, bufferDuration string, serializationFormat string, sampleTag string, sampleRatePercentage float64) (*KafkaSpanSink, error)

NewKafkaSpanSink creates a new Kafka Plugin.

func (*KafkaSpanSink) Flush

func (k *KafkaSpanSink) Flush()

Flush emits metrics, since the spans have already been ingested and are sending async.

func (*KafkaSpanSink) Ingest

func (k *KafkaSpanSink) Ingest(span *ssf.SSFSpan) error

Ingest takes the span and adds it to Kafka producer for async flushing. The flushing is driven by the settings from KafkaSpanSink's constructor. Tune the bytes, messages and interval settings to your tastes!

func (*KafkaSpanSink) Name

func (k *KafkaSpanSink) Name() string

Name returns the name of this sink.

func (*KafkaSpanSink) Start

func (k *KafkaSpanSink) Start(cl *trace.Client) error

Start performs final adjustments on the sink.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL