producer

package
v0.5.4
Published: Apr 17, 2019 License: Apache-2.0 Imports: 51 Imported by: 8

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMurmur2HashPartitioner added in v0.4.5

func NewMurmur2HashPartitioner(topic string) kafka.Partitioner

NewMurmur2HashPartitioner creates a new sarama partitioner based on the murmur2 hash algorithm.

Types

type AwsCloudwatchLogs added in v0.5.2

type AwsCloudwatchLogs struct {
	AwsMultiClient       components.AwsMultiClient `gollumdoc:"embed_type"`
	core.BatchedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

AwsCloudwatchLogs producer

The AwsCloudwatchLogs producer plugin sends messages to the AWS CloudWatch Logs service. Credentials are obtained by gollum automatically.

Parameters

- LogStream: Stream name in cloudwatch logs.

- LogGroup: Group name in cloudwatch logs.

- Region: Amazon region to which the logs are streamed. Defaults to "eu-west-1".

Examples

This configuration sends messages to stream stream_name and group group_name with shared credentials.

CwLogs:
  Type: producer.AwsCloudwatchLogs
  LogStream: stream_name
  LogGroup: group_name
  Credential:
    Type: shared

func (*AwsCloudwatchLogs) Configure added in v0.5.2

func (prod *AwsCloudwatchLogs) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*AwsCloudwatchLogs) Produce added in v0.5.2

func (prod *AwsCloudwatchLogs) Produce(workers *sync.WaitGroup)

Produce starts the producer

type AwsFirehose added in v0.5.0

type AwsFirehose struct {
	core.BatchedProducer `gollumdoc:"embed_type"`

	// AwsMultiClient is public to make AwsMultiClient.Configure() callable (bug in treflect package)
	AwsMultiClient components.AwsMultiClient `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

AwsFirehose producer plugin

This producer sends data to an AWS Firehose stream.

Parameters

- StreamMapping: This value defines a translation from gollum stream names to firehose stream names. If no mapping is given, the gollum stream name is used as the firehose stream name. By default this parameter is set to "empty".

- RecordMaxMessages: This value defines the number of messages to send in one record to aws firehose. By default this parameter is set to "1".

- RecordMessageDelimiter: This value defines the delimiter string to use between messages within a firehose record. By default this parameter is set to "\n".

- SendTimeframeMs: This value defines the timeframe in milliseconds in which a second batch send can be triggered. By default this parameter is set to "1000".
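
The StreamMapping lookup can be pictured as a map lookup that falls back to the gollum stream name. The following Go fragment is a minimal sketch, not the plugin's actual code: the helper name `firehoseStreamFor` is hypothetical, and treating "*" as a catch-all entry is an assumption based on how the example configuration in this section uses it.

```go
package main

import "fmt"

// firehoseStreamFor is a hypothetical helper that resolves a gollum stream
// name to a Firehose stream name using a StreamMapping-style table.
func firehoseStreamFor(mapping map[string]string, gollumStream string) string {
	if name, ok := mapping[gollumStream]; ok {
		return name // an explicit mapping wins
	}
	if name, ok := mapping["*"]; ok {
		return name // assumed wildcard fallback
	}
	return gollumStream // no mapping: reuse the gollum stream name
}

func main() {
	mapping := map[string]string{"access": "firehose-access", "*": "default"}
	fmt.Println(firehoseStreamFor(mapping, "access"))
	fmt.Println(firehoseStreamFor(mapping, "errors"))
	fmt.Println(firehoseStreamFor(nil, "errors"))
}
```

With an empty mapping the function returns its input unchanged, which matches the documented behaviour when no mapping is given.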

Examples

This example sets up a simple AWS Firehose producer:

firehoseOut:
  Type: producer.AwsFirehose
  Streams: "*"
  StreamMapping:
    "*": default
  Credential:
    Type: shared
    File: /Users/<USERNAME>/.aws/credentials
    Profile: default
  Region: eu-west-1
  RecordMaxMessages: 1
  RecordMessageDelimiter: "\n"
  SendTimeframeMs: 1000

func (*AwsFirehose) Configure added in v0.5.0

func (prod *AwsFirehose) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*AwsFirehose) Produce added in v0.5.0

func (prod *AwsFirehose) Produce(workers *sync.WaitGroup)

Produce starts the producer and sends batched messages to AWS Firehose.

type AwsKinesis added in v0.5.0

type AwsKinesis struct {
	core.BatchedProducer `gollumdoc:"embed_type"`

	// AwsMultiClient is public to make AwsMultiClient.Configure() callable (bug in treflect package)
	AwsMultiClient components.AwsMultiClient `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

AwsKinesis producer plugin

This producer sends data to an AWS Kinesis stream.

Parameters

- StreamMapping: This value defines a translation from gollum stream names to kinesis stream names. If no mapping is given, the gollum stream name is used as the kinesis stream name. By default this parameter is set to "empty".

- RecordMaxMessages: This value defines the maximum number of messages to join into a kinesis record. By default this parameter is set to "500".

- RecordMessageDelimiter: This value defines the delimiter string to use between messages within a kinesis record. By default this parameter is set to "\n".

- SendTimeframeMs: This value defines the timeframe in milliseconds in which a second batch send can be triggered. By default this parameter is set to "1000".
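
To illustrate how RecordMaxMessages and RecordMessageDelimiter interact, the sketch below joins a list of messages into records. It is only an illustration under stated assumptions: `buildRecords` is a hypothetical helper, and the real producer may group messages differently (for example per stream or per batch flush).

```go
package main

import (
	"fmt"
	"strings"
)

// buildRecords is a hypothetical helper that joins up to maxMessages
// messages into one record, separated by delimiter.
func buildRecords(messages []string, maxMessages int, delimiter string) []string {
	if maxMessages < 1 {
		maxMessages = 1 // assumption: at least one message per record
	}
	var records []string
	for start := 0; start < len(messages); start += maxMessages {
		end := start + maxMessages
		if end > len(messages) {
			end = len(messages)
		}
		records = append(records, strings.Join(messages[start:end], delimiter))
	}
	return records
}

func main() {
	records := buildRecords([]string{"a", "b", "c", "d", "e"}, 2, "\n")
	fmt.Printf("%d records: %q\n", len(records), records)
}
```

Five messages with a limit of two per record yield three records, the last one holding the single leftover message.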

Examples

This example sets up a simple AWS Kinesis producer:

KinesisOut:
  Type: producer.AwsKinesis
  Streams: "*"
  StreamMapping:
    "*": default
  Credential:
    Type: shared
    File: /Users/<USERNAME>/.aws/credentials
    Profile: default
  Region: eu-west-1
  RecordMaxMessages: 1
  RecordMessageDelimiter: "\n"
  SendTimeframeMs: 1000

func (*AwsKinesis) Configure added in v0.5.0

func (prod *AwsKinesis) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*AwsKinesis) Produce added in v0.5.0

func (prod *AwsKinesis) Produce(workers *sync.WaitGroup)

Produce starts the producer and sends batched messages to AWS Kinesis.

type AwsS3 added in v0.5.0

type AwsS3 struct {
	core.DirectProducer `gollumdoc:"embed_type"`

	// Rotate is public to make Pruner.Configure() callable (bug in treflect package)
	// AwsMultiClient is public to make AwsMultiClient.Configure() callable (bug in treflect package)
	// BatchConfig is public to make BatchedWriterConfig.Configure() callable (bug in treflect package)
	Rotate         components.RotateConfig        `gollumdoc:"embed_type"`
	AwsMultiClient components.AwsMultiClient      `gollumdoc:"embed_type"`
	BatchConfig    components.BatchedWriterConfig `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

AwsS3 producer plugin

This producer sends messages to Amazon S3.

Each "file" uses a configurable batch and sends the content by a multipart upload to s3. This principle avoids temporary storage on disk.

Please keep in mind that Amazon S3 does not support appending to existing objects. Therefore rotation is mandatory in this producer.

Parameters

- Bucket: The S3 bucket to upload to.

- File: This value is used as a template for final file names. The string "*" will be replaced with the active stream name. By default this parameter is set to "gollum_*.log".
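
The File template expansion amounts to a placeholder substitution. A minimal sketch follows; `objectNameFor` is a hypothetical helper, and it deliberately ignores whatever the rotation settings add to the final name (such as a timestamp).

```go
package main

import (
	"fmt"
	"strings"
)

// objectNameFor is a hypothetical helper that expands the "*" placeholder
// of a File template with the active stream name.
func objectNameFor(template, stream string) string {
	return strings.ReplaceAll(template, "*", stream)
}

func main() {
	fmt.Println(objectNameFor("gollum_*.log", "access"))
}
```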

Examples

This example sends all received messages from all streams to S3, creating a separate file for each stream:

S3Out:
  Type: producer.AwsS3
  Streams: "*"
  Credential:
    Type: shared
    File: /Users/<USERNAME>/.aws/credentials
    Profile: default
  Region: eu-west-1
  Bucket: gollum-s3-test
  Batch:
    TimeoutSec: 60
    MaxCount: 1000
    FlushCount: 500
    FlushTimeoutSec: 0
  Rotation:
    Timestamp: 2006-01-02T15:04:05.999999999Z07:00
    TimeoutMin: 1
    SizeMB: 20
  Modulators:
    - format.Envelope:
        Postfix: "\n"

func (*AwsS3) Configure added in v0.5.0

func (prod *AwsS3) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*AwsS3) Produce added in v0.5.0

func (prod *AwsS3) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to S3 as a multipart upload.

type Benchmark added in v0.4.3

type Benchmark struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
}

Benchmark producer

This producer is meant to provide more meaningful results in benchmark situations than producer.Null, as it is based on core.BufferedProducer.

Examples

benchmark:
  Type: producer.Benchmark
  Streams: "*"

func (*Benchmark) Produce added in v0.4.3

func (prod *Benchmark) Produce(workers *sync.WaitGroup)

Produce starts the producer.

type Console

type Console struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Console producer plugin

The console producer writes messages to standard output or standard error.

Parameters

- Console: Chooses the output device; either "stdout" or "stderr". By default this is set to "stdout".

Examples

StdErrPrinter:
  Type: producer.Console
  Streams: myerrorstream
  Console: stderr

func (*Console) Configure

func (prod *Console) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*Console) Produce

func (prod *Console) Produce(workers *sync.WaitGroup)

Produce writes to stdout or stderr.

type ElasticSearch

type ElasticSearch struct {
	core.BatchedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

ElasticSearch producer plugin

The ElasticSearch producer sends messages to Elasticsearch using the bulk HTTP API. The producer expects a JSON payload.

Parameters

- Retry/Count: Sets the number of retries before an Elasticsearch request finally fails. By default this parameter is set to "3".

- Retry/TimeToWaitSec: This value denotes the time in seconds after which a failed dataset will be transmitted again. By default this parameter is set to "3".

- SetGzip: This value enables or disables gzip compression for Elasticsearch requests. The option is passed unchanged to the client library, see http://godoc.org/gopkg.in/olivere/elastic.v5#SetGzip. By default this parameter is set to "false".

- Servers: This value defines a list of servers to connect to.

- User: This value is used as the username for the Elasticsearch server. By default this parameter is set to "".

- Password: This value is used as the password for the Elasticsearch server. By default this parameter is set to "".

- StreamProperties: This value defines the mapping and settings for each stream. Use the stream name as the key here.

- StreamProperties/<streamName>/Index: The value defines the Elasticsearch index used for the stream.

- StreamProperties/<streamName>/Type: This value defines the document type used for the stream.

- StreamProperties/<streamName>/TimeBasedIndex: This value can be set to "true" to append the date of the message to the index as in "<index>_<TimeBasedFormat>". NOTE: This setting incurs a performance penalty because it is necessary to check if an index exists for each message! By default this parameter is set to "false".

- StreamProperties/<streamName>/TimeBasedFormat: This value can be set to a valid go time format string to be used with TimeBasedIndex. By default this parameter is set to "2006-01-02".

- StreamProperties/<streamName>/Mapping: This value is a map which is used for the document field mapping. As document type, the already defined type is reused for the field mapping. See https://www.elastic.co/guide/en/elasticsearch/reference/5.4/indices-create-index.html#mappings

- StreamProperties/<streamName>/Settings: This value is a map which is used for the index settings. See https://www.elastic.co/guide/en/elasticsearch/reference/5.4/indices-create-index.html#mappings

Examples

This example starts a simple producer for twitter data that writes to a locally running Elasticsearch:

producerElasticSearch:
  Type: producer.ElasticSearch
  Streams: tweets_stream
  SetGzip: true
  Servers:
    - http://127.0.0.1:9200
  StreamProperties:
    tweets_stream:
      Index: twitter
      TimeBasedIndex: true
      Type: tweet
      Mapping:
        # index mapping for payload
        user: keyword
        message: text
      Settings:
        number_of_shards: 1
        number_of_replicas: 1

func (*ElasticSearch) Configure

func (prod *ElasticSearch) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*ElasticSearch) Produce

func (prod *ElasticSearch) Produce(workers *sync.WaitGroup)

Produce starts the producer

type File

type File struct {
	core.DirectProducer `gollumdoc:"embed_type"`

	// Rotate is public to make Pruner.Configure() callable (bug in treflect package)
	// Prune is public to make FileRotateConfig.Configure() callable (bug in treflect package)
	// BatchConfig is public to make BatchedWriterConfig.Configure() callable (bug in treflect package)
	Rotate      components.RotateConfig        `gollumdoc:"embed_type"`
	Pruner      file.Pruner                    `gollumdoc:"embed_type"`
	BatchConfig components.BatchedWriterConfig `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

File producer plugin

The file producer writes messages to a file. This producer also allows log rotation and compression of the rotated logs. Folders in the file path will be created if necessary.

Each target file is handled by its own, separate batch processing.

Parameters

- File: This value contains the path to the log file to write. The wildcard character "*" can be used as a placeholder for the stream name. By default this parameter is set to "/var/log/gollum.log".

- FileOverwrite: This value causes the file to be overwritten instead of appending new data to it. By default this parameter is set to "false".

- Permissions: Defines the UNIX filesystem permissions used when creating the named file as an octal number. By default this parameter is set to "0664".

- FolderPermissions: Defines the UNIX filesystem permissions used when creating the folders as an octal number. By default this parameter is set to "0755".
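
Since Permissions and FolderPermissions are given as octal numbers, a configuration value such as "0664" has to be parsed with base 8 before it can be used as a file mode. A minimal sketch, assuming the value arrives as a string; `parsePermissions` is a hypothetical helper and not part of this package.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// parsePermissions is a hypothetical helper that converts an octal string
// such as "0664" into an os.FileMode restricted to the permission bits.
func parsePermissions(octal string) (os.FileMode, error) {
	bits, err := strconv.ParseUint(octal, 8, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid permissions %q: %v", octal, err)
	}
	return os.FileMode(bits) & os.ModePerm, nil
}

func main() {
	for _, value := range []string{"0664", "0755", "rw"} {
		mode, err := parsePermissions(value)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(value, "=>", mode)
	}
}
```

Parsing "664" as a decimal number would yield entirely different permission bits, which is why the base matters.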

Examples

This example will write the messages from all streams to `/tmp/gollum.log` after every 64 messages or after 60 seconds:

fileOut:
  Type: producer.File
  Streams: "*"
  File: /tmp/gollum.log
  Batch:
    MaxCount: 128
    FlushCount: 64
    TimeoutSec: 60
    FlushTimeoutSec: 3

func (*File) Configure

func (prod *File) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*File) Produce

func (prod *File) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is dumped to a file.

type HTTPRequest added in v0.4.0

type HTTPRequest struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

HTTPRequest producer

The HTTPRequest producer sends messages as HTTP requests to a given webserver.

In RawData mode, incoming messages are expected to contain complete HTTP requests in "wire format", such as:

POST /foo/bar HTTP/1.0\n
Content-type: text/plain\n
Content-length: 24\n
\n
Dummy test\n
Request data\n

In this mode, the message's contents are parsed as an HTTP request and sent to the destination server (virtually) unchanged. If the message cannot be parsed as an HTTP request, an error is logged. Only the scheme, host and port components of the "Address" URL are used; any path and query parameters are ignored. The "Encoding" parameter is ignored.

If RawData mode is off, a POST request is made to the destination server for each incoming message, using the complete URL in "Address". The incoming message's contents are delivered in the POST request's body and Content-type is set to the value of "Encoding".
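
The RawData behaviour can be approximated with the standard library: parse the message with http.ReadRequest, then point the request at the scheme and host taken from "Address". This is a sketch of the idea, not the producer's implementation. `requestFromRaw` is a hypothetical helper, and the sketch stops before actually sending the request.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
)

// requestFromRaw is a hypothetical helper: it parses a message as an HTTP
// request and retargets it at the scheme and host of address. Path and query
// of address are ignored, as described for RawData mode.
func requestFromRaw(raw, address string) (*http.Request, error) {
	req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(raw)))
	if err != nil {
		return nil, err
	}
	target, err := url.Parse(address)
	if err != nil {
		return nil, err
	}
	req.URL.Scheme = target.Scheme
	req.URL.Host = target.Host
	req.RequestURI = "" // must be empty before an http.Client can send it
	return req, nil
}

func main() {
	raw := "POST /foo/bar HTTP/1.0\n" +
		"Content-type: text/plain\n" +
		"Content-length: 24\n" +
		"\n" +
		"Dummy test\n" +
		"Request data\n"
	req, err := requestFromRaw(raw, "http://localhost:8099/test")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	body, _ := io.ReadAll(req.Body)
	fmt.Println(req.Method, req.URL.String(), len(body))
}
```

The path "/test" from the address does not appear in the resulting URL; the request keeps the path from the message itself.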

Parameters

- Address: Defines the URL to send HTTP requests to. If the value doesn't contain "://", it is prepended with "http://", so short forms like "localhost:8088" are accepted. The default value is "http://localhost:80".

- RawData: Turns "RawData" mode on. See the description above.

- Encoding: Defines the payload encoding when RawData is set to false.
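
The short-form handling of Address can be sketched in a few lines. `normalizeAddress` is a hypothetical helper; applying the documented default to an empty value is an assumption about how the default is resolved.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeAddress is a hypothetical helper that applies the documented
// rule: values without "://" get "http://" prepended.
func normalizeAddress(address string) string {
	if address == "" {
		return "http://localhost:80" // documented default (assumed for empty input)
	}
	if !strings.Contains(address, "://") {
		return "http://" + address
	}
	return address
}

func main() {
	fmt.Println(normalizeAddress("localhost:8088"))
	fmt.Println(normalizeAddress("https://example.org/ingest"))
}
```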

Examples

HttpOut01:
  Type: producer.HTTPRequest
  Streams: http_01
  Address: "http://localhost:8099/test"
  RawData: true

func (*HTTPRequest) Configure added in v0.4.0

func (prod *HTTPRequest) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*HTTPRequest) Produce added in v0.4.0

func (prod *HTTPRequest) Produce(workers *sync.WaitGroup)

Produce starts the producer and sends incoming messages as HTTP requests.

type InfluxDB added in v0.4.0

type InfluxDB struct {
	core.BatchedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

InfluxDB producer

This producer writes data to an influxDB endpoint. Data is not converted to the correct influxDB format automatically. Proper formatting might be required.

Parameters

- Version: Defines the InfluxDB protocol version to use. This can either be 80-89 for 0.8.x, 90 for 0.9.0 or 91-100 for 0.9.1 or later. By default this parameter is set to 100.

- Host: Defines the host (and port) of the InfluxDB master. By default this parameter is set to "localhost:8086".

- User: Defines the InfluxDB username to use. If this is empty, credentials are not used. By default this parameter is set to "".

- Password: Defines the InfluxDB password to use. By default this parameter is set to "".

- Database: Sets the InfluxDB database to write to. By default this parameter is set to "default".

- TimeBasedName: When set to true, the Database parameter is treated as a template for time.Format and the resulting string is used as the database name. You can e.g. use "default-2006-01-02" to switch databases each day. By default this parameter is set to "true".

- RetentionPolicy: Only available for Version 90. This setting defines the InfluxDB retention policy allowed with this protocol version. By default this parameter is set to "".
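
The Version ranges listed above map onto three protocol families. The following sketch spells that mapping out. `protocolFor` is a hypothetical helper, and rejecting values outside 80-100 is an assumption, since the behaviour for such values is not documented here.

```go
package main

import "fmt"

// protocolFor is a hypothetical helper that classifies the Version
// parameter into the InfluxDB protocol family it selects.
func protocolFor(version int) (string, error) {
	switch {
	case version >= 80 && version <= 89:
		return "0.8.x", nil
	case version == 90:
		return "0.9.0", nil
	case version >= 91 && version <= 100:
		return "0.9.1 or later", nil
	default:
		// Assumption: values outside the documented ranges are rejected.
		return "", fmt.Errorf("unsupported Version value %d", version)
	}
}

func main() {
	for _, v := range []int{85, 90, 100, 42} {
		family, err := protocolFor(v)
		fmt.Println(v, family, err)
	}
}
```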

Examples

metricsToInflux:
  Type: producer.InfluxDB
  Streams: metrics
  Host: "influx01:8086"
  Database: "metrics"
  TimeBasedName: false
  Batch:
    MaxCount: 2000
    FlushCount: 100
    TimeoutSec: 5

func (*InfluxDB) Configure added in v0.4.0

func (prod *InfluxDB) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*InfluxDB) Produce added in v0.4.0

func (prod *InfluxDB) Produce(workers *sync.WaitGroup)

Produce starts a bulk producer which will collect datapoints until either the buffer is full or a timeout has been reached. The buffer limit does not describe the number of messages received from kafka but the size of the buffer content in KB.

type Kafka

type Kafka struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Kafka producer

This producer writes messages to a kafka cluster. This producer is backed by the sarama library (https://github.com/Shopify/sarama) so most settings directly relate to the settings of that library.

Parameters

- Servers: Defines a list of ideally all brokers in the cluster. At least one broker is required. By default this parameter is set to an empty list.

- Version: Defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form "A.B" are allowed as well as "A.B.C" and "A.B.C.D". If the version given is not known, the closest possible version is chosen. If GroupId is set to a value < "0.9", "0.9.0.1" will be used. By default this parameter is set to "0.8.2".

- Topics: Defines a stream to topic mapping. If a stream is not mapped the stream name is used as topic. You can define the wildcard stream (*) here, too. If defined, all streams that do not have a specific mapping will go to this topic (including _GOLLUM_). By default this parameter is set to an empty list.

- ClientId: Sets the kafka client id used by this producer. By default this parameter is set to "gollum".

- Partitioner: Defines the distribution algorithm to use. Valid values are: Random, Roundrobin and Hash. By default this parameter is set to "Roundrobin".

- PartitionHasher: Defines the hash algorithm to use when Partitioner is set to "Hash". Accepted values are "fnv1-a" and "murmur2".

- KeyFrom: Defines the metadata field that contains the string to be used as the key passed to kafka. When set to an empty string no key is used. By default this parameter is set to "".

- Compression: Defines the compression algorithm to use. Possible values are "none", "zip" and "snappy". By default this parameter is set to "none".

- RequiredAcks: Defines the numbers of acknowledgements required until a message is marked as "sent". When set to -1 all replicas must acknowledge a message. By default this parameter is set to 1.

- TimeoutMs: Denotes the maximum time the broker will wait for acks. This setting becomes active when RequiredAcks is set to wait for multiple commits. By default this parameter is set to 10000.

- GracePeriodMs: Defines the number of milliseconds to wait for Sarama to accept a single message. After this period a message is sent to the fallback. This setting mitigates a conceptual problem in the Sarama API which can lead to long blocking times during startup. By default this parameter is set to 100.

- MaxOpenRequests: Defines the maximum number of simultaneous connections opened to a single broker at a time. By default this parameter is set to 5.

- ServerTimeoutSec: Defines the time after which a connection is set to timed out. By default this parameter is set to 30.

- SendTimeoutMs: Defines the number of milliseconds to wait for a broker before marking a message as timed out. By default this parameter is set to 250.

- SendRetries: Defines how many times a message should be sent again before a broker is marked as not reachable. Please note that this setting should never be 0. See https://github.com/Shopify/sarama/issues/294. By default this parameter is set to 1.

- AllowNilValue: When enabled messages containing an empty or nil payload will not be rejected. By default this parameter is set to false.

- Batch/MinCount: Sets the minimum number of messages required to send a request. By default this parameter is set to 1.

- Batch/MaxCount: Defines the maximum number of messages buffered before a request is sent. A value of 0 will remove this limit. By default this parameter is set to 0.

- Batch/MinSizeByte: Defines the minimum number of bytes to buffer before sending a request. By default this parameter is set to 8192.

- Batch/SizeMaxKB: Defines the maximum allowed message size in KB. Messages bigger than this limit will be rejected. By default this parameter is set to 1024.

- Batch/TimeoutMs: Defines the maximum time in milliseconds after which a new request will be sent, regardless of Batch/MinCount and Batch/MinSizeByte. By default this parameter is set to 3.

- ElectRetries: Defines how many times a metadata request is to be retried during a leader election phase. By default this parameter is set to 3.

- ElectTimeoutMs: Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.

- MetadataRefreshMs: Defines the interval in milliseconds for refetching cluster metadata. By default this parameter is set to 600000.

- TlsEnable: Enables TLS communication with brokers. By default this parameter is set to false.

- TlsKeyLocation: Path to the client's private key (PEM) used for TLS based authentication. By default this parameter is set to "".

- TlsCertificateLocation: Path to the client's public key (PEM) used for TLS based authentication. By default this parameter is set to "".

- TlsCaLocation: Path to the CA certificate(s) used for verifying the broker's key. By default this parameter is set to "".

- TlsServerName: Used to verify the hostname on the server's certificate unless TlsInsecureSkipVerify is true. By default this parameter is set to "".

- TlsInsecureSkipVerify: Disables server certificate chain and host name verification when set to true. By default this parameter is set to false.

- SaslEnable: Enables SASL based authentication. By default this parameter is set to false.

- SaslUsername: Sets the user name used for SASL/PLAIN authentication. By default this parameter is set to "".

- SaslPassword: Sets the password used for SASL/PLAIN authentication. By default this parameter is set to "".

- MessageBufferCount: Sets the internal channel size for the kafka client. By default this parameter is set to 8192.
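
The Tls* parameters correspond closely to fields of Go's crypto/tls configuration. The sketch below shows one plausible way to assemble a tls.Config from them. The `tlsSettings` struct and `buildTLSConfig` function are hypothetical, and how the producer hands the result to Sarama is not shown.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// tlsSettings mirrors the Tls* parameters; the struct itself is hypothetical.
type tlsSettings struct {
	KeyLocation         string
	CertificateLocation string
	CaLocation          string
	ServerName          string
	InsecureSkipVerify  bool
}

// buildTLSConfig assembles a tls.Config; empty paths are skipped.
func buildTLSConfig(s tlsSettings) (*tls.Config, error) {
	cfg := &tls.Config{
		ServerName:         s.ServerName,
		InsecureSkipVerify: s.InsecureSkipVerify,
	}
	if s.CertificateLocation != "" && s.KeyLocation != "" {
		pair, err := tls.LoadX509KeyPair(s.CertificateLocation, s.KeyLocation)
		if err != nil {
			return nil, err
		}
		cfg.Certificates = []tls.Certificate{pair}
	}
	if s.CaLocation != "" {
		pem, err := os.ReadFile(s.CaLocation)
		if err != nil {
			return nil, err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("no CA certificates found in " + s.CaLocation)
		}
		cfg.RootCAs = pool
	}
	return cfg, nil
}

func main() {
	cfg, err := buildTLSConfig(tlsSettings{ServerName: "kafka01"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(cfg.ServerName, cfg.InsecureSkipVerify, cfg.RootCAs == nil)
}
```

Leaving RootCAs unset makes crypto/tls fall back to the host's root CA set, which is the usual behaviour when no CA location is configured.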

Examples

kafkaWriter:
  Type: producer.Kafka
  Streams: logs
  Compression: zip
  Servers:
    - "kafka01:9092"
    - "kafka02:9092"
    - "kafka03:9092"
    - "kafka04:9092"

func (*Kafka) Configure

func (prod *Kafka) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*Kafka) Produce

func (prod *Kafka) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to the kafka cluster.

type Murmur2HashPartitioner added in v0.4.5

type Murmur2HashPartitioner struct {
	// contains filtered or unexported fields
}

Murmur2HashPartitioner implements murmur2 hash to be used for kafka messages. Murmur2HashPartitioner satisfies sarama.Partitioner so it can be directly assigned to sarama kafka producer config. Note: If the key of the message is nil, the message will be partitioned randomly.

func (*Murmur2HashPartitioner) Partition added in v0.4.5

func (p *Murmur2HashPartitioner) Partition(message *kafka.ProducerMessage, numPartitions int32) (int32, error)

Partition chooses a partition based on the murmur2 hash of the key. If no key is given a random partition is chosen.
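
For orientation, the sketch below shows how a murmur2-based partition choice typically works. It assumes the murmur2 variant and the sign-bit masking used by the Kafka Java client; whether this package matches that variant exactly is not confirmed here. The names `murmur2` and `partitionFor` are illustrative only, and the random choice for nil keys is left out.

```go
package main

import "fmt"

// murmur2 follows the 32-bit variant used by the Kafka Java client
// (seed 0x9747b28c). Treat it as an assumption about this package.
func murmur2(data []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		m    uint32 = 0x5bd1e995
		r           = 24
	)
	length := len(data)
	h := seed ^ uint32(length)
	// Mix four bytes at a time, read as little-endian words.
	for i := 0; i+4 <= length; i += 4 {
		k := uint32(data[i]) | uint32(data[i+1])<<8 | uint32(data[i+2])<<16 | uint32(data[i+3])<<24
		k *= m
		k ^= k >> r
		k *= m
		h *= m
		h ^= k
	}
	// Mix the remaining one to three bytes.
	tail := data[length&^3:]
	switch len(tail) {
	case 3:
		h ^= uint32(tail[2]) << 16
		fallthrough
	case 2:
		h ^= uint32(tail[1]) << 8
		fallthrough
	case 1:
		h ^= uint32(tail[0])
		h *= m
	}
	h ^= h >> 13
	h *= m
	h ^= h >> 15
	return h
}

// partitionFor maps a non-nil key to a partition in [0, numPartitions).
// numPartitions must be greater than zero.
func partitionFor(key []byte, numPartitions int32) int32 {
	return int32(murmur2(key)&0x7fffffff) % numPartitions
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Println(key, "->", partitionFor([]byte(key), 8))
	}
}
```

Equal keys always land on the same partition, which is what lets RequiresConsistency report true.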

func (*Murmur2HashPartitioner) RequiresConsistency added in v0.4.5

func (p *Murmur2HashPartitioner) RequiresConsistency() bool

RequiresConsistency always returns true

type Null

type Null struct {
	core.DirectProducer `gollumdoc:"embed_type"`
}

Null producer

This producer is meant to be used as a sink for data. It will throw away all messages without notice.

Examples

TrashCan:
  Type: producer.Null
  Streams: trash

func (*Null) Produce

func (prod *Null) Produce(threads *sync.WaitGroup)

Produce starts a control loop only

type Proxy

type Proxy struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Proxy producer plugin

This producer is compatible with the Proxy consumer plugin. Responses to messages sent to the given address are sent back to the original consumer if it is a compatible message source. As with consumer.proxy, the returned messages are partitioned by common message length algorithms.

Parameters

- Address: This value stores the identifier to connect to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.Proxy". By default this parameter is set to ":5880".

- ConnectionBufferSizeKB: This value sets the connection buffer size in KB. This also defines the size of the buffer used by the message parser. By default this parameter is set to "1024".

- TimeoutSec: This value defines the maximum time in seconds a client is allowed to take for a response. By default this parameter is set to "1".

- Partitioner: This value defines the algorithm used to read messages from the stream. The messages will be sent as a whole, no cropping or removal will take place. By default this parameter is set to "delimiter".

  • delimiter: separates messages by looking for a delimiter string. The delimiter is included into the left hand message.

  • ascii: reads an ASCII encoded number at a given offset until a given delimiter is found.

  • binary: reads a binary number at a given offset and size

  • binary_le: is an alias for "binary"

  • binary_be: is the same as "binary" but uses big endian encoding

  • fixed: assumes fixed size messages

- Delimiter: This value defines the delimiter used by the ascii and delimiter partitioner. By default this parameter is set to "\n".

- Offset: This value defines the offset used by the binary and ascii partitioner. This setting is ignored by the fixed partitioner. By default this parameter is set to "0".

- Size: This value defines the size in bytes used by the binary or fixed partitioner. For `binary` this can be set to 1, 2, 4 or 8, for `fixed` this defines the size of a message. By default this parameter is set to "4" for `binary` or "1" for `fixed` partitioner.
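
As an illustration of the "binary" partitioner, the sketch below extracts one length-prefixed message from a byte buffer. Several details are assumptions rather than documented behaviour: that the length field counts only the bytes following it, that the header stays part of the returned message (in line with "no cropping or removal"), and the helper name `splitBinary` itself.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitBinary is a hypothetical helper: it reads a little-endian length of
// `size` bytes at `offset` and returns the first complete message (header
// included) together with the bytes that follow it.
func splitBinary(buf []byte, offset, size int) (msg, rest []byte, err error) {
	headerEnd := offset + size
	if len(buf) < headerEnd {
		return nil, buf, errors.New("incomplete header")
	}
	var length uint64
	switch size {
	case 1:
		length = uint64(buf[offset])
	case 2:
		length = uint64(binary.LittleEndian.Uint16(buf[offset:headerEnd]))
	case 4:
		length = uint64(binary.LittleEndian.Uint32(buf[offset:headerEnd]))
	case 8:
		length = binary.LittleEndian.Uint64(buf[offset:headerEnd])
	default:
		return nil, buf, fmt.Errorf("unsupported size %d", size)
	}
	if uint64(len(buf)-headerEnd) < length {
		return nil, buf, errors.New("incomplete message")
	}
	end := headerEnd + int(length)
	return buf[:end], buf[end:], nil
}

func main() {
	// An 8 byte length header announcing 5 payload bytes, as with "Size: 8".
	frame := make([]byte, 8, 13)
	binary.LittleEndian.PutUint64(frame, 5)
	frame = append(frame, "hello"...)
	msg, rest, err := splitBinary(frame, 0, 8)
	fmt.Println(len(msg), len(rest), err)
}
```

A "binary_be" variant would only swap binary.LittleEndian for binary.BigEndian.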

Examples

This example will send 64-bit length-encoded data on TCP port 5880.

proxyOut:
  Type: producer.Proxy
  Address: ":5880"
  Partitioner: binary
  Size: 8

func (*Proxy) Configure

func (prod *Proxy) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*Proxy) Produce

func (prod *Proxy) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to a given Proxy.

type Redis

type Redis struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Redis producer

This producer sends messages to a redis server. Different redis storage types and database indexes are supported. This producer does not implement support for redis 3.0 cluster.

Parameters

- Address: Stores the identifier to connect to. This can either be any ip address and port like "localhost:6379" or a file like "unix:///var/redis.socket". By default this is set to ":6379".

- Database: Defines the redis database to connect to.

- Key: Defines the redis key to store the values in. This field is ignored when "KeyFormatter" is set. By default this is set to "default".

- Storage: Defines the type of the storage to use. Valid values are: "hash", "list", "set", "sortedset", "string". By default this is set to "hash".

- KeyFrom: Defines the name of the metadata field used as a key for messages sent to redis. If the name is an empty string no key is sent. By default this value is set to an empty string.

- FieldFrom: Defines the name of the metadata field used as a field for messages sent to redis. If the name is an empty string no field is sent. By default this value is set to an empty string.
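
The two Address forms, "host:port" and "unix://path", can be told apart by looking for a scheme prefix. The sketch below splits such a value into the network and address pair expected by Go's net.Dial. `splitAddress` is a hypothetical helper, and defaulting to TCP when no scheme is present is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// splitAddress is a hypothetical helper that splits an Address value into
// a network name and an address usable with net.Dial.
func splitAddress(address string) (network, addr string) {
	if i := strings.Index(address, "://"); i >= 0 {
		return address[:i], address[i+3:]
	}
	return "tcp", address // assumed default when no scheme is given
}

func main() {
	fmt.Println(splitAddress("unix:///var/redis.socket"))
	fmt.Println(splitAddress("localhost:6379"))
	fmt.Println(splitAddress(":6379"))
}
```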

Examples

RedisProducer00:
  Type: producer.Redis
  Address: ":6379"
  Key: "mykey"
  Storage: "hash"

func (*Redis) Configure

func (prod *Redis) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*Redis) Produce

func (prod *Redis) Produce(workers *sync.WaitGroup)

Produce starts the producer and sends messages to redis.

type Scribe

type Scribe struct {
	core.BatchedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Scribe producer

This producer allows sending messages to Facebook's scribe service.

Parameters

- Address: Defines the host and port of a scribe endpoint. By default this parameter is set to "localhost:1463".

- ConnectionBufferSizeKB: Sets the connection socket buffer size in KB. By default this parameter is set to 1024.

- HeartBeatIntervalSec: Defines the interval in seconds used to query scribe for status updates. By default this parameter is set to 1.

- WindowSize: Defines the maximum number of messages sent to scribe in one call. The window size is reduced when scribe returns "try later" to lower the load on the scribe server. It slowly rises again with each successful write until WindowSize is reached again. By default this parameter is set to 2048.

- ConnectionTimeoutSec: Defines the time in seconds after which a connection timeout is assumed. This can happen during writes or status reads. By default this parameter is set to 5.

- Category: Maps a stream to a scribe category. You can define the wildcard stream (*) here, too. When set, all streams that do not have a specific mapping will go to this category (including reserved streams like _GOLLUM_). If no category mappings are set the stream name is used as category. By default this parameter is set to an empty list.
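
The WindowSize behaviour resembles a simple adaptive send window. The sketch below models it under assumptions that are not taken from the source: the window is halved on "try later" and grows by one message per successful write. The `window` type and its methods are hypothetical.

```go
package main

import "fmt"

// window is a hypothetical model of the adaptive send window.
type window struct {
	size int // current number of messages per call
	max  int // configured WindowSize
}

// onTryLater shrinks the window; halving is an assumption.
func (w *window) onTryLater() {
	w.size /= 2
	if w.size < 1 {
		w.size = 1
	}
}

// onSuccess grows the window by one message, never past max.
func (w *window) onSuccess() {
	if w.size < w.max {
		w.size++
	}
}

func main() {
	w := &window{size: 2048, max: 2048}
	w.onTryLater()
	fmt.Println("after try later:", w.size)
	for i := 0; i < 3; i++ {
		w.onSuccess()
	}
	fmt.Println("after three successful writes:", w.size)
}
```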

Examples

logs:
  Type: producer.Scribe
  Streams: ["*", "_GOLLUM_"]
  Address: "scribe01:1463"
  HeartBeatIntervalSec: 10
  Category:
    "access"   : "accesslogs"
    "error"    : "errorlogs"
    "_GOLLUM_" : "gollumlogs"

func (*Scribe) Configure

func (prod *Scribe) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*Scribe) Produce

func (prod *Scribe) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to scribe.

type Socket

type Socket struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Socket producer plugin

The socket producer connects to a service over TCP, UDP or a UNIX domain socket.

Parameters

- Address: Defines the address to connect to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". By default this parameter is set to ":5880".

- ConnectionBufferSizeKB: This value sets the connection buffer size in KB. By default this parameter is set to "1024".

- Batch/MaxCount: This value defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered for other reasons, the producer will block. By default this parameter is set to "8192".

- Batch/FlushCount: This value defines the number of messages to be buffered before they are flushed. This setting is clamped to Batch/MaxCount. By default this parameter is set to "Batch/MaxCount / 2".

- Batch/TimeoutSec: This value defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to "5".

- Acknowledge: This value can be set to a non-empty value to expect the given string as a response from the server after a batch has been sent. If Acknowledge is enabled and an IP address is given to Address, TCP is used to open the connection, otherwise UDP is used. By default this parameter is set to "".

- AckTimeoutMs: This value defines the time in milliseconds to wait for a response from the server. After this timeout the send is marked as failed. By default this parameter is set to "2000".
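The interplay of the three Batch settings can be illustrated with a short sketch. The `batchConfig` type and its methods are hypothetical names, and treating a FlushCount of 0 as "unset" is an assumption. Only the default (MaxCount / 2), the clamp to MaxCount and the timeout rule come from the descriptions above.

```go
package main

import "fmt"

// batchConfig holds the Batch/* settings. Hypothetical type for illustration.
type batchConfig struct {
	MaxCount   int // Batch/MaxCount
	FlushCount int // Batch/FlushCount, 0 means "not set" in this sketch
	TimeoutSec int // Batch/TimeoutSec
}

// normalize applies the documented default and clamp for FlushCount.
func (c batchConfig) normalize() batchConfig {
	if c.FlushCount <= 0 {
		c.FlushCount = c.MaxCount / 2
	}
	if c.FlushCount > c.MaxCount {
		c.FlushCount = c.MaxCount
	}
	return c
}

// shouldFlush reports whether a flush is due: either enough messages are
// buffered, or the last message arrived at least TimeoutSec seconds ago.
func (c batchConfig) shouldFlush(buffered, idleSec int) bool {
	if buffered == 0 {
		return false
	}
	return buffered >= c.FlushCount || idleSec >= c.TimeoutSec
}

func main() {
	cfg := batchConfig{MaxCount: 8192, TimeoutSec: 5}.normalize()
	fmt.Println("effective FlushCount:", cfg.FlushCount)
	fmt.Println("flush with 10 messages, idle 1s:", cfg.shouldFlush(10, 1))
	fmt.Println("flush with 10 messages, idle 5s:", cfg.shouldFlush(10, 5))
}
```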

Examples

This example starts a socket producer on localhost port 5880:

SocketOut:
  Type: producer.Socket
  Address: ":5880"
  Batch:
    MaxCount: 1024
    FlushCount: 512
    TimeoutSec: 3
  AckTimeoutMs: 1000
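The Acknowledge and AckTimeoutMs settings describe a simple request/response handshake: send a batch, then wait a bounded time for an expected string. The sketch below shows that pattern with the standard `net` package. It is not gollum's code; `sendWithAck` and `demo` are hypothetical helpers, and an in-memory `net.Pipe` stands in for the real server.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"net"
	"time"
)

// sendWithAck writes payload and, if ack is non-empty, waits up to timeout
// for the peer to answer with exactly that string.
func sendWithAck(conn net.Conn, payload []byte, ack string, timeout time.Duration) error {
	if _, err := conn.Write(payload); err != nil {
		return err
	}
	if ack == "" {
		return nil // acknowledgement disabled
	}
	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	buf := make([]byte, len(ack))
	if _, err := io.ReadFull(conn, buf); err != nil {
		return err // includes the timeout case
	}
	if !bytes.Equal(buf, []byte(ack)) {
		return errors.New("unexpected acknowledge response")
	}
	return nil
}

// demo runs sendWithAck against an in-memory peer that answers with reply.
func demo(reply string) error {
	client, server := net.Pipe()
	defer client.Close()
	go func() {
		defer server.Close()
		buf := make([]byte, 64)
		if _, err := server.Read(buf); err != nil {
			return
		}
		server.Write([]byte(reply))
	}()
	return sendWithAck(client, []byte("hello\n"), "OK", time.Second)
}

func main() {
	fmt.Println("matching ack:", demo("OK"))
	fmt.Println("wrong ack:", demo("NO"))
}
```

In this sketch a mismatching or missing response is reported as an error, which corresponds to the send being marked as failed.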

func (*Socket) Configure

func (prod *Socket) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*Socket) Produce

func (prod *Socket) Produce(workers *sync.WaitGroup)

Produce writes to a buffer that is sent to a given socket.

type Spooling added in v0.4.0

type Spooling struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Spooling producer

This producer is meant to be used as a fallback if another producer fails to send messages, e.g. because a service is down. It does not produce messages to another service. Instead, it buffers them on disk for a certain time and inserts them back into the system after this period.

Parameters

- Path: Sets the output directory for spooling files. Spooling files will be stored as "<path>/<stream name>/<number>.spl". By default this parameter is set to "/var/run/gollum/spooling".

- MaxFileSizeMB: Sets the size limit in MB that causes a spool file rotation. Reading messages back into the system will start only after a file is rotated. By default this parameter is set to 512.

- MaxFileAgeMin: Defines the duration in minutes after which a spool file rotation is triggered (regardless of MaxFileSizeMB). Reading messages back into the system will start only after a file is rotated. By default this parameter is set to 1.

- MaxMessagesSec: Sets the maximum number of messages that will be respooled per second. Setting this value to 0 will cause respooling to send as fast as possible. By default this parameter is set to 100.

- RespoolDelaySec: Defines the number of seconds to wait before trying to load existing spool files from disk after a restart. This setting can be used to define a safe timeframe for gollum to set up all required connections and resources before putting additional load on it. By default this parameter is set to 10.

- RevertStreamOnFallback: Allows the spooling fallback to handle messages that the spooler would have sent back had it handled them. When set to true, the stream of the message is reverted to the previous stream ID before the message is sent to the Fallback stream. By default this parameter is set to false.

- BufferSizeByte: Defines the initial size of the buffer that is used to read messages from a spool file. If a message is larger than this size, the buffer will be resized. By default this parameter is set to 8192.

- Batch/MaxCount: Defines the maximum number of messages stored in memory before a write to file is triggered. By default this parameter is set to 100.

- Batch/TimeoutSec: Defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to 5.
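The file layout and rotation rules from the Path, MaxFileSizeMB and MaxFileAgeMin descriptions can be sketched as two small helpers. Both function names are hypothetical. The unpadded file number and the inclusive comparisons are assumptions, as the documentation does not specify them.

```go
package main

import (
	"fmt"
	"path"
)

// spoolFilePath builds "<path>/<stream name>/<number>.spl".
// Assumption: the number is written without zero padding.
func spoolFilePath(base, stream string, number int) string {
	return path.Join(base, stream, fmt.Sprintf("%d.spl", number))
}

// needsRotation reports whether a spool file should be rotated: either the
// size limit is reached or the file is old enough, regardless of its size.
func needsRotation(sizeBytes int64, ageSec int, maxFileSizeMB int64, maxFileAgeMin int) bool {
	return sizeBytes >= maxFileSizeMB*1024*1024 || ageSec >= maxFileAgeMin*60
}

func main() {
	fmt.Println(spoolFilePath("/var/run/gollum/spooling", "fallback", 3))
	// Small file, but older than MaxFileAgeMin (1 minute by default).
	fmt.Println(needsRotation(1024, 90, 512, 1))
	// Young file below the size limit.
	fmt.Println(needsRotation(1024, 10, 512, 1))
}
```

Because messages are read back only after a rotation, a short MaxFileAgeMin keeps the delay before respooling low even when little data arrives.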

Examples

This example will collect messages from the fallback stream and buffer them for 10 minutes. After 10 minutes the first messages will be written back to the system as fast as possible.

spooling:
  Type: producer.Spooling
  Stream: fallback
  MaxMessagesSec: 0
  MaxFileAgeMin: 10
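The effect of MaxMessagesSec, including the special value 0 used in the example above, can be sketched as a simple pacing loop. `respoolInterval` and `respool` are hypothetical names, and sleeping between messages is just one straightforward way to throttle. The producer may use a different mechanism.

```go
package main

import (
	"fmt"
	"time"
)

// respoolInterval converts MaxMessagesSec into a delay between messages.
// A value of 0 (or less) means "as fast as possible", i.e. no delay.
func respoolInterval(maxMessagesSec int) time.Duration {
	if maxMessagesSec <= 0 {
		return 0
	}
	return time.Second / time.Duration(maxMessagesSec)
}

// respool hands messages to emit, pausing between them when throttled.
func respool(messages []string, maxMessagesSec int, emit func(string)) {
	interval := respoolInterval(maxMessagesSec)
	for _, msg := range messages {
		emit(msg)
		if interval > 0 {
			time.Sleep(interval)
		}
	}
}

func main() {
	fmt.Println("delay at 100 msg/s:", respoolInterval(100))
	fmt.Println("delay at 0 (unthrottled):", respoolInterval(0))
	respool([]string{"a", "b", "c"}, 0, func(m string) {
		fmt.Println("respooled", m)
	})
}
```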

func (*Spooling) Configure added in v0.4.0

func (prod *Spooling) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*Spooling) Produce added in v0.4.0

func (prod *Spooling) Produce(workers *sync.WaitGroup)

Produce buffers messages in spool files on disk and inserts them back into the system.

func (*Spooling) TryFallback added in v0.5.0

func (prod *Spooling) TryFallback(msg *core.Message)

TryFallback reverts the message stream before dropping the message.

type StatsdMetrics added in v0.5.0

type StatsdMetrics struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

StatsdMetrics producer

This producer samples the messages it receives and sends metrics about them to statsd.

Parameters

- Server: Defines the server and port to send statsd metrics to. By default this parameter is set to "localhost:8125".

- Prefix: Defines a string that is prepended to every statsd metric name. By default this parameter is set to "gollum.".

- StreamMapping: Defines a translation from gollum stream to statsd metric name. If no mapping is given the gollum stream name is used as the metric name. By default this parameter is set to an empty list.

- UseMessage: Switches between just counting all messages arriving at this producer and summing up the message content. If UseMessage is set to true, the contents will be parsed as an integer, i.e. a string containing a human readable number is expected. By default this parameter is set to false.

- UseGauge: When set to true the statsd data format will switch from counter to gauge. Every stream that does not receive any message but is listed in StreamMapping will have a gauge value of 0. By default this parameter is set to false.

- Batch/MaxMessages: Defines the maximum number of messages to collect per batch. By default this parameter is set to 500.

- Batch/TimeoutSec: Defines the number of seconds after which a batch is processed, regardless of MaxMessages being reached or not. By default this parameter is set to 10.
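To make the Prefix, StreamMapping, UseMessage and UseGauge parameters more concrete, the sketch below builds metric lines in the plain-text statsd format (`name:value|c` for counters, `name:value|g` for gauges). The helper names are hypothetical and the code does not use or reflect the statsd client library that gollum itself relies on.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// metricName applies StreamMapping (if any) and prepends Prefix.
func metricName(prefix string, mapping map[string]string, stream string) string {
	if name, ok := mapping[stream]; ok {
		return prefix + name
	}
	return prefix + stream
}

// messageValue returns what one message contributes to the metric:
// 1 when just counting, or the parsed integer content with UseMessage.
func messageValue(useMessage bool, payload string) (int64, error) {
	if !useMessage {
		return 1, nil
	}
	return strconv.ParseInt(strings.TrimSpace(payload), 10, 64)
}

// formatMetric renders a statsd line as a counter or a gauge.
func formatMetric(name string, value int64, useGauge bool) string {
	kind := "c"
	if useGauge {
		kind = "g"
	}
	return fmt.Sprintf("%s:%d|%s", name, value, kind)
}

func main() {
	mapping := map[string]string{"access": "http.access"} // illustrative mapping
	var sum int64
	for _, payload := range []string{"3", "4"} {
		v, err := messageValue(true, payload)
		if err != nil {
			continue // non-numeric content is skipped in this sketch
		}
		sum += v
	}
	fmt.Println(formatMetric(metricName("gollum.", mapping, "access"), sum, false))
	fmt.Println(formatMetric(metricName("gollum.", mapping, "error"), 0, true))
}
```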

Examples

This example collects all messages going through gollum and sends metrics about the different data streams to statsd at least every 5 seconds. Metrics will be sent as "logs.streamName".

metricsCollector:
  Type: producer.StatsdMetrics
  Stream: "*"
  Server: "stats01:8125"
  Batch:
    TimeoutSec: 5
  Prefix: "logs."
  UseGauge: true

func (*StatsdMetrics) Configure added in v0.5.0

func (prod *StatsdMetrics) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*StatsdMetrics) Produce added in v0.5.0

func (prod *StatsdMetrics) Produce(workers *sync.WaitGroup)

Produce samples incoming messages and sends metrics about them to statsd.

type Websocket

type Websocket struct {
	core.BufferedProducer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Websocket producer plugin

The websocket producer opens up a websocket.

Parameters

- Address: This value defines the host and port to bind to. This can be any IP address or DNS name and port like "localhost:5880". By default this parameter is set to ":81".

- Path: This value defines the URL path to listen on. By default this parameter is set to "/".

- ReadTimeoutSec: This value specifies the maximum duration in seconds before timing out read of the request. By default this parameter is set to "3" seconds.

- IgnoreOrigin: When set to true, the websocket server skips the origin check. By default this parameter is set to "false".

Examples

This example starts a default Websocket producer on port 8080:

WebsocketOut:
  Type: producer.Websocket
  Address: ":8080"
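For readers unfamiliar with the origin check that IgnoreOrigin disables, the sketch below shows one common same-origin policy: a handshake is accepted only if the host in the `Origin` header matches the host the request was sent to. This is an assumption about what such a check typically looks like, not a description of the websocket library gollum uses. `originAllowed` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// originAllowed decides whether a websocket handshake may proceed.
// host is the request's Host header, origin its Origin header.
func originAllowed(host, origin string, ignoreOrigin bool) bool {
	// With IgnoreOrigin set, or without an Origin header (non-browser
	// clients usually send none), the request is accepted.
	if ignoreOrigin || origin == "" {
		return true
	}
	u, err := url.Parse(origin)
	if err != nil {
		return false
	}
	// Same-origin policy assumed for this sketch: hosts must match.
	return strings.EqualFold(u.Host, host)
}

func main() {
	fmt.Println(originAllowed("localhost:8080", "http://localhost:8080", false))
	fmt.Println(originAllowed("localhost:8080", "http://other.example", false))
	fmt.Println(originAllowed("localhost:8080", "http://other.example", true))
}
```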

func (*Websocket) Configure

func (prod *Websocket) Configure(conf core.PluginConfigReader)

Configure initializes this producer with values from a plugin config.

func (*Websocket) Produce

func (prod *Websocket) Produce(workers *sync.WaitGroup)

Produce writes messages to the clients connected to the websocket.
