Documentation ¶
Index ¶
- Constants
- Variables
- func GetOrCreateShareDbHandler(config DbConfig) *dbHandler
- func JobUid(fileInfo os.FileInfo) string
- func NewAckWith(state *State) *ack
- func RegisterProcessor(factory ProcessFactory)
- func ReleaseAck(a *ack)
- func StopReader(isolation Isolation)
- func WatchJobId(pipelineName string, sourceName string, jobUid string) string
- type AckChainHandler
- type AckConfig
- type AckListener
- type AckTask
- type AckTaskType
- type CleanFiles
- type CollectConfig
- type Config
- type DbConfig
- type DbOpt
- type DbOptType
- type Isolation
- type IsolationLevel
- type Job
- func (j *Job) Active() (error, bool)
- func (j *Job) ChangeStatusTo(status JobStatus)
- func (j *Job) Delete()
- func (j *Job) File() *os.File
- func (j *Job) GenerateIdentifier() error
- func (j *Job) Index() uint32
- func (j *Job) IsDelete() bool
- func (j *Job) IsDeleteTimeout(timeout time.Duration) bool
- func (j *Job) IsRename() bool
- func (j *Job) IsSame(other *Job) bool
- func (j *Job) IsStop() bool
- func (j *Job) NextOffset(offset int64)
- func (j *Job) ProductEvent(endOffset int64, collectTime time.Time, body []byte)
- func (j *Job) Read()
- func (j *Job) Release() bool
- func (j *Job) RenameTo(newFilename string)
- func (j *Job) Stop()
- func (j *Job) Sync()
- func (j *Job) Uid() string
- func (j *Job) WatchUid() string
- type JobAckChain
- type JobCollectContext
- type JobStatus
- type MultiConfig
- type MultiHolder
- type MultiProcessor
- type MultiTask
- type MultiTaskType
- type Operation
- type ProcessChain
- type ProcessFactory
- type Processor
- type Reader
- type ReaderConfig
- type Source
- func (s *Source) Category() api.Category
- func (s *Source) Commit(events []api.Event)
- func (s *Source) Config() interface{}
- func (s *Source) HandleHttp()
- func (s *Source) Init(context api.Context) error
- func (s *Source) Product() api.Event
- func (s *Source) ProductLoop(productFunc api.ProductFunc)
- func (s *Source) SetCodec(c codec.Codec)
- func (s *Source) Start() error
- func (s *Source) Stop()
- func (s *Source) String() string
- func (s *Source) Type() api.Type
- type State
- type WatchConfig
- type WatchTask
- type WatchTaskType
- type Watcher
Constants ¶
View Source
const ( AckStart = AckTaskType("start") AckStop = AckTaskType("stop") )
View Source
const ( JobActive = JobStatus(1) JobDelete = JobStatus(2) JobStop = JobStatus(3) )
View Source
const ( IsolationPipeline = IsolationLevel("pipeline") IsolationSource = IsolationLevel("source") )
View Source
const ( MultiStart = MultiTaskType("start") MultiStop = MultiTaskType("stop") )
View Source
const ( DeleteByIdOpt = DbOptType(1) DeleteByJobUidOpt = DbOptType(2) UpsertOffsetByJobWatchIdOpt = DbOptType(3) UpdateNameByJobWatchIdOpt = DbOptType(4) )
View Source
const ( START = WatchTaskType("start") STOP = WatchTaskType("stop") )
View Source
const ( CREATE = Operation(0) WRITE = Operation(1) REMOVE = Operation(2) RENAME = Operation(3) )
View Source
const (
SystemStateKey = event.SystemKeyPrefix + "State"
)
View Source
const Type = "file"
Variables ¶
View Source
var NilOfTime, _ = time.ParseInLocation("2006-01-02 15:04:05", "2008-08-08 08:08:08", time.Local)
Functions ¶
func GetOrCreateShareDbHandler ¶
func GetOrCreateShareDbHandler(config DbConfig) *dbHandler
func NewAckWith ¶
func NewAckWith(state *State) *ack
func RegisterProcessor ¶ added in v1.2.0
func RegisterProcessor(factory ProcessFactory)
func ReleaseAck ¶
func ReleaseAck(a *ack)
func StopReader ¶
func StopReader(isolation Isolation)
Types ¶
type AckChainHandler ¶
type AckChainHandler struct {
// contains filtered or unexported fields
}
func GetOrCreateShareAckChainHandler ¶
func GetOrCreateShareAckChainHandler(sinkCount int, ackConfig AckConfig) *AckChainHandler
func NewAckChainHandler ¶
func NewAckChainHandler(sinkCount int, ackConfig AckConfig) *AckChainHandler
func (*AckChainHandler) StartTask ¶
func (ach *AckChainHandler) StartTask(task *AckTask)
func (*AckChainHandler) Stop ¶
func (ach *AckChainHandler) Stop()
func (*AckChainHandler) StopTask ¶
func (ach *AckChainHandler) StopTask(task *AckTask)
type AckListener ¶
type AckListener struct {
// contains filtered or unexported fields
}
func (*AckListener) BeforeQueueConvertBatch ¶
func (al *AckListener) BeforeQueueConvertBatch(events []api.Event)
func (*AckListener) Name ¶
func (al *AckListener) Name() string
func (*AckListener) Stop ¶
func (al *AckListener) Stop()
type AckTask ¶
type AckTask struct {
Epoch *pipeline.Epoch
PipelineName string
SourceName string
StopCountDown *sync.WaitGroup
// contains filtered or unexported fields
}
func NewAckTask ¶
func (*AckTask) NewAckChain ¶
func (at *AckTask) NewAckChain(jobWatchUid string) *JobAckChain
type AckTaskType ¶
type AckTaskType string
type CleanFiles ¶
type CleanFiles struct {
MaxHistoryDays int `yaml:"maxHistoryDays,omitempty"`
}
type CollectConfig ¶
type CollectConfig struct {
IsolationLevel string `yaml:"isolationLevel,omitempty" default:"share"`
Paths []string `yaml:"paths,omitempty" validate:"required"` // glob pattern
ExcludeFiles []string `yaml:"excludeFiles,omitempty"` // regular expression pattern
IgnoreOlder util.Duration `yaml:"ignoreOlder,omitempty"`
IgnoreSymlink bool `yaml:"ignoreSymlink,omitempty" default:"false"`
RereadTruncated bool `yaml:"rereadTruncated,omitempty" default:"true"` // Read from the beginning when the file is truncated
FirstNBytesForIdentifier int `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected
AddonMeta bool `yaml:"addonMeta,omitempty"`
// contains filtered or unexported fields
}
func (CollectConfig) IsFileExcluded ¶
func (cc CollectConfig) IsFileExcluded(file string) bool
func (CollectConfig) IsFileInclude ¶
func (cc CollectConfig) IsFileInclude(file string) bool
func (CollectConfig) IsIgnoreOlder ¶
func (cc CollectConfig) IsIgnoreOlder(info os.FileInfo) bool
type Config ¶
type Config struct { AckConfig AckConfig `yaml:"ack,omitempty"` DbConfig DbConfig `yaml:"db,omitempty"` WatchConfig WatchConfig `yaml:"watcher,omitempty"` ReaderConfig ReaderConfig `yaml:",inline,omitempty"` CollectConfig CollectConfig `yaml:",inline,omitempty" validate:"required,dive"` Isolation string `yaml:"isolation,omitempty" default:"pipeline"` Fields map[string]interface{} `yaml:"fields,omitempty"` }
type DbConfig ¶
type DbConfig struct {
File string `yaml:"file,omitempty" default:"./data/loggie.db"`
FlushTimeout time.Duration `yaml:"flushTimeout,omitempty" default:"2s"`
BufferSize int `yaml:"bufferSize,omitempty" default:"2048"`
TableName string `yaml:"tableName,omitempty" default:"registry"`
CleanInactiveTimeout time.Duration `yaml:"cleanInactiveTimeout,omitempty" default:"504h"` // by default, records not updated in 21 days will be deleted
CleanScanInterval time.Duration `yaml:"cleanScanInterval,omitempty" default:"1h"`
}
type Isolation ¶
type Isolation struct { Level IsolationLevel PipelineName string SourceName string }
type IsolationLevel ¶
type IsolationLevel string
type Job ¶
func (*Job) ChangeStatusTo ¶
func (*Job) GenerateIdentifier ¶
func (*Job) NextOffset ¶
func (*Job) ProductEvent ¶
type JobAckChain ¶
type JobAckChain struct {
Epoch *pipeline.Epoch
PipelineName string
SourceName string
JobWatchUid string
Start time.Time
// contains filtered or unexported fields
}
func (*JobAckChain) Ack ¶
func (ac *JobAckChain) Ack(s *State)
func (*JobAckChain) Append ¶
func (ac *JobAckChain) Append(s *State)
func (*JobAckChain) Key ¶
func (ac *JobAckChain) Key() string
func (*JobAckChain) Release ¶
func (ac *JobAckChain) Release()
type JobCollectContext ¶ added in v1.2.0
type JobCollectContext struct {
Job *Job
Filename string
LastOffset int64
BacklogBuffer []byte
ReadBuffer []byte

// runtime property
WasSend bool
IsEOF bool
}
func NewJobCollectContextAndValidate ¶ added in v1.2.0
func NewJobCollectContextAndValidate(job *Job, readBuffer, backlogBuffer []byte) (*JobCollectContext, error)
type MultiConfig ¶
type MultiConfig struct {
Active bool `yaml:"active,omitempty" default:"false"`
Pattern string `yaml:"pattern,omitempty"`
MaxLines int `yaml:"maxLines,omitempty" default:"500"`
MaxBytes int64 `yaml:"maxBytes,omitempty" default:"131072"` // default 128KB
Timeout time.Duration `yaml:"timeout,omitempty" default:"5s"` // default 2 * read.timeout
}
type MultiHolder ¶
type MultiHolder struct {
// contains filtered or unexported fields
}
type MultiProcessor ¶
type MultiProcessor struct {
// contains filtered or unexported fields
}
func GetOrCreateShareMultilineProcessor ¶
func GetOrCreateShareMultilineProcessor() *MultiProcessor
func NewMultiProcessor ¶
func NewMultiProcessor() *MultiProcessor
func (*MultiProcessor) StartTask ¶
func (mp *MultiProcessor) StartTask(task *MultiTask)
func (*MultiProcessor) StopTask ¶
func (mp *MultiProcessor) StopTask(task *MultiTask)
type MultiTask ¶
type MultiTask struct {
// contains filtered or unexported fields
}
func NewMultiTask ¶
func NewMultiTask(epoch *pipeline.Epoch, sourceName string, config MultiConfig, eventPool *event.Pool, productFunc api.ProductFunc) *MultiTask
type MultiTaskType ¶
type MultiTaskType string
type ProcessChain ¶ added in v1.2.0
type ProcessChain interface {
Process(ctx *JobCollectContext)
}
func NewProcessChain ¶ added in v1.2.0
func NewProcessChain(config ReaderConfig) ProcessChain
type ProcessFactory ¶ added in v1.2.0
type ProcessFactory func(config ReaderConfig) Processor
type Processor ¶ added in v1.2.0
type Processor interface { Order() int Code() string Process(processorChain ProcessChain, ctx *JobCollectContext) }
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
func GetOrCreateReader ¶
func GetOrCreateReader(isolation Isolation, readerConfig ReaderConfig, watcher *Watcher) *Reader
func GetOrCreateShareReader ¶
func GetOrCreateShareReader(readerConfig ReaderConfig, watcher *Watcher) *Reader
type ReaderConfig ¶
type ReaderConfig struct {
WorkerCount int `yaml:"workerCount,omitempty" default:"1"`
ReadChanSize int `yaml:"readChanSize,omitempty" default:"512"`
ReadBufferSize int `yaml:"readBufferSize,omitempty" default:"65536"` // The buffer size used for file reading. Default 65536 = 64k = 16*PAGE_SIZE
MaxContinueRead int `yaml:"maxContinueRead,omitempty" default:"16"`
MaxContinueReadTimeout time.Duration `yaml:"maxContinueReadTimeout,omitempty" default:"3s"`
InactiveTimeout time.Duration `yaml:"inactiveTimeout,omitempty" default:"3s"`
MultiConfig MultiConfig `yaml:"multi,omitempty"`
}
type Source ¶
type Source struct {
// contains filtered or unexported fields
}
func (*Source) HandleHttp ¶
func (s *Source) HandleHttp()
func (*Source) ProductLoop ¶
func (s *Source) ProductLoop(productFunc api.ProductFunc)
type State ¶
type State struct {
Epoch *pipeline.Epoch `json:"-"`
PipelineName string `json:"-"`
SourceName string `json:"-"`
Offset int64 `json:"offset"`
NextOffset int64 `json:"nextOffset"`
Filename string `json:"filename,omitempty"`
CollectTime time.Time `json:"collectTime,omitempty"`
ContentBytes int64 `json:"contentBytes"`
JobUid string `json:"jobUid,omitempty"`
JobIndex uint32 `json:"-"`
EventUid string `json:"-"`
LineNumber int64 `json:"lineNumber,omitempty"`
Tags string `json:"tags,omitempty"`
// contains filtered or unexported fields
}
func (*State) AppendTags ¶
type WatchConfig ¶
type WatchConfig struct { EnableOsWatch bool `yaml:"enableOsWatch,omitempty" default:"true"` ScanTimeInterval time.Duration `yaml:"scanTimeInterval,omitempty" default:"10s"` MaintenanceInterval time.Duration `yaml:"maintenanceInterval,omitempty" default:"5m"` CleanFiles *CleanFiles `yaml:"cleanFiles,omitempty"` FdHoldTimeoutWhenInactive time.Duration `yaml:"fdHoldTimeoutWhenInactive,omitempty" default:"5m"` FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"` MaxOpenFds int `yaml:"maxOpenFds,omitempty" default:"512"` MaxEofCount int `yaml:"maxEofCount,omitempty" default:"3"` CleanWhenRemoved bool `yaml:"cleanWhenRemoved,omitempty" default:"true"` ReadFromTail bool `yaml:"readFromTail,omitempty" default:"false"` TaskStopTimeout time.Duration `yaml:"taskStopTimeout,omitempty" default:"30s"` }
type WatchTask ¶
type WatchTask struct {
// contains filtered or unexported fields
}
func NewWatchTask ¶
func (*WatchTask) StopJobsInfo ¶
func (*WatchTask) WatchTaskKey ¶
type WatchTaskType ¶
type WatchTaskType string
type Watcher ¶
type Watcher struct {
// contains filtered or unexported fields
}
func GetOrCreateShareWatcher ¶
func GetOrCreateShareWatcher(watchConfig WatchConfig, dbConfig DbConfig) *Watcher
func (*Watcher) StartWatchTask ¶
func (*Watcher) StopWatchTask ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.