Versions in this module Expand all Collapse all v1 v1.1.0 Jun 27, 2017 Changes in this version + const DirMode + const FileMode + const KeyBufSize + const KeyDataSourceTag + const KeyESHost + const KeyESIndex + const KeyESKeepAlive + const KeyESReadBatch + const KeyESType + const KeyEncoding + const KeyExpire + const KeyFileDone + const KeyHeadPattern + const KeyIgnoreFileSuffix + const KeyIgnoreHiddenFile + const KeyKafkaGroupID + const KeyKafkaTopic + const KeyKafkaZookeeper + const KeyLogPath + const KeyMaxOpenFiles + const KeyMetaPath + const KeyMode + const KeyMongoCert + const KeyMongoCollection + const KeyMongoCron + const KeyMongoDatabase + const KeyMongoExecOnstart + const KeyMongoFilters + const KeyMongoHost + const KeyMongoOffsetKey + const KeyMongoReadBatch + const KeyMssqlCron + const KeyMssqlDataBase + const KeyMssqlDataSource + const KeyMssqlExecOnStart + const KeyMssqlOffsetKey + const KeyMssqlReadBatch + const KeyMssqlSQL + const KeyMysqlCron + const KeyMysqlDataBase + const KeyMysqlDataSource + const KeyMysqlExecOnStart + const KeyMysqlOffsetKey + const KeyMysqlReadBatch + const KeyMysqlSQL + const KeyReadIOLimit + const KeyRunnerName + const KeyStatInterval + const KeyValidFilePattern + const KeyWhence + const MaxHeadPatternBufferSize + const ModeDir + const ModeElastic + const ModeFile + const ModeKafka + const ModeMongo + const ModeMssql + const ModeMysql + const ModeTailx + const MongoDefaultOffsetKey + const ReadModeHeadPatternRegexp + const ReadModeHeadPatternString + const SQL_SPLITER + const StatusInit + const StatusRunning + const StatusStoping + const StatusStopped + const WhenceNewest + const WhenceOldest + var ErrBufferFull = errors.New("bufio: buffer full") + var ErrFileNotDir = errors.New("file is not directory") + var ErrFileNotRegular = errors.New("file is not regular") + var ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte") + var ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune") + var ErrMetaFileRead = errors.New("cannot read meta file") + var ErrNegativeCount = errors.New("bufio: negative count") + var ErrNoFileChosen = errors.New("no files found") + var ErrStopped = errors.New("runner stopped") + var WaitNoSuchFile = time.Second + func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error) + type ActiveReader struct + func NewActiveReader(logPath, whence string, meta *Meta) (ar *ActiveReader, err error) + func (ar *ActiveReader) Close() error + func (ar *ActiveReader) Run() + func (ar *ActiveReader) SyncMeta() + type BufReader struct + func NewReaderSize(rd FileReader, meta *Meta, size int) (*BufReader, error) + func (b *BufReader) Close() error + func (b *BufReader) Name() string + func (b *BufReader) ReadLine() (ret string, err error) + func (b *BufReader) ReadPattern() (string, error) + func (b *BufReader) ReadString(delim byte) (ret string, err error) + func (b *BufReader) SetMode(mode string, v interface{}) (err error) + func (b *BufReader) Source() string + func (b *BufReader) SyncMeta() + type CollectionFilter map[string]interface + type ElasticReader struct + func NewESReader(meta *Meta, readBatch int, estype, esindex, eshost, keepAlive string) (er *ElasticReader, err error) + func (er *ElasticReader) Close() (err error) + func (er *ElasticReader) Name() string + func (er *ElasticReader) ReadLine() (data string, err error) + func (er *ElasticReader) SetMode(mode string, v interface{}) error + func (er *ElasticReader) Source() string + func (er *ElasticReader) Start() + func (er *ElasticReader) SyncMeta() + type FileReader interface + Close func() error + Name func() string + Read func(p []byte) (n int, err error) + Source func() string + SyncMeta func() error + type KafkaReader struct + Consumer *consumergroup.ConsumerGroup + ConsumerGroup string + Topics []string + Whence string + ZookeeperChroot string + ZookeeperPeers []string + func NewKafkaReader(meta *Meta, consumerGroup string, topics []string, zookeeper []string, ...) (kr *KafkaReader, err error) + func (kr *KafkaReader) Close() (err error) + func (kr *KafkaReader) Name() string + func (kr *KafkaReader) ReadLine() (data string, err error) + func (kr *KafkaReader) SetMode(mode string, v interface{}) error + func (kr *KafkaReader) Source() string + func (kr *KafkaReader) Start() + func (kr *KafkaReader) SyncMeta() + type LastSync struct + type Meta struct + RunnerName string + func NewMeta(metadir, filedonedir, logpath, mode string, donefileRetention int) (m *Meta, err error) + func NewMetaWithConf(conf conf.MapConf) (meta *Meta, err error) + func (m *Meta) AppendDeleteFile(path string) (err error) + func (m *Meta) AppendDoneFile(path string) (err error) + func (m *Meta) BufFile() string + func (m *Meta) BufMetaFile() string + func (m *Meta) CacheLineFile() string + func (m *Meta) Clear() error + func (m *Meta) DeleteDoneFile(path string) error + func (m *Meta) DeleteFile() string + func (m *Meta) DoneFile() string + func (m *Meta) DoneFilePath() string + func (m *Meta) GetDataSourceTag() string + func (m *Meta) GetDoneFiles() (doneFiles []utils.File, err error) + func (m *Meta) GetEncodingWay() (e string) + func (m *Meta) GetMode() string + func (m *Meta) IsDoneFile(file string) bool + func (m *Meta) IsExist() bool + func (m *Meta) IsNotExist() bool + func (m *Meta) IsNotValid() bool + func (m *Meta) IsValid() bool + func (m *Meta) LogPath() string + func (m *Meta) MetaFile() string + func (m *Meta) ReadBuf(buf []byte) (n int, err error) + func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error) + func (m *Meta) ReadCacheLine() ([]byte, error) + func (m *Meta) ReadOffset() (currFile string, offset int64, err error) + func (m *Meta) SetEncodingWay(e string) + func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error) + func (m *Meta) WriteCacheLine(lines string) error + func (m *Meta) WriteOffset(currFile string, offset int64) (err error) + type MongoReader struct + Cron *cron.Cron + func NewMongoReader(meta *Meta, readBatch int, ...) (mr *MongoReader, err error) + func (mr *MongoReader) Close() (err error) + func (mr *MongoReader) Name() string + func (mr *MongoReader) ReadLine() (data string, err error) + func (mr *MongoReader) SetMode(mode string, v interface{}) error + func (mr *MongoReader) Source() string + func (mr *MongoReader) Start() + func (mr *MongoReader) SyncMeta() + type MultiReader struct + func NewMultiReader(meta *Meta, logPathPattern, whence, expireDur, statIntervalDur string, ...) (mr *MultiReader, err error) + func (mr *MultiReader) Close() (err error) + func (mr *MultiReader) Expire() + func (mr *MultiReader) Name() string + func (mr *MultiReader) ReadLine() (data string, err error) + func (mr *MultiReader) SetMode(mode string, value interface{}) (err error) + func (mr *MultiReader) Source() string + func (mr *MultiReader) Start() + func (mr *MultiReader) StatLogPath() + func (mr *MultiReader) SyncMeta() + type Reader interface + Close func() error + Name func() string + ReadLine func() (string, error) + SetMode func(mode string, v interface{}) error + Source func() string + SyncMeta func() + func NewFileBufReader(conf conf.MapConf) (reader Reader, err error) + func NewFileBufReaderWithMeta(conf conf.MapConf, meta *Meta) (reader Reader, err error) + type SeqFile struct + func NewSeqFile(meta *Meta, path string, ignoreHidden bool, suffixes []string, ...) (sf *SeqFile, err error) + func (sf *SeqFile) Close() (err error) + func (sf *SeqFile) Name() string + func (sf *SeqFile) Read(p []byte) (n int, err error) + func (sf *SeqFile) Source() string + func (sf *SeqFile) SyncMeta() (err error) + type SingleFile struct + func NewSingleFile(meta *Meta, path, whence string) (sf *SingleFile, err error) + func (sf *SingleFile) Close() (err error) + func (sf *SingleFile) Name() string + func (sf *SingleFile) Read(p []byte) (n int, err error) + func (sf *SingleFile) Reopen() (err error) + func (sf *SingleFile) Source() string + func (sf *SingleFile) SyncMeta() error + type SqlReader struct + Cron *cron.Cron + func NewSQLReader(meta *Meta, readBatch int, ...) (mr *SqlReader, err error) + func (mr *SqlReader) Close() (err error) + func (mr *SqlReader) Name() string + func (mr *SqlReader) ReadLine() (data string, err error) + func (mr *SqlReader) SetMode(mode string, v interface{}) error + func (mr *SqlReader) Source() string + func (mr *SqlReader) Start() + func (mr *SqlReader) SyncMeta()