Documentation ¶
Overview ¶
Package bufio implements buffered I/O. It wraps an FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
Index ¶
- Constants
- Variables
- func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)
- type ActiveReader
- type BufReader
- func (b *BufReader) Close() error
- func (b *BufReader) Name() string
- func (b *BufReader) ReadLine() (ret string, err error)
- func (b *BufReader) ReadPattern() (string, error)
- func (b *BufReader) ReadString(delim byte) (ret string, err error)
- func (b *BufReader) SetMode(mode string, v interface{}) (err error)
- func (b *BufReader) Source() string
- func (b *BufReader) SyncMeta()
- type CollectionFilter
- type ElasticReader
- func (er *ElasticReader) Close() (err error)
- func (er *ElasticReader) Name() string
- func (er *ElasticReader) ReadLine() (data string, err error)
- func (er *ElasticReader) SetMode(mode string, v interface{}) error
- func (er *ElasticReader) Source() string
- func (er *ElasticReader) Start()
- func (er *ElasticReader) SyncMeta()
- type FileReader
- type KafkaReader
- func (kr *KafkaReader) Close() (err error)
- func (kr *KafkaReader) Name() string
- func (kr *KafkaReader) ReadLine() (data string, err error)
- func (kr *KafkaReader) SetMode(mode string, v interface{}) error
- func (kr *KafkaReader) Source() string
- func (kr *KafkaReader) Start()
- func (kr *KafkaReader) SyncMeta()
- type LastSync
- type Meta
- func (m *Meta) AppendDeleteFile(path string) (err error)
- func (m *Meta) AppendDoneFile(path string) (err error)
- func (m *Meta) BufFile() string
- func (m *Meta) BufMetaFile() string
- func (m *Meta) CacheLineFile() string
- func (m *Meta) Clear() error
- func (m *Meta) DeleteDoneFile(path string) error
- func (m *Meta) DeleteFile() string
- func (m *Meta) DoneFile() string
- func (m *Meta) DoneFilePath() string
- func (m *Meta) GetDataSourceTag() string
- func (m *Meta) GetDoneFiles() (doneFiles []utils.File, err error)
- func (m *Meta) GetEncodingWay() (e string)
- func (m *Meta) GetMode() string
- func (m *Meta) IsDoneFile(file string) bool
- func (m *Meta) IsExist() bool
- func (m *Meta) IsNotExist() bool
- func (m *Meta) IsNotValid() bool
- func (m *Meta) IsValid() bool
- func (m *Meta) LogPath() string
- func (m *Meta) MetaFile() string
- func (m *Meta) ReadBuf(buf []byte) (n int, err error)
- func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)
- func (m *Meta) ReadCacheLine() ([]byte, error)
- func (m *Meta) ReadOffset() (currFile string, offset int64, err error)
- func (m *Meta) SetEncodingWay(e string)
- func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)
- func (m *Meta) WriteCacheLine(lines string) error
- func (m *Meta) WriteOffset(currFile string, offset int64) (err error)
- type MongoReader
- func (mr *MongoReader) Close() (err error)
- func (mr *MongoReader) Name() string
- func (mr *MongoReader) ReadLine() (data string, err error)
- func (mr *MongoReader) SetMode(mode string, v interface{}) error
- func (mr *MongoReader) Source() string
- func (mr *MongoReader) Start()
- func (mr *MongoReader) SyncMeta()
- type MultiReader
- func (mr *MultiReader) Close() (err error)
- func (mr *MultiReader) Expire()
- func (mr *MultiReader) Name() string
- func (mr *MultiReader) ReadLine() (data string, err error)
- func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)
- func (mr *MultiReader) Source() string
- func (mr *MultiReader) Start()
- func (mr *MultiReader) StatLogPath()
- func (mr *MultiReader) SyncMeta()
- type Reader
- type SeqFile
- type SingleFile
- type SqlReader
Constants ¶
const ( KeyLogPath = "log_path" KeyMetaPath = "meta_path" KeyFileDone = "file_done" KeyMode = "mode" KeyBufSize = "reader_buf_size" KeyWhence = "read_from" KeyEncoding = "encoding" KeyReadIOLimit = "readio_limit" KeyDataSourceTag = "datasource_tag" KeyHeadPattern = "head_pattern" KeyRunnerName = "runner_name" // 忽略隐藏文件 KeyIgnoreHiddenFile = "ignore_hidden" KeyIgnoreFileSuffix = "ignore_file_suffix" KeyValidFilePattern = "valid_file_pattern" KeyExpire = "expire" KeyMaxOpenFiles = "max_open_files" KeyStatInterval = "stat_interval" KeyMysqlOffsetKey = "mysql_offset_key" KeyMysqlReadBatch = "mysql_limit_batch" KeyMysqlDataSource = "mysql_datasource" KeyMysqlDataBase = "mysql_database" KeyMysqlSQL = "mysql_sql" KeyMysqlCron = "mysql_cron" KeyMysqlExecOnStart = "mysql_exec_onstart" KeyMssqlOffsetKey = "mssql_offset_key" KeyMssqlReadBatch = "mssql_limit_batch" KeyMssqlDataSource = "mssql_datasource" KeyMssqlDataBase = "mssql_database" KeyMssqlSQL = "mssql_sql" KeyMssqlCron = "mssql_cron" KeyMssqlExecOnStart = "mssql_exec_onstart" KeyESReadBatch = "es_limit_batch" KeyESIndex = "es_index" KeyESType = "es_type" KeyESHost = "es_host" KeyESKeepAlive = "es_keepalive" KeyMongoHost = "mongo_host" KeyMongoDatabase = "mongo_database" KeyMongoCollection = "mongo_collection" KeyMongoOffsetKey = "mongo_offset_key" KeyMongoReadBatch = "mongo_limit_batch" KeyMongoCron = "mongo_cron" KeyMongoExecOnstart = "mongo_exec_onstart" KeyMongoFilters = "mongo_filters" KeyMongoCert = "mongo_cacert" KeyKafkaGroupID = "kafka_groupid" KeyKafkaTopic = "kafka_topic" KeyKafkaZookeeper = "kafka_zookeeper" )
FileReader's conf keys
const ( ModeDir = "dir" ModeFile = "file" ModeTailx = "tailx" ModeMysql = "mysql" ModeMssql = "mssql" ModeElastic = "elastic" ModeMongo = "mongo" ModeKafka = "kafka" )
FileReader's modes
const ( ReadModeHeadPatternString = "mode_head_pattern_string" ReadModeHeadPatternRegexp = "mode_head_pattern_regexp" )
const ( WhenceOldest = "oldest" WhenceNewest = "newest" )
KeyWhence 的可选项
const ( StatusInit int32 = iota StatusStopped StatusStoping StatusRunning )
const DirMode = "dir"
DirMode 按时间顺序顺次读取文件夹下所有文件的模式
const FileMode = "file"
FileMode 读取单个文件模式
const (
MaxHeadPatternBufferSize = 20 * 1024 * 1024
)
const (
MongoDefaultOffsetKey = "_id"
)
const (
SQL_SPLITER = ";"
)
Variables ¶
var ( ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte") ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune") ErrBufferFull = errors.New("bufio: buffer full") ErrNegativeCount = errors.New("bufio: negative count") )
var ErrFileNotDir = errors.New("file is not directory")
var ErrFileNotRegular = errors.New("file is not regular")
var ErrMetaFileRead = errors.New("cannot read meta file")
var ErrNoFileChosen = errors.New("no files found")
var ErrStopped = errors.New("runner stopped")
var WaitNoSuchFile = time.Second
Functions ¶
Types ¶
type ActiveReader ¶
type ActiveReader struct {
// contains filtered or unexported fields
}
func NewActiveReader ¶
func NewActiveReader(logPath, whence string, meta *Meta) (ar *ActiveReader, err error)
func (*ActiveReader) Close ¶
func (ar *ActiveReader) Close() error
func (*ActiveReader) Run ¶
func (ar *ActiveReader) Run()
func (*ActiveReader) SyncMeta ¶
func (ar *ActiveReader) SyncMeta()
除了sync自己的bufreader,还要sync一行linecache
type BufReader ¶
type BufReader struct {
// contains filtered or unexported fields
}
BufReader implements buffering for an FileReader object.
func NewReaderSize ¶
func NewReaderSize(rd FileReader, meta *Meta, size int) (*BufReader, error)
NewReaderSize returns a new Reader whose buffer has at least the specified size. If the argument FileReader is already a Reader with large enough size, it returns the underlying Reader.
func (*BufReader) ReadPattern ¶
ReadPattern读取日志直到匹配行首模式串
func (*BufReader) ReadString ¶
ReadString reads until the first occurrence of delim in the input, returning a string containing the data up to and including the delimiter. If ReadString encounters an error before finding a delimiter, it returns the data read before the error and the error itself (often io.EOF). ReadString returns err != nil if and only if the returned data does not end in delim. For simple uses, a Scanner may be more convenient.
type CollectionFilter ¶
type CollectionFilter map[string]interface{}
CollectionFilter is just a typed map of strings of map[string]interface{}
type ElasticReader ¶
type ElasticReader struct {
// contains filtered or unexported fields
}
func NewESReader ¶
func NewESReader(meta *Meta, readBatch int, estype, esindex, eshost, keepAlive string) (er *ElasticReader, err error)
func (*ElasticReader) Close ¶
func (er *ElasticReader) Close() (err error)
func (*ElasticReader) Name ¶
func (er *ElasticReader) Name() string
func (*ElasticReader) ReadLine ¶
func (er *ElasticReader) ReadLine() (data string, err error)
func (*ElasticReader) SetMode ¶
func (er *ElasticReader) SetMode(mode string, v interface{}) error
func (*ElasticReader) Source ¶
func (er *ElasticReader) Source() string
func (*ElasticReader) Start ¶
func (er *ElasticReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题
func (*ElasticReader) SyncMeta ¶
func (er *ElasticReader) SyncMeta()
SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。
type FileReader ¶
type FileReader interface { Name() string Source() string Read(p []byte) (n int, err error) Close() error SyncMeta() error }
FileReader reader 接口方法
type KafkaReader ¶
type KafkaReader struct { ConsumerGroup string Topics []string ZookeeperPeers []string ZookeeperChroot string Whence string Consumer *consumergroup.ConsumerGroup // contains filtered or unexported fields }
func NewKafkaReader ¶
func (*KafkaReader) Close ¶
func (kr *KafkaReader) Close() (err error)
func (*KafkaReader) Name ¶
func (kr *KafkaReader) Name() string
func (*KafkaReader) ReadLine ¶
func (kr *KafkaReader) ReadLine() (data string, err error)
func (*KafkaReader) SetMode ¶
func (kr *KafkaReader) SetMode(mode string, v interface{}) error
func (*KafkaReader) Source ¶
func (kr *KafkaReader) Source() string
func (*KafkaReader) Start ¶
func (kr *KafkaReader) Start()
func (*KafkaReader) SyncMeta ¶
func (kr *KafkaReader) SyncMeta()
type Meta ¶
type Meta struct { RunnerName string // contains filtered or unexported fields }
func (*Meta) AppendDeleteFile ¶
func (*Meta) AppendDoneFile ¶
AppendDoneFile 将处理完的文件写入doneFile中
func (*Meta) CacheLineFile ¶
func (*Meta) DeleteDoneFile ¶
func (*Meta) DoneFilePath ¶
DoneFilePath 返回meta的filedone文件的存放目录
func (*Meta) GetDataSourceTag ¶
func (*Meta) IsNotValid ¶
IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏
func (*Meta) ReadBufMeta ¶
func (*Meta) ReadCacheLine ¶
func (*Meta) ReadOffset ¶
ReadOffset 读取当前读取的文件和offset
func (*Meta) SetEncodingWay ¶
SetEncodingWay 设置文件编码方式,默认为 utf-8
func (*Meta) WriteCacheLine ¶
type MongoReader ¶
func NewMongoReader ¶
func (*MongoReader) Close ¶
func (mr *MongoReader) Close() (err error)
func (*MongoReader) Name ¶
func (mr *MongoReader) Name() string
func (*MongoReader) ReadLine ¶
func (mr *MongoReader) ReadLine() (data string, err error)
func (*MongoReader) SetMode ¶
func (mr *MongoReader) SetMode(mode string, v interface{}) error
func (*MongoReader) Source ¶
func (mr *MongoReader) Source() string
func (*MongoReader) Start ¶
func (mr *MongoReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题
type MultiReader ¶
type MultiReader struct {
// contains filtered or unexported fields
}
func NewMultiReader ¶
func NewMultiReader(meta *Meta, logPathPattern, whence, expireDur, statIntervalDur string, maxOpenFiles int) (mr *MultiReader, err error)
func (*MultiReader) Close ¶
func (mr *MultiReader) Close() (err error)
func (*MultiReader) Name ¶
func (mr *MultiReader) Name() string
func (*MultiReader) ReadLine ¶
func (mr *MultiReader) ReadLine() (data string, err error)
func (*MultiReader) SetMode ¶
func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)
func (*MultiReader) Source ¶
func (mr *MultiReader) Source() string
func (*MultiReader) Start ¶
func (mr *MultiReader) Start()
Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题 处理StatIntervel以及Expire两大循环任务
func (*MultiReader) StatLogPath ¶
func (mr *MultiReader) StatLogPath()
type Reader ¶
type Reader interface { //Name reader名称 Name() string //Source 读取的数据源 Source() string ReadLine() (string, error) SetMode(mode string, v interface{}) error Close() error SyncMeta() }
Reader 是一个通用的行读取reader接口
func NewFileBufReader ¶
NewFileReader 创建FileReader
type SeqFile ¶
type SeqFile struct {
// contains filtered or unexported fields
}
SeqFile 按最终修改时间依次读取文件的Reader类型
func NewSeqFile ¶
type SingleFile ¶
type SingleFile struct {
// contains filtered or unexported fields
}
func NewSingleFile ¶
func NewSingleFile(meta *Meta, path, whence string) (sf *SingleFile, err error)
func (*SingleFile) Close ¶
func (sf *SingleFile) Close() (err error)
func (*SingleFile) Name ¶
func (sf *SingleFile) Name() string
func (*SingleFile) Reopen ¶
func (sf *SingleFile) Reopen() (err error)
func (*SingleFile) Source ¶
func (sf *SingleFile) Source() string
func (*SingleFile) SyncMeta ¶
func (sf *SingleFile) SyncMeta() error