file

package
v0.0.0-...-83adff0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2020 License: GPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DecodeConfig

func DecodeConfig(md toml.MetaData, primValue toml.Primitive) (c interface{}, err error)

func NewFile

func NewFile(ctx context.Context, config interface{}, output chan<- *event.ProcessorEvent) (input.Input, error)

func SafeFileRotate

func SafeFileRotate(path, tempfile string) error

SafeFileRotate safely rotates an existing file under path and replaces it with the tempfile

func ScanLines

func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error)

Types

type Config

type Config struct {
	Paths           []string               `toml:"paths"`
	Symlinks        bool                   `toml:"symlinks"`
	AppId           string                 `toml:"appId"`
	LogId           string                 `toml:"logId"`
	ConfigPath      string                 `toml:"-"`
	MetaPath        string                 `toml:"-"`
	ID              string                 `toml:"-"`
	ReadFrom        string                 `toml:"readFrom"`
	MaxLength       int                    `toml:"maxLength"`
	IgnoreOlder     xtime.Duration         `toml:"ignoreOlder"`
	CleanFilesOlder xtime.Duration         `toml:"cleanFilesOlder"`
	ScanFrequency   xtime.Duration         `toml:"scanFrequency"`
	CleanInactive   xtime.Duration         `toml:"cleanInactive"`
	HarvesterTTL    xtime.Duration         `toml:"harvesterTTL"` // harvester will stop itself if inactive longer than HarvesterTTL
	Multiline       *MultilineConf         `toml:"multiline"`
	Timeout         xtime.Duration         `toml:"timeout"`
	Fields          map[string]interface{} `toml:"fields"`
}

func (*Config) ConfigValidate

func (c *Config) ConfigValidate() error

type File

type File struct {
	// contains filtered or unexported fields
}

func (*File) Ctx

func (f *File) Ctx() context.Context

func (*File) Run

func (f *File) Run() (err error)

func (*File) Stop

func (f *File) Stop()

type Harvester

type Harvester struct {
	// contains filtered or unexported fields
}

Harvester contains all harvester related data

func NewHarvester

func NewHarvester(c *Config, register *Registrar, state State, output chan<- *event.ProcessorEvent) (*Harvester, error)

NewHarvester creates a new harvester

func (*Harvester) Run

func (h *Harvester) Run()

Run start the harvester and reads files line by line and sends events to the defined output

func (*Harvester) Setup

func (h *Harvester) Setup() error

Setup opens the file handler and creates the reader for the harvester

func (*Harvester) Stop

func (h *Harvester) Stop()

func (*Harvester) WriteToProcessor

func (h *Harvester) WriteToProcessor(message []byte)

type Hfile

type Hfile struct {
	*os.File
}

func (Hfile) Continuable

func (Hfile) Continuable() bool

func (Hfile) HasState

func (Hfile) HasState() bool

type LineReader

type LineReader struct {
	// contains filtered or unexported fields
}

func NewLineReader

func NewLineReader(input io.Reader, bufferSize int) (*LineReader, error)

New creates a new reader object

func (*LineReader) Next

func (r *LineReader) Next() ([]byte, int, error)

Next reads the next line until the new line character

type MultilineConf

type MultilineConf struct {
	Pattern  string `toml:"pattern"`
	MaxLines int    `toml:"maxLines"`
}

func (*MultilineConf) ConfigValidate

func (c *MultilineConf) ConfigValidate() error

type Reader

type Reader interface {
	Next() ([]byte, int, error)
}

type Registrar

type Registrar struct {
	Channel chan State
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry(ctx context.Context, registryFile string) (*Registrar, error)

New creates a new Registrar instance, updating the registry file on `file.State` updates. New fails if the file can not be opened or created.

func (*Registrar) FindPrevious

func (r *Registrar) FindPrevious(newState State) State

FindPrevious lookups a registered state, that matching the new state. Returns a zero-state if no match is found.

func (*Registrar) GetHarvester

func (r *Registrar) GetHarvester(i uint64) *Harvester

func (*Registrar) Init

func (r *Registrar) Init() (err error)

Init sets up the Registrar and make sure the registry file is setup correctly

func (*Registrar) RegisterHarvester

func (r *Registrar) RegisterHarvester(h *Harvester) error

func (*Registrar) Run

func (r *Registrar) Run()

func (*Registrar) SendStateUpdate

func (r *Registrar) SendStateUpdate(state State)

func (*Registrar) UnRegisterHarvester

func (r *Registrar) UnRegisterHarvester(h *Harvester) error

type Source

type Source interface {
	io.ReadCloser
	Name() string
	Stat() (os.FileInfo, error)
	Continuable() bool // can we continue processing after EOF?
	HasState() bool    // does this source have a state?
}

type State

type State struct {
	Source    string            `json:"source"`
	Offset    int64             `json:"offset"`
	Inode     uint64            `json:"inode"`
	Fileinfo  os.FileInfo       `json:"-"` // the file info
	Timestamp time.Time         `json:"timestamp"`
	Finished  bool              `json:"finished"`
	Meta      map[string]string `json:"meta"`
	TTL       time.Duration     `json:"ttl"`
}

func NewState

func NewState(fileInfo os.FileInfo, path string) State

NewState creates a new file state

func (*State) ID

func (s *State) ID() uint64

func (*State) IsEmpty

func (s *State) IsEmpty() bool

IsEmpty returns true if the state is empty

func (*State) IsEqual

func (s *State) IsEqual(c *State) bool

IsEqual compares the state to an other state supporting stringer based on the unique string

type States

type States struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

States handles list of FileState. One must use NewStates to instantiate a file states registry. Using the zero-value is not safe.

func NewStates

func NewStates() *States

NewStates generates a new states registry.

func (*States) Cleanup

func (s *States) Cleanup() int

Cleanup cleans up the state array. All states which are older then `older` are removed The number of states that were cleaned up is returned.

func (*States) FindPrevious

func (s *States) FindPrevious(newState State) State

FindPrevious lookups a registered state, that matching the new state. Returns a zero-state if no match is found.

func (*States) GetState

func (s *States) GetState(id uint64) State

GetStates creates copy of the file states.

func (*States) SetStates

func (s *States) SetStates(states map[uint64]State)

SetStates overwrites all internal states with the given states array

func (*States) Update

func (s *States) Update(newState State)

Update updates a state. If previous state didn't exist, new one is created

func (*States) UpdateWithTs

func (s *States) UpdateWithTs(newState State, ts time.Time)

UpdateWithTs updates a state, assigning the given timestamp. If previous state didn't exist, new one is created

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL