reader

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2017 License: Apache-2.0 Imports: 32 Imported by: 74

Documentation

Overview

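Package reader implements buffered I/O and a family of line readers. It wraps a FileReader object, creating another object (BufReader) that provides buffering and some help for textual I/O. The package also provides readers for directories, single files, tailed file sets (tailx), MySQL, MSSQL, Elasticsearch, MongoDB and Kafka, all exposed through the Reader interface.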

Index

Constants

View Source
const (
	KeyLogPath       = "log_path"
	KeyMetaPath      = "meta_path"
	KeyFileDone      = "file_done"
	KeyMode          = "mode"
	KeyBufSize       = "reader_buf_size"
	KeyWhence        = "read_from"
	KeyEncoding      = "encoding"
	KeyReadIOLimit   = "readio_limit"
	KeyDataSourceTag = "datasource_tag"
	KeyHeadPattern   = "head_pattern"
	KeyRunnerName    = "runner_name"

	// 忽略隐藏文件
	KeyIgnoreHiddenFile = "ignore_hidden"
	KeyIgnoreFileSuffix = "ignore_file_suffix"
	KeyValidFilePattern = "valid_file_pattern"

	KeyExpire       = "expire"
	KeyMaxOpenFiles = "max_open_files"
	KeyStatInterval = "stat_interval"

	KeyMysqlOffsetKey   = "mysql_offset_key"
	KeyMysqlReadBatch   = "mysql_limit_batch"
	KeyMysqlDataSource  = "mysql_datasource"
	KeyMysqlDataBase    = "mysql_database"
	KeyMysqlSQL         = "mysql_sql"
	KeyMysqlCron        = "mysql_cron"
	KeyMysqlExecOnStart = "mysql_exec_onstart"

	KeyMssqlOffsetKey   = "mssql_offset_key"
	KeyMssqlReadBatch   = "mssql_limit_batch"
	KeyMssqlDataSource  = "mssql_datasource"
	KeyMssqlDataBase    = "mssql_database"
	KeyMssqlSQL         = "mssql_sql"
	KeyMssqlCron        = "mssql_cron"
	KeyMssqlExecOnStart = "mssql_exec_onstart"

	KeyESReadBatch = "es_limit_batch"
	KeyESIndex     = "es_index"
	KeyESType      = "es_type"
	KeyESHost      = "es_host"
	KeyESKeepAlive = "es_keepalive"

	KeyMongoHost        = "mongo_host"
	KeyMongoDatabase    = "mongo_database"
	KeyMongoCollection  = "mongo_collection"
	KeyMongoOffsetKey   = "mongo_offset_key"
	KeyMongoReadBatch   = "mongo_limit_batch"
	KeyMongoCron        = "mongo_cron"
	KeyMongoExecOnstart = "mongo_exec_onstart"
	KeyMongoFilters     = "mongo_filters"
	KeyMongoCert        = "mongo_cacert"

	KeyKafkaGroupID   = "kafka_groupid"
	KeyKafkaTopic     = "kafka_topic"
	KeyKafkaZookeeper = "kafka_zookeeper"
)

Configuration keys accepted by the readers (file, directory, SQL, Elasticsearch, MongoDB and Kafka).

View Source
const (
	ModeDir     = "dir"
	ModeFile    = "file"
	ModeTailx   = "tailx"
	ModeMysql   = "mysql"
	ModeMssql   = "mssql"
	ModeElastic = "elastic"
	ModeMongo   = "mongo"
	ModeKafka   = "kafka"
)

Reader modes: the supported values of the KeyMode configuration key.

View Source
const (
	ReadModeHeadPatternString = "mode_head_pattern_string"
	ReadModeHeadPatternRegexp = "mode_head_pattern_regexp"
)
View Source
const (
	WhenceOldest = "oldest"
	WhenceNewest = "newest"
)

Valid values for KeyWhence.

View Source
const (
	StatusInit int32 = iota
	StatusStopped
	StatusStoping
	StatusRunning
)
View Source
const DirMode = "dir"

DirMode is the mode that reads all files in a directory one after another, in chronological order.

View Source
const FileMode = "file"

FileMode is the mode that reads a single file.

View Source
const (
	MaxHeadPatternBufferSize = 20 * 1024 * 1024
)
View Source
const (
	MongoDefaultOffsetKey = "_id"
)
View Source
const (
	SQL_SPLITER = ";"
)

Variables

View Source
var (
	ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte")
	ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune")
	ErrBufferFull        = errors.New("bufio: buffer full")
	ErrNegativeCount     = errors.New("bufio: negative count")
)
View Source
var ErrFileNotDir = errors.New("file is not directory")
View Source
var ErrFileNotRegular = errors.New("file is not regular")
View Source
var ErrMetaFileRead = errors.New("cannot read meta file")
View Source
var ErrNoFileChosen = errors.New("no files found")
View Source
var ErrStopped = errors.New("runner stopped")
View Source
var WaitNoSuchFile = time.Second

Functions

func HeadPatternMode

func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)

Types

type ActiveReader

type ActiveReader struct {
	// contains filtered or unexported fields
}

func NewActiveReader

func NewActiveReader(logPath, whence string, meta *Meta) (ar *ActiveReader, err error)

func (*ActiveReader) Close

func (ar *ActiveReader) Close() error

func (*ActiveReader) Run

func (ar *ActiveReader) Run()

func (*ActiveReader) SyncMeta

func (ar *ActiveReader) SyncMeta()

SyncMeta syncs the reader's own BufReader and, in addition, syncs one cached line (the line cache).

type BufReader

type BufReader struct {
	// contains filtered or unexported fields
}

BufReader implements buffering for a FileReader object.

func NewReaderSize

func NewReaderSize(rd FileReader, meta *Meta, size int) (*BufReader, error)

NewReaderSize returns a new BufReader whose buffer has at least the specified size. If the argument FileReader is already a reader with a large enough buffer, it returns the underlying reader.

func (*BufReader) Close

func (b *BufReader) Close() error

func (*BufReader) Name

func (b *BufReader) Name() string

func (*BufReader) ReadLine

func (b *BufReader) ReadLine() (ret string, err error)

ReadLine returns the next line as a string, like a normal Reader.

func (*BufReader) ReadPattern

func (b *BufReader) ReadPattern() (string, error)

ReadPattern reads log data until it reaches a line that matches the head-of-line pattern.

func (*BufReader) ReadString

func (b *BufReader) ReadString(delim byte) (ret string, err error)

ReadString reads until the first occurrence of delim in the input, returning a string containing the data up to and including the delimiter. If ReadString encounters an error before finding a delimiter, it returns the data read before the error and the error itself (often io.EOF). ReadString returns err != nil if and only if the returned data does not end in delim. For simple uses, a Scanner may be more convenient.

func (*BufReader) SetMode

func (b *BufReader) SetMode(mode string, v interface{}) (err error)

func (*BufReader) Source

func (b *BufReader) Source() string

func (*BufReader) SyncMeta

func (b *BufReader) SyncMeta()

type CollectionFilter

type CollectionFilter map[string]interface{}

CollectionFilter is simply a named map[string]interface{} type.

type ElasticReader

type ElasticReader struct {
	// contains filtered or unexported fields
}

func NewESReader

func NewESReader(meta *Meta, readBatch int, estype, esindex, eshost, keepAlive string) (er *ElasticReader, err error)

func (*ElasticReader) Close

func (er *ElasticReader) Close() (err error)

func (*ElasticReader) Name

func (er *ElasticReader) Name() string

func (*ElasticReader) ReadLine

func (er *ElasticReader) ReadLine() (data string, err error)

func (*ElasticReader) SetMode

func (er *ElasticReader) SetMode(mode string, v interface{}) error

func (*ElasticReader) Source

func (er *ElasticReader) Source() string

func (*ElasticReader) Start

func (er *ElasticReader) Start()

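Start must be called only once. It is launched from ReadLine rather than when the instance is created, because starting it in the constructor would cause concurrency problems.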

func (*ElasticReader) SyncMeta

func (er *ElasticReader) SyncMeta()

SyncMeta synchronizes the queue as data is taken from it, so that no data is emitted twice.

type FileReader

type FileReader interface {
	Name() string
	Source() string
	Read(p []byte) (n int, err error)
	Close() error
	SyncMeta() error
}

FileReader is the interface implemented by the low-level file readers (the methods above).

type KafkaReader

type KafkaReader struct {
	ConsumerGroup   string
	Topics          []string
	ZookeeperPeers  []string
	ZookeeperChroot string
	Whence          string

	Consumer *consumergroup.ConsumerGroup
	// contains filtered or unexported fields
}

func NewKafkaReader

func NewKafkaReader(meta *Meta, consumerGroup string,
	topics []string, zookeeper []string, whence string) (kr *KafkaReader, err error)

func (*KafkaReader) Close

func (kr *KafkaReader) Close() (err error)

func (*KafkaReader) Name

func (kr *KafkaReader) Name() string

func (*KafkaReader) ReadLine

func (kr *KafkaReader) ReadLine() (data string, err error)

func (*KafkaReader) SetMode

func (kr *KafkaReader) SetMode(mode string, v interface{}) error

func (*KafkaReader) Source

func (kr *KafkaReader) Source() string

func (*KafkaReader) Start

func (kr *KafkaReader) Start()

func (*KafkaReader) SyncMeta

func (kr *KafkaReader) SyncMeta()

type LastSync

type LastSync struct {
	// contains filtered or unexported fields
}

type Meta

type Meta struct {
	RunnerName string
	// contains filtered or unexported fields
}

func NewMeta

func NewMeta(metadir, filedonedir, logpath, mode string, donefileRetention int) (m *Meta, err error)

func NewMetaWithConf

func NewMetaWithConf(conf conf.MapConf) (meta *Meta, err error)

func (*Meta) AppendDeleteFile

func (m *Meta) AppendDeleteFile(path string) (err error)

func (*Meta) AppendDoneFile

func (m *Meta) AppendDoneFile(path string) (err error)

AppendDoneFile records a fully processed file in the done file.

func (*Meta) BufFile

func (m *Meta) BufFile() string

BufFile returns the path of the buf file.

func (*Meta) BufMetaFile

func (m *Meta) BufMetaFile() string

BufMetaFile returns the path of the buf's meta file.

func (*Meta) CacheLineFile

func (m *Meta) CacheLineFile() string

func (*Meta) Clear

func (m *Meta) Clear() error

Clear deletes all meta information.

func (*Meta) DeleteDoneFile

func (m *Meta) DeleteDoneFile(path string) error

func (*Meta) DeleteFile

func (m *Meta) DeleteFile() string

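DeleteFile returns a file path, presumably the record of deleted files written by AppendDeleteFile. The original comment repeats DoneFile's description ("path of the processed-files record, rotated daily"), which appears to be a copy-paste error.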

func (*Meta) DoneFile

func (m *Meta) DoneFile() string

DoneFile returns the path of the done file (the record of fully processed files), which is rotated daily.

func (*Meta) DoneFilePath

func (m *Meta) DoneFilePath() string

DoneFilePath returns the directory in which the meta's file_done files are stored.

func (*Meta) GetDataSourceTag

func (m *Meta) GetDataSourceTag() string

func (*Meta) GetDoneFiles

func (m *Meta) GetDoneFiles() (doneFiles []utils.File, err error)

func (*Meta) GetEncodingWay

func (m *Meta) GetEncodingWay() (e string)

GetEncodingWay returns the file encoding.

func (*Meta) GetMode

func (m *Meta) GetMode() string

func (*Meta) IsDoneFile

func (m *Meta) IsDoneFile(file string) bool

IsDoneFile reports whether file is a file in done-file format.

func (*Meta) IsExist

func (m *Meta) IsExist() bool

func (*Meta) IsNotExist

func (m *Meta) IsNotExist() bool

IsNotExist reports whether the meta does not exist yet; it is used to detect first-time creation.

func (*Meta) IsNotValid

func (m *Meta) IsNotValid() bool

IsNotValid reports whether the meta data is stale; it is used to detect that the offset file no longer exists or that the meta file is corrupted.

func (*Meta) IsValid

func (m *Meta) IsValid() bool

func (*Meta) LogPath

func (m *Meta) LogPath() string

func (*Meta) MetaFile

func (m *Meta) MetaFile() string

MetaFile returns the path of the meta file that stores the offset.

func (*Meta) ReadBuf

func (m *Meta) ReadBuf(buf []byte) (n int, err error)

func (*Meta) ReadBufMeta

func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)

func (*Meta) ReadCacheLine

func (m *Meta) ReadCacheLine() ([]byte, error)

func (*Meta) ReadOffset

func (m *Meta) ReadOffset() (currFile string, offset int64, err error)

ReadOffset reads the file currently being read and its offset.

func (*Meta) SetEncodingWay

func (m *Meta) SetEncodingWay(e string)

SetEncodingWay sets the file encoding; the default is utf-8.

func (*Meta) WriteBuf

func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)

func (*Meta) WriteCacheLine

func (m *Meta) WriteCacheLine(lines string) error

func (*Meta) WriteOffset

func (m *Meta) WriteOffset(currFile string, offset int64) (err error)

WriteOffset writes the current file and offset into the meta.

type MongoReader

type MongoReader struct {
	Cron *cron.Cron //定时任务
	// contains filtered or unexported fields
}

func NewMongoReader

func NewMongoReader(meta *Meta, readBatch int, host, database, collection, offsetkey, cronSched, filters, certfile string, execOnStart bool) (mr *MongoReader, err error)

func (*MongoReader) Close

func (mr *MongoReader) Close() (err error)

func (*MongoReader) Name

func (mr *MongoReader) Name() string

func (*MongoReader) ReadLine

func (mr *MongoReader) ReadLine() (data string, err error)

func (*MongoReader) SetMode

func (mr *MongoReader) SetMode(mode string, v interface{}) error

func (*MongoReader) Source

func (mr *MongoReader) Source() string

func (*MongoReader) Start

func (mr *MongoReader) Start()

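Start must be called only once. It is launched from ReadLine rather than when the instance is created, because starting it in the constructor would cause concurrency problems.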

func (*MongoReader) SyncMeta

func (mr *MongoReader) SyncMeta()

SyncMeta synchronizes the queue as data is taken from it, so that no data is emitted twice.

type MultiReader

type MultiReader struct {
	// contains filtered or unexported fields
}

func NewMultiReader

func NewMultiReader(meta *Meta, logPathPattern, whence, expireDur, statIntervalDur string, maxOpenFiles int) (mr *MultiReader, err error)

func (*MultiReader) Close

func (mr *MultiReader) Close() (err error)

func (*MultiReader) Expire

func (mr *MultiReader) Expire()

Expire closes expired files and then updates the reader's state.

func (*MultiReader) Name

func (mr *MultiReader) Name() string

func (*MultiReader) ReadLine

func (mr *MultiReader) ReadLine() (data string, err error)

func (*MultiReader) SetMode

func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)

func (*MultiReader) Source

func (mr *MultiReader) Source() string

func (*MultiReader) Start

func (mr *MultiReader) Start()

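Start must be called only once. It is launched from ReadLine rather than when the instance is created, because starting it in the constructor would cause concurrency problems. Start runs the two periodic loops: StatInterval and Expire.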

func (*MultiReader) StatLogPath

func (mr *MultiReader) StatLogPath()

func (*MultiReader) SyncMeta

func (mr *MultiReader) SyncMeta()

SyncMeta synchronizes the queue as data is taken from it, so that no data is emitted twice.

type Reader

type Reader interface {
	//Name reader名称
	Name() string
	//Source 读取的数据源
	Source() string
	ReadLine() (string, error)
	SetMode(mode string, v interface{}) error
	Close() error
	SyncMeta()
}

Reader is a generic line-reading interface.

func NewFileBufReader

func NewFileBufReader(conf conf.MapConf) (reader Reader, err error)

NewFileBufReader creates a Reader from the given configuration.

func NewFileBufReaderWithMeta

func NewFileBufReaderWithMeta(conf conf.MapConf, meta *Meta) (reader Reader, err error)

type SeqFile

type SeqFile struct {
	// contains filtered or unexported fields
}

SeqFile is a reader type that reads files one after another, in order of last modification time.

func NewSeqFile

func NewSeqFile(meta *Meta, path string, ignoreHidden bool, suffixes []string, validFileRegex, whence string) (sf *SeqFile, err error)

func (*SeqFile) Close

func (sf *SeqFile) Close() (err error)

func (*SeqFile) Name

func (sf *SeqFile) Name() string

func (*SeqFile) Read

func (sf *SeqFile) Read(p []byte) (n int, err error)

func (*SeqFile) Source

func (sf *SeqFile) Source() string

func (*SeqFile) SyncMeta

func (sf *SeqFile) SyncMeta() (err error)

type SingleFile

type SingleFile struct {
	// contains filtered or unexported fields
}

func NewSingleFile

func NewSingleFile(meta *Meta, path, whence string) (sf *SingleFile, err error)

func (*SingleFile) Close

func (sf *SingleFile) Close() (err error)

func (*SingleFile) Name

func (sf *SingleFile) Name() string

func (*SingleFile) Read

func (sf *SingleFile) Read(p []byte) (n int, err error)

func (*SingleFile) Reopen

func (sf *SingleFile) Reopen() (err error)

func (*SingleFile) Source

func (sf *SingleFile) Source() string

func (*SingleFile) SyncMeta

func (sf *SingleFile) SyncMeta() error

type SqlReader

type SqlReader struct {
	Cron *cron.Cron //定时任务
	// contains filtered or unexported fields
}

func NewSQLReader

func NewSQLReader(meta *Meta, readBatch int, dbtype, dataSource, database, rawSqls, cronSchedule, offsetKey string, execOnStart bool) (mr *SqlReader, err error)

func (*SqlReader) Close

func (mr *SqlReader) Close() (err error)

func (*SqlReader) Name

func (mr *SqlReader) Name() string

func (*SqlReader) ReadLine

func (mr *SqlReader) ReadLine() (data string, err error)

func (*SqlReader) SetMode

func (mr *SqlReader) SetMode(mode string, v interface{}) error

func (*SqlReader) Source

func (mr *SqlReader) Source() string

func (*SqlReader) Start

func (mr *SqlReader) Start()

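Start must be called only once. It is launched from ReadLine rather than when the instance is created, because starting it in the constructor would cause concurrency problems.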

func (*SqlReader) SyncMeta

func (mr *SqlReader) SyncMeta()

SyncMeta synchronizes the queue as data is taken from it, so that no data is emitted twice.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL