Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaProducer ¶ added in v0.4.3
type KafkaProducer struct {
    core.ProducerBase
    // contains filtered or unexported fields
}
KafkaProducer is an librdkafka-backed producer plugin. It writes messages to a Kafka cluster. Since this producer is backed by the native librdkafka (0.8.6) library, most settings relate to that library. This producer does not implement a fuse breaker.

NOTICE: This producer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled.

Configuration example
- "native.KafkaProducer": ClientId: "weblog" RequiredAcks: 1 TimeoutMs: 1500 SendRetries: 0 Compression: "none" BatchSizeMaxKB: 1024 BatchMaxMessages: 100000 BatchMinMessages: 1000 BatchTimeoutMs: 1000 ServerTimeoutSec: 60 ServerMaxFails: 3 MetadataTimeoutMs: 1500 MetadataRefreshMs: 300000 KeyFormatter: "" Servers:
- "localhost:9092" Topic: "console" : "console"
SendRetries is mapped to message.send.max.retries. This defines the number of times librdkafka will try to re-send a message if it did not succeed. Set to 0 by default (don't retry).
Compression is mapped to compression.codec. Please note that "zip" has to be used instead of "gzip". Possible values are "none", "zip" and "snappy". By default this is set to "none".
TimeoutMs is mapped to request.timeout.ms. This defines the number of milliseconds to wait until a request is marked as failed. By default this is set to 1.5 seconds (1500 ms).
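A minimal sketch of these delivery-related settings; the values are illustrative only, and the comments restate the librdkafka option each key maps to:

- "native.KafkaProducer":
    SendRetries: 2       # message.send.max.retries: retry a failed send twice
    Compression: "zip"   # compression.codec: note "zip", not "gzip"
    TimeoutMs: 3000      # request.timeout.ms: mark a request as failed after 3 seconds
    Servers:
        - "localhost:9092"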
BatchSizeMaxKB is mapped to message.max.bytes (x1024). This defines the maximum message size in KB. By default this is set to 1 MB. Messages above this size are rejected.
BatchMaxMessages is mapped to queue.buffering.max.messages. This defines the maximum number of messages that can be pending at any given time. If this limit is hit, additional messages will be rejected. This value is set to 100,000 by default and should be adjusted according to your average message throughput.
BatchMinMessages is mapped to batch.num.messages. This defines the minimum number of messages required for a batch to be sent. This is set to 1000 by default and should be significantly lower than BatchMaxMessages to avoid messages being rejected.
BatchTimeoutMs is mapped to queue.buffering.max.ms. This defines the number of milliseconds to wait until a batch is flushed to Kafka. Set to 1 second (1000 ms) by default.
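As an illustration of how these batching settings interact, here is a hedged configuration sketch tuned for higher throughput; the numbers are examples, not recommendations:

- "native.KafkaProducer":
    BatchSizeMaxKB: 2048       # message.max.bytes (x1024): allow messages up to 2 MB
    BatchMaxMessages: 500000   # queue.buffering.max.messages: larger in-flight buffer
    BatchMinMessages: 5000     # batch.num.messages: kept well below BatchMaxMessages
    BatchTimeoutMs: 500        # queue.buffering.max.ms: flush batches at least twice per second
    Servers:
        - "localhost:9092"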
ServerTimeoutSec is mapped to socket.timeout.ms. Defines the time in seconds after which a server is considered "not reachable". Set to 1 minute (60 seconds) by default.
ServerMaxFails is mapped to socket.max.fails. Defines the number of send failures after which a server is marked as "failing". Set to 3 by default.
MetadataTimeoutMs is mapped to metadata.request.timeout.ms. Number of milliseconds a metadata request may take before it is considered failed. Set to 1.5 seconds (1500 ms) by default.
MetadataRefreshMs is mapped to topic.metadata.refresh.interval.ms. Interval in milliseconds for querying metadata. Set to 5 minutes by default.
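The broker health and metadata settings can be adjusted in the same way; a hedged sketch with illustrative values, again with the librdkafka mapping noted in comments:

- "native.KafkaProducer":
    ServerTimeoutSec: 30        # socket.timeout.ms: mark a broker "not reachable" after 30 seconds
    ServerMaxFails: 5           # socket.max.fails: tolerate 5 send failures before marking it "failing"
    MetadataTimeoutMs: 3000     # metadata.request.timeout.ms
    MetadataRefreshMs: 60000    # topic.metadata.refresh.interval.ms: refresh metadata every minute
    Servers:
        - "localhost:9092"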
Servers defines the list of brokers to produce messages to.
Topic defines a stream to topic mapping. If a stream is not mapped, a topic named after the stream is used.
KeyFormatter defines the formatter used to extract keys from a message. Set to "" by default (disabled).
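To tie these together, here is a hedged sketch of a multi-broker setup that maps two streams to dedicated topics and extracts message keys. The broker addresses, stream names and topic names are placeholders, and the formatter name "format.Identifier" is an assumption, not taken from this documentation:

- "native.KafkaProducer":
    KeyFormatter: "format.Identifier"   # assumed formatter name; "" disables key extraction
    Servers:
        - "kafka-1.example.com:9092"
        - "kafka-2.example.com:9092"
    Topic:
        "weblog"   : "access_logs"      # messages on stream "weblog" go to topic "access_logs"
        "errorlog" : "error_logs"       # unmapped streams fall back to a topic named after the stream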
func (*KafkaProducer) Configure ¶ added in v0.4.3
func (prod *KafkaProducer) Configure(conf core.PluginConfig) error
Configure initializes this producer with values from a plugin config.
func (*KafkaProducer) OnMessageDelivered ¶ added in v0.4.3
func (prod *KafkaProducer) OnMessageDelivered(userdata []byte)
OnMessageDelivered gets called by librdkafka on message delivery success
func (*KafkaProducer) OnMessageError ¶ added in v0.4.3
func (prod *KafkaProducer) OnMessageError(reason string, userdata []byte)
OnMessageError gets called by librdkafka on message delivery failure
func (*KafkaProducer) Produce ¶ added in v0.4.3
func (prod *KafkaProducer) Produce(workers *sync.WaitGroup)
Produce starts the message handling loop of this producer, writing incoming messages to the configured Kafka cluster.
type PcapHTTPConsumer ¶
type PcapHTTPConsumer struct {
    core.ConsumerBase
    // contains filtered or unexported fields
}
PcapHTTPConsumer is a libpcap-based consumer plugin. Configuration example
- "native.PcapHTTPConsumer": Enable: true Interface: eth0 Filter: "dst port 80 and dst host 127.0.0.1" Promiscuous: true TimeoutMs: 3000
This plugin utilizes libpcap to listen for network traffic and reassemble HTTP requests from it. As it uses a CGO-based library it will break cross-platform builds, i.e. you will have to compile it on the target platform.
Interface defines the network interface to listen on. By default this is set to eth0, get your specific value from ifconfig.
Filter defines a libpcap filter for the incoming packets. You can filter for specific ports, protocols, IPs, etc. The documentation can be found here: http://www.tcpdump.org/manpages/pcap-filter.7.txt (manpage). By default this is set to listen on port 80 for packets addressed to localhost.
Promiscuous switches the network interface defined by Interface into promiscuous mode. This is required if you want to listen for all packets on the network, even those that were not addressed to the IP bound to the interface you listen on. Enabling this can increase your CPU load. This setting is enabled by default.
TimeoutMs defines a timeout after which a TCP session is considered to have dropped, i.e. the (remaining) packets will be discarded. Every incoming packet restarts the timer for the specific client session. By default this is set to 3000, i.e. 3 seconds.
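As an illustration, here is a hedged configuration sketch for capturing traffic to a service on a non-default interface and port; the interface name, host address and port are placeholders:

- "native.PcapHTTPConsumer":
    Enable: true
    Interface: eth1                                  # check ifconfig for the interface name on your host
    Filter: "dst port 8080 and dst host 10.0.0.5"    # standard pcap-filter syntax, see the manpage above
    Promiscuous: false                               # only capture packets addressed to this interface
    TimeoutMs: 5000                                  # discard incomplete TCP sessions after 5 seconds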
func (*PcapHTTPConsumer) Configure ¶
func (cons *PcapHTTPConsumer) Configure(conf core.PluginConfig) error
Configure initializes this consumer with values from a plugin config.
func (*PcapHTTPConsumer) Consume ¶
func (cons *PcapHTTPConsumer) Consume(workers *sync.WaitGroup)
Consume enables libpcap monitoring as configured.