reader

package
v1.4.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2018 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Overview

Package bufio implements buffered I/O. It wraps a FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.

Index

Constants

View Source
// Configuration keys used by the http mode options.
const (
	KeyHttpServiceAddress = "http_service_address"
	KeyHttpServicePath    = "http_service_path"
)

// Default values matching the http service keys.
const (
	DefaultHttpServiceAddress = ":4000"
	DefaultHttpServicePath    = "/logkit/data"
)

// Default tuning values. Byte sizes are written as shifts: 1<<20 is 1024*1024.
const (
	DefaultSyncEvery       = 10
	DefaultMaxBodySize     = 100 << 20
	DefaultMaxBytesPerFile = 500 << 20
	DefaultWriteSpeedLimit = 10 << 20 // default write speed limit is 10MB
)
View Source
// General reader configuration keys.
const (
	KeyLogPath        = "log_path"
	KeyMetaPath       = "meta_path"
	KeyFileDone       = "file_done"
	KeyMode           = "mode"
	KeyBufSize        = "reader_buf_size"
	KeyWhence         = "read_from"
	KeyEncoding       = "encoding"
	KeyReadIOLimit    = "readio_limit"
	KeyDataSourceTag  = "datasource_tag"
	KeyTagFile        = "tag_file"
	KeyHeadPattern    = "head_pattern"
	KeyRunnerName     = "runner_name"
	KeyNewFileNewLine = "newfile_newline"
)

// Keys that decide which files are read.
const (
	// KeyIgnoreHiddenFile is the key for ignoring hidden files.
	KeyIgnoreHiddenFile = "ignore_hidden"
	KeyIgnoreFileSuffix = "ignore_file_suffix"
	KeyValidFilePattern = "valid_file_pattern"
)

// Keys listed among the tailx mode options.
const (
	KeyExpire       = "expire"
	KeyMaxOpenFiles = "max_open_files"
	KeyStatInterval = "stat_interval"
)

// MySQL reader keys.
const (
	KeyMysqlOffsetKey   = "mysql_offset_key"
	KeyMysqlReadBatch   = "mysql_limit_batch"
	KeyMysqlDataSource  = "mysql_datasource"
	KeyMysqlDataBase    = "mysql_database"
	KeyMysqlSQL         = "mysql_sql"
	KeyMysqlCron        = "mysql_cron"
	KeyMysqlExecOnStart = "mysql_exec_onstart"
)

// KeySQLSchema is the key for manually declared SQL field types.
const KeySQLSchema = "sql_schema"

// MSSQL reader keys.
const (
	KeyMssqlOffsetKey   = "mssql_offset_key"
	KeyMssqlReadBatch   = "mssql_limit_batch"
	KeyMssqlDataSource  = "mssql_datasource"
	KeyMssqlDataBase    = "mssql_database"
	KeyMssqlSQL         = "mssql_sql"
	KeyMssqlCron        = "mssql_cron"
	KeyMssqlExecOnStart = "mssql_exec_onstart"
)

// PostgreSQL reader keys.
const (
	KeyPGsqlOffsetKey   = "postgres_offset_key"
	KeyPGsqlReadBatch   = "postgres_limit_batch"
	KeyPGsqlDataSource  = "postgres_datasource"
	KeyPGsqlDataBase    = "postgres_database"
	KeyPGsqlSQL         = "postgres_sql"
	KeyPGsqlCron        = "postgres_cron"
	KeyPGsqlExecOnStart = "postgres_exec_onstart"
)

// Elasticsearch reader keys.
const (
	KeyESReadBatch = "es_limit_batch"
	KeyESIndex     = "es_index"
	KeyESType      = "es_type"
	KeyESHost      = "es_host"
	KeyESKeepAlive = "es_keepalive"
	KeyESVersion   = "es_version"
)

// MongoDB reader keys.
const (
	KeyMongoHost        = "mongo_host"
	KeyMongoDatabase    = "mongo_database"
	KeyMongoCollection  = "mongo_collection"
	KeyMongoOffsetKey   = "mongo_offset_key"
	KeyMongoReadBatch   = "mongo_limit_batch"
	KeyMongoCron        = "mongo_cron"
	KeyMongoExecOnstart = "mongo_exec_onstart"
	KeyMongoFilters     = "mongo_filters"
	KeyMongoCert        = "mongo_cacert"
)

// Kafka reader keys.
const (
	KeyKafkaGroupID          = "kafka_groupid"
	KeyKafkaTopic            = "kafka_topic"
	KeyKafkaZookeeper        = "kafka_zookeeper"
	KeyKafkaZookeeperChroot  = "kafka_zookeeper_chroot"
	KeyKafkaZookeeperTimeout = "kafka_zookeeper_timeout"
)

// Script reader keys.
const (
	// NOTE(review): the value below is spelled "interprepter"; it is reproduced
	// unchanged here. Confirm whether existing configurations depend on it
	// before correcting the spelling.
	KeyExecInterpreter   = "script_exec_interprepter"
	KeyScriptCron        = "script_cron"
	KeyScriptExecOnStart = "script_exec_onstart"
)

FileReader's conf keys

View Source
// Reader mode names. ModeKeyOptions and ModeUsages are keyed by these values.
const (
	// Modes that read from files.
	ModeDir      = "dir"
	ModeFile     = "file"
	ModeTailx    = "tailx"
	ModeFileAuto = "fileauto"

	// Modes that read from databases and search engines.
	ModeMysql   = "mysql"
	ModeMssql   = "mssql"
	ModePG      = "postgres"
	ModeElastic = "elastic"
	ModeMongo   = "mongo"

	// Modes that read from queues and caches.
	ModeKafka = "kafka"
	ModeRedis = "redis"

	// Modes that read from network services and scripts.
	ModeSocket = "socket"
	ModeHttp   = "http"
	ModeScript = "script"
	ModeSnmp   = "snmp"
)

FileReader's modes

View Source
// ReadModeHeadPatternString names the head-pattern read mode whose pattern is
// presumably supplied as a string (inferred from the name; confirm at use sites).
const ReadModeHeadPatternString = "mode_head_pattern_string"

// ReadModeHeadPatternRegexp names the head-pattern read mode whose pattern is
// presumably supplied as a compiled regexp (inferred from the name; confirm).
const ReadModeHeadPatternRegexp = "mode_head_pattern_regexp"
View Source
// WhenceOldest is the "oldest" choice for the read_from (KeyWhence) setting.
const WhenceOldest = "oldest"

// WhenceNewest is the "newest" choice for the read_from (KeyWhence) setting.
const WhenceNewest = "newest"

KeyWhence 的可选项

View Source
// Redis data type names, offered as the choices for the redis_datatype option.
const (
	DataTypeSet           = "set"
	DataTypeString        = "string"
	DataTypeList          = "list"
	DataTypeChannel       = "channel"
	DataTypePatterChannel = "pattern_channel"

	// The two identifiers below are spelled "DateType" rather than "DataType".
	// They are exported, so the spelling is kept as declared.
	DateTypeHash      = "hash"
	DateTypeSortedSet = "sortedSet"
)
View Source
// Configuration keys of the redis reader.
const (
	// KeyRedisDataType is a required setting.
	KeyRedisDataType = "redis_datatype"
	// KeyRedisDB defaults to 0.
	KeyRedisDB = "redis_db"
	// KeyRedisKey is a required setting.
	KeyRedisKey      = "redis_key"
	KeyRedisHashArea = "redisHash_area"
	// KeyRedisAddress defaults to 127.0.0.1:6379.
	KeyRedisAddress    = "redis_address"
	KeyRedisPassword   = "redis_password"
	KeyTimeoutDuration = "redis_timeout"
)
View Source
// Agent, timing, version and community keys of the SNMP reader.
const (
	KeySnmpReaderAgents    = "snmp_agents"
	KeySnmpReaderTimeOut   = "snmp_time_out"
	KeySnmpReaderInterval  = "snmp_interval"
	KeySnmpReaderRetries   = "snmp_retries"
	KeySnmpReaderVersion   = "snmp_version"
	KeySnmpReaderCommunity = "snmp_community"
)

// KeySnmpReaderMaxRepetitions is the key for the maximum repetition count.
const KeySnmpReaderMaxRepetitions = "snmp_max_repetitions"

// Keys whose option descriptions mark them as effective for SNMP version 3.
const (
	KeySnmpReaderContextName  = "snmp_context_name"
	KeySnmpReaderSecLevel     = "snmp_sec_level"
	KeySnmpReaderSecName      = "snmp_sec_name"
	KeySnmpReaderAuthProtocol = "snmp_auth_protocol"
	KeySnmpReaderAuthPassword = "snmp_auth_password"
	KeySnmpReaderPrivProtocol = "snmp_priv_protocol"
	KeySnmpReaderPrivPassword = "snmp_priv_password"
	KeySnmpReaderEngineID     = "snmp_engine_id"
	KeySnmpReaderEngineBoots  = "snmp_engine_boots"
	KeySnmpReaderEngineTime   = "snmp_engine_time"
)

// Keys for the reader name and the tables/fields configuration.
const (
	KeySnmpReaderTables = "snmp_tables"
	KeySnmpReaderName   = "snmp_reader_name"
	KeySnmpReaderFields = "snmp_fields"
)

// Other keys declared alongside the SNMP reader keys.
const (
	KeySnmpTableName = "snmp_table"
	KeyTimestamp     = "timestamp"
)
View Source
// Configuration keys of the socket reader.
const (

	// Supported listen URL forms include:
	// socket_service_address = "tcp://:3110"
	// socket_service_address = "tcp://127.0.0.1:http"
	// socket_service_address = "tcp4://:3110"
	// socket_service_address = "tcp6://:3110"
	// socket_service_address = "tcp6://[2001:db8::1]:3110"
	// socket_service_address = "udp://:3110"
	// socket_service_address = "udp4://:3110"
	// socket_service_address = "udp6://:3110"
	// socket_service_address = "unix:///tmp/sys.sock"
	// socket_service_address = "unixgram:///tmp/sys.sock"
	KeySocketServiceAddress = "socket_service_address"

	// Maximum number of concurrent connections.
	// Only applies to stream sockets (e.g. TCP).
	// 0 (default) means unlimited.
	// socket_max_connections = 1024
	KeySocketMaxConnections = "socket_max_connections"

	// Read timeout.
	// Only applies to stream sockets (e.g. TCP).
	// 0 (default) means no timeout.
	// socket_read_timeout = "30s"
	KeySocketReadTimeout = "socket_read_timeout"

	// Socket buffer size; defaults to 65535.
	// socket_read_buffer_size = 65535
	KeySocketReadBufferSize = "socket_read_buffer_size"

	// Keep-alive period of TCP connections.
	// 0 disables keep-alive.
	// Defaults to 5 minutes.
	KeySocketKeepAlivePeriod = "socket_keep_alive_period"
)
View Source
// Status values, all typed int32 and numbered consecutively from zero.
const (
	StatusInit     int32 = 0
	StatusStopped  int32 = 1
	StatusStopping int32 = 2
	StatusRunning  int32 = 3
)
View Source
// DirMode is the mode that reads every file under a directory one after
// another, in time order.
const DirMode = "dir"

DirMode 按时间顺序顺次读取文件夹下所有文件的模式

View Source
// FileMode is the mode that reads a single file.
const FileMode = "file"

FileMode 读取单个文件模式

View Source
// Loop is the literal string "loop".
// NOTE(review): its use is not visible here; confirm at the call sites.
const Loop = "loop"
View Source
// MaxHeadPatternBufferSize is 20 MiB (20 * 1024 * 1024 bytes); presumably the
// cap on data buffered while matching a head pattern (confirm at use sites).
const MaxHeadPatternBufferSize = 20 << 20
View Source
// ModeMetrics is the mode name "metrics".
const ModeMetrics = "metrics"
View Source
// MongoDefaultOffsetKey is "_id", the same value the mongo_offset_key option
// uses as its default.
const MongoDefaultOffsetKey = "_id"
View Source
// SQL_SPLITER is the ";" separator; presumably used to split multiple SQL
// statements (confirm at use sites). The exported name is kept as declared.
const SQL_SPLITER = ";"

Variables

View Source
// ErrInvalidUnreadByte carries the message "bufio: invalid use of UnreadByte".
var ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte")

// ErrInvalidUnreadRune carries the message "bufio: invalid use of UnreadRune".
var ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune")

// ErrBufferFull carries the message "bufio: buffer full".
var ErrBufferFull = errors.New("bufio: buffer full")

// ErrNegativeCount carries the message "bufio: negative count".
var ErrNegativeCount = errors.New("bufio: negative count")
View Source
// ElasticVersion3 is the "3.x" choice of the es_version option.
var ElasticVersion3 = "3.x"

// ElasticVersion5 is the "5.x" choice of the es_version option.
var ElasticVersion5 = "5.x"

// ElasticVersion6 is the "6.x" choice of the es_version option.
var ElasticVersion6 = "6.x"
View Source
// Option definitions reused by several entries of ModeKeyOptions. Each literal
// lists its fields in the same order: key, description, choice settings,
// default, validation regex, advanced flag.
var (
	// OptionMetaPath describes the meta_path key (data storage path).
	OptionMetaPath = Option{
		KeyName:      KeyMetaPath,
		Description:  "数据保存路径(meta_path)",
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Advance:      true,
	}

	// OptionDataSourceTag describes the datasource_tag key (source tag);
	// its default is "datasource".
	OptionDataSourceTag = Option{
		KeyName:      KeyDataSourceTag,
		Description:  "来源标签(datasource_tag)",
		ChooseOnly:   false,
		Default:      "datasource",
		DefaultNoUse: false,
		Advance:      true,
	}

	// OptionBuffSize describes the reader_buf_size key (data buffer size);
	// its check regex is \d+.
	OptionBuffSize = Option{
		KeyName:      KeyBufSize,
		Description:  "数据缓存大小(reader_buf_size)",
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		CheckRegex:   "\\d+",
		Advance:      true,
	}

	// OptionEncoding describes the encoding key. It is choose-only and
	// defaults to "UTF-8". The choices keep their declared order; line breaks
	// follow encoding families.
	OptionEncoding = Option{
		KeyName:     KeyEncoding,
		Description: "编码方式(encoding)",
		ChooseOnly:  true,
		ChooseOptions: []interface{}{
			"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1",
			"GBK", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS",
			"ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6",
			"ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11",
			"ISO-8859-12", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16",
			"macos-0_2-10.2", "macos-6_2-10.4", "macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2",
			"windows-1250", "windows-1251", "windows-1252", "windows-1253", "windows-1254",
			"windows-1255", "windows-1256", "windows-1257", "windows-1258", "windows-874",
			"IBM037", "ibm-273_P100-1995", "ibm-277_P100-1995", "ibm-278_P100-1995",
			"ibm-280_P100-1995", "ibm-284_P100-1995", "ibm-285_P100-1995",
			"ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999",
			"KOI8-R", "KOI8-U", "ebcdic-xml-us",
		},
		Default:      "UTF-8",
		DefaultNoUse: false,
		Advance:      true,
	}

	// OptionWhence describes the read_from key (read start position). It is
	// choose-only between WhenceOldest and WhenceNewest and defaults to
	// WhenceOldest.
	OptionWhence = Option{
		KeyName:       KeyWhence,
		Description:   "读取起始位置(read_from)",
		ChooseOnly:    true,
		ChooseOptions: []interface{}{WhenceOldest, WhenceNewest},
		Default:       WhenceOldest,
	}

	// OptionReadIoLimit describes the readio_limit key (read speed limit in
	// MB/s); its check regex is \d+.
	OptionReadIoLimit = Option{
		KeyName:      KeyReadIOLimit,
		Description:  "读取速度限制(MB/s)(readio_limit)",
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		CheckRegex:   "\\d+",
		Advance:      true,
	}

	// OptionHeadPattern describes the head_pattern key (line splitting by a
	// regular-expression rule).
	OptionHeadPattern = Option{
		KeyName:      KeyHeadPattern,
		Description:  "按正则表达式规则换行(head_pattern)",
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Advance:      true,
	}

	// OptionSQLSchema describes the sql_schema key (manually defined SQL
	// field types).
	OptionSQLSchema = Option{
		KeyName:      KeySQLSchema,
		Description:  "手动定义SQL字段类型(sql_schema)",
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Advance:      true,
	}

	// OptionKeyNewFileNewLine describes the newfile_newline key (append a
	// newline at the end of a file). It is choose-only between "false" and
	// "true" and defaults to "false".
	OptionKeyNewFileNewLine = Option{
		KeyName:       KeyNewFileNewLine,
		Description:   "文件末尾添加换行符(newfile_newline)",
		ChooseOnly:    true,
		ChooseOptions: []interface{}{"false", "true"},
		Default:       "false",
		DefaultNoUse:  false,
		Advance:       true,
	}

	// OptionKeyValidFilePattern describes the valid_file_pattern key (match
	// files with a linux wildcard).
	OptionKeyValidFilePattern = Option{
		KeyName:      KeyValidFilePattern,
		Description:  "以linux通配符匹配文件(valid_file_pattern)",
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Advance:      true,
	}
)
View Source
// ErrFileNotDir is the error whose message is "file is not directory".
var ErrFileNotDir = errors.New("file is not directory")
View Source
// ErrFileNotRegular is the error whose message is "file is not regular".
var ErrFileNotRegular = errors.New("file is not regular")
View Source
// ErrMetaFileRead is the error whose message is "cannot read meta file".
var ErrMetaFileRead = errors.New("cannot read meta file")
View Source
// ErrNoFileChosen is the error whose message is "no files found".
var ErrNoFileChosen = errors.New("no files found")
View Source
// ErrStopped is the error whose message is "runner stopped".
var ErrStopped = errors.New("runner stopped")
View Source
var ModeKeyOptions = map[string][]Option{
	ModeDir: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "/home/users/john/log/",
			Required:     true,
			DefaultNoUse: true,
			Description:  "日志文件夹路径(log_path)",
		},
		OptionMetaPath,
		OptionBuffSize,
		OptionWhence,
		OptionEncoding,
		OptionDataSourceTag,
		OptionReadIoLimit,
		OptionHeadPattern,
		OptionKeyNewFileNewLine,
		{
			KeyName:       KeyIgnoreHiddenFile,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否忽略隐藏文件(ignore_hidden)",
			Advance:       true,
		},
		{
			KeyName:      KeyIgnoreFileSuffix,
			ChooseOnly:   false,
			Default:      strings.Join(defaultIgnoreFileSuffix, ","),
			DefaultNoUse: false,
			Description:  "忽略此类后缀文件(ignore_file_suffix)",
			Advance:      true,
		},
		OptionKeyValidFilePattern,
	},
	ModeFile: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "/home/users/john/log/my.log",
			DefaultNoUse: true,
			Description:  "日志文件路径(log_path)",
		},
		OptionMetaPath,
		OptionBuffSize,
		OptionWhence,
		OptionDataSourceTag,
		OptionEncoding,
		OptionReadIoLimit,
		OptionHeadPattern,
	},
	ModeTailx: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "/home/users/*/mylog/*.log",
			DefaultNoUse: true,
			Description:  "日志文件路径模式串(log_path)",
		},
		OptionMetaPath,
		OptionBuffSize,
		OptionWhence,
		OptionEncoding,
		OptionReadIoLimit,
		OptionDataSourceTag,
		OptionHeadPattern,
		{
			KeyName:      KeyExpire,
			ChooseOnly:   false,
			Default:      "24h",
			DefaultNoUse: false,
			Required:     true,
			Description:  "忽略文件的最大过期时间[时h,分m,秒s](expire)",
			CheckRegex:   "\\d+[hms]",
		},
		{
			KeyName:      KeyMaxOpenFiles,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "最大打开文件数(max_open_files)",
			CheckRegex:   "\\d+",
			Advance:      true,
		},
		{
			KeyName:      KeyStatInterval,
			ChooseOnly:   false,
			Default:      "3m",
			DefaultNoUse: false,
			Description:  "扫描间隔(stat_interval)",
			CheckRegex:   "\\d+[hms]",
			Advance:      true,
		},
	},
	ModeFileAuto: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "/your/log/dir/or/path*.log",
			DefaultNoUse: true,
			Description:  "日志路径(log_path)",
		},
		OptionMetaPath,
		OptionWhence,
		OptionEncoding,
		OptionDataSourceTag,
		OptionKeyNewFileNewLine,
		OptionHeadPattern,
	},
	ModeMysql: {
		{
			KeyName:      KeyMysqlDataSource,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "<username>:<password>@tcp(<hostname>:<port>)",
			DefaultNoUse: true,
			Description:  "数据库地址(mysql_datasource)",
		},
		{
			KeyName:      KeyMysqlDataBase,
			ChooseOnly:   false,
			Placeholder:  "<database>",
			DefaultNoUse: true,
			Default:      "",
			Required:     true,
			Description:  "数据库名称(mysql_database)",
		},
		{
			KeyName:      KeyMysqlSQL,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "select * from <table>;",
			DefaultNoUse: true,
			Description:  "数据查询语句(mysql_sql)",
		},
		{
			KeyName:      KeyMysqlOffsetKey,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "递增的列名称(mysql_offset_key)",
			Advance:      true,
		},
		{
			KeyName:      KeyMysqlReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(mysql_limit_batch)",
			CheckRegex:   "\\d+",
			Advance:      true,
		},
		OptionMetaPath,
		OptionDataSourceTag,
		// Cron schedule option of the mysql mode. The description ends with the
		// key name in parentheses, as the other options in this map do.
		{
			KeyName:      KeyMysqlCron,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "00 00 04 * * *",
			DefaultNoUse: false,
			Description:  "定时任务(mysql_cron)",
			Advance:      true,
		},
		{
			KeyName:       KeyMysqlExecOnStart,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(mysql_exec_onstart)",
		},
		OptionSQLSchema,
	},
	ModeMssql: {
		{
			KeyName:      KeyMssqlDataSource,
			ChooseOnly:   false,
			Placeholder:  "server=<hostname or instance>;user id=<username>;password=<password>;port=<port>",
			DefaultNoUse: true,
			Default:      "",
			Required:     true,
			Description:  "数据库地址(mssql_datasource)",
		},
		{
			KeyName:      KeyMssqlDataBase,
			Default:      "",
			Required:     true,
			ChooseOnly:   false,
			Placeholder:  "<database>",
			DefaultNoUse: true,
			Description:  "数据库名称(mssql_database)",
		},
		{
			KeyName:      KeyMssqlSQL,
			Default:      "",
			Required:     true,
			ChooseOnly:   false,
			Placeholder:  "select * from <table>;",
			DefaultNoUse: true,
			Description:  "数据查询语句(mssql_sql)",
		},
		{
			KeyName:      KeyMssqlOffsetKey,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: true,
			Description:  "递增的列名称(mssql_offset_key)",
			Advance:      true,
		},
		OptionMetaPath,
		OptionDataSourceTag,
		{
			KeyName:      KeyMssqlReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(mssql_limit_batch)",
			CheckRegex:   "\\d+",
			Advance:      true,
		},
		{
			KeyName:      KeyMssqlCron,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "00 00 04 * * *",
			DefaultNoUse: false,
			Description:  "定时任务(mssql_cron)",
			Advance:      true,
		},
		{
			KeyName:       KeyMssqlExecOnStart,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(mssql_exec_onstart)",
		},
		OptionSQLSchema,
	},
	ModePG: {
		{
			KeyName:      KeyPGsqlDataSource,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "host=localhost port=5432 connect_timeout=10 user=pqgotest password=123456 sslmode=disable",
			DefaultNoUse: true,
			Description:  "数据库地址(postgres_datasource)",
		},
		{
			KeyName:      KeyPGsqlDataBase,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "<database>",
			DefaultNoUse: true,
			Description:  "数据库名称(postgres_database)",
		},
		{
			KeyName:      KeyPGsqlSQL,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "select * from <table>;",
			DefaultNoUse: true,
			Description:  "数据查询语句(postgres_sql)",
		},
		{
			KeyName:      KeyPGsqlOffsetKey,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: true,
			Description:  "递增的列名称(postgres_offset_key)",
			Advance:      true,
		},
		OptionMetaPath,
		OptionDataSourceTag,
		{
			KeyName:      KeyPGsqlReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(postgres_limit_batch)",
			CheckRegex:   "\\d+",
			Advance:      true,
		},
		{
			KeyName:      KeyPGsqlCron,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "00 00 04 * * *",
			DefaultNoUse: false,
			Description:  "定时任务(postgres_cron)",
			Advance:      true,
		},
		{
			KeyName:       KeyPGsqlExecOnStart,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(postgres_exec_onstart)",
		},
		OptionSQLSchema,
	},
	ModeElastic: {
		{
			KeyName:      KeyESHost,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: true,
			Required:     true,
			Placeholder:  "http://localhost:9200",
			Description:  "数据库地址(es_host)",
		},
		{
			KeyName:       KeyESVersion,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6},
			Description:   "版本(es_version)",
		},
		{
			KeyName:      KeyESIndex,
			ChooseOnly:   false,
			Placeholder:  "app-repo-123",
			DefaultNoUse: true,
			Default:      "",
			Required:     true,
			Description:  "索引名称(es_index)",
		},
		{
			KeyName:      KeyESType,
			ChooseOnly:   false,
			Placeholder:  "type_app",
			Default:      "",
			Required:     true,
			DefaultNoUse: true,
			Description:  "app名称(es_type)",
		},
		OptionMetaPath,
		OptionDataSourceTag,
		{
			KeyName:      KeyESReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			Required:     true,
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(es_limit_batch)",
			Advance:      true,
		},
		{
			KeyName:      KeyESKeepAlive,
			ChooseOnly:   false,
			Default:      "1d",
			Required:     true,
			DefaultNoUse: false,
			Description:  "offset保存时间(es_keepalive)",
			CheckRegex:   "\\d+[dms]",
			Advance:      true,
		},
	},
	ModeMongo: {
		{
			KeyName:      KeyMongoHost,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]",
			DefaultNoUse: true,
			Description:  "数据库地址(mongo_host)",
		},
		{
			KeyName:      KeyMongoDatabase,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "app123",
			DefaultNoUse: true,
			Description:  "数据库名称(mongo_database)",
		},
		{
			KeyName:      KeyMongoCollection,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "collection1",
			DefaultNoUse: true,
			Description:  "数据表名称(mongo_collection)",
		},
		{
			KeyName:      KeyMongoOffsetKey,
			ChooseOnly:   false,
			Default:      "_id",
			Required:     true,
			DefaultNoUse: false,
			Description:  "递增的主键(mongo_offset_key)",
		},
		OptionMetaPath,
		OptionDataSourceTag,
		{
			KeyName:      KeyMongoReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(mongo_limit_batch)",
			CheckRegex:   "\\d+",
			Advance:      true,
		},
		{
			KeyName:      KeyMongoCron,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "00 00 04 * * *",
			DefaultNoUse: false,
			Description:  "定时任务(mongo_cron)",
			Advance:      true,
		},
		{
			KeyName:       KeyMongoExecOnstart,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(mongo_exec_onstart)",
		},
		{
			KeyName:      KeyMongoFilters,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Placeholder:  "{\"foo\": {\"i\": {\"$gt\": 10}}}",
			Description:  "数据过滤方式(mongo_filters)",
			Advance:      true,
		},
	},
	ModeKafka: {
		{
			KeyName:      KeyKafkaGroupID,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "logkit1",
			DefaultNoUse: true,
			Description:  "consumer组名称(kafka_groupid)",
		},
		{
			KeyName:      KeyKafkaTopic,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "test_topic1",
			DefaultNoUse: true,
			Description:  "topic名称(kafka_topic)",
		},
		{
			KeyName:      KeyKafkaZookeeper,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "localhost:2181",
			DefaultNoUse: true,
			Description:  "zookeeper地址(kafka_zookeeper)",
		},
		{
			KeyName:      KeyKafkaZookeeperChroot,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "zookeeper中kafka根路径(kafka_zookeeper_chroot)",
			Advance:      true,
		},
		OptionWhence,
		{
			KeyName:      KeyKafkaZookeeperTimeout,
			ChooseOnly:   false,
			Default:      "1",
			Required:     true,
			DefaultNoUse: false,
			Description:  "zookeeper超时时间[秒](kafka_zookeeper_timeout)",
			Advance:      true,
		},
		OptionDataSourceTag,
	},
	ModeRedis: {
		{
			KeyName:       KeyRedisDataType,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{DataTypeList, DataTypeChannel, DataTypePatterChannel, DataTypeString, DataTypeSet, DateTypeSortedSet, DateTypeHash},
			Description:   "数据读取模式(redis_datatype)",
		},
		{
			KeyName:      KeyRedisDB,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "db",
			DefaultNoUse: true,
			Description:  "数据库名称(redis_db)",
		},
		{
			KeyName:      KeyRedisKey,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "key1",
			DefaultNoUse: true,
			Description:  "redis键(redis_key)",
		},
		{
			KeyName:      KeyRedisHashArea,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "hash模式对应的数据结构域(redisHash_area)",
			Advance:      true,
		},
		{
			KeyName:      KeyRedisAddress,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "127.0.0.1:6379",
			Required:     true,
			DefaultNoUse: false,
			Description:  "数据库地址(redis_address)",
		},
		{
			KeyName:      KeyRedisPassword,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "密码(redis_password)",
			Advance:      true,
		},
		{
			KeyName:      KeyTimeoutDuration,
			ChooseOnly:   false,
			Default:      "5s",
			Required:     true,
			DefaultNoUse: false,
			Description:  "单次读取超时时间[m(分)、s(秒)](redis_timeout)",
			CheckRegex:   "\\d+[ms]",
			Advance:      true,
		},
		OptionDataSourceTag,
	},
	ModeSocket: {
		{
			KeyName:      KeySocketServiceAddress,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "tcp://127.0.0.1:3110",
			Required:     true,
			DefaultNoUse: true,
			Description:  "socket监听的地址[协议://端口](socket_service_address)",
		},
		{
			KeyName:      KeySocketMaxConnections,
			ChooseOnly:   false,
			Default:      "0",
			DefaultNoUse: false,
			Description:  "最大并发连接数[tcp](socket_max_connections)",
			Advance:      true,
		},
		{
			KeyName:      KeySocketReadTimeout,
			ChooseOnly:   false,
			Default:      "1m",
			DefaultNoUse: false,
			Description:  "连接超时时间[0为不超时](socket_read_timeout)",
			Advance:      true,
		},
		{
			KeyName:      KeySocketReadBufferSize,
			ChooseOnly:   false,
			Default:      "65535",
			DefaultNoUse: false,
			Description:  "连接缓存大小[udp](socket_read_buffer_size)",
			Advance:      true,
		},
		{
			KeyName:      KeySocketKeepAlivePeriod,
			ChooseOnly:   false,
			Default:      "5m",
			DefaultNoUse: false,
			Description:  "连接保持时长[0为关闭](socket_keep_alive_period)",
			Advance:      true,
		},
		OptionDataSourceTag,
	},
	ModeHttp: {
		{
			KeyName:      KeyHttpServiceAddress,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  DefaultHttpServiceAddress,
			Required:     true,
			DefaultNoUse: true,
			Description:  "监听的地址和端口[<ip/host/不填>:port](http_service_address)",
		},
		{
			KeyName:      KeyHttpServicePath,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  DefaultHttpServicePath,
			Required:     true,
			DefaultNoUse: true,
			Description:  "监听地址前缀(http_service_path)",
		},
	},
	ModeScript: {
		{
			KeyName:      KeyExecInterpreter,
			ChooseOnly:   false,
			Default:      "/bin/bash",
			Placeholder:  "/bin/bash",
			Required:     true,
			DefaultNoUse: true,
			Description:  "脚本执行解释器(script_exec_interpreter)",
		},
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "/home/users/john/log/my.sh",
			DefaultNoUse: true,
			Description:  "脚本路径(log_path)",
		},
		{
			KeyName:      KeyScriptCron,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "00 00 04 * * *",
			DefaultNoUse: false,
			Description:  "定时任务(script_cron)",
			Advance:      true,
		},
		{
			KeyName:       KeyScriptExecOnStart,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(script_exec_onstart)",
		},
	},
	ModeSnmp: {
		{
			KeyName:      KeySnmpReaderName,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "logkit_default_name",
			DefaultNoUse: true,
			Description:  "名称(snmp_reader_name)",
			Advance:      true,
		},
		{
			KeyName:      KeySnmpReaderAgents,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "127.0.0.1:161,10.10.0.1:161",
			Required:     true,
			DefaultNoUse: true,
			Description:  "agents列表[多个用','分隔](snmp_agents)",
		},
		{
			KeyName:      KeySnmpReaderTables,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "[{\"table_name\":\"udpLocalAddress\",\"table_oid\":\"1.3.6.1.2.1.31.1.1\"}]",
			DefaultNoUse: true,
			Description:  "tables配置[填入json数组字符串](snmp_tables)",
		},
		{
			KeyName:      KeySnmpReaderFields,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: true,
			Description:  "fields配置[填入json数组字符串](snmp_fields)",
		},
		{
			KeyName:       KeySnmpReaderVersion,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"1", "2", "3"},
			Default:       "2",
			DefaultNoUse:  true,
			Description:   "snmp协议版本(snmp_version)",
		},
		{
			KeyName:      KeySnmpReaderTimeOut,
			ChooseOnly:   false,
			Default:      "5s",
			Required:     true,
			DefaultNoUse: false,
			Description:  "连接超时时间(snmp_time_out)",
			Advance:      true,
		},
		{
			KeyName:      KeySnmpReaderInterval,
			ChooseOnly:   false,
			Default:      "30s",
			Required:     true,
			DefaultNoUse: false,
			Description:  "收集频率(snmp_interval)",
			Advance:      true,
		},
		{
			KeyName:      KeySnmpReaderRetries,
			ChooseOnly:   false,
			Default:      "3",
			Required:     true,
			DefaultNoUse: false,
			Description:  "重试次数(snmp_retries)",
			Advance:      true,
		},
		// Community option of the snmp mode (marked as valid for versions 1/2).
		// The key hint in the description names snmp_community, the value of
		// KeySnmpReaderCommunity.
		{
			KeyName:      KeySnmpReaderCommunity,
			ChooseOnly:   false,
			Default:      "public",
			Placeholder:  "public",
			DefaultNoUse: true,
			Description:  "community/秘钥[版本1/2有效](snmp_community)",
		},
		{
			KeyName:      KeySnmpReaderMaxRepetitions,
			ChooseOnly:   false,
			Default:      "50",
			DefaultNoUse: false,
			Description:  "最大迭代次数[版本2/3有效](snmp_max_repetitions)",
			Advance:      true,
		},
		// Context-name option of the snmp mode (marked as valid for version 3).
		// The key hint in the description names snmp_context_name, the value of
		// KeySnmpReaderContextName.
		{
			KeyName:      KeySnmpReaderContextName,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "context名称[版本3有效](snmp_context_name)",
			Advance:      true,
		},
		{
			KeyName:       KeySnmpReaderSecLevel,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"noAuthNoPriv", "authNoPriv", "authPriv"},
			DefaultNoUse:  true,
			Description:   "安全等级[版本3有效](snmp_sec_level)",
			Advance:       true,
		},
		{
			KeyName:      KeySnmpReaderSecName,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "认证名称[版本3有效](snmp_sec_name)",
			Advance:      true,
		},
		{
			KeyName:       KeySnmpReaderAuthProtocol,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"md5", "sha", ""},
			DefaultNoUse:  false,
			Description:   "认证协议[版本3有效](snmp_auth_protocol)",
			Advance:       true,
		},
		{
			KeyName:      KeySnmpReaderAuthPassword,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "认证密码[版本3有效](snmp_auth_password)",
			Advance:      true,
		},
		{
			KeyName:       KeySnmpReaderPrivProtocol,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"des", "aes", ""},
			DefaultNoUse:  false,
			Description:   "隐私协议[版本3有效](snmp_priv_protocol)",
			Advance:       true,
		},
		{
			KeyName:      KeySnmpReaderPrivPassword,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "隐私密码[版本3有效](snmp_priv_password)",
			Advance:      true,
		},
		// Engine-ID option of the snmp mode (marked as valid for version 3).
		// The description shows snmp_engine_id, the value of
		// KeySnmpReaderEngineID.
		{
			KeyName:      KeySnmpReaderEngineID,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "snmp_engine_id[版本3有效]",
			Advance:      true,
		},
		{
			KeyName:      KeySnmpReaderEngineBoots,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "snmp_engine_boots(版本3有效)",
			Advance:      true,
		},
		{
			KeyName:      KeySnmpReaderEngineTime,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "snmp_engine_time(版本3有效)",
			Advance:      true,
		},
	},
}
View Source
// ModeUsages pairs each reader mode constant with a short usage description.
// The descriptions are Chinese text, e.g. "read from files (dir mode)" or
// "read from MySQL".
var ModeUsages = []KeyValue{
	{ModeFileAuto, "从文件读取( fileauto 模式)"},
	{ModeDir, "从文件读取( dir 模式)"},
	{ModeFile, "从文件读取( file 模式)"},
	{ModeTailx, "从文件读取( tailx 模式)"},
	{ModeMysql, "从 MySQL 读取"},
	{ModeMssql, "从 MSSQL 读取"},
	{ModePG, "从 PostgreSQL 读取"},
	{ModeElastic, "从 Elasticsearch 读取"},
	{ModeMongo, "从 MongoDB 读取"},
	{ModeKafka, "从 Kafka 读取"},
	{ModeRedis, "从 Redis 读取"},
	{ModeSocket, "从 Socket 读取"},
	{ModeHttp, "从 http 请求中读取"},
	{ModeScript, "从脚本的执行结果中读取"},
	{ModeSnmp, "从 SNMP 服务中读取"},
}

ModeUsages 用途说明

View Source
// WaitNoSuchFile is a one-second duration. It is a variable, so it can be
// reassigned. NOTE(review): presumably the wait applied when a file does not
// exist yet — confirm at the use sites.
var WaitNoSuchFile = time.Second

Functions

func Errorf added in v1.4.3

func Errorf(err error, msg string, format ...interface{}) error

func HeadPatternMode

func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)

Types

type ActiveReader

type ActiveReader struct {
	// contains filtered or unexported fields
}

func NewActiveReader

func NewActiveReader(originPath, realPath, whence string, meta *Meta, msgChan chan<- Result) (ar *ActiveReader, err error)

func (*ActiveReader) Close

func (ar *ActiveReader) Close() error

func (*ActiveReader) Lag added in v1.4.4

func (ar *ActiveReader) Lag() (rl *models.LagInfo, err error)

func (*ActiveReader) Run

func (ar *ActiveReader) Run()

func (*ActiveReader) Status added in v1.3.1

func (ar *ActiveReader) Status() utils.StatsInfo

func (*ActiveReader) SyncMeta

func (ar *ActiveReader) SyncMeta() string

除了sync自己的bufreader,还要sync一行linecache

type BufReader

type BufReader struct {
	// contains filtered or unexported fields
}

BufReader implements buffering for a FileReader object.

func NewReaderSize

func NewReaderSize(rd FileReader, meta *Meta, size int) (*BufReader, error)

NewReaderSize returns a new Reader whose buffer has at least the specified size. If the argument FileReader is already a Reader with large enough size, it returns the underlying Reader.

func (*BufReader) Close

func (b *BufReader) Close() error

func (*BufReader) Lag added in v1.4.4

func (b *BufReader) Lag() (rl *models.LagInfo, err error)

func (*BufReader) Name

func (b *BufReader) Name() string

func (*BufReader) ReadLine

func (b *BufReader) ReadLine() (ret string, err error)

ReadLine returns the next line as a string, as a normal Reader would.

func (*BufReader) ReadPattern

func (b *BufReader) ReadPattern() (string, error)

ReadPattern读取日志直到匹配行首模式串

func (*BufReader) ReadString

func (b *BufReader) ReadString(delim byte) (ret string, err error)

ReadString reads until the first occurrence of delim in the input, returning a string containing the data up to and including the delimiter. If ReadString encounters an error before finding a delimiter, it returns the data read before the error and the error itself (often io.EOF). ReadString returns err != nil if and only if the returned data does not end in delim. For simple uses, a Scanner may be more convenient.

func (*BufReader) SetMode

func (b *BufReader) SetMode(mode string, v interface{}) (err error)

func (*BufReader) Source

func (b *BufReader) Source() string

func (*BufReader) Status added in v1.3.1

func (b *BufReader) Status() utils.StatsInfo

func (*BufReader) SyncMeta

func (b *BufReader) SyncMeta()

type CollectionFilter

type CollectionFilter map[string]interface{}

CollectionFilter is a named type for map[string]interface{}, i.e. a map from string keys to arbitrary values.

type ElasticReader

type ElasticReader struct {
	// contains filtered or unexported fields
}

func NewESReader

func NewESReader(meta *Meta, readBatch int, estype, esindex, eshost, esVersion, keepAlive string) (er *ElasticReader, err error)

func (*ElasticReader) Close

func (er *ElasticReader) Close() (err error)

func (*ElasticReader) Name

func (er *ElasticReader) Name() string

func (*ElasticReader) ReadLine

func (er *ElasticReader) ReadLine() (data string, err error)

func (*ElasticReader) SetMode

func (er *ElasticReader) SetMode(mode string, v interface{}) error

func (*ElasticReader) Source

func (er *ElasticReader) Source() string

func (*ElasticReader) Start

func (er *ElasticReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*ElasticReader) Status added in v1.3.1

func (er *ElasticReader) Status() utils.StatsInfo

func (*ElasticReader) SyncMeta

func (er *ElasticReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type Field added in v1.4.3

type Field struct {
	Name           string `json:"field_name"`
	Oid            string `json:"field_oid"`
	OidIndexSuffix string `json:"field_oid_index_suffix"`
	IsTag          bool   `json:"field_is_tag"`
	Conversion     string `json:"field_conversion"`
}

type FileReader

type FileReader interface {
	Name() string
	Source() string
	Read(p []byte) (n int, err error)
	Close() error
	SyncMeta() error
}

FileReader reader 接口方法

type HttpReader added in v1.4.2

type HttpReader struct {
	// contains filtered or unexported fields
}

func NewHttpReader added in v1.4.2

func NewHttpReader(meta *Meta, conf conf.MapConf) (*HttpReader, error)

func (*HttpReader) Close added in v1.4.2

func (h *HttpReader) Close() error

func (*HttpReader) Name added in v1.4.2

func (h *HttpReader) Name() string

func (*HttpReader) ReadLine added in v1.4.2

func (h *HttpReader) ReadLine() (data string, err error)

func (*HttpReader) SetMode added in v1.4.2

func (h *HttpReader) SetMode(mode string, v interface{}) error

func (*HttpReader) Source added in v1.4.2

func (h *HttpReader) Source() string

func (*HttpReader) Start added in v1.4.2

func (h *HttpReader) Start() error

func (*HttpReader) SyncMeta added in v1.4.2

func (h *HttpReader) SyncMeta()

type KafkaReader

type KafkaReader struct {
	ConsumerGroup    string
	Topics           []string
	ZookeeperPeers   []string
	ZookeeperChroot  string
	ZookeeperTimeout time.Duration
	Whence           string

	Consumer *consumergroup.ConsumerGroup
	// contains filtered or unexported fields
}

func NewKafkaReader

func NewKafkaReader(meta *Meta, consumerGroup string,
	topics []string, zookeeper []string, zkchroot string, zookeeperTimeout time.Duration, whence string) (kr *KafkaReader, err error)

func (*KafkaReader) Close

func (kr *KafkaReader) Close() (err error)

func (*KafkaReader) Lag added in v1.4.4

func (kr *KafkaReader) Lag() (rl *models.LagInfo, err error)

func (*KafkaReader) Name

func (kr *KafkaReader) Name() string

func (*KafkaReader) ReadLine

func (kr *KafkaReader) ReadLine() (data string, err error)

func (*KafkaReader) SetMode

func (kr *KafkaReader) SetMode(mode string, v interface{}) error

func (*KafkaReader) Source

func (kr *KafkaReader) Source() string

func (*KafkaReader) Start

func (kr *KafkaReader) Start()

func (*KafkaReader) Status added in v1.3.1

func (kr *KafkaReader) Status() utils.StatsInfo

func (*KafkaReader) SyncMeta

func (kr *KafkaReader) SyncMeta()

type LagReader added in v1.4.4

type LagReader interface {
	Lag() (*models.LagInfo, error)
}

获取数据lag的接口

type LastSync

type LastSync struct {
	// contains filtered or unexported fields
}

type Meta

type Meta struct {
	RunnerName string
	// contains filtered or unexported fields
}

func NewMeta

func NewMeta(metadir, filedonedir, logpath, mode, tagfile string, donefileRetention int) (m *Meta, err error)

func NewMetaWithConf

func NewMetaWithConf(conf conf.MapConf) (meta *Meta, err error)

func (*Meta) AppendDeleteFile

func (m *Meta) AppendDeleteFile(path string) (err error)

func (*Meta) AppendDoneFile

func (m *Meta) AppendDoneFile(path string) (err error)

AppendDoneFile 将处理完的文件写入doneFile中

func (*Meta) BufFile

func (m *Meta) BufFile() string

BufFile 返回buf的文件路径

func (*Meta) BufMetaFile

func (m *Meta) BufMetaFile() string

BufMetaFile 返回buf的meta文件路径

func (*Meta) CacheLineFile

func (m *Meta) CacheLineFile() string

func (*Meta) Clear

func (m *Meta) Clear() error

Clear 删除所有meta信息

func (*Meta) DeleteDoneFile

func (m *Meta) DeleteDoneFile(path string) error

func (*Meta) DeleteFile

func (m *Meta) DeleteFile() string

DeleteFile 处理完成文件地址,按日进行rotate

func (*Meta) DoneFile

func (m *Meta) DoneFile() string

DoneFile 处理完成文件地址,按日进行rotate

func (*Meta) DoneFilePath

func (m *Meta) DoneFilePath() string

DoneFilePath 返回meta的filedone文件的存放目录

func (*Meta) FtSaveLogPath added in v1.3.5

func (m *Meta) FtSaveLogPath() string

FtSaveLogPath 返回 ft_sender 日志信息记录文件夹路径

func (*Meta) GetDataSourceTag

func (m *Meta) GetDataSourceTag() string

func (*Meta) GetDoneFiles

func (m *Meta) GetDoneFiles() (doneFiles []utils.File, err error)

func (*Meta) GetEncodingWay

func (m *Meta) GetEncodingWay() (e string)

GetEncodingWay 获取文件编码方式

func (*Meta) GetMode

func (m *Meta) GetMode() string

func (*Meta) GetTagFile added in v1.4.3

func (m *Meta) GetTagFile() string

func (*Meta) GetTags added in v1.4.3

func (m *Meta) GetTags() map[string]interface{}

func (*Meta) IsDoneFile

func (m *Meta) IsDoneFile(file string) bool

IsDoneFile 返回是否是Donefile格式的文件

func (*Meta) IsExist

func (m *Meta) IsExist() bool

func (*Meta) IsFileMode added in v1.3.1

func (m *Meta) IsFileMode() bool

func (*Meta) IsNotExist

func (m *Meta) IsNotExist() bool

IsNotExist meta 不存在,用来判断是第一次创建

func (*Meta) IsNotValid

func (m *Meta) IsNotValid() bool

IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏

func (*Meta) IsValid

func (m *Meta) IsValid() bool

func (*Meta) LogPath

func (m *Meta) LogPath() string

func (*Meta) MetaFile

func (m *Meta) MetaFile() string

MetaFile 返回metaFileoffset 的meta文件地址

func (*Meta) ReadBuf

func (m *Meta) ReadBuf(buf []byte) (n int, err error)

func (*Meta) ReadBufMeta

func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)

func (*Meta) ReadCacheLine

func (m *Meta) ReadCacheLine() ([]byte, error)

func (*Meta) ReadOffset

func (m *Meta) ReadOffset() (currFile string, offset int64, err error)

读取当前读取的文件和offset

func (*Meta) ReadStatistic added in v1.3.5

func (m *Meta) ReadStatistic() (stat Statistic, err error)

func (*Meta) Reset added in v1.2.4

func (m *Meta) Reset() error

func (*Meta) SetEncodingWay

func (m *Meta) SetEncodingWay(e string)

SetEncodingWay 设置文件编码方式,默认为 utf-8

func (*Meta) StatisticFile added in v1.3.5

func (m *Meta) StatisticFile() string

StatisticFile 返回 Runner 统计信息的文件路径

func (*Meta) WriteBuf

func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)

func (*Meta) WriteCacheLine

func (m *Meta) WriteCacheLine(lines string) error

func (*Meta) WriteOffset

func (m *Meta) WriteOffset(currFile string, offset int64) (err error)

WriteOffset 将当前文件和offset写入meta中

func (*Meta) WriteStatistic added in v1.3.5

func (m *Meta) WriteStatistic(stat *Statistic) error

type MongoReader

type MongoReader struct {
	Cron *cron.Cron //定时任务
	// contains filtered or unexported fields
}

func NewMongoReader

func NewMongoReader(meta *Meta, readBatch int, host, database, collection, offsetkey, cronSched, filters, certfile string, execOnStart bool) (mr *MongoReader, err error)

func (*MongoReader) Close

func (mr *MongoReader) Close() (err error)

func (*MongoReader) LoopRun added in v1.3.3

func (mr *MongoReader) LoopRun()

func (*MongoReader) Name

func (mr *MongoReader) Name() string

func (*MongoReader) ReadLine

func (mr *MongoReader) ReadLine() (data string, err error)

func (*MongoReader) SetMode

func (mr *MongoReader) SetMode(mode string, v interface{}) error

func (*MongoReader) Source

func (mr *MongoReader) Source() string

func (*MongoReader) Start

func (mr *MongoReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*MongoReader) Status added in v1.3.1

func (mr *MongoReader) Status() utils.StatsInfo

func (*MongoReader) SyncMeta

func (mr *MongoReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type MultiReader

type MultiReader struct {
	// contains filtered or unexported fields
}

func NewMultiReader

func NewMultiReader(meta *Meta, logPathPattern, whence, expireDur, statIntervalDur string, maxOpenFiles int) (mr *MultiReader, err error)

func (*MultiReader) Close

func (mr *MultiReader) Close() (err error)

func (*MultiReader) Expire

func (mr *MultiReader) Expire()

Expire 函数关闭过期的文件,再更新

func (*MultiReader) Lag added in v1.4.4

func (mr *MultiReader) Lag() (rl *models.LagInfo, err error)

func (*MultiReader) Name

func (mr *MultiReader) Name() string

func (*MultiReader) ReadLine

func (mr *MultiReader) ReadLine() (data string, err error)

func (*MultiReader) Reset added in v1.4.3

func (mr *MultiReader) Reset() (err error)

func (*MultiReader) SetMode

func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)

func (*MultiReader) Source

func (mr *MultiReader) Source() string

func (*MultiReader) Start

func (mr *MultiReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题。处理StatInterval以及Expire两大循环任务。

func (*MultiReader) StatLogPath

func (mr *MultiReader) StatLogPath()

func (*MultiReader) Status added in v1.3.1

func (mr *MultiReader) Status() utils.StatsInfo

func (*MultiReader) SyncMeta

func (mr *MultiReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type NestedError added in v1.4.3

type NestedError struct {
	Err       error
	NestedErr error
}

func (NestedError) Error added in v1.4.3

func (ne NestedError) Error() string

type RTable added in v1.4.3

type RTable struct {
	Name string
	Time string
	Rows []RTableRow
}

type RTableRow added in v1.4.3

type RTableRow struct {
	Tags   map[string]string
	Fields map[string]interface{}
}

type Reader

type Reader interface {
	//Name reader名称
	Name() string
	//Source 读取的数据源
	Source() string
	ReadLine() (string, error)
	SetMode(mode string, v interface{}) error
	Close() error
	SyncMeta()
}

Reader 是一个通用的行读取reader接口

func NewFileAutoReader added in v1.4.1

func NewFileAutoReader(conf conf.MapConf, meta *Meta, isFromWeb bool, bufSize int, whence string, path string, fr FileReader) (reader Reader, err error)

func NewFileBufReader

func NewFileBufReader(conf conf.MapConf, isFromWeb bool) (reader Reader, err error)

NewFileBufReader 创建FileReader

func NewFileBufReaderWithMeta

func NewFileBufReaderWithMeta(conf conf.MapConf, meta *Meta, isFromWeb bool) (reader Reader, err error)

type RedisOptionn added in v1.2.1

type RedisOptionn struct {
	// contains filtered or unexported fields
}

type RedisReader added in v1.2.1

type RedisReader struct {
	// contains filtered or unexported fields
}

func NewRedisReader added in v1.2.1

func NewRedisReader(meta *Meta, conf conf.MapConf) (rr *RedisReader, err error)

func (*RedisReader) Close added in v1.2.1

func (rr *RedisReader) Close() (err error)

func (*RedisReader) Name added in v1.2.1

func (rr *RedisReader) Name() string

func (*RedisReader) ReadLine added in v1.2.1

func (rr *RedisReader) ReadLine() (data string, err error)

func (*RedisReader) SetMode added in v1.2.1

func (rr *RedisReader) SetMode(mode string, v interface{}) error

func (*RedisReader) Source added in v1.2.1

func (rr *RedisReader) Source() string

func (*RedisReader) Start added in v1.2.1

func (rr *RedisReader) Start()

func (*RedisReader) Status added in v1.3.1

func (rr *RedisReader) Status() utils.StatsInfo

func (*RedisReader) SyncMeta added in v1.2.1

func (rr *RedisReader) SyncMeta()

type Result added in v1.2.2

type Result struct {
	// contains filtered or unexported fields
}

type ScriptReader added in v1.4.3

type ScriptReader struct {
	Cron *cron.Cron //定时任务
	// contains filtered or unexported fields
}

func NewScriptReader added in v1.4.3

func NewScriptReader(meta *Meta, conf conf.MapConf) (sr *ScriptReader, err error)

func (*ScriptReader) Close added in v1.4.3

func (sr *ScriptReader) Close() (err error)

func (*ScriptReader) LoopRun added in v1.4.3

func (sr *ScriptReader) LoopRun()

func (*ScriptReader) Name added in v1.4.3

func (sr *ScriptReader) Name() string

func (*ScriptReader) ReadLine added in v1.4.3

func (sr *ScriptReader) ReadLine() (data string, err error)

func (*ScriptReader) SetMode added in v1.4.3

func (sr *ScriptReader) SetMode(mode string, v interface{}) error

func (*ScriptReader) Source added in v1.4.3

func (sr *ScriptReader) Source() string

func (*ScriptReader) Start added in v1.4.3

func (sr *ScriptReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*ScriptReader) SyncMeta added in v1.4.3

func (sr *ScriptReader) SyncMeta()

type SeqFile

type SeqFile struct {
	// contains filtered or unexported fields
}

SeqFile 按最终修改时间依次读取文件的Reader类型

func NewSeqFile

func NewSeqFile(meta *Meta, path string, ignoreHidden, newFileNewLine bool, suffixes []string, validFileRegex, whence string) (sf *SeqFile, err error)

func (*SeqFile) Close

func (sf *SeqFile) Close() (err error)

func (*SeqFile) Lag added in v1.4.4

func (sf *SeqFile) Lag() (rl *models.LagInfo, err error)

func (*SeqFile) Name

func (sf *SeqFile) Name() string

func (*SeqFile) Read

func (sf *SeqFile) Read(p []byte) (n int, err error)

func (*SeqFile) Source

func (sf *SeqFile) Source() string

func (*SeqFile) SyncMeta

func (sf *SeqFile) SyncMeta() (err error)

type ServerReader added in v1.2.1

type ServerReader interface {
	//Name reader名称
	Name() string
	//Source 读取的数据源
	Source() string
	Start()
	ReadLine() (string, error)
	Close() error
	SyncMeta()
}

TODO 构建统一的 Server reader框架, 减少重复的编码

type SingleFile

type SingleFile struct {
	// contains filtered or unexported fields
}

func NewSingleFile

func NewSingleFile(meta *Meta, path, whence string, isFromWeb bool) (sf *SingleFile, err error)

func (*SingleFile) Close

func (sf *SingleFile) Close() (err error)

func (*SingleFile) Lag added in v1.4.4

func (sf *SingleFile) Lag() (rl *models.LagInfo, err error)

func (*SingleFile) Name

func (sf *SingleFile) Name() string

func (*SingleFile) Read

func (sf *SingleFile) Read(p []byte) (n int, err error)

func (*SingleFile) Reopen

func (sf *SingleFile) Reopen() (err error)

func (*SingleFile) Source

func (sf *SingleFile) Source() string

func (*SingleFile) SyncMeta

func (sf *SingleFile) SyncMeta() error

type SnmpReader added in v1.4.3

type SnmpReader struct {
	SnmpName       string
	Agents         []string      // agent address [ip:port]
	Timeout        time.Duration // 等待回复的时间
	Interval       time.Duration // 收集频率
	Retries        int
	Version        uint8  // 1, 2, 3
	Community      string // version 1&2 的参数
	MaxRepetitions uint8  // version 2&3 的参数
	ContextName    string // version 3 参数

	SecLevel     string //"noAuthNoPriv", "authNoPriv", "authPriv"
	SecName      string
	AuthProtocol string // "MD5", "SHA", "", 默认: ""
	AuthPassword string
	PrivProtocol string // "DES", "AES", "", 默认: ""
	PrivPassword string
	EngineID     string
	EngineBoots  uint32
	EngineTime   uint32

	Tables          []Table
	Fields          []Field
	ConnectionCache []snmpConnection

	Meta     *Meta
	Status   int32
	StopChan chan struct{}
	DataChan chan interface{}
}

SnmpReader holds the configuration for the plugin.

func NewSnmpReader added in v1.4.3

func NewSnmpReader(meta *Meta, c conf.MapConf) (s *SnmpReader, err error)

func (*SnmpReader) Close added in v1.4.3

func (s *SnmpReader) Close() error

func (*SnmpReader) Gather added in v1.4.3

func (s *SnmpReader) Gather() (err error)

func (*SnmpReader) Name added in v1.4.3

func (s *SnmpReader) Name() string

Name reader名称

func (*SnmpReader) ReadLine added in v1.4.3

func (s *SnmpReader) ReadLine() (line string, err error)

func (*SnmpReader) SetMode added in v1.4.3

func (s *SnmpReader) SetMode(mode string, v interface{}) error

func (*SnmpReader) Source added in v1.4.3

func (s *SnmpReader) Source() string

Source 读取的数据源

func (*SnmpReader) Start added in v1.4.4

func (s *SnmpReader) Start() error

func (*SnmpReader) StoreData added in v1.4.3

func (s *SnmpReader) StoreData(data []map[string]interface{}) (err error)

func (*SnmpReader) SyncMeta added in v1.4.3

func (s *SnmpReader) SyncMeta()

type SocketReader added in v1.3.6

type SocketReader struct {
	ServiceAddress  string
	MaxConnections  int
	ReadBufferSize  int
	ReadTimeout     time.Duration
	KeepAlivePeriod time.Duration

	// resource need  close
	ReadChan chan string
	Closer   io.Closer
	// contains filtered or unexported fields
}

func NewSocketReader added in v1.3.6

func NewSocketReader(meta *Meta, conf conf.MapConf) (*SocketReader, error)

func (*SocketReader) Close added in v1.3.6

func (sr *SocketReader) Close() (err error)

func (*SocketReader) Name added in v1.3.6

func (sr *SocketReader) Name() string

func (*SocketReader) ReadLine added in v1.3.6

func (sr *SocketReader) ReadLine() (data string, err error)

func (*SocketReader) SetMode added in v1.3.6

func (sr *SocketReader) SetMode(mode string, v interface{}) error

func (*SocketReader) Source added in v1.3.6

func (sr *SocketReader) Source() string

func (*SocketReader) Start added in v1.3.6

func (sr *SocketReader) Start() error

func (*SocketReader) SyncMeta added in v1.3.6

func (sr *SocketReader) SyncMeta()

type SqlReader

type SqlReader struct {
	Cron *cron.Cron //定时任务
	// contains filtered or unexported fields
}

func NewSQLReader

func NewSQLReader(meta *Meta, conf conf.MapConf) (mr *SqlReader, err error)

func (*SqlReader) Close

func (mr *SqlReader) Close() (err error)

func (*SqlReader) LoopRun added in v1.3.3

func (mr *SqlReader) LoopRun()

func (*SqlReader) Name

func (mr *SqlReader) Name() string

func (*SqlReader) ReadLine

func (mr *SqlReader) ReadLine() (data string, err error)

func (*SqlReader) SetMode

func (mr *SqlReader) SetMode(mode string, v interface{}) error

func (*SqlReader) Source

func (mr *SqlReader) Source() string

func (*SqlReader) Start

func (mr *SqlReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*SqlReader) Status added in v1.3.1

func (mr *SqlReader) Status() utils.StatsInfo

func (*SqlReader) SyncMeta

func (mr *SqlReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type Statistic added in v1.3.5

type Statistic struct {
	ReaderCnt int64               `json:"reader_count"` // 读取总条数
	ParserCnt [2]int64            `json:"parser_connt"` // [解析成功, 解析失败]
	SenderCnt map[string][2]int64 `json:"sender_count"` // [发送成功, 发送失败]
}

type StatsReader added in v1.3.1

type StatsReader interface {
	//Name reader名称
	Name() string
	Status() utils.StatsInfo
}

StatsReader 是一个通用的带有统计接口的reader

type Table added in v1.4.3

type Table struct {
	Name        string   `json:"table_name"`
	InheritTags []string `json:"table_inherit_tags"`
	IndexAsTag  bool     `json:"table_index_tag"`
	Fields      []Field  `json:"table_fields"`
	Oid         string   `json:"table_oid"`
}

func (Table) Build added in v1.4.3

func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL