Documentation ¶
Overview ¶
Package bufio implements buffered I/O. It wraps an FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
Index ¶
- Constants
- Variables
- func Errorf(err error, msg string, format ...interface{}) error
- func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)
- type ActiveReader
- type BufReader
- func (b *BufReader) Close() error
- func (b *BufReader) Lag() (rl *models.LagInfo, err error)
- func (b *BufReader) Name() string
- func (b *BufReader) ReadLine() (ret string, err error)
- func (b *BufReader) ReadPattern() (string, error)
- func (b *BufReader) ReadString(delim byte) (ret string, err error)
- func (b *BufReader) SetMode(mode string, v interface{}) (err error)
- func (b *BufReader) Source() string
- func (b *BufReader) Status() utils.StatsInfo
- func (b *BufReader) SyncMeta()
- type CollectionFilter
- type ElasticReader
- func (er *ElasticReader) Close() (err error)
- func (er *ElasticReader) Name() string
- func (er *ElasticReader) ReadLine() (data string, err error)
- func (er *ElasticReader) SetMode(mode string, v interface{}) error
- func (er *ElasticReader) Source() string
- func (er *ElasticReader) Start()
- func (er *ElasticReader) Status() utils.StatsInfo
- func (er *ElasticReader) SyncMeta()
- type Field
- type FileReader
- type HttpReader
- type KafkaReader
- func (kr *KafkaReader) Close() (err error)
- func (kr *KafkaReader) Lag() (rl *models.LagInfo, err error)
- func (kr *KafkaReader) Name() string
- func (kr *KafkaReader) ReadLine() (data string, err error)
- func (kr *KafkaReader) SetMode(mode string, v interface{}) error
- func (kr *KafkaReader) Source() string
- func (kr *KafkaReader) Start()
- func (kr *KafkaReader) Status() utils.StatsInfo
- func (kr *KafkaReader) SyncMeta()
- type LagReader
- type LastSync
- type Meta
- func (m *Meta) AppendDeleteFile(path string) (err error)
- func (m *Meta) AppendDoneFile(path string) (err error)
- func (m *Meta) BufFile() string
- func (m *Meta) BufMetaFile() string
- func (m *Meta) CacheLineFile() string
- func (m *Meta) Clear() error
- func (m *Meta) DeleteDoneFile(path string) error
- func (m *Meta) DeleteFile() string
- func (m *Meta) DoneFile() string
- func (m *Meta) DoneFilePath() string
- func (m *Meta) FtSaveLogPath() string
- func (m *Meta) GetDataSourceTag() string
- func (m *Meta) GetDoneFiles() (doneFiles []utils.File, err error)
- func (m *Meta) GetEncodingWay() (e string)
- func (m *Meta) GetMode() string
- func (m *Meta) GetTagFile() string
- func (m *Meta) GetTags() map[string]interface{}
- func (m *Meta) IsDoneFile(file string) bool
- func (m *Meta) IsExist() bool
- func (m *Meta) IsFileMode() bool
- func (m *Meta) IsNotExist() bool
- func (m *Meta) IsNotValid() bool
- func (m *Meta) IsValid() bool
- func (m *Meta) LogPath() string
- func (m *Meta) MetaFile() string
- func (m *Meta) ReadBuf(buf []byte) (n int, err error)
- func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)
- func (m *Meta) ReadCacheLine() ([]byte, error)
- func (m *Meta) ReadOffset() (currFile string, offset int64, err error)
- func (m *Meta) ReadStatistic() (stat Statistic, err error)
- func (m *Meta) Reset() error
- func (m *Meta) SetEncodingWay(e string)
- func (m *Meta) StatisticFile() string
- func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)
- func (m *Meta) WriteCacheLine(lines string) error
- func (m *Meta) WriteOffset(currFile string, offset int64) (err error)
- func (m *Meta) WriteStatistic(stat *Statistic) error
- type MongoReader
- func (mr *MongoReader) Close() (err error)
- func (mr *MongoReader) LoopRun()
- func (mr *MongoReader) Name() string
- func (mr *MongoReader) ReadLine() (data string, err error)
- func (mr *MongoReader) SetMode(mode string, v interface{}) error
- func (mr *MongoReader) Source() string
- func (mr *MongoReader) Start()
- func (mr *MongoReader) Status() utils.StatsInfo
- func (mr *MongoReader) SyncMeta()
- type MultiReader
- func (mr *MultiReader) Close() (err error)
- func (mr *MultiReader) Expire()
- func (mr *MultiReader) Lag() (rl *models.LagInfo, err error)
- func (mr *MultiReader) Name() string
- func (mr *MultiReader) ReadLine() (data string, err error)
- func (mr *MultiReader) Reset() (err error)
- func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)
- func (mr *MultiReader) Source() string
- func (mr *MultiReader) Start()
- func (mr *MultiReader) StatLogPath()
- func (mr *MultiReader) Status() utils.StatsInfo
- func (mr *MultiReader) SyncMeta()
- type NestedError
- type RTable
- type RTableRow
- type Reader
- func NewFileAutoReader(conf conf.MapConf, meta *Meta, isFromWeb bool, bufSize int, whence string, ...) (reader Reader, err error)
- func NewFileBufReader(conf conf.MapConf, isFromWeb bool) (reader Reader, err error)
- func NewFileBufReaderWithMeta(conf conf.MapConf, meta *Meta, isFromWeb bool) (reader Reader, err error)
- type RedisOptionn
- type RedisReader
- func (rr *RedisReader) Close() (err error)
- func (rr *RedisReader) Name() string
- func (rr *RedisReader) ReadLine() (data string, err error)
- func (rr *RedisReader) SetMode(mode string, v interface{}) error
- func (rr *RedisReader) Source() string
- func (rr *RedisReader) Start()
- func (rr *RedisReader) Status() utils.StatsInfo
- func (rr *RedisReader) SyncMeta()
- type Result
- type ScriptReader
- func (sr *ScriptReader) Close() (err error)
- func (sr *ScriptReader) LoopRun()
- func (sr *ScriptReader) Name() string
- func (sr *ScriptReader) ReadLine() (data string, err error)
- func (sr *ScriptReader) SetMode(mode string, v interface{}) error
- func (sr *ScriptReader) Source() string
- func (sr *ScriptReader) Start()
- func (sr *ScriptReader) SyncMeta()
- type SeqFile
- type ServerReader
- type SingleFile
- func (sf *SingleFile) Close() (err error)
- func (sf *SingleFile) Lag() (rl *models.LagInfo, err error)
- func (sf *SingleFile) Name() string
- func (sf *SingleFile) Read(p []byte) (n int, err error)
- func (sf *SingleFile) Reopen() (err error)
- func (sf *SingleFile) Source() string
- func (sf *SingleFile) SyncMeta() error
- type SnmpReader
- func (s *SnmpReader) Close() error
- func (s *SnmpReader) Gather() (err error)
- func (s *SnmpReader) Name() string
- func (s *SnmpReader) ReadLine() (line string, err error)
- func (s *SnmpReader) SetMode(mode string, v interface{}) error
- func (s *SnmpReader) Source() string
- func (s *SnmpReader) Start() error
- func (s *SnmpReader) StoreData(data []map[string]interface{}) (err error)
- func (s *SnmpReader) SyncMeta()
- type SocketReader
- func (sr *SocketReader) Close() (err error)
- func (sr *SocketReader) Name() string
- func (sr *SocketReader) ReadLine() (data string, err error)
- func (sr *SocketReader) SetMode(mode string, v interface{}) error
- func (sr *SocketReader) Source() string
- func (sr *SocketReader) Start() error
- func (sr *SocketReader) SyncMeta()
- type SqlReader
- func (mr *SqlReader) Close() (err error)
- func (mr *SqlReader) LoopRun()
- func (mr *SqlReader) Name() string
- func (mr *SqlReader) ReadLine() (data string, err error)
- func (mr *SqlReader) SetMode(mode string, v interface{}) error
- func (mr *SqlReader) Source() string
- func (mr *SqlReader) Start()
- func (mr *SqlReader) Status() utils.StatsInfo
- func (mr *SqlReader) SyncMeta()
- type Statistic
- type StatsReader
- type Table
Constants ¶
const ( KeyHttpServiceAddress = "http_service_address" KeyHttpServicePath = "http_service_path" DefaultHttpServiceAddress = ":4000" DefaultHttpServicePath = "/logkit/data" DefaultSyncEvery = 10 DefaultMaxBodySize = 100 * 1024 * 1024 DefaultMaxBytesPerFile = 500 * 1024 * 1024 DefaultWriteSpeedLimit = 10 * 1024 * 1024 // 默认写速限制为10MB )
const ( KeyLogPath = "log_path" KeyMetaPath = "meta_path" KeyFileDone = "file_done" KeyMode = "mode" KeyBufSize = "reader_buf_size" KeyWhence = "read_from" KeyEncoding = "encoding" KeyReadIOLimit = "readio_limit" KeyDataSourceTag = "datasource_tag" KeyTagFile = "tag_file" KeyHeadPattern = "head_pattern" KeyRunnerName = "runner_name" KeyNewFileNewLine = "newfile_newline" // 忽略隐藏文件 KeyIgnoreHiddenFile = "ignore_hidden" KeyIgnoreFileSuffix = "ignore_file_suffix" KeyValidFilePattern = "valid_file_pattern" KeyExpire = "expire" KeyMaxOpenFiles = "max_open_files" KeyStatInterval = "stat_interval" KeyMysqlOffsetKey = "mysql_offset_key" KeyMysqlReadBatch = "mysql_limit_batch" KeyMysqlDataSource = "mysql_datasource" KeyMysqlDataBase = "mysql_database" KeyMysqlSQL = "mysql_sql" KeyMysqlCron = "mysql_cron" KeyMysqlExecOnStart = "mysql_exec_onstart" KeySQLSchema = "sql_schema" KeyMssqlOffsetKey = "mssql_offset_key" KeyMssqlReadBatch = "mssql_limit_batch" KeyMssqlDataSource = "mssql_datasource" KeyMssqlDataBase = "mssql_database" KeyMssqlSQL = "mssql_sql" KeyMssqlCron = "mssql_cron" KeyMssqlExecOnStart = "mssql_exec_onstart" KeyPGsqlOffsetKey = "postgres_offset_key" KeyPGsqlReadBatch = "postgres_limit_batch" KeyPGsqlDataSource = "postgres_datasource" KeyPGsqlDataBase = "postgres_database" KeyPGsqlSQL = "postgres_sql" KeyPGsqlCron = "postgres_cron" KeyPGsqlExecOnStart = "postgres_exec_onstart" KeyESReadBatch = "es_limit_batch" KeyESIndex = "es_index" KeyESType = "es_type" KeyESHost = "es_host" KeyESKeepAlive = "es_keepalive" KeyESVersion = "es_version" KeyMongoHost = "mongo_host" KeyMongoDatabase = "mongo_database" KeyMongoCollection = "mongo_collection" KeyMongoOffsetKey = "mongo_offset_key" KeyMongoReadBatch = "mongo_limit_batch" KeyMongoCron = "mongo_cron" KeyMongoExecOnstart = "mongo_exec_onstart" KeyMongoFilters = "mongo_filters" KeyMongoCert = "mongo_cacert" KeyKafkaGroupID = "kafka_groupid" KeyKafkaTopic = "kafka_topic" KeyKafkaZookeeper = "kafka_zookeeper" KeyKafkaZookeeperChroot = "kafka_zookeeper_chroot" KeyKafkaZookeeperTimeout = "kafka_zookeeper_timeout" KeyExecInterpreter = "script_exec_interprepter" KeyScriptCron = "script_cron" KeyScriptExecOnStart = "script_exec_onstart" )
FileReader's conf keys
const ( ModeDir = "dir" ModeFile = "file" ModeTailx = "tailx" ModeFileAuto = "fileauto" ModeMysql = "mysql" ModeMssql = "mssql" ModePG = "postgres" ModeElastic = "elastic" ModeMongo = "mongo" ModeKafka = "kafka" ModeRedis = "redis" ModeSocket = "socket" ModeHttp = "http" ModeScript = "script" ModeSnmp = "snmp" )
FileReader's modes
const ( ReadModeHeadPatternString = "mode_head_pattern_string" ReadModeHeadPatternRegexp = "mode_head_pattern_regexp" )
const ( WhenceOldest = "oldest" WhenceNewest = "newest" )
KeyWhence 的可选项
const ( DateTypeHash = "hash" DateTypeSortedSet = "sortedSet" DataTypeSet = "set" DataTypeString = "string" DataTypeList = "list" DataTypeChannel = "channel" DataTypePatterChannel = "pattern_channel" )
const ( KeyRedisDataType = "redis_datatype" // 必填 KeyRedisDB = "redis_db" //默认 是0 KeyRedisKey = "redis_key" //必填 KeyRedisHashArea = "redisHash_area" KeyRedisAddress = "redis_address" // 默认127.0.0.1:6379 KeyRedisPassword = "redis_password" KeyTimeoutDuration = "redis_timeout" )
const ( KeySnmpReaderAgents = "snmp_agents" KeySnmpReaderTimeOut = "snmp_time_out" KeySnmpReaderInterval = "snmp_interval" KeySnmpReaderRetries = "snmp_retries" KeySnmpReaderVersion = "snmp_version" KeySnmpReaderCommunity = "snmp_community" KeySnmpReaderMaxRepetitions = "snmp_max_repetitions" KeySnmpReaderContextName = "snmp_context_name" KeySnmpReaderSecLevel = "snmp_sec_level" KeySnmpReaderSecName = "snmp_sec_name" KeySnmpReaderAuthProtocol = "snmp_auth_protocol" KeySnmpReaderAuthPassword = "snmp_auth_password" KeySnmpReaderPrivProtocol = "snmp_priv_protocol" KeySnmpReaderPrivPassword = "snmp_priv_password" KeySnmpReaderEngineID = "snmp_engine_id" KeySnmpReaderEngineBoots = "snmp_engine_boots" KeySnmpReaderEngineTime = "snmp_engine_time" KeySnmpReaderTables = "snmp_tables" KeySnmpReaderName = "snmp_reader_name" KeySnmpReaderFields = "snmp_fields" KeySnmpTableName = "snmp_table" KeyTimestamp = "timestamp" )
const ( // 监听的url形式包括: // socket_service_address = "tcp://:3110" // socket_service_address = "tcp://127.0.0.1:http" // socket_service_address = "tcp4://:3110" // socket_service_address = "tcp6://:3110" // socket_service_address = "tcp6://[2001:db8::1]:3110" // socket_service_address = "udp://:3110" // socket_service_address = "udp4://:3110" // socket_service_address = "udp6://:3110" // socket_service_address = "unix:///tmp/sys.sock" // socket_service_address = "unixgram:///tmp/sys.sock" KeySocketServiceAddress = "socket_service_address" // 最大并发连接数 // 仅用于 stream sockets (e.g. TCP). // 0 (default) 为无限制. // socket_max_connections = 1024 KeySocketMaxConnections = "socket_max_connections" // 读的超时时间 // 仅用于 stream sockets (e.g. TCP). // 0 (default) 为没有超时 // socket_read_timeout = "30s" KeySocketReadTimeout = "socket_read_timeout" // Socket的Buffer大小,默认65535 // socket_read_buffer_size = 65535 KeySocketReadBufferSize = "socket_read_buffer_size" // TCP连接的keep_alive时长 // 0 表示关闭keep_alive // 默认5分钟 KeySocketKeepAlivePeriod = "socket_keep_alive_period" )
const ( StatusInit int32 = iota StatusStopped StatusStopping StatusRunning )
const DirMode = "dir"
DirMode 按时间顺序顺次读取文件夹下所有文件的模式
const FileMode = "file"
FileMode 读取单个文件模式
const (
Loop = "loop"
)
const (
MaxHeadPatternBufferSize = 20 * 1024 * 1024
)
const (
ModeMetrics = "metrics"
)
const (
MongoDefaultOffsetKey = "_id"
)
const (
SQL_SPLITER = ";"
)
Variables ¶
var ( ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte") ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune") ErrBufferFull = errors.New("bufio: buffer full") ErrNegativeCount = errors.New("bufio: negative count") )
var ( ElasticVersion3 = "3.x" ElasticVersion5 = "5.x" ElasticVersion6 = "6.x" )
var ( OptionMetaPath = Option{ KeyName: KeyMetaPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "数据保存路径(meta_path)", Advance: true, } OptionDataSourceTag = Option{ KeyName: KeyDataSourceTag, ChooseOnly: false, Default: "datasource", DefaultNoUse: false, Description: "来源标签(datasource_tag)", Advance: true, } OptionBuffSize = Option{ KeyName: KeyBufSize, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "数据缓存大小(reader_buf_size)", CheckRegex: "\\d+", Advance: true, } OptionEncoding = Option{ KeyName: KeyEncoding, ChooseOnly: true, ChooseOptions: []interface{}{"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1", "GBK", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-12", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "macos-0_2-10.2", "macos-6_2-10.4", "macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2", "windows-1250", "windows-1251", "windows-1252", "windows-1253", "windows-1254", "windows-1255", "windows-1256", "windows-1257", "windows-1258", "windows-874", "IBM037", "ibm-273_P100-1995", "ibm-277_P100-1995", "ibm-278_P100-1995", "ibm-280_P100-1995", "ibm-284_P100-1995", "ibm-285_P100-1995", "ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999", "KOI8-R", "KOI8-U", "ebcdic-xml-us"}, Default: "UTF-8", DefaultNoUse: false, Description: "编码方式(encoding)", Advance: true, } OptionWhence = Option{ KeyName: KeyWhence, ChooseOnly: true, ChooseOptions: []interface{}{WhenceOldest, WhenceNewest}, Default: WhenceOldest, Description: "读取起始位置(read_from)", } OptionReadIoLimit = Option{ KeyName: KeyReadIOLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "读取速度限制(MB/s)(readio_limit)", CheckRegex: "\\d+", Advance: true, } OptionHeadPattern = Option{ KeyName: KeyHeadPattern, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "按正则表达式规则换行(head_pattern)", Advance: true, } OptionSQLSchema = Option{ KeyName: KeySQLSchema, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "手动定义SQL字段类型(sql_schema)", Advance: true, } OptionKeyNewFileNewLine = Option{ KeyName: KeyNewFileNewLine, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "文件末尾添加换行符(newfile_newline)", Advance: true, } OptionKeyValidFilePattern = Option{ KeyName: KeyValidFilePattern, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "以linux通配符匹配文件(valid_file_pattern)", Advance: true, } )
var ErrFileNotDir = errors.New("file is not directory")
var ErrFileNotRegular = errors.New("file is not regular")
var ErrMetaFileRead = errors.New("cannot read meta file")
var ErrNoFileChosen = errors.New("no files found")
var ErrStopped = errors.New("runner stopped")
var ModeKeyOptions = map[string][]Option{ ModeDir: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Placeholder: "/home/users/john/log/", Required: true, DefaultNoUse: true, Description: "日志文件夹路径(log_path)", }, OptionMetaPath, OptionBuffSize, OptionWhence, OptionEncoding, OptionDataSourceTag, OptionReadIoLimit, OptionHeadPattern, OptionKeyNewFileNewLine, { KeyName: KeyIgnoreHiddenFile, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "是否忽略隐藏文件(ignore_hidden)", Advance: true, }, { KeyName: KeyIgnoreFileSuffix, ChooseOnly: false, Default: strings.Join(defaultIgnoreFileSuffix, ","), DefaultNoUse: false, Description: "忽略此类后缀文件(ignore_file_suffix)", Advance: true, }, OptionKeyValidFilePattern, }, ModeFile: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Required: true, Placeholder: "/home/users/john/log/my.log", DefaultNoUse: true, Description: "日志文件路径(log_path)", }, OptionMetaPath, OptionBuffSize, OptionWhence, OptionDataSourceTag, OptionEncoding, OptionReadIoLimit, OptionHeadPattern, }, ModeTailx: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Required: true, Placeholder: "/home/users/*/mylog/*.log", DefaultNoUse: true, Description: "日志文件路径模式串(log_path)", }, OptionMetaPath, OptionBuffSize, OptionWhence, OptionEncoding, OptionReadIoLimit, OptionDataSourceTag, OptionHeadPattern, { KeyName: KeyExpire, ChooseOnly: false, Default: "24h", DefaultNoUse: false, Required: true, Description: "忽略文件的最大过期时间[时h,分m,秒s](expire)", CheckRegex: "\\d+[hms]", }, { KeyName: KeyMaxOpenFiles, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "最大打开文件数(max_open_files)", CheckRegex: "\\d+", Advance: true, }, { KeyName: KeyStatInterval, ChooseOnly: false, Default: "3m", DefaultNoUse: false, Description: "扫描间隔(stat_interval)", CheckRegex: "\\d+[hms]", Advance: true, }, }, ModeFileAuto: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Required: true, Placeholder: "/your/log/dir/or/path*.log", DefaultNoUse: true, Description: "日志路径(log_path)", }, OptionMetaPath, OptionWhence, OptionEncoding, OptionDataSourceTag, OptionKeyNewFileNewLine, OptionHeadPattern, }, ModeMysql: { { KeyName: KeyMysqlDataSource, ChooseOnly: false, Default: "", Required: true, Placeholder: "<username>:<password>@tcp(<hostname>:<port>)", DefaultNoUse: true, Description: "数据库地址(mysql_datasource)", }, { KeyName: KeyMysqlDataBase, ChooseOnly: false, Placeholder: "<database>", DefaultNoUse: true, Default: "", Required: true, Description: "数据库名称(mysql_database)", }, { KeyName: KeyMysqlSQL, ChooseOnly: false, Default: "", Required: true, Placeholder: "select * from <table>;", DefaultNoUse: true, Description: "数据查询语句(mysql_sql)", }, { KeyName: KeyMysqlOffsetKey, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "递增的列名称(mysql_offset_key)", Advance: true, }, { KeyName: KeyMysqlReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(mysql_limit_batch)", CheckRegex: "\\d+", Advance: true, }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyMysqlCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务", Advance: true, }, { KeyName: KeyMysqlExecOnStart, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(mysql_exec_onstart)", }, OptionSQLSchema, }, ModeMssql: { { KeyName: KeyMssqlDataSource, ChooseOnly: false, Placeholder: "server=<hostname or instance>;user id=<username>;password=<password>;port=<port>", DefaultNoUse: true, Default: "", Required: true, Description: "数据库地址(mssql_datasource)", }, { KeyName: KeyMssqlDataBase, Default: "", Required: true, ChooseOnly: false, Placeholder: "<database>", DefaultNoUse: true, Description: "数据库名称(mssql_database)", }, { KeyName: KeyMssqlSQL, Default: "", Required: true, ChooseOnly: false, Placeholder: "select * from <table>;", DefaultNoUse: true, Description: "数据查询语句(mssql_sql)", }, { KeyName: KeyMssqlOffsetKey, ChooseOnly: false, Default: "", DefaultNoUse: true, Description: "递增的列名称(mssql_offset_key)", Advance: true, }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyMssqlReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(mssql_limit_batch)", CheckRegex: "\\d+", Advance: true, }, { KeyName: KeyMssqlCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务(mssql_cron)", Advance: true, }, { KeyName: KeyMssqlExecOnStart, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(mssql_exec_onstart)", }, OptionSQLSchema, }, ModePG: { { KeyName: KeyPGsqlDataSource, ChooseOnly: false, Default: "", Required: true, Placeholder: "host=localhost port=5432 connect_timeout=10 user=pqgotest password=123456 sslmode=disable", DefaultNoUse: true, Description: "数据库地址(postgres_datasource)", }, { KeyName: KeyPGsqlDataBase, ChooseOnly: false, Default: "", Required: true, Placeholder: "<database>", DefaultNoUse: true, Description: "数据库名称(postgres_database)", }, { KeyName: KeyPGsqlSQL, ChooseOnly: false, Default: "", Required: true, Placeholder: "select * from <table>;", DefaultNoUse: true, Description: "数据查询语句(postgres_sql)", }, { KeyName: KeyPGsqlOffsetKey, ChooseOnly: false, Default: "", DefaultNoUse: true, Description: "递增的列名称(postgres_offset_key)", Advance: true, }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyPGsqlReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(postgres_limit_batch)", CheckRegex: "\\d+", Advance: true, }, { KeyName: KeyPGsqlCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务(postgres_cron)", Advance: true, }, { KeyName: KeyPGsqlExecOnStart, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(postgres_exec_onstart)", }, OptionSQLSchema, }, ModeElastic: { { KeyName: KeyESHost, ChooseOnly: false, Default: "", DefaultNoUse: true, Required: true, Placeholder: "http://localhost:9200", Description: "数据库地址(es_host)", }, { KeyName: KeyESVersion, ChooseOnly: true, ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6}, Description: "版本(es_version)", }, { KeyName: KeyESIndex, ChooseOnly: false, Placeholder: "app-repo-123", DefaultNoUse: true, Default: "", Required: true, Description: "索引名称(es_index)", }, { KeyName: KeyESType, ChooseOnly: false, Placeholder: "type_app", Default: "", Required: true, DefaultNoUse: true, Description: "app名称(es_type)", }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyESReadBatch, ChooseOnly: false, Default: "100", Required: true, DefaultNoUse: false, Description: "分批查询的单批次大小(es_limit_batch)", Advance: true, }, { KeyName: KeyESKeepAlive, ChooseOnly: false, Default: "1d", Required: true, DefaultNoUse: false, Description: "offset保存时间(es_keepalive)", CheckRegex: "\\d+[dms]", Advance: true, }, }, ModeMongo: { { KeyName: KeyMongoHost, ChooseOnly: false, Default: "", Required: true, Placeholder: "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]", DefaultNoUse: true, Description: "数据库地址(mongo_host)", }, { KeyName: KeyMongoDatabase, ChooseOnly: false, Default: "", Required: true, Placeholder: "app123", DefaultNoUse: true, Description: "数据库名称(mongo_database)", }, { KeyName: KeyMongoCollection, ChooseOnly: false, Default: "", Required: true, Placeholder: "collection1", DefaultNoUse: true, Description: "数据表名称(mongo_collection)", }, { KeyName: KeyMongoOffsetKey, ChooseOnly: false, Default: "_id", Required: true, DefaultNoUse: false, Description: "递增的主键(mongo_offset_key)", }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyMongoReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(mongo_limit_batch)", CheckRegex: "\\d+", Advance: true, }, { KeyName: KeyMongoCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务(mongo_cron)", Advance: true, }, { KeyName: KeyMongoExecOnstart, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(mongo_exec_onstart)", }, { KeyName: KeyMongoFilters, ChooseOnly: false, Default: "", DefaultNoUse: false, Placeholder: "{\"foo\": {\"i\": {\"$gt\": 10}}}", Description: "数据过滤方式(mongo_filters)", Advance: true, }, }, ModeKafka: { { KeyName: KeyKafkaGroupID, ChooseOnly: false, Default: "", Required: true, Placeholder: "logkit1", DefaultNoUse: true, Description: "consumer组名称(kafka_groupid)", }, { KeyName: KeyKafkaTopic, ChooseOnly: false, Default: "", Required: true, Placeholder: "test_topic1", DefaultNoUse: true, Description: "topic名称(kafka_topic)", }, { KeyName: KeyKafkaZookeeper, ChooseOnly: false, Default: "", Required: true, Placeholder: "localhost:2181", DefaultNoUse: true, Description: "zookeeper地址(kafka_zookeeper)", }, { KeyName: KeyKafkaZookeeperChroot, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "zookeeper中kafka根路径(kafka_zookeeper_chroot)", Advance: true, }, OptionWhence, { KeyName: KeyKafkaZookeeperTimeout, ChooseOnly: false, Default: "1", Required: true, DefaultNoUse: false, Description: "zookeeper超时时间[秒](kafka_zookeeper_timeout)", Advance: true, }, OptionDataSourceTag, }, ModeRedis: { { KeyName: KeyRedisDataType, ChooseOnly: true, ChooseOptions: []interface{}{DataTypeList, DataTypeChannel, DataTypePatterChannel, DataTypeString, DataTypeSet, DateTypeSortedSet, DateTypeHash}, Description: "数据读取模式(redis_datatype)", }, { KeyName: KeyRedisDB, ChooseOnly: false, Default: "", Required: true, Placeholder: "db", DefaultNoUse: true, Description: "数据库名称(redis_db)", }, { KeyName: KeyRedisKey, ChooseOnly: false, Default: "", Required: true, Placeholder: "key1", DefaultNoUse: true, Description: "redis键(redis_key)", }, { KeyName: KeyRedisHashArea, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "hash模式对应的数据结构域(redisHash_area)", Advance: true, }, { KeyName: KeyRedisAddress, ChooseOnly: false, Default: "", Placeholder: "127.0.0.1:6379", Required: true, DefaultNoUse: false, Description: "数据库地址(redis_address)", }, { KeyName: KeyRedisPassword, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "密码(redis_password)", Advance: true, }, { KeyName: KeyTimeoutDuration, ChooseOnly: false, Default: "5s", Required: true, DefaultNoUse: false, Description: "单次读取超时时间[m(分)、s(秒)](redis_timeout)", CheckRegex: "\\d+[ms]", Advance: true, }, OptionDataSourceTag, }, ModeSocket: { { KeyName: KeySocketServiceAddress, ChooseOnly: false, Default: "", Placeholder: "tcp://127.0.0.1:3110", Required: true, DefaultNoUse: true, Description: "socket监听的地址[协议://端口](socket_service_address)", }, { KeyName: KeySocketMaxConnections, ChooseOnly: false, Default: "0", DefaultNoUse: false, Description: "最大并发连接数[tcp](socket_max_connections)", Advance: true, }, { KeyName: KeySocketReadTimeout, ChooseOnly: false, Default: "1m", DefaultNoUse: false, Description: "连接超时时间[0为不超时](socket_read_timeout)", Advance: true, }, { KeyName: KeySocketReadBufferSize, ChooseOnly: false, Default: "65535", DefaultNoUse: false, Description: "连接缓存大小[udp](socket_read_buffer_size)", Advance: true, }, { KeyName: KeySocketKeepAlivePeriod, ChooseOnly: false, Default: "5m", DefaultNoUse: false, Description: "连接保持时长[0为关闭](socket_keep_alive_period)", Advance: true, }, OptionDataSourceTag, }, ModeHttp: { { KeyName: KeyHttpServiceAddress, ChooseOnly: false, Default: "", Placeholder: DefaultHttpServiceAddress, Required: true, DefaultNoUse: true, Description: "监听的地址和端口[<ip/host/不填>:port](http_service_address)", }, { KeyName: KeyHttpServicePath, ChooseOnly: false, Default: "", Placeholder: DefaultHttpServicePath, Required: true, DefaultNoUse: true, Description: "监听地址前缀(http_service_path)", }, }, ModeScript: { { KeyName: KeyExecInterpreter, ChooseOnly: false, Default: "/bin/bash", Placeholder: "/bin/bash", Required: true, DefaultNoUse: true, Description: "脚本执行解释器(script_exec_interpreter)", }, { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Required: true, Placeholder: "/home/users/john/log/my.sh", DefaultNoUse: true, Description: "脚本路径(log_path)", }, { KeyName: KeyScriptCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务(script_cron)", Advance: true, }, { KeyName: KeyScriptExecOnStart, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(script_exec_onstart)", }, }, ModeSnmp: { { KeyName: KeySnmpReaderName, ChooseOnly: false, Default: "", Placeholder: "logkit_default_name", DefaultNoUse: true, Description: "名称(snmp_reader_name)", Advance: true, }, { KeyName: KeySnmpReaderAgents, ChooseOnly: false, Default: "", Placeholder: "127.0.0.1:161,10.10.0.1:161", Required: true, DefaultNoUse: true, Description: "agents列表[多个用','分隔](snmp_agents)", }, { KeyName: KeySnmpReaderTables, ChooseOnly: false, Default: "", Placeholder: "[{\"table_name\":\"udpLocalAddress\",\"table_oid\":\"1.3.6.1.2.1.31.1.1\"}]", DefaultNoUse: true, Description: "tables配置[填入json数组字符串](snmp_tables)", }, { KeyName: KeySnmpReaderFields, ChooseOnly: false, Default: "", DefaultNoUse: true, Description: "fields配置[填入json数组字符串](snmp_fields)", }, { KeyName: KeySnmpReaderVersion, ChooseOnly: true, ChooseOptions: []interface{}{"1", "2", "3"}, Default: "2", DefaultNoUse: true, Description: "snmp协议版本(snmp_version)", }, { KeyName: KeySnmpReaderTimeOut, ChooseOnly: false, Default: "5s", Required: true, DefaultNoUse: false, Description: "连接超时时间(snmp_time_out)", Advance: true, }, { KeyName: KeySnmpReaderInterval, ChooseOnly: false, Default: "30s", Required: true, DefaultNoUse: false, Description: "收集频率(snmp_interval)", Advance: true, }, { KeyName: KeySnmpReaderRetries, ChooseOnly: false, Default: "3", Required: true, DefaultNoUse: false, Description: "重试次数(snmp_retries)", Advance: true, }, { KeyName: KeySnmpReaderCommunity, ChooseOnly: false, Default: "public", Placeholder: "public", DefaultNoUse: true, Description: "community/秘钥[版本1/2有效](snmp_version)", }, { KeyName: KeySnmpReaderMaxRepetitions, ChooseOnly: false, Default: "50", DefaultNoUse: false, Description: "最大迭代次数[版本2/3有效](snmp_max_repetitions)", Advance: true, }, { KeyName: KeySnmpReaderContextName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "context名称[版本3有效](snmp_version)", Advance: true, }, { KeyName: KeySnmpReaderSecLevel, ChooseOnly: true, ChooseOptions: []interface{}{"noAuthNoPriv", "authNoPriv", "authPriv"}, DefaultNoUse: true, Description: "安全等级[版本3有效](snmp_sec_level)", Advance: true, }, { KeyName: KeySnmpReaderSecName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "认证名称[版本3有效](snmp_sec_name)", Advance: true, }, { KeyName: KeySnmpReaderAuthProtocol, ChooseOnly: true, ChooseOptions: []interface{}{"md5", "sha", ""}, DefaultNoUse: false, Description: "认证协议[版本3有效](snmp_auth_protocol)", Advance: true, }, { KeyName: KeySnmpReaderAuthPassword, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "认证密码[版本3有效](snmp_auth_password)", Advance: true, }, { KeyName: KeySnmpReaderPrivProtocol, ChooseOnly: true, ChooseOptions: []interface{}{"des", "aes", ""}, DefaultNoUse: false, Description: "隐私协议[版本3有效](snmp_priv_protocol)", Advance: true, }, { KeyName: KeySnmpReaderPrivPassword, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "隐私密码[版本3有效](snmp_priv_password)", Advance: true, }, { KeyName: KeySnmpReaderEngineID, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "snmp_priv_engine_id[版本3有效]", Advance: true, }, { KeyName: KeySnmpReaderEngineBoots, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "snmp_engine_boots(版本3有效)", Advance: true, }, { KeyName: KeySnmpReaderEngineTime, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "snmp_engine_time(版本3有效)", Advance: true, }, }, }
var ModeUsages = []KeyValue{ {ModeFileAuto, "从文件读取( fileauto 模式)"}, {ModeDir, "从文件读取( dir 模式)"}, {ModeFile, "从文件读取( file 模式)"}, {ModeTailx, "从文件读取( tailx 模式)"}, {ModeMysql, "从 MySQL 读取"}, {ModeMssql, "从 MSSQL 读取"}, {ModePG, "从 PostgreSQL 读取"}, {ModeElastic, "从 Elasticsearch 读取"}, {ModeMongo, "从 MongoDB 读取"}, {ModeKafka, "从 Kafka 读取"}, {ModeRedis, "从 Redis 读取"}, {ModeSocket, "从 Socket 读取"}, {ModeHttp, "从 http 请求中读取"}, {ModeScript, "从脚本的执行结果中读取"}, {ModeSnmp, "从 SNMP 服务中读取"}, }
ModeUsages 用途说明
var WaitNoSuchFile = time.Second
Functions ¶
Types ¶
type ActiveReader ¶
type ActiveReader struct {
// contains filtered or unexported fields
}
func NewActiveReader ¶
func NewActiveReader(originPath, realPath, whence string, meta *Meta, msgChan chan<- Result) (ar *ActiveReader, err error)
func (*ActiveReader) Close ¶
func (ar *ActiveReader) Close() error
func (*ActiveReader) Lag ¶ added in v1.4.4
func (ar *ActiveReader) Lag() (rl *models.LagInfo, err error)
func (*ActiveReader) Run ¶
func (ar *ActiveReader) Run()
func (*ActiveReader) Status ¶ added in v1.3.1
func (ar *ActiveReader) Status() utils.StatsInfo
func (*ActiveReader) SyncMeta ¶
func (ar *ActiveReader) SyncMeta() string
除了sync自己的bufreader,还要sync一行linecache
type BufReader ¶
type BufReader struct {
// contains filtered or unexported fields
}
BufReader implements buffering for an FileReader object.
func NewReaderSize ¶
func NewReaderSize(rd FileReader, meta *Meta, size int) (*BufReader, error)
NewReaderSize returns a new Reader whose buffer has at least the specified size. If the argument FileReader is already a Reader with large enough size, it returns the underlying Reader.
func (*BufReader) ReadPattern ¶
ReadPattern读取日志直到匹配行首模式串
func (*BufReader) ReadString ¶
ReadString reads until the first occurrence of delim in the input, returning a string containing the data up to and including the delimiter. If ReadString encounters an error before finding a delimiter, it returns the data read before the error and the error itself (often io.EOF). ReadString returns err != nil if and only if the returned data does not end in delim. For simple uses, a Scanner may be more convenient.
type CollectionFilter ¶
type CollectionFilter map[string]interface{}
CollectionFilter is just a typed map of strings of map[string]interface{}
type ElasticReader ¶
type ElasticReader struct {
// contains filtered or unexported fields
}
func NewESReader ¶
func NewESReader(meta *Meta, readBatch int, estype, esindex, eshost, esVersion, keepAlive string) (er *ElasticReader, err error)
func (*ElasticReader) Close ¶
func (er *ElasticReader) Close() (err error)
func (*ElasticReader) Name ¶
func (er *ElasticReader) Name() string
func (*ElasticReader) ReadLine ¶
func (er *ElasticReader) ReadLine() (data string, err error)
func (*ElasticReader) SetMode ¶
func (er *ElasticReader) SetMode(mode string, v interface{}) error
func (*ElasticReader) Source ¶
func (er *ElasticReader) Source() string
func (*ElasticReader) Start ¶
func (er *ElasticReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题
func (*ElasticReader) Status ¶ added in v1.3.1
func (er *ElasticReader) Status() utils.StatsInfo
func (*ElasticReader) SyncMeta ¶
func (er *ElasticReader) SyncMeta()
SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。
type FileReader ¶
type FileReader interface { Name() string Source() string Read(p []byte) (n int, err error) Close() error SyncMeta() error }
FileReader reader 接口方法
type HttpReader ¶ added in v1.4.2
type HttpReader struct {
// contains filtered or unexported fields
}
func NewHttpReader ¶ added in v1.4.2
func NewHttpReader(meta *Meta, conf conf.MapConf) (*HttpReader, error)
func (*HttpReader) Close ¶ added in v1.4.2
func (h *HttpReader) Close() error
func (*HttpReader) Name ¶ added in v1.4.2
func (h *HttpReader) Name() string
func (*HttpReader) ReadLine ¶ added in v1.4.2
func (h *HttpReader) ReadLine() (data string, err error)
func (*HttpReader) SetMode ¶ added in v1.4.2
func (h *HttpReader) SetMode(mode string, v interface{}) error
func (*HttpReader) Source ¶ added in v1.4.2
func (h *HttpReader) Source() string
func (*HttpReader) Start ¶ added in v1.4.2
func (h *HttpReader) Start() error
func (*HttpReader) SyncMeta ¶ added in v1.4.2
func (h *HttpReader) SyncMeta()
type KafkaReader ¶
type KafkaReader struct { ConsumerGroup string Topics []string ZookeeperPeers []string ZookeeperChroot string ZookeeperTimeout time.Duration Whence string Consumer *consumergroup.ConsumerGroup // contains filtered or unexported fields }
func NewKafkaReader ¶
func (*KafkaReader) Close ¶
func (kr *KafkaReader) Close() (err error)
func (*KafkaReader) Lag ¶ added in v1.4.4
func (kr *KafkaReader) Lag() (rl *models.LagInfo, err error)
func (*KafkaReader) Name ¶
func (kr *KafkaReader) Name() string
func (*KafkaReader) ReadLine ¶
func (kr *KafkaReader) ReadLine() (data string, err error)
func (*KafkaReader) SetMode ¶
func (kr *KafkaReader) SetMode(mode string, v interface{}) error
func (*KafkaReader) Source ¶
func (kr *KafkaReader) Source() string
func (*KafkaReader) Start ¶
func (kr *KafkaReader) Start()
func (*KafkaReader) Status ¶ added in v1.3.1
func (kr *KafkaReader) Status() utils.StatsInfo
func (*KafkaReader) SyncMeta ¶
func (kr *KafkaReader) SyncMeta()
type Meta ¶
type Meta struct { RunnerName string // contains filtered or unexported fields }
func (*Meta) AppendDeleteFile ¶
func (*Meta) AppendDoneFile ¶
AppendDoneFile 将处理完的文件写入doneFile中
func (*Meta) CacheLineFile ¶
func (*Meta) DeleteDoneFile ¶
func (*Meta) DoneFilePath ¶
DoneFilePath 返回meta的filedone文件的存放目录
func (*Meta) FtSaveLogPath ¶ added in v1.3.5
FtSaveLogPath 返回 ft_sender 日志信息记录文件夹路径
func (*Meta) GetDataSourceTag ¶
func (*Meta) GetTagFile ¶ added in v1.4.3
func (*Meta) IsFileMode ¶ added in v1.3.1
func (*Meta) IsNotValid ¶
IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏
func (*Meta) ReadBufMeta ¶
func (*Meta) ReadCacheLine ¶
func (*Meta) ReadOffset ¶
读取当前读取的文件和offset
func (*Meta) ReadStatistic ¶ added in v1.3.5
func (*Meta) SetEncodingWay ¶
SetEncodingWay 设置文件编码方式,默认为 utf-8
func (*Meta) StatisticFile ¶ added in v1.3.5
StatisticFile 返回 Runner 统计信息的文件路径
func (*Meta) WriteCacheLine ¶
func (*Meta) WriteOffset ¶
WriteOffset 将当前文件和offset写入meta中
func (*Meta) WriteStatistic ¶ added in v1.3.5
type MongoReader ¶
func NewMongoReader ¶
func (*MongoReader) Close ¶
func (mr *MongoReader) Close() (err error)
func (*MongoReader) LoopRun ¶ added in v1.3.3
func (mr *MongoReader) LoopRun()
func (*MongoReader) Name ¶
func (mr *MongoReader) Name() string
func (*MongoReader) ReadLine ¶
func (mr *MongoReader) ReadLine() (data string, err error)
func (*MongoReader) SetMode ¶
func (mr *MongoReader) SetMode(mode string, v interface{}) error
func (*MongoReader) Source ¶
func (mr *MongoReader) Source() string
func (*MongoReader) Start ¶
func (mr *MongoReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题
func (*MongoReader) Status ¶ added in v1.3.1
func (mr *MongoReader) Status() utils.StatsInfo
type MultiReader ¶
type MultiReader struct {
// contains filtered or unexported fields
}
func NewMultiReader ¶
func NewMultiReader(meta *Meta, logPathPattern, whence, expireDur, statIntervalDur string, maxOpenFiles int) (mr *MultiReader, err error)
func (*MultiReader) Close ¶
func (mr *MultiReader) Close() (err error)
func (*MultiReader) Lag ¶ added in v1.4.4
func (mr *MultiReader) Lag() (rl *models.LagInfo, err error)
func (*MultiReader) Name ¶
func (mr *MultiReader) Name() string
func (*MultiReader) ReadLine ¶
func (mr *MultiReader) ReadLine() (data string, err error)
func (*MultiReader) Reset ¶ added in v1.4.3
func (mr *MultiReader) Reset() (err error)
func (*MultiReader) SetMode ¶
func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)
func (*MultiReader) Source ¶
func (mr *MultiReader) Source() string
func (*MultiReader) Start ¶
func (mr *MultiReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题 处理StatIntervel以及Expire两大循环任务
func (*MultiReader) StatLogPath ¶
func (mr *MultiReader) StatLogPath()
func (*MultiReader) Status ¶ added in v1.3.1
func (mr *MultiReader) Status() utils.StatsInfo
type NestedError ¶ added in v1.4.3
func (NestedError) Error ¶ added in v1.4.3
func (ne NestedError) Error() string
type Reader ¶
type Reader interface { //Name reader名称 Name() string //Source 读取的数据源 Source() string ReadLine() (string, error) SetMode(mode string, v interface{}) error Close() error SyncMeta() }
Reader 是一个通用的行读取reader接口
func NewFileAutoReader ¶ added in v1.4.1
func NewFileBufReader ¶
NewFileReader 创建FileReader
type RedisOptionn ¶ added in v1.2.1
type RedisOptionn struct {
// contains filtered or unexported fields
}
type RedisReader ¶ added in v1.2.1
type RedisReader struct {
// contains filtered or unexported fields
}
func NewRedisReader ¶ added in v1.2.1
func NewRedisReader(meta *Meta, conf conf.MapConf) (rr *RedisReader, err error)
func (*RedisReader) Close ¶ added in v1.2.1
func (rr *RedisReader) Close() (err error)
func (*RedisReader) Name ¶ added in v1.2.1
func (rr *RedisReader) Name() string
func (*RedisReader) ReadLine ¶ added in v1.2.1
func (rr *RedisReader) ReadLine() (data string, err error)
func (*RedisReader) SetMode ¶ added in v1.2.1
func (rr *RedisReader) SetMode(mode string, v interface{}) error
func (*RedisReader) Source ¶ added in v1.2.1
func (rr *RedisReader) Source() string
func (*RedisReader) Start ¶ added in v1.2.1
func (rr *RedisReader) Start()
func (*RedisReader) Status ¶ added in v1.3.1
func (rr *RedisReader) Status() utils.StatsInfo
func (*RedisReader) SyncMeta ¶ added in v1.2.1
func (rr *RedisReader) SyncMeta()
type ScriptReader ¶ added in v1.4.3
func NewScriptReader ¶ added in v1.4.3
func NewScriptReader(meta *Meta, conf conf.MapConf) (sr *ScriptReader, err error)
func (*ScriptReader) Close ¶ added in v1.4.3
func (sr *ScriptReader) Close() (err error)
func (*ScriptReader) LoopRun ¶ added in v1.4.3
func (sr *ScriptReader) LoopRun()
func (*ScriptReader) Name ¶ added in v1.4.3
func (sr *ScriptReader) Name() string
func (*ScriptReader) ReadLine ¶ added in v1.4.3
func (sr *ScriptReader) ReadLine() (data string, err error)
func (*ScriptReader) SetMode ¶ added in v1.4.3
func (sr *ScriptReader) SetMode(mode string, v interface{}) error
func (*ScriptReader) Source ¶ added in v1.4.3
func (sr *ScriptReader) Source() string
func (*ScriptReader) Start ¶ added in v1.4.3
func (sr *ScriptReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题
func (*ScriptReader) SyncMeta ¶ added in v1.4.3
func (sr *ScriptReader) SyncMeta()
type SeqFile ¶
type SeqFile struct {
// contains filtered or unexported fields
}
SeqFile 按最终修改时间依次读取文件的Reader类型
func NewSeqFile ¶
type ServerReader ¶ added in v1.2.1
type ServerReader interface { //Name reader名称 Name() string //Source 读取的数据源 Source() string Start() ReadLine() (string, error) Close() error SyncMeta() }
TODO 构建统一的 Server reader框架, 减少重复的编码
type SingleFile ¶
type SingleFile struct {
// contains filtered or unexported fields
}
func NewSingleFile ¶
func NewSingleFile(meta *Meta, path, whence string, isFromWeb bool) (sf *SingleFile, err error)
func (*SingleFile) Close ¶
func (sf *SingleFile) Close() (err error)
func (*SingleFile) Lag ¶ added in v1.4.4
func (sf *SingleFile) Lag() (rl *models.LagInfo, err error)
func (*SingleFile) Name ¶
func (sf *SingleFile) Name() string
func (*SingleFile) Reopen ¶
func (sf *SingleFile) Reopen() (err error)
func (*SingleFile) Source ¶
func (sf *SingleFile) Source() string
func (*SingleFile) SyncMeta ¶
func (sf *SingleFile) SyncMeta() error
type SnmpReader ¶ added in v1.4.3
type SnmpReader struct { SnmpName string Agents []string // agent address [ip:port] Timeout time.Duration // 等待回复的时间 Interval time.Duration // 收集频率 Retries int Version uint8 // 1, 2, 3 Community string // version 1&2 的参数 MaxRepetitions uint8 // version 2&3 的参数 ContextName string // version 3 参数 SecLevel string //"noAuthNoPriv", "authNoPriv", "authPriv" SecName string AuthProtocol string // "MD5", "SHA", "", 默认: "" AuthPassword string PrivProtocol string // "DES", "AES", "", 默认: "" PrivPassword string EngineID string EngineBoots uint32 EngineTime uint32 Tables []Table Fields []Field ConnectionCache []snmpConnection Meta *Meta Status int32 StopChan chan struct{} DataChan chan interface{} }
SnmpReader holds the configuration for the plugin.
func NewSnmpReader ¶ added in v1.4.3
func NewSnmpReader(meta *Meta, c conf.MapConf) (s *SnmpReader, err error)
func (*SnmpReader) Close ¶ added in v1.4.3
func (s *SnmpReader) Close() error
func (*SnmpReader) Gather ¶ added in v1.4.3
func (s *SnmpReader) Gather() (err error)
func (*SnmpReader) ReadLine ¶ added in v1.4.3
func (s *SnmpReader) ReadLine() (line string, err error)
func (*SnmpReader) SetMode ¶ added in v1.4.3
func (s *SnmpReader) SetMode(mode string, v interface{}) error
func (*SnmpReader) Start ¶ added in v1.4.4
func (s *SnmpReader) Start() error
func (*SnmpReader) StoreData ¶ added in v1.4.3
func (s *SnmpReader) StoreData(data []map[string]interface{}) (err error)
func (*SnmpReader) SyncMeta ¶ added in v1.4.3
func (s *SnmpReader) SyncMeta()
type SocketReader ¶ added in v1.3.6
type SocketReader struct { ServiceAddress string MaxConnections int ReadBufferSize int ReadTimeout time.Duration KeepAlivePeriod time.Duration // resource need close ReadChan chan string Closer io.Closer // contains filtered or unexported fields }
func NewSocketReader ¶ added in v1.3.6
func NewSocketReader(meta *Meta, conf conf.MapConf) (*SocketReader, error)
func (*SocketReader) Close ¶ added in v1.3.6
func (sr *SocketReader) Close() (err error)
func (*SocketReader) Name ¶ added in v1.3.6
func (sr *SocketReader) Name() string
func (*SocketReader) ReadLine ¶ added in v1.3.6
func (sr *SocketReader) ReadLine() (data string, err error)
func (*SocketReader) SetMode ¶ added in v1.3.6
func (sr *SocketReader) SetMode(mode string, v interface{}) error
func (*SocketReader) Source ¶ added in v1.3.6
func (sr *SocketReader) Source() string
func (*SocketReader) Start ¶ added in v1.3.6
func (sr *SocketReader) Start() error
func (*SocketReader) SyncMeta ¶ added in v1.3.6
func (sr *SocketReader) SyncMeta()
type SqlReader ¶
type StatsReader ¶ added in v1.3.1
StatsReader 是一个通用的带有统计接口的reader