Documentation ¶
Index ¶
- Variables
- func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)
- func NewJobProvider(config *Config, controller pipeline.InputPluginController, ...) *jobProvider
- func NewWatcher(path string, filenamePattern string, dirPattern string, notifyFn notifyFn, ...) *watcher
- type Config
- type Job
- type Plugin
- type ResetterRegistry
Constants ¶
This section is empty.
Variables ¶
var ResetterRegistryInstance = &ResetterRegistry{
	pipelineToResetter: make(map[string]*resetter),
}
ResetterRegistryInstance is an instance of the registry.
Functions ¶
func NewJobProvider ¶
func NewJobProvider(config *Config, controller pipeline.InputPluginController, logger *zap.SugaredLogger) *jobProvider
func NewWatcher ¶
func NewWatcher(
	path string,
	filenamePattern string,
	dirPattern string,
	notifyFn notifyFn,
	shouldWatchWrites bool,
	logger *zap.SugaredLogger,
) *watcher
NewWatcher creates a watcher that sees file creations in path and, if they match filenamePattern and dirPattern, passes them to notifyFn.
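The filtering described above can be sketched with the standard library's glob matcher. This is a minimal illustration, not the package's code: `matchesPatterns` is a hypothetical helper, and matching `dirPattern` against the name of the immediate parent directory is an assumption made for the example.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// matchesPatterns is a hypothetical helper, not part of this package.
// It checks filenamePattern against the file's base name and dirPattern
// against the name of the file's immediate parent directory (an assumption).
func matchesPatterns(fullPath, filenamePattern, dirPattern string) (bool, error) {
	nameOK, err := filepath.Match(filenamePattern, filepath.Base(fullPath))
	if err != nil {
		return false, err // malformed filename pattern
	}
	dirOK, err := filepath.Match(dirPattern, filepath.Base(filepath.Dir(fullPath)))
	if err != nil {
		return false, err // malformed directory pattern
	}
	return nameOK && dirOK, nil
}

func main() {
	ok, err := matchesPatterns("/var/my-logs/host1/app.log", "*.log", "host*")
	fmt.Println(ok, err)
}
```

Note that in `filepath.Match` a `*` never crosses a path separator, which is why the sketch compares single path elements rather than whole paths.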
Types ¶
type Config ¶
type Config struct {
	//> @3@4@5@6
	//>
	//> The source directory to watch for files to process. All subdirectories will also be watched. E.g. if files have
	//> `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
	//> Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
	//> different directories, it's recommended to set up separate pipelines for each.
	WatchingDir string `json:"watching_dir" required:"true"` //*

	//> @3@4@5@6
	//>
	//> The filename to store offsets of processed files. Offsets are loaded only on initialization.
	//> > It's a `yaml` file. You can modify it manually.
	OffsetsFile    string `json:"offsets_file" required:"true"` //*
	OffsetsFileTmp string

	//> @3@4@5@6
	//>
	//> Files that don't meet this pattern will be ignored.
	//> > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	FilenamePattern string `json:"filename_pattern" default:"*"` //*

	//> @3@4@5@6
	//>
	//> Dirs that don't meet this pattern will be ignored.
	//> > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	DirPattern string `json:"dir_pattern" default:"*"` //*

	//> @3@4@5@6
	//>
	//> It defines how to save the offsets file:
	//> @persistenceMode|comment-list
	//>
	//> Save operation takes three steps:
	//> * Write the temporary file with all offsets;
	//> * Call `fsync()` on it;
	//> * Rename the temporary file to the original one.
	PersistenceMode  string `json:"persistence_mode" default:"async" options:"async|sync"` //*
	PersistenceMode_ persistenceMode

	AsyncInterval  cfg.Duration `json:"async_interval" default:"1s" parse:"duration"` // *! @3 @4 @5 @6 <br> <br> Offsets saving interval. Only used if `persistence_mode` is set to `async`.
	AsyncInterval_ time.Duration

	//> @3@4@5@6
	//>
	//> The buffer size used for the file reading.
	//> > Each worker uses its own buffer so that final memory consumption will be `read_buffer_size*workers_count`.
	ReadBufferSize int `json:"read_buffer_size" default:"131072"` //*

	//> @3@4@5@6
	//>
	//> The max amount of opened files. If the limit is exceeded, `file.d` will exit with a fatal error.
	//> > Also, it checks your system's file descriptors limit: `ulimit -n`.
	MaxFiles int `json:"max_files" default:"16384"` //*

	//> @3@4@5@6
	//>
	//> An offset operation which will be performed when you add a file as a job:
	//> @offsetsOp|comment-list
	//> > It is only used on an initial scan of `watching_dir`. Files that are caught later during work will always use the `reset` operation.
	OffsetsOp  string `json:"offsets_op" default:"continue" options:"continue|tail|reset"` //*
	OffsetsOp_ offsetsOp

	//> @3@4@5@6
	//>
	//> It defines how many workers will be instantiated.
	//> Each worker:
	//> * Reads files (I/O bound)
	//> * Decodes events (CPU bound)
	//> > We recommend setting it to 4x-8x of CPU cores.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*8" parse:"expression"` //*
	WorkersCount_ int

	//> @3@4@5@6
	//>
	//> It defines how often to report statistical information to stdout.
	ReportInterval  cfg.Duration `json:"report_interval" default:"5s" parse:"duration"` //*
	ReportInterval_ time.Duration

	//> @3@4@5@6
	//>
	//> It defines how often to perform maintenance
	//> @maintenance
	MaintenanceInterval  cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` //*
	MaintenanceInterval_ time.Duration

	//> @3@4@5@6
	//>
	//> It turns on watching for file modifications. Turning it on causes more CPU work, but makes it more likely that file truncation is caught.
	ShouldWatchChanges bool `json:"should_watch_file_changes" default:"false"` //*
}
type Plugin ¶
type Plugin struct {
// contains filtered or unexported fields
}
type ResetterRegistry ¶
type ResetterRegistry struct {
// contains filtered or unexported fields
}
ResetterRegistry is a registry that holds a map of pipeline names to file plugins and functions to reset their offsets.
func (*ResetterRegistry) AddResetter ¶
func (rr *ResetterRegistry) AddResetter(pipelineName string, plug *Plugin)
AddResetter adds a plugin to the ResetterRegistry.
func (*ResetterRegistry) Reset ¶
func (rr *ResetterRegistry) Reset(_ http.ResponseWriter, request *http.Request)
Reset truncates jobs if the plugin has started, or deletes the whole offset file, or deletes just one entry if inode or source_id was set in the request.