producer

package
v0.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2016 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Benchmark added in v0.4.3

type Benchmark struct {
	core.ProducerBase
}

Benchmark producer plugin This producer is similar to producer.Null but will use the standard buffer mechanism before discarding messages. This producer is used for benchmarking the core system. If you require a /dev/null style producer you should prefer producer.Null instead as it is way more performant. Configuration example

  • "producer.Benchmark":

func (*Benchmark) Configure added in v0.4.3

func (prod *Benchmark) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Benchmark) Produce added in v0.4.3

func (prod *Benchmark) Produce(workers *sync.WaitGroup)

Produce discards the message.

type Console

type Console struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Console producer plugin The console producer writes messages to the standard output streams. This producer does not implement a fuse breaker. Configuration example

  • "producer.Console": Console: "stdout"

Console may either be "stdout" or "stderr". By default it is set to "stdout".

func (*Console) Configure

func (prod *Console) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Console) Produce

func (prod *Console) Produce(workers *sync.WaitGroup)

Produce writes to stdout or stderr.

type ElasticSearch

type ElasticSearch struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

ElasticSearch producer plugin The ElasticSearch producer sends messages to elastic search using the bulk http API. This producer uses a fuse breaker when cluster health reports a "red" status or the connection is down. Configuration example

  • "producer.ElasticSearch": Connections: 6 RetrySec: 5 TTL: "" DayBasedIndex: false User: "" Password: "" BatchSizeByte: 32768 BatchMaxCount: 256 BatchTimeoutSec: 5 Port: 9200 Servers:
  • "localhost" Index: "console" : "console" "_GOLLUM_" : "_GOLLUM_" Type: "console" : "console" "_GOLLUM_" : "_GOLLUM_"

RetrySec denotes the time in seconds after which a failed dataset will be transmitted again. By default this is set to 5.

Connections defines the number of simultaneous connections allowed to a elasticsearch server. This is set to 6 by default.

TTL defines the TTL set in elasticsearch messages. By default this is set to "" which means no TTL.

DayBasedIndex can be set to true to append the date of the message to the index as in "<index>_YYYY-MM-DD". By default this is set to false.

Servers defines a list of servers to connect to. The first server in the list is used as the server passed to the "Domain" setting. The Domain setting can be overwritten, too.

Port defines the elasticsearch port, wich has to be the same for all servers. By default this is set to 9200.

User and Password can be used to pass credentials to the elasticsearch server. By default both settings are empty.

Index maps a stream to a specific index. You can define the wildcard stream (*) here, too. If set all streams that do not have a specific mapping will go to this stream (including _GOLLUM_). If no category mappings are set the stream name is used.

Type maps a stream to a specific type. This behaves like the index map and is used to assign a _type to an elasticsearch message. By default the type "log" is used.

BatchSizeByte defines the size in bytes required to trigger a flush. By default this is set to 32768 (32KB).

BatchMaxCount defines the number of documents required to trigger a flush. By default this is set to 256.

BatchTimeoutSec defines the time in seconds after which a flush will be triggered. By default this is set to 5.

func (*ElasticSearch) Configure

func (prod *ElasticSearch) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*ElasticSearch) Produce

func (prod *ElasticSearch) Produce(workers *sync.WaitGroup)

Produce starts a bluk indexer

type File

type File struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

File producer plugin The file producer writes messages to a file. This producer also allows log rotation and compression of the rotated logs. Folders in the file path will be created if necessary. This producer does not implement a fuse breaker. Configuration example

  • "producer.File": File: "/var/log/gollum.log" FileOverwrite: false Permissions: "0664" FolderPermissions: "0755" BatchMaxCount: 8192 BatchFlushCount: 4096 BatchTimeoutSec: 5 FlushTimeoutSec: 0 Rotate: false RotateTimeoutMin: 1440 RotateSizeMB: 1024 RotateAt: "" RotateTimestamp: "2006-01-02_15" RotatePruneCount: 0 RotatePruneAfterHours: 0 RotatePruneTotalSizeMB: 0 Compress: false

File contains the path to the log file to write. The wildcard character "*" can be used as a placeholder for the stream name. By default this is set to /var/log/gollum.log.

FileOverwrite enables files to be overwritten instead of appending new data to it. This is set to false by default.

Permissions accepts an octal number string that contains the unix file permissions used when creating a file. By default this is set to "0664".

FolderPermissions accepts an octal number string that contains the unix file permissions used when creating a folder. By default this is set to "0755".

BatchMaxCount defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered out of other reasons, the producer will block. By default this is set to 8192.

BatchFlushCount defines the number of messages to be buffered before they are written to disk. This setting is clamped to BatchMaxCount. By default this is set to BatchMaxCount / 2.

BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.

FlushTimeoutSec sets the maximum number of seconds to wait before a flush is aborted during shutdown. By default this is set to 0, which does not abort the flushing procedure.

Rotate if set to true the logs will rotate after reaching certain thresholds. By default this is set to false.

RotateTimeoutMin defines a timeout in minutes that will cause the logs to rotate. Can be set in parallel with RotateSizeMB. By default this is set to 1440 (i.e. 1 Day).

RotateAt defines specific timestamp as in "HH:MM" when the log should be rotated. Hours must be given in 24h format. When left empty this setting is ignored. By default this setting is disabled.

RotateSizeMB defines the maximum file size in MB that triggers a file rotate. Files can get bigger than this size. By default this is set to 1024.

RotateTimestamp sets the timestamp added to the filename when file rotation is enabled. The format is based on Go's time.Format function and set to "2006-01-02_15" by default.

RotatePruneCount removes old logfiles upon rotate so that only the given number of logfiles remain. Logfiles are located by the name defined by "File" and are pruned by date (followed by name). By default this is set to 0 which disables pruning.

RotatePruneAfterHours removes old logfiles that are older than a given number of hours. By default this is set to 0 which disables pruning.

RotatePruneTotalSizeMB removes old logfiles upon rotate so that only the given number of MBs are used by logfiles. Logfiles are located by the name defined by "File" and are pruned by date (followed by name). By default this is set to 0 which disables pruning.

RotateZeroPadding sets the number of leading zeros when rotating files with an existing name. Setting this setting to 0 won't add zeros, every other number defines the number of leading zeros to be used. By default this is set to 0.

Compress defines if a rotated logfile is to be gzip compressed or not. By default this is set to false.

func (*File) Configure

func (prod *File) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*File) Produce

func (prod *File) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is dumped to a file.

type HTTPRequest added in v0.4.0

type HTTPRequest struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

HTTPRequest producer plugin The HTTPRequest producers sends messages as HTTP packet to a given webserver. This producer uses a fuse breaker when a request fails with an error code > 400 or the connection is down. Configuration example

  • "producer.HTTPRequest": RawData: true Encoding: "text/plain; charset=utf-8" Address: "localhost:80"

Address defines the webserver to send http requests to. Set to "localhost:80" by default.

RawData switches between creating POST data from the incoming message (false) and passing the message as HTTP request without changes (true). This setting is enabled by default.

Encoding defines the payload encoding when RawData is set to false. Set to "text/plain; charset=utf-8" by default.

func (*HTTPRequest) Configure added in v0.4.0

func (prod *HTTPRequest) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*HTTPRequest) Produce added in v0.4.0

func (prod *HTTPRequest) Produce(workers *sync.WaitGroup)

Produce writes to stdout or stderr.

type InfluxDB added in v0.4.0

type InfluxDB struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

InfluxDB producer plugin This producer writes data to an influxDB cluster. The data is expected to be of a valid influxDB format. As the data format changed between influxDB versions it is advisable to use a formatter for the specific influxDB version you want to write to. There are collectd to influxDB formatters available that can be used (as an example). This producer uses a fuse breaker if the connection to the influxDB cluster is lost. Configuration example

  • "producer.InfluxDB": Host: "localhost:8086" User: "" Password: "" Database: "default" TimeBasedName: true UseVersion08: false Version: 100 RetentionPolicy: "" BatchMaxCount: 8192 BatchFlushCount: 4096 BatchTimeoutSec: 5

Host defines the host (and port) of the InfluxDB server. Defaults to "localhost:8086".

User defines the InfluxDB username to use to login. If this name is left empty credentials are assumed to be disabled. Defaults to empty.

Password defines the user's password. Defaults to empty.

Database sets the InfluxDB database to write to. By default this is is set to "default".

TimeBasedName enables using time.Format based formatting of databse names. I.e. you can use something like "metrics-2006-01-02" to switch databases for each day. This setting is enabled by default.

RetentionPolicy correlates to the InfluxDB retention policy setting. This is left empty by default (no retention policy used)

UseVersion08 has to be set to true when writing data to InfluxDB 0.8.x. By default this is set to false. DEPRECATED. Use Version instead.

Version defines the InfluxDB version to use as in Mmp (Major, minor, patch). For version 0.8.x use 80, for version 0.9.0 use 90, for version 1.0.0 use use 100 and so on. Defaults to 100.

BatchMaxCount defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered out of other reasons, the producer will block. By default this is set to 8192.

BatchFlushCount defines the number of messages to be buffered before they are written to InfluxDB. This setting is clamped to BatchMaxCount. By default this is set to BatchMaxCount / 2.

BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.

func (*InfluxDB) Configure added in v0.4.0

func (prod *InfluxDB) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*InfluxDB) Produce added in v0.4.0

func (prod *InfluxDB) Produce(workers *sync.WaitGroup)

Produce starts a bulk producer which will collect datapoints until either the buffer is full or a timeout has been reached. The buffer limit does not describe the number of messages received from kafka but the size of the buffer content in KB.

type Kafka

type Kafka struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Kafka producer plugin The kafka producer writes messages to a kafka cluster. This producer is backed by the sarama library so most settings relate to that library. This producer uses a fuse breaker if any connection reports an error. Configuration example

  • "producer.Kafka": ClientId: "weblog" Partitioner: "Roundrobin" RequiredAcks: 1 TimeoutMs: 1500 GracePeriodMs: 10 SendRetries: 0 Compression: "None" MaxOpenRequests: 5 MessageBufferCount: 256 BatchMinCount: 1 BatchMaxCount: 0 BatchSizeByte: 8192 BatchSizeMaxKB: 1024 BatchTimeoutMs: 3000 ServerTimeoutSec: 30 SendTimeoutMs: 250 ElectRetries: 3 ElectTimeoutMs: 250 MetadataRefreshMs: 10000 Servers:
  • "localhost:9092" Topic: "console" : "console"

ClientId sets the client id of this producer. By default this is "gollum".

Partitioner sets the distribution algorithm to use. Valid values are: "Random","Roundrobin" and "Hash". By default "Roundrobin" is set.

KeyFormatter can define a formatter that extracts the key for a kafka message from the message payload. By default this is an empty string, which disables this feature. A good formatter for this can be format.Identifier.

RequiredAcks defines the acknowledgment level required by the broker. 0 = No responses required. 1 = wait for the local commit. -1 = wait for all replicas to commit. >1 = wait for a specific number of commits. By default this is set to 1.

TimeoutMs denotes the maximum time the broker will wait for acks. This setting becomes active when RequiredAcks is set to wait for multiple commits. By default this is set to 10 seconds.

SendRetries defines how many times to retry sending data before marking a server as not reachable. By default this is set to 0.

Compression sets the method of compression to use. Valid values are: "None","Zip" and "Snappy". By default "None" is set.

MaxOpenRequests defines the number of simultanious connections are allowed. By default this is set to 5.

BatchMinCount sets the minimum number of messages required to trigger a flush. By default this is set to 1.

BatchMaxCount defines the maximum number of messages processed per request. By default this is set to 0 for "unlimited".

BatchSizeByte sets the minimum number of bytes to collect before a new flush is triggered. By default this is set to 8192.

BatchSizeMaxKB defines the maximum allowed message size. By default this is set to 1024.

BatchTimeoutMs sets the minimum time in milliseconds to pass after wich a new flush will be triggered. By default this is set to 3.

MessageBufferCount sets the internal channel size for the kafka client. By default this is set to 8192.

ServerTimeoutSec defines the time after which a connection is set to timed out. By default this is set to 30 seconds.

SendTimeoutMs defines the number of milliseconds to wait for a server to resond before triggering a timeout. Defaults to 250.

ElectRetries defines how many times to retry during a leader election. By default this is set to 3.

ElectTimeoutMs defines the number of milliseconds to wait for the cluster to elect a new leader. Defaults to 250.

GracePeriodMs defines the number of milliseconds to wait for Sarama to accept a single message. After this period a message is dropped. By default this is set to 100ms.

MetadataRefreshMs set the interval in seconds for fetching cluster metadata. By default this is set to 600000 (10 minutes). This corresponds to the JVM setting `topic.metadata.refresh.interval.ms`.

Servers contains the list of all kafka servers to connect to. By default this is set to contain only "localhost:9092".

Topic maps a stream to a specific kafka topic. You can define the wildcard stream (*) here, too. If defined, all streams that do not have a specific mapping will go to this topic (including _GOLLUM_). If no topic mappings are set the stream names will be used as topic.

func (*Kafka) Configure

func (prod *Kafka) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Kafka) Produce

func (prod *Kafka) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to a given socket.

type Kinesis added in v0.4.2

type Kinesis struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Kinesis producer plugin This producer sends data to an AWS kinesis stream. Configuration example

  • "producer.Kinesis": Region: "eu-west-1" Endpoint: "kinesis.eu-west-1.amazonaws.com" CredentialType: "none" CredentialId: "" CredentialToken: "" CredentialSecret: "" CredentialFile: "" CredentialProfile: "" BatchMaxMessages: 500 SendTimeframeSec: 1 BatchTimeoutSec: 3 StreamMapping: "*" : "default"

KinesisStream defines the stream to read from. By default this is set to "default"

Region defines the amazon region of your kinesis stream. By default this is set to "eu-west-1".

Endpoint defines the amazon endpoint for your kinesis stream. By default this is et to "kinesis.eu-west-1.amazonaws.com"

CredentialType defines the credentials that are to be used when connectiong to kensis. This can be one of the following: environment, static, shared, none. Static enables the parameters CredentialId, CredentialToken and CredentialSecret shared enables the parameters CredentialFile and CredentialProfile. None will not use any credentials and environment will pull the credentials from environmental settings. By default this is set to none.

BatchMaxMessages defines the maximum number of messages to send per batch. By default this is set to 500.

SendTimeframeMs defines the timeframe in milliseconds in which a second batch send can be triggered. By default this is set to 1000, i.e. one send operation per second.

BatchTimeoutSec defines the number of seconds after which a batch is flushed automatically. By default this is set to 3.

StreamMapping defines a translation from gollum stream to kinesis stream name. If no mapping is given the gollum stream name is used as kinesis stream name.

func (*Kinesis) Configure added in v0.4.2

func (prod *Kinesis) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Kinesis) Produce added in v0.4.2

func (prod *Kinesis) Produce(workers *sync.WaitGroup)

Produce writes to stdout or stderr.

type Null

type Null struct {
	// contains filtered or unexported fields
}

Null producer plugin This producer does nothing and provides only bare-bone configuration (i.e. enabled and streams). Use this producer to test consumer performance. This producer does not implement a fuse breaker.

func (*Null) AddDependency added in v0.4.0

func (prod *Null) AddDependency(dep core.Producer)

AddDependency is an empty call because null does not support them

func (*Null) Configure

func (prod *Null) Configure(conf core.PluginConfig) error

Configure initializes the basic members

func (*Null) Control

func (prod *Null) Control() chan<- core.PluginControl

Control returns write access to this producer's control channel.

func (*Null) DependsOn added in v0.4.0

func (prod *Null) DependsOn(dep core.Producer) bool

DependsOn always returns false

func (*Null) Enqueue

func (prod *Null) Enqueue(msg core.Message, timeout *time.Duration)

Enqueue simply ignores the message

func (*Null) GetDropStreamID added in v0.4.0

func (prod *Null) GetDropStreamID() core.MessageStreamID

GetDropStreamID returns the id of the stream to drop messages to.

func (*Null) GetState added in v0.4.0

func (prod *Null) GetState() core.PluginState

GetState always returns PluginStateActive

func (*Null) IsActive added in v0.4.0

func (prod *Null) IsActive() bool

IsActive always returns true

func (*Null) IsBlocked added in v0.4.0

func (prod *Null) IsBlocked() bool

IsBlocked always returns false

func (*Null) Produce

func (prod *Null) Produce(threads *sync.WaitGroup)

Produce writes to a buffer that is dumped to a file.

func (*Null) Streams

func (prod *Null) Streams() []core.MessageStreamID

Streams returns the streams this producer is listening to.

type Proxy

type Proxy struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Proxy producer plugin This producer is compatible to consumer.proxy. Responses to messages sent to the given address are sent back to the original consumer of it is a compatible message source. As with consumer.proxy the returned messages are partitioned by common message length algorithms. This producer does not implement a fuse breaker. Configuration example

  • "producer.Proxy": Address: ":5880" ConnectionBufferSizeKB: 1024 TimeoutSec: 1 Partitioner: "delimiter" Delimiter: "\n" Offset: 0 Size: 1

Address stores the identifier to connect to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.Proxy". By default this is set to ":5880".

ConnectionBufferSizeKB sets the connection buffer size in KB. This also defines the size of the buffer used by the message parser. By default this is set to 1024, i.e. 1 MB buffer.

TimeoutSec defines the maximum time in seconds a client is allowed to take for a response. By default this is set to 1.

Partitioner defines the algorithm used to read messages from the stream. The messages will be sent as a whole, no cropping or removal will take place. By default this is set to "delimiter".

  • "delimiter" separates messages by looking for a delimiter string. The delimiter is included into the left hand message.
  • "ascii" reads an ASCII encoded number at a given offset until a given delimiter is found.
  • "binary" reads a binary number at a given offset and size
  • "binary_le" is an alias for "binary"
  • "binary_be" is the same as "binary" but uses big endian encoding
  • "fixed" assumes fixed size messages

Delimiter defines the delimiter used by the text and delimiter partitioner. By default this is set to "\n".

Offset defines the offset used by the binary and text partitioner. By default this is set to 0. This setting is ignored by the fixed partitioner.

Size defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1,2,4 or 8. By default 4 is chosen. For fixed this defines the size of a message. By default 1 is chosen.

func (*Proxy) Configure

func (prod *Proxy) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Proxy) Produce

func (prod *Proxy) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to a given Proxy.

type Redis

type Redis struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Redis producer plugin This producer sends data to a redis server. Different redis storage types and database indexes are supported. This producer does not implement support for redis 3.0 cluster. Configuration example

  • "producer.Redis": Address: ":6379" Database: 0 Key: "default" Storage: "hash" FieldFormat: "format.Identifier" FieldAfterFormat: false

Address stores the identifier to connect to. This can either be any ip address and port like "localhost:6379" or a file like "unix:///var/redis.socket". By default this is set to ":6379". This producer does not implement a fuse breaker.

Database defines the redis database to connect to. By default this is set to 0.

Key defines the redis key to store the values in. By default this is set to "default".

Storage defines the type of the storage to use. Valid values are: "hash", "list", "set", "sortedset", "string". By default this is set to "hash".

FieldFormat defines an extra formatter used to define an additional field or score value if required by the storage type. If no field value is required this value is ignored. By default this is set to "format.Identifier".

FieldAfterFormat will send the formatted message to the FieldFormatter if set to true. If this is set to false the message will be send to the FieldFormatter before it has been formatted. By default this is set to false.

func (*Redis) Configure

func (prod *Redis) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Redis) Produce

func (prod *Redis) Produce(workers *sync.WaitGroup)

Produce writes to stdout or stderr.

type Scribe

type Scribe struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Scribe producer plugin The scribe producer allows sending messages to Facebook's scribe. This producer uses a fuse breaker if the connection to the scribe server is lost. Configuration example

  • "producer.Scribe": Address: "localhost:1463" ConnectionBufferSizeKB: 1024 BatchMaxCount: 8192 BatchFlushCount: 4096 BatchTimeoutSec: 5 HeartBeatIntervalSec: 5 Category: "console" : "console" "_GOLLUM_" : "_GOLLUM_"

Address defines the host and port to connect to. By default this is set to "localhost:1463".

ConnectionBufferSizeKB sets the connection buffer size in KB. By default this is set to 1024, i.e. 1 MB buffer.

BatchMaxCount defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered out of other reasons, the producer will block. By default this is set to 8192.

BatchFlushCount defines the number of messages to be buffered before they are written to disk. This setting is clamped to BatchMaxCount. By default this is set to BatchMaxCount / 2.

BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5. This also defines the maximum time allowed for messages to be sent to the server.

HeartBeatIntervalSec defines the interval used to query scribe for status updates. By default this is set to 5sec.

Category maps a stream to a specific scribe category. You can define the wildcard stream (*) here, too. When set, all streams that do not have a specific mapping will go to this category (including _GOLLUM_). If no category mappings are set the stream name is used.

func (*Scribe) Configure

func (prod *Scribe) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Scribe) Produce

func (prod *Scribe) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to scribe.

type Socket

type Socket struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Socket producer plugin The socket producer connects to a service over a TCP, UDP or unix domain socket based connection. This producer uses a fuse breaker when the service to connect to goes down. Configuration example

  • "producer.Socket": Enable: true Address: ":5880" ConnectionBufferSizeKB: 1024 BatchMaxCount: 8192 BatchFlushCount: 4096 BatchTimeoutSec: 5 Acknowledge: ""

Address stores the identifier to connect to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". By default this is set to ":5880".

ConnectionBufferSizeKB sets the connection buffer size in KB. By default this is set to 1024, i.e. 1 MB buffer.

BatchMaxCount defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered out of other reasons, the producer will block. By default this is set to 8192.

BatchFlushCount defines the number of messages to be buffered before they are written to disk. This setting is clamped to BatchMaxCount. By default this is set to BatchMaxCount / 2.

BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.

Acknowledge can be set to a non-empty value to expect the given string as a response from the server after a batch has been sent. This setting is disabled by default, i.e. set to "". If Acknowledge is enabled and a IP-Address is given to Address, TCP is used to open the connection, otherwise UDP is used.

AckTimeoutMs defines the time in milliseconds to wait for a response from the server. After this timeout the send is marked as failed. Defaults to 2000.

func (*Socket) Configure

func (prod *Socket) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Socket) Produce

func (prod *Socket) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to a given socket.

type Spooling added in v0.4.0

type Spooling struct {
	core.ProducerBase

	RespoolDuration time.Duration
	// contains filtered or unexported fields
}

Spooling producer plugin The Spooling producer buffers messages and sends them again to the previous stream stored in the message. This means the message must have been routed at least once before reaching the spooling producer. If the previous and current stream is identical the message is dropped. The Formatter configuration value is forced to "format.Serialize" and cannot be changed. This producer does not implement a fuse breaker. Configuration example

  • "producer.Spooling": Path: "/var/run/gollum/spooling" BatchMaxCount: 100 BatchTimeoutSec: 5 MaxFileSizeMB: 512 MaxFileAgeMin: 1 MessageSizeByte: 8192 RespoolDelaySec: 10 MaxMessagesSec: 100

Path sets the output directory for spooling files. Spooling files will Files will be stored as "<path>/<stream>/<number>.spl". By default this is set to "/var/run/gollum/spooling".

BatchMaxCount defines the maximum number of messages stored in memory before a write to file is triggered. Set to 100 by default.

BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.

MaxFileSizeMB sets the size in MB when a spooling file is rotated. Reading will start only after a file is rotated. Set to 512 MB by default.

MaxFileAgeMin defines the time in minutes after a spooling file is rotated. Reading will start only after a file is rotated. This setting divided by two will be used to define the wait time for reading, too. Set to 1 minute by default.

BufferSizeByte defines the initial size of the buffer that is used to parse messages from a spool file. If a message is larger than this size, the buffer will be resized. By default this is set to 8192.

RespoolDelaySec sets the number of seconds to wait before trying to load existing spool files after a restart. This is useful for configurations that contain dynamic streams. By default this is set to 10.

MaxMessagesSec sets the maximum number of messages that can be respooled per second. By default this is set to 100. Setting this value to 0 will cause respooling to work as fast as possible.

func (*Spooling) Configure added in v0.4.0

func (prod *Spooling) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Spooling) Produce added in v0.4.0

func (prod *Spooling) Produce(workers *sync.WaitGroup)

Produce writes to stdout or stderr.

type Websocket

type Websocket struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

Websocket producer plugin The websocket producer opens up a websocket. This producer does not implement a fuse breaker. Configuration example

  • "producer.Websocket": Address: ":81" Path: "/" ReadTimeoutSec: 3

Address defines the host and port to bind to. This is allowed be any ip address/dns and port like "localhost:5880". By default this is set to ":81".

Path defines the url path to listen for. By default this is set to "/"

ReadTimeoutSec specifies the maximum duration in seconds before timing out read of the request. By default this is set to 3 seconds.

func (*Websocket) Configure

func (prod *Websocket) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*Websocket) Produce

func (prod *Websocket) Produce(workers *sync.WaitGroup)

Produce writes to stdout or stderr.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL