Documentation ¶
Overview ¶
Package bufio implements buffered I/O. It wraps a FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
Index ¶
- Constants
- Variables
- func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)
- type ActiveReader
- type BufReader
- func (b *BufReader) Close() error
- func (b *BufReader) Name() string
- func (b *BufReader) ReadLine() (ret string, err error)
- func (b *BufReader) ReadPattern() (string, error)
- func (b *BufReader) ReadString(delim byte) (ret string, err error)
- func (b *BufReader) SetMode(mode string, v interface{}) (err error)
- func (b *BufReader) Source() string
- func (b *BufReader) Status() utils.StatsInfo
- func (b *BufReader) SyncMeta()
- type CollectionFilter
- type ElasticReader
- func (er *ElasticReader) Close() (err error)
- func (er *ElasticReader) Name() string
- func (er *ElasticReader) ReadLine() (data string, err error)
- func (er *ElasticReader) SetMode(mode string, v interface{}) error
- func (er *ElasticReader) Source() string
- func (er *ElasticReader) Start()
- func (er *ElasticReader) Status() utils.StatsInfo
- func (er *ElasticReader) SyncMeta()
- type FileReader
- type KafkaReader
- func (kr *KafkaReader) Close() (err error)
- func (kr *KafkaReader) Name() string
- func (kr *KafkaReader) ReadLine() (data string, err error)
- func (kr *KafkaReader) SetMode(mode string, v interface{}) error
- func (kr *KafkaReader) Source() string
- func (kr *KafkaReader) Start()
- func (kr *KafkaReader) Status() utils.StatsInfo
- func (kr *KafkaReader) SyncMeta()
- type LastSync
- type Meta
- func (m *Meta) AppendDeleteFile(path string) (err error)
- func (m *Meta) AppendDoneFile(path string) (err error)
- func (m *Meta) BufFile() string
- func (m *Meta) BufMetaFile() string
- func (m *Meta) CacheLineFile() string
- func (m *Meta) Clear() error
- func (m *Meta) DeleteDoneFile(path string) error
- func (m *Meta) DeleteFile() string
- func (m *Meta) DoneFile() string
- func (m *Meta) DoneFilePath() string
- func (m *Meta) FtSaveLogPath() string
- func (m *Meta) GetDataSourceTag() string
- func (m *Meta) GetDoneFiles() (doneFiles []utils.File, err error)
- func (m *Meta) GetEncodingWay() (e string)
- func (m *Meta) GetMode() string
- func (m *Meta) IsDoneFile(file string) bool
- func (m *Meta) IsExist() bool
- func (m *Meta) IsFileMode() bool
- func (m *Meta) IsNotExist() bool
- func (m *Meta) IsNotValid() bool
- func (m *Meta) IsValid() bool
- func (m *Meta) LogPath() string
- func (m *Meta) MetaFile() string
- func (m *Meta) ReadBuf(buf []byte) (n int, err error)
- func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)
- func (m *Meta) ReadCacheLine() ([]byte, error)
- func (m *Meta) ReadOffset() (currFile string, offset int64, err error)
- func (m *Meta) ReadStatistic() (stat Statistic, err error)
- func (b *Meta) Reset() error
- func (m *Meta) SetEncodingWay(e string)
- func (m *Meta) StatisticFile() string
- func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)
- func (m *Meta) WriteCacheLine(lines string) error
- func (m *Meta) WriteOffset(currFile string, offset int64) (err error)
- func (m *Meta) WriteStatistic(stat *Statistic) error
- type MongoReader
- func (mr *MongoReader) Close() (err error)
- func (mr *MongoReader) LoopRun()
- func (mr *MongoReader) Name() string
- func (mr *MongoReader) ReadLine() (data string, err error)
- func (mr *MongoReader) SetMode(mode string, v interface{}) error
- func (mr *MongoReader) Source() string
- func (mr *MongoReader) Start()
- func (mr *MongoReader) Status() utils.StatsInfo
- func (mr *MongoReader) SyncMeta()
- type MultiReader
- func (mr *MultiReader) Close() (err error)
- func (mr *MultiReader) Expire()
- func (mr *MultiReader) Name() string
- func (mr *MultiReader) ReadLine() (data string, err error)
- func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)
- func (mr *MultiReader) Source() string
- func (mr *MultiReader) Start()
- func (mr *MultiReader) StatLogPath()
- func (mr *MultiReader) Status() utils.StatsInfo
- func (mr *MultiReader) SyncMeta()
- type Reader
- type RedisOptionn
- type RedisReader
- func (rr *RedisReader) Close() (err error)
- func (rr *RedisReader) Name() string
- func (rr *RedisReader) ReadLine() (data string, err error)
- func (rr *RedisReader) SetMode(mode string, v interface{}) error
- func (rr *RedisReader) Source() string
- func (rr *RedisReader) Start()
- func (rr *RedisReader) Status() utils.StatsInfo
- func (rr *RedisReader) SyncMeta()
- type Result
- type SeqFile
- type ServerReader
- type SingleFile
- type SqlReader
- func (mr *SqlReader) Close() (err error)
- func (mr *SqlReader) LoopRun()
- func (mr *SqlReader) Name() string
- func (mr *SqlReader) ReadLine() (data string, err error)
- func (mr *SqlReader) SetMode(mode string, v interface{}) error
- func (mr *SqlReader) Source() string
- func (mr *SqlReader) Start()
- func (mr *SqlReader) Status() utils.StatsInfo
- func (mr *SqlReader) SyncMeta()
- type Statistic
- type StatsReader
Constants ¶
const ( KeyLogPath = "log_path" KeyMetaPath = "meta_path" KeyFileDone = "file_done" KeyMode = "mode" KeyBufSize = "reader_buf_size" KeyWhence = "read_from" KeyEncoding = "encoding" KeyReadIOLimit = "readio_limit" KeyDataSourceTag = "datasource_tag" KeyHeadPattern = "head_pattern" KeyRunnerName = "runner_name" // 忽略隐藏文件 KeyIgnoreHiddenFile = "ignore_hidden" KeyIgnoreFileSuffix = "ignore_file_suffix" KeyValidFilePattern = "valid_file_pattern" KeyExpire = "expire" KeyMaxOpenFiles = "max_open_files" KeyStatInterval = "stat_interval" KeyMysqlOffsetKey = "mysql_offset_key" KeyMysqlReadBatch = "mysql_limit_batch" KeyMysqlDataSource = "mysql_datasource" KeyMysqlDataBase = "mysql_database" KeyMysqlSQL = "mysql_sql" KeyMysqlCron = "mysql_cron" KeyMysqlExecOnStart = "mysql_exec_onstart" KeySQLSchema = "sql_schema" KeyMssqlOffsetKey = "mssql_offset_key" KeyMssqlReadBatch = "mssql_limit_batch" KeyMssqlDataSource = "mssql_datasource" KeyMssqlDataBase = "mssql_database" KeyMssqlSQL = "mssql_sql" KeyMssqlCron = "mssql_cron" KeyMssqlExecOnStart = "mssql_exec_onstart" KeyESReadBatch = "es_limit_batch" KeyESIndex = "es_index" KeyESType = "es_type" KeyESHost = "es_host" KeyESKeepAlive = "es_keepalive" KeyESVersion = "es_version" KeyMongoHost = "mongo_host" KeyMongoDatabase = "mongo_database" KeyMongoCollection = "mongo_collection" KeyMongoOffsetKey = "mongo_offset_key" KeyMongoReadBatch = "mongo_limit_batch" KeyMongoCron = "mongo_cron" KeyMongoExecOnstart = "mongo_exec_onstart" KeyMongoFilters = "mongo_filters" KeyMongoCert = "mongo_cacert" KeyKafkaGroupID = "kafka_groupid" KeyKafkaTopic = "kafka_topic" KeyKafkaZookeeper = "kafka_zookeeper" KeyKafkaZookeeperTimeout = "kafka_zookeeper_timeout" )
FileReader's conf keys
const ( ModeDir = "dir" ModeFile = "file" ModeTailx = "tailx" ModeMysql = "mysql" ModeMssql = "mssql" ModeElastic = "elastic" ModeMongo = "mongo" ModeKafka = "kafka" ModeRedis = "redis" )
FileReader's modes
const ( ReadModeHeadPatternString = "mode_head_pattern_string" ReadModeHeadPatternRegexp = "mode_head_pattern_regexp" )
const ( WhenceOldest = "oldest" WhenceNewest = "newest" )
KeyWhence 的可选项
const ( DataTypeList = "list" DataTypeChannel = "channel" DataTypePatterChannel = "pattern_channel" )
const ( KeyRedisDataType = "redis_datatype" // 必填 KeyRedisDB = "redis_db" //默认 是0 KeyRedisKey = "redis_key" //必填 KeyRedisAddress = "redis_address" // 默认127.0.0.1:6379 KeyRedisPassword = "redis_password" KeyTimeoutDuration = "redis_timeout" )
const ( StatusInit int32 = iota StatusStopped StatusStoping StatusRunning )
const DirMode = "dir"
DirMode 按时间顺序顺次读取文件夹下所有文件的模式
const FileMode = "file"
FileMode 读取单个文件模式
const (
Loop = "loop"
)
const (
MaxHeadPatternBufferSize = 20 * 1024 * 1024
)
const (
MongoDefaultOffsetKey = "_id"
)
const (
SQL_SPLITER = ";"
)
Variables ¶
var ( ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte") ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune") ErrBufferFull = errors.New("bufio: buffer full") ErrNegativeCount = errors.New("bufio: negative count") )
var ( ElasticVersion2 = "2.x" ElasticVersion5 = "5.x" )
var ErrFileNotDir = errors.New("file is not directory")
var ErrFileNotRegular = errors.New("file is not regular")
var ErrMetaFileRead = errors.New("cannot read meta file")
var ErrNoFileChosen = errors.New("no files found")
var ErrStopped = errors.New("runner stopped")
var ModeKeyOptions = map[string][]utils.Option{ ModeDir: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "/home/users/john/log/", DefaultNoUse: true, Description: "日志文件夹路径(log_path)", }, { KeyName: KeyMetaPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "断点续传元数据路径(meta_path)", }, { KeyName: KeyFileDone, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "读取过的文件信息保存路径(file_done)", }, { KeyName: KeyBufSize, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "文件缓存数据大小(reader_buf_size)", CheckRegex: "\\d+", }, { KeyName: KeyWhence, ChooseOnly: true, ChooseOptions: []string{WhenceOldest, WhenceNewest}, Description: "读取的起始位置(read_from)", }, { KeyName: KeyEncoding, ChooseOnly: true, ChooseOptions: []string{"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1", "GBK", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-12", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "macos-0_2-10.2", "macos-6_2-10.4", "macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2", "windows-1250", "windows-1251", "windows-1252", "windows-1253", "windows-1254", "windows-1255", "windows-1256", "windows-1257", "windows-1258", "windows-874", "IBM037", "ibm-273_P100-1995", "ibm-277_P100-1995", "ibm-278_P100-1995", "ibm-280_P100-1995", "ibm-284_P100-1995", "ibm-285_P100-1995", "ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999", "KOI8-R", "KOI8-U", "ebcdic-xml-us"}, Default: "UTF-8", DefaultNoUse: false, Description: "编码方式(encoding)", }, OptionDataSourceTag, { KeyName: KeyReadIOLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "读取速度限制(MB/s)(readio_limit)", CheckRegex: "\\d+", }, { KeyName: KeyHeadPattern, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "多行读取的起始行正则表达式(head_pattern)", }, { KeyName: KeyIgnoreHiddenFile, ChooseOnly: 
true, ChooseOptions: []string{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "是否忽略隐藏文件(ignore_hidden)", }, { KeyName: KeyIgnoreFileSuffix, ChooseOnly: false, Default: strings.Join(defaultIgnoreFileSuffix, ","), DefaultNoUse: false, Description: "根据后缀忽略文件(ignore_file_suffix)", }, { KeyName: KeyValidFilePattern, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "根据正则表达式匹配文件(valid_file_pattern)", }, }, ModeFile: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "/home/users/john/log/my.log", DefaultNoUse: true, Description: "日志文件路径(log_path)", }, { KeyName: KeyMetaPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "断点续传元数据路径(meta_path)", }, { KeyName: KeyBufSize, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "文件缓存数据大小(reader_buf_size)", CheckRegex: "\\d+", }, { KeyName: KeyWhence, ChooseOnly: true, ChooseOptions: []string{WhenceOldest, WhenceNewest}, Description: "读取的起始位置(read_from)", }, OptionDataSourceTag, { KeyName: KeyEncoding, ChooseOnly: true, ChooseOptions: []string{"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1", "GBK", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-12", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "macos-0_2-10.2", "macos-6_2-10.4", "macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2", "windows-1250", "windows-1251", "windows-1252", "windows-1253", "windows-1254", "windows-1255", "windows-1256", "windows-1257", "windows-1258", "windows-874", "IBM037", "ibm-273_P100-1995", "ibm-277_P100-1995", "ibm-278_P100-1995", "ibm-280_P100-1995", "ibm-284_P100-1995", "ibm-285_P100-1995", "ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999", "KOI8-R", "KOI8-U", "ebcdic-xml-us"}, Default: "UTF-8", DefaultNoUse: false, Description: "编码方式(encoding)", }, { KeyName: KeyReadIOLimit, ChooseOnly: 
false, Default: "", DefaultNoUse: false, Description: "读取速度限制(MB/s)(readio_limit)", CheckRegex: "\\d+", }, { KeyName: KeyHeadPattern, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "多行读取的起始行正则表达式(head_pattern)", }, }, ModeTailx: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "/home/users/*/mylog/*.log", DefaultNoUse: true, Description: "日志文件路径模式串(log_path)", }, { KeyName: KeyMetaPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "断点续传元数据路径(meta_path)", }, { KeyName: KeyBufSize, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "文件缓存数据大小(reader_buf_size)", CheckRegex: "\\d+", }, { KeyName: KeyWhence, ChooseOnly: true, ChooseOptions: []string{WhenceOldest, WhenceNewest}, Description: "读取的起始位置(read_from)", }, { KeyName: KeyEncoding, ChooseOnly: true, ChooseOptions: []string{"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1", "GBK", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-12", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "macos-0_2-10.2", "macos-6_2-10.4", "macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2", "windows-1250", "windows-1251", "windows-1252", "windows-1253", "windows-1254", "windows-1255", "windows-1256", "windows-1257", "windows-1258", "windows-874", "IBM037", "ibm-273_P100-1995", "ibm-277_P100-1995", "ibm-278_P100-1995", "ibm-280_P100-1995", "ibm-284_P100-1995", "ibm-285_P100-1995", "ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999", "KOI8-R", "KOI8-U", "ebcdic-xml-us"}, Default: "UTF-8", DefaultNoUse: false, Description: "编码方式(encoding)", }, { KeyName: KeyReadIOLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "读取速度限制(MB/s)(readio_limit)", CheckRegex: "\\d+", }, OptionDataSourceTag, { KeyName: KeyHeadPattern, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: 
"多行读取的起始行正则表达式(head_pattern)", }, { KeyName: KeyExpire, ChooseOnly: false, Default: "24h", DefaultNoUse: false, Description: "文件过期时间(时h,分m,秒s)(expire)", CheckRegex: "\\d+[hms]", }, { KeyName: KeyMaxOpenFiles, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "最大的打开文件数(max_open_files)", CheckRegex: "\\d+", }, { KeyName: KeyStatInterval, ChooseOnly: false, Default: "3m", DefaultNoUse: false, Description: "文件扫描间隔(stat_interval)", CheckRegex: "\\d+[hms]", }, }, ModeMysql: { { KeyName: KeyMysqlDataSource, ChooseOnly: false, Default: "<username>:<password>@tcp(<hostname>:<port>)", DefaultNoUse: true, Description: "数据库地址(mysql_datasource)", }, { KeyName: KeyMysqlDataBase, ChooseOnly: false, Default: "<database>", DefaultNoUse: true, Description: "数据库名称(mysql_database)", }, { KeyName: KeyMysqlSQL, ChooseOnly: false, Default: "select * from <table>;", DefaultNoUse: true, Description: "数据查询语句(mysql_sql)", }, { KeyName: KeyMysqlOffsetKey, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "递增的列名称(mysql_offset_key)", }, { KeyName: KeyMysqlReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(mysql_limit_batch)", CheckRegex: "\\d+", }, { KeyName: KeyMetaPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "断点续传元数据路径(meta_path)", }, OptionDataSourceTag, { KeyName: KeyMysqlCron, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "定时任务调度Cron(mysql_cron)", }, { KeyName: KeyMysqlExecOnStart, ChooseOnly: true, ChooseOptions: []string{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(mysql_exec_onstart)", }, { KeyName: KeySQLSchema, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "SQL字段类型定义(sql_schema)", }, }, ModeMssql: { { KeyName: KeyMssqlDataSource, ChooseOnly: false, Default: "server=<hostname or instance>;user id=<username>;password=<password>;port=<port>", DefaultNoUse: true, Description: "数据库地址(mssql_datasource)", }, { KeyName: 
KeyMssqlDataBase, ChooseOnly: false, Default: "<database>", DefaultNoUse: true, Description: "数据库名称(mssql_database)", }, { KeyName: KeyMssqlSQL, ChooseOnly: false, Default: "select * from <table>;", DefaultNoUse: true, Description: "数据查询语句(mssql_sql)", }, { KeyName: KeyMssqlOffsetKey, ChooseOnly: false, Default: "", DefaultNoUse: true, Description: "递增的列名称(mssql_offset_key)", }, { KeyName: KeyMetaPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "断点续传元数据路径(meta_path)", }, OptionDataSourceTag, { KeyName: KeyMssqlReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(mssql_limit_batch)", CheckRegex: "\\d+", }, { KeyName: KeyMssqlCron, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "定时任务调度Crontab(mssql_cron)", }, { KeyName: KeyMssqlExecOnStart, ChooseOnly: true, ChooseOptions: []string{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(mssql_exec_onstart)", }, { KeyName: KeySQLSchema, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "SQL字段类型定义(sql_schema)", }, }, ModeElastic: { { KeyName: KeyESHost, ChooseOnly: false, Default: "http://localhost:9200", DefaultNoUse: true, Description: "数据库地址(es_host)", }, { KeyName: KeyESVersion, ChooseOnly: true, ChooseOptions: []string{ElasticVersion2, ElasticVersion5}, Description: "ES版本号(es_version)", }, { KeyName: KeyESIndex, ChooseOnly: false, Default: "app-repo-123", DefaultNoUse: true, Description: "ES索引名称(es_index)", }, { KeyName: KeyESType, ChooseOnly: false, Default: "type_app", DefaultNoUse: true, Description: "ES的app名称(es_type)", }, { KeyName: KeyMetaPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "断点续传元数据路径(meta_path)", }, OptionDataSourceTag, { KeyName: KeyESReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(es_limit_batch)", }, { KeyName: KeyESKeepAlive, ChooseOnly: false, Default: "1d", DefaultNoUse: false, Description: 
"ES的Offset保存时间(es_keepalive)", CheckRegex: "\\d+[dms]", }, }, ModeMongo: { { KeyName: KeyMongoHost, ChooseOnly: false, Default: "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]", DefaultNoUse: true, Description: "数据库地址(mongo_host)", }, { KeyName: KeyMongoDatabase, ChooseOnly: false, Default: "app123", DefaultNoUse: true, Description: "数据库名称(mongo_database)", }, { KeyName: KeyMongoCollection, ChooseOnly: false, Default: "collection1", DefaultNoUse: true, Description: "数据表名称(mongo_collection)", }, { KeyName: KeyMongoOffsetKey, ChooseOnly: false, Default: "_id", DefaultNoUse: true, Description: "递增的主键(mongo_offset_key)", }, { KeyName: KeyMetaPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "断点续传元数据路径(meta_path)", }, OptionDataSourceTag, { KeyName: KeyMongoReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(mongo_limit_batch)", CheckRegex: "\\d+", }, { KeyName: KeyMongoCron, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "定时任务调度Cron(mongo_cron)", }, { KeyName: KeyMongoExecOnstart, ChooseOnly: true, ChooseOptions: []string{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(mongo_exec_onstart)", }, { KeyName: KeyMongoFilters, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "数据过滤方式(mongo_filters)", }, }, ModeKafka: { { KeyName: KeyKafkaGroupID, ChooseOnly: false, Default: "logkit1", DefaultNoUse: true, Description: "Kafka的consumer组名称(kafka_groupid)", }, { KeyName: KeyKafkaTopic, ChooseOnly: false, Default: "test_topic1", DefaultNoUse: true, Description: "Kafka的topic名称(kafka_topic)", }, { KeyName: KeyKafkaZookeeper, ChooseOnly: false, Default: "localhost:2181", DefaultNoUse: true, Description: "Zookeeper地址(kafka_zookeeper)", }, { KeyName: KeyWhence, ChooseOnly: true, ChooseOptions: []string{WhenceOldest, WhenceNewest}, Description: "读取的起始位置(read_from)", }, { KeyName: KeyKafkaZookeeperTimeout, 
ChooseOnly: false, Default: "1", DefaultNoUse: false, Description: "zookeeper超时时间(秒)(kafka_zookeeper_timeout)", }, OptionDataSourceTag, }, ModeRedis: { { KeyName: KeyRedisDataType, ChooseOnly: true, ChooseOptions: []string{DataTypeList, DataTypeChannel, DataTypePatterChannel}, Description: "Redis的数据读取模式(redis_datatype)", }, { KeyName: KeyRedisDB, ChooseOnly: false, Default: "0", DefaultNoUse: true, Description: "数据库名称(redis_db)", }, { KeyName: KeyRedisKey, ChooseOnly: false, Default: "key1", DefaultNoUse: true, Description: "redis键(redis_key)", }, { KeyName: KeyRedisAddress, ChooseOnly: false, Default: "127.0.0.1:6379", DefaultNoUse: false, Description: "数据库地址(redis_address)", }, { KeyName: KeyRedisPassword, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "密码(redis_password)", }, { KeyName: KeyTimeoutDuration, ChooseOnly: false, Default: "5s", DefaultNoUse: false, Description: "单次读取超时时间(m(分)、s(秒))(redis_timeout)", CheckRegex: "\\d+[ms]", }, OptionDataSourceTag, }, }
var ModeUsages = []utils.KeyValue{ {ModeDir, "从文件读取( dir 模式)"}, {ModeFile, "从文件读取( file 模式)"}, {ModeTailx, "从文件读取( tailx 模式)"}, {ModeMysql, "从 MySQL 读取"}, {ModeMssql, "从 MSSQL 读取"}, {ModeElastic, "从 Elasticsearch 读取"}, {ModeMongo, "从 MongoDB 读取"}, {ModeKafka, "从 kafka 读取"}, {ModeRedis, "从 redis 读取"}, }
ModeUsages 用途说明
var ( OptionDataSourceTag = utils.Option{ KeyName: KeyDataSourceTag, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "具体的数据文件路径来源标签(datasource_tag)", } )
var WaitNoSuchFile = time.Second
Functions ¶
Types ¶
type ActiveReader ¶
type ActiveReader struct {
// contains filtered or unexported fields
}
func NewActiveReader ¶
func NewActiveReader(logPath, whence string, meta *Meta, msgChan chan<- Result) (ar *ActiveReader, err error)
func (*ActiveReader) Close ¶
func (ar *ActiveReader) Close() error
func (*ActiveReader) Run ¶
func (ar *ActiveReader) Run()
func (*ActiveReader) Status ¶ added in v1.3.1
func (ar *ActiveReader) Status() utils.StatsInfo
func (*ActiveReader) SyncMeta ¶
func (ar *ActiveReader) SyncMeta() string
除了sync自己的bufreader,还要sync一行linecache
type BufReader ¶
type BufReader struct {
// contains filtered or unexported fields
}
BufReader implements buffering for a FileReader object.
func NewReaderSize ¶
func NewReaderSize(rd FileReader, meta *Meta, size int) (*BufReader, error)
NewReaderSize returns a new Reader whose buffer has at least the specified size. If the argument FileReader is already a Reader with a large enough size, it returns the underlying Reader.
func (*BufReader) ReadPattern ¶
ReadPattern读取日志直到匹配行首模式串
func (*BufReader) ReadString ¶
ReadString reads until the first occurrence of delim in the input, returning a string containing the data up to and including the delimiter. If ReadString encounters an error before finding a delimiter, it returns the data read before the error and the error itself (often io.EOF). ReadString returns err != nil if and only if the returned data does not end in delim. For simple uses, a Scanner may be more convenient.
type CollectionFilter ¶
type CollectionFilter map[string]interface{}
CollectionFilter is a named map type with string keys and arbitrary (interface{}) values, i.e. map[string]interface{}.
type ElasticReader ¶
type ElasticReader struct {
// contains filtered or unexported fields
}
func NewESReader ¶
func NewESReader(meta *Meta, readBatch int, estype, esindex, eshost, esVersion, keepAlive string) (er *ElasticReader, err error)
func (*ElasticReader) Close ¶
func (er *ElasticReader) Close() (err error)
func (*ElasticReader) Name ¶
func (er *ElasticReader) Name() string
func (*ElasticReader) ReadLine ¶
func (er *ElasticReader) ReadLine() (data string, err error)
func (*ElasticReader) SetMode ¶
func (er *ElasticReader) SetMode(mode string, v interface{}) error
func (*ElasticReader) Source ¶
func (er *ElasticReader) Source() string
func (*ElasticReader) Start ¶
func (er *ElasticReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题
func (*ElasticReader) Status ¶ added in v1.3.1
func (er *ElasticReader) Status() utils.StatsInfo
func (*ElasticReader) SyncMeta ¶
func (er *ElasticReader) SyncMeta()
SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。
type FileReader ¶
type FileReader interface { Name() string Source() string Read(p []byte) (n int, err error) Close() error SyncMeta() error }
FileReader reader 接口方法
type KafkaReader ¶
type KafkaReader struct { ConsumerGroup string Topics []string ZookeeperPeers []string ZookeeperChroot string ZookeeperTimeout time.Duration Whence string Consumer *consumergroup.ConsumerGroup // contains filtered or unexported fields }
func NewKafkaReader ¶
func (*KafkaReader) Close ¶
func (kr *KafkaReader) Close() (err error)
func (*KafkaReader) Name ¶
func (kr *KafkaReader) Name() string
func (*KafkaReader) ReadLine ¶
func (kr *KafkaReader) ReadLine() (data string, err error)
func (*KafkaReader) SetMode ¶
func (kr *KafkaReader) SetMode(mode string, v interface{}) error
func (*KafkaReader) Source ¶
func (kr *KafkaReader) Source() string
func (*KafkaReader) Start ¶
func (kr *KafkaReader) Start()
func (*KafkaReader) Status ¶ added in v1.3.1
func (kr *KafkaReader) Status() utils.StatsInfo
func (*KafkaReader) SyncMeta ¶
func (kr *KafkaReader) SyncMeta()
type Meta ¶
type Meta struct { RunnerName string // contains filtered or unexported fields }
func (*Meta) AppendDeleteFile ¶
func (*Meta) AppendDoneFile ¶
AppendDoneFile 将处理完的文件写入doneFile中
func (*Meta) CacheLineFile ¶
func (*Meta) DeleteDoneFile ¶
func (*Meta) DoneFilePath ¶
DoneFilePath 返回meta的filedone文件的存放目录
func (*Meta) FtSaveLogPath ¶ added in v1.3.5
FtSaveLogPath 返回 ft_sender 日志信息记录文件夹路径
func (*Meta) GetDataSourceTag ¶
func (*Meta) IsFileMode ¶ added in v1.3.1
func (*Meta) IsNotValid ¶
IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏
func (*Meta) ReadBufMeta ¶
func (*Meta) ReadCacheLine ¶
func (*Meta) ReadOffset ¶
ReadOffset 读取当前读取的文件和offset
func (*Meta) ReadStatistic ¶ added in v1.3.5
func (*Meta) SetEncodingWay ¶
SetEncodingWay 设置文件编码方式,默认为 utf-8
func (*Meta) StatisticFile ¶ added in v1.3.5
StatisticFile 返回 Runner 统计信息的文件路径
func (*Meta) WriteCacheLine ¶
func (*Meta) WriteOffset ¶
WriteOffset 将当前文件和offset写入meta中
func (*Meta) WriteStatistic ¶ added in v1.3.5
type MongoReader ¶
func NewMongoReader ¶
func (*MongoReader) Close ¶
func (mr *MongoReader) Close() (err error)
func (*MongoReader) LoopRun ¶ added in v1.3.3
func (mr *MongoReader) LoopRun()
func (*MongoReader) Name ¶
func (mr *MongoReader) Name() string
func (*MongoReader) ReadLine ¶
func (mr *MongoReader) ReadLine() (data string, err error)
func (*MongoReader) SetMode ¶
func (mr *MongoReader) SetMode(mode string, v interface{}) error
func (*MongoReader) Source ¶
func (mr *MongoReader) Source() string
func (*MongoReader) Start ¶
func (mr *MongoReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题
func (*MongoReader) Status ¶ added in v1.3.1
func (mr *MongoReader) Status() utils.StatsInfo
type MultiReader ¶
type MultiReader struct {
// contains filtered or unexported fields
}
func NewMultiReader ¶
func NewMultiReader(meta *Meta, logPathPattern, whence, expireDur, statIntervalDur string, maxOpenFiles int) (mr *MultiReader, err error)
func (*MultiReader) Close ¶
func (mr *MultiReader) Close() (err error)
func (*MultiReader) Name ¶
func (mr *MultiReader) Name() string
func (*MultiReader) ReadLine ¶
func (mr *MultiReader) ReadLine() (data string, err error)
func (*MultiReader) SetMode ¶
func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)
func (*MultiReader) Source ¶
func (mr *MultiReader) Source() string
func (*MultiReader) Start ¶
func (mr *MultiReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题。处理StatInterval以及Expire两大循环任务
func (*MultiReader) StatLogPath ¶
func (mr *MultiReader) StatLogPath()
func (*MultiReader) Status ¶ added in v1.3.1
func (mr *MultiReader) Status() utils.StatsInfo
type Reader ¶
type Reader interface { //Name reader名称 Name() string //Source 读取的数据源 Source() string ReadLine() (string, error) SetMode(mode string, v interface{}) error Close() error SyncMeta() }
Reader 是一个通用的行读取reader接口
func NewFileBufReader ¶
NewFileBufReader 创建FileReader
type RedisOptionn ¶ added in v1.2.1
type RedisOptionn struct {
// contains filtered or unexported fields
}
type RedisReader ¶ added in v1.2.1
type RedisReader struct {
// contains filtered or unexported fields
}
func NewRedisReader ¶ added in v1.2.1
func NewRedisReader(meta *Meta, conf conf.MapConf) (rr *RedisReader, err error)
func (*RedisReader) Close ¶ added in v1.2.1
func (rr *RedisReader) Close() (err error)
func (*RedisReader) Name ¶ added in v1.2.1
func (rr *RedisReader) Name() string
func (*RedisReader) ReadLine ¶ added in v1.2.1
func (rr *RedisReader) ReadLine() (data string, err error)
func (*RedisReader) SetMode ¶ added in v1.2.1
func (rr *RedisReader) SetMode(mode string, v interface{}) error
func (*RedisReader) Source ¶ added in v1.2.1
func (rr *RedisReader) Source() string
func (*RedisReader) Start ¶ added in v1.2.1
func (rr *RedisReader) Start()
func (*RedisReader) Status ¶ added in v1.3.1
func (rr *RedisReader) Status() utils.StatsInfo
func (*RedisReader) SyncMeta ¶ added in v1.2.1
func (rr *RedisReader) SyncMeta()
type SeqFile ¶
type SeqFile struct {
// contains filtered or unexported fields
}
SeqFile 按最终修改时间依次读取文件的Reader类型
func NewSeqFile ¶
type ServerReader ¶ added in v1.2.1
type ServerReader interface { //Name reader名称 Name() string //Source 读取的数据源 Source() string Start() ReadLine() (string, error) Close() error SyncMeta() }
TODO 构建统一的 Server reader框架, 减少重复的编码
type SingleFile ¶
type SingleFile struct {
// contains filtered or unexported fields
}
func NewSingleFile ¶
func NewSingleFile(meta *Meta, path, whence string, isFromWeb bool) (sf *SingleFile, err error)
func (*SingleFile) Close ¶
func (sf *SingleFile) Close() (err error)
func (*SingleFile) Name ¶
func (sf *SingleFile) Name() string
func (*SingleFile) Reopen ¶
func (sf *SingleFile) Reopen() (err error)
func (*SingleFile) Source ¶
func (sf *SingleFile) Source() string
func (*SingleFile) SyncMeta ¶
func (sf *SingleFile) SyncMeta() error
type SqlReader ¶
type StatsReader ¶ added in v1.3.1
StatsReader 是一个通用的带有统计接口的reader