file

package
v0.14.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2023 License: BSD-3-Clause Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ResetterRegistryInstance is the package-level ResetterRegistry. It starts
// out with an empty, non-nil pipeline-to-resetter map.
var ResetterRegistryInstance = &ResetterRegistry{
	pipelineToResetter: map[string]*resetter{},
}

ResetterRegistryInstance is an instance of the registry.

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

func NewJobProvider

func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.CounterVec, sugLogger *zap.SugaredLogger) *jobProvider

func NewWatcher

func NewWatcher(
	path string,
	filenamePattern string,
	dirPattern string,
	notifyFn notifyFn,
	shouldWatchWrites bool,
	logger *zap.SugaredLogger,
) *watcher

NewWatcher creates a watcher that observes file creations in path and, if they match filenamePattern and dirPattern, passes them to notifyFn.

Types

type Config

// Config configures the plugin: which directory and files to watch and read,
// and how the offsets of processed files are persisted.
//
// NOTE(review): the fields without a json tag (OffsetsFileTmp and the fields
// whose names end in "_") presumably hold values derived from the tagged
// fields when the config is parsed — confirm against the config parser.
type Config struct {

	// > @3@4@5@6
	// >
	// > The source directory to watch for files to process. All subdirectories will also be watched. E.g. if files have
	// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
	// > Also the `filename_pattern`/`dir_pattern` is useful to filter out unneeded files/subdirectories. In the case of using two or more
	// > different directories, it's recommended to set up separate pipelines for each.
	WatchingDir string `json:"watching_dir" required:"true"` // *

	// > @3@4@5@6
	// >
	// > The filename to store offsets of processed files. Offsets are loaded only on initialization.
	// > > It's a `yaml` file. You can modify it manually.
	OffsetsFile    string `json:"offsets_file" required:"true"` // *
	OffsetsFileTmp string

	// > @3@4@5@6
	// >
	// > Files that don't match this pattern will be ignored.
	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	FilenamePattern string `json:"filename_pattern" default:"*"` // *

	// > @3@4@5@6
	// >
	// > Dirs that don't match this pattern will be ignored.
	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	DirPattern string `json:"dir_pattern" default:"*"` // *

	// > @3@4@5@6
	// >
	// > It defines how to save the offsets file:
	// > @persistenceMode|comment-list
	// >
	// > Save operation takes three steps:
	// > *  Write the temporary file with all offsets;
	// > *  Call `fsync()` on it;
	// > *  Rename the temporary file to the original one.
	PersistenceMode  string `json:"persistence_mode" default:"async" options:"async|sync"` // *
	PersistenceMode_ persistenceMode

	AsyncInterval  cfg.Duration `json:"async_interval" default:"1s" parse:"duration"` // *! @3 @4 @5 @6 <br> <br> Offsets saving interval. Only used if `persistence_mode` is set to `async`.
	AsyncInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > The buffer size used for the file reading.
	// > > Each worker uses its own buffer, so the final memory consumption will be `read_buffer_size*workers_count`.
	ReadBufferSize int `json:"read_buffer_size" default:"131072"` // *

	// > @3@4@5@6
	// >
	// > The maximum number of open files. If the limit is exceeded, `file.d` will exit with a fatal error.
	// > > Also, it checks your system's file descriptor limit: `ulimit -n`.
	MaxFiles int `json:"max_files" default:"16384"` // *

	// > @3@4@5@6
	// >
	// > An offset operation which will be performed when you add a file as a job:
	// > @offsetsOp|comment-list
	// > > It is only used on an initial scan of `watching_dir`. Files discovered later during operation will always use the `reset` operation.
	OffsetsOp  string `json:"offsets_op" default:"continue" options:"continue|tail|reset"` // *
	OffsetsOp_ offsetsOp

	// > @3@4@5@6
	// >
	// > It defines how many workers will be instantiated.
	// > Each worker:
	// > * Reads files (I/O bound)
	// > * Decodes events (CPU bound)
	// > > We recommend setting it to 4x-8x the number of CPU cores.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*8" parse:"expression"` // *
	WorkersCount_ int

	// > @3@4@5@6
	// >
	// > It defines how often to report statistical information to stdout.
	ReportInterval  cfg.Duration `json:"report_interval" default:"5s" parse:"duration"` // *
	ReportInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > It defines how often to perform maintenance
	// > @maintenance
	MaintenanceInterval  cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` // *
	MaintenanceInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > It turns on watching for file modifications. Turning it on causes more CPU work, but makes it more likely to catch file truncation.
	ShouldWatchChanges bool `json:"should_watch_file_changes" default:"false"` // *
}

type Job

type Job struct {
	// contains filtered or unexported fields
}

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Commit

func (p *Plugin) Commit(event *pipeline.Event)

func (*Plugin) PassEvent added in v0.6.5

func (p *Plugin) PassEvent(event *pipeline.Event) bool

PassEvent decides whether to pass or discard the event.

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginParams)

func (*Plugin) Stop

func (p *Plugin) Stop()

type ResetterRegistry

type ResetterRegistry struct {
	// contains filtered or unexported fields
}

ResetterRegistry is a registry that holds a map of pipeline names to file plugins and the functions to reset their offsets.

func (*ResetterRegistry) AddResetter

func (rr *ResetterRegistry) AddResetter(pipelineName string, plug *Plugin)

AddResetter adds a plugin to the ResetterRegistry.

func (*ResetterRegistry) Reset

func (rr *ResetterRegistry) Reset(_ http.ResponseWriter, request *http.Request)

Reset truncates jobs if the plugin has started, or deletes the whole offsets file — or just one entry if an inode or source_id was set in the request.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL