package producer
v0.3.2
Published: Jun 22, 2015 License: Apache-2.0 Imports: 26 Imported by: 7

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Console

type Console struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Console producer plugin. Configuration example:

  - "producer.Console":
      Enable: true
      Console: "stderr"

The console producer writes messages to the standard output streams.

Console may either be "stdout" or "stderr". By default it is set to "stdout".

func (*Console) Configure

func (prod *Console) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Console) Produce

func (prod *Console) Produce(workers *sync.WaitGroup)

Produce writes to stdout or stderr.

type ElasticSearch

type ElasticSearch struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

ElasticSearch producer plugin. Configuration example:

  - "producer.ElasticSearch":
      Enable: true
      Connections: 10
      RetrySec: 5
      TTL: "1d"
      DayBasedIndex: false
      User: "root"
      Password: "root"
      BatchSizeByte: 65535
      BatchMaxCount: 512
      BatchTimeoutSec: 5
      Port: 9200
      Servers:
        - "localhost"
      Index:
        "console": "default"
        "_GOLLUM_": "default"
      Type:
        "console": "log"
        "_GOLLUM_": "gollum"
      Stream:
        - "console"
        - "_GOLLUM_"

The ElasticSearch producer sends messages to Elasticsearch using the bulk HTTP API.

RetrySec denotes the time in seconds after which a failed dataset will be transmitted again. By default this is set to 5.

Connections defines the number of simultaneous connections allowed to an Elasticsearch server. This is set to 6 by default.

TTL defines the TTL set in Elasticsearch messages. By default this is set to "", which means no TTL.

DayBasedIndex can be set to true to append the date of the message to the index as in "<index>_YYYY-MM-DD". By default this is set to false.

Servers defines a list of servers to connect to. The first server in the list is used as the server passed to the "Domain" setting. The Domain setting can be overwritten, too.

Port defines the Elasticsearch port, which has to be the same for all servers. By default this is set to 9200.

User and Password can be used to pass credentials to the elasticsearch server. By default both settings are empty.

Index maps a stream to a specific index. You can define the wildcard stream (*) here, too. If set, all streams that do not have a specific mapping will go to this index (including _GOLLUM_). If no index mappings are set the stream name is used.

Type maps a stream to a specific type. This behaves like the index map and is used to assign a _type to an elasticsearch message. By default the type "log" is used.

BatchSizeByte defines the size in bytes required to trigger a flush. By default this is set to 32768 (32KB).

BatchMaxCount defines the number of documents required to trigger a flush. By default this is set to 256.

BatchTimeoutSec defines the time in seconds after which a flush will be triggered. By default this is set to 5.

func (*ElasticSearch) Configure

func (prod *ElasticSearch) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*ElasticSearch) Produce

func (prod *ElasticSearch) Produce(workers *sync.WaitGroup)

Produce starts a bulk indexer.

type File

type File struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

File producer plugin. Configuration example:

  - "producer.File":
      Enable: true
      File: "/var/log/gollum.log"
      BatchSizeMaxKB: 16384
      BatchSizeByte: 4096
      BatchTimeoutSec: 2
      FlushTimeoutSec: 10
      Rotate: false
      RotateTimeoutMin: 1440
      RotateSizeMB: 1024
      RotateAt: "00:00"
      RotateTimestamp: "2006-01-02_15"
      Compress: true

The file producer writes messages to a file. This producer also allows log rotation and compression of the rotated logs. Folders in the file path will be created if necessary.

File contains the path to the log file to write. The wildcard character "*" can be used as a placeholder for the stream name. By default this is set to /var/prod/gollum.log.

BatchSizeMaxKB defines the internal file buffer size in KB. This producer allocates a front buffer and a back buffer of this size. If the front buffer is filled up completely, a flush is triggered and the front buffer becomes available for writing again. Messages larger than BatchSizeMaxKB are rejected.

BatchSizeByte defines the number of bytes to be buffered before they are written to disk. By default this is set to 8KB.

BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.

FlushTimeoutSec sets the maximum number of seconds to wait before a flush is aborted during shutdown. By default this is set to 0, which does not abort the flushing procedure.

Rotate, if set to true, causes the logs to rotate after reaching certain thresholds.

RotateTimeoutMin defines a timeout in minutes that will cause the logs to rotate. Can be set in parallel with RotateSizeMB. By default this is set to 1440 (i.e. 1 day).

RotateAt defines a specific time of day, as in "HH:MM", at which the log should be rotated. Hours must be given in 24h format. When left empty this setting is ignored. By default this setting is disabled.

RotateTimestamp sets the timestamp added to the filename when file rotation is enabled. The format is based on Go's time.Format function and set to "2006-01-02_15" by default.

Compress defines if a rotated logfile is to be gzip compressed or not. By default this is set to false.

func (*File) Configure

func (prod *File) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*File) Produce

func (prod *File) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is dumped to a file.

type HttpReq

type HttpReq struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

HttpReq producer plugin. Configuration example:

  - "producer.HttpReq":
      Enable: true
      Address: ":80"

The HttpReq producer sends messages that are already valid HTTP requests to a given web server.

Address defines the web server to send HTTP requests to. By default this is set to ":80", which is equivalent to "localhost:80".

func (*HttpReq) Configure

func (prod *HttpReq) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (HttpReq) Produce

func (prod HttpReq) Produce(workers *sync.WaitGroup)

Produce sends messages to the configured web server.

type Kafka

type Kafka struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Kafka producer plugin. Configuration example:

  - "producer.Kafka":
      Enable: true
      ClientId: "weblog"
      Partitioner: "Roundrobin"
      RequiredAcks: 0
      TimeoutMs: 0
      SendRetries: 5
      Compression: "Snappy"
      MaxOpenRequests: 6
      BatchMinCount: 10
      BatchMaxCount: 0
      BatchSizeByte: 16384
      BatchSizeMaxKB: 524288
      BatchTimeoutSec: 5
      ServerTimeoutSec: 3
      SendTimeoutMs: 100
      ElectRetries: 3
      ElectTimeoutMs: 1000
      MetadataRefreshSec: 30
      Servers:
        - "192.168.222.30:9092"
      Stream:
        - "console"
        - "_GOLLUM_"
      Topic:
        "console": "default"
        "_GOLLUM_": "default"

The kafka producer writes messages to a kafka cluster. This producer is backed by the sarama library so most settings relate to that library.

ClientId sets the client id of this producer. By default this is "gollum".

Partitioner sets the distribution algorithm to use. Valid values are: "Random", "Roundrobin" and "Hash". By default "Hash" is set.

RequiredAcks defines the acknowledgement level required by the broker. 0 = No responses required. 1 = wait for the local commit. -1 = wait for all replicas to commit. >1 = wait for a specific number of commits. By default this is set to 1.

TimeoutMs denotes the maximum time the broker will wait for acks. This setting becomes active when RequiredAcks is set to wait for multiple commits. By default this is set to 1500.

SendRetries defines how many times to retry sending data before marking a server as not reachable. By default this is set to 3.

Compression sets the method of compression to use. Valid values are: "None", "Zip" and "Snappy". By default "None" is set.

MaxOpenRequests defines the number of simultaneous connections allowed. By default this is set to 5.

BatchMinCount sets the minimum number of messages required to trigger a flush. By default this is set to 1.

BatchMaxCount defines the maximum number of messages processed per request. By default this is set to 0 for "unlimited".

BatchSizeByte sets the minimum number of bytes to collect before a new flush is triggered. By default this is set to 8192.

BatchSizeMaxKB defines the maximum allowed message size. By default this is set to 1 MB.

BatchTimeoutSec sets the minimum time in seconds to pass after which a new flush will be triggered. By default this is set to 3.

MessageBufferCount sets the internal channel size for the kafka client. By default this is set to 256.

ServerTimeoutSec defines the time after which a connection is set to timed out. By default this is set to 30 seconds.

SendTimeoutMs defines the number of milliseconds to wait for a server to respond before triggering a timeout. Defaults to 250.

ElectRetries defines how many times to retry during a leader election. By default this is set to 3.

ElectTimeoutMs defines the number of milliseconds to wait for the cluster to elect a new leader. Defaults to 250.

MetadataRefreshMs sets the interval in milliseconds for fetching cluster metadata. By default this is set to 10000 (10s). This corresponds to the JVM setting `topic.metadata.refresh.interval.ms`.

Servers contains the list of all kafka servers to connect to. This setting is mandatory and has no defaults.

Topic maps a stream to a specific kafka topic. You can define the wildcard stream (*) here, too. If defined, all streams that do not have a specific mapping will go to this topic (including _GOLLUM_). If no topic mappings are set the stream names will be used as topic.
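The lookup order described for Topic (explicit mapping first, then the wildcard, then the stream name itself) can be sketched as follows; topicFor is an illustrative helper, not part of this package:

```go
package main

import "fmt"

// topicFor resolves a stream name to a kafka topic following the rules
// above: an explicit mapping wins, then the wildcard "*" mapping, and
// otherwise the stream name is used as the topic.
func topicFor(stream string, topics map[string]string) string {
	if topic, ok := topics[stream]; ok {
		return topic
	}
	if topic, ok := topics["*"]; ok {
		return topic
	}
	return stream
}

func main() {
	topics := map[string]string{"console": "default"}
	fmt.Println(topicFor("console", topics)) // default (explicit mapping)
	fmt.Println(topicFor("weblog", topics))  // weblog (falls back to stream name)
}
```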

func (*Kafka) Configure

func (prod *Kafka) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Kafka) Produce

func (prod *Kafka) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to the kafka cluster.

type Null

type Null struct {
	// contains filtered or unexported fields
}

Null producer plugin. Configuration example:

  - "producer.Null":
      Enable: true

This producer does nothing and provides only bare-bone configuration (i.e. enabled and streams). Use this producer to test consumer performance.

func (*Null) Configure

func (prod *Null) Configure(conf core.PluginConfig) error

Configure initializes the basic members

func (*Null) Control

func (prod *Null) Control() chan<- core.PluginControl

Control returns write access to this producer's control channel.

func (*Null) Enqueue

func (prod *Null) Enqueue(msg core.Message)

Enqueue simply ignores the message

func (*Null) Produce

func (prod *Null) Produce(threads *sync.WaitGroup)

Produce blocks until shutdown, discarding all incoming messages.

func (*Null) Streams

func (prod *Null) Streams() []core.MessageStreamID

Streams returns the streams this producer is listening to.

type Proxy

type Proxy struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Proxy producer plugin. Configuration example:

  - "producer.Proxy":
      Enable: true
      Address: "127.0.0.1:8080"
      ConnectionBufferSizeKB: 4096
      TimeoutSec: 3
      Partitioner: "ascii"
      Delimiter: ":"
      Offset: 1

This producer is compatible with consumer.proxy. Responses to messages sent to the given address are sent back to the original consumer if it is a compatible message source. As with consumer.proxy, the returned messages are partitioned by common message length algorithms.

Address stores the identifier to connect to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.Proxy". By default this is set to ":5880".

ConnectionBufferSizeKB sets the connection buffer size in KB. This also defines the size of the buffer used by the message parser. By default this is set to 1024, i.e. 1 MB buffer.

TimeoutSec defines the maximum time in seconds a client is allowed to take for a response. By default this is set to 1.

Partitioner defines the algorithm used to read messages from the stream. The messages will be sent as a whole, no cropping or removal will take place. By default this is set to "delimiter".

  • "delimiter" separates messages by looking for a delimiter string. The delimiter is included in the left-hand message.
  • "ascii" reads an ASCII-encoded number at a given offset until a given delimiter is found.
  • "binary" reads a binary number at a given offset and size.
  • "binary_le" is an alias for "binary".
  • "binary_be" is the same as "binary" but uses big endian encoding.
  • "fixed" assumes fixed size messages.

Delimiter defines the delimiter used by the text and delimiter partitioner. By default this is set to "\n".

Offset defines the offset used by the binary and text partitioner. By default this is set to 0. This setting is ignored by the fixed partitioner.

Size defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1, 2, 4 or 8. By default 4 is chosen. For fixed this defines the size of a message. By default 1 is chosen.
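A sketch of how a binary partitioner might read such a length header with Go's encoding/binary; the helper is hypothetical and not the parser used by this package:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// binaryLength reads a length header of the given size (1, 2, 4 or 8 bytes)
// at the given offset, in little or big endian byte order, as the
// "binary"/"binary_le"/"binary_be" partitioners are described to do.
func binaryLength(data []byte, offset, size int, bigEndian bool) uint64 {
	field := data[offset : offset+size]
	var order binary.ByteOrder = binary.LittleEndian
	if bigEndian {
		order = binary.BigEndian
	}
	switch size {
	case 1:
		return uint64(field[0])
	case 2:
		return uint64(order.Uint16(field))
	case 4:
		return uint64(order.Uint32(field))
	case 8:
		return order.Uint64(field)
	}
	return 0
}

func main() {
	// A 4-byte little endian length header (value 5) followed by the payload.
	msg := []byte{0x05, 0x00, 0x00, 0x00, 'h', 'e', 'l', 'l', 'o'}
	fmt.Println(binaryLength(msg, 0, 4, false)) // 5
}
```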

func (*Proxy) Configure

func (prod *Proxy) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Proxy) Produce

func (prod *Proxy) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to a given Proxy.

type Redis

type Redis struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Redis producer plugin. Configuration example:

  - "producer.Redis":
      Enable: true
      Address: "127.0.0.1:6379"
      Database: 0
      Key: "gollum"
      Storage: "hash"
      FieldFormat: "format.Identifier"
      FieldFromParsed: true

Address stores the identifier to connect to. This can either be any ip address and port like "localhost:6379" or a file like "unix:///var/redis.socket". By default this is set to ":6379".

Database defines the redis database to connect to. By default this is set to 0.

Key defines the redis key to store the values in. By default this is set to "default".

Storage defines the type of the storage to use. Valid values are: "hash", "list", "set", "sortedset", "string". By default this is set to "hash".

FieldFormat defines an extra formatter used to define an additional field or score value if required by the storage type. If no field value is required this value is ignored. By default this is set to "format.Identifier".

FieldAfterFormat will send the formatted message to the FieldFormatter if set to true. If this is set to false the message will be sent to the FieldFormatter before it has been formatted. By default this is set to false.

func (*Redis) Configure

func (prod *Redis) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Redis) Produce

func (prod *Redis) Produce(workers *sync.WaitGroup)

Produce writes messages to the configured redis database.

type Scribe

type Scribe struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Scribe producer plugin. Configuration example:

  - "producer.Scribe":
      Enable: true
      Address: "192.168.222.30:1463"
      ConnectionBufferSizeKB: 4096
      BatchSizeMaxKB: 16384
      BatchSizeByte: 4096
      BatchTimeoutSec: 2
      Stream:
        - "console"
        - "_GOLLUM_"
      Category:
        "console": "default"
        "_GOLLUM_": "default"

The scribe producer allows sending messages to Facebook's scribe.

Address defines the host and port to connect to. By default this is set to "localhost:1463".

ConnectionBufferSizeKB sets the connection buffer size in KB. By default this is set to 1024, i.e. 1 MB buffer.

BatchSizeMaxKB defines the maximum number of bytes to buffer before messages get dropped. If a message crosses the threshold it is still buffered but additional messages will be dropped. By default this is set to 8192.

BatchSizeByte defines the number of bytes to be buffered before they are written to scribe. By default this is set to 8KB.

BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.

Category maps a stream to a specific scribe category. You can define the wildcard stream (*) here, too. When set, all streams that do not have a specific mapping will go to this category (including _GOLLUM_). If no category mappings are set the stream name is used.

func (*Scribe) Configure

func (prod *Scribe) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Scribe) Produce

func (prod *Scribe) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to scribe.

type Socket

type Socket struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Socket producer plugin. Configuration example:

  - "producer.Socket":
      Enable: true
      Address: "unix:///var/gollum.socket"
      ConnectionBufferSizeKB: 4096
      BatchSizeMaxKB: 16384
      BatchSizeByte: 4096
      BatchTimeoutSec: 5
      Acknowledge: "ACK\n"

Address stores the identifier to connect to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". By default this is set to ":5880".

ConnectionBufferSizeKB sets the connection buffer size in KB. By default this is set to 1024, i.e. 1 MB buffer.

BatchSizeMaxKB defines the maximum number of bytes to buffer before messages get dropped. Any message that crosses the threshold is dropped. By default this is set to 8192.

BatchSizeByte defines the number of bytes to be buffered before they are written to the socket. By default this is set to 8KB.

BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.

Acknowledge can be set to a non-empty value to expect the given string as a response from the server after a batch has been sent. This setting is disabled by default, i.e. set to "". If Acknowledge is enabled and an IP address is given as Address, TCP is used to open the connection, otherwise UDP is used.
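The protocol choice described above can be sketched as a small decision helper (hypothetical, for illustration only):

```go
package main

import (
	"fmt"
	"strings"
)

// protocolFor sketches the rule described above: unix socket addresses use
// the "unix" protocol; otherwise TCP is chosen only when an acknowledge
// string is configured, and UDP when it is not.
func protocolFor(address, acknowledge string) string {
	if strings.HasPrefix(address, "unix://") {
		return "unix"
	}
	if acknowledge != "" {
		return "tcp"
	}
	return "udp"
}

func main() {
	fmt.Println(protocolFor("localhost:5880", "ACK\n"))         // tcp
	fmt.Println(protocolFor("localhost:5880", ""))              // udp
	fmt.Println(protocolFor("unix:///var/gollum.socket", "OK")) // unix
}
```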

func (*Socket) Configure

func (prod *Socket) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Socket) Produce

func (prod *Socket) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to a given socket.

type Websocket

type Websocket struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Websocket producer plugin. Configuration example:

  - "producer.Websocket":
      Enable: true
      Address: ":80"
      Path: "/data"
      ReadTimeoutSec: 5

The websocket producer opens up a websocket.

Address stores the identifier to bind to. This may be any IP address/DNS name and port like "localhost:5880". By default this is set to ":81".

Path defines the URL path to listen for. By default this is set to "/".

ReadTimeoutSec specifies the maximum duration in seconds before timing out read of the request. By default this is set to 3 seconds.

func (*Websocket) Configure

func (prod *Websocket) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Websocket) Produce

func (prod *Websocket) Produce(workers *sync.WaitGroup)

Produce writes messages to the attached websocket clients.
