kafka

package
v1.8.0
Warning

This package is not in the latest version of its module.

Published: Oct 10, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

Kafka input source

This plugin collects data from Kafka into Log Service.

Parameters

The plugin type (type) is service_kafka.

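Parameter | Type | Required | Description
--- | --- | --- | ---
ConsumerGroup | string | Required | Consumer group name.
ClientID | string | Required | Client ID.
Topics | string array | Required | Kafka topics.
Brokers | string array | Required | Kafka brokers.
MaxMessageLen | int | Optional | Maximum message length; a positive integer in the range 1~524288.
Version | string | Optional | Kafka version; empty by default.
Offset | string | Optional | Initial offset (oldest or newest); defaults to oldest.
SASLUsername | string | Optional | SASL username.
SASLPassword | string | Optional | SASL password.
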
Example

To collect data from Kafka into Log Service, use a configuration like the following:

  • Configuration
{
  "inputs":[
    {
      "type":"service_kafka",
      "detail":{
        "Brokers": ["localhost:9092"],
        "Topics": ["test1"],
        "ConsumerGroup": "group1",
        "ClientID": "id1",
        "Offset": "oldest"
      }
    }
  ]
}
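
The configuration above can be checked against the parameter table before it is deployed. The following is a minimal sketch in Go, not the plugin's own loading code: the types kafkaDetail, inputEntry and pipelineConfig, and the functions validate and parseConfig, are hypothetical names introduced only for illustration, and treating a MaxMessageLen of 0 as "not set" is an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// kafkaDetail mirrors a subset of the documented service_kafka parameters.
// Hypothetical stand-in for illustration; it is not the plugin's InputKafka type.
type kafkaDetail struct {
	Brokers       []string
	Topics        []string
	ConsumerGroup string
	ClientID      string
	Offset        string
	MaxMessageLen int
}

type inputEntry struct {
	Type   string      `json:"type"`
	Detail kafkaDetail `json:"detail"`
}

type pipelineConfig struct {
	Inputs []inputEntry `json:"inputs"`
}

// validate applies the constraints stated in the parameter table.
func (d *kafkaDetail) validate() error {
	if len(d.Brokers) == 0 || len(d.Topics) == 0 {
		return errors.New("Brokers and Topics are required")
	}
	if d.ConsumerGroup == "" || d.ClientID == "" {
		return errors.New("ConsumerGroup and ClientID are required")
	}
	if d.Offset == "" {
		d.Offset = "oldest" // documented default
	}
	if d.Offset != "oldest" && d.Offset != "newest" {
		return fmt.Errorf("Offset must be oldest or newest, got %q", d.Offset)
	}
	// Assumption: 0 means MaxMessageLen was not set.
	if d.MaxMessageLen != 0 && (d.MaxMessageLen < 1 || d.MaxMessageLen > 524288) {
		return fmt.Errorf("MaxMessageLen out of range: %d", d.MaxMessageLen)
	}
	return nil
}

// parseConfig decodes the JSON and validates every service_kafka input.
func parseConfig(raw []byte) (*pipelineConfig, error) {
	var cfg pipelineConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return nil, err
	}
	for i := range cfg.Inputs {
		if cfg.Inputs[i].Type != "service_kafka" {
			continue
		}
		if err := cfg.Inputs[i].Detail.validate(); err != nil {
			return nil, err
		}
	}
	return &cfg, nil
}

// sample is the example configuration from this README.
const sample = `{
  "inputs":[
    {
      "type":"service_kafka",
      "detail":{
        "Brokers": ["localhost:9092"],
        "Topics": ["test1"],
        "ConsumerGroup": "group1",
        "ClientID": "id1",
        "Offset": "oldest"
      }
    }
  ]
}`

func main() {
	cfg, err := parseConfig([]byte(sample))
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("brokers:", cfg.Inputs[0].Detail.Brokers)
}
```

Note that Brokers and Topics are decoded as string slices, which matches both the example JSON and the field types of InputKafka.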

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type InputKafka

type InputKafka struct {
	ConsumerGroup string
	ClientID      string
	Topics        []string
	Brokers       []string
	MaxMessageLen int
	Version       string
	Offset        string
	SASLUsername  string
	SASLPassword  string
	// Assignor Consumer group partition assignment strategy (range, roundrobin, sticky)
	Assignor string
	// Decoder the decoder to use, default is "ext_default_decoder"
	Decoder           string
	Format            string
	FieldsExtend      bool
	DisableUncompress bool
	// contains filtered or unexported fields
}
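
The string-valued Offset and Assignor fields accept only a small set of values. The sketch below shows one way such settings could be interpreted; it is not taken from the plugin's source. The helper names initialOffset and validAssignor are hypothetical, and the two constants are local copies of the values of sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2), so that the example needs only the standard library.

```go
package main

import "fmt"

// Local copies of the values of sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// initialOffset translates the Offset setting into an initial-offset value.
// An empty string falls back to "oldest", the documented default.
func initialOffset(setting string) (int64, error) {
	switch setting {
	case "", "oldest":
		return offsetOldest, nil
	case "newest":
		return offsetNewest, nil
	default:
		return 0, fmt.Errorf("unsupported Offset %q", setting)
	}
}

// validAssignor reports whether name is one of the strategies
// listed in the Assignor field comment.
func validAssignor(name string) bool {
	switch name {
	case "range", "roundrobin", "sticky":
		return true
	}
	return false
}

func main() {
	off, err := initialOffset("newest")
	fmt.Println(off, err, validAssignor("sticky"))
}
```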

func (*InputKafka) Cleanup added in v1.6.0

Cleanup implements sarama.ConsumerGroupHandler.

func (*InputKafka) Collect

func (k *InputKafka) Collect(collector pipeline.Collector) error

func (*InputKafka) ConsumeClaim added in v1.6.0

func (k *InputKafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim implements sarama.ConsumerGroupHandler. It must start a consumer loop over the channel returned by the ConsumerGroupClaim's Messages().
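
The consumer loop that ConsumeClaim is required to start can be sketched as follows. This is an illustration of the pattern only, not the plugin's implementation: message, claim and chanClaim are hypothetical stand-ins for sarama.ConsumerMessage and sarama.ConsumerGroupClaim so that the example runs with the standard library alone, and the handle callback is an assumption about how messages would be passed on.

```go
package main

import "fmt"

// message is a hypothetical stand-in for sarama.ConsumerMessage.
type message struct {
	Topic string
	Value []byte
}

// claim is a hypothetical stand-in for sarama.ConsumerGroupClaim,
// reduced to the one method the loop needs.
type claim interface {
	Messages() <-chan *message
}

// chanClaim is a trivial claim backed by a channel, used for the demo.
type chanClaim struct{ ch chan *message }

func (c *chanClaim) Messages() <-chan *message { return c.ch }

// consumeClaim ranges over the claim's message channel until it is closed,
// passing each message to handle. With the real sarama API, a handler would
// typically also mark each message on the session after processing it.
func consumeClaim(c claim, handle func(*message)) error {
	for msg := range c.Messages() {
		handle(msg)
	}
	return nil
}

func main() {
	ch := make(chan *message, 2)
	ch <- &message{Topic: "test1", Value: []byte("a")}
	ch <- &message{Topic: "test1", Value: []byte("b")}
	close(ch) // closing the channel ends the loop, as at the end of a claim

	count := 0
	_ = consumeClaim(&chanClaim{ch: ch}, func(m *message) {
		count++
		fmt.Printf("%s: %s\n", m.Topic, m.Value)
	})
	fmt.Println("messages handled:", count)
}
```

The loop returns when the channel is closed, which is why the handler should not hold on to the claim beyond that point.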

func (*InputKafka) Description

func (k *InputKafka) Description() string

func (*InputKafka) Init

func (k *InputKafka) Init(context pipeline.Context) (int, error)

func (*InputKafka) Setup added in v1.6.0

func (k *InputKafka) Setup(session sarama.ConsumerGroupSession) error

Setup implements sarama.ConsumerGroupHandler.

func (*InputKafka) Start

func (k *InputKafka) Start(collector pipeline.Collector) error

func (*InputKafka) StartService added in v1.6.0

func (k *InputKafka) StartService(context pipeline.PipelineContext) error

func (*InputKafka) Stop

func (k *InputKafka) Stop() error
