Documentation ¶
Index ¶
- Variables
- type Config
- type KafkaOutput
- func (k *KafkaOutput) Close() error
- func (k *KafkaOutput) Init(ctx context.Context, name string, cfg map[string]interface{}, ...) error
- func (k *KafkaOutput) RegisterMetrics(reg *prometheus.Registry)
- func (k *KafkaOutput) SetClusterName(name string)
- func (k *KafkaOutput) SetEventProcessors(ps map[string]map[string]interface{}, logger *log.Logger, ...)
- func (k *KafkaOutput) SetLogger(logger *log.Logger)
- func (k *KafkaOutput) SetName(name string)
- func (k *KafkaOutput) SetTargetsConfig(map[string]*types.TargetConfig)
- func (k *KafkaOutput) String() string
- func (k *KafkaOutput) Write(ctx context.Context, rsp proto.Message, meta outputs.Meta)
- func (k *KafkaOutput) WriteEvent(ctx context.Context, ev *formatters.EventMsg)
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
View Source
var KafkaNumberOfFailSendMsgs = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "gnmic", Subsystem: "kafka_output", Name: "number_of_kafka_msgs_sent_fail_total", Help: "Number of failed msgs sent by gnmic kafka output", }, []string{"producer_id", "reason"})
View Source
var KafkaNumberOfSentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "gnmic", Subsystem: "kafka_output", Name: "number_of_written_kafka_bytes_total", Help: "Number of bytes written by gnmic kafka output", }, []string{"producer_id"})
View Source
var KafkaNumberOfSentMsgs = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "gnmic", Subsystem: "kafka_output", Name: "number_of_kafka_msgs_sent_success_total", Help: "Number of msgs successfully sent by gnmic kafka output", }, []string{"producer_id"})
View Source
var KafkaSendDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "gnmic", Subsystem: "kafka_output", Name: "msg_send_duration_ns", Help: "gnmic kafka output send duration in ns", }, []string{"producer_id"})
View Source
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
View Source
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { Address string `mapstructure:"address,omitempty"` Topic string `mapstructure:"topic,omitempty"` Name string `mapstructure:"name,omitempty"` SASL *sasl `mapstructure:"sasl,omitempty"` TLS *tlsConfig `mapstructure:"tls,omitempty"` MaxRetry int `mapstructure:"max-retry,omitempty"` Timeout time.Duration `mapstructure:"timeout,omitempty"` RecoveryWaitTime time.Duration `mapstructure:"recovery-wait-time,omitempty"` Format string `mapstructure:"format,omitempty"` AddTarget string `mapstructure:"add-target,omitempty"` TargetTemplate string `mapstructure:"target-template,omitempty"` MsgTemplate string `mapstructure:"msg-template,omitempty"` NumWorkers int `mapstructure:"num-workers,omitempty"` Debug bool `mapstructure:"debug,omitempty"` BufferSize int `mapstructure:"buffer-size,omitempty"` OverrideTimestamps bool `mapstructure:"override-timestamps,omitempty"` EnableMetrics bool `mapstructure:"enable-metrics,omitempty"` EventProcessors []string `mapstructure:"event-processors,omitempty"` }
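Config holds the settings of the Kafka output; each field is populated from the `mapstructure` key shown in its tag.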
type KafkaOutput ¶
type KafkaOutput struct { Cfg *Config // contains filtered or unexported fields }
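KafkaOutput is the Kafka output type. Its configuration is exposed through the Cfg field; all other state is unexported.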
func (*KafkaOutput) Init ¶
func (k *KafkaOutput) Init(ctx context.Context, name string, cfg map[string]interface{}, opts ...outputs.Option) error
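Init sets up the output from its name, a raw configuration map, and any outputs.Option values, and returns an error.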
func (*KafkaOutput) RegisterMetrics ¶ added in v0.7.0
func (k *KafkaOutput) RegisterMetrics(reg *prometheus.Registry)
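RegisterMetrics takes the Prometheus registry to use for this output's metrics (presumably the collectors listed under Variables).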
func (*KafkaOutput) SetClusterName ¶ added in v0.8.0
func (k *KafkaOutput) SetClusterName(name string)
func (*KafkaOutput) SetEventProcessors ¶ added in v0.6.0
func (k *KafkaOutput) SetEventProcessors(ps map[string]map[string]interface{}, logger *log.Logger, tcs map[string]*types.TargetConfig, acts map[string]map[string]interface{})
func (*KafkaOutput) SetLogger ¶ added in v0.5.2
func (k *KafkaOutput) SetLogger(logger *log.Logger)
func (*KafkaOutput) SetName ¶ added in v0.7.0
func (k *KafkaOutput) SetName(name string)
func (*KafkaOutput) SetTargetsConfig ¶ added in v0.18.0
func (k *KafkaOutput) SetTargetsConfig(map[string]*types.TargetConfig)
func (*KafkaOutput) String ¶
func (k *KafkaOutput) String() string
func (*KafkaOutput) WriteEvent ¶ added in v0.7.0
func (k *KafkaOutput) WriteEvent(ctx context.Context, ev *formatters.EventMsg)
type XDGSCRAMClient ¶ added in v0.9.0
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶ added in v0.9.0
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶ added in v0.9.0
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.