Documentation ¶
Index ¶
- func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer
- func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric, ...) fswatcher.FileTailer
- func InitWebhookTailer(inputConfig *v2.InputConfig) fswatcher.FileTailer
- func NewLineBuffer() lineBuffer
- func RunStdinTailer() fswatcher.FileTailer
- func WebhookHandler() http.Handler
- func WebhookProcessBody(c *v2.InputConfig, b []byte) []string
- type BufferLoadMetric
- type WebhookTailer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BufferedTailer ¶ added in v0.2.7
func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer
func BufferedTailerWithMetrics ¶ added in v0.2.7
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric, log logrus.FieldLogger, maxLinesInBuffer int) fswatcher.FileTailer
Wrapper around a tailer that consumes the lines channel quickly. The idea is that the original tailer can continue reading lines from the logfile and does not need to wait until the lines are processed. The number of buffered lines is exposed as a Prometheus metric. If lines are constantly produced faster than they are consumed, we will eventually run out of memory.
The buffered tailer prevents the following error (this can be reproduced on Windows, where we don't keep the logfile open):
Example test actions
--------------------
Sequence of actions simulated in fileTailer_test:
1) write line a
2) write line b
3) move the old logfile away and create a new logfile
4) write line c
Good case event processing
--------------------------
How Events.Process() should process the file system events triggered by the actions above:
1) MODIFIED : Process() reads line a
2) MODIFIED : Process() reads line b
3) MOVED_FROM, CREATED : Process() resets the line reader and seeks the file to position 0
4) MODIFIED : Process() reads line c
Bad case event processing
-------------------------
When Events.Process() receives a MODIFIED event, it does not know how many lines have been written. Therefore, it reads all new lines until EOF is reached. If line processing is slow (writing to the lines channel blocks until all grok patterns are processed), we might read 'line b' while we are still processing the first MODIFIED event:
1) MODIFIED : Process() reads 'line a' and 'line b'
Meanwhile, the test continues with steps 3 and 4, moving the logfile away, creating a new logfile, and writing 'line c'. When the tailer receives the second MODIFIED event, it learns that the file has been truncated, seeks to position 0, and reads 'line c'.
2) MODIFIED : Process() detects the truncated file, seeks to position 0, reads 'line c'
The tailer now receives MOVED_FROM, which makes it close the logfile, followed by CREATED, which makes it open the new logfile and start reading from position 0:
3) MOVED_FROM, CREATED : seek to position 0, read line c again !!!
When the last MODIFIED event is processed, there are no more changes in the file:
4) MODIFIED : no changes in file
As a result, we read 'line c' two times.
To minimize the risk, use the buffered tailer to make sure file system events are handled as quickly as possible without waiting for the grok patterns to be processed.
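The buffering idea described above can be sketched in a few lines of Go. This is a minimal illustration, not the package's implementation: bufferLines is a hypothetical helper, it works on plain strings instead of *fswatcher.Line, and it leaves out the metric, the logger, and the maxLinesInBuffer limit that BufferedTailerWithMetrics accepts. The producer side is drained into an in-memory list as fast as lines arrive, and the consumer side is served from that list at its own pace.

```go
package main

import (
	"container/list"
	"fmt"
)

// bufferLines is a hypothetical helper (not part of this package).
// It reads from in as quickly as lines arrive, keeps them in an
// unbounded in-memory list, and forwards them in order on the
// returned channel whenever the consumer is ready.
func bufferLines(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		pending := list.New()
		for in != nil || pending.Len() > 0 {
			// Offer a line to the consumer only when one is buffered.
			// A nil channel in a select case is never chosen.
			var send chan<- string
			var next string
			if pending.Len() > 0 {
				send = out
				next = pending.Front().Value.(string)
			}
			select {
			case line, ok := <-in:
				if !ok {
					in = nil // producer is done; flush what is left
					continue
				}
				pending.PushBack(line)
			case send <- next:
				pending.Remove(pending.Front())
			}
		}
	}()
	return out
}

func main() {
	in := make(chan string)
	out := bufferLines(in)

	// The producer can write all lines before the consumer reads any.
	for _, l := range []string{"line a", "line b", "line c"} {
		in <- l
	}
	close(in)

	for l := range out {
		fmt.Println(l)
	}
}
```

Because the list is unbounded, this sketch has the same memory caveat as the description above: a consumer that is permanently slower than the producer makes the buffer grow without limit.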
func InitWebhookTailer ¶ added in v0.2.8
func InitWebhookTailer(inputConfig *v2.InputConfig) fswatcher.FileTailer
func NewLineBuffer ¶ added in v0.2.8
func NewLineBuffer() lineBuffer
func RunStdinTailer ¶
func RunStdinTailer() fswatcher.FileTailer
func WebhookHandler ¶ added in v0.2.8
func WebhookHandler() http.Handler
func WebhookProcessBody ¶ added in v0.2.8
func WebhookProcessBody(c *v2.InputConfig, b []byte) []string
Types ¶
type BufferLoadMetric ¶ added in v0.2.7
type BufferLoadMetric interface {
	Start()
	Inc()            // put a log line into the buffer
	Dec()            // take a log line from the buffer
	Set(value int64) // set the current number of lines in the buffer
	Stop()
}
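As an illustration of what satisfying this interface might look like, here is a minimal sketch of an in-memory implementation, for example for use in tests. The countingMetric type and its Load method are hypothetical and not part of this package. The interface is repeated so the example compiles on its own, and Start and Stop are no-ops here because this page does not document what they are expected to do.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// BufferLoadMetric repeats the interface documented above so that
// this example is self-contained.
type BufferLoadMetric interface {
	Start()
	Inc()
	Dec()
	Set(value int64)
	Stop()
}

// countingMetric is a hypothetical implementation that only keeps
// the current number of buffered lines in memory.
type countingMetric struct {
	lines int64 // accessed only through sync/atomic
}

func (m *countingMetric) Start()          {} // assumption: nothing to start
func (m *countingMetric) Inc()            { atomic.AddInt64(&m.lines, 1) }
func (m *countingMetric) Dec()            { atomic.AddInt64(&m.lines, -1) }
func (m *countingMetric) Set(value int64) { atomic.StoreInt64(&m.lines, value) }
func (m *countingMetric) Stop()           {} // assumption: nothing to stop

// Load returns the current value. It is an extra helper for this
// example and not part of the interface.
func (m *countingMetric) Load() int64 { return atomic.LoadInt64(&m.lines) }

// Compile-time check that countingMetric satisfies the interface.
var _ BufferLoadMetric = (*countingMetric)(nil)

func main() {
	m := &countingMetric{}
	m.Start()
	defer m.Stop()

	m.Inc() // a line enters the buffer
	m.Inc() // another line enters the buffer
	m.Dec() // one line is taken out
	fmt.Println(m.Load())
}
```

The counter is updated atomically because Inc and Dec are likely to be called from different goroutines, one filling the buffer and one draining it.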
type WebhookTailer ¶ added in v0.2.8
type WebhookTailer struct {
// contains filtered or unexported fields
}
func (*WebhookTailer) Close ¶ added in v0.2.8
func (t *WebhookTailer) Close()
func (*WebhookTailer) Errors ¶ added in v0.2.8
func (t *WebhookTailer) Errors() chan fswatcher.Error
func (*WebhookTailer) Lines ¶ added in v0.2.8
func (t *WebhookTailer) Lines() chan *fswatcher.Line
func (WebhookTailer) ServeHTTP ¶ added in v0.2.8
func (t WebhookTailer) ServeHTTP(w http.ResponseWriter, r *http.Request)