Documentation ¶
Index ¶
- Constants
- func Init(config *Config) (*sarama.Config, error)
- func NewSyncProducer(config *Config) (sarama.SyncProducer, error)
- type CompressionStrategy
- type Config
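- type ItemExporter
- func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error)
- func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error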
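- type Kafka
- func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu.EventFilterConfig, shippingMethod processor.ShippingMethod) (*Kafka, error)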
- func (h *Kafka) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error
- func (h *Kafka) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error
- func (h *Kafka) Name() string
- func (h *Kafka) Start(ctx context.Context) error
- func (h *Kafka) Stop(ctx context.Context) error
- func (h *Kafka) Type() string
- type PartitionStrategy
- type RequiredAcks
- type TLSClientConfig
- func (c *TLSClientConfig) Validate() error
Constants ¶
View Source
const SinkType = "kafka"
Variables ¶
This section is empty.
Functions ¶
func Init ¶
func Init(config *Config) (*sarama.Config, error)
func NewSyncProducer ¶
func NewSyncProducer(config *Config) (sarama.SyncProducer, error)
Types ¶
type CompressionStrategy ¶
type CompressionStrategy string
var (
	CompressionStrategyNone   CompressionStrategy = "none"
	CompressionStrategyGZIP   CompressionStrategy = "gzip"
	CompressionStrategySnappy CompressionStrategy = "snappy"
	CompressionStrategyLZ4    CompressionStrategy = "lz4"
	CompressionStrategyZSTD   CompressionStrategy = "zstd"
)
type Config ¶
type Config struct {
	Brokers         string              `yaml:"brokers"`
	Topic           string              `yaml:"topic"`
	TLS             bool                `yaml:"tls" default:"false"`
	TLSClientConfig TLSClientConfig     `yaml:"tlsClientConfig"`
	MaxQueueSize    int                 `yaml:"maxQueueSize" default:"51200"`
	FlushFrequency  time.Duration       `yaml:"flushFrequency" default:"10s"`
	FlushMessages   int                 `yaml:"flushMessages" default:"500"`
	FlushBytes      int                 `yaml:"flushBytes" default:"1000000"`
	MaxRetries      int                 `yaml:"maxRetries" default:"3"`
	Compression     CompressionStrategy `yaml:"compression" default:"none"`
	RequiredAcks    RequiredAcks        `yaml:"requiredAcks" default:"leader"`
	Partitioning    PartitionStrategy   `yaml:"partitioning" default:"none"`
}
type ItemExporter ¶
type ItemExporter struct {
// contains filtered or unexported fields
}
func NewItemExporter ¶
func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error)
func (ItemExporter) ExportItems ¶
func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
func New ¶
func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu.EventFilterConfig, shippingMethod processor.ShippingMethod) (*Kafka, error)
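func (*Kafka) HandleNewDecoratedEvent ¶
func (h *Kafka) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error
func (*Kafka) HandleNewDecoratedEvents ¶
func (h *Kafka) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error
func (*Kafka) Name ¶
func (h *Kafka) Name() string
func (*Kafka) Start ¶
func (h *Kafka) Start(ctx context.Context) error
func (*Kafka) Stop ¶
func (h *Kafka) Stop(ctx context.Context) error
func (*Kafka) Type ¶
func (h *Kafka) Type() string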
type PartitionStrategy ¶
type PartitionStrategy string
var (
	PartitionStrategyNone   PartitionStrategy = "none"
	PartitionStrategyRandom PartitionStrategy = "random"
)
type RequiredAcks ¶
type RequiredAcks string
var (
	RequiredAcksLeader RequiredAcks = "leader"
	RequiredAcksAll    RequiredAcks = "all"
	RequiredAcksNone   RequiredAcks = "none"
)
type TLSClientConfig ¶ added in v1.0.12
type TLSClientConfig struct {
	CertificatePath string `yaml:"certificatePath"`
	KeyPath         string `yaml:"keyPath"`
	CACertificate   string `yaml:"caCertificate"`
}
func (*TLSClientConfig) Validate ¶ added in v1.0.12
func (c *TLSClientConfig) Validate() error
Click to show internal directories.
Click to hide internal directories.