Documentation ¶
Index ¶
- Constants
- Variables
- func CreateCronTask(ctx context.Context, executorID task.TaskCode, ...) error
- func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, ext string) (factory table.WriterFactory)
- func InitCronExpr(ctx context.Context, duration time.Duration) error
- func InitMerge(ctx context.Context, SV *config.ObservabilityParameters) error
- func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error
- func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
- func SubStringPrefixLimit(str string, length int) string
- type Cache
- type ContentReader
- type ContentWriter
- type ETLReader
- type ETLWriter
- type FileMeta
- type MOCollector
- type MOCollectorOption
- type MapCache
- type Merge
- type MergeOption
- type PipeImplHolder
- type SliceCache
Constants ¶
const BatchReadRows = 4000
BatchReadRows is the read batch size; for scale, a ~20 MB rawlog file has about 3700+ rows.
const LoggerNameContentReader = "ETLContentReader"
const LoggerNameETLMerge = "ETLMerge"
const MergeTaskCronExprEvery05Min = "0 */5 * * * *"
const MergeTaskCronExprEvery15Min = "0 */15 * * * *"
const MergeTaskCronExprEvery15Sec = "*/15 * * * * *"
const MergeTaskCronExprEvery1Hour = "0 0 */1 * * *"
const MergeTaskCronExprEvery2Hour = "0 0 */2 * * *"
const MergeTaskCronExprEvery4Hour = "0 0 4,8,12,16,20 * * *"
const MergeTaskCronExprYesterday = "0 5 0 * * *"
const MergeTaskToday = "today"
const MergeTaskYesterday = "yesterday"
const ParamSeparator = " "
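These cron expressions use a six-field layout with a leading seconds field. As a hedged illustration only (this page does not confirm which parser the TaskService scheduler uses; robfig/cron/v3 is an assumption), such expressions can be parsed like so:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// Six-field parser: second, minute, hour, day-of-month, month, day-of-week.
	parser := cron.NewParser(
		cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow,
	)
	sched, err := parser.Parse("0 */15 * * * *") // MergeTaskCronExprEvery15Min
	if err != nil {
		panic(err)
	}
	fmt.Println("next run:", sched.Next(time.Now()))
}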
Variables ¶
var MergeTaskCronExpr = MergeTaskCronExprEvery4Hour
MergeTaskCronExpr supports second-level cron expressions.
Functions ¶
func CreateCronTask ¶
func CreateCronTask(ctx context.Context, executorID task.TaskCode, taskService taskservice.TaskService) error
func GetWriterFactory ¶ added in v0.7.0
func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, ext string) (factory table.WriterFactory)
func InitCronExpr ¶
func InitCronExpr(ctx context.Context, duration time.Duration) error
InitCronExpr supports a minimum interval of 5 minutes and a maximum of 12 hours.
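A minimal sketch of initializing the merge cron expression; the import path is an assumption, not confirmed by this page:

package main

import (
	"context"
	"log"
	"time"

	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

func main() {
	// Durations outside the documented [5 min, 12 h] range should fail.
	if err := export.InitCronExpr(context.Background(), 15*time.Minute); err != nil {
		log.Fatal(err)
	}
	// MergeTaskCronExpr now holds an expression matching the duration.
	log.Println("merge cron:", export.MergeTaskCronExpr)
}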
func MergeTaskMetadata ¶
func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
MergeTaskMetadata handles args like: "{db_tbl_name} [date, default: today]"
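For example, a hedged sketch of building metadata for a daily merge of a hypothetical system.rawlog table (the task code and import paths are illustrative):

package example

import (
	"github.com/matrixorigin/matrixone/pkg/pb/task"     // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

// buildMeta sketches the args format; mergeTaskCode is a hypothetical task.TaskCode.
func buildMeta(mergeTaskCode task.TaskCode) task.TaskMetadata {
	// Merge today's files of the (hypothetical) system.rawlog table.
	// The args presumably end up joined by ParamSeparator, e.g. "system.rawlog today".
	return export.MergeTaskMetadata(mergeTaskCode, "system.rawlog", export.MergeTaskToday)
}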
func SubStringPrefixLimit ¶ added in v0.7.0
func SubStringPrefixLimit(str string, length int) string
Types ¶
type ContentReader ¶
type ContentReader struct {
// contains filtered or unexported fields
}
func NewContentReader ¶
func NewContentReader(ctx context.Context, path string, reader *simdcsv.Reader, raw io.ReadCloser) *ContentReader
func (*ContentReader) Close ¶
func (s *ContentReader) Close()
func (*ContentReader) ReadLine ¶
func (s *ContentReader) ReadLine() ([]string, error)
type ContentWriter ¶
type ContentWriter struct {
// contains filtered or unexported fields
}
func NewContentWriter ¶
func NewContentWriter(writer io.StringWriter, buffer []byte) *ContentWriter
func (*ContentWriter) FlushAndClose ¶
func (w *ContentWriter) FlushAndClose() (int, error)
func (*ContentWriter) WriteRow ¶ added in v0.7.0
func (w *ContentWriter) WriteRow(row *table.Row) error
func (*ContentWriter) WriteStrings ¶
func (w *ContentWriter) WriteStrings(record []string) error
type ETLReader ¶ added in v0.7.0
type ETLReader interface {
	// ReadRow reads one line as a table.Row.
	ReadRow(row *table.Row) error
	// ReadLine reads raw data from the file.
	ReadLine() ([]string, error)
	// Close closes the files and releases all content.
	Close()
}
func NewCSVReader ¶
func NewCSVReader(ctx context.Context, fs fileservice.FileService, path string) (ETLReader, error)
NewCSVReader creates a new CSV reader. On success it returns (reader, nil); on failure it returns (nil, error).
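A hedged sketch of the read loop; the import paths and the end-of-file convention (nil record, nil error) are assumptions:

package example

import (
	"context"
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/fileservice" // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

// readAll drains one CSV file through the ETLReader interface.
func readAll(ctx context.Context, fs fileservice.FileService, path string) error {
	reader, err := export.NewCSVReader(ctx, fs, path)
	if err != nil {
		return err // failed case: nil reader, non-nil error
	}
	defer reader.Close() // release files and content
	for {
		record, err := reader.ReadLine()
		if err != nil {
			return err
		}
		if record == nil {
			return nil // assumed end-of-file convention: nil record, nil error
		}
		fmt.Println(record)
	}
}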
type ETLWriter ¶ added in v0.7.0
type ETLWriter interface {
	// WriteRow writes a table.Row as one line into the file.
	WriteRow(row *table.Row) error
	// WriteStrings writes a record as one line into the file.
	WriteStrings(record []string) error
	// FlushAndClose flushes the buffer and closes the writer.
	FlushAndClose() (int, error)
}
ETLWriter handles the serialization logic, e.g. for CSV files and TAE files.
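ContentWriter's method set satisfies ETLWriter, so a minimal sketch can target the interface directly. A strings.Builder stands in for the real file target, and the scratch-buffer sizing is illustrative; the import path is assumed:

package main

import (
	"fmt"
	"strings"

	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

func main() {
	var sb strings.Builder // any io.StringWriter works as the target
	var w export.ETLWriter = export.NewContentWriter(&sb, make([]byte, 0, 4096))
	if err := w.WriteStrings([]string{"2023-01-01 00:00:00", "node-1", "info"}); err != nil {
		panic(err)
	}
	n, err := w.FlushAndClose() // flush the buffered line and close
	if err != nil {
		panic(err)
	}
	fmt.Printf("wrote %d bytes: %q\n", n, sb.String())
}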
type MOCollector ¶
type MOCollector struct {
	motrace.BatchProcessor
	// contains filtered or unexported fields
}
MOCollector handles all bufferPipe pipelines.
func NewMOCollector ¶
func NewMOCollector(ctx context.Context, opts ...MOCollectorOption) *MOCollector
func (*MOCollector) Register ¶ added in v0.7.0
func (c *MOCollector) Register(name batchpipe.HasName, impl motrace.PipeImpl)
func (*MOCollector) Start ¶
func (c *MOCollector) Start() bool
Start starts all goroutine workers, including collectors, generators, and exporters.
func (*MOCollector) Stop ¶
func (c *MOCollector) Stop(graceful bool) error
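A hedged sketch of the collector lifecycle; the pipe item and impl are placeholders passed in from outside, the import paths are assumptions, and the Start-returns-false semantics are inferred, not documented here:

package example

import (
	"context"

	"github.com/matrixorigin/matrixone/pkg/util/batchpipe"          // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export"             // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace" // assumed import path
)

// runCollector wires one pipe into a collector and runs it to completion.
func runCollector(ctx context.Context, item batchpipe.HasName, impl motrace.PipeImpl) error {
	c := export.NewMOCollector(ctx,
		export.WithCollectorCnt(2), // collector goroutines
		export.WithGeneratorCnt(2), // generator goroutines
		export.WithExporterCnt(1),  // exporter goroutines
	)
	c.Register(item, impl)
	if !c.Start() {
		return nil // assumed: Start returns false when already started
	}
	// ... produce data through the registered pipe ...
	return c.Stop(true) // graceful stop: drain buffers before exit
}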
type MOCollectorOption ¶ added in v0.7.0
type MOCollectorOption func(*MOCollector)
func WithCollectorCnt ¶ added in v0.7.0
func WithCollectorCnt(cnt int) MOCollectorOption
func WithExporterCnt ¶ added in v0.7.0
func WithExporterCnt(cnt int) MOCollectorOption
func WithGeneratorCnt ¶ added in v0.7.0
func WithGeneratorCnt(cnt int) MOCollectorOption
type Merge ¶
type Merge struct {
	Table  *table.Table            // WithTable
	FS     fileservice.FileService // WithFileService
	FSName string                  // WithFileServiceName, works together with FS

	// MaxFileSize controls the maximum size of a merged file, default: 128 MB.
	MaxFileSize int64 // WithMaxFileSize
	// MaxMergeJobs is the number of merge jobs allowed to run, default: 16.
	MaxMergeJobs int64 // WithMaxMergeJobs
	// MinFilesMerge controls the minimum number of files to merge, default: 2.
	//
	// Deprecated: useless since Merge merges everything into one file.
	MinFilesMerge int // WithMinFilesMerge
	// FileCacheSize controls how much file data may be cached during a merge, default: 32 MB.
	FileCacheSize int64
	// contains filtered or unexported fields
}
Merge works like a compaction: it merges the input files into one (or a few) files.
- `NewMergeService` initializes Merge as a service, using the param `serviceInited` to avoid double initialization.
- `MergeTaskExecutorFactory` is driven by the cron TaskService.
- `NewMerge` handles Merge object initialization.
- `Merge::Start` is the service loop, triggering `Merge::Main` each cycle.
- `Merge::Main` handles one job: 1. for each account, build `rootPath` from the tuple {account, date, Table}; 2. call `Merge::doMergeFiles` with all files under `rootPath` to do the merge job.
- `Merge::doMergeFiles` handles one job flow: read each file, merge rows in the cache, and write the result to a file.
func NewMergeService ¶
type MergeOption ¶
type MergeOption func(*Merge)
func WithFileService ¶
func WithFileService(fs fileservice.FileService) MergeOption
func WithFileServiceName ¶
func WithFileServiceName(name string) MergeOption
func WithMaxFileSize ¶
func WithMaxFileSize(filesize int64) MergeOption
func WithMaxMergeJobs ¶
func WithMaxMergeJobs(jobs int64) MergeOption
func WithMinFilesMerge ¶
func WithMinFilesMerge(files int) MergeOption
func WithTable ¶
func WithTable(tbl *table.Table) MergeOption
func (MergeOption) Apply ¶
func (opt MergeOption) Apply(m *Merge)
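These options follow the functional-options pattern: each one mutates a *Merge via Apply. A hedged sketch of composing them (in practice NewMergeService presumably performs this wiring; the service name is illustrative, the sizes are the documented defaults, and the import paths are assumptions):

package example

import (
	"github.com/matrixorigin/matrixone/pkg/fileservice"       // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export"       // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export/table" // assumed import path
)

// configure applies a set of MergeOptions to a fresh Merge.
func configure(tbl *table.Table, fs fileservice.FileService) *export.Merge {
	m := &export.Merge{}
	for _, opt := range []export.MergeOption{
		export.WithTable(tbl),
		export.WithFileService(fs),
		export.WithFileServiceName("ETL"), // illustrative service name
		export.WithMaxFileSize(128 << 20), // 128 MB, the documented default
		export.WithMaxMergeJobs(16),       // the documented default
	} {
		opt.Apply(m)
	}
	return m
}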
type PipeImplHolder ¶ added in v0.7.0
type PipeImplHolder struct {
// contains filtered or unexported fields
}
func (*PipeImplHolder) Get ¶ added in v0.7.0
func (h *PipeImplHolder) Get(name string) (motrace.PipeImpl, bool)
func (*PipeImplHolder) Put ¶ added in v0.7.0
func (h *PipeImplHolder) Put(name string, impl motrace.PipeImpl) bool
func (*PipeImplHolder) Size ¶ added in v0.7.0
func (h *PipeImplHolder) Size() int
type SliceCache ¶
type SliceCache struct {
// contains filtered or unexported fields
}
func (*SliceCache) Flush ¶
func (c *SliceCache) Flush(writer ETLWriter) error
func (*SliceCache) IsEmpty ¶
func (c *SliceCache) IsEmpty() bool
func (*SliceCache) Put ¶
func (c *SliceCache) Put(r *table.Row)
func (*SliceCache) Reset ¶
func (c *SliceCache) Reset()
func (*SliceCache) Size ¶
func (c *SliceCache) Size() int64
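SliceCache pairs naturally with ETLWriter in a put/flush/reset loop. A hedged sketch of that pattern (the size threshold is illustrative, and construction of the cache is out of scope since no exported constructor is documented here):

package example

import (
	"github.com/matrixorigin/matrixone/pkg/util/export"       // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export/table" // assumed import path
)

// putWithFlush caches one row, flushing through the writer once the cache
// grows past limit bytes.
func putWithFlush(c *export.SliceCache, w export.ETLWriter, row *table.Row, limit int64) error {
	c.Put(row)
	if c.Size() < limit {
		return nil
	}
	// Flush writes the cached rows through the ETLWriter, then Reset empties the cache.
	if err := c.Flush(w); err != nil {
		return err
	}
	c.Reset()
	return nil
}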