file

package
v0.42.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 26, 2024 License: BSD-3-Clause Imports: 27 Imported by: 0

README

File plugin

It watches for files in the provided directory and reads them line by line.

Each line should contain only one event. It also correctly handles rotations (rename/truncate) and symlinks.

From time to time, it releases and immediately reopens the descriptors of completely processed files. This allows third-party software to delete those files while file.d is still running (in which case the reopen fails).

The watcher tries to use file system events to detect file creation and updates. Update events don't work with symlinks, however, so the watcher also periodically calls fstat on all tracked files to detect changes.

⚠ It supports the commitment mechanism, but at-least-once delivery is guaranteed only if files aren't truncated. Although file.d handles file truncation correctly, there is a small chance of data loss. This isn't a file.d issue: data may have been written just before the truncation, in which case some events may never be read. If you care about delivery, note that the logrotate manual clearly states that copy/truncate may cause data loss even at the rotation stage, so use copy/truncate or similar actions only if your data isn't critical. To reduce the potential harm of truncation, you can turn on notifications of file changes (should_watch_file_changes). By default, the plugin is notified only about file creations. Note that watching for changes is more CPU intensive.
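
The truncation risk can be made concrete with a small sketch. It assumes truncation is detected by comparing the current file size with the committed offset; the function `nextOffset` is hypothetical, and the plugin's real logic may differ.

```go
package main

import "fmt"

// nextOffset decides where to resume reading, given the committed offset
// and the file's current size. A size below the committed offset means the
// file was truncated, so reading restarts from the beginning.
// Hypothetical helper for illustration only.
func nextOffset(committed, size int64) (offset int64, truncated bool) {
	if size < committed {
		return 0, true
	}
	return committed, false
}

func main() {
	fmt.Println(nextOffset(100, 40))  // file shrank: truncation detected
	fmt.Println(nextOffset(100, 250)) // file grew: continue from the committed offset
}
```

Under this assumption, anything written after the last read and before the truncation is gone by the time the size check runs. A file that is truncated and then grows past the committed offset between two checks looks like a plain append. More frequent change notifications narrow that window.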

⚠ Use the add_file_name plugin if you want to add the filename to events.

Reading docker container log files:

pipelines:
  example_docker_pipeline:
    input:
        type: file
        paths:
          include:
            - '/var/lib/docker/containers/**/*-json.log'
          exclude:
            - '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
        offsets_file: /data/offsets.yaml
        persistence_mode: async
Config params

paths Paths

Set paths in glob format

  • include []string
  • exclude []string
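
The interplay of the two lists can be sketched as follows, assuming a path is tracked when it matches at least one include pattern and no exclude pattern. The helpers are hypothetical and use the standard library's `filepath.Match`, which does not understand `**`, so the patterns below are single-level. The plugin's own matcher may behave differently.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// matchAny reports whether path matches at least one of the glob patterns.
func matchAny(patterns []string, path string) (bool, error) {
	for _, p := range patterns {
		ok, err := filepath.Match(p, path)
		if err != nil {
			return false, err
		}
		if ok {
			return true, nil
		}
	}
	return false, nil
}

// shouldTrack assumes that exclude patterns take precedence over include
// patterns. Hypothetical helper for illustration only.
func shouldTrack(include, exclude []string, path string) (bool, error) {
	included, err := matchAny(include, path)
	if err != nil || !included {
		return false, err
	}
	excluded, err := matchAny(exclude, path)
	if err != nil {
		return false, err
	}
	return !excluded, nil
}

func main() {
	include := []string{"/var/log/app/*.log"}
	exclude := []string{"/var/log/app/debug-*.log"}
	for _, p := range []string{"/var/log/app/api.log", "/var/log/app/debug-1.log"} {
		ok, err := shouldTrack(include, exclude, p)
		fmt.Println(p, ok, err)
	}
}
```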

offsets_file string required

The filename to store offsets of processed files. Offsets are loaded only on initialization.

It's a yaml file. You can modify it manually.


persistence_mode string default=async options=async|sync

It defines how to save the offsets file:

  • async – it periodically saves the offsets using async_interval. The save is skipped if the offsets haven't changed. Suitable in most cases: it guarantees at-least-once delivery and adds almost no overhead.
  • sync – saves offsets as part of event commitment. It's very slow but excludes the possibility of event duplication in extreme situations like power loss.

Save operation takes three steps:

  • Write the temporary file with all offsets;
  • Call fsync() on it;
  • Rename the temporary file to the original one.

async_interval cfg.Duration default=1s

Offsets saving interval. Only used if persistence_mode is set to async.
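
A minimal sketch of the async behaviour, assuming a dirty flag decides whether a periodic flush does any work. The type `offsetStore` and its methods are hypothetical. In a real loop, `flush` would be driven by a `time.Ticker` with a period of async_interval.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetStore keeps the latest committed offsets in memory.
// Hypothetical type for illustration only.
type offsetStore struct {
	mu      sync.Mutex
	offsets map[string]int64
	dirty   bool
}

func newOffsetStore() *offsetStore {
	return &offsetStore{offsets: make(map[string]int64)}
}

// Commit records an offset and marks the store as changed.
func (s *offsetStore) Commit(file string, offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offsets[file] = offset
	s.dirty = true
}

// flush calls save with a snapshot only if something changed since the last
// successful save. It reports whether a save happened.
func (s *offsetStore) flush(save func(map[string]int64) error) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.dirty {
		return false, nil // nothing changed: skip the save
	}
	snapshot := make(map[string]int64, len(s.offsets))
	for k, v := range s.offsets {
		snapshot[k] = v
	}
	if err := save(snapshot); err != nil {
		return false, err
	}
	s.dirty = false
	return true, nil
}

func main() {
	s := newOffsetStore()
	save := func(m map[string]int64) error {
		fmt.Println("saving", m)
		return nil
	}
	s.Commit("app.log", 128)
	fmt.Println(s.flush(save)) // offsets changed: saved
	fmt.Println(s.flush(save)) // nothing changed: skipped
}
```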

read_buffer_size int default=131072

The buffer size used for the file reading.

Each worker uses its own buffer so that final memory consumption will be read_buffer_size*workers_count.
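
The formula can be checked with a short calculation. The worker counts below are example values, not recommendations, and `readBuffersBytes` is a hypothetical helper.

```go
package main

import "fmt"

// readBuffersBytes returns the total memory used by read buffers:
// read_buffer_size * workers_count.
func readBuffersBytes(readBufferSize, workersCount int) int {
	return readBufferSize * workersCount
}

func main() {
	const readBufferSize = 131072 // the documented default, 128 KiB
	for _, workers := range []int{8, 32, 64} {
		total := readBuffersBytes(readBufferSize, workers)
		fmt.Printf("workers=%d -> %d bytes (%d MiB)\n", workers, total, total/(1<<20))
	}
}
```

With the default buffer size, 32 workers need 4 MiB of read buffers in total.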


max_files int default=16384

The maximum number of open files. If the limit is exceeded, file.d exits with a fatal error.

Also, it checks your system's file descriptors limit: ulimit -n.
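
To see how max_files relates to the descriptor limit on a given host, the soft limit reported by `ulimit -n` can be read from Go. This sketch is Unix-only and shows one way such a check could look, not how the plugin performs it. `softFileLimit` and `withinLimit` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"syscall"
)

// softFileLimit returns the soft RLIMIT_NOFILE value, the number that
// `ulimit -n` prints. Unix-only.
func softFileLimit() (uint64, error) {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return 0, err
	}
	return uint64(rl.Cur), nil
}

// withinLimit reports whether maxFiles descriptors fit under the soft limit.
func withinLimit(maxFiles, softLimit uint64) bool {
	return maxFiles <= softLimit
}

func main() {
	const maxFiles = 16384 // the documented default of max_files
	limit, err := softFileLimit()
	if err != nil {
		fmt.Println("getrlimit:", err)
		return
	}
	fmt.Printf("soft limit=%d, max_files fits: %v\n", limit, withinLimit(maxFiles, limit))
}
```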


offsets_op string default=continue options=continue|tail|reset

An offset operation which will be performed when you add a file as a job:

  • continue – uses an offset file
  • tail – sets an offset to the end of the file
  • reset – resets an offset to the beginning of the file

It is only used during the initial scan of watching_dir. Files picked up later, while the plugin is running, always use the reset operation.
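
The three operations map to a starting offset roughly as follows. `startOffset` is a hypothetical helper, and treating a missing saved offset as 0 is an assumption of this sketch.

```go
package main

import "fmt"

// startOffset picks the offset at which reading of a file begins.
// saved is the offset from the offsets file (0 if there is none),
// size is the current size of the file.
func startOffset(op string, saved, size int64) (int64, error) {
	switch op {
	case "continue":
		return saved, nil // resume from the stored offset
	case "tail":
		return size, nil // skip existing content, read only new data
	case "reset":
		return 0, nil // read the file from the beginning
	default:
		return 0, fmt.Errorf("unknown offsets_op %q", op)
	}
}

func main() {
	for _, op := range []string{"continue", "tail", "reset"} {
		off, err := startOffset(op, 120, 500)
		fmt.Println(op, off, err)
	}
}
```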


workers_count cfg.Expression default=gomaxprocs*8

It defines how many workers will be instantiated. Each worker:

  • Reads files (I/O bound)
  • Decodes events (CPU bound)

We recommend setting it to 4x-8x the number of CPU cores.


report_interval cfg.Duration default=5s

It defines how often to report statistical information to stdout.


maintenance_interval cfg.Duration default=10s

It defines how often to perform maintenance. For now, maintenance consists of two stages:

  • Symlinks
  • Jobs

Symlink maintenance detects whether the file underlying a symlink has changed. Job maintenance calls fstat on tracked files to detect whether new data has been written to them. If a job is in the done state, it releases and reopens the file descriptor so that third-party software can delete the file.


should_watch_file_changes bool default=false

It turns on watching for file modifications. Turning it on causes more CPU work, but makes it more likely that file truncation is caught.


meta cfg.MetaTemplates

Meta params

Add meta information to an event (see Meta params). Use go-template syntax.

Example: filename: '{{ .filename }}'
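
To show what such a template evaluates to, here is a sketch that renders it with Go's `text/template`. The data map stands in for whatever the plugin passes to the template. Its keys follow the Meta params list (filename, symlink, inode), while the values and the `renderMeta` helper are made up for the example.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// renderMeta parses a go-template string and executes it against data.
// Hypothetical helper for illustration only.
func renderMeta(tmpl string, data map[string]string) (string, error) {
	t, err := template.New("meta").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, data); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	// Example values only; real values come from the tracked file.
	data := map[string]string{
		"filename": "/var/log/app/api.log",
		"symlink":  "/var/log/app/current.log",
		"inode":    "12345",
	}
	out, err := renderMeta("{{ .filename }}", data)
	fmt.Println(out, err)
}
```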


watching_dir Deprecated format

The source directory to watch for files to process. All subdirectories are watched as well. E.g. if files have the /var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log structure, watching_dir should be /var/my-logs. The filename_pattern/dir_pattern options are also useful for filtering out unneeded files/subdirectories. When using two or more different directories, it's recommended to set up a separate pipeline for each.


filename_pattern string default=*

Files that don't meet this pattern will be ignored.

Check out func Glob docs for details.


dir_pattern string default=*

Dirs that don't meet this pattern will be ignored.

Check out func Glob docs for details.


Meta params

filename

symlink

inode
Generated using insane-doc

Documentation

Index

Constants

This section is empty.

Variables

var InfoRegistryInstance = &InfoRegistry{
	plugins: make(map[string]*Plugin),
}
var ResetterRegistryInstance = &ResetterRegistry{
	pipelineToResetter: make(map[string]*resetter),
}

ResetterRegistryInstance is an instance of the registry.

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

func NewJobProvider

func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.SugaredLogger) *jobProvider

func NewWatcher

func NewWatcher(
	dir string,
	paths Paths,
	notifyFn notifyFn,
	shouldWatchWrites bool,
	notifyChannelLengthMetric prometheus.Gauge,
	logger *zap.SugaredLogger,
) *watcher

NewWatcher creates a watcher that sees file creations in the path and, if they match filePattern and dirPattern, passes them to notifyFn.

Types

type Config

type Config struct {

	// > @3@4@5@6
	// >
	// > Set paths in glob format
	// >
	// > * `include` *`[]string`*
	// > * `exclude` *`[]string`*
	Paths Paths `json:"paths"` // *

	// > @3@4@5@6
	// >
	// > The filename to store offsets of processed files. Offsets are loaded only on initialization.
	// > > It's a `yaml` file. You can modify it manually.
	OffsetsFile    string `json:"offsets_file" required:"true"` // *
	OffsetsFileTmp string

	// > @3@4@5@6
	// >
	// > It defines how to save the offsets file:
	// > @persistenceMode|comment-list
	// >
	// > Save operation takes three steps:
	// > *  Write the temporary file with all offsets;
	// > *  Call `fsync()` on it;
	// > *  Rename the temporary file to the original one.
	PersistenceMode  string `json:"persistence_mode" default:"async" options:"async|sync"` // *
	PersistenceMode_ persistenceMode

	AsyncInterval  cfg.Duration `json:"async_interval" default:"1s" parse:"duration"` // *! @3 @4 @5 @6 <br> <br> Offsets saving interval. Only used if `persistence_mode` is set to `async`.
	AsyncInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > The buffer size used for the file reading.
	// > > Each worker uses its own buffer so that final memory consumption will be `read_buffer_size*workers_count`.
	ReadBufferSize int `json:"read_buffer_size" default:"131072"` // *

	// > @3@4@5@6
	// >
	// > The max amount of opened files. If the limit is exceeded, `file.d` will exit with fatal.
	// > > Also, it checks your system's file descriptors limit: `ulimit -n`.
	MaxFiles int `json:"max_files" default:"16384"` // *

	// > @3@4@5@6
	// >
	// > An offset operation which will be performed when you add a file as a job:
	// > @offsetsOp|comment-list
	// > > It is only used on an initial scan of `watching_dir`. Files that will be caught up later during work will always use `reset` operation.
	OffsetsOp  string `json:"offsets_op" default:"continue" options:"continue|tail|reset"` // *
	OffsetsOp_ offsetsOp

	// > @3@4@5@6
	// >
	// > It defines how many workers will be instantiated.
	// > Each worker:
	// > * Reads files (I/O bound)
	// > * Decodes events (CPU bound)
	// > > We recommend to set it to 4x-8x of CPU cores.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*8" parse:"expression"` // *
	WorkersCount_ int

	// > @3@4@5@6
	// >
	// > It defines how often to report statistical information to stdout
	ReportInterval  cfg.Duration `json:"report_interval" default:"5s" parse:"duration"` // *
	ReportInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > It defines how often to perform maintenance
	// > @maintenance
	MaintenanceInterval  cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` // *
	MaintenanceInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > It turns on watching for file modifications. Turning it on cause more CPU work, but it is more probable to catch file truncation
	ShouldWatchChanges bool `json:"should_watch_file_changes" default:"false"` // *

	// > @3@4@5@6
	// >
	// > Meta params
	// >
	// > Add meta information to an event (look at Meta params)
	// > Use [go-template](https://pkg.go.dev/text/template) syntax
	// >
	// > Example: ```filename: '{{ .filename }}'```
	Meta cfg.MetaTemplates `json:"meta"` // *

	// > **Deprecated format**
	// >
	// > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
	// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
	// > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
	// > different directories, it's recommended to setup separate pipelines for each.
	WatchingDir string `json:"watching_dir"` // *

	// > @3@4@5@6
	// >
	// > Files that don't meet this pattern will be ignored.
	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	FilenamePattern string `json:"filename_pattern" default:"*"` // *

	// > @3@4@5@6
	// >
	// > Dirs that don't meet this pattern will be ignored.
	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	DirPattern string `json:"dir_pattern" default:"*"` // *
}

type InfoRegistry added in v0.25.0

type InfoRegistry struct {
	// contains filtered or unexported fields
}

func (*InfoRegistry) AddPlugin added in v0.25.0

func (ir *InfoRegistry) AddPlugin(pipelineName string, plug *Plugin)

func (*InfoRegistry) Info added in v0.25.0

func (ir *InfoRegistry) Info(w http.ResponseWriter, r *http.Request)

type Job

type Job struct {
	// contains filtered or unexported fields
}

type Paths added in v0.35.0

type Paths struct {
	Include []string `json:"include"`
	Exclude []string `json:"exclude"`
}

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Commit

func (p *Plugin) Commit(event *pipeline.Event)

func (*Plugin) PassEvent added in v0.6.5

func (p *Plugin) PassEvent(event *pipeline.Event) bool

PassEvent decides whether to pass or discard an event.

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginParams)

func (*Plugin) Stop

func (p *Plugin) Stop()

type ResetterRegistry

type ResetterRegistry struct {
	// contains filtered or unexported fields
}

ResetterRegistry is a registry that holds a map of pipeline names to file plugins and functions to reset their offsets.

func (*ResetterRegistry) AddResetter

func (rr *ResetterRegistry) AddResetter(pipelineName string, plug *Plugin)

AddResetter adds plugin to the ResetterRegistry.

func (*ResetterRegistry) Reset

func (rr *ResetterRegistry) Reset(_ http.ResponseWriter, request *http.Request)

Reset truncates jobs if the plugin has started; otherwise it deletes the whole offsets file, or just one entry if inode or source_id was set in the request.
