Documentation ¶
Index ¶
- Variables
- func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)
- func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.SugaredLogger) *jobProvider
- func NewWatcher(path string, filenamePattern string, dirPattern string, notifyFn notifyFn, ...) *watcher
- type Config
- type Job
- type Plugin
- type ResetterRegistry
Constants ¶
This section is empty.
Variables ¶
var ResetterRegistryInstance = &ResetterRegistry{
	pipelineToResetter: make(map[string]*resetter),
}
ResetterRegistryInstance is an instance of the registry.
Functions ¶
func NewJobProvider ¶
func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.SugaredLogger) *jobProvider
func NewWatcher ¶
func NewWatcher(
	path string,
	filenamePattern string,
	dirPattern string,
	notifyFn notifyFn,
	shouldWatchWrites bool,
	notifyChannelLengthMetric prometheus.Gauge,
	logger *zap.SugaredLogger,
) *watcher
NewWatcher creates a watcher that watches for file creations in the path; files that match filenamePattern and dirPattern are passed to notifyFn.
Types ¶
type Config ¶
type Config struct {
	// > @3@4@5@6
	// >
	// > The source directory to watch for files to process. All subdirectories will also be watched. E.g. if files have
	// > a `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
	// > Also, `filename_pattern`/`dir_pattern` is useful to filter out needless files/subdirectories. In the case of using two or more
	// > different directories, it's recommended to set up separate pipelines for each.
	WatchingDir string `json:"watching_dir" required:"true"` // *

	// > @3@4@5@6
	// >
	// > The filename to store offsets of processed files. Offsets are loaded only on initialization.
	// > > It's a `yaml` file. You can modify it manually.
	OffsetsFile    string `json:"offsets_file" required:"true"` // *
	OffsetsFileTmp string

	// > @3@4@5@6
	// >
	// > Files that don't meet this pattern will be ignored.
	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	FilenamePattern string `json:"filename_pattern" default:"*"` // *

	// > @3@4@5@6
	// >
	// > Dirs that don't meet this pattern will be ignored.
	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	DirPattern string `json:"dir_pattern" default:"*"` // *

	// > @3@4@5@6
	// >
	// > It defines how to save the offsets file:
	// > @persistenceMode|comment-list
	// >
	// > The save operation takes three steps:
	// > * Write the temporary file with all offsets;
	// > * Call `fsync()` on it;
	// > * Rename the temporary file to the original one.
	PersistenceMode  string `json:"persistence_mode" default:"async" options:"async|sync"` // *
	PersistenceMode_ persistenceMode

	// > @3@4@5@6
	// >
	// > Offsets saving interval. Only used if `persistence_mode` is set to `async`.
	AsyncInterval  cfg.Duration `json:"async_interval" default:"1s" parse:"duration"` // *
	AsyncInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > The buffer size used for the file reading.
	// > > Each worker uses its own buffer, so final memory consumption will be `read_buffer_size*workers_count`.
	ReadBufferSize int `json:"read_buffer_size" default:"131072"` // *

	// > @3@4@5@6
	// >
	// > The max amount of opened files. If the limit is exceeded, `file.d` will exit with fatal.
	// > > Also, it checks your system's file descriptors limit: `ulimit -n`.
	MaxFiles int `json:"max_files" default:"16384"` // *

	// > @3@4@5@6
	// >
	// > An offset operation which will be performed when you add a file as a job:
	// > @offsetsOp|comment-list
	// > > It is only used on the initial scan of `watching_dir`. Files caught up later during work will always use the `reset` operation.
	OffsetsOp  string `json:"offsets_op" default:"continue" options:"continue|tail|reset"` // *
	OffsetsOp_ offsetsOp

	// > @3@4@5@6
	// >
	// > It defines how many workers will be instantiated.
	// > Each worker:
	// > * Reads files (I/O bound)
	// > * Decodes events (CPU bound)
	// > > We recommend to set it to 4x-8x of CPU cores.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*8" parse:"expression"` // *
	WorkersCount_ int

	// > @3@4@5@6
	// >
	// > It defines how often to report statistical information to stdout.
	ReportInterval  cfg.Duration `json:"report_interval" default:"5s" parse:"duration"` // *
	ReportInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > It defines how often to perform maintenance.
	// > @maintenance
	MaintenanceInterval  cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` // *
	MaintenanceInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > It turns on watching for file modifications. Turning it on causes more CPU work, but it is more probable to catch file truncation.
	ShouldWatchChanges bool `json:"should_watch_file_changes" default:"false"` // *
}
type Plugin ¶
type Plugin struct {
// contains filtered or unexported fields
}
type ResetterRegistry ¶
type ResetterRegistry struct {
// contains filtered or unexported fields
}
ResetterRegistry is a registry that holds a map of pipeline names to file plugins and functions to reset their offsets.
func (*ResetterRegistry) AddResetter ¶
func (rr *ResetterRegistry) AddResetter(pipelineName string, plug *Plugin)
AddResetter adds a plugin to the ResetterRegistry.
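The registry's fields are unexported, but the `ResetterRegistryInstance` declaration shows the shape: a map from pipeline name to `*resetter`. The sketch below reproduces that pattern in a self-contained form; `registry`, `addResetter`, and `resetPipeline` are stand-in names, not the plugin's actual API, and the mutex guard is an assumption about how concurrent access would be handled.

```go
package main

import (
	"fmt"
	"sync"
)

// resetter is a stand-in for the unexported resetter type: here it just
// carries a callback that resets one pipeline's offsets.
type resetter struct {
	reset func()
}

// registry mirrors the ResetterRegistry pattern: a mutex-guarded map
// from pipeline name to resetter.
type registry struct {
	mu                 sync.Mutex
	pipelineToResetter map[string]*resetter
}

// addResetter registers a reset callback for a pipeline, analogous to
// AddResetter registering a plugin.
func (r *registry) addResetter(pipeline string, reset func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.pipelineToResetter[pipeline] = &resetter{reset: reset}
}

// resetPipeline looks up and invokes the resetter, reporting whether the
// pipeline was known.
func (r *registry) resetPipeline(pipeline string) bool {
	r.mu.Lock()
	res, ok := r.pipelineToResetter[pipeline]
	r.mu.Unlock()
	if ok {
		res.reset()
	}
	return ok
}

func main() {
	reg := &registry{pipelineToResetter: make(map[string]*resetter)}
	reg.addResetter("logs", func() { fmt.Println("offsets reset for logs") })

	fmt.Println(reg.resetPipeline("logs"))    // true
	fmt.Println(reg.resetPipeline("missing")) // false
}
```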
func (*ResetterRegistry) Reset ¶
func (rr *ResetterRegistry) Reset(_ http.ResponseWriter, request *http.Request)
Reset truncates jobs if the plugin has started, and deletes either the whole offsets file or just one entry if inode or source_id was set in the request.