Documentation ¶
Overview ¶
Package bufio implements buffered I/O. It wraps an FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
Index ¶
- Constants
- Variables
- func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)
- func ParseLoopDuration(cronSched string) (dur time.Duration, err error)
- func RegisterConstructor(typ string, c Constructor)
- type BufReader
- func (b *BufReader) Close() error
- func (b *BufReader) FormMutiLine() []byte
- func (b *BufReader) Lag() (rl *LagInfo, err error)
- func (b *BufReader) Name() string
- func (b *BufReader) ReadLine() (ret string, err error)
- func (b *BufReader) ReadPattern() (string, error)
- func (b *BufReader) ReadString(delim byte) (ret string, err error)
- func (b *BufReader) SetMode(mode string, v interface{}) (err error)
- func (b *BufReader) Source() string
- func (b *BufReader) Status() StatsInfo
- func (b *BufReader) SyncMeta()
- type Constructor
- type DaemonReader
- type DataReader
- type FileReader
- type LagReader
- type LastSync
- type LineSkipper
- type Meta
- func (m *Meta) AddSubMeta(key string, meta *Meta) error
- func (m *Meta) AppendDeleteFile(path string) (err error)
- func (m *Meta) AppendDoneFile(path string) (err error)
- func (m *Meta) AppendDoneFileInode(path string, inode uint64) (err error)
- func (m *Meta) BufFile() string
- func (m *Meta) BufMetaFile() string
- func (m *Meta) CacheLineFile() string
- func (m *Meta) CheckExpiredSubMetas(expire time.Duration)
- func (m *Meta) CleanExpiredSubMetas(expire time.Duration)
- func (m *Meta) Clear() error
- func (m *Meta) DeleteDoneFile(path string) error
- func (m *Meta) DeleteFile() string
- func (m *Meta) DoneFile() string
- func (m *Meta) ExtraInfo() map[string]string
- func (m *Meta) FtSaveLogPath() string
- func (m *Meta) GetDataSourceTag() string
- func (m *Meta) GetDoneFileContent() ([]string, error)
- func (m *Meta) GetDoneFileInode() map[string]bool
- func (m *Meta) GetDoneFiles() ([]File, error)
- func (m *Meta) GetEncodingWay() (e string)
- func (m *Meta) GetMode() string
- func (m *Meta) GetTagFile() string
- func (m *Meta) GetTags() map[string]interface{}
- func (m *Meta) IsDoneFile(file string) bool
- func (m *Meta) IsExist() bool
- func (m *Meta) IsFileMode() bool
- func (m *Meta) IsNotExist() bool
- func (m *Meta) IsNotValid() bool
- func (m *Meta) IsValid() bool
- func (m *Meta) LogPath() string
- func (m *Meta) MetaFile() string
- func (m *Meta) ReadBuf(buf []byte) (n int, err error)
- func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)
- func (m *Meta) ReadCacheLine() ([]byte, error)
- func (m *Meta) ReadDBDoneFile(database string) (content []string, err error)
- func (m *Meta) ReadOffset() (currFile string, offset int64, err error)
- func (m *Meta) ReadRecordsFile(recordsFile string) ([]string, error)
- func (m *Meta) ReadStatistic() (stat Statistic, err error)
- func (m *Meta) RemoveSubMeta(key string)
- func (m *Meta) Reset() error
- func (m *Meta) SetEncodingWay(e string)
- func (m *Meta) StatisticFile() string
- func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)
- func (m *Meta) WriteCacheLine(lines string) error
- func (m *Meta) WriteOffset(currFile string, offset int64) (err error)
- func (m *Meta) WriteStatistic(stat *Statistic) error
- type NewLineBytesRecorder
- type Reader
- func NewFileBufReader(conf conf.MapConf, errDirectReturn bool) (reader Reader, err error)deprecated
- func NewFileDirReader(meta *Meta, conf conf.MapConf) (reader Reader, err error)
- func NewReader(conf conf.MapConf, errDirectReturn bool) (reader Reader, err error)
- func NewSingleFileReader(meta *Meta, conf conf.MapConf) (reader Reader, err error)
- type Registry
- type SeqFile
- func (sf *SeqFile) Close() (err error)
- func (sf *SeqFile) IsNewOpen() bool
- func (sf *SeqFile) Lag() (rl *LagInfo, err error)
- func (sf *SeqFile) Name() string
- func (sf *SeqFile) NewLineBytesIndex() []SourceIndex
- func (sf *SeqFile) Read(p []byte) (n int, err error)
- func (sf *SeqFile) SetSkipped()
- func (sf *SeqFile) Source() string
- func (sf *SeqFile) SyncMeta() (err error)
- type SingleFile
- func (sf *SingleFile) Close() (err error)
- func (sf *SingleFile) Lag() (rl *LagInfo, err error)
- func (sf *SingleFile) Name() string
- func (sf *SingleFile) Read(p []byte) (n int, err error)
- func (sf *SingleFile) Reopen() (err error)
- func (sf *SingleFile) Source() string
- func (sf *SingleFile) SyncMeta() error
- type SourceIndex
- type Statistic
- type StatsReader
Constants ¶
const ( DefaultBufSize = 4096 MaxHeadPatternBufferSize = 20 * 1024 * 1024 )
const ( DoneFileName = "file.done" FtSaveLogPath = "ft_log" // ft log 在 meta 中的文件夹名字 )
const ( DefautFileRetention = 7 ModeMetrics = "metrics" )
const ( KeyLogPath = "log_path" KeyMetaPath = "meta_path" KeyFileDone = "file_done" KeyMode = "mode" KeyBufSize = "reader_buf_size" KeyWhence = "read_from" KeyEncoding = "encoding" KeyMysqlEncoding = "encoding" KeyReadIOLimit = "readio_limit" KeyDataSourceTag = "datasource_tag" KeyTagFile = "tag_file" KeyHeadPattern = "head_pattern" KeyNewFileNewLine = "newfile_newline" KeySkipFileFirstLine = "skip_first_line" // 忽略隐藏文件 KeyIgnoreHiddenFile = "ignore_hidden" KeyIgnoreFileSuffix = "ignore_file_suffix" KeyValidFilePattern = "valid_file_pattern" KeyExpire = "expire" KeySubmetaExpire = "submeta_expire" KeyMaxOpenFiles = "max_open_files" KeyStatInterval = "stat_interval" KeyMysqlOffsetKey = "mysql_offset_key" KeyMysqlReadBatch = "mysql_limit_batch" KeyMysqlDataSource = "mysql_datasource" KeyMysqlDataBase = "mysql_database" KeyMysqlSQL = "mysql_sql" KeyMysqlCron = "mysql_cron" KeyMysqlExecOnStart = "mysql_exec_onstart" KeyMysqlHistoryAll = "mysql_history_all" KyeMysqlTable = "mysql_table" KeySQLSchema = "sql_schema" KeyMagicLagDuration = "magic_lag_duration" KeyMssqlOffsetKey = "mssql_offset_key" KeyMssqlReadBatch = "mssql_limit_batch" KeyMssqlDataSource = "mssql_datasource" KeyMssqlDataBase = "mssql_database" KeyMssqlSchema = "mssql_schema" KeyMssqlSQL = "mssql_sql" KeyMssqlCron = "mssql_cron" KeyMssqlExecOnStart = "mssql_exec_onstart" KeyPGsqlOffsetKey = "postgres_offset_key" KeyPGsqlReadBatch = "postgres_limit_batch" KeyPGsqlDataSource = "postgres_datasource" KeyPGsqlDataBase = "postgres_database" KeyPGsqlSchema = "postgres_schema" KeyPGsqlSQL = "postgres_sql" KeyPGsqlCron = "postgres_cron" KeyPGsqlExecOnStart = "postgres_exec_onstart" KeyESReadBatch = "es_limit_batch" KeyESIndex = "es_index" KeyESType = "es_type" KeyESHost = "es_host" KeyESKeepAlive = "es_keepalive" KeyESVersion = "es_version" KeyMongoHost = "mongo_host" KeyMongoDatabase = "mongo_database" KeyMongoCollection = "mongo_collection" KeyMongoOffsetKey = "mongo_offset_key" KeyMongoReadBatch = "mongo_limit_batch" KeyMongoCron = "mongo_cron" KeyMongoExecOnstart = "mongo_exec_onstart" KeyMongoFilters = "mongo_filters" KeyMongoCert = "mongo_cacert" KeyKafkaGroupID = "kafka_groupid" KeyKafkaTopic = "kafka_topic" KeyKafkaZookeeper = "kafka_zookeeper" KeyKafkaZookeeperChroot = "kafka_zookeeper_chroot" KeyKafkaZookeeperTimeout = "kafka_zookeeper_timeout" KeyExecInterpreter = "script_exec_interprepter" KeyScriptCron = "script_cron" KeyScriptExecOnStart = "script_exec_onstart" KeyErrDirectReturn = "errDirectReturn" )
FileReader's conf keys
const ( ModeDir = "dir" ModeFile = "file" ModeTailx = "tailx" ModeFileAuto = "fileauto" ModeDirx = "dirx" ModeMySQL = "mysql" ModeMSSQL = "mssql" ModePostgreSQL = "postgres" ModeElastic = "elastic" ModeMongo = "mongo" ModeKafka = "kafka" ModeRedis = "redis" ModeSocket = "socket" ModeHTTP = "http" ModeScript = "script" ModeSnmp = "snmp" ModeCloudWatch = "cloudwatch" ModeCloudTrail = "cloudtrail" )
FileReader's modes
const ( ReadModeHeadPatternString = "mode_head_pattern_string" ReadModeHeadPatternRegexp = "mode_head_pattern_regexp" )
const ( WhenceOldest = "oldest" WhenceNewest = "newest" )
KeyWhence 的可选项
const ( StatusInit int32 = iota StatusStopped StatusStopping StatusRunning )
const ( KeyS3Region = "s3_region" KeyS3AccessKey = "s3_access_key" KeyS3SecretKey = "s3_secret_key" KeyS3Bucket = "s3_bucket" KeyS3Prefix = "s3_prefix" KeySyncDirectory = "sync_directory" KeySyncMetastore = "sync_metastore" KeySyncInterval = "sync_interval" KeySyncConcurrent = "sync_concurrent" )
Constants for cloudtrail
const ( KeyRegion = "region" /* 认证顺序: 1. role_arn 2. ak,sk 3. profile 4. 环境变量 5. shared_credential_file 6. EC2 instance profile */ KeyRoleArn = "role_arn" KeyAWSAccessKey = "aws_access_key" KeyAWSSecretKey = "aws_secret_key" KeyAWSToken = "aws_token" KeyAWSProfile = "aws_profile" KeyCollectInterval = "interval" KeyNamespace = "namespace" KeyRateLimit = "ratelimit" KeyMetrics = "metrics" KeyDimension = "dimensions" KeyCacheTTL = "cache_ttl" KeyPeriod = "period" KeyDelay = "delay" )
Constants for cloudwatch
const ( ElasticVersion3 = "3.x" ElasticVersion5 = "5.x" ElasticVersion6 = "6.x" )
Constants for Elastic
const ( KeyHTTPServiceAddress = "http_service_address" KeyHTTPServicePath = "http_service_path" DefaultHTTPServiceAddress = ":4000" DefaultHTTPServicePath = "/logkit/data" )
Constants for HTTP
const ( DataTypeHash = "hash" DataTypeSortedSet = "zset" DataTypeSet = "set" DataTypeString = "string" DataTypeList = "list" DataTypeChannel = "channel" DataTypePatterChannel = "pattern_channel" KeyRedisDataType = "redis_datatype" // 必填 KeyRedisDB = "redis_db" //默认 是0 KeyRedisKey = "redis_key" //必填 KeyRedisHashArea = "redisHash_area" KeyRedisAddress = "redis_address" // 默认127.0.0.1:6379 KeyRedisPassword = "redis_password" KeyTimeoutDuration = "redis_timeout" )
Constants for Redis
const ( SocketRulePacket = "按原始包读取" SocketRuleJson = "按json格式读取" SocketRuleLine = "按换行符读取" )
const ( KeySnmpReaderAgents = "snmp_agents" KeySnmpTableInitHost = "snmp_table_init_host" KeySnmpReaderTimeOut = "snmp_time_out" KeySnmpReaderInterval = "snmp_interval" KeySnmpReaderRetries = "snmp_retries" KeySnmpReaderVersion = "snmp_version" KeySnmpReaderCommunity = "snmp_community" KeySnmpReaderMaxRepetitions = "snmp_max_repetitions" KeySnmpReaderContextName = "snmp_context_name" KeySnmpReaderSecLevel = "snmp_sec_level" KeySnmpReaderSecName = "snmp_sec_name" KeySnmpReaderAuthProtocol = "snmp_auth_protocol" KeySnmpReaderAuthPassword = "snmp_auth_password" KeySnmpReaderPrivProtocol = "snmp_priv_protocol" KeySnmpReaderPrivPassword = "snmp_priv_password" KeySnmpReaderEngineID = "snmp_engine_id" KeySnmpReaderEngineBoots = "snmp_engine_boots" KeySnmpReaderEngineTime = "snmp_engine_time" KeySnmpReaderTables = "snmp_tables" KeySnmpReaderName = "snmp_reader_name" KeySnmpReaderFields = "snmp_fields" KeySnmpTableName = "snmp_table" KeyTimestamp = "timestamp" )
Constants for SNMP
const ( // 监听的url形式包括: // socket_service_address = "tcp://:3110" // socket_service_address = "tcp://127.0.0.1:http" // socket_service_address = "tcp4://:3110" // socket_service_address = "tcp6://:3110" // socket_service_address = "tcp6://[2001:db8::1]:3110" // socket_service_address = "udp://:3110" // socket_service_address = "udp4://:3110" // socket_service_address = "udp6://:3110" // socket_service_address = "unix:///tmp/sys.sock" // socket_service_address = "unixgram:///tmp/sys.sock" KeySocketServiceAddress = "socket_service_address" KeySocketSplitByLine = "socket_split_by_line" KeySocketRule = "socket_rule" // 最大并发连接数 // 仅用于 stream sockets (e.g. TCP). // 0 (default) 为无限制. // socket_max_connections = 1024 KeySocketMaxConnections = "socket_max_connections" // 读的超时时间 // 仅用于 stream sockets (e.g. TCP). // 0 (default) 为没有超时 // socket_read_timeout = "30s" KeySocketReadTimeout = "socket_read_timeout" // Socket的Buffer大小,默认65535 // socket_read_buffer_size = 65535 KeySocketReadBufferSize = "socket_read_buffer_size" // TCP连接的keep_alive时长 // 0 表示关闭keep_alive // 默认5分钟 KeySocketKeepAlivePeriod = "socket_keep_alive_period" )
Constants for Socket
const DirMode = "dir"
DirMode 按时间顺序顺次读取文件夹下所有文件的模式
const FileMode = "file"
FileMode 读取单个文件模式
const (
Loop = "loop"
)
Variables ¶
var ( ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte") ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune") ErrBufferFull = errors.New("bufio: buffer full") ErrNegativeCount = errors.New("bufio: negative count") )
var ( ModeUsages = KeyValueSlice{ {ModeFileAuto, "从文件读取( fileauto 模式)", ""}, {ModeFile, "从文件读取( file 模式)", ""}, {ModeTailx, "从文件读取( tailx 模式)", ""}, {ModeDir, "从文件读取( dir 模式)", ""}, {ModeDirx, "从文件读取( dirx 模式)", ""}, {ModeMySQL, "从 MySQL 读取", ""}, {ModeMSSQL, "从 MSSQL 读取", ""}, {ModePostgreSQL, "从 PostgreSQL 读取", ""}, {ModeElastic, "从 Elasticsearch 读取", ""}, {ModeMongo, "从 MongoDB 读取", ""}, {ModeKafka, "从 Kafka 读取 (针对0.8及以前版本)", "Kafka"}, {ModeRedis, "从 Redis 读取", ""}, {ModeSocket, "从 Socket 读取", ""}, {ModeHTTP, "从 Http 请求中读取", ""}, {ModeScript, "从脚本的执行结果中读取", ""}, {ModeSnmp, "从 SNMP 服务中读取", ""}, {ModeCloudWatch, "从 AWS Cloudwatch 中读取", ""}, {ModeCloudTrail, "从 AWS S3(原Cloudtrail) 中读取", ""}, } ModeToolTips = KeyValueSlice{ {ModeFileAuto, "会自动根据路径匹配 dir, file, tailx 三种模式中的一种", ""}, {ModeDir, "logkit会在启动时根据文件夹下文件时间顺序依次读取文件,当读到时间最新的文件时会不断读取追加的数据,直到该文件夹下出现新的文件。该模式的经典日志存储方式为整个文件夹下存储业务日志,文件夹下的日志使用统一前缀,后缀为时间戳,根据日志的大小rotate到新的文件。", ""}, {ModeFile, "logkit会不断读取文件追加的数据。该模式的经典日志存储方式类似于nginx的日志rotate方式,日志名称为固定的名称,如access.log,rotate时直接move成新的文件如access.log.1,新的数据仍然写入到access.log。", ""}, {ModeTailx, "展开并匹配所有符合表达式的文件,并持续读取所有有数据追加的文件。每隔stat_interval的时间,重新刷新一遍logpath模式串,添加新增的文件。该模式比较灵活,几乎可以读取所有日志更新,需要注意的是,使用tailx模式容易导致文件句柄打开过多。tailx模式的文件重复判断标准为文件名称,若使用rename, copy等方式改变日志名称,并且新的名字在logpath模式串的包含范围内,在read_from为oldest的情况下则会导致重复写入数据。", ""}, {ModeDirx, "展开并匹配所有符合表达式的目录下的文件,并持续读取目录中新产生的文件,或者最新文件的数据追加。每隔stat_interval的时间,重新刷新一遍logpath模式串,监听新增的文件夹。该模式与tailx最大的区别时底层的实现方式是按文件夹读取(dir),而tailx是按文件读取,同样,使用dirx模式容易导致文件句柄打开过多,当相对于tailx会大大减少。dirx模式监听到文件夹后,会按文件夹内文件的最后更新依次读取,适合文件夹内文件依次滚动产生的场景。如果文件夹内的不同文件都会随时更新,则会导致重复读取,此时请选择tailx。", ""}, {ModeMySQL, "MySQL Reader是以定时任务的形式去执行mysql语句,将mysql读取到的内容全部获取则任务结束,等到下一个定时任务的到来,也可以仅执行一次。", ""}, {ModeMSSQL, "Microsoft SQL Server Reader是以定时任务的形式去执行sql语句,将sql读取到的内容全部获取则任务结束,等到下一个定时任务的到来,也可以仅执行一次。", ""}, {ModePostgreSQL, "PostgreSQL Reader是以定时任务的形式去执行 PostgreSQL 查询语句,将 PostgreSQL 读取到的内容全部获取则任务结束,等到下一个定时任务的到来,也可以仅执行一次。", ""}, {ModeElastic, "Elasticsearch Reader 是logkit提供的从Elasticsearch读取日志的配置方式。Elasticsearch Reader输出的是json字符串,需要使用json的parser解析。", ""}, {ModeMongo, "MongoDB reader 是logkit提供的从MongoDB读取数据的配置方式。MongoDB reader 输出的是json字符串,需要使用json的parser解析。", ""}, {ModeKafka, "Kafka reader 是logkit提供的从Kafka读取数据的配置方式。针对0.8及以前版本的Kafka服务", "Kafka"}, {ModeRedis, "Redis Reader 是logkit提供的从Redis读取日志的配置方式。Redis Reader 输出的是redis中存储的字符串,具体字符串是什么格式,可以在parser中用对应方式解析。", ""}, {ModeSocket, `Socket Reader 是logkit提供的以端口监听的方式接受并读取日志的形式,主要支持tcp\udp\unix套接字 这三大类协议。`, ""}, {ModeHTTP, `Http Reader 是 logkit 提供的以 http post 请求的方式接受并读取日志的形式。该 reader 支持 gzip, 但请在请求头中添加Content-Encoding=gzip 或者 Content-Type=application/gzip,默认接收 request body 中所有的数据作为要读取的日志, 限制 request body 小于 100MB,默认将 request body 中的数据使用 \n 分割, 每行作为一条数据`, ""}, {ModeScript, "Script Reader是以定时任务的形式执行脚本,将脚本执行的结果全部获取则任务结束,等到下一个定时任务的到来,也可以仅执行一次。", ""}, {ModeSnmp, "Snmp Reader 可以从 Snmp 服务中收集数据。snmp_fields 和 snmp_tables 这两项配置需要填入符合 json数组 格式的字符串, 字符串内的双引号需要转义。", ""}, {ModeCloudWatch, "CloudWatch Reader 可以从 AWS CloudWatch 服务的接口中获取数据。", ""}, {ModeCloudTrail, "AWS S3(原Cloudtrail) Reader 可以从 AWS S3(原Cloudtrail) 服务的接口中获取数据。", ""}, } )
ModeUsages 和 ModeTooltips 用途说明
var ( OptionMetaPath = Option{ KeyName: KeyMetaPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "数据保存路径(meta_path)", Advance: true, ToolTip: "一个文件夹,记录本次reader的读取位置,默认会自动生成", } OptionDataSourceTag = Option{ KeyName: KeyDataSourceTag, ChooseOnly: false, Default: "datasource", DefaultNoUse: false, Description: "来源标签(datasource_tag)", Advance: true, ToolTip: "把读取日志的路径名称也作为标签,记录到解析出来的数据结果中,此处填写标签名称", } OptionBuffSize = Option{ KeyName: KeyBufSize, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "数据缓存大小(reader_buf_size)", CheckRegex: "\\d+", Advance: true, ToolTip: "读取数据的缓存大小,默认4096,单位字节", } OptionEncoding = Option{ KeyName: KeyEncoding, ChooseOnly: true, ChooseOptions: []interface{}{"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1", "GBK", "latin1", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-12", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "macos-0_2-10.2", "macos-6_2-10.4", "macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2", "windows-1250", "windows-1251", "windows-1252", "windows-1253", "windows-1254", "windows-1255", "windows-1256", "windows-1257", "windows-1258", "windows-874", "IBM037", "ibm-273_P100-1995", "ibm-277_P100-1995", "ibm-278_P100-1995", "ibm-280_P100-1995", "ibm-284_P100-1995", "ibm-285_P100-1995", "ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999", "KOI8-R", "KOI8-U", "ebcdic-xml-us"}, Default: "UTF-8", DefaultNoUse: false, Description: "编码方式(encoding)", Advance: true, ToolTip: "读取日志文件的编码方式,默认为UTF-8,即按照UTF-8的编码方式读取文件", } OptionWhence = Option{ KeyName: KeyWhence, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{WhenceOldest, WhenceNewest}, Default: WhenceOldest, Description: "读取起始位置(read_from)", ToolTip: "在创建新文件或meta信息损坏的时候(即历史读取记录不存在),将从数据源的哪个位置开始读取,最新或最老", } OptionReadIoLimit = Option{ KeyName: KeyReadIOLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "读取速度限制(readio_limit)", CheckRegex: "\\d+", Advance: true, ToolTip: "读取文件的磁盘限速,填写正整数,单位为MB/s, 默认限速20MB/s", } OptionHeadPattern = Option{ KeyName: KeyHeadPattern, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "按正则表达式规则换行(head_pattern)", Advance: true, ToolTip: "reader每次读取一行,若要读取多行,请填写head_pattern,表示匹配多行时新的一行的开始符合该正则表达式", } OptionSQLSchema = Option{ KeyName: KeySQLSchema, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "手动定义SQL字段类型(sql_schema)", Advance: true, ToolTip: `默认情况下会自动识别数据字段的类型,当不能识别时,可以sql_schema指定 mysql 数据字段的类型,目前支持string、long、float三种类型,单个字段左边为字段名称,右边为类型,空格分隔 abc float;不同的字段用逗号分隔。支持简写为float=>f,long=>l,string=>s. 如:"sql_schema":"abc string,bcd float,xyz long"`, } OptionMagicLagDuration = Option{ KeyName: KeyMagicLagDuration, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "魔法变量时间延迟(magic_lag_duration)", Advance: true, ToolTip: `针对魔法变量进行时间延迟,单位支持h(时)、m(分)、s(秒),如写24h,则渲染出来的时间魔法变量往前1天`, } OptionKeyNewFileNewLine = Option{ KeyName: KeyNewFileNewLine, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "文件末尾添加换行符(newfile_newline)", Advance: true, ToolTip: "开启后,不同文件结尾自动添加换行符", } OptionKeySkipFileFirstLine = Option{ KeyName: KeySkipFileFirstLine, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "跳过新文件的第一行(skip_first_line)", Advance: true, Required: false, ToolTip: "常用于带抬头的csv文件,抬头与实际数据类型不一致", } OptionKeyValidFilePattern = Option{ KeyName: KeyValidFilePattern, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "以linux通配符匹配文件(valid_file_pattern)", Advance: true, ToolTip: `针对dir读取模式需要解析的日志文件,可以设置匹配文件名的模式串,匹配方式为linux通配符展开方式,默认为*,即匹配文件夹下全部文件`, } OptionKeyIgnoreHiddenFile = Option{ KeyName: KeyIgnoreHiddenFile, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "是否忽略隐藏文件(ignore_hidden)", Advance: true, ToolTip: "读取的过程中是否忽略隐藏文件", } OptionKeyIgnoreFileSuffix = Option{ KeyName: KeyIgnoreFileSuffix, ChooseOnly: false, Default: strings.Join(DefaultIgnoreFileSuffixes, ","), DefaultNoUse: false, Description: "忽略此类后缀文件(ignore_file_suffix)", Advance: true, ToolTip: `针对dir读取模式需要解析的日志文件,可以设置读取的过程中忽略哪些文件后缀名,默认忽略的后缀包括".pid", ".swap", ".go", ".conf", ".tar.gz", ".tar", ".zip",".a", ".o", ".so"`, } OptionKeySubmetaExpire = Option{ KeyName: KeySubmetaExpire, ChooseOnly: false, Default: "720h", DefaultNoUse: false, Required: true, Description: "清理元数据的过期时间(submeta_expire)", CheckRegex: "\\d+[hms]", ToolTip: `当元数据的文件达到expire时间,则对其进行清理操作。写法为:数字加单位符号,组成字符串duration写法,支持时h、分m、秒s为单位,类似3h(3小时),10m(10分钟),5s(5秒),默认的expire时间是720h,即30天。若最大过期时间(expire)为0s,则不会清理元数据`, } OptionKeyMaxOpenFiles = Option{ KeyName: KeyMaxOpenFiles, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "最大打开文件数(max_open_files)", CheckRegex: "\\d+", Advance: true, ToolTip: "最大同时追踪的文件数,默认为256", } OptionKeyStatInterval = Option{ KeyName: KeyStatInterval, ChooseOnly: false, Default: "3m", DefaultNoUse: false, Description: "扫描间隔(stat_interval)", CheckRegex: "\\d+[hms]", Advance: true, ToolTip: `感知新增日志的定时检查时间`, } )
var DefaultIgnoreFileSuffixes = []string{
".pid", ".swap", ".go", ".conf", ".tar.gz", ".tar", ".zip",
".a", ".o", ".so"}
var ErrFileNotDir = errors.New("file is not directory")
var ErrFileNotRegular = errors.New("file is not regular")
var ErrMetaFileRead = errors.New("cannot read meta file")
var ErrNoFileChosen = errors.New("no files found")
var ErrStopped = errors.New("runner stopped")
var ModeKeyOptions = map[string][]Option{ ModeDir: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Placeholder: "/home/users/john/log/", Required: true, DefaultNoUse: true, Description: "日志文件夹路径(log_path)", ToolTip: "需要收集的日志的文件夹路径", }, OptionMetaPath, OptionBuffSize, OptionWhence, OptionEncoding, OptionDataSourceTag, OptionReadIoLimit, OptionHeadPattern, OptionKeyNewFileNewLine, OptionKeySkipFileFirstLine, OptionKeyIgnoreHiddenFile, OptionKeyIgnoreFileSuffix, OptionKeyValidFilePattern, }, ModeFile: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Required: true, Placeholder: "/home/users/john/log/my.log", DefaultNoUse: true, Description: "日志文件路径(log_path)", ToolTip: "需要收集的日志的文件路径", }, OptionMetaPath, OptionBuffSize, OptionWhence, OptionDataSourceTag, OptionEncoding, OptionReadIoLimit, OptionHeadPattern, }, ModeTailx: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Required: true, Placeholder: "/home/users/*/mylog/*.log", DefaultNoUse: true, Description: "日志文件路径模式串(log_path)", ToolTip: "需要收集的日志的文件(夹)模式串路径,写 * 代表通配", }, OptionMetaPath, OptionBuffSize, OptionWhence, OptionEncoding, OptionReadIoLimit, OptionDataSourceTag, OptionHeadPattern, { KeyName: KeyExpire, ChooseOnly: false, Default: "24h", DefaultNoUse: false, Required: true, Description: "忽略文件的最大过期时间(expire)", CheckRegex: "\\d+[hms]", ToolTip: `当日志达到expire时间,则放弃追踪。写法为:数字加单位符号,组成字符串duration写法,支持时h、分m、秒s为单位,类似3h(3小时),10m(10分钟),5s(5秒),默认的expire时间是24h,当expire时间是0s时表示永不过期`, }, OptionKeySubmetaExpire, OptionKeyMaxOpenFiles, OptionKeyStatInterval, }, ModeDirx: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Placeholder: "/home/users/*", Required: true, DefaultNoUse: true, Description: "日志文件夹路径模式串(log_path)", ToolTip: "需要收集的日志的文件夹模式串路径,写 * 代表通配", }, OptionMetaPath, OptionBuffSize, OptionWhence, OptionEncoding, OptionDataSourceTag, OptionReadIoLimit, OptionHeadPattern, OptionKeyNewFileNewLine, OptionKeySkipFileFirstLine, OptionKeyIgnoreHiddenFile, OptionKeyIgnoreFileSuffix, { KeyName: KeyExpire, ChooseOnly: false, Default: "0s", DefaultNoUse: false, Required: true, Description: "忽略文件夹内文件的最大过期时间(expire)", CheckRegex: "\\d+[hms]", ToolTip: `当文件夹内所有的日志达到expire时间,则放弃追踪。写法为:数字加单位符号,组成字符串duration写法,支持时h、分m、秒s为单位,类似3h(3小时),10m(10分钟),5s(5秒),默认的expire时间是0s,即永不过期`, }, OptionKeySubmetaExpire, OptionKeyMaxOpenFiles, OptionKeyStatInterval, OptionKeyValidFilePattern, }, ModeFileAuto: { { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Required: true, Placeholder: "/your/log/dir/or/path*.log", DefaultNoUse: true, Description: "日志路径(log_path)", ToolTip: "需要收集的日志文件(夹)路径", }, OptionMetaPath, OptionWhence, OptionEncoding, OptionDataSourceTag, OptionKeyNewFileNewLine, OptionHeadPattern, }, ModeMySQL: { { KeyName: KeyMysqlDataSource, Element: Text, ChooseOnly: false, Default: "", Required: true, Placeholder: "<username>:<password>@tcp(<hostname>:<port>)", DefaultNoUse: true, Description: "数据库地址(mysql_datasource)", ToolTip: `mysql数据源所需信息: username: 用户名, password: 用户密码, hostname: mysql地址, port: mysql端口, 示例:一个填写完整的字段类似于:"admin:123456@tcp(10.101.111.1:3306)"`, }, { KeyName: KeyMysqlDataBase, ChooseOnly: false, Placeholder: "<database>,默认读取所有用户数据库", DefaultNoUse: false, Default: "", Description: "数据库名称(mysql_database)", ToolTip: `mysql数据库名称,不填默认读取所有用户数据库`, }, { KeyName: KeyMysqlSQL, ChooseOnly: false, Default: "", Required: false, Placeholder: "select * from <table>;", DefaultNoUse: true, Description: "数据查询语句(mysql_sql)", ToolTip: "填写要执行的sql语句", }, { KeyName: KeyMysqlEncoding, ChooseOnly: true, ChooseOptions: []interface{}{"big5", "dec8", "cp850", "hp8", "koi8r", "latin1", "latin2", "swe7", "ascii", "ujis", "sjis", "hebrew", "tis620", "euckr", "koi8u", "gb2312", "greek", "cp1250", "gbk", "latin5", "armscii8", "utf8", "ucs2", "cp866", "keybcs2", "macce", "macroman", "cp852", "latin7", "utf8mb4", "cp1251", "utf16", "utf16le", "cp1256", "cp1257", "utf32", "binary", "geostd8", "cp932", "eucjpms", "gb18030"}, Default: "utf8", DefaultNoUse: false, Description: "编码方式(encoding)", Advance: true, ToolTip: "读取数据库的编码方式,默认为utf8,即按照utf8的编码方式读取数据库", }, { KeyName: KeyMysqlOffsetKey, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "递增的列名称(mysql_offset_key)", Advance: true, ToolTip: "指定一个 mysql 的列名,作为 offset 的记录,类型必须是整型,建议使用插入(或修改)数据的时间戳(unixnano)作为该字段", }, { KeyName: KeyMysqlReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(mysql_limit_batch)", CheckRegex: "\\d+", Advance: true, ToolTip: "若数据量大,可以填写该字段,分批次查询", }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyMysqlCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务", Advance: true, ToolTip: `定时任务触发周期,直接写"loop"循环执行,crontab的写法, 类似于* * * * * *,对应的是秒(0~59),分(0~59),时(0~23),日(1~31),月(1-12),星期(0~6),填*号表示所有遍历都执行`, }, { KeyName: KeyMysqlExecOnStart, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(mysql_exec_onstart)", ToolTip: "启动时立即执行一次", }, { KeyName: KeyMysqlHistoryAll, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "false", DefaultNoUse: false, Description: "导入历史数据(history_all)", ToolTip: "帮助导入符合要求的所有历史数据", }, { KeyName: KyeMysqlTable, ChooseOnly: false, Placeholder: "<table>", DefaultNoUse: true, Default: "", Description: "数据库表名(mysql_table)", }, OptionSQLSchema, OptionMagicLagDuration, }, ModeMSSQL: { { KeyName: KeyMssqlDataSource, Element: Text, ChooseOnly: false, Placeholder: "server=<hostname or instance>;user id=<username>;password=<password>;port=<port>", DefaultNoUse: true, Default: "", Required: true, Description: "数据库地址(mssql_datasource)", ToolTip: `mssql数据源所需信息, username: 用户名, password: 用户密码, hostname: mssql地址,实例,port: mssql端口,默认1433, 示例:一个填写完整的mssql_datasource字段类似于:"server=localhost\SQLExpress;user id=sa;password=PassWord;port=1433"`, }, { KeyName: KeyMssqlDataBase, Default: "", Required: true, ChooseOnly: false, Placeholder: "<database>", DefaultNoUse: true, Description: "数据库名称(mssql_database)", ToolTip: "数据库名称", }, { KeyName: KeyMssqlSchema, Default: "", Required: false, ChooseOnly: false, Placeholder: "<schema>", DefaultNoUse: true, Description: "数据库模式名称(mssql_schema)", ToolTip: "数据库模式名称", }, { KeyName: KeyMssqlSQL, Default: "", Required: false, ChooseOnly: false, Placeholder: "select * from <table>;", DefaultNoUse: true, Description: "数据查询语句(mssql_sql)", ToolTip: "要执行的sql语句", }, { KeyName: KeyMssqlOffsetKey, ChooseOnly: false, Default: "", DefaultNoUse: true, Description: "递增的列名称(mssql_offset_key)", Advance: true, ToolTip: `指定一个mssql的列名,作为offset的记录,类型必须是整型,建议使用插入(或修改)数据的时间戳(unixnano)作为该字段`, }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyMssqlReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(mssql_limit_batch)", CheckRegex: "\\d+", Advance: true, ToolTip: "若数据量大,可以填写该字段,分批次查询", }, { KeyName: KeyMssqlCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务(mssql_cron)", Advance: true, ToolTip: `定时任务触发周期,直接写"loop"循环执行,crontab的写法,类似于* * * * * *,对应的是秒(0~59),分(0~59),时(0~23),日(1~31),月(1-12),星期(0~6),填*号表示所有遍历都执行`, }, { KeyName: KeyMssqlExecOnStart, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(mssql_exec_onstart)", ToolTip: "启动时立即执行一次", }, OptionSQLSchema, OptionMagicLagDuration, }, ModePostgreSQL: { { KeyName: KeyPGsqlDataSource, Element: Text, ChooseOnly: false, Default: "", Required: true, Placeholder: "host=localhost port=5432 connect_timeout=10 user=pqgotest password=123456 sslmode=disable", DefaultNoUse: true, Description: "数据库地址(postgres_datasource)", ToolTip: `PostgreSQL数据源所需信息,填写的形式如 host=localhost port=5432, 属性和实际的值用=(等于)符号连接,中间不要有空格,不同的属性用(空格)隔开,一个填写完整的 postgres_datasource 字段类似于:"host=localhost port=5432 connect_timeout=10 user=pqgotest password=123456 sslmode=disable"`, }, { KeyName: KeyPGsqlDataBase, ChooseOnly: false, Default: "", Required: true, Placeholder: "<database>", DefaultNoUse: true, Description: "数据库名称(postgres_database)", ToolTip: "数据库名称", }, { KeyName: KeyPGsqlSchema, ChooseOnly: false, Default: "", Required: false, Placeholder: "<schema>", DefaultNoUse: true, Description: "数据库模式名称(postgres_schema)", ToolTip: "数据库模式名称", }, { KeyName: KeyPGsqlSQL, ChooseOnly: false, Default: "", Required: false, Placeholder: "select * from <table>;", DefaultNoUse: true, Description: "数据查询语句(postgres_sql)", ToolTip: "要执行的查询语句", }, { KeyName: KeyPGsqlOffsetKey, ChooseOnly: false, Default: "", DefaultNoUse: true, Description: "递增的列名称(postgres_offset_key)", Advance: true, ToolTip: `指定一个 PostgreSQL 的列名,作为 offset 的记录,类型必须是整型,建议使用插入(或修改)数据的时间戳(unixnano)作为该字段`, }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyPGsqlReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(postgres_limit_batch)", CheckRegex: "\\d+", Advance: true, ToolTip: "若数据量大,可以填写该字段,分批次查询", }, { KeyName: KeyPGsqlCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务(postgres_cron)", Advance: true, ToolTip: `定时任务触发周期,直接写"loop"循环执行,crontab的写法,类似于* * * * * *,对应的是秒(0~59),分(0~59),时(0~23),日(1~31),月(1-12),星期(0~6),填*号表示所有遍历都执行`, }, { KeyName: KeyPGsqlExecOnStart, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(postgres_exec_onstart)", ToolTip: "启动时立即执行一次", }, OptionSQLSchema, OptionMagicLagDuration, }, ModeElastic: { { KeyName: KeyESHost, ChooseOnly: false, Default: "", DefaultNoUse: true, Required: true, Placeholder: "http://localhost:9200", Description: "数据库地址(es_host)", ToolTip: "es的host地址以及端口,常用端口是9200", }, { KeyName: KeyESVersion, ChooseOnly: true, ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6}, Description: "版本(es_version)", ToolTip: "版本,3.x包含了2.x", }, { KeyName: KeyESIndex, ChooseOnly: false, Placeholder: "app-repo-123", DefaultNoUse: true, Default: "", Required: true, Description: "索引名称(es_index)", }, { KeyName: KeyESType, ChooseOnly: false, Placeholder: "type_app", Default: "", Required: true, DefaultNoUse: true, Description: "app名称(es_type)", }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyESReadBatch, ChooseOnly: false, Default: "100", Required: true, DefaultNoUse: false, Description: "分批查询的单批次大小(es_limit_batch)", Advance: true, ToolTip: "单批次查询数据大小,默认100", }, { KeyName: KeyESKeepAlive, ChooseOnly: false, Default: "1d", Required: true, DefaultNoUse: false, Description: "offset保存时间(es_keepalive)", CheckRegex: "\\d+[dms]", Advance: true, ToolTip: "logkit重启后可以继续读取ES数据的Offset记录在es服务端保存的时长,默认1d", }, }, ModeMongo: { { KeyName: KeyMongoHost, ChooseOnly: false, Default: "", Required: true, Placeholder: "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]", DefaultNoUse: true, Description: "数据库地址(mongo_host)", ToolTip: `mongodb的url地址,基础的是mongo的host地址以及端口,默认是localhost:9200,扩展形式可以写为: mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`, }, { KeyName: KeyMongoDatabase, ChooseOnly: false, Default: "", Required: true, Placeholder: "app123", DefaultNoUse: true, Description: "数据库名称(mongo_database)", ToolTip: "", }, { KeyName: KeyMongoCollection, ChooseOnly: false, Default: "", Required: true, Placeholder: "collection1", DefaultNoUse: true, Description: "数据表名称(mongo_collection)", ToolTip: "", }, { KeyName: KeyMongoOffsetKey, ChooseOnly: false, Default: "_id", Required: true, DefaultNoUse: false, Description: "递增的主键(mongo_offset_key)", ToolTip: `指定一个mongo的列名,作为offset的记录,类型必须是整型(比如unixnano的时间,或者自增的primary key)`, }, OptionMetaPath, OptionDataSourceTag, { KeyName: KeyMongoReadBatch, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "分批查询的单批次大小(mongo_limit_batch)", CheckRegex: "\\d+", Advance: true, ToolTip: "单次请求获取的数据量", }, { KeyName: KeyMongoCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务(mongo_cron)", Advance: true, ToolTip: `定时任务触发周期,直接写"loop"循环执行,crontab的写法,类似于* * * * * *,对应的是秒(0~59),分(0~59),时(0~23),日(1~31),月(1-12),星期(0~6),填*号表示所有遍历都执行`, }, { KeyName: KeyMongoExecOnstart, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(mongo_exec_onstart)", ToolTip: "启动时立即执行一次", }, { KeyName: KeyMongoFilters, ChooseOnly: false, Default: "", DefaultNoUse: false, Placeholder: "{\"foo\": {\"i\": {\"$gt\": 10}}}", Description: "数据过滤方式(mongo_filters)", Advance: true, ToolTip: "表示collection的过滤规则,默认不过滤,全部获取", }, }, ModeKafka: { { KeyName: KeyKafkaGroupID, ChooseOnly: false, Default: "", Required: true, Placeholder: "logkit1", DefaultNoUse: true, Description: "consumer组名称(kafka_groupid)", ToolTip: "kafka组名,多个logkit同时消费时写同一个组名可以协同读取数据", }, { KeyName: KeyKafkaTopic, ChooseOnly: false, Default: "", Required: true, Placeholder: "test_topic1", DefaultNoUse: true, Description: "topic名称(kafka_topic)", }, { KeyName: KeyKafkaZookeeper, ChooseOnly: false, Default: "", Required: true, Placeholder: "localhost:2181", DefaultNoUse: true, Description: "zookeeper地址(kafka_zookeeper)", ToolTip: "zookeeper地址列表,多个用逗号分隔,常用端口是2181", }, { KeyName: KeyKafkaZookeeperChroot, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "zookeeper中kafka根路径(kafka_zookeeper_chroot)", Advance: true, ToolTip: "kafka在zookeeper根路径中的地址", }, OptionWhence, { KeyName: KeyKafkaZookeeperTimeout, ChooseOnly: false, Default: "1", Required: true, DefaultNoUse: false, Description: "zookeeper超时时间(kafka_zookeeper_timeout)", Advance: true, ToolTip: "zookeeper连接超时时间,单位为秒", }, OptionDataSourceTag, }, ModeRedis: { { KeyName: KeyRedisDataType, ChooseOnly: true, ChooseOptions: []interface{}{DataTypeList, DataTypeChannel, DataTypePatterChannel, DataTypeString, DataTypeSet, DataTypeSortedSet, DataTypeHash}, Description: "数据读取模式(redis_datatype)", ToolTip: "", }, { KeyName: KeyRedisDB, ChooseOnly: false, Default: "", Required: true, Placeholder: "db", DefaultNoUse: true, Description: "数据库名称(redis_db)", ToolTip: `Redis的数据库名,默认为"0"`, }, { KeyName: KeyRedisKey, ChooseOnly: false, Default: "", Required: true, Placeholder: "key1", DefaultNoUse: true, Description: "redis键(redis_key)", ToolTip: `Redis监听的键值,在不同模式(redis_datatype)下表示不同含义`, }, { KeyName: KeyRedisHashArea, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "hash模式对应的数据结构域(redisHash_area)", Advance: true, ToolTip: "", }, { KeyName: KeyRedisAddress, ChooseOnly: false, Default: "", Placeholder: "127.0.0.1:6379", Required: true, DefaultNoUse: false, Description: "数据库地址(redis_address)", ToolTip: `Redis的地址(IP+端口),默认为"127.0.0.1:6379"`, }, { KeyName: KeyRedisPassword, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "密码(redis_password)", Advance: true, }, { KeyName: KeyTimeoutDuration, ChooseOnly: false, Default: "5s", Required: true, DefaultNoUse: false, Description: "单次读取超时时间(redis_timeout)", CheckRegex: "\\d+[ms]", Advance: true, ToolTip: "每次等待键值数据的超时时间[m(分)、s(秒)]", }, OptionDataSourceTag, }, ModeSocket: { { KeyName: KeySocketServiceAddress, ChooseOnly: false, Default: "", Placeholder: "tcp://127.0.0.1:3110", Required: true, DefaultNoUse: true, Description: "socket监听的地址(socket_service_address)", ToolTip: "socket监听的地址[协议://端口],如udp://127.0.0.1:3110", }, { KeyName: KeySocketMaxConnections, ChooseOnly: false, Default: "0", DefaultNoUse: false, Description: "最大并发连接数(socket_max_connections)", Advance: true, ToolTip: "仅tcp协议下生效", }, { KeyName: KeySocketSplitByLine, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", Advance: true, Description: "是否按行分隔内容(socket_split_by_line)", ToolTip: "开启后,对socket内容按行进行分隔", }, { KeyName: KeySocketRule, ChooseOnly: true, ChooseOptions: []interface{}{SocketRulePacket, SocketRuleLine, SocketRuleJson}, Default: "false", Advance: true, Description: "获取方式(socket_rule)", ToolTip: "默认对socket内容按包获取, json仅对tcp有效", }, { KeyName: KeySocketReadTimeout, ChooseOnly: false, Default: "1m", DefaultNoUse: false, Description: "连接超时时间(socket_read_timeout)", Advance: true, ToolTip: "填0为不设置超时", }, { KeyName: KeySocketReadBufferSize, ChooseOnly: false, Default: "65535", DefaultNoUse: false, Description: "连接缓存大小(socket_read_buffer_size)", Advance: true, ToolTip: "仅udp协议下生效", }, { KeyName: KeySocketKeepAlivePeriod, ChooseOnly: false, Default: "5m", DefaultNoUse: false, Description: "连接保持时长[0为关闭](socket_keep_alive_period)", Advance: true, ToolTip: "填0为关闭keep_alive", }, OptionDataSourceTag, }, ModeHTTP: { { KeyName: KeyHTTPServiceAddress, ChooseOnly: false, Default: "", Placeholder: DefaultHTTPServiceAddress, Required: true, DefaultNoUse: true, Description: "监听的地址和端口(http_service_address)", ToolTip: "监听的地址和端口,格式为:[<ip/host/不填>:port],如 :3000 , 监听3000端口的http请求", }, { KeyName: KeyHTTPServicePath, ChooseOnly: false, Default: "", Placeholder: DefaultHTTPServicePath, Required: true, DefaultNoUse: true, Description: "监听地址前缀(http_service_path)", ToolTip: "监听的请求地址,如 /data ", }, OptionDataSourceTag, }, ModeScript: { { KeyName: KeyExecInterpreter, ChooseOnly: false, Default: "/bin/bash", Placeholder: "/bin/bash", Required: true, DefaultNoUse: false, Description: "脚本执行解释器(script_exec_interpreter)", ToolTip: "脚本的解释器", }, { KeyName: KeyLogPath, ChooseOnly: false, Default: "", Required: true, Placeholder: "/home/users/john/log/my.sh", DefaultNoUse: true, Description: "脚本路径(log_path)", ToolTip: "脚本的路径", }, { KeyName: KeyScriptCron, ChooseOnly: false, Default: "", Placeholder: "00 00 04 * * *", DefaultNoUse: false, Description: "定时任务(script_cron)", Advance: true, ToolTip: `定时任务触发周期,直接写"loop"循环执行,crontab的写法,类似于* * * * * *,对应的是秒(0~59),分(0~59),时(0~23),日(1~31),月(1-12),星期(0~6),填*号表示所有遍历都执行`, }, { KeyName: KeyScriptExecOnStart, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "启动时立即执行(script_exec_onstart)", ToolTip: "", }, }, ModeCloudWatch: { { KeyName: KeyRegion, ChooseOnly: false, Default: "", Placeholder: "us-east-1", DefaultNoUse: true, Required: true, Description: "区域(region)", ToolTip: "服务所在区域", }, { KeyName: KeyNamespace, ChooseOnly: false, Default: "", Placeholder: "AWS/ELB", DefaultNoUse: true, Required: true, Description: "命名空间(namespace)", ToolTip: "Cloudwatch数据的命名空间", }, { KeyName: KeyRoleArn, ChooseOnly: false, Default: "", Placeholder: "选填一种方式鉴权", DefaultNoUse: true, Description: "授权角色(role_arn)", Advance: true, ToolTip: "AWS的授权角色(鉴权第一优先)", }, { KeyName: KeyAWSAccessKey, ChooseOnly: false, Default: "", Placeholder: "选填一种方式鉴权", DefaultNoUse: true, Description: "AK(aws_access_key)", ToolTip: "AWS的access_key(鉴权第二优先)", }, { KeyName: KeyAWSSecretKey, ChooseOnly: false, Default: "", Placeholder: "选填一种方式鉴权", DefaultNoUse: true, Description: "SK(aws_secret_key)", ToolTip: "AWS的secret_key(鉴权第二优先)", }, { KeyName: KeyAWSToken, ChooseOnly: false, Default: "", Placeholder: "选填一种方式鉴权", DefaultNoUse: true, Description: "鉴权token(aws_token)", ToolTip: "AWS的鉴权token", }, { KeyName: KeyAWSProfile, ChooseOnly: false, Default: "", Placeholder: "选填一种方式鉴权", DefaultNoUse: true, Description: "共享profile(aws_profile)", ToolTip: "鉴权第三优先", }, { KeyName: KeySharedCredentialFile, ChooseOnly: false, Default: "", Placeholder: "选填一种方式鉴权", DefaultNoUse: true, Description: "鉴权文件(shared_credential_file)", ToolTip: "鉴权文件路径(鉴权第四优先)", }, { KeyName: KeyCollectInterval, ChooseOnly: false, Default: "5m", Placeholder: "", DefaultNoUse: false, Description: "收集间隔(interval)", ToolTip: "最小设置1分钟(1m)", }, { KeyName: KeyRateLimit, ChooseOnly: false, Default: "200", Placeholder: "", DefaultNoUse: false, Required: false, Description: "每秒最大请求数(ratelimit)", Advance: true, ToolTip: "请求限速", }, { KeyName: KeyMetrics, ChooseOnly: false, Default: "", Placeholder: "Latency, RequestCount", DefaultNoUse: false, Required: false, Description: "metric名称(metrics)", ToolTip: "可填写多个,逗号连接,为空全部收集", }, { KeyName: KeyDimension, ChooseOnly: false, Default: "", Placeholder: "LoadBalancerName p-example,LatencyName y-example", DefaultNoUse: false, Required: false, Description: "收集维度(dimensions)", ToolTip: "可填写多个,逗号连接,为空全部收集", }, { KeyName: KeyDelay, ChooseOnly: false, Default: "5m", Placeholder: "", DefaultNoUse: false, Required: false, Description: "收集延迟(delay)", Advance: true, ToolTip: "根据CloudWatch中数据产生的实际时间确定是否需要增加收集延迟", }, { KeyName: KeyCacheTTL, ChooseOnly: false, Default: "60m", Placeholder: "60m", DefaultNoUse: false, Required: false, Description: "刷新metrics时间(cache_ttl)", Advance: true, ToolTip: "从namespace中刷新metrics的周期", }, { KeyName: KeyPeriod, ChooseOnly: false, Default: "", Placeholder: "1m", DefaultNoUse: false, Required: false, Description: "聚合间隔(period)", Advance: true, ToolTip: "从cloudwatch收集数据聚合的间隔", }, OptionDataSourceTag, }, ModeSnmp: { { KeyName: KeySnmpReaderName, ChooseOnly: false, Default: "", Placeholder: "logkit_default_name", DefaultNoUse: true, Description: "名称(snmp_reader_name)", Advance: true, ToolTip: "reader的读取名称", }, { KeyName: KeySnmpReaderAgents, ChooseOnly: false, Default: "", Placeholder: "127.0.0.1:161,10.10.0.1:161", Required: true, DefaultNoUse: true, Description: "agents列表(snmp_agents)", ToolTip: "多个可用逗号','分隔", }, { KeyName: KeySnmpTableInitHost, ChooseOnly: false, Default: "127.0.0.1", Placeholder: "127.0.0.1", DefaultNoUse: true, Description: "tables初始化连的snmpd服务地址(snmp_table_init_host)", ToolTip: "tables初始化连的snmpd服务地址,默认127.0.0.1", }, { KeyName: KeySnmpReaderTables, ChooseOnly: false, Default: "", Placeholder: "[{\"table_name\":\"udpLocalAddress\",\"table_oid\":\"1.3.6.1.2.1.31.1.1\"}]", DefaultNoUse: true, Description: "tables配置(snmp_tables)", ToolTip: "请填入json数组字符串", }, { KeyName: KeySnmpReaderFields, ChooseOnly: false, Default: "", DefaultNoUse: true, Description: "fields配置(snmp_fields)", ToolTip: "请填入json数组字符串", }, { KeyName: KeySnmpReaderVersion, ChooseOnly: true, ChooseOptions: []interface{}{"1", "2", "3"}, Default: "2", DefaultNoUse: true, Description: "snmp协议版本(snmp_version)", }, { KeyName: KeySnmpReaderTimeOut, ChooseOnly: false, Default: "5s", Required: true, DefaultNoUse: false, Description: "连接超时时间(snmp_time_out)", Advance: true, ToolTip: "超时时间,单位支持m(分)、s(秒)", }, { KeyName: KeySnmpReaderInterval, ChooseOnly: false, Default: "30s", Required: true, DefaultNoUse: false, Description: "收集频率(snmp_interval)", Advance: true, ToolTip: "收集频率,单位支持m(分)、s(秒)", }, { KeyName: KeySnmpReaderRetries, ChooseOnly: false, Default: "3", Required: true, DefaultNoUse: false, Description: "重试次数(snmp_retries)", Advance: true, }, { KeyName: KeySnmpReaderCommunity, ChooseOnly: false, Default: "public", Placeholder: "public", DefaultNoUse: true, Description: "community/秘钥(snmp_version)", ToolTip: "community秘钥,没有特殊设置为public[版本1/2有效]", }, { KeyName: KeySnmpReaderMaxRepetitions, ChooseOnly: false, Default: "50", DefaultNoUse: false, Description: "最大迭代次数(snmp_max_repetitions)", Advance: true, ToolTip: "版本2/3有效", }, { KeyName: KeySnmpReaderContextName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "context名称(snmp_version)", Advance: true, ToolTip: "版本3有效", }, { KeyName: KeySnmpReaderSecLevel, ChooseOnly: true, ChooseOptions: []interface{}{"noAuthNoPriv", "authNoPriv", "authPriv"}, DefaultNoUse: true, Description: "安全等级(snmp_sec_level)", Advance: true, ToolTip: "版本3有效", }, { KeyName: KeySnmpReaderSecName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "认证名称(snmp_sec_name)", Advance: true, ToolTip: "版本3有效", }, { KeyName: KeySnmpReaderAuthProtocol, ChooseOnly: true, ChooseOptions: []interface{}{"md5", "sha", ""}, DefaultNoUse: false, Description: "认证协议(snmp_auth_protocol)", Advance: true, ToolTip: "版本3有效", }, { KeyName: KeySnmpReaderAuthPassword, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "认证密码(snmp_auth_password)", Advance: true, ToolTip: "版本3有效", }, { KeyName: KeySnmpReaderPrivProtocol, ChooseOnly: true, ChooseOptions: []interface{}{"des", "aes", ""}, DefaultNoUse: false, Description: "隐私协议(snmp_priv_protocol)", Advance: true, ToolTip: "版本3有效", }, { KeyName: KeySnmpReaderPrivPassword, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "隐私密码(snmp_priv_password)", Advance: true, ToolTip: "版本3有效", }, { KeyName: KeySnmpReaderEngineID, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "snmp_priv_engine_id", Advance: true, ToolTip: "版本3有效", }, { KeyName: KeySnmpReaderEngineBoots, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "snmp_engine_boots", Advance: true, ToolTip: "版本3有效", }, { KeyName: KeySnmpReaderEngineTime, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "snmp_engine_time", Advance: true, ToolTip: "版本3有效", }, }, ModeCloudTrail: { { KeyName: KeyS3Region, ChooseOnly: false, Default: "", Placeholder: "us-east-1", DefaultNoUse: false, Required: true, Description: "区域(s3_region)", ToolTip: "S3服务区域", }, { KeyName: KeyS3AccessKey, ChooseOnly: false, Default: "", Placeholder: "访问密钥", DefaultNoUse: false, Required: true, Description: "AK(s3_access_key)", ToolTip: "访问密钥ID(AK)", }, { KeyName: KeyS3SecretKey, ChooseOnly: false, Default: "", Placeholder: "访问密钥", DefaultNoUse: false, Required: true, Description: "SK(s3_secret_key)", ToolTip: "与访问密钥ID结合使用的密钥(SK)", }, { KeyName: KeyS3Bucket, ChooseOnly: false, Default: "", Placeholder: "", DefaultNoUse: false, Required: true, Description: "存储桶名称(s3_bucket)", ToolTip: "存储桶名称", }, { KeyName: KeyS3Prefix, ChooseOnly: false, Default: "", Placeholder: "", DefaultNoUse: false, Description: "文件前缀(s3_prefix)", Advance: true, ToolTip: "文件前缀", }, { KeyName: KeySyncDirectory, ChooseOnly: false, Default: "", Placeholder: "", DefaultNoUse: false, Advance: true, Description: "本地同步目录,不填自动生成(sync_directory)", ToolTip: "本地同步目录,不填自动生成", }, { KeyName: KeySyncInterval, ChooseOnly: false, Default: "5m", Placeholder: "", DefaultNoUse: false, Advance: true, Description: "文件同步间隔(sync_interval)", ToolTip: "文件同步的最小间隔(1m)", }, { KeyName: KeySyncConcurrent, ChooseOnly: false, Default: "5", Placeholder: "", DefaultNoUse: false, Advance: true, Description: "文件同步的并发个数(sync_concurrent)", ToolTip: "文件同步的最小并发个数(1)", }, OptionKeyValidFilePattern, OptionKeySkipFileFirstLine, OptionDataSourceTag, }, }
var WaitNoSuchFile = 100 * time.Millisecond
Functions ¶
func HeadPatternMode ¶
func RegisterConstructor ¶
func RegisterConstructor(typ string, c Constructor)
RegisterConstructor adds a new constructor for a given type of reader.
Types ¶
type BufReader ¶
type BufReader struct { Meta *Meta // 存放offset的元信息 // contains filtered or unexported fields }
BufReader implements buffering for an FileReader object.
func NewReaderSize ¶
func NewReaderSize(rd FileReader, meta *Meta, size int) (*BufReader, error)
NewReaderSize returns a new Reader whose buffer has at least the specified size. If the argument FileReader is already a Reader with large enough size, it returns the underlying Reader.
func (*BufReader) FormMutiLine ¶
func (*BufReader) ReadPattern ¶
ReadPattern读取日志直到匹配行首模式串
func (*BufReader) ReadString ¶
ReadString reads until the first occurrence of delim in the input, returning a string containing the data up to and including the delimiter. If ReadString encounters an error before finding a delimiter, it returns the data read before the error and the error itself (often io.EOF). ReadString returns err != nil if and only if the returned data does not end in delim. For simple uses, a Scanner may be more convenient.
type DaemonReader ¶
type DaemonReader interface { // Start 用于非阻塞的启动读取器对应的守护线程,需要读取器自行负责其生命周期 Start() error }
DaemonReader 代表了一个需要守护线程的读取器
type DataReader ¶
DataReader 代表了一个可直接读取内存数据结构的读取器
type FileReader ¶
type FileReader interface { Name() string Source() string Read(p []byte) (n int, err error) Close() error SyncMeta() error }
FileReader reader 接口方法
type LineSkipper ¶
type LineSkipper interface { IsNewOpen() bool SetSkipped() }
type Meta ¶
type Meta struct { Dir string // 记录文件处理进度的路径 DoneFilePath string // 记录扫描过文件记录的文件 TagFile string //记录tag文件路径的标签名称 Readlimit int //读取磁盘限速单位 MB/s RunnerName string // contains filtered or unexported fields }
func (*Meta) AppendDeleteFile ¶
func (*Meta) AppendDoneFile ¶
AppendDoneFile 将处理完的文件写入doneFile中
func (*Meta) AppendDoneFileInode ¶
AppendDoneFileInode 将处理完的文件路径、inode以及完成时间写入doneFile中
func (*Meta) CacheLineFile ¶
func (*Meta) CheckExpiredSubMetas ¶
CheckExpiredSubMetas 仅用于轮询收集所有过期的 submeta,清理操作应通过调用 CleanExpiredSubMetas 方法完成。 一般情况下,应由 reader 实现启动 goroutine 单独调用以避免 submeta 数量过多导致进程被长时间阻塞。 另外,如果 submeta 没有存放在该 meta 的子目录则调用此方法无效
func (*Meta) CleanExpiredSubMetas ¶
CleanExpiredSubMetas 清除超过指定过期时长的 submeta 目录,清理数目单次调用存在上限以减少阻塞时间
func (*Meta) DeleteDoneFile ¶
func (*Meta) FtSaveLogPath ¶
FtSaveLogPath 返回 ft_sender 日志信息记录文件夹路径
func (*Meta) GetDataSourceTag ¶
func (*Meta) GetDoneFileContent ¶
func (*Meta) GetDoneFileInode ¶
func (*Meta) GetDoneFiles ¶
func (*Meta) GetTagFile ¶
func (*Meta) IsFileMode ¶
func (*Meta) IsNotValid ¶
IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏
func (*Meta) ReadBufMeta ¶
func (*Meta) ReadCacheLine ¶
func (*Meta) ReadDBDoneFile ¶
ReadDBDoneFile 读取当前Database已经读取的表
func (*Meta) ReadOffset ¶
ReadOffset 读取当前读取的文件和offset
func (*Meta) ReadRecordsFile ¶
ReadRecordsFile 读取当前runner已经读取的表
func (*Meta) ReadStatistic ¶
func (*Meta) RemoveSubMeta ¶
func (*Meta) SetEncodingWay ¶
SetEncodingWay 设置文件编码方式,默认为 utf-8
func (*Meta) StatisticFile ¶
StatisticFile 返回 Runner 统计信息的文件路径
func (*Meta) WriteCacheLine ¶
func (*Meta) WriteOffset ¶
WriteOffset 将当前文件和offset写入meta中
func (*Meta) WriteStatistic ¶
type NewLineBytesRecorder ¶
type NewLineBytesRecorder interface {
NewLineBytesIndex() []SourceIndex
}
type Reader ¶
type Reader interface { // Name 用于返回读取器的具体名称 Name() string // SetMode 用于设置读取器的匹配模式 SetMode(mode string, v interface{}) error // Source 用于返回当前读取的数据源 Source() string // ReadLine 用于向读取器请求返回一行数据 ReadLine() (string, error) // SyncMeta 用于通知读取器保存同步相关元数据 SyncMeta() // Close 用于关闭读取器 Close() error }
Reader 代表了一个通用的行读取器
func NewFileDirReader ¶
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry reader 的工厂类。可以注册自定义reader
func NewRegistry ¶
func NewRegistry() *Registry
func (*Registry) NewReaderWithMeta ¶
func (*Registry) RegisterReader ¶
func (reg *Registry) RegisterReader(readerType string, constructor Constructor) error
type SeqFile ¶
type SeqFile struct { SkipFileFirstLine bool //跳过新文件的第一行,常用于带title的csv文件,title与实际格式不同 // contains filtered or unexported fields }
SeqFile 按最终修改时间依次读取文件的Reader类型
func NewSeqFile ¶
func (*SeqFile) NewLineBytesIndex ¶
func (sf *SeqFile) NewLineBytesIndex() []SourceIndex
func (*SeqFile) SetSkipped ¶
func (sf *SeqFile) SetSkipped()
type SingleFile ¶
type SingleFile struct {
// contains filtered or unexported fields
}
func NewSingleFile ¶
func NewSingleFile(meta *Meta, path, whence string, errDirectReturn bool) (sf *SingleFile, err error)
func (*SingleFile) Close ¶
func (sf *SingleFile) Close() (err error)
func (*SingleFile) Lag ¶
func (sf *SingleFile) Lag() (rl *LagInfo, err error)
func (*SingleFile) Name ¶
func (sf *SingleFile) Name() string
func (*SingleFile) Reopen ¶
func (sf *SingleFile) Reopen() (err error)
func (*SingleFile) Source ¶
func (sf *SingleFile) Source() string
func (*SingleFile) SyncMeta ¶
func (sf *SingleFile) SyncMeta() error
type SourceIndex ¶
type Statistic ¶
type Statistic struct { ReaderCnt int64 `json:"reader_count"` // 读取总条数 ParserCnt [2]int64 `json:"parser_connt"` // [解析成功, 解析失败] SenderCnt map[string][2]int64 `json:"sender_count"` // [发送成功, 发送失败] ReadErrors ErrorQueue `json:"read_errors"` ParseErrors ErrorQueue `json:"parse_errors"` TransformErrors map[string]ErrorQueue `json:"transform_errors"` SendErrors map[string]ErrorQueue `json:"send_errors"` }
type StatsReader ¶
type StatsReader interface { //Name reader名称 Name() string Status() StatsInfo }
StatsReader 是一个通用的带有统计接口的reader
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package builtin does nothing but import all builtin readers to execute their init functions.
|
Package builtin does nothing but import all builtin readers to execute their init functions. |