Documentation ¶
Index ¶
- Constants
- Variables
- func CreateCronTask(ctx context.Context, executorID task.TaskCode, ...) error
- func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, enableSqlWriter bool) table.WriterFactory
- func InitCronExpr(ctx context.Context, duration time.Duration) error
- func InitMerge(ctx context.Context, SV *config.ObservabilityParameters) error
- func LongRunETLMerge(ctx context.Context, task task.Task, logger *log.MOLogger, opts ...MergeOption) error
- func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error
- func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
- func SubStringPrefixLimit(str string, length int) string
- type Cache
- type ContentReader
- type ETLReader
- type ETLWriter
- type FileMeta
- type MOCollector
- func (c *MOCollector) Collect(ctx context.Context, item batchpipe.HasName) error
- func (c *MOCollector) DiscardableCollect(ctx context.Context, item batchpipe.HasName) error
- func (c *MOCollector) Register(name batchpipe.HasName, impl motrace.PipeImpl)
- func (c *MOCollector) Start() bool
- func (c *MOCollector) Stop(graceful bool) error
- type MOCollectorOption
- type Merge
- type MergeOption
- type PipeImplHolder
- type SliceCache
Constants ¶
const BatchReadRows = 4000
BatchReadRows ~= 20MB rawlog file has about 3700+ rows
const LoggerNameContentReader = "ETLContentReader"
const LoggerNameETLMerge = "ETLMerge"
const LoggerNameMOCollector = "MOCollector"
const MAX_MERGE_INSERT_TIME = 10 * time.Second
const MergeTaskCronExprEvery05Min = "0 */5 * * * *"
const MergeTaskCronExprEvery15Min = "0 */15 * * * *"
const MergeTaskCronExprEvery15Sec = "*/15 * * * * *"
const MergeTaskCronExprEvery1Hour = "0 0 */1 * * *"
const MergeTaskCronExprEvery2Hour = "0 0 */2 * * *"
const MergeTaskCronExprEvery4Hour = "0 0 4,8,12,16,20 * * *"
const MergeTaskCronExprEveryMin = "0 * * * * *"
const MergeTaskCronExprYesterday = "0 5 0 * * *"
const MergeTaskToday = "today"
const MergeTaskYesterday = "yesterday"
const ParamSeparator = " "
Variables ¶
var ETLMergeTaskPool *mpool.MPool
var MergeTaskCronExpr = MergeTaskCronExprEvery4Hour
MergeTaskCronExpr support sec level
Functions ¶
func CreateCronTask ¶
func CreateCronTask(ctx context.Context, executorID task.TaskCode, taskService taskservice.TaskService) error
func GetWriterFactory ¶ added in v0.7.0
func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, enableSqlWriter bool) table.WriterFactory
func InitCronExpr ¶
InitCronExpr support min interval 5 min, max 12 hour
func LongRunETLMerge ¶ added in v0.8.0
func MergeTaskMetadata ¶
func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
MergeTaskMetadata handle args like: "{db_tbl_name} [date, default: today]"
func SubStringPrefixLimit ¶ added in v0.7.0
Types ¶
type ContentReader ¶
type ContentReader struct {
// contains filtered or unexported fields
}
func NewContentReader ¶
func NewContentReader(ctx context.Context, path string, reader *simdcsv.Reader, raw io.ReadCloser) *ContentReader
func (*ContentReader) Close ¶
func (s *ContentReader) Close()
func (*ContentReader) ReadLine ¶
func (s *ContentReader) ReadLine() ([]string, error)
type ETLReader ¶ added in v0.7.0
type ETLReader interface { // ReadRow read one line as table.Row ReadRow(row *table.Row) error // ReadLine read raw data from file. ReadLine() ([]string, error) // Close files and release all content. Close() }
func NewCSVReader ¶
func NewCSVReader(ctx context.Context, fs fileservice.FileService, path string) (ETLReader, error)
NewCSVReader create new csv reader. success case return: ok_reader, nil error failed case return: nil_reader, error
type ETLWriter ¶ added in v0.7.0
type ETLWriter interface { // WriteRow write table.Row as one line info file. WriteRow(row *table.Row) error // WriteStrings write record as one line into file. WriteStrings(record []string) error // FlushAndClose flush its buffer and close the writer. FlushAndClose() (int, error) }
ETLWriter handle serialize logic, like csv file and tae file.
type MOCollector ¶
type MOCollector struct { motrace.BatchProcessor // contains filtered or unexported fields }
MOCollector handle all bufferPipe
func NewMOCollector ¶
func NewMOCollector(ctx context.Context, opts ...MOCollectorOption) *MOCollector
func (*MOCollector) DiscardableCollect ¶ added in v1.0.0
DiscardableCollect implements motrace.DiscardableCollector cooperate with logutil.Discardable() field
func (*MOCollector) Register ¶ added in v0.7.0
func (c *MOCollector) Register(name batchpipe.HasName, impl motrace.PipeImpl)
func (*MOCollector) Start ¶
func (c *MOCollector) Start() bool
Start all goroutine worker, including collector, generator, and exporter
func (*MOCollector) Stop ¶
func (c *MOCollector) Stop(graceful bool) error
type MOCollectorOption ¶ added in v0.7.0
type MOCollectorOption func(*MOCollector)
func WithCollectorCnt ¶ added in v0.7.0
func WithCollectorCnt(cnt int) MOCollectorOption
func WithExporterCnt ¶ added in v0.7.0
func WithExporterCnt(cnt int) MOCollectorOption
func WithGeneratorCnt ¶ added in v0.7.0
func WithGeneratorCnt(cnt int) MOCollectorOption
func WithOBCollectorConfig ¶ added in v0.8.0
func WithOBCollectorConfig(cfg *config.OBCollectorConfig) MOCollectorOption
type Merge ¶
type Merge struct { // MaxFileSize the total filesize to trigger doMergeFiles(),default: 32 MB // Deprecated MaxFileSize int64 // set by WithMaxFileSize // MaxMergeJobs 允许进行的 Merge 的任务个数,default: 1 MaxMergeJobs int64 // set by WithMaxMergeJobs // contains filtered or unexported fields }
Merge like a compaction, merge input files into one/two/... files. - NewMergeService init merge as service, with serviceInited to avoid multi init. - MergeTaskExecutorFactory drive by Cron TaskService. - NewMerge handle merge obj init. - Merge.Start() as service loop, trigger Merge.Main() - Merge.Main() handle main job.
- foreach account, build `rootPath` with tuple {account, date, Table }
- call Merge.doMergeFiles() with all files in `rootPath`, do merge job
- Merge.doMergeFiles handle one job flow: read each file, merge in cache, write into file.
func NewMergeService ¶
type MergeOption ¶
type MergeOption func(*Merge)
func WithFileService ¶
func WithFileService(fs fileservice.FileService) MergeOption
func WithMaxFileSize ¶
func WithMaxFileSize(filesize int64) MergeOption
func WithMaxMergeJobs ¶
func WithMaxMergeJobs(jobs int64) MergeOption
func WithTable ¶
func WithTable(tbl *table.Table) MergeOption
func WithTask ¶ added in v1.0.0
func WithTask(task task.Task) MergeOption
func (MergeOption) Apply ¶
func (opt MergeOption) Apply(m *Merge)
type PipeImplHolder ¶ added in v0.7.0
type PipeImplHolder struct {
// contains filtered or unexported fields
}
func (*PipeImplHolder) Get ¶ added in v0.7.0
func (h *PipeImplHolder) Get(name string) (motrace.PipeImpl, bool)
func (*PipeImplHolder) Put ¶ added in v0.7.0
func (h *PipeImplHolder) Put(name string, impl motrace.PipeImpl) bool
func (*PipeImplHolder) Size ¶ added in v0.7.0
func (h *PipeImplHolder) Size() int
type SliceCache ¶
type SliceCache struct {
// contains filtered or unexported fields
}
func (*SliceCache) IsEmpty ¶
func (c *SliceCache) IsEmpty() bool
func (*SliceCache) Put ¶
func (c *SliceCache) Put(r *table.Row)
func (*SliceCache) Reset ¶
func (c *SliceCache) Reset()
func (*SliceCache) Size ¶
func (c *SliceCache) Size() int64