reader

package
v1.3.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2017 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Overview

Package bufio implements buffered I/O. It wraps a FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.

Index

Constants

View Source
// Configuration key names. Each constant's string value is the key as it is
// written in a reader's conf; ModeKeyOptions lists which keys each mode offers.
const (
	KeyLogPath       = "log_path"
	KeyMetaPath      = "meta_path"
	KeyFileDone      = "file_done"
	KeyMode          = "mode"
	KeyBufSize       = "reader_buf_size"
	KeyWhence        = "read_from"
	KeyEncoding      = "encoding"
	KeyReadIOLimit   = "readio_limit"
	KeyDataSourceTag = "datasource_tag"
	KeyHeadPattern   = "head_pattern"
	KeyRunnerName    = "runner_name"

	// Ignore hidden files
	KeyIgnoreHiddenFile = "ignore_hidden"
	KeyIgnoreFileSuffix = "ignore_file_suffix"
	KeyValidFilePattern = "valid_file_pattern"

	// Keys offered only by the tailx mode in ModeKeyOptions.
	KeyExpire       = "expire"
	KeyMaxOpenFiles = "max_open_files"
	KeyStatInterval = "stat_interval"

	// MySQL reader keys.
	KeyMysqlOffsetKey   = "mysql_offset_key"
	KeyMysqlReadBatch   = "mysql_limit_batch"
	KeyMysqlDataSource  = "mysql_datasource"
	KeyMysqlDataBase    = "mysql_database"
	KeyMysqlSQL         = "mysql_sql"
	KeyMysqlCron        = "mysql_cron"
	KeyMysqlExecOnStart = "mysql_exec_onstart"

	// SQL schema key, listed under both the mysql and mssql modes.
	KeySQLSchema = "sql_schema"

	// MSSQL reader keys.
	KeyMssqlOffsetKey   = "mssql_offset_key"
	KeyMssqlReadBatch   = "mssql_limit_batch"
	KeyMssqlDataSource  = "mssql_datasource"
	KeyMssqlDataBase    = "mssql_database"
	KeyMssqlSQL         = "mssql_sql"
	KeyMssqlCron        = "mssql_cron"
	KeyMssqlExecOnStart = "mssql_exec_onstart"

	// Elasticsearch reader keys.
	KeyESReadBatch = "es_limit_batch"
	KeyESIndex     = "es_index"
	KeyESType      = "es_type"
	KeyESHost      = "es_host"
	KeyESKeepAlive = "es_keepalive"
	KeyESVersion   = "es_version"

	// MongoDB reader keys.
	KeyMongoHost        = "mongo_host"
	KeyMongoDatabase    = "mongo_database"
	KeyMongoCollection  = "mongo_collection"
	KeyMongoOffsetKey   = "mongo_offset_key"
	KeyMongoReadBatch   = "mongo_limit_batch"
	KeyMongoCron        = "mongo_cron"
	KeyMongoExecOnstart = "mongo_exec_onstart"
	KeyMongoFilters     = "mongo_filters"
	KeyMongoCert        = "mongo_cacert"

	// Kafka reader keys.
	KeyKafkaGroupID          = "kafka_groupid"
	KeyKafkaTopic            = "kafka_topic"
	KeyKafkaZookeeper        = "kafka_zookeeper"
	KeyKafkaZookeeperTimeout = "kafka_zookeeper_timeout"
)

FileReader's conf keys

View Source
// Reader mode names. These are the keys of ModeKeyOptions and the first
// element of each ModeUsages entry.
const (
	ModeDir     = "dir"
	ModeFile    = "file"
	ModeTailx   = "tailx"
	ModeMysql   = "mysql"
	ModeMssql   = "mssql"
	ModeElastic = "elastic"
	ModeMongo   = "mongo"
	ModeKafka   = "kafka"
	ModeRedis   = "redis"
)

FileReader's modes

View Source
// Mode names for head-pattern reading: one for a pattern supplied as a string,
// one for a pattern supplied as a regexp.
// NOTE(review): presumably these are the `mode` values accepted by SetMode and
// HeadPatternMode — confirm against their implementations.
const (
	ReadModeHeadPatternString = "mode_head_pattern_string"
	ReadModeHeadPatternRegexp = "mode_head_pattern_regexp"
)
View Source
// Allowed values for the KeyWhence ("read_from") option, as offered in
// ModeKeyOptions.
const (
	WhenceOldest = "oldest"
	WhenceNewest = "newest"
)

KeyWhence 的可选项

View Source
// Allowed values for the KeyRedisDataType option, as offered under ModeRedis
// in ModeKeyOptions.
const (
	DataTypeList          = "list"
	DataTypeChannel       = "channel"
	DataTypePatterChannel = "pattern_channel"
)
View Source
// Configuration key names for the redis reader.
const (
	KeyRedisDataType   = "redis_datatype" // required
	KeyRedisDB         = "redis_db"       // defaults to 0
	KeyRedisKey        = "redis_key"      // required
	KeyRedisAddress    = "redis_address"  // defaults to 127.0.0.1:6379
	KeyRedisPassword   = "redis_password"
	KeyTimeoutDuration = "redis_timeout"
)
View Source
// Status values, typed int32, with their numeric values pinned explicitly:
// init = 0, stopped = 1, stopping = 2, running = 3.
// NOTE(review): which component stores these states is not visible here.
const (
	StatusInit    int32 = 0
	StatusStopped int32 = 1
	StatusStoping int32 = 2
	StatusRunning int32 = 3
)
View Source
// DirMode is the mode that reads every file under a directory one after
// another, in time order.
const DirMode = "dir"

DirMode 按时间顺序顺次读取文件夹下所有文件的模式

View Source
// FileMode is the mode that reads a single file.
const FileMode = "file"

FileMode 读取单个文件模式

View Source
// Loop is the literal string "loop".
// NOTE(review): presumably a special cron/schedule value that selects
// continuous execution (see the LoopRun methods) — confirm at the call sites.
const (
	Loop = "loop"
)
View Source
// MaxHeadPatternBufferSize equals 20 * 1024 * 1024 (20971520).
// NOTE(review): presumably a byte limit on the buffer accumulated while
// matching a head pattern — confirm where it is used.
const (
	MaxHeadPatternBufferSize = 20 << 20
)
View Source
// MongoDefaultOffsetKey is "_id", the same value ModeKeyOptions uses as the
// default for the mongo_offset_key option.
const (
	MongoDefaultOffsetKey = "_id"
)
View Source
// SQL_SPLITER is the ";" separator.
// NOTE(review): presumably used to split a configured SQL string into
// individual statements — confirm in the SQL reader. The name is exported, so
// its non-idiomatic spelling is kept for compatibility.
const (
	SQL_SPLITER = ";"
)

Variables

View Source
// Sentinel errors whose names and "bufio:"-prefixed messages mirror those of
// the standard library's bufio package.
var (
	ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte")
	ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune")
	ErrBufferFull        = errors.New("bufio: buffer full")
	ErrNegativeCount     = errors.New("bufio: negative count")
)
View Source
// Elasticsearch version labels; these are the choices offered for the
// es_version option in ModeKeyOptions.
var (
	ElasticVersion2 = "2.x"
	ElasticVersion5 = "5.x"
)
View Source
// ErrFileNotDir is the sentinel error for a path that is not a directory.
var ErrFileNotDir = errors.New("file is not directory")
View Source
// ErrFileNotRegular is the sentinel error for a path that is not a regular file.
var ErrFileNotRegular = errors.New("file is not regular")
View Source
// ErrMetaFileRead is the sentinel error for a meta file that cannot be read.
var ErrMetaFileRead = errors.New("cannot read meta file")
View Source
// ErrNoFileChosen is the sentinel error for the case where no files were found.
var ErrNoFileChosen = errors.New("no files found")
View Source
// ErrStopped is the sentinel error indicating that the runner has stopped.
var ErrStopped = errors.New("runner stopped")
View Source
// ModeKeyOptions maps each reader mode (ModeDir, ModeFile, ...) to the ordered
// list of configuration options that mode offers. Each utils.Option names the
// conf key (KeyName), a default value, a user-facing description and, for some
// keys, either a fixed list of choices (ChooseOnly/ChooseOptions) or a
// validation pattern (CheckRegex). The Description strings are runtime data.
var ModeKeyOptions = map[string][]utils.Option{
	// Options for reading all files in a directory.
	ModeDir: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "/home/users/john/log/",
			DefaultNoUse: true,
			Description:  "日志文件夹路径(log_path)",
		},
		{
			KeyName:      KeyMetaPath,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "断点续传元数据路径(meta_path)",
		},
		{
			KeyName:      KeyFileDone,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "读取过的文件信息保存路径(file_done)",
		},
		{
			KeyName:      KeyBufSize,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "文件缓存数据大小(reader_buf_size)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyWhence,
			ChooseOnly:    true,
			ChooseOptions: []string{WhenceOldest, WhenceNewest},
			Description:   "读取的起始位置(read_from)",
		},
		{
			KeyName:    KeyEncoding,
			ChooseOnly: true,
			ChooseOptions: []string{"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1",
				"GBK", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS",
				"ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7",
				"ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-12", "ISO-8859-13",
				"ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "macos-0_2-10.2", "macos-6_2-10.4",
				"macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2", "windows-1250", "windows-1251",
				"windows-1252", "windows-1253", "windows-1254", "windows-1255", "windows-1256",
				"windows-1257", "windows-1258", "windows-874", "IBM037", "ibm-273_P100-1995",
				"ibm-277_P100-1995", "ibm-278_P100-1995", "ibm-280_P100-1995", "ibm-284_P100-1995",
				"ibm-285_P100-1995", "ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999",

				"KOI8-R", "KOI8-U", "ebcdic-xml-us"},
			Default:      "UTF-8",
			DefaultNoUse: false,
			Description:  "编码方式(encoding)",
		},
		OptionDataSourceTag,
		{
			KeyName:      KeyReadIOLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "读取速度限制(MB/s)(readio_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyHeadPattern,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "多行读取的起始行正则表达式(head_pattern)",
		},
		{
			KeyName:       KeyIgnoreHiddenFile,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否忽略隐藏文件(ignore_hidden)",
		},
		{
			KeyName:      KeyIgnoreFileSuffix,
			ChooseOnly:   false,
			Default:      strings.Join(defaultIgnoreFileSuffix, ","),
			DefaultNoUse: false,
			Description:  "根据后缀忽略文件(ignore_file_suffix)",
		},
		{
			KeyName:      KeyValidFilePattern,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "根据正则表达式匹配文件(valid_file_pattern)",
		},
	},
	// Options for reading a single file.
	ModeFile: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "/home/users/john/log/my.log",
			DefaultNoUse: true,
			Description:  "日志文件路径(log_path)",
		},
		{
			KeyName:      KeyMetaPath,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "断点续传元数据路径(meta_path)",
		},
		{
			KeyName:      KeyBufSize,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "文件缓存数据大小(reader_buf_size)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyWhence,
			ChooseOnly:    true,
			ChooseOptions: []string{WhenceOldest, WhenceNewest},
			Description:   "读取的起始位置(read_from)",
		},
		OptionDataSourceTag,
		{
			KeyName:    KeyEncoding,
			ChooseOnly: true,
			ChooseOptions: []string{"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1",
				"GBK", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS",
				"ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7",
				"ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-12", "ISO-8859-13",
				"ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "macos-0_2-10.2", "macos-6_2-10.4",
				"macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2", "windows-1250", "windows-1251",
				"windows-1252", "windows-1253", "windows-1254", "windows-1255", "windows-1256",
				"windows-1257", "windows-1258", "windows-874", "IBM037", "ibm-273_P100-1995",
				"ibm-277_P100-1995", "ibm-278_P100-1995", "ibm-280_P100-1995", "ibm-284_P100-1995",
				"ibm-285_P100-1995", "ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999",

				"KOI8-R", "KOI8-U", "ebcdic-xml-us"},
			Default:      "UTF-8",
			DefaultNoUse: false,
			Description:  "编码方式(encoding)",
		},
		{
			KeyName:      KeyReadIOLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "读取速度限制(MB/s)(readio_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyHeadPattern,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "多行读取的起始行正则表达式(head_pattern)",
		},
	},
	// Options for reading files matched by a path pattern.
	ModeTailx: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "/home/users/*/mylog/*.log",
			DefaultNoUse: true,
			Description:  "日志文件路径模式串(log_path)",
		},
		{
			KeyName:      KeyMetaPath,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "断点续传元数据路径(meta_path)",
		},
		{
			KeyName:      KeyBufSize,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "文件缓存数据大小(reader_buf_size)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyWhence,
			ChooseOnly:    true,
			ChooseOptions: []string{WhenceOldest, WhenceNewest},
			Description:   "读取的起始位置(read_from)",
		},
		{
			KeyName:    KeyEncoding,
			ChooseOnly: true,
			ChooseOptions: []string{"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1",
				"GBK", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS",
				"ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7",
				"ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-12", "ISO-8859-13",
				"ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "macos-0_2-10.2", "macos-6_2-10.4",
				"macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2", "windows-1250", "windows-1251",
				"windows-1252", "windows-1253", "windows-1254", "windows-1255", "windows-1256",
				"windows-1257", "windows-1258", "windows-874", "IBM037", "ibm-273_P100-1995",
				"ibm-277_P100-1995", "ibm-278_P100-1995", "ibm-280_P100-1995", "ibm-284_P100-1995",
				"ibm-285_P100-1995", "ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999",

				"KOI8-R", "KOI8-U", "ebcdic-xml-us"},
			Default:      "UTF-8",
			DefaultNoUse: false,
			Description:  "编码方式(encoding)",
		},
		{
			KeyName:      KeyReadIOLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "读取速度限制(MB/s)(readio_limit)",
			CheckRegex:   "\\d+",
		},
		OptionDataSourceTag,
		{
			KeyName:      KeyHeadPattern,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "多行读取的起始行正则表达式(head_pattern)",
		},
		{
			KeyName:      KeyExpire,
			ChooseOnly:   false,
			Default:      "24h",
			DefaultNoUse: false,
			Description:  "文件过期时间(时h,分m,秒s)(expire)",
			CheckRegex:   "\\d+[hms]",
		},
		{
			KeyName:      KeyMaxOpenFiles,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "最大的打开文件数(max_open_files)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyStatInterval,
			ChooseOnly:   false,
			Default:      "3m",
			DefaultNoUse: false,
			Description:  "文件扫描间隔(stat_interval)",
			CheckRegex:   "\\d+[hms]",
		},
	},
	// Options for reading from MySQL.
	ModeMysql: {
		{
			KeyName:      KeyMysqlDataSource,
			ChooseOnly:   false,
			Default:      "<username>:<password>@tcp(<hostname>:<port>)",
			DefaultNoUse: true,
			Description:  "数据库地址(mysql_datasource)",
		},
		{
			KeyName:      KeyMysqlDataBase,
			ChooseOnly:   false,
			Default:      "<database>",
			DefaultNoUse: true,
			Description:  "数据库名称(mysql_database)",
		},
		{
			KeyName:      KeyMysqlSQL,
			ChooseOnly:   false,
			Default:      "select * from <table>;",
			DefaultNoUse: true,
			Description:  "数据查询语句(mysql_sql)",
		},
		{
			KeyName:      KeyMysqlOffsetKey,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "递增的列名称(mysql_offset_key)",
		},
		{
			KeyName:      KeyMysqlReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(mysql_limit_batch)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyMetaPath,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "断点续传元数据路径(meta_path)",
		},
		OptionDataSourceTag,
		{
			KeyName:      KeyMysqlCron,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "定时任务调度Cron(mysql_cron)",
		},
		{
			KeyName:       KeyMysqlExecOnStart,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(mysql_exec_onstart)",
		},
		{
			KeyName:      KeySQLSchema,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "SQL字段类型定义(sql_schema)",
		},
	},
	// Options for reading from MSSQL.
	ModeMssql: {
		{
			KeyName:      KeyMssqlDataSource,
			ChooseOnly:   false,
			Default:      "server=<hostname or instance>;user id=<username>;password=<password>;port=<port>",
			DefaultNoUse: true,
			Description:  "数据库地址(mssql_datasource)",
		},
		{
			KeyName:      KeyMssqlDataBase,
			ChooseOnly:   false,
			Default:      "<database>",
			DefaultNoUse: true,
			Description:  "数据库名称(mssql_database)",
		},
		{
			KeyName:      KeyMssqlSQL,
			ChooseOnly:   false,
			Default:      "select * from <table>;",
			DefaultNoUse: true,
			Description:  "数据查询语句(mssql_sql)",
		},
		{
			KeyName:      KeyMssqlOffsetKey,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: true,
			Description:  "递增的列名称(mssql_offset_key)",
		},
		{
			KeyName:      KeyMetaPath,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "断点续传元数据路径(meta_path)",
		},
		OptionDataSourceTag,
		{
			KeyName:      KeyMssqlReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(mssql_limit_batch)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyMssqlCron,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "定时任务调度Crontab(mssql_cron)",
		},
		{
			KeyName:       KeyMssqlExecOnStart,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(mssql_exec_onstart)",
		},
		{
			KeyName:      KeySQLSchema,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "SQL字段类型定义(sql_schema)",
		},
	},
	// Options for reading from Elasticsearch.
	ModeElastic: {
		{
			KeyName:      KeyESHost,
			ChooseOnly:   false,
			Default:      "http://localhost:9200",
			DefaultNoUse: true,
			Description:  "数据库地址(es_host)",
		},
		{
			KeyName:       KeyESVersion,
			ChooseOnly:    true,
			ChooseOptions: []string{ElasticVersion2, ElasticVersion5},
			Description:   "ES版本号(es_version)",
		},
		{
			KeyName:      KeyESIndex,
			ChooseOnly:   false,
			Default:      "app-repo-123",
			DefaultNoUse: true,
			Description:  "ES索引名称(es_index)",
		},
		{
			KeyName:      KeyESType,
			ChooseOnly:   false,
			Default:      "type_app",
			DefaultNoUse: true,
			Description:  "ES的app名称(es_type)",
		},
		{
			KeyName:      KeyMetaPath,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "断点续传元数据路径(meta_path)",
		},
		OptionDataSourceTag,
		// NOTE(review): unlike the mysql/mssql/mongo *_limit_batch options in
		// this map, this one sets no CheckRegex — confirm whether intentional.
		{
			KeyName:      KeyESReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(es_limit_batch)",
		},
		{
			KeyName:      KeyESKeepAlive,
			ChooseOnly:   false,
			Default:      "1d",
			DefaultNoUse: false,
			Description:  "ES的Offset保存时间(es_keepalive)",
			CheckRegex:   "\\d+[dms]",
		},
	},
	// Options for reading from MongoDB.
	ModeMongo: {
		{
			KeyName:      KeyMongoHost,
			ChooseOnly:   false,
			Default:      "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]",
			DefaultNoUse: true,
			Description:  "数据库地址(mongo_host)",
		},
		{
			KeyName:      KeyMongoDatabase,
			ChooseOnly:   false,
			Default:      "app123",
			DefaultNoUse: true,
			Description:  "数据库名称(mongo_database)",
		},
		{
			KeyName:      KeyMongoCollection,
			ChooseOnly:   false,
			Default:      "collection1",
			DefaultNoUse: true,
			Description:  "数据表名称(mongo_collection)",
		},
		{
			KeyName:      KeyMongoOffsetKey,
			ChooseOnly:   false,
			Default:      "_id",
			DefaultNoUse: true,
			Description:  "递增的主键(mongo_offset_key)",
		},
		{
			KeyName:      KeyMetaPath,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "断点续传元数据路径(meta_path)",
		},
		OptionDataSourceTag,
		{
			KeyName:      KeyMongoReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(mongo_limit_batch)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyMongoCron,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "定时任务调度Cron(mongo_cron)",
		},
		{
			KeyName:       KeyMongoExecOnstart,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(mongo_exec_onstart)",
		},
		{
			KeyName:      KeyMongoFilters,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "数据过滤方式(mongo_filters)",
		},
	},
	// Options for reading from Kafka.
	ModeKafka: {
		{
			KeyName:      KeyKafkaGroupID,
			ChooseOnly:   false,
			Default:      "logkit1",
			DefaultNoUse: true,
			Description:  "Kafka的consumer组名称(kafka_groupid)",
		},
		{
			KeyName:      KeyKafkaTopic,
			ChooseOnly:   false,
			Default:      "test_topic1",
			DefaultNoUse: true,
			Description:  "Kafka的topic名称(kafka_topic)",
		},
		{
			KeyName:      KeyKafkaZookeeper,
			ChooseOnly:   false,
			Default:      "localhost:2181",
			DefaultNoUse: true,
			Description:  "Zookeeper地址(kafka_zookeeper)",
		},
		{
			KeyName:       KeyWhence,
			ChooseOnly:    true,
			ChooseOptions: []string{WhenceOldest, WhenceNewest},
			Description:   "读取的起始位置(read_from)",
		},
		// NOTE(review): the timeout is described as a number of seconds but
		// has no CheckRegex, unlike the other numeric options in this map.
		{
			KeyName:      KeyKafkaZookeeperTimeout,
			ChooseOnly:   false,
			Default:      "1",
			DefaultNoUse: false,
			Description:  "zookeeper超时时间(秒)(kafka_zookeeper_timeout)",
		},
		OptionDataSourceTag,
	},
	// Options for reading from Redis.
	ModeRedis: {
		{
			KeyName:       KeyRedisDataType,
			ChooseOnly:    true,
			ChooseOptions: []string{DataTypeList, DataTypeChannel, DataTypePatterChannel},
			Description:   "Redis的数据读取模式(redis_datatype)",
		},
		{
			KeyName:      KeyRedisDB,
			ChooseOnly:   false,
			Default:      "0",
			DefaultNoUse: true,
			Description:  "数据库名称(redis_db)",
		},
		{
			KeyName:      KeyRedisKey,
			ChooseOnly:   false,
			Default:      "key1",
			DefaultNoUse: true,
			Description:  "redis键(redis_key)",
		},
		{
			KeyName:      KeyRedisAddress,
			ChooseOnly:   false,
			Default:      "127.0.0.1:6379",
			DefaultNoUse: false,
			Description:  "数据库地址(redis_address)",
		},
		{
			KeyName:      KeyRedisPassword,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "密码(redis_password)",
		},
		{
			KeyName:      KeyTimeoutDuration,
			ChooseOnly:   false,
			Default:      "5s",
			DefaultNoUse: false,
			Description:  "单次读取超时时间(m(分)、s(秒))(redis_timeout)",
			CheckRegex:   "\\d+[ms]",
		},
		OptionDataSourceTag,
	},
}
View Source
// ModeUsages pairs each reader mode with a short, user-facing usage
// description. Entries are listed in the same order as the Mode* constants.
var ModeUsages = []utils.KeyValue{
	{ModeDir, "从文件读取( dir 模式)"},
	{ModeFile, "从文件读取( file 模式)"},
	{ModeTailx, "从文件读取( tailx 模式)"},
	{ModeMysql, "从 MySQL 读取"},
	{ModeMssql, "从 MSSQL 读取"},
	{ModeElastic, "从 Elasticsearch 读取"},
	{ModeMongo, "从 MongoDB 读取"},
	{ModeKafka, "从 kafka 读取"},
	{ModeRedis, "从 redis 读取"},
}

ModeUsages 用途说明

View Source
var (
	// OptionDataSourceTag is the shared option for the KeyDataSourceTag
	// ("datasource_tag") key. Every mode in ModeKeyOptions includes it.
	OptionDataSourceTag = utils.Option{
		KeyName:      KeyDataSourceTag,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "具体的数据文件路径来源标签(datasource_tag)",
	}
)
View Source
// WaitNoSuchFile is a one-second duration.
// NOTE(review): presumably the wait applied when a file does not exist yet;
// it is a var, so callers may override it — confirm at the usage sites.
var WaitNoSuchFile = time.Second

Functions

func HeadPatternMode

func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)

Types

type ActiveReader

type ActiveReader struct {
	// contains filtered or unexported fields
}

func NewActiveReader

func NewActiveReader(logPath, whence string, meta *Meta, msgChan chan<- Result) (ar *ActiveReader, err error)

func (*ActiveReader) Close

func (ar *ActiveReader) Close() error

func (*ActiveReader) Run

func (ar *ActiveReader) Run()

func (*ActiveReader) Status added in v1.3.1

func (ar *ActiveReader) Status() utils.StatsInfo

func (*ActiveReader) SyncMeta

func (ar *ActiveReader) SyncMeta() string

除了sync自己的bufreader,还要sync一行linecache

type BufReader

type BufReader struct {
	// contains filtered or unexported fields
}

BufReader implements buffering for a FileReader object.

func NewReaderSize

func NewReaderSize(rd FileReader, meta *Meta, size int) (*BufReader, error)

NewReaderSize returns a new Reader whose buffer has at least the specified size. If the argument FileReader is already a Reader with large enough size, it returns the underlying Reader.

func (*BufReader) Close

func (b *BufReader) Close() error

func (*BufReader) Name

func (b *BufReader) Name() string

func (*BufReader) ReadLine

func (b *BufReader) ReadLine() (ret string, err error)

ReadLine returns a string line as a normal Reader

func (*BufReader) ReadPattern

func (b *BufReader) ReadPattern() (string, error)

ReadPattern读取日志直到匹配行首模式串

func (*BufReader) ReadString

func (b *BufReader) ReadString(delim byte) (ret string, err error)

ReadString reads until the first occurrence of delim in the input, returning a string containing the data up to and including the delimiter. If ReadString encounters an error before finding a delimiter, it returns the data read before the error and the error itself (often io.EOF). ReadString returns err != nil if and only if the returned data does not end in delim. For simple uses, a Scanner may be more convenient.

func (*BufReader) SetMode

func (b *BufReader) SetMode(mode string, v interface{}) (err error)

func (*BufReader) Source

func (b *BufReader) Source() string

func (*BufReader) Status added in v1.3.1

func (b *BufReader) Status() utils.StatsInfo

func (*BufReader) SyncMeta

func (b *BufReader) SyncMeta()

type CollectionFilter

// CollectionFilter is a named type for map[string]interface{}.
// NOTE(review): presumably holds the parsed per-collection mongo filter
// (see KeyMongoFilters) — confirm in the mongo reader.
type CollectionFilter map[string]interface{}

CollectionFilter is just a named type for map[string]interface{}.

type ElasticReader

type ElasticReader struct {
	// contains filtered or unexported fields
}

func NewESReader

func NewESReader(meta *Meta, readBatch int, estype, esindex, eshost, esVersion, keepAlive string) (er *ElasticReader, err error)

func (*ElasticReader) Close

func (er *ElasticReader) Close() (err error)

func (*ElasticReader) Name

func (er *ElasticReader) Name() string

func (*ElasticReader) ReadLine

func (er *ElasticReader) ReadLine() (data string, err error)

func (*ElasticReader) SetMode

func (er *ElasticReader) SetMode(mode string, v interface{}) error

func (*ElasticReader) Source

func (er *ElasticReader) Source() string

func (*ElasticReader) Start

func (er *ElasticReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*ElasticReader) Status added in v1.3.1

func (er *ElasticReader) Status() utils.StatsInfo

func (*ElasticReader) SyncMeta

func (er *ElasticReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type FileReader

// FileReader is the byte-level source interface that NewReaderSize wraps with
// buffering. SeqFile and SingleFile both expose this method set.
type FileReader interface {
	// Name returns the reader's name.
	Name() string
	// Source returns the data source being read.
	Source() string
	// Read has the same signature as io.Reader's Read.
	Read(p []byte) (n int, err error)
	// Close releases the reader.
	Close() error
	// SyncMeta reports an error if the sync fails.
	// NOTE(review): presumably persists the read position to the meta store.
	SyncMeta() error
}

FileReader reader 接口方法

type KafkaReader

// KafkaReader is the reader type returned by NewKafkaReader.
// NOTE(review): the exported fields presumably mirror NewKafkaReader's
// arguments (consumer group, topics, zookeeper peers/timeout, whence) —
// confirm in the constructor.
type KafkaReader struct {
	ConsumerGroup    string
	Topics           []string
	ZookeeperPeers   []string
	ZookeeperChroot  string
	ZookeeperTimeout time.Duration
	Whence           string

	Consumer *consumergroup.ConsumerGroup
	// contains filtered or unexported fields
}

func NewKafkaReader

func NewKafkaReader(meta *Meta, consumerGroup string,
	topics []string, zookeeper []string, zookeeperTimeout time.Duration, whence string) (kr *KafkaReader, err error)

func (*KafkaReader) Close

func (kr *KafkaReader) Close() (err error)

func (*KafkaReader) Name

func (kr *KafkaReader) Name() string

func (*KafkaReader) ReadLine

func (kr *KafkaReader) ReadLine() (data string, err error)

func (*KafkaReader) SetMode

func (kr *KafkaReader) SetMode(mode string, v interface{}) error

func (*KafkaReader) Source

func (kr *KafkaReader) Source() string

func (*KafkaReader) Start

func (kr *KafkaReader) Start()

func (*KafkaReader) Status added in v1.3.1

func (kr *KafkaReader) Status() utils.StatsInfo

func (*KafkaReader) SyncMeta

func (kr *KafkaReader) SyncMeta()

type LastSync

type LastSync struct {
	// contains filtered or unexported fields
}

type Meta

// Meta is the metadata handle created by NewMeta / NewMetaWithConf and passed
// to the reader constructors. Its methods read and write offsets, buffers,
// done-file records and statistics.
type Meta struct {
	RunnerName string
	// contains filtered or unexported fields
}

func NewMeta

func NewMeta(metadir, filedonedir, logpath, mode string, donefileRetention int) (m *Meta, err error)

func NewMetaWithConf

func NewMetaWithConf(conf conf.MapConf) (meta *Meta, err error)

func (*Meta) AppendDeleteFile

func (m *Meta) AppendDeleteFile(path string) (err error)

func (*Meta) AppendDoneFile

func (m *Meta) AppendDoneFile(path string) (err error)

AppendDoneFile 将处理完的文件写入doneFile中

func (*Meta) BufFile

func (m *Meta) BufFile() string

BufFile 返回buf的文件路径

func (*Meta) BufMetaFile

func (m *Meta) BufMetaFile() string

BufMetaFile 返回buf的meta文件路径

func (*Meta) CacheLineFile

func (m *Meta) CacheLineFile() string

func (*Meta) Clear

func (m *Meta) Clear() error

Clear 删除所有meta信息

func (*Meta) DeleteDoneFile

func (m *Meta) DeleteDoneFile(path string) error

func (*Meta) DeleteFile

func (m *Meta) DeleteFile() string

DeleteFile 处理完成文件地址,按日进行rotate

func (*Meta) DoneFile

func (m *Meta) DoneFile() string

DoneFile 处理完成文件地址,按日进行rotate

func (*Meta) DoneFilePath

func (m *Meta) DoneFilePath() string

DoneFilePath 返回meta的filedone文件的存放目录

func (*Meta) FtSaveLogPath added in v1.3.5

func (m *Meta) FtSaveLogPath() string

FtSaveLogPath 返回 ft_sender 日志信息记录文件夹路径

func (*Meta) GetDataSourceTag

func (m *Meta) GetDataSourceTag() string

func (*Meta) GetDoneFiles

func (m *Meta) GetDoneFiles() (doneFiles []utils.File, err error)

func (*Meta) GetEncodingWay

func (m *Meta) GetEncodingWay() (e string)

GetEncodingWay 获取文件编码方式

func (*Meta) GetMode

func (m *Meta) GetMode() string

func (*Meta) IsDoneFile

func (m *Meta) IsDoneFile(file string) bool

IsDoneFile 返回是否是Donefile格式的文件

func (*Meta) IsExist

func (m *Meta) IsExist() bool

func (*Meta) IsFileMode added in v1.3.1

func (m *Meta) IsFileMode() bool

func (*Meta) IsNotExist

func (m *Meta) IsNotExist() bool

IsNotExist meta 不存在,用来判断是第一次创建

func (*Meta) IsNotValid

func (m *Meta) IsNotValid() bool

IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏

func (*Meta) IsValid

func (m *Meta) IsValid() bool

func (*Meta) LogPath

func (m *Meta) LogPath() string

func (*Meta) MetaFile

func (m *Meta) MetaFile() string

MetaFile 返回metaFileoffset 的meta文件地址

func (*Meta) ReadBuf

func (m *Meta) ReadBuf(buf []byte) (n int, err error)

func (*Meta) ReadBufMeta

func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)

func (*Meta) ReadCacheLine

func (m *Meta) ReadCacheLine() ([]byte, error)

func (*Meta) ReadOffset

func (m *Meta) ReadOffset() (currFile string, offset int64, err error)

ReadOffset 读取当前读取的文件和offset

func (*Meta) ReadStatistic added in v1.3.5

func (m *Meta) ReadStatistic() (stat Statistic, err error)

func (*Meta) Reset added in v1.2.4

func (b *Meta) Reset() error

func (*Meta) SetEncodingWay

func (m *Meta) SetEncodingWay(e string)

SetEncodingWay 设置文件编码方式,默认为 utf-8

func (*Meta) StatisticFile added in v1.3.5

func (m *Meta) StatisticFile() string

StatisticFile 返回 Runner 统计信息的文件路径

func (*Meta) WriteBuf

func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)

func (*Meta) WriteCacheLine

func (m *Meta) WriteCacheLine(lines string) error

func (*Meta) WriteOffset

func (m *Meta) WriteOffset(currFile string, offset int64) (err error)

WriteOffset 将当前文件和offset写入meta中

func (*Meta) WriteStatistic added in v1.3.5

func (m *Meta) WriteStatistic(stat *Statistic) error

type MongoReader

// MongoReader is the reader type returned by NewMongoReader.
type MongoReader struct {
	Cron *cron.Cron // scheduled task
	// contains filtered or unexported fields
}

func NewMongoReader

func NewMongoReader(meta *Meta, readBatch int, host, database, collection, offsetkey, cronSched, filters, certfile string, execOnStart bool) (mr *MongoReader, err error)

func (*MongoReader) Close

func (mr *MongoReader) Close() (err error)

func (*MongoReader) LoopRun added in v1.3.3

func (mr *MongoReader) LoopRun()

func (*MongoReader) Name

func (mr *MongoReader) Name() string

func (*MongoReader) ReadLine

func (mr *MongoReader) ReadLine() (data string, err error)

func (*MongoReader) SetMode

func (mr *MongoReader) SetMode(mode string, v interface{}) error

func (*MongoReader) Source

func (mr *MongoReader) Source() string

func (*MongoReader) Start

func (mr *MongoReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*MongoReader) Status added in v1.3.1

func (mr *MongoReader) Status() utils.StatsInfo

func (*MongoReader) SyncMeta

func (mr *MongoReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type MultiReader

type MultiReader struct {
	// contains filtered or unexported fields
}

func NewMultiReader

func NewMultiReader(meta *Meta, logPathPattern, whence, expireDur, statIntervalDur string, maxOpenFiles int) (mr *MultiReader, err error)

func (*MultiReader) Close

func (mr *MultiReader) Close() (err error)

func (*MultiReader) Expire

func (mr *MultiReader) Expire()

Expire 函数关闭过期的文件,再更新

func (*MultiReader) Name

func (mr *MultiReader) Name() string

func (*MultiReader) ReadLine

func (mr *MultiReader) ReadLine() (data string, err error)

func (*MultiReader) SetMode

func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)

func (*MultiReader) Source

func (mr *MultiReader) Source() string

func (*MultiReader) Start

func (mr *MultiReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题。处理StatInterval以及Expire两大循环任务

func (*MultiReader) StatLogPath

func (mr *MultiReader) StatLogPath()

func (*MultiReader) Status added in v1.3.1

func (mr *MultiReader) Status() utils.StatsInfo

func (*MultiReader) SyncMeta

func (mr *MultiReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type Reader

// Reader is a general-purpose, line-oriented reader interface.
type Reader interface {
	// Name returns the reader's name.
	Name() string
	// Source returns the data source being read.
	Source() string
	// ReadLine returns the next line, or an error.
	ReadLine() (string, error)
	// SetMode configures a read mode; v is the mode-specific value.
	SetMode(mode string, v interface{}) error
	// Close releases the reader.
	Close() error
	// SyncMeta takes no arguments and returns nothing.
	// NOTE(review): presumably persists read progress to the meta store.
	SyncMeta()
}

Reader 是一个通用的行读取reader接口

func NewFileBufReader

func NewFileBufReader(conf conf.MapConf, isFromWeb bool) (reader Reader, err error)

NewFileBufReader 创建FileReader

func NewFileBufReaderWithMeta

func NewFileBufReaderWithMeta(conf conf.MapConf, meta *Meta, isFromWeb bool) (reader Reader, err error)

type RedisOptionn added in v1.2.1

type RedisOptionn struct {
	// contains filtered or unexported fields
}

type RedisReader added in v1.2.1

type RedisReader struct {
	// contains filtered or unexported fields
}

func NewRedisReader added in v1.2.1

func NewRedisReader(meta *Meta, conf conf.MapConf) (rr *RedisReader, err error)

func (*RedisReader) Close added in v1.2.1

func (rr *RedisReader) Close() (err error)

func (*RedisReader) Name added in v1.2.1

func (rr *RedisReader) Name() string

func (*RedisReader) ReadLine added in v1.2.1

func (rr *RedisReader) ReadLine() (data string, err error)

func (*RedisReader) SetMode added in v1.2.1

func (rr *RedisReader) SetMode(mode string, v interface{}) error

func (*RedisReader) Source added in v1.2.1

func (rr *RedisReader) Source() string

func (*RedisReader) Start added in v1.2.1

func (rr *RedisReader) Start()

func (*RedisReader) Status added in v1.3.1

func (rr *RedisReader) Status() utils.StatsInfo

func (*RedisReader) SyncMeta added in v1.2.1

func (rr *RedisReader) SyncMeta()

type Result added in v1.2.2

type Result struct {
	// contains filtered or unexported fields
}

type SeqFile

type SeqFile struct {
	// contains filtered or unexported fields
}

SeqFile 按最终修改时间依次读取文件的Reader类型

func NewSeqFile

func NewSeqFile(meta *Meta, path string, ignoreHidden bool, suffixes []string, validFileRegex, whence string) (sf *SeqFile, err error)

func (*SeqFile) Close

func (sf *SeqFile) Close() (err error)

func (*SeqFile) Name

func (sf *SeqFile) Name() string

func (*SeqFile) Read

func (sf *SeqFile) Read(p []byte) (n int, err error)

func (*SeqFile) Source

func (sf *SeqFile) Source() string

func (*SeqFile) SyncMeta

func (sf *SeqFile) SyncMeta() (err error)

type ServerReader added in v1.2.1

// ServerReader has Reader's method set with Start in place of SetMode.
// TODO: build a unified server-reader framework to reduce duplicated code.
type ServerReader interface {
	// Name returns the reader's name.
	Name() string
	// Source returns the data source being read.
	Source() string
	// Start starts the reader.
	Start()
	// ReadLine returns the next line, or an error.
	ReadLine() (string, error)
	// Close releases the reader.
	Close() error
	// SyncMeta takes no arguments and returns nothing.
	// NOTE(review): presumably persists read progress to the meta store.
	SyncMeta()
}

TODO 构建统一的 Server reader框架, 减少重复的编码

type SingleFile

type SingleFile struct {
	// contains filtered or unexported fields
}

func NewSingleFile

func NewSingleFile(meta *Meta, path, whence string, isFromWeb bool) (sf *SingleFile, err error)

func (*SingleFile) Close

func (sf *SingleFile) Close() (err error)

func (*SingleFile) Name

func (sf *SingleFile) Name() string

func (*SingleFile) Read

func (sf *SingleFile) Read(p []byte) (n int, err error)

func (*SingleFile) Reopen

func (sf *SingleFile) Reopen() (err error)

func (*SingleFile) Source

func (sf *SingleFile) Source() string

func (*SingleFile) SyncMeta

func (sf *SingleFile) SyncMeta() error

type SqlReader

// SqlReader is the reader type returned by NewSQLReader.
type SqlReader struct {
	Cron *cron.Cron // scheduled task
	// contains filtered or unexported fields
}

func NewSQLReader

func NewSQLReader(meta *Meta, conf conf.MapConf) (mr *SqlReader, err error)

func (*SqlReader) Close

func (mr *SqlReader) Close() (err error)

func (*SqlReader) LoopRun added in v1.3.3

func (mr *SqlReader) LoopRun()

func (*SqlReader) Name

func (mr *SqlReader) Name() string

func (*SqlReader) ReadLine

func (mr *SqlReader) ReadLine() (data string, err error)

func (*SqlReader) SetMode

func (mr *SqlReader) SetMode(mode string, v interface{}) error

func (*SqlReader) Source

func (mr *SqlReader) Source() string

func (*SqlReader) Start

func (mr *SqlReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*SqlReader) Status added in v1.3.1

func (mr *SqlReader) Status() utils.StatsInfo

func (*SqlReader) SyncMeta

func (mr *SqlReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type Statistic added in v1.3.5

// Statistic holds the counters that Meta.ReadStatistic and Meta.WriteStatistic
// load and store.
// NOTE(review): the JSON tag "parser_connt" looks like a misspelling of
// "parser_count". It is part of the serialized format, so changing it would
// break previously written statistic files; left as-is.
type Statistic struct {
	ReaderCnt int64               `json:"reader_count"` // total number of entries read
	ParserCnt [2]int64            `json:"parser_connt"` // [parse successes, parse failures]
	SenderCnt map[string][2]int64 `json:"sender_count"` // [send successes, send failures]
}

type StatsReader added in v1.3.1

// StatsReader is a general-purpose reader interface that exposes statistics.
type StatsReader interface {
	// Name returns the reader's name.
	Name() string
	// Status returns the reader's utils.StatsInfo.
	Status() utils.StatsInfo
}

StatsReader 是一个通用的带有统计接口的reader

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL