Documentation ¶
Index ¶
- Constants
- Variables
- func GetExtension(ext string) string
- func GetOptionFactory(ctx context.Context, engine string) func(db string, tbl string, account string) TableOptions
- func SetPathBuilder(ctx context.Context, pathBuilder string) error
- func Time2DatetimeBuffed(t time.Time, buf []byte) []byte
- func Time2DatetimeString(t time.Time) string
- type AccountDatePathBuilder
- func (b *AccountDatePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, tblName string) string
- func (b *AccountDatePathBuilder) BuildETLPath(db, name, account string) string
- func (b *AccountDatePathBuilder) GetName() string
- func (b *AccountDatePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time, extension string) string
- func (b *AccountDatePathBuilder) NewMergeFilename(timestampStart, timestampEnd, extension string) string
- func (b *AccountDatePathBuilder) ParsePath(ctx context.Context, path string) (Path, error)
- func (b *AccountDatePathBuilder) SupportAccountStrategy() bool
- func (b *AccountDatePathBuilder) SupportMergeSplit() bool
- type AfterWrite
- type CheckWriteHook
- type ColType
- type Column
- func BoolColumn(name, comment string) Column
- func DatetimeColumn(name, comment string) Column
- func Int32Column(name, comment string) Column
- func Int64Column(name, comment string) Column
- func JsonColumn(name, comment string) Column
- func SpanIDStringColumn(name, comment string) Column
- func StringColumn(name, comment string) Column
- func StringDefaultColumn(name, defaultVal, comment string) Column
- func StringWithScale(name string, scale int, comment string) Column
- func TextColumn(name, comment string) Column
- func TextDefaultColumn(name, defaultVal, comment string) Column
- func TimestampDefaultColumn(name, defaultVal, comment string) Column
- func UInt32Column(name, comment string) Column
- func UInt64Column(name, comment string) Column
- func UuidStringColumn(name, comment string) Column
- func ValueColumn(name, comment string) Column
- type ColumnField
- func BytesField(val []byte) ColumnField
- func Float64Field(val float64) ColumnField
- func Int64Field(val int64) ColumnField
- func JsonField(val string) ColumnField
- func StringField(val string) ColumnField
- func TimeField(val time.Time) ColumnField
- func Uint64Field(val uint64) ColumnField
- func UuidField(val []byte) ColumnField
- type CreateSql
- type CsvOptions
- type CsvTableOptions
- type DBTablePathBuilder
- func (m *DBTablePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, name string) string
- func (m *DBTablePathBuilder) BuildETLPath(db, name, account string) string
- func (m *DBTablePathBuilder) GetName() string
- func (m *DBTablePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time, extension string) string
- func (m *DBTablePathBuilder) NewMergeFilename(timestampStart, timestampEnd, extension string) string
- func (m *DBTablePathBuilder) ParsePath(ctx context.Context, path string) (Path, error)
- func (m *DBTablePathBuilder) SupportAccountStrategy() bool
- func (m *DBTablePathBuilder) SupportMergeSplit() bool
- type ETLPath
- type ExportRequests
- type FilePathCfg
- type MergeLogType
- type NeedCheckWrite
- type NeedSyncWrite
- type NoopTableOptions
- type Path
- type PathBuilder
- type PathBuilderConfig
- type PathBuilderOption
- type Row
- func (r *Row) Clone() *Row
- func (r *Row) CsvPrimaryKey() string
- func (r *Row) Free()
- func (r *Row) GetAccount() string
- func (r *Row) GetCsvStrings() []string
- func (r *Row) GetRawColumns() []ColumnField
- func (r *Row) ParseRow(cols []string) error
- func (r *Row) Reset()
- func (r *Row) SetColumnVal(col Column, cf ColumnField)
- func (r *Row) SetVal(col string, cf ColumnField)
- func (r *Row) Size() (size int64)
- func (r *Row) ToStrings() []string
- type RowField
- type RowRequest
- type RowWriter
- type SchemaDiff
- type Table
- func (tbl *Table) Clone() *Table
- func (tbl *Table) GetDatabase() string
- func (tbl *Table) GetIdentify() string
- func (tbl *Table) GetName() string
- func (tbl *Table) GetRow(ctx context.Context) *Row
- func (tbl *Table) GetTableOptions(ctx context.Context) TableOptions
- func (tbl *Table) ToCreateSql(ctx context.Context, ifNotExists bool) string
- type TableOptions
- type View
- type ViewCreateSqlString
- type ViewOption
- type ViewSingleCondition
- type WhereCondition
- type WriteRequest
- type WriterFactory
Constants ¶
const AccountAll = ETLParamAccountAll
const AccountSys = "sys"
const CsvExtension = ".csv"
const ETLParamAccountAll = "*"
const ETLParamTypeAll = MergeLogTypeALL
const ExternalFilePath = "__mo_filepath"
const FSExtension = ".fs"
const FilenameElems = 3
const FilenameElemsV2 = 4
const FilenameIdxType = 2
const FilenameSeparator = "_"
const PathElems = 7
const PathIdxAccount = 0
const PathIdxFilename = 6
const PathIdxTable = 5
const TaeExtension = ".tae"
Variables ¶
var CommonCsvOptions = &CsvOptions{
FieldTerminator: ',',
EncloseRune: '"',
Terminator: '\n',
}
var ETLParamTSAll = time.Time{}
var ExternalTableEngine = "EXTERNAL"
Deprecated: ExternalTableEngine is deprecated.
var NSecString = func() string {
	timeMu.Lock()
	nsec := time.Now().Nanosecond()
	timeMu.Unlock()
	return fmt.Sprintf("%09d", nsec)
}
var NormalTableEngine = "TABLE"
var ZeroTime = time.Time{}
Functions ¶
func GetExtension ¶
func GetOptionFactory ¶
func Time2DatetimeBuffed ¶ added in v0.8.0
Time2DatetimeBuffed writes the datetime string into buf. len(buf) should be >= max(64, len(timestampFormatter) + 10).
func Time2DatetimeString ¶
Types ¶
type AccountDatePathBuilder ¶
type AccountDatePathBuilder struct {
PathBuilderConfig
}
func NewAccountDatePathBuilder ¶
func NewAccountDatePathBuilder(opts ...PathBuilderOption) *AccountDatePathBuilder
func (*AccountDatePathBuilder) Build ¶
func (b *AccountDatePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, tblName string) string
func (*AccountDatePathBuilder) BuildETLPath ¶
func (b *AccountDatePathBuilder) BuildETLPath(db, name, account string) string
BuildETLPath implements PathBuilder.
#     account | typ | ts     | table   | filename
like: *         /*    /*/*/*   /metric   /*.csv
func (*AccountDatePathBuilder) GetName ¶
func (b *AccountDatePathBuilder) GetName() string
func (*AccountDatePathBuilder) NewLogFilename ¶
func (*AccountDatePathBuilder) NewMergeFilename ¶
func (b *AccountDatePathBuilder) NewMergeFilename(timestampStart, timestampEnd, extension string) string
func (*AccountDatePathBuilder) SupportAccountStrategy ¶
func (b *AccountDatePathBuilder) SupportAccountStrategy() bool
func (*AccountDatePathBuilder) SupportMergeSplit ¶
func (b *AccountDatePathBuilder) SupportMergeSplit() bool
type AfterWrite ¶ added in v0.8.0
type AfterWrite interface {
AddAfter(CheckWriteHook)
}
AfterWrite cooperates with RowWriter.
type CheckWriteHook ¶ added in v0.8.0
type Column ¶
type Column struct {
	Name    string
	ColType ColType
	// Scale default 0, usually for varchar
	Scale   int
	Default string
	Comment string
	Alias   string // only use in view
}
func BoolColumn ¶ added in v1.0.0
func DatetimeColumn ¶
func Int32Column ¶ added in v1.0.0
func Int64Column ¶
func JsonColumn ¶
func SpanIDStringColumn ¶
func StringColumn ¶
func StringDefaultColumn ¶
func StringWithScale ¶ added in v0.8.0
func TextColumn ¶
func TextDefaultColumn ¶ added in v0.8.0
func TimestampDefaultColumn ¶ added in v1.2.0
func UInt32Column ¶ added in v1.0.0
func UInt64Column ¶
func UuidStringColumn ¶
func ValueColumn ¶
func (*Column) ToCreateSql ¶
ToCreateSql returns the column schema as it appears in a CREATE statement.
- case 1: `column_name` varchar(36) DEFAULT "def_val" COMMENT "what am I, with default."
- case 2: `column_name` varchar(36) NOT NULL COMMENT "what am I. Without default, SO NOT NULL."
type ColumnField ¶ added in v0.8.0
type ColumnField struct { Type ColType Integer int64 String string Bytes []byte Interface interface{} }
func BytesField ¶ added in v0.8.0
func BytesField(val []byte) ColumnField
func Float64Field ¶ added in v0.8.0
func Float64Field(val float64) ColumnField
func Int64Field ¶ added in v0.8.0
func Int64Field(val int64) ColumnField
func JsonField ¶ added in v0.8.0
func JsonField(val string) ColumnField
JsonField has the same effect as StringField.
func StringField ¶ added in v0.8.0
func StringField(val string) ColumnField
func TimeField ¶ added in v0.8.0
func TimeField(val time.Time) ColumnField
func Uint64Field ¶ added in v0.8.0
func Uint64Field(val uint64) ColumnField
func UuidField ¶ added in v0.8.0
func UuidField(val []byte) ColumnField
func (*ColumnField) EncodeBytes ¶ added in v0.8.0
func (cf *ColumnField) EncodeBytes() string
func (*ColumnField) EncodeUuid ¶ added in v0.8.0
func (cf *ColumnField) EncodeUuid() (dst [36]byte)
func (*ColumnField) EncodedDatetime ¶ added in v0.8.0
func (cf *ColumnField) EncodedDatetime(dst []byte) []byte
func (*ColumnField) GetFloat64 ¶ added in v0.8.0
func (cf *ColumnField) GetFloat64() float64
GetFloat64 returns the float64 value that is stored in the Integer field.
func (*ColumnField) GetTime ¶ added in v0.8.0
func (cf *ColumnField) GetTime() time.Time
type CsvOptions ¶
type CsvTableOptions ¶
func (*CsvTableOptions) FormatDdl ¶
func (o *CsvTableOptions) FormatDdl(ddl string) string
func (*CsvTableOptions) GetCreateOptions ¶
func (o *CsvTableOptions) GetCreateOptions() string
func (*CsvTableOptions) GetTableOptions ¶
func (o *CsvTableOptions) GetTableOptions(builder PathBuilder) string
type DBTablePathBuilder ¶
type DBTablePathBuilder struct{}
func NewDBTablePathBuilder ¶
func NewDBTablePathBuilder() *DBTablePathBuilder
func (*DBTablePathBuilder) Build ¶
func (m *DBTablePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, name string) string
func (*DBTablePathBuilder) BuildETLPath ¶
func (m *DBTablePathBuilder) BuildETLPath(db, name, account string) string
BuildETLPath implements PathBuilder.
like: system/metric_*.csv
func (*DBTablePathBuilder) GetName ¶
func (m *DBTablePathBuilder) GetName() string
func (*DBTablePathBuilder) NewLogFilename ¶
func (*DBTablePathBuilder) NewMergeFilename ¶
func (m *DBTablePathBuilder) NewMergeFilename(timestampStart, timestampEnd, extension string) string
func (*DBTablePathBuilder) SupportAccountStrategy ¶
func (m *DBTablePathBuilder) SupportAccountStrategy() bool
func (*DBTablePathBuilder) SupportMergeSplit ¶
func (m *DBTablePathBuilder) SupportMergeSplit() bool
type ETLPath ¶
type ETLPath struct {
// contains filtered or unexported fields
}
func NewETLPath ¶
NewETLPath
path like: sys/[log|merged]/yyyy/mm/dd/table/***.csv
## idx:    0   1            2    3  4  5     6
filename like: {timestamp}_{node_uuid}_{node_type}.csv
## or:         {timestamp_start}_{timestamp_end}_merged.csv
type ExportRequests ¶
type ExportRequests []WriteRequest
type FilePathCfg ¶
func (*FilePathCfg) LogsFilePathFactory ¶
type MergeLogType ¶
type MergeLogType string
const MergeLogTypeALL MergeLogType = "*"
const MergeLogTypeLogs MergeLogType = "logs"
const MergeLogTypeMerged MergeLogType = "merged"
func (MergeLogType) String ¶
func (t MergeLogType) String() string
type NeedCheckWrite ¶ added in v0.8.0
type NeedCheckWrite interface { NeedCheckWrite() bool GetCheckWriteHook() CheckWriteHook }
NeedCheckWrite cooperates with AfterWrite and RowField.
type NeedSyncWrite ¶ added in v0.8.0
type NeedSyncWrite interface { NeedSyncWrite() bool GetCheckWriteHook() CheckWriteHook }
type NoopTableOptions ¶
type NoopTableOptions struct{}
func (NoopTableOptions) FormatDdl ¶
func (o NoopTableOptions) FormatDdl(ddl string) string
func (NoopTableOptions) GetCreateOptions ¶
func (o NoopTableOptions) GetCreateOptions() string
func (NoopTableOptions) GetTableOptions ¶
func (o NoopTableOptions) GetTableOptions(PathBuilder) string
type PathBuilder ¶
type PathBuilder interface {
	// Build directory path
	Build(account string, typ MergeLogType, ts time.Time, db string, name string) string
	// BuildETLPath return path for EXTERNAL table 'infile' options
	//
	// like: {account}/merged/*/*/*/{name}/*.csv
	BuildETLPath(db, name, account string) string
	// ParsePath
	//
	// switch path {
	// case "{timestamp_writedown}_{node_uuid}_{node_type}.csv":
	// case "{timestamp_start}_{timestamp_end}_merged.csv"
	// }
	ParsePath(ctx context.Context, path string) (Path, error)
	NewMergeFilename(timestampStart, timestampEnd, extension string) string
	NewLogFilename(name, nodeUUID, nodeType string, ts time.Time, extension string) string
	// SupportMergeSplit const. if false, not support CSV merge|split task
	SupportMergeSplit() bool
	// SupportAccountStrategy const
	SupportAccountStrategy() bool
	// GetName const
	GetName() string
}
PathBuilder holds the strategy used to build file paths.
func PathBuilderFactory ¶
func PathBuilderFactory(pathBuilder string) PathBuilder
type PathBuilderConfig ¶
type PathBuilderConfig struct {
// contains filtered or unexported fields
}
type PathBuilderOption ¶
type PathBuilderOption func(*PathBuilderConfig)
func WithDatabase ¶
func WithDatabase(with bool) PathBuilderOption
func (PathBuilderOption) Apply ¶
func (opt PathBuilderOption) Apply(cfg *PathBuilderConfig)
type Row ¶
type Row struct { Table *Table Columns []ColumnField CsvColumns []string }
func (*Row) CsvPrimaryKey ¶
CsvPrimaryKey returns concat($CsvCol[PrimaryKeyColumnIdx], '-').
Deprecated.
func (*Row) GetAccount ¶
GetAccount returns r.Table.Account if r.Table.SupportConstAccess is set; otherwise it returns r.Columns[r.AccountIdx] if r.AccountIdx >= 0 and r.Table.PathBuilder.SupportAccountStrategy; otherwise it returns "sys".
func (*Row) GetRawColumns ¶ added in v0.8.0
func (r *Row) GetRawColumns() []ColumnField
func (*Row) SetColumnVal ¶
func (r *Row) SetColumnVal(col Column, cf ColumnField)
func (*Row) SetVal ¶
func (r *Row) SetVal(col string, cf ColumnField)
type RowRequest ¶
type RowRequest struct {
// contains filtered or unexported fields
}
func NewRowRequest ¶
func NewRowRequest(writer RowWriter) *RowRequest
func (*RowRequest) GetContent ¶
func (r *RowRequest) GetContent() string
func (*RowRequest) Handle ¶
func (r *RowRequest) Handle() (int, error)
type SchemaDiff ¶ added in v1.0.0
type Table ¶
type Table struct {
	Account          string
	Database         string
	Table            string
	Columns          []Column
	PrimaryKeyColumn []Column
	ClusterBy        []Column
	Engine           string
	Comment          string
	// PathBuilder help to desc param 'infile'
	PathBuilder PathBuilder
	// AccountColumn help to split data in account's filepath
	AccountColumn *Column
	// TimestampColumn help to purge data
	TimestampColumn *Column
	// TableOptions default is nil, see GetTableOptions
	TableOptions TableOptions
	// SupportUserAccess default false. if true, user account can access.
	SupportUserAccess bool
	// SupportConstAccess default false. if true, use Table.Account first
	SupportConstAccess bool

	// The original create table sql of the system table.
	// If the system table was created by DDL, the original creation sql will be used when upgrading the new table
	// Note: ToCreateSql() converts a table object as a table creation statement based on its known basic properties
	CreateTableSql string

	// The original create view sql of the system view
	CreateViewSql string
	// contains filtered or unexported fields
}
func GetAllTables ¶ added in v1.0.0
func GetAllTables() []*Table
GetAllTables returns the definitions of all tables that should be handled in ETLMerge.
func RegisterTableDefine ¶
RegisterTableDefine returns the old definition if the table was already registered.
func (*Table) GetDatabase ¶
func (*Table) GetIdentify ¶
GetIdentify returns an identifier of the form database.table.
func (*Table) GetTableOptions ¶
func (tbl *Table) GetTableOptions(ctx context.Context) TableOptions
type TableOptions ¶
type TableOptions interface {
	FormatDdl(ddl string) string
	// GetCreateOptions return option for `create {option}table`, which should end with ' '
	GetCreateOptions() string
	GetTableOptions(PathBuilder) string
}
type View ¶
type View struct {
	Database    string
	Table       string
	OriginTable *Table
	Columns     []Column
	// Condition will be used in View.ToCreateSql
	Condition WhereCondition
	// CreateSql will be used in View.ToCreateSql
	CreateSql CreateSql
	// SupportUserAccess default false. if true, user account can access.
	SupportUserAccess bool
}
type ViewCreateSqlString ¶ added in v1.0.0
type ViewCreateSqlString string
type ViewOption ¶
type ViewOption func(view *View)
func SupportUserAccess ¶
func SupportUserAccess(support bool) ViewOption
func WithColumn ¶
func WithColumn(c Column) ViewOption
func (ViewOption) Apply ¶
func (opt ViewOption) Apply(view *View)
type ViewSingleCondition ¶
func (*ViewSingleCondition) String ¶
func (tbl *ViewSingleCondition) String() string
type WhereCondition ¶
type WhereCondition interface {
String() string
}