file

package
v1.2.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2022 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// The two AckTaskType values.
const (
	AckStart AckTaskType = "start"
	AckStop  AckTaskType = "stop"
)
View Source
// JobStatus values. The numbers are spelled out explicitly (1, 2, 3)
// rather than derived from iota.
const (
	JobActive JobStatus = 1
	JobDelete JobStatus = 2
	JobStop   JobStatus = 3
)
View Source
// IsolationLevel values. "pipeline" and "share" are also the defaults of
// Config.Isolation and CollectConfig.IsolationLevel respectively.
const (
	IsolationPipeline IsolationLevel = "pipeline"
	IsolationSource   IsolationLevel = "source"
	IsolationShare    IsolationLevel = "share"
)
View Source
// The two MultiTaskType values.
const (
	MultiStart MultiTaskType = "start"
	MultiStop  MultiTaskType = "stop"
)
View Source
// DbOptType values. As the names indicate, they cover deleting by id,
// deleting by job uid, and upserting the offset / updating the name by
// job watch id.
const (
	DeleteByIdOpt               DbOptType = 1
	DeleteByJobUidOpt           DbOptType = 2
	UpsertOffsetByJobWatchIdOpt DbOptType = 3
	UpdateNameByJobWatchIdOpt   DbOptType = 4
)
View Source
// The two WatchTaskType values.
const (
	START WatchTaskType = "start"
	STOP  WatchTaskType = "stop"
)
View Source
// Operation values, numbered 0 through 3.
// NOTE(review): presumably the kinds of file-system event the watcher
// reacts to — confirm against the watcher implementation.
const (
	CREATE Operation = 0
	WRITE  Operation = 1
	REMOVE Operation = 2
	RENAME Operation = 3
)
View Source
// SystemStateKey is event.SystemKeyPrefix followed by "State".
const SystemStateKey = event.SystemKeyPrefix + "State"
View Source
// Type is the string "file", matching the package name.
// NOTE(review): presumably the type name this source registers under — confirm.
const Type = "file"

Variables

View Source
// NilOfTime is the fixed instant 2008-08-08 08:08:08 in the local time zone.
// NOTE(review): presumably used as a "no time set" sentinel — confirm at call sites.
//
// It is built with time.Date, which cannot fail, rather than by parsing a
// string and discarding the returned error.
var NilOfTime = time.Date(2008, time.August, 8, 8, 8, 8, 0, time.Local)

Functions

func GetOrCreateShareDbHandler

func GetOrCreateShareDbHandler(config DbConfig) *dbHandler

func JobUid

func JobUid(fileInfo os.FileInfo) string

func NewAckWith

func NewAckWith(state *State) *ack

func RegisterProcessor added in v1.2.0

func RegisterProcessor(factory ProcessFactory)

func ReleaseAck

func ReleaseAck(a *ack)

func StopReader

func StopReader(isolation Isolation)

func WatchJobId

func WatchJobId(pipelineName string, sourceName string, jobUid string) string

Types

type AckChainHandler

type AckChainHandler struct {
	// contains filtered or unexported fields
}

func GetOrCreateShareAckChainHandler

func GetOrCreateShareAckChainHandler(sinkCount int, ackConfig AckConfig) *AckChainHandler

func NewAckChainHandler

func NewAckChainHandler(sinkCount int, ackConfig AckConfig) *AckChainHandler

func (*AckChainHandler) StartTask

func (ach *AckChainHandler) StartTask(task *AckTask)

func (*AckChainHandler) Stop

func (ach *AckChainHandler) Stop()

func (*AckChainHandler) StopTask

func (ach *AckChainHandler) StopTask(task *AckTask)

type AckConfig

// AckConfig is the `ack` section of Config.
//
// Per the `default` tags, Enable defaults to true and MaintenanceInterval
// to 20h. AckConfig is passed to NewAckChainHandler and
// GetOrCreateShareAckChainHandler.
type AckConfig struct {
	Enable              bool          `yaml:"enable,omitempty" default:"true"`
	MaintenanceInterval time.Duration `yaml:"maintenanceInterval,omitempty" default:"20h"`
}

type AckListener

type AckListener struct {
	// contains filtered or unexported fields
}

func (*AckListener) BeforeQueueConvertBatch

func (al *AckListener) BeforeQueueConvertBatch(events []api.Event)

func (*AckListener) Name

func (al *AckListener) Name() string

func (*AckListener) Stop

func (al *AckListener) Stop()

type AckTask

// AckTask carries a pipeline epoch, a pipeline name and a source name,
// plus a StopCountDown wait group. It is created with NewAckTask and handed
// to AckChainHandler.StartTask / StopTask.
//
// NOTE(review): StopCountDown is presumably waited on while the task is
// being stopped — confirm against the handler implementation.
type AckTask struct {
	Epoch        *pipeline.Epoch
	PipelineName string
	SourceName   string

	StopCountDown *sync.WaitGroup
	// contains filtered or unexported fields
}

func NewAckTask

func NewAckTask(epoch *pipeline.Epoch, pipelineName string, sourceName string, persistenceFunc persistenceFunc) *AckTask

func (*AckTask) Key

func (at *AckTask) Key() string

func (*AckTask) NewAckChain

func (at *AckTask) NewAckChain(jobWatchUid string) *JobAckChain

type AckTaskType

// AckTaskType is the string type of the AckStart ("start") and AckStop
// ("stop") constants.
type AckTaskType string

type CleanFiles

// CleanFiles is the optional `cleanFiles` section of WatchConfig (it is
// referenced there through a pointer). MaxHistoryDays carries no `default` tag.
//
// NOTE(review): presumably files older than MaxHistoryDays days are cleaned
// up — confirm against the watcher implementation.
type CleanFiles struct {
	MaxHistoryDays int `yaml:"maxHistoryDays,omitempty"`
}

type CollectConfig

// CollectConfig describes which files are collected. It is inlined into
// Config, so its keys appear at the same yaml level as Config's own keys.
//
// Tag-declared defaults: isolationLevel "share", ignoreSymlink false,
// rereadTruncated true, firstNBytesForIdentifier 128 (validated as >= 10).
// Paths is required. The IsFileInclude, IsFileExcluded and IsIgnoreOlder
// methods are defined on this type.
type CollectConfig struct {
	IsolationLevel           string        `yaml:"isolationLevel,omitempty" default:"share"`
	Paths                    []string      `yaml:"paths,omitempty" validate:"required"` // glob patterns of the files to collect
	ExcludeFiles             []string      `yaml:"excludeFiles,omitempty"`              // regular expression patterns of files to exclude
	IgnoreOlder              util.Duration `yaml:"ignoreOlder,omitempty"`
	IgnoreSymlink            bool          `yaml:"ignoreSymlink,omitempty" default:"false"`
	RereadTruncated          bool          `yaml:"rereadTruncated,omitempty" default:"true"`                           // Read from the beginning when the file is truncated
	FirstNBytesForIdentifier int           `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected
	AddonMeta                bool          `yaml:"addonMeta,omitempty"`
	// contains filtered or unexported fields
}

func (CollectConfig) IsFileExcluded

func (cc CollectConfig) IsFileExcluded(file string) bool

func (CollectConfig) IsFileInclude

func (cc CollectConfig) IsFileInclude(file string) bool

func (CollectConfig) IsIgnoreOlder

func (cc CollectConfig) IsIgnoreOlder(info os.FileInfo) bool

type Config

// Config is the top-level configuration of the file source.
//
// Yaml layout, as declared by the tags: `ack` (AckConfig), `db` (DbConfig),
// `watcher` (WatchConfig), `isolation` (default "pipeline") and `fields`.
// ReaderConfig and CollectConfig are inlined, so their keys sit at the same
// level as the keys above. CollectConfig is validated with "required,dive".
type Config struct {
	AckConfig     AckConfig              `yaml:"ack,omitempty"`
	DbConfig      DbConfig               `yaml:"db,omitempty"`
	WatchConfig   WatchConfig            `yaml:"watcher,omitempty"`
	ReaderConfig  ReaderConfig           `yaml:",inline,omitempty"`
	CollectConfig CollectConfig          `yaml:",inline,omitempty" validate:"required,dive"`
	Isolation     string                 `yaml:"isolation,omitempty" default:"pipeline"`
	Fields        map[string]interface{} `yaml:"fields,omitempty"`
}

type DbConfig

// DbConfig is the `db` section of Config. It is passed to
// GetOrCreateShareDbHandler and GetOrCreateShareWatcher.
//
// Tag-declared defaults: file "./data/loggie.db", flushTimeout 2s,
// bufferSize 2048, tableName "registry", cleanInactiveTimeout 504h,
// cleanScanInterval 1h.
type DbConfig struct {
	File                 string        `yaml:"file,omitempty" default:"./data/loggie.db"`
	FlushTimeout         time.Duration `yaml:"flushTimeout,omitempty" default:"2s"`
	BufferSize           int           `yaml:"bufferSize,omitempty" default:"2048"`
	TableName            string        `yaml:"tableName,omitempty" default:"registry"`
	CleanInactiveTimeout time.Duration `yaml:"cleanInactiveTimeout,omitempty" default:"504h"` // by default, records not updated for 21 days (504h) are deleted
	CleanScanInterval    time.Duration `yaml:"cleanScanInterval,omitempty" default:"1h"`
}

type DbOpt

type DbOpt struct {
	// contains filtered or unexported fields
}

type DbOptType

// DbOptType is the integer type of the DeleteByIdOpt (1), DeleteByJobUidOpt (2),
// UpsertOffsetByJobWatchIdOpt (3) and UpdateNameByJobWatchIdOpt (4) constants.
type DbOptType int

type Isolation

// Isolation pairs an IsolationLevel with a pipeline name and a source name.
// It is the argument of StopReader and the first argument of GetOrCreateReader.
//
// NOTE(review): presumably it selects which Reader instance is shared or
// dedicated — confirm against GetOrCreateReader.
type Isolation struct {
	Level        IsolationLevel
	PipelineName string
	SourceName   string
}

type IsolationLevel

// IsolationLevel is the string type of the IsolationPipeline ("pipeline"),
// IsolationSource ("source") and IsolationShare ("share") constants.
type IsolationLevel string

type Job

// Job is created by NewJob from a WatchTask, a filename and that file's
// os.FileInfo. Only EofCount and LastActiveTime are exported; the remaining
// state is reached through Job's methods.
//
// NOTE(review): EofCount presumably counts end-of-file reads and relates to
// WatchConfig.MaxEofCount — confirm against the reader implementation.
type Job struct {
	EofCount       int
	LastActiveTime time.Time
	// contains filtered or unexported fields
}

func NewJob

func NewJob(task *WatchTask, filename string, fileInfo os.FileInfo) *Job

func (*Job) Active

func (j *Job) Active() (error, bool)

func (*Job) ChangeStatusTo

func (j *Job) ChangeStatusTo(status JobStatus)

func (*Job) Delete

func (j *Job) Delete()

func (*Job) File added in v1.2.0

func (j *Job) File() *os.File

func (*Job) GenerateIdentifier

func (j *Job) GenerateIdentifier() error

func (*Job) Index

func (j *Job) Index() uint32

func (*Job) IsDelete

func (j *Job) IsDelete() bool

func (*Job) IsDeleteTimeout

func (j *Job) IsDeleteTimeout(timeout time.Duration) bool

func (*Job) IsRename

func (j *Job) IsRename() bool

func (*Job) IsSame

func (j *Job) IsSame(other *Job) bool

func (*Job) IsStop

func (j *Job) IsStop() bool

func (*Job) NextOffset

func (j *Job) NextOffset(offset int64)

func (*Job) ProductEvent

func (j *Job) ProductEvent(endOffset int64, collectTime time.Time, body []byte)

func (*Job) Read

func (j *Job) Read()

func (*Job) Release

func (j *Job) Release() bool

func (*Job) RenameTo

func (j *Job) RenameTo(newFilename string)

func (*Job) Stop

func (j *Job) Stop()

func (*Job) Sync

func (j *Job) Sync()

func (*Job) Uid

func (j *Job) Uid() string

func (*Job) WatchUid

func (j *Job) WatchUid() string

WatchUid supports repeated collection of the same file by different sources.

type JobAckChain

// JobAckChain is returned by AckTask.NewAckChain for a given job watch uid.
// Besides that uid it carries a pipeline epoch, pipeline name, source name
// and a Start time. State values are passed to its Append and Ack methods.
type JobAckChain struct {
	Epoch        *pipeline.Epoch
	PipelineName string
	SourceName   string
	JobWatchUid  string

	Start time.Time
	// contains filtered or unexported fields
}

func (*JobAckChain) Ack

func (ac *JobAckChain) Ack(s *State)

func (*JobAckChain) Append

func (ac *JobAckChain) Append(s *State)

func (*JobAckChain) Key

func (ac *JobAckChain) Key() string

func (*JobAckChain) Release

func (ac *JobAckChain) Release()

type JobCollectContext added in v1.2.0

// JobCollectContext is the value handed to ProcessChain.Process and
// Processor.Process. It is built by NewJobCollectContextAndValidate from a
// Job, a read buffer and a backlog buffer.
type JobCollectContext struct {
	Job           *Job
	Filename      string
	LastOffset    int64
	BacklogBuffer []byte
	ReadBuffer    []byte

	// runtime properties
	WasSend bool
	IsEOF   bool
}

func NewJobCollectContextAndValidate added in v1.2.0

func NewJobCollectContextAndValidate(job *Job, readBuffer, backlogBuffer []byte) (*JobCollectContext, error)

type JobStatus

// JobStatus is the integer type of the JobActive (1), JobDelete (2) and
// JobStop (3) constants; it is the argument of Job.ChangeStatusTo.
type JobStatus int

type MultiConfig

// MultiConfig is the `multi` section of ReaderConfig and is also passed to
// NewMultiTask.
//
// Tag-declared defaults: active false, maxLines 500, maxBytes 131072,
// timeout 5s. Pattern has no default.
type MultiConfig struct {
	Active   bool          `yaml:"active,omitempty" default:"false"`
	Pattern  string        `yaml:"pattern,omitempty"`
	MaxLines int           `yaml:"maxLines,omitempty" default:"500"`
	MaxBytes int64         `yaml:"maxBytes,omitempty" default:"131072"` // default 131072 bytes = 128 KiB
	Timeout  time.Duration `yaml:"timeout,omitempty" default:"5s"`      // default 5s, described as 2 * read.timeout; NOTE(review): confirm which setting "read.timeout" refers to
}

type MultiHolder

type MultiHolder struct {
	// contains filtered or unexported fields
}

type MultiProcessor

type MultiProcessor struct {
	// contains filtered or unexported fields
}

func GetOrCreateShareMultilineProcessor

func GetOrCreateShareMultilineProcessor() *MultiProcessor

func NewMultiProcessor

func NewMultiProcessor() *MultiProcessor

func (*MultiProcessor) Process

func (mp *MultiProcessor) Process(event api.Event) api.Result

func (*MultiProcessor) StartTask

func (mp *MultiProcessor) StartTask(task *MultiTask)

func (*MultiProcessor) StopTask

func (mp *MultiProcessor) StopTask(task *MultiTask)

type MultiTask

type MultiTask struct {
	// contains filtered or unexported fields
}

func NewMultiTask

func NewMultiTask(epoch *pipeline.Epoch, sourceName string, config MultiConfig, eventPool *event.Pool, productFunc api.ProductFunc) *MultiTask

func (*MultiTask) String

func (mt *MultiTask) String() string

type MultiTaskType

// MultiTaskType is the string type of the MultiStart ("start") and
// MultiStop ("stop") constants.
type MultiTaskType string

type Operation

// Operation is the integer type of the CREATE (0), WRITE (1), REMOVE (2)
// and RENAME (3) constants.
type Operation int

type ProcessChain added in v1.2.0

// ProcessChain processes a JobCollectContext. An implementation is obtained
// from NewProcessChain, and a ProcessChain is also the first argument of
// Processor.Process.
type ProcessChain interface {
	Process(ctx *JobCollectContext)
}

func NewProcessChain added in v1.2.0

func NewProcessChain(config ReaderConfig) ProcessChain

type ProcessFactory added in v1.2.0

// ProcessFactory builds a Processor from a ReaderConfig. Factories are
// registered with RegisterProcessor.
type ProcessFactory func(config ReaderConfig) Processor

type Processor added in v1.2.0

// Processor is what a ProcessFactory produces. Process receives the
// ProcessChain together with the JobCollectContext being handled.
//
// NOTE(review): Order presumably determines the processor's position in the
// chain and Code its identifier — confirm against NewProcessChain.
type Processor interface {
	Order() int
	Code() string
	Process(processorChain ProcessChain, ctx *JobCollectContext)
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func GetOrCreateReader

func GetOrCreateReader(isolation Isolation, readerConfig ReaderConfig, watcher *Watcher) *Reader

func GetOrCreateShareReader

func GetOrCreateShareReader(readerConfig ReaderConfig, watcher *Watcher) *Reader

func (*Reader) Start

func (r *Reader) Start()

func (*Reader) Stop

func (r *Reader) Stop()

type ReaderConfig

// ReaderConfig is inlined into Config and is passed to GetOrCreateReader,
// GetOrCreateShareReader, NewProcessChain and every ProcessFactory.
//
// Tag-declared defaults: workerCount 1, readChanSize 512, readBufferSize
// 65536, maxContinueRead 16, maxContinueReadTimeout 3s, inactiveTimeout 3s.
// Multi-line settings live under the `multi` key (MultiConfig).
type ReaderConfig struct {
	WorkerCount            int           `yaml:"workerCount,omitempty" default:"1"`
	ReadChanSize           int           `yaml:"readChanSize,omitempty" default:"512"`
	ReadBufferSize         int           `yaml:"readBufferSize,omitempty" default:"65536"` // The buffer size used for the file reading. default 65536 = 64k = 16*PAGE_SIZE
	MaxContinueRead        int           `yaml:"maxContinueRead,omitempty" default:"16"`
	MaxContinueReadTimeout time.Duration `yaml:"maxContinueReadTimeout,omitempty" default:"3s"`
	InactiveTimeout        time.Duration `yaml:"inactiveTimeout,omitempty" default:"3s"`
	MultiConfig            MultiConfig   `yaml:"multi,omitempty"`
}

type Source

type Source struct {
	// contains filtered or unexported fields
}

func (*Source) Category

func (s *Source) Category() api.Category

func (*Source) Commit

func (s *Source) Commit(events []api.Event)

func (*Source) Config

func (s *Source) Config() interface{}

func (*Source) HandleHttp

func (s *Source) HandleHttp()

func (*Source) Init

func (s *Source) Init(context api.Context) error

func (*Source) Product

func (s *Source) Product() api.Event

func (*Source) ProductLoop

func (s *Source) ProductLoop(productFunc api.ProductFunc)

func (*Source) SetCodec added in v1.2.0

func (s *Source) SetCodec(c codec.Codec)

func (*Source) Start

func (s *Source) Start() error

func (*Source) Stop

func (s *Source) Stop()

func (*Source) String

func (s *Source) String() string

func (*Source) Type

func (s *Source) Type() api.Type

type State

// State is the value passed to NewAckWith, JobAckChain.Append and
// JobAckChain.Ack.
//
// JSON encoding, as declared by the tags: Offset, NextOffset, Filename,
// CollectTime, ContentBytes, JobUid, LineNumber and Tags are serialized;
// Epoch, PipelineName, SourceName, JobIndex and EventUid are tagged "-"
// and therefore omitted.
type State struct {
	Epoch        *pipeline.Epoch `json:"-"`
	PipelineName string          `json:"-"`
	SourceName   string          `json:"-"`
	Offset       int64           `json:"offset"`
	NextOffset   int64           `json:"nextOffset"`
	Filename     string          `json:"filename,omitempty"`
	CollectTime  time.Time       `json:"collectTime,omitempty"`
	ContentBytes int64           `json:"contentBytes"`
	JobUid       string          `json:"jobUid,omitempty"`
	JobIndex     uint32          `json:"-"`
	EventUid     string          `json:"-"`
	LineNumber   int64           `json:"lineNumber,omitempty"`
	Tags         string          `json:"tags,omitempty"`
	// contains filtered or unexported fields
}

func (*State) AppendTags

func (s *State) AppendTags(tag string)

func (*State) WatchUid

func (s *State) WatchUid() string

type WatchConfig

// WatchConfig is the `watcher` section of Config and is passed to
// GetOrCreateShareWatcher.
//
// Tag-declared defaults: enableOsWatch true, scanTimeInterval 10s,
// maintenanceInterval 5m, fdHoldTimeoutWhenInactive 5m,
// fdHoldTimeoutWhenRemove 5m, maxOpenFds 512, maxEofCount 3,
// cleanWhenRemoved true, readFromTail false, taskStopTimeout 30s.
// CleanFiles is a pointer with no default.
type WatchConfig struct {
	EnableOsWatch             bool          `yaml:"enableOsWatch,omitempty" default:"true"`
	ScanTimeInterval          time.Duration `yaml:"scanTimeInterval,omitempty" default:"10s"`
	MaintenanceInterval       time.Duration `yaml:"maintenanceInterval,omitempty" default:"5m"`
	CleanFiles                *CleanFiles   `yaml:"cleanFiles,omitempty"`
	FdHoldTimeoutWhenInactive time.Duration `yaml:"fdHoldTimeoutWhenInactive,omitempty" default:"5m"`
	FdHoldTimeoutWhenRemove   time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"`
	MaxOpenFds                int           `yaml:"maxOpenFds,omitempty" default:"512"`
	MaxEofCount               int           `yaml:"maxEofCount,omitempty" default:"3"`
	CleanWhenRemoved          bool          `yaml:"cleanWhenRemoved,omitempty" default:"true"`
	ReadFromTail              bool          `yaml:"readFromTail,omitempty" default:"false"`
	TaskStopTimeout           time.Duration `yaml:"taskStopTimeout,omitempty" default:"30s"`
}

type WatchTask

type WatchTask struct {
	// contains filtered or unexported fields
}

func NewWatchTask

func NewWatchTask(epoch *pipeline.Epoch, pipelineName string, sourceName string, config CollectConfig,
	eventPool *event.Pool, productFunc api.ProductFunc, activeChan chan *Job, sourceFields map[string]interface{}) *WatchTask

func (*WatchTask) StopJobsInfo

func (wt *WatchTask) StopJobsInfo() string

func (*WatchTask) String

func (wt *WatchTask) String() string

func (*WatchTask) WatchTaskKey

func (wt *WatchTask) WatchTaskKey() string

type WatchTaskType

// WatchTaskType is the string type of the START ("start") and STOP ("stop")
// constants.
type WatchTaskType string

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

func GetOrCreateShareWatcher

func GetOrCreateShareWatcher(watchConfig WatchConfig, dbConfig DbConfig) *Watcher

func (*Watcher) StartWatchTask

func (w *Watcher) StartWatchTask(watchTask *WatchTask)

func (*Watcher) Stop

func (w *Watcher) Stop()

func (*Watcher) StopWatchTask

func (w *Watcher) StopWatchTask(watchTask *WatchTask)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL