Documentation ¶
Index ¶
- Constants
- func InitAllFrameworkModels(ctx context.Context, cc metaModel.ClientConn) error
- func InitEpochModel(ctx context.Context, cc metaModel.ClientConn) error
- func IsDuplicateEntryError(err error) bool
- func IsNotFoundError(err error) bool
- func NewGormDB(sqlDB *sql.DB, storeType metaModel.StoreType) (*gorm.DB, error)
- func NewOrmLogger(lg *zap.Logger, opts ...optionFunc) logger.Interface
- func WithIgnoreTraceRecordNotFoundErr() optionFunc
- func WithSlowThreshold(thres time.Duration) optionFunc
- type Client
- type DBConfig
- type ExecutorClient
- type JobClient
- type JobOpClient
- type ProjectClient
- type ProjectOperationClient
- type ResourceClient
- type ResourceKey
- type ResourceMeta
- type Result
- type TimeRange
- type WorkerClient
Constants ¶
const ( DefaultFrameMetaEndpoints = "127.0.0.1:3336" DefaultFrameMetaUser = "root" DefaultFrameMetaPassword = "" )
TODO: split the config file
Variables ¶
This section is empty.
Functions ¶
func InitAllFrameworkModels ¶
func InitAllFrameworkModels(ctx context.Context, cc metaModel.ClientConn) error
InitAllFrameworkModels will create all framework-related tables in SQL backend NOT thread-safe. TODO: What happen if we upgrade the definition of model when rolling update? TODO: need test: change column definition/add column/drop column?
func InitEpochModel ¶
func InitEpochModel(ctx context.Context, cc metaModel.ClientConn) error
InitEpochModel creates the backend logic epoch table if not exists Only use for business meta currently NOT thread-safe
func IsDuplicateEntryError ¶
IsDuplicateEntryError checks whether error contains DuplicateEntry(MySQL) error or UNIQUE constraint failed(SQLite) error underlying.
func IsNotFoundError ¶
IsNotFoundError checks whether the error is ErrMetaEntryNotFound TODO: refine me, need wrap error for api
func NewOrmLogger ¶
NewOrmLogger returns a logger which implements logger.Interface
func WithIgnoreTraceRecordNotFoundErr ¶
func WithIgnoreTraceRecordNotFoundErr() optionFunc
WithIgnoreTraceRecordNotFoundErr sets if ignore 'record not found' error for trace
func WithSlowThreshold ¶
WithSlowThreshold sets the slow log threshold for gorm log
Types ¶
type Client ¶
type Client interface { metaModel.Client // ProjectClient is the interface to operate project. ProjectClient // ProjectOperationClient is the client to operate project operation. ProjectOperationClient // JobClient is the interface to operate job info. JobClient // WorkerClient is the client to operate worker info. WorkerClient // ResourceClient is the interface to operate resource. ResourceClient // JobOpClient is the client to operate job operation. JobOpClient // ExecutorClient is the client to operate executor info. ExecutorClient }
Client defines an interface that has the ability to manage every kind of logic abstraction in metastore, including project, project op, job, worker and resource
type DBConfig ¶
type DBConfig struct { ReadTimeout string WriteTimeout string DialTimeout string ConnMaxIdleTime time.Duration ConnMaxLifeTime time.Duration MaxIdleConns int MaxOpenConns int }
DBConfig defines some configuration used in database connection refer to: https://pkg.go.dev/database/sql#SetConnMaxIdleTime
func NewDefaultDBConfig ¶
func NewDefaultDBConfig() DBConfig
NewDefaultDBConfig creates a default DBConfig
type ExecutorClient ¶
type ExecutorClient interface { CreateExecutor(ctx context.Context, executor *model.Executor) error UpdateExecutor(ctx context.Context, executor *model.Executor) error DeleteExecutor(ctx context.Context, executorID engineModel.ExecutorID) error QueryExecutors(ctx context.Context) ([]*model.Executor, error) }
ExecutorClient defines interface that manages executor information in metastore.
type JobClient ¶
type JobClient interface { InsertJob(ctx context.Context, job *frameModel.MasterMeta) error UpsertJob(ctx context.Context, job *frameModel.MasterMeta) error UpdateJob(ctx context.Context, jobID string, values model.KeyValueMap) error DeleteJob(ctx context.Context, jobID string) (Result, error) GetJobByID(ctx context.Context, jobID string) (*frameModel.MasterMeta, error) QueryJobs(ctx context.Context) ([]*frameModel.MasterMeta, error) QueryJobsByProjectID(ctx context.Context, projectID string) ([]*frameModel.MasterMeta, error) QueryJobsByState(ctx context.Context, jobID string, state int) ([]*frameModel.MasterMeta, error) }
JobClient defines interface that manages job in metastore
type JobOpClient ¶
type JobOpClient interface { SetJobNoop(ctx context.Context, jobID string) (Result, error) SetJobCanceling(ctx context.Context, JobID string) (Result, error) SetJobCanceled(ctx context.Context, jobID string) (Result, error) QueryJobOp(ctx context.Context, jobID string) (*model.JobOp, error) QueryJobOpsByStatus(ctx context.Context, op model.JobOpStatus) ([]*model.JobOp, error) }
JobOpClient defines interface that operates job status (upper logic oriented)
type ProjectClient ¶
type ProjectClient interface { CreateProject(ctx context.Context, project *model.ProjectInfo) error DeleteProject(ctx context.Context, projectID string) error QueryProjects(ctx context.Context) ([]*model.ProjectInfo, error) GetProjectByID(ctx context.Context, projectID string) (*model.ProjectInfo, error) }
ProjectClient defines interface that manages project in metastore
type ProjectOperationClient ¶
type ProjectOperationClient interface { CreateProjectOperation(ctx context.Context, op *model.ProjectOperation) error QueryProjectOperations(ctx context.Context, projectID string) ([]*model.ProjectOperation, error) QueryProjectOperationsByTimeRange(ctx context.Context, projectID string, tr TimeRange) ([]*model.ProjectOperation, error) }
ProjectOperationClient defines interface that manages project operation in metastore TODO: support pagination and cursor here support `order by time desc limit N`
type ResourceClient ¶
type ResourceClient interface { CreateResource(ctx context.Context, resource *ResourceMeta) error UpsertResource(ctx context.Context, resource *ResourceMeta) error UpdateResource(ctx context.Context, resource *ResourceMeta) error GetResourceByID(ctx context.Context, resourceKey ResourceKey) (*ResourceMeta, error) QueryResources(ctx context.Context) ([]*ResourceMeta, error) QueryResourcesByJobID(ctx context.Context, jobID string) ([]*ResourceMeta, error) QueryResourcesByExecutorIDs(ctx context.Context, executorID ...engineModel.ExecutorID) ([]*ResourceMeta, error) SetGCPendingByJobs(ctx context.Context, jobIDs ...engineModel.JobID) error GetOneResourceForGC(ctx context.Context) (*ResourceMeta, error) DeleteResource(ctx context.Context, resourceKey ResourceKey) (Result, error) DeleteResourcesByTypeAndExecutorIDs(ctx context.Context, resType resModel.ResourceType, executorID ...engineModel.ExecutorID) (Result, error) }
ResourceClient defines interface that manages resource in metastore
type ResourceKey ¶
type ResourceKey = resModel.ResourceKey
ResourceKey is the alias of resModel.ResourceKey
type ResourceMeta ¶
type ResourceMeta = resModel.ResourceMeta
ResourceMeta is the alias of resModel.ResourceMeta
type Result ¶
type Result interface {
RowsAffected() int64
}
Result defines a query result interface
type TimeRange ¶
type TimeRange struct {
// contains filtered or unexported fields
}
TimeRange defines a time range with [start, end] time
type WorkerClient ¶
type WorkerClient interface { UpsertWorker(ctx context.Context, worker *frameModel.WorkerStatus) error UpdateWorker(ctx context.Context, worker *frameModel.WorkerStatus) error DeleteWorker(ctx context.Context, masterID string, workerID string) (Result, error) GetWorkerByID(ctx context.Context, masterID string, workerID string) (*frameModel.WorkerStatus, error) QueryWorkersByMasterID(ctx context.Context, masterID string) ([]*frameModel.WorkerStatus, error) QueryWorkersByState(ctx context.Context, masterID string, state int) ([]*frameModel.WorkerStatus, error) }
WorkerClient defines interface that manages worker in metastore