Documentation ¶
Index ¶
- Constants
- Variables
- func CreateCronTask(ctx context.Context, executorID task.TaskCode, ...) error
- func DefaultContext() context.Context
- func GetOptionFactory(engine string) func(db, tbl, account string) TableOptions
- func InitCronExpr(duration time.Duration) error
- func InitMerge(mergeCycle time.Duration, filesize int) error
- func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error
- func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
- func Register(name batchpipe.HasName, impl batchpipe.PipeImpl[batchpipe.HasName, any])
- func ResetGlobalBatchProcessor()
- func SetDefaultContextFunc(f getContextFunc)
- func SetGlobalBatchProcessor(p BatchProcessor)
- func SetPathBuilder(pathBuilder string) error
- func String2Bytes(s string) (ret []byte)
- type AccountDatePathBuilder
- func (b *AccountDatePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, name string) string
- func (b *AccountDatePathBuilder) BuildETLPath(db, name, account string) string
- func (b *AccountDatePathBuilder) GetName() string
- func (b *AccountDatePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time) string
- func (b *AccountDatePathBuilder) NewMergeFilename(timestampStart, timestampEnd string) string
- func (b *AccountDatePathBuilder) ParsePath(path string) (CSVPath, error)
- func (b *AccountDatePathBuilder) SupportAccountStrategy() bool
- func (b *AccountDatePathBuilder) SupportMergeSplit() bool
- type BatchProcessor
- type CSVPath
- type CSVReader
- type CSVWriter
- type Cache
- type Column
- type ContentReader
- type ContentWriter
- type CsvOptions
- type CsvTableOptions
- type DBTablePathBuilder
- func (m *DBTablePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, name string) string
- func (m *DBTablePathBuilder) BuildETLPath(db, name, account string) string
- func (m *DBTablePathBuilder) GetName() string
- func (m *DBTablePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time) string
- func (m *DBTablePathBuilder) NewMergeFilename(timestampStart, timestampEnd string) string
- func (m *DBTablePathBuilder) ParsePath(path string) (CSVPath, error)
- func (m *DBTablePathBuilder) SupportAccountStrategy() bool
- func (m *DBTablePathBuilder) SupportMergeSplit() bool
- type FSWriter
- type FSWriterFactory
- type FSWriterOption
- func WithAccount(a string) FSWriterOption
- func WithDatabase(dir string) FSWriterOption
- func WithFilePath(filepath string) FSWriterOption
- func WithName(item batchpipe.HasName) FSWriterOption
- func WithNode(uuid, nodeType string) FSWriterOption
- func WithPathBuilder(builder PathBuilder) FSWriterOption
- func WithTimestamp(ts time.Time) FSWriterOption
- type MOCollector
- type MapCache
- type Merge
- type MergeLogType
- type MergeOption
- type MetricLogPath
- type NoopTableOptions
- type PathBuilder
- type Row
- func (r *Row) GetAccount() string
- func (r *Row) ParseRow(cols []string) error
- func (r *Row) PrimaryKey() string
- func (r *Row) Reset()
- func (r *Row) SetColumnVal(col Column, val string)
- func (r *Row) SetFloat64(col string, val float64)
- func (r *Row) SetInt64(col string, val int64)
- func (r *Row) SetVal(col string, val string)
- func (r *Row) Size() (size int64)
- func (r *Row) ToRawStrings() []string
- func (r *Row) ToStrings() []string
- type SliceCache
- type Table
- func (tbl *Table) Clone() *Table
- func (tbl *Table) GetDatabase() string
- func (tbl *Table) GetIdentify() string
- func (tbl *Table) GetName() string
- func (tbl *Table) GetRow() *Row
- func (tbl *Table) GetTableOptions() TableOptions
- func (tbl *Table) NewRowCache() Cache
- func (tbl *Table) ToCreateSql(ifNotExists bool) string
- type TableOptions
- type View
- type ViewOption
- type ViewSingleCondition
- type WhereCondition
Constants ¶
const AccountAll = ETLParamAccountAll
const BatchReadRows = 4000
BatchReadRows = 4000; a ~20MB rawlog file holds roughly 3700+ rows.
const CsvExtension = ".csv"
const ETLParamAccountAll = "*"
const ETLParamTypeAll = MergeLogTypeALL
const FilenameElems = 3
const FilenameIdxType = 2
const FilenameSeparator = "_"
const MergeTaskCronExprEvery05Min = "0 */5 * * * *"
const MergeTaskCronExprEvery15Min = "0 */15 * * * *"
const MergeTaskCronExprEvery15Sec = "*/15 * * * * *"
const MergeTaskCronExprEvery1Hour = "0 0 */1 * * *"
const MergeTaskCronExprEvery2Hour = "0 0 */2 * * *"
const MergeTaskCronExprEvery4Hour = "0 0 4,8,12,16,20 * * *"
const MergeTaskCronExprYesterday = "0 5 0 * * *"
const MergeTaskToday = "today"
const MergeTaskYesterday = "yesterday"
const ParamSeparator = " "
const PathElems = 7
const PathIdxAccount = 0
const PathIdxFilename = 6
const PathIdxTable = 5
const READONLY = 1
const READWRITE = 0
Variables ¶
var CommonCsvOptions = &CsvOptions{
FieldTerminator: ',',
EncloseRune: '"',
Terminator: '\n',
}
var ETLParamTSAll = time.Time{}
var ExternalTableEngine = "EXTERNAL"
var MergeTaskCronExpr = MergeTaskCronExprEvery4Hour
MergeTaskCronExpr supports second-level granularity.
var NormalTableEngine = "TABLE"
Functions ¶
func CreateCronTask ¶
func CreateCronTask(ctx context.Context, executorID task.TaskCode, taskService taskservice.TaskService) error
func DefaultContext ¶
func GetOptionFactory ¶
func GetOptionFactory(engine string) func(db, tbl, account string) TableOptions
func InitCronExpr ¶
InitCronExpr supports a minimum interval of 5 minutes and a maximum of 12 hours.
func MergeTaskMetadata ¶
func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
MergeTaskMetadata handle args like: "{db_tbl_name} [date, default: today]"
func ResetGlobalBatchProcessor ¶
func ResetGlobalBatchProcessor()
func SetDefaultContextFunc ¶
func SetDefaultContextFunc(f getContextFunc)
func SetGlobalBatchProcessor ¶
func SetGlobalBatchProcessor(p BatchProcessor)
func SetPathBuilder ¶
func String2Bytes ¶
Types ¶
type AccountDatePathBuilder ¶
type AccountDatePathBuilder struct{}
func NewAccountDatePathBuilder ¶
func NewAccountDatePathBuilder() *AccountDatePathBuilder
func (*AccountDatePathBuilder) Build ¶
func (b *AccountDatePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, name string) string
func (*AccountDatePathBuilder) BuildETLPath ¶
func (b *AccountDatePathBuilder) BuildETLPath(db, name, account string) string
BuildETLPath implement PathBuilder
Path elements: account / typ / ts (yyyy/mm/dd) / table / filename — e.g.: */*/*/*/*/metric/*.csv
func (*AccountDatePathBuilder) GetName ¶
func (b *AccountDatePathBuilder) GetName() string
func (*AccountDatePathBuilder) NewLogFilename ¶
func (b *AccountDatePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time) string
func (*AccountDatePathBuilder) NewMergeFilename ¶
func (b *AccountDatePathBuilder) NewMergeFilename(timestampStart, timestampEnd string) string
func (*AccountDatePathBuilder) ParsePath ¶
func (b *AccountDatePathBuilder) ParsePath(path string) (CSVPath, error)
func (*AccountDatePathBuilder) SupportAccountStrategy ¶
func (b *AccountDatePathBuilder) SupportAccountStrategy() bool
func (*AccountDatePathBuilder) SupportMergeSplit ¶
func (b *AccountDatePathBuilder) SupportMergeSplit() bool
type BatchProcessor ¶
type BatchProcessor interface { Collect(context.Context, batchpipe.HasName) error Start() bool Stop(graceful bool) error }
func GetGlobalBatchProcessor ¶
func GetGlobalBatchProcessor() BatchProcessor
type CSVReader ¶
func NewCSVReader ¶
func NewCSVReader(ctx context.Context, fs fileservice.FileService, path string) (CSVReader, error)
type CSVWriter ¶
type CSVWriter interface { // WriteStrings write record as one line into csv file WriteStrings(record []string) error // FlushAndClose flush its buffer and close. FlushAndClose() error }
func NewCSVWriter ¶
func NewCSVWriter(ctx context.Context, fs fileservice.FileService, path string, buf []byte) (CSVWriter, error)
type Column ¶
type Column struct { Name string Type string Default string Comment string Alias string // only use in view }
func (*Column) ToCreateSql ¶
ToCreateSql return column scheme in create sql
- case 1: `column_name` varchar(36) DEFAULT "def_val" COMMENT "what am I, with default."
- case 2: `column_name` varchar(36) NOT NULL COMMENT "what am I. Without default, SO NOT NULL."
type ContentReader ¶
type ContentReader struct {
// contains filtered or unexported fields
}
func NewContentReader ¶
func NewContentReader(ctx context.Context, reader *simdcsv.Reader, raw io.ReadCloser) *ContentReader
func (*ContentReader) Close ¶
func (s *ContentReader) Close()
func (*ContentReader) ReadLine ¶
func (s *ContentReader) ReadLine() ([]string, error)
type ContentWriter ¶
type ContentWriter struct {
// contains filtered or unexported fields
}
func NewContentWriter ¶
func NewContentWriter(writer io.StringWriter, buffer []byte) *ContentWriter
func (*ContentWriter) FlushAndClose ¶
func (w *ContentWriter) FlushAndClose() error
func (*ContentWriter) WriteStrings ¶
func (w *ContentWriter) WriteStrings(record []string) error
type CsvOptions ¶
type CsvTableOptions ¶
func (*CsvTableOptions) FormatDdl ¶
func (o *CsvTableOptions) FormatDdl(ddl string) string
func (*CsvTableOptions) GetCreateOptions ¶
func (o *CsvTableOptions) GetCreateOptions() string
func (*CsvTableOptions) GetTableOptions ¶
func (o *CsvTableOptions) GetTableOptions(builder PathBuilder) string
type DBTablePathBuilder ¶
type DBTablePathBuilder struct{}
func NewDBTablePathBuilder ¶
func NewDBTablePathBuilder() *DBTablePathBuilder
func (*DBTablePathBuilder) Build ¶
func (m *DBTablePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, name string) string
func (*DBTablePathBuilder) BuildETLPath ¶
func (m *DBTablePathBuilder) BuildETLPath(db, name, account string) string
BuildETLPath implement PathBuilder
like: system/metric_*.csv
func (*DBTablePathBuilder) GetName ¶
func (m *DBTablePathBuilder) GetName() string
func (*DBTablePathBuilder) NewLogFilename ¶
func (m *DBTablePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time) string
func (*DBTablePathBuilder) NewMergeFilename ¶
func (m *DBTablePathBuilder) NewMergeFilename(timestampStart, timestampEnd string) string
func (*DBTablePathBuilder) ParsePath ¶
func (m *DBTablePathBuilder) ParsePath(path string) (CSVPath, error)
func (*DBTablePathBuilder) SupportAccountStrategy ¶
func (m *DBTablePathBuilder) SupportAccountStrategy() bool
func (*DBTablePathBuilder) SupportMergeSplit ¶
func (m *DBTablePathBuilder) SupportMergeSplit() bool
type FSWriter ¶
type FSWriter struct {
// contains filtered or unexported fields
}
func NewFSWriter ¶
func NewFSWriter(ctx context.Context, fs fileservice.FileService, opts ...FSWriterOption) *FSWriter
type FSWriterFactory ¶
type FSWriterFactory func(ctx context.Context, db string, name batchpipe.HasName, options ...FSWriterOption) io.StringWriter
func GetFSWriterFactory ¶
func GetFSWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string) FSWriterFactory
type FSWriterOption ¶
type FSWriterOption func(*FSWriter)
func WithAccount ¶
func WithAccount(a string) FSWriterOption
func WithDatabase ¶
func WithDatabase(dir string) FSWriterOption
func WithFilePath ¶
func WithFilePath(filepath string) FSWriterOption
func WithName ¶
func WithName(item batchpipe.HasName) FSWriterOption
func WithNode ¶
func WithNode(uuid, nodeType string) FSWriterOption
func WithPathBuilder ¶
func WithPathBuilder(builder PathBuilder) FSWriterOption
func WithTimestamp ¶
func WithTimestamp(ts time.Time) FSWriterOption
func (FSWriterOption) Apply ¶
func (f FSWriterOption) Apply(w *FSWriter)
type MOCollector ¶
type MOCollector struct {
// contains filtered or unexported fields
}
MOCollector handle all bufferPipe
func NewMOCollector ¶
func NewMOCollector() *MOCollector
func (*MOCollector) Start ¶
func (c *MOCollector) Start() bool
Start all goroutine worker, including collector, generator, and exporter
func (*MOCollector) Stop ¶
func (c *MOCollector) Stop(graceful bool) error
type Merge ¶
type Merge struct { Table *Table // WithTable FS fileservice.FileService // WithFileService FSName string // WithFileServiceName, cooperate with FS // MaxFileSize controls the maximum size of a merged file, default: 128 MB MaxFileSize int64 // WithMaxFileSize // MaxMergeJobs is the number of merge jobs allowed to run, default: 16 MaxMergeJobs int64 // WithMaxMergeJobs // MinFilesMerge is the minimum number of files required for a merge, default: 2 // // Deprecated: useless in Merge all in one file MinFilesMerge int // WithMinFilesMerge // FileCacheSize controls the file cache size allowed during a merge, default: 32 MB FileCacheSize int64 // contains filtered or unexported fields }
Merge like a compaction, merge input files into one/two/... files.
- `NewMergeService` initializes merge as a service, using the `serviceInited` param to guard against multiple initialization.
- `MergeTaskExecutorFactory` drive by Cron TaskService.
- `NewMerge` handle merge obj init.
- `Merge::Start` as service loop, trigger `Merge::Main` each cycle
- `Merge::Main` handles one job: 1. for each account, build `rootPath` from the tuple {account, date, Table}; 2. call `Merge::doMergeFiles` with all files under `rootPath` to do the merge job
- `Merge::doMergeFiles` handle one job flow: read each file, merge in cache, write into file.
func NewMergeService ¶
func NewMergeService(ctx context.Context, opts ...MergeOption) (*Merge, bool)
type MergeLogType ¶
type MergeLogType string
const MergeLogTypeALL MergeLogType = "*"
const MergeLogTypeLogs MergeLogType = "logs"
const MergeLogTypeMerged MergeLogType = "merged"
func (MergeLogType) String ¶
func (t MergeLogType) String() string
type MergeOption ¶
type MergeOption func(*Merge)
func WithFileService ¶
func WithFileService(fs fileservice.FileService) MergeOption
func WithFileServiceName ¶
func WithFileServiceName(name string) MergeOption
func WithMaxFileSize ¶
func WithMaxFileSize(filesize int64) MergeOption
func WithMaxMergeJobs ¶
func WithMaxMergeJobs(jobs int64) MergeOption
func WithMinFilesMerge ¶
func WithMinFilesMerge(files int) MergeOption
func WithTable ¶
func WithTable(tbl *Table) MergeOption
func (MergeOption) Apply ¶
func (opt MergeOption) Apply(m *Merge)
type MetricLogPath ¶
type MetricLogPath struct {
// contains filtered or unexported fields
}
func NewMetricLogPath ¶
func NewMetricLogPath(path string) *MetricLogPath
NewMetricLogPath parses a path like: sys/[log|merged]/yyyy/mm/dd/table/***.csv (path element indexes 0–6). The filename looks like {timestamp}_{node_uuid}_{node_type}.csv, or {timestamp_start}_{timestamp_end}_merged.csv for merged files.
func (*MetricLogPath) Parse ¶
func (p *MetricLogPath) Parse() error
func (*MetricLogPath) Table ¶
func (p *MetricLogPath) Table() string
func (*MetricLogPath) Timestamp ¶
func (p *MetricLogPath) Timestamp() []string
type NoopTableOptions ¶
type NoopTableOptions struct{}
func (NoopTableOptions) FormatDdl ¶
func (o NoopTableOptions) FormatDdl(ddl string) string
func (NoopTableOptions) GetCreateOptions ¶
func (o NoopTableOptions) GetCreateOptions() string
func (NoopTableOptions) GetTableOptions ¶
func (o NoopTableOptions) GetTableOptions(PathBuilder) string
type PathBuilder ¶
type PathBuilder interface { // Build directory path Build(account string, typ MergeLogType, ts time.Time, db string, name string) string // BuildETLPath return path for EXTERNAL table 'infile' options // // like: {account}/merged/*/*/*/{name}/*.csv BuildETLPath(db, name, account string) string // ParsePath // // switch path { // case "{timestamp_writedown}_{node_uuid}_{node_type}.csv": // case "{timestamp_start}_{timestamp_end}_merged.csv" // } ParsePath(path string) (CSVPath, error) NewMergeFilename(timestampStart, timestampEnd string) string NewLogFilename(name, nodeUUID, nodeType string, ts time.Time) string // SupportMergeSplit const. if false, not support CSV merge|split task SupportMergeSplit() bool // SupportAccountStrategy const SupportAccountStrategy() bool // GetName const GetName() string }
PathBuilder hold strategy to build filepath
func PathBuilderFactory ¶
func PathBuilderFactory(pathBuilder string) PathBuilder
type Row ¶
func (*Row) GetAccount ¶
GetAccount return r.Columns[r.AccountIdx] if r.AccountIdx >= 0 and r.Table.PathBuilder.SupportAccountStrategy, else return "sys"
func (*Row) PrimaryKey ¶
func (*Row) SetColumnVal ¶
func (*Row) SetFloat64 ¶
type SliceCache ¶
type SliceCache struct {
// contains filtered or unexported fields
}
func (*SliceCache) Flush ¶
func (c *SliceCache) Flush(writer CSVWriter) error
func (*SliceCache) IsEmpty ¶
func (c *SliceCache) IsEmpty() bool
func (*SliceCache) Put ¶
func (c *SliceCache) Put(r *Row)
func (*SliceCache) Reset ¶
func (c *SliceCache) Reset()
func (*SliceCache) Size ¶
func (c *SliceCache) Size() int64
type Table ¶
type Table struct { Account string Database string Table string Columns []Column PrimaryKeyColumn []Column Engine string Comment string // PathBuilder help to desc param 'infile' PathBuilder PathBuilder // AccountColumn help to split data in account's filepath AccountColumn *Column // TableOptions default is nil, see GetTableOptions TableOptions TableOptions // SupportUserAccess default false. if true, user account can access. SupportUserAccess bool }
func GetAllTable ¶
func GetAllTable() []*Table
func RegisterTableDefine ¶
RegisterTableDefine returns the old one if already registered.
func (*Table) GetDatabase ¶
func (*Table) GetIdentify ¶
func (*Table) GetTableOptions ¶
func (tbl *Table) GetTableOptions() TableOptions
func (*Table) NewRowCache ¶
func (*Table) ToCreateSql ¶
type TableOptions ¶
type TableOptions interface { FormatDdl(ddl string) string // GetCreateOptions return option for `create {option}table`, which should end with ' ' GetCreateOptions() string GetTableOptions(PathBuilder) string }
type View ¶
type View struct { Database string Table string OriginTable *Table Columns []Column Condition WhereCondition // SupportUserAccess default false. if true, user account can access. SupportUserAccess bool }
func (*View) ToCreateSql ¶
type ViewOption ¶
type ViewOption func(view *View)
func SupportUserAccess ¶
func SupportUserAccess(support bool) ViewOption
func WithColumn ¶
func WithColumn(c Column) ViewOption
func (ViewOption) Apply ¶
func (opt ViewOption) Apply(view *View)
type ViewSingleCondition ¶
func (*ViewSingleCondition) String ¶
func (tbl *ViewSingleCondition) String() string
type WhereCondition ¶
type WhereCondition interface {
String() string
}