
v0.53.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2025 License: BSD-3-Clause Imports: 29 Imported by: 0



Pipeline is an entity which handles data. It consists of input plugin, list of action plugins and output plugin. The input plugin sends the data to pipeline.In controller. There the data is validated, if the data is empty, it is discarded, the data size is also checked, the behaviour for the long logs is defined by cut_off_event_by_limit setting. Then the data is checked in antispam if it is enabled. After all checks are passed the data is converted to the Event structure, the events are limited by the EventPool, and decoded depending on the pipeline settings. The event is sent to stream which are handled with processors. In the processors the event is passed through the list of action plugins and sent to the output plugin. Output plugin commits the Event by calling pipeline.Commit function and after the commit is finished the data is considered as processed. More details and architecture is presented in architecture page.


capacity int default=1024

Capacity of the EventPool. There can only be processed no more than capacity events at the same time. It can be considered as one of the rate limiting tools, but its primary role is to control the amount of RAM used by File.d.

avg_log_size int default=4096

Expected average size of the input logs in bytes. Used in standard event pool to release buffer memory when its size exceeds this value.

max_event_size int default=0

Maximum allowed size of the input logs in bytes. If set to 0, logs of any size are allowed. If set to the value greater than 0, logs with size greater than max_event_size are discarded unless cut_off_event_by_limit is set to true.

cut_off_event_by_limit bool default=false

Flag indicating whether to cut logs which have exceeded the max_event_size. If set to true huge logs are cut and only the first max_event_size bytes of the logs are passed further. If set to false huge logs are discarded. Only works if max_event_size is greater than 0, otherwise does nothing. Useful when there are huge logs which affect the logging system but it is prefferable to deliver them at least partially.

cut_off_event_by_limit_field string

Field to add to log if it was cut by max_event_size. E.g. with cut_off_event_by_limit_field: _cropped, if the log was cut, the output event will have field "_cropped":true. Only works if cut_off_event_by_limit is set to true and max_event_size is greater than 0. Useful for marking cut logs.

decoder string default=auto

Which decoder to use on every log from input plugin. Defaults to auto meaning the usage of the decoder suggested by the input plugin. Currently most of the time json decoder is suggested, the only exception is k8s input plugin with CRI type not docker, in that case cri decoder is suggested. The full list of the decoders is available on the decoders page.

decoder_params map[string]any

Additional parameters for the chosen decoder. The params list varies. It can be found on the decoders page for each of them.

stream_field string default=stream

Which field in the log indicates stream. Mostly used for distinguishing stdout from stderr in k8s logs.

maintenance_interval string default=5s

How often to perform maintenance. Maintenance includes antispammer maintenance and metric cleanup, metric holder maintenance, increasing basic pipeline metrics with accumulated deltas, logging pipeline stats. The value must be passed in format of duration (<number>(ms|s|m|h)).

event_timeout bool default=30s

How long the event can process in action plugins and block stream in streamer until it is marked as a timeout event and unlocks stream so that the whole pipeline does not get stuck. The value must be passed in format of duration (<number>(ms|s|m|h)).

antispam_threshold int default=0

Threshold value for the antispammer to ban sources. If set to 0 antispammer is disabled. If set to the value greater than 0 antispammer is enabled and bans sources which write antispam_threshold or more logs in maintenance_interval time.

antispam_exceptions []antispam.Exception

The list of antispammer exceptions. If the log matches at least one of the exceptions it is not accounted in antispammer.

meta_cache_size int default=1024

Amount of entries in metadata cache.

source_name_meta_field string

The metadata field used to retrieve the name or origin of a data source. You can use it for antispam. Metadata is configured via meta parameter in input plugin. For example:

    type: k8s
        pod_namespace: '{{ .pod_name }}.{{ .namespace_name }}'
    antispam_threshold: 2000
    source_name_meta_field: pod_namespace

is_strict bool default=false

Whether to fatal on decoding error.

metric_hold_duration string default=30m

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (<number>(ms|s|m|h)).

pool string options=std|low_memory

Type of EventPool that file.d uses to reuse memory. std pool is an original event pool with pre-allocated events at the start of the application. This pool only frees up memory if event exceeds avg_log_size.

low_memory event pool based on Go's sync.Pool with lazy memory allocation. It frees up memory depending on the application load - if file.d processes a lot of events, then a lot of memory will be allocated. If the application load decreases, then the extra events will be freed up in background.

Note that low_memory pool increases the load on the garbage collector. If you are confident in what you are doing, you can change the GOGC (file.d uses GOGC=30 as default value) environment variable to adjust the frequency of garbage collection – this can reduce the load on the CPU.

Both pools support the capacity setting, which both pools use to ensure that they do not exceed the number of allocated events. This parameter is useful for avoiding OOM.

Default pool is low_memory.

Datetime parse formats

Most of the plugins which work with parsing datetime call pipeline.ParseTime function. It accepts datetime layouts the same way as Go time.Parse (in format of datetime like 2006-01-02T15:04:05.999999999Z07:00) except unix timestamp formats, they can only be specified via aliases.

For the comfort of use there are aliases to some datetime formats:

  • ansic - Mon Jan _2 15:04:05 2006
  • unixdate - Mon Jan _2 15:04:05 MST 2006
  • rubydate - Mon Jan 02 15:04:05 -0700 2006
  • rfc822 - 02 Jan 06 15:04 MST
  • rfc822z - 02 Jan 06 15:04 -0700
  • rfc850 - Monday, 02-Jan-06 15:04:05 MST
  • rfc1123 - Mon, 02 Jan 2006 15:04:05 MST
  • rfc1123z - Mon, 02 Jan 2006 15:04:05 -0700
  • rfc3339 - 2006-01-02T15:04:05Z07:00
  • rfc3339nano - 2006-01-02T15:04:05.999999999Z07:00
  • kitchen - 3:04PM
  • stamp - Jan _2 15:04:05
  • stampmilli - Jan _2 15:04:05.000
  • stampmicro - Jan _2 15:04:05.000000
  • stampnano - Jan _2 15:04:05.000000000
  • nginx_errorlog - 2006/01/02 15:04:05
  • unixtime - unix timestamp in seconds: 1739959880
  • unixtimemilli - unix timestamp in milliseconds: 1739959880999
  • unixtimemicro - unix timestamp in microseconds: 1739959880999999 (e.g. journalctl writes timestamp in that format in __REALTIME_TIMESTAMP field when using json output format)
  • unixtimenano - unix timestamp in nanoseconds: 1739959880999999999

Note: when using unixtime(|milli|micro|nano) if there is a float value its whole part is always considered as seconds and the fractional part is fractions of a second.

Match modes

Note: consider using DoIf match rules instead, since it is an advanced version of match modes.


match_mode: and — matches fields with AND operator


      - type: discard
          k8s_namespace: [payment, tarifficator] # use exact match
          k8s_pod: /^payment-api.*/              # use regexp match
        match_mode: and


{"k8s_namespace": "payment", "k8s_pod":"payment-api-abcd"}         # discarded
{"k8s_namespace": "tarifficator", "k8s_pod":"payment-api"}         # discarded
{"k8s_namespace": "payment-tarifficator", "k8s_pod":"payment-api"} # won't be discarded
{"k8s_namespace": "tarifficator", "k8s_pod":"no-payment-api"}      # won't be discarded


match_mode: or — matches fields with OR operator


      - type: discard
          k8s_namespace: [payment, tarifficator] # use exact match
          k8s_pod: /^payment-api.*/              # use regexp match
        match_mode: or


{"k8s_namespace": "payment", "k8s_pod":"payment-api-abcd"}         # discarded
{"k8s_namespace": "tarifficator", "k8s_pod":"payment-api"}         # discarded
{"k8s_namespace": "map", "k8s_pod":"payment-api"}                  # discarded
{"k8s_namespace": "payment", "k8s_pod":"map-api"}                  # discarded
{"k8s_namespace": "tarifficator", "k8s_pod":"tarifficator-go-api"} # discarded
{"k8s_namespace": "sre", "k8s_pod":"cpu-quotas-abcd-1234"}         # won't be discarded


match_mode: and_prefix — matches fields with AND operator


      - type: discard
          k8s_namespace: payment # use prefix match
          k8s_pod: payment-api- # use prefix match
        match_mode: and_prefix


{"k8s_namespace": "payment", "k8s_pod":"payment-api-abcd-1234"}    # discarded
{"k8s_namespace": "payment-2", "k8s_pod":"payment-api-abcd-1234"}  # discarded
{"k8s_namespace": "payment", "k8s_pod":"checkout"}                 # won't be discarded
{"k8s_namespace": "map", "k8s_pod":"payment-api-abcd-1234"}        # won't be discarded
{"k8s_namespace": "payment-abcd", "k8s_pod":"payment-api"}         # won't be discarded


match_mode: or_prefix — matches fields with OR operator


      - type: discard
          k8s_namespace: [payment, tarifficator] # use prefix match
          k8s_pod: /-api-.*/ # use regexp match
        match_mode: or_prefix


{"k8s_namespace": "payment", "k8s_pod":"payment-api-abcd-1234"}    # discarded
{"k8s_namespace": "payment", "k8s_pod":"checkout"}                 # discarded
{"k8s_namespace": "map", "k8s_pod":"map-go-api-abcd-1234"}         # discarded
{"k8s_namespace": "map", "k8s_pod":"payment-api"}                  # won't be discarded
{"k8s_namespace": "map", "k8s_pod":"payment-api-abcd-1234"}        # discarded
{"k8s_namespace": "tariff", "k8s_pod":"tarifficator"}              # won't be discarded

Generated using insane-doc




View Source
const (
	DefaultAntispamThreshold       = 0
	DefaultSourceNameMetaField     = ""
	DefaultDecoder                 = "auto"
	DefaultIsStrict                = false
	DefaultStreamField             = "stream"
	DefaultCapacity                = 1024
	DefaultAvgInputEventSize       = 4 * 1024
	DefaultMaxInputEventSize       = 0
	DefaultCutOffEventByLimit      = false
	DefaultCutOffEventByLimitField = ""
	DefaultJSONNodePoolSize        = 16
	DefaultMaintenanceInterval     = time.Second * 5
	DefaultEventTimeout            = time.Second * 30
	DefaultFieldValue              = "not_set"
	DefaultStreamName              = StreamName("not_set")
	DefaultMetricHoldDuration      = time.Minute * 30
	DefaultMetaCacheSize           = 1024

	EventSeqIDError = uint64(0)
View Source
const (
	UnixTime      = "unixtime"
	UnixTimeMilli = "unixtimemilli"
	UnixTimeMicro = "unixtimemicro"
	UnixTimeNano  = "unixtimenano"
View Source
const (
	ByNameSelector = iota + 1
View Source
const LevelUnknownStr = ""


View Source
var MatchModes = map[string]MatchMode{
	"": MatchModeAnd,

	"and": MatchModeAnd,

	"or": MatchModeOr,

	"and_prefix": MatchModeAndPrefix,

	"or_prefix": MatchModeOrPrefix,


func ByteToStringUnsafe

func ByteToStringUnsafe(b []byte) string

ByteToStringUnsafe converts byte slice to string without memory copy This creates mutable string, thus unsafe method, should be used with caution (never modify provided byte slice)

func CloneString added in v0.4.8

func CloneString(s string) string

Clone deeply copies string

func CreateNestedField added in v0.5.10

func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node

CreateNestedField creates nested field by the path. For example, []string{"path.to", "object"} creates: { "path.to": {"object": {} } Warn: it overrides fields if it contains non-object type on the path. For example: in: { "path.to": [{"userId":"12345"}] }, out: { "path.to": {"object": {}} }

func GetBackoff added in v0.21.0

func GetBackoff(minRetention time.Duration, multiplier float64, attemptNum uint64) backoff.BackOff

func MergeToRoot added in v0.50.0

func MergeToRoot(root *insaneJSON.Root, src *insaneJSON.Node)

MergeToRoot like insaneJSON.Node.MergeWith, but without allocations.

func ParseFormatName

func ParseFormatName(formatName string) (string, error)

func ParseLevelAsString added in v0.5.10

func ParseLevelAsString(level string) string

ParseLevelAsString converts log level to the string representation according to the RFC-5424.

func ParseTime added in v0.8.12

func ParseTime(format, value string) (time.Time, error)

func StringToByteUnsafe

func StringToByteUnsafe(s string) []byte

StringToByteUnsafe converts string to byte slice without memory copy This creates mutable string, thus unsafe method, should be used with caution (never modify resulting byte slice)


type ActionPlugin

type ActionPlugin interface {
	Start(config AnyConfig, params *ActionPluginParams)
	Do(*Event) ActionResult

type ActionPluginController

type ActionPluginController interface {
	Propagate(event *Event) // throw held event back to pipeline
	Spawn(parent *Event, nodes []*insaneJSON.Node)
	IncMaxEventSizeExceeded(lvs ...string) // inc max event size exceeded counter

type ActionPluginInfo

type ActionPluginInfo struct {

type ActionPluginParams

type ActionPluginParams struct {
	Controller ActionPluginController
	Logger     *zap.SugaredLogger

type ActionPluginStaticInfo

type ActionPluginStaticInfo struct {

	MetricName       string
	MetricLabels     []string
	MetricSkipStatus bool
	MatchConditions  MatchConditions
	MatchMode        MatchMode
	MatchInvert      bool

	DoIfChecker *doif.Checker

type ActionResult

type ActionResult int
const (
	// ActionPass pass event to the next action in a pipeline
	ActionPass ActionResult = iota
	// ActionCollapse skip further processing of event and request next event from the same stream and source as current
	// plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream
	// ActionDiscard skip further processing of event and request next event from any stream and source
	// ActionHold hold event in a plugin and request next event from the same stream and source as current.
	// same as ActionCollapse but held event should be manually committed or returned into pipeline.
	// check out Commit()/Propagate() functions in InputPluginController.
	// plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream.
	// ActionBreak abort the event processing and pass it to an output.

type AnyConfig

type AnyConfig any

func GetConfig added in v0.18.0

func GetConfig(info *PluginStaticInfo, configJson []byte, values map[string]int) (AnyConfig, error)

type AnyPlugin

type AnyPlugin any

type BackoffOpts added in v0.21.0

type BackoffOpts struct {
	MinRetention time.Duration
	Multiplier   float64
	AttemptNum   int

type Batch

type Batch struct {
	// contains filtered or unexported fields

func NewPreparedBatch added in v0.16.2

func NewPreparedBatch(events []*Event) *Batch

func (*Batch) ForEach added in v0.16.2

func (b *Batch) ForEach(cb func(event *Event))

type BatchStatus added in v0.12.3

type BatchStatus byte
const (
	BatchStatusNotReady BatchStatus = iota

type Batcher

type Batcher struct {
	// contains filtered or unexported fields

func NewBatcher

func NewBatcher(opts BatcherOptions) *Batcher

func (*Batcher) Add

func (b *Batcher) Add(event *Event)

func (*Batcher) Start

func (b *Batcher) Start(ctx context.Context)

func (*Batcher) Stop

func (b *Batcher) Stop()

type BatcherMaintenanceFn

type BatcherMaintenanceFn func(*WorkerData)

type BatcherOptions added in v0.5.15

type BatcherOptions struct {
	PipelineName        string
	OutputType          string
	OutFn               BatcherOutFn
	MaintenanceFn       BatcherMaintenanceFn
	Controller          OutputPluginController
	Workers             int
	BatchSizeCount      int
	BatchSizeBytes      int
	FlushTimeout        time.Duration
	MaintenanceInterval time.Duration
	MetricCtl           *metric.Ctl

type BatcherOutFn

type BatcherOutFn func(*WorkerData, *Batch)

type ConditionType

type ConditionType int

type DeltaWrapper added in v0.5.5

type DeltaWrapper struct {
	// contains filtered or unexported fields

DeltaWrapper acts as a wrapper around int64 and returns the difference between the new and the old value when a new value is set. This is useful for metric (see the maintenance function).

type Event

type Event struct {
	Root *insaneJSON.Root
	Buf  []byte

	SeqID      uint64
	Offset     int64
	SourceID   SourceID
	SourceName string

	// Size in bytes of the raw event before any action plugin.
	Size int
	// contains filtered or unexported fields

func (*Event) Encode

func (e *Event) Encode(outBuf []byte) ([]byte, int)

func (*Event) IsChildKind added in v0.16.2

func (e *Event) IsChildKind() bool

func (*Event) IsChildParentKind added in v0.16.2

func (e *Event) IsChildParentKind() bool

func (*Event) IsIgnoreKind

func (e *Event) IsIgnoreKind() bool

func (*Event) IsRegularKind

func (e *Event) IsRegularKind() bool

func (*Event) IsTimeoutKind

func (e *Event) IsTimeoutKind() bool

func (*Event) IsUnlockKind

func (e *Event) IsUnlockKind() bool

func (*Event) SetChildKind added in v0.16.2

func (e *Event) SetChildKind()

func (*Event) SetChildParentKind added in v0.16.2

func (e *Event) SetChildParentKind()

func (*Event) SetTimeoutKind

func (e *Event) SetTimeoutKind()

func (*Event) SetUnlockKind

func (e *Event) SetUnlockKind()

func (*Event) StreamNameBytes

func (e *Event) StreamNameBytes() []byte

func (*Event) String

func (e *Event) String() string

type InputPlugin

type InputPlugin interface {
	Start(config AnyConfig, params *InputPluginParams)
	PassEvent(event *Event) bool

type InputPluginController

type InputPluginController interface {
	In(sourceID SourceID, sourceName string, offsets Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64
	UseSpread()                            // don't use stream field and spread all events across all processors
	DisableStreams()                       // don't use stream field
	SuggestDecoder(t decoder.Type)         // set decoder type if pipeline uses "auto" value for decoder
	IncReadOps()                           // inc read ops for metric
	IncMaxEventSizeExceeded(lvs ...string) // inc max event size exceeded counter

type InputPluginInfo

type InputPluginInfo struct {

type InputPluginParams

type InputPluginParams struct {
	Controller InputPluginController
	Logger     *zap.SugaredLogger

type Kind added in v0.13.0

type Kind byte
const (
	EventKindRegular Kind = iota


func (Kind) String added in v0.13.1

func (k Kind) String() string

type LogLevel added in v0.5.10

type LogLevel int
const (
	LevelUnknown LogLevel = iota - 1

func ParseLevelAsNumber added in v0.5.10

func ParseLevelAsNumber(level string) LogLevel

ParseLevelAsNumber converts log level to the int representation according to the RFC-5424.

type MatchCondition

type MatchCondition struct {
	// Slice for nested fields. Separator is a dot symbol.
	Field  []string
	Values []string
	Regexp *regexp.Regexp

type MatchConditions

type MatchConditions []MatchCondition

type MatchMode

type MatchMode int
const (
	MatchModeAnd MatchMode = iota

func MatchModeFromString added in v0.7.3

func MatchModeFromString(mm string) MatchMode

type Offsets added in v0.40.4

type Offsets struct {
	// contains filtered or unexported fields

func NewOffsets added in v0.41.1

func NewOffsets(current int64, streamOffsets SliceMap) Offsets

type OutputPlugin

type OutputPlugin interface {
	Start(config AnyConfig, params *OutputPluginParams)

type OutputPluginController

type OutputPluginController interface {
	Commit(event *Event) // notify input plugin that event is successfully processed and save offsets
	Error(err string)

type OutputPluginInfo

type OutputPluginInfo struct {

type OutputPluginParams

type OutputPluginParams struct {
	Controller OutputPluginController
	Logger     *zap.SugaredLogger

type Pipeline

type Pipeline struct {
	Name string

	Procs []*processor
	// contains filtered or unexported fields

func New

func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline

New creates new pipeline. Consider using `SetupHTTPHandlers` next.

func (*Pipeline) AddAction

func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)

func (*Pipeline) Commit

func (p *Pipeline) Commit(event *Event)

func (*Pipeline) DisableParallelism

func (p *Pipeline) DisableParallelism()

func (*Pipeline) DisableStreams

func (p *Pipeline) DisableStreams()

func (*Pipeline) EnableEventLog

func (p *Pipeline) EnableEventLog()

func (*Pipeline) Error

func (p *Pipeline) Error(err string)

func (*Pipeline) GetEventLogItem

func (p *Pipeline) GetEventLogItem(index int) string

func (*Pipeline) GetEventsTotal

func (p *Pipeline) GetEventsTotal() int

func (*Pipeline) GetInput

func (p *Pipeline) GetInput() InputPlugin

func (*Pipeline) GetOutput

func (p *Pipeline) GetOutput() OutputPlugin

func (*Pipeline) In

func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64)

In decodes message and passes it to event stream.

func (*Pipeline) IncCountEventPanicsRecovered added in v0.40.2

func (p *Pipeline) IncCountEventPanicsRecovered()

func (*Pipeline) IncMaxEventSizeExceeded added in v0.5.11

func (p *Pipeline) IncMaxEventSizeExceeded(lvs ...string)

func (*Pipeline) IncReadOps added in v0.5.5

func (p *Pipeline) IncReadOps()

func (*Pipeline) SetInput

func (p *Pipeline) SetInput(info *InputPluginInfo)

func (*Pipeline) SetOutput

func (p *Pipeline) SetOutput(info *OutputPluginInfo)

func (*Pipeline) SetupHTTPHandlers

func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux)

SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. Plugin endpoints can be accessed via URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`. Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`.

func (*Pipeline) Start

func (p *Pipeline) Start()

func (*Pipeline) Stop

func (p *Pipeline) Stop()

func (*Pipeline) SuggestDecoder

func (p *Pipeline) SuggestDecoder(t decoder.Type)

func (*Pipeline) UseSpread

func (p *Pipeline) UseSpread()

type PluginDefaultParams

type PluginDefaultParams struct {
	PipelineName     string
	PipelineSettings *Settings
	MetricCtl        *metric.Ctl

type PluginFactory

type PluginFactory func() (AnyPlugin, AnyConfig)

type PluginKind

type PluginKind string
const (
	PluginKindInput  PluginKind = "input"
	PluginKindAction PluginKind = "action"
	PluginKindOutput PluginKind = "output"

type PluginRuntimeInfo

type PluginRuntimeInfo struct {
	Plugin AnyPlugin
	ID     string

type PluginSelector

type PluginSelector struct {
	CondType  ConditionType
	CondValue string

PluginSelector the only valid value for now is ByNameSelector and only value is string type. It can be expanded with a custom type in the future.

type PluginStaticInfo

type PluginStaticInfo struct {
	Type    string
	Factory PluginFactory
	Config  AnyConfig

	// Endpoints is the map of endpoint name to handlerFunc.
	// Every plugin can provide their own API through Endpoints.
	Endpoints         map[string]func(http.ResponseWriter, *http.Request)
	AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config

type PluginsStarterData

type PluginsStarterData struct {
	Config AnyConfig
	Params *OutputPluginParams

type PluginsStarterMap

type PluginsStarterMap map[string]PluginsStarterData

type PoolType added in v0.40.4

type PoolType string
const (
	PoolTypeStd    PoolType = "std"
	PoolTypeLowMem PoolType = "low_memory"

type RetriableBatcher added in v0.21.0

type RetriableBatcher struct {
	// contains filtered or unexported fields

func NewRetriableBatcher added in v0.21.0

func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher

func (*RetriableBatcher) Add added in v0.21.0

func (b *RetriableBatcher) Add(event *Event)

func (*RetriableBatcher) Out added in v0.21.0

func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch)

func (*RetriableBatcher) Start added in v0.21.0

func (b *RetriableBatcher) Start(ctx context.Context)

func (*RetriableBatcher) Stop added in v0.21.0

func (b *RetriableBatcher) Stop()

type RetriableBatcherOutFn added in v0.21.0

type RetriableBatcherOutFn func(*WorkerData, *Batch) error

type Settings

type Settings struct {
	Decoder                 string
	DecoderParams           map[string]any
	Capacity                int
	MetaCacheSize           int
	MaintenanceInterval     time.Duration
	EventTimeout            time.Duration
	AntispamThreshold       int
	AntispamExceptions      antispam.Exceptions
	SourceNameMetaField     string
	AvgEventSize            int
	MaxEventSize            int
	CutOffEventByLimit      bool
	CutOffEventByLimitField string
	StreamField             string
	IsStrict                bool
	MetricHoldDuration      time.Duration
	Pool                    PoolType

type SliceMap added in v0.41.1

type SliceMap []kv

SliceMap is a map of streamName to offset. It could be just map[k]v, but Go > 1.17 internal map implementation can't work with mutable strings that occurs when using unsafe cast from []byte. Also it should be not slower on 1-2 keys like linked list, which is often the case for streams per job.

func SliceFromMap added in v0.41.1

func SliceFromMap(m map[StreamName]int64) SliceMap

func (*SliceMap) Get added in v0.41.1

func (so *SliceMap) Get(streamName StreamName) (int64, bool)

func (*SliceMap) Set added in v0.41.1

func (so *SliceMap) Set(streamName StreamName, offset int64)

type SourceID

type SourceID uint64

type StreamID added in v0.5.20

type StreamID uint64

type StreamName

type StreamName string

type WorkerData

type WorkerData any


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL