table

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const AccountAll = ETLParamAccountAll
View Source
const AccountSys = "sys"
View Source
const CsvExtension = ".csv"
View Source
const ETLParamAccountAll = "*"
View Source
const ETLParamTypeAll = MergeLogTypeALL
View Source
const ExternalFilePath = "__mo_filepath"
View Source
const FSExtension = ".fs"
View Source
const FilenameElems = 3
View Source
const FilenameElemsV2 = 4
View Source
const FilenameIdxType = 2
View Source
const FilenameSeparator = "_"
View Source
const PathElems = 7
View Source
const PathIdxAccount = 0
View Source
const PathIdxFilename = 6
View Source
const PathIdxTable = 5
View Source
const TaeExtension = ".tae"

Variables

View Source
var CommonCsvOptions = &CsvOptions{
	FieldTerminator: ',',
	EncloseRune:     '"',
	Terminator:      '\n',
}
View Source
var ETLParamTSAll = time.Time{}
View Source
var ExternalTableEngine = "EXTERNAL"

ExternalTableEngine Deprecated

View Source
var NSecString = func() string {
	timeMu.Lock()
	nsec := time.Now().Nanosecond()
	timeMu.Unlock()
	return fmt.Sprintf("%09d", nsec)
}
View Source
var NormalTableEngine = "TABLE"
View Source
var ZeroTime = time.Time{}

Functions

func GetExtension

func GetExtension(ext string) string

func GetOptionFactory

func GetOptionFactory(ctx context.Context, engine string) func(db string, tbl string, account string) TableOptions

func SetPathBuilder

func SetPathBuilder(ctx context.Context, pathBuilder string) error

SetPathBuilder

Deprecated. Please init static

func Time2DatetimeBuffed added in v0.8.0

func Time2DatetimeBuffed(t time.Time, buf []byte) []byte

Time2DatetimeBuffed output datetime string to buffer len(buf) should >= max(64, len(timestampFormatter) + 10)

func Time2DatetimeString

func Time2DatetimeString(t time.Time) string

Types

type AccountDatePathBuilder

type AccountDatePathBuilder struct {
	PathBuilderConfig
}

func NewAccountDatePathBuilder

func NewAccountDatePathBuilder(opts ...PathBuilderOption) *AccountDatePathBuilder

func (*AccountDatePathBuilder) Build

func (b *AccountDatePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, tblName string) string

func (*AccountDatePathBuilder) BuildETLPath

func (b *AccountDatePathBuilder) BuildETLPath(db, name, account string) string

BuildETLPath implement PathBuilder

# account | typ | ts | table | filename like: * /* /*/*/* /metric /*.csv

func (*AccountDatePathBuilder) GetName

func (b *AccountDatePathBuilder) GetName() string

func (*AccountDatePathBuilder) NewLogFilename

func (b *AccountDatePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time, extension string) string

func (*AccountDatePathBuilder) NewMergeFilename

func (b *AccountDatePathBuilder) NewMergeFilename(timestampStart, timestampEnd, extension string) string

func (*AccountDatePathBuilder) ParsePath

func (b *AccountDatePathBuilder) ParsePath(ctx context.Context, path string) (Path, error)

func (*AccountDatePathBuilder) SupportAccountStrategy

func (b *AccountDatePathBuilder) SupportAccountStrategy() bool

func (*AccountDatePathBuilder) SupportMergeSplit

func (b *AccountDatePathBuilder) SupportMergeSplit() bool

type AfterWrite added in v0.8.0

type AfterWrite interface {
	AddAfter(CheckWriteHook)
}

AfterWrite cooperate with RowWriter

type CheckWriteHook added in v0.8.0

type CheckWriteHook func(context.Context)

type ColType

type ColType int
const (
	TSkip ColType = iota
	TDatetime
	TUint32
	TInt32
	TUint64
	TInt64
	TFloat64
	TJson
	TText
	TVarchar
	TChar
	TBool
	TBytes // only used in ColumnField
	TUuid  // only used in ColumnField
)

func (*ColType) String

func (c *ColType) String(scale int) string

func (*ColType) ToType

func (c *ColType) ToType() types.Type

type Column

type Column struct {
	Name    string
	ColType ColType
	// Scale default 0, usually for varchar
	Scale   int
	Default string
	Comment string
	Alias   string // only use in view
}

func BoolColumn added in v1.0.0

func BoolColumn(name, comment string) Column

func DatetimeColumn

func DatetimeColumn(name, comment string) Column

func Int32Column added in v1.0.0

func Int32Column(name, comment string) Column

func Int64Column

func Int64Column(name, comment string) Column

func JsonColumn

func JsonColumn(name, comment string) Column

func SpanIDStringColumn

func SpanIDStringColumn(name, comment string) Column

func StringColumn

func StringColumn(name, comment string) Column

func StringDefaultColumn

func StringDefaultColumn(name, defaultVal, comment string) Column

func StringWithScale added in v0.8.0

func StringWithScale(name string, scale int, comment string) Column

func TextColumn

func TextColumn(name, comment string) Column

func TextDefaultColumn added in v0.8.0

func TextDefaultColumn(name, defaultVal, comment string) Column

func UInt32Column added in v1.0.0

func UInt32Column(name, comment string) Column

func UInt64Column

func UInt64Column(name, comment string) Column

func UuidStringColumn

func UuidStringColumn(name, comment string) Column

func ValueColumn

func ValueColumn(name, comment string) Column

func (*Column) ToCreateSql

func (col *Column) ToCreateSql(ctx context.Context) string

ToCreateSql return column scheme in create sql

  • case 1: `column_name` varchar(36) DEFAULT "def_val" COMMENT "what am I, with default."
  • case 2: `column_name` varchar(36) NOT NULL COMMENT "what am I. Without default, SO NOT NULL."

type ColumnField added in v0.8.0

type ColumnField struct {
	Type      ColType
	Integer   int64
	String    string
	Bytes     []byte
	Interface interface{}
}

func BytesField added in v0.8.0

func BytesField(val []byte) ColumnField

func Float64Field added in v0.8.0

func Float64Field(val float64) ColumnField

func Int64Field added in v0.8.0

func Int64Field(val int64) ColumnField

func JsonField added in v0.8.0

func JsonField(val string) ColumnField

JsonField will have same effect as StringField

func StringField added in v0.8.0

func StringField(val string) ColumnField

func TimeField added in v0.8.0

func TimeField(val time.Time) ColumnField

func Uint64Field added in v0.8.0

func Uint64Field(val uint64) ColumnField

func UuidField added in v0.8.0

func UuidField(val []byte) ColumnField

func (*ColumnField) EncodeBytes added in v0.8.0

func (cf *ColumnField) EncodeBytes() string

func (*ColumnField) EncodeUuid added in v0.8.0

func (cf *ColumnField) EncodeUuid() (dst [36]byte)

func (*ColumnField) EncodedDatetime added in v0.8.0

func (cf *ColumnField) EncodedDatetime(dst []byte) []byte

func (*ColumnField) GetFloat64 added in v0.8.0

func (cf *ColumnField) GetFloat64() float64

GetFloat64 return float64 which store in Integer

func (*ColumnField) GetTime added in v0.8.0

func (cf *ColumnField) GetTime() time.Time

type CreateSql added in v1.0.0

type CreateSql interface {
	String(ctx context.Context, ifNotExists bool) string
}

type CsvOptions

type CsvOptions struct {
	FieldTerminator rune // like: ','
	EncloseRune     rune // like: '"'
	Terminator      rune // like: '\n'
}

type CsvTableOptions

type CsvTableOptions struct {
	Formatter string
	DbName    string
	TblName   string
	Account   string
}

func (*CsvTableOptions) FormatDdl

func (o *CsvTableOptions) FormatDdl(ddl string) string

func (*CsvTableOptions) GetCreateOptions

func (o *CsvTableOptions) GetCreateOptions() string

func (*CsvTableOptions) GetTableOptions

func (o *CsvTableOptions) GetTableOptions(builder PathBuilder) string

type DBTablePathBuilder

type DBTablePathBuilder struct{}

func NewDBTablePathBuilder

func NewDBTablePathBuilder() *DBTablePathBuilder

func (*DBTablePathBuilder) Build

func (m *DBTablePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, name string) string

func (*DBTablePathBuilder) BuildETLPath

func (m *DBTablePathBuilder) BuildETLPath(db, name, account string) string

BuildETLPath implement PathBuilder

like: system/metric_*.csv

func (*DBTablePathBuilder) GetName

func (m *DBTablePathBuilder) GetName() string

func (*DBTablePathBuilder) NewLogFilename

func (m *DBTablePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time, extension string) string

func (*DBTablePathBuilder) NewMergeFilename

func (m *DBTablePathBuilder) NewMergeFilename(timestampStart, timestampEnd, extension string) string

func (*DBTablePathBuilder) ParsePath

func (m *DBTablePathBuilder) ParsePath(ctx context.Context, path string) (Path, error)

func (*DBTablePathBuilder) SupportAccountStrategy

func (m *DBTablePathBuilder) SupportAccountStrategy() bool

func (*DBTablePathBuilder) SupportMergeSplit

func (m *DBTablePathBuilder) SupportMergeSplit() bool

type ETLPath

type ETLPath struct {
	// contains filtered or unexported fields
}

func NewETLPath

func NewETLPath(path string) *ETLPath

NewETLPath

path like: sys/[log|merged]/yyyy/mm/dd/table/***.csv ## idx: 0 1 2 3 4 5 6 filename like: {timestamp}_{node_uuid}_{node_type}.csv ## or: {timestamp_start}_{timestamp_end}_merged.csv

func (*ETLPath) Parse

func (p *ETLPath) Parse(ctx context.Context) error

func (*ETLPath) Table

func (p *ETLPath) Table() string

func (*ETLPath) Timestamp

func (p *ETLPath) Timestamp() []string

type ExportRequests

type ExportRequests []WriteRequest

type FilePathCfg

type FilePathCfg struct {
	NodeUUID  string
	NodeType  string
	Extension string
}

func (*FilePathCfg) LogsFilePathFactory

func (c *FilePathCfg) LogsFilePathFactory(account string, tbl *Table, ts time.Time) string

type MergeLogType

type MergeLogType string
const MergeLogTypeALL MergeLogType = "*"
const MergeLogTypeLogs MergeLogType = "logs"
const MergeLogTypeMerged MergeLogType = "merged"

func (MergeLogType) String

func (t MergeLogType) String() string

type NeedCheckWrite added in v0.8.0

type NeedCheckWrite interface {
	NeedCheckWrite() bool
	GetCheckWriteHook() CheckWriteHook
}

NeedCheckWrite cooperate with AfterWrite and RowField

type NeedSyncWrite added in v0.8.0

type NeedSyncWrite interface {
	NeedSyncWrite() bool
	GetCheckWriteHook() CheckWriteHook
}

type NoopTableOptions

type NoopTableOptions struct{}

func (NoopTableOptions) FormatDdl

func (o NoopTableOptions) FormatDdl(ddl string) string

func (NoopTableOptions) GetCreateOptions

func (o NoopTableOptions) GetCreateOptions() string

func (NoopTableOptions) GetTableOptions

func (o NoopTableOptions) GetTableOptions(PathBuilder) string

type Path

type Path interface {
	Table() string
	Timestamp() []string
}

type PathBuilder

type PathBuilder interface {
	// Build directory path
	Build(account string, typ MergeLogType, ts time.Time, db string, name string) string
	// BuildETLPath return path for EXTERNAL table 'infile' options
	//
	// like: {account}/merged/*/*/*/{name}/*.csv
	BuildETLPath(db, name, account string) string
	// ParsePath
	//
	// switch path {
	// case "{timestamp_writedown}_{node_uuid}_{ndoe_type}.csv":
	// case "{timestamp_start}_{timestamp_end}_merged.csv"
	// }
	ParsePath(ctx context.Context, path string) (Path, error)
	NewMergeFilename(timestampStart, timestampEnd, extension string) string
	NewLogFilename(name, nodeUUID, nodeType string, ts time.Time, extension string) string
	// SupportMergeSplit const. if false, not support SCV merge|split task
	SupportMergeSplit() bool
	// SupportAccountStrategy const
	SupportAccountStrategy() bool
	// GetName const
	GetName() string
}

PathBuilder hold strategy to build filepath

func PathBuilderFactory

func PathBuilderFactory(pathBuilder string) PathBuilder

type PathBuilderConfig

type PathBuilderConfig struct {
	// contains filtered or unexported fields
}

type PathBuilderOption

type PathBuilderOption func(*PathBuilderConfig)

func WithDatabase

func WithDatabase(with bool) PathBuilderOption

func (PathBuilderOption) Apply

func (opt PathBuilderOption) Apply(cfg *PathBuilderConfig)

type Row

type Row struct {
	Table *Table

	Columns    []ColumnField
	CsvColumns []string
}

func NewRow

func NewRow() *Row

func (*Row) Clone

func (r *Row) Clone() *Row

func (*Row) CsvPrimaryKey

func (r *Row) CsvPrimaryKey() string

CsvPrimaryKey return string = concat($CsvCol[PrimaryKeyColumnIdx], '-') Deprecated

func (*Row) Free

func (r *Row) Free()

func (*Row) GetAccount

func (r *Row) GetAccount() string

GetAccount return r.Table.Account if r.Table.SupportConstAccess else return r.Columns[r.AccountIdx] if r.AccountIdx >= 0 and r.Table.PathBuilder.SupportAccountStrategy, else return "sys"

func (*Row) GetCsvStrings

func (r *Row) GetCsvStrings() []string

GetCsvStrings not format

func (*Row) GetRawColumns added in v0.8.0

func (r *Row) GetRawColumns() []ColumnField

func (*Row) ParseRow

func (r *Row) ParseRow(cols []string) error

func (*Row) Reset

func (r *Row) Reset()

func (*Row) SetColumnVal

func (r *Row) SetColumnVal(col Column, cf ColumnField)

func (*Row) SetVal

func (r *Row) SetVal(col string, cf ColumnField)

func (*Row) Size

func (r *Row) Size() (size int64)

func (*Row) ToStrings

func (r *Row) ToStrings() []string

ToStrings output all column as string

type RowField

type RowField interface {
	GetTable() *Table
	FillRow(context.Context, *Row)
}

type RowRequest

type RowRequest struct {
	// contains filtered or unexported fields
}

func NewRowRequest

func NewRowRequest(writer RowWriter) *RowRequest

func (*RowRequest) GetContent

func (r *RowRequest) GetContent() string

func (*RowRequest) Handle

func (r *RowRequest) Handle() (int, error)

type RowWriter

type RowWriter interface {
	WriteRow(row *Row) error
	// GetContent get buffer content
	GetContent() string
	// FlushAndClose flush its buffer and close.
	FlushAndClose() (int, error)
}

type SchemaDiff added in v1.0.0

type SchemaDiff struct {
	AddedColumns []Column
	TableName    string
	DatabaseName string
}

type Table

type Table struct {
	Account          string
	Database         string
	Table            string
	Columns          []Column
	PrimaryKeyColumn []Column
	ClusterBy        []Column
	Engine           string
	Comment          string
	// PathBuilder help to desc param 'infile'
	PathBuilder PathBuilder
	// AccountColumn help to split data in account's filepath
	AccountColumn *Column
	// TimestampColumn help to purge data
	TimestampColumn *Column
	// TableOptions default is nil, see GetTableOptions
	TableOptions TableOptions
	// SupportUserAccess default false. if true, user account can access.
	SupportUserAccess bool
	// SupportConstAccess default false. if true, use Table.Account first
	SupportConstAccess bool

	// The original create table sql of the system table. If the system table is created by ddl,
	// If the system table was created by DDL, the original creation sql will be used when upgrading the new table
	// Note: ToCreateSql() converts a table object as a table creation statement based on its known basic properties
	CreateTableSql string

	// The original create view sql of the system view
	CreateViewSql string
	// contains filtered or unexported fields
}

func GetAllTables added in v1.0.0

func GetAllTables() []*Table

GetAllTables holds all tables' Definition which should be handled in ETLMerge

func RegisterTableDefine

func RegisterTableDefine(table *Table) *Table

RegisterTableDefine return old one, if already registered

func (*Table) Clone

func (tbl *Table) Clone() *Table

func (*Table) GetDatabase

func (tbl *Table) GetDatabase() string

func (*Table) GetIdentify

func (tbl *Table) GetIdentify() string

GetIdentify return identify like database.table

func (*Table) GetName

func (tbl *Table) GetName() string

func (*Table) GetRow

func (tbl *Table) GetRow(ctx context.Context) *Row

func (*Table) GetTableOptions

func (tbl *Table) GetTableOptions(ctx context.Context) TableOptions

func (*Table) ToCreateSql

func (tbl *Table) ToCreateSql(ctx context.Context, ifNotExists bool) string

type TableOptions

type TableOptions interface {
	FormatDdl(ddl string) string
	// GetCreateOptions return option for `create {option}table`, which should end with ' '
	GetCreateOptions() string
	GetTableOptions(PathBuilder) string
}

type View

type View struct {
	Database    string
	Table       string
	OriginTable *Table
	Columns     []Column
	// Condition will be used in View.ToCreateSql
	Condition WhereCondition
	// CreateSql will be used in View.ToCreateSql
	CreateSql CreateSql
	// SupportUserAccess default false. if true, user account can access.
	SupportUserAccess bool
}

func (*View) ToCreateSql

func (tbl *View) ToCreateSql(ctx context.Context, ifNotExists bool) string

ToCreateSql return create view sql. If tbl.CreateSql is not nil, return tbl.CreateSql.String(), Else return

type ViewCreateSqlString added in v1.0.0

type ViewCreateSqlString string

func (ViewCreateSqlString) String added in v1.0.0

func (s ViewCreateSqlString) String(ctx context.Context, ifNotExists bool) string

type ViewOption

type ViewOption func(view *View)

func SupportUserAccess

func SupportUserAccess(support bool) ViewOption

func WithColumn

func WithColumn(c Column) ViewOption

func (ViewOption) Apply

func (opt ViewOption) Apply(view *View)

type ViewSingleCondition

type ViewSingleCondition struct {
	Column Column
	Table  string
}

func (*ViewSingleCondition) String

func (tbl *ViewSingleCondition) String() string

type WhereCondition

type WhereCondition interface {
	String() string
}

type WriteRequest

type WriteRequest interface {
	Handle() (int, error)
	GetContent() string
}

type WriterFactory

type WriterFactory interface {
	GetRowWriter(ctx context.Context, account string, tbl *Table, ts time.Time) RowWriter
	GetWriter(ctx context.Context, filepath string) io.WriteCloser
}

func NewWriterFactoryGetter added in v0.8.0

func NewWriterFactoryGetter(
	rowWriterGetter func(ctx context.Context, account string, tbl *Table, ts time.Time) RowWriter,
	writerGetter func(ctx context.Context, filepath string) io.WriteCloser,
) WriterFactory

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL