Documentation ¶
Index ¶
- Constants
- Variables
- func AndCondition(f1, f2 func(os.FileInfo) bool) func(os.FileInfo) bool
- func CompressedFile(path string) bool
- func GetLogPathAbs(conf conf.MapConf) (logpath, oldLogPath string, err error)
- func GetMaxFile(logdir string, condition func(os.FileInfo) bool, ...) (chosen os.FileInfo, err error)
- func GetMetaOption(conf conf.MapConf) (string, string, string, error)
- func GetMinFile(logdir string, condition func(os.FileInfo) bool, ...) (os.FileInfo, error)
- func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)
- func IgnoreFileSuffixes(file string, suffixes []string) bool
- func IgnoreHidden(file string, ignoreHidden bool) bool
- func InRunTime(hour, minute int, runTime RunTime) bool
- func JoinFileInode(filename, inode string) string
- func NoCondition(f os.FileInfo) bool
- func NotCondition(f1 func(os.FileInfo) bool) func(os.FileInfo) bool
- func OrCondition(f1, f2 func(os.FileInfo) bool) func(os.FileInfo) bool
- func ParseLoopDuration(cronSched string) (dur time.Duration, err error)
- func ParseNumber(str string) (number int, err error)
- func ParseTime(timeStr string) (hour, minute int, err error)
- func RegisterConstructor(typ string, c Constructor)
- func ValidFileRegex(file, validFilePattern string) bool
- type Constructor
- type DaemonReader
- type DataReader
- type FileReader
- type LagReader
- type Meta
- func (m *Meta) AddSubMeta(key string, meta *Meta) error
- func (m *Meta) AppendDeleteFile(path string) (err error)
- func (m *Meta) AppendDoneFile(path string) (err error)
- func (m *Meta) AppendDoneFileInode(path string, inode uint64) (err error)
- func (m *Meta) BufFile() string
- func (m *Meta) BufMetaFile() string
- func (m *Meta) CacheLineFile() string
- func (m *Meta) CheckExpiredSubMetas(expire time.Duration)
- func (m *Meta) CleanExpiredSubMetas(expire time.Duration)
- func (m *Meta) Clear() error
- func (m *Meta) Delete() error
- func (m *Meta) DeleteDoneFile(path string) error
- func (m *Meta) DeleteFile() string
- func (m *Meta) DoneFile() string
- func (m *Meta) ExtraInfo() map[string]string
- func (m *Meta) FtSaveLogPath() string
- func (m *Meta) GetDataSourceTag() string
- func (m *Meta) GetDoneFileContent() ([]string, error)
- func (m *Meta) GetDoneFileInode(inodeSensitive bool) map[string]bool
- func (m *Meta) GetDoneFiles() ([]File, error)
- func (m *Meta) GetEncodeTag() string
- func (m *Meta) GetEncodingWay() (e string)
- func (m *Meta) GetMode() string
- func (m *Meta) GetTagFile() string
- func (m *Meta) GetTags() map[string]interface{}
- func (m *Meta) IsDoneFile(file string) bool
- func (m *Meta) IsExist() bool
- func (m *Meta) IsFileMode() bool
- func (m *Meta) IsNotExist() bool
- func (m *Meta) IsNotValid() bool
- func (m *Meta) IsStatisticFileExist() bool
- func (m *Meta) IsStatisticFileNotExist() bool
- func (m *Meta) IsValid() bool
- func (m *Meta) LogPath() string
- func (m *Meta) MetaFile() string
- func (m *Meta) ReadBuf(buf []byte) (n int, err error)
- func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)
- func (m *Meta) ReadCacheLine() ([]byte, error)
- func (m *Meta) ReadDBDoneFile(database string) (content []string, err error)
- func (m *Meta) ReadOffset() (currFile string, offset int64, err error)
- func (m *Meta) ReadRecordsFile(recordsFile string) ([]string, error)
- func (m *Meta) ReadStatistic() (stat Statistic, err error)
- func (m *Meta) RemoveSubMeta(key string)
- func (m *Meta) Reset() error
- func (m *Meta) SetEncodingWay(e string)
- func (m *Meta) StatisticFile() string
- func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)
- func (m *Meta) WriteCacheLine(lines string) error
- func (m *Meta) WriteOffset(currFile string, offset int64) (err error)
- func (m *Meta) WriteStatistic(stat *Statistic) error
- type NewSourceRecorder
- type OnceReader
- type Reader
- type Registry
- type RunTime
- type RunTimeReader
- type SourceIndex
- type Statistic
- type StatsReader
Constants ¶
const ( DoneFileName = "file.done" FtSaveLogPath = "ft_log" // ft log 在 meta 中的文件夹名字 )
const ( DefautFileRetention = 7 ModeMetrics = "metrics" )
Variables ¶
var ErrFileNotDir = errors.New("file is not directory")
var ErrFileNotRegular = errors.New("file is not regular")
var ErrMetaFileRead = errors.New("cannot read meta file")
var ErrNoFileChosen = errors.New("no files found")
var ErrStopped = errors.New("runner stopped")
var WaitNoSuchFile = 100 * time.Millisecond
Functions ¶
func CompressedFile ¶
func GetMaxFile ¶
func GetMaxFile(logdir string, condition func(os.FileInfo) bool, gte func(f1, f2 os.FileInfo) bool) (chosen os.FileInfo, err error)
GetMaxFile 在指定的限制条件 condition 下,根据比较函数 gte 选择最大的 os.FileInfo。condition:文件必须满足的条件;gte:f1 >= f2 则返回 true。
func GetMinFile ¶
func GetMinFile(logdir string, condition func(os.FileInfo) bool, gte func(f1, f2 os.FileInfo) bool) (os.FileInfo, error)
GetMinFile 与 GetMaxFile 相反,返回最小的文件
func HeadPatternMode ¶
func IgnoreFileSuffixes ¶
IgnoreFileSuffixes returns true if file has one of the given suffixes.
func IgnoreHidden ¶
IgnoreHidden returns true if file has a dot (.), which marks hidden (ignored) files on *nix systems.
func JoinFileInode ¶
func ParseNumber ¶
func RegisterConstructor ¶
func RegisterConstructor(typ string, c Constructor)
RegisterConstructor adds a new constructor for a given type of reader.
func ValidFileRegex ¶
ValidFileRegex returns true if file matches validFilePattern.
Types ¶
type DaemonReader ¶
type DaemonReader interface { // Start 用于非阻塞的启动读取器对应的守护线程,需要读取器自行负责其生命周期 Start() error }
DaemonReader 代表了一个需要守护线程的读取器
type DataReader ¶
DataReader 代表了一个可直接读取内存数据结构的读取器
type FileReader ¶
type FileReader interface { Name() string Source() string Read(p []byte) (n int, err error) Close() error SyncMeta() error }
FileReader reader 接口方法
type Meta ¶
type Meta struct { Dir string // 记录文件处理进度的路径 DoneFilePath string // 记录扫描过文件记录的文件 TagFile string //记录tag文件路径的标签名称 Readlimit int //读取磁盘限速单位 MB/s RunnerName string LastKey string // 记录从s3 最近一次拉取的文件 // contains filtered or unexported fields }
func NewMetaWithRunnerName ¶
func (*Meta) AppendDeleteFile ¶
func (*Meta) AppendDoneFile ¶
AppendDoneFile 将处理完的文件写入doneFile中
func (*Meta) AppendDoneFileInode ¶
AppendDoneFileInode 将处理完的文件路径、inode以及完成时间写入doneFile中
func (*Meta) CacheLineFile ¶
func (*Meta) CheckExpiredSubMetas ¶
CheckExpiredSubMetas 仅用于轮询收集所有过期的 submeta,清理操作应通过调用 CleanExpiredSubMetas 方法完成。 一般情况下,应由 reader 实现启动 goroutine 单独调用以避免 submeta 数量过多导致进程被长时间阻塞。 另外,如果 submeta 没有存放在该 meta 的子目录则调用此方法无效
func (*Meta) CleanExpiredSubMetas ¶
CleanExpiredSubMetas 清除超过指定过期时长的 submeta 目录,清理数目单次调用存在上限以减少阻塞时间
func (*Meta) DeleteDoneFile ¶
func (*Meta) FtSaveLogPath ¶
FtSaveLogPath 返回 ft_sender 日志信息记录文件夹路径
func (*Meta) GetDataSourceTag ¶
func (*Meta) GetDoneFileContent ¶
func (*Meta) GetDoneFileInode ¶
func (*Meta) GetDoneFiles ¶
func (*Meta) GetEncodeTag ¶
func (*Meta) GetTagFile ¶
func (*Meta) IsFileMode ¶
func (*Meta) IsNotValid ¶
IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏
func (*Meta) IsStatisticFileExist ¶
func (*Meta) IsStatisticFileNotExist ¶
IsStatisticFileNotExist 统计文件不存在(注:原注释写作“IsNotExist meta 不存在,用来判断是第一次创建”,疑为从 IsNotExist 复制而来,待确认)
func (*Meta) ReadBufMeta ¶
func (*Meta) ReadCacheLine ¶
func (*Meta) ReadDBDoneFile ¶
ReadDBDoneFile 读取当前Database已经读取的表
func (*Meta) ReadOffset ¶
ReadOffset 读取当前读取的文件和offset
func (*Meta) ReadRecordsFile ¶
ReadRecordsFile 读取当前runner已经读取的表
func (*Meta) ReadStatistic ¶
func (*Meta) RemoveSubMeta ¶
func (*Meta) SetEncodingWay ¶
SetEncodingWay 设置文件编码方式,默认为 utf-8
func (*Meta) StatisticFile ¶
StatisticFile 返回 Runner 统计信息的文件路径
func (*Meta) WriteCacheLine ¶
func (*Meta) WriteOffset ¶
WriteOffset 将当前文件和offset写入meta中
func (*Meta) WriteStatistic ¶
type NewSourceRecorder ¶
type NewSourceRecorder interface {
NewSourceIndex() []SourceIndex
}
type OnceReader ¶
type OnceReader interface {
ReadDone() bool
}
type Reader ¶
type Reader interface { // Name 用于返回读取器的具体名称 Name() string // SetMode 用于设置读取器的匹配模式 SetMode(mode string, v interface{}) error // Source 用于返回当前读取的数据源 Source() string // ReadLine 用于向读取器请求返回一行数据 ReadLine() (string, error) // SyncMeta 用于通知读取器保存同步相关元数据 SyncMeta() // Close 用于关闭读取器 Close() error }
Reader 代表了一个通用的行读取器
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry reader 的工厂类。可以注册自定义reader
func NewRegistry ¶
func NewRegistry() *Registry
func (*Registry) NewReaderWithMeta ¶
func (*Registry) RegisterReader ¶
func (reg *Registry) RegisterReader(readerType string, constructor Constructor) error
type RunTime ¶
func ParseRunTime ¶
func ParseRunTimeWithMode ¶
func (RunTime) GreaterThanStart ¶
func (RunTime) LessThanEnd ¶
type RunTimeReader ¶
type SourceIndex ¶
type Statistic ¶
type Statistic struct { ReaderCnt int64 `json:"reader_count"` // 读取总条数 ParserCnt [2]int64 `json:"parser_connt"` // [解析成功, 解析失败] SenderCnt map[string][2]int64 `json:"sender_count"` // [发送成功, 发送失败] TransCnt map[string][2]int64 `json:"transform_count"` // [解析成功, 解析失败] ReadErrors ErrorStatistic `json:"read_errors"` ParseErrors ErrorStatistic `json:"parse_errors"` TransformErrors map[string]ErrorStatistic `json:"transform_errors"` SendErrors map[string]ErrorStatistic `json:"send_errors"` }
type StatsReader ¶
type StatsReader interface { //Name reader名称 Name() string Status() StatsInfo }
StatsReader 是一个通用的带有统计接口的reader
Directories ¶
Path | Synopsis |
---|---|
Package bufio implements buffered I/O. It wraps a FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
|
Package bufio implements buffered I/O. It wraps a FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O. |
Package builtin does nothing but import all builtin readers to execute their init functions.
|
Package builtin does nothing but import all builtin readers to execute their init functions. |