Documentation ¶
Index ¶
- func Map(vs []string, f func(string) string) []string
- func NullInt64ToPointer(ni sql.NullInt64) *int64
- func NullStringToPointer(ns sql.NullString) *string
- func PointerToNullInt64(ip *int64) sql.NullInt64
- func PointerToNullString(sp *string) sql.NullString
- type DB
- type DBStatusStore
- type DBStatusStoreInterface
- type DefaultExperimentStore
- type DefaultExperimentStoreInterface
- type ExperimentStore
- func (s *ExperimentStore) ArchiveExperiment(expId string) error
- func (s *ExperimentStore) CreateExperiment(experiment *model.Experiment) (*model.Experiment, error)
- func (s *ExperimentStore) DeleteExperiment(id string) error
- func (s *ExperimentStore) GetExperiment(uuid string) (*model.Experiment, error)
- func (s *ExperimentStore) ListExperiments(filterContext *common.FilterContext, opts *list.Options) ([]*model.Experiment, int, string, error)
- func (s *ExperimentStore) UnarchiveExperiment(expId string) error
- type ExperimentStoreInterface
- type FakeMinioClient
- func (c *FakeMinioClient) DeleteObject(bucketName, objectName string) error
- func (c *FakeMinioClient) ExistObject(objectName string) bool
- func (c *FakeMinioClient) GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)
- func (c *FakeMinioClient) GetObjectCount() int
- func (c *FakeMinioClient) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, ...) (n int64, err error)
- type JobStore
- func (s *JobStore) CreateJob(j *model.Job) (*model.Job, error)
- func (s *JobStore) DeleteJob(id string) error
- func (s *JobStore) EnableJob(id string, enabled bool) error
- func (s *JobStore) GetJob(id string) (*model.Job, error)
- func (s *JobStore) ListJobs(filterContext *common.FilterContext, opts *list.Options) ([]*model.Job, int, string, error)
- func (s *JobStore) UpdateJob(swf *util.ScheduledWorkflow) error
- type JobStoreInterface
- type MinioClient
- func (c *MinioClient) DeleteObject(bucketName, objectName string) error
- func (c *MinioClient) GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)
- func (c *MinioClient) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, ...) (n int64, err error)
- type MinioClientInterface
- type MinioObjectStore
- func (m *MinioObjectStore) AddAsYamlFile(o interface{}, filePath string) error
- func (m *MinioObjectStore) AddFile(file []byte, filePath string) error
- func (m *MinioObjectStore) DeleteFile(filePath string) error
- func (m *MinioObjectStore) GetFile(filePath string) ([]byte, error)
- func (m *MinioObjectStore) GetFromYamlFile(o interface{}, filePath string) error
- func (m *MinioObjectStore) GetPipelineKey(pipelineID string) string
- type MySQLDialect
- type ObjectStoreInterface
- type PipelineStore
- func (s *PipelineStore) CreatePipeline(p *model.Pipeline) (*model.Pipeline, error)
- func (s *PipelineStore) CreatePipelineVersion(v *model.PipelineVersion, updatePipelineDefaultVersion bool) (*model.PipelineVersion, error)
- func (s *PipelineStore) DeletePipeline(id string) error
- func (s *PipelineStore) DeletePipelineVersion(versionId string) error
- func (s *PipelineStore) GetPipeline(id string) (*model.Pipeline, error)
- func (s *PipelineStore) GetPipelineVersion(versionId string) (*model.PipelineVersion, error)
- func (s *PipelineStore) GetPipelineVersionWithStatus(versionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error)
- func (s *PipelineStore) GetPipelineWithStatus(id string, status model.PipelineStatus) (*model.Pipeline, error)
- func (s *PipelineStore) ListPipelineVersions(pipelineId string, opts *list.Options) ([]*model.PipelineVersion, int, string, error)
- func (s *PipelineStore) ListPipelines(filterContext *common.FilterContext, opts *list.Options) ([]*model.Pipeline, int, string, error)
- func (s *PipelineStore) SetUUIDGenerator(new_uuid util.UUIDGeneratorInterface)
- func (s *PipelineStore) UpdatePipelineAndVersionsStatus(id string, status model.PipelineStatus, pipelineVersionId string, ...) error
- func (s *PipelineStore) UpdatePipelineDefaultVersion(pipelineId string, versionId string) error
- func (s *PipelineStore) UpdatePipelineStatus(id string, status model.PipelineStatus) error
- func (s *PipelineStore) UpdatePipelineVersionStatus(id string, status model.PipelineVersionStatus) error
- type PipelineStoreInterface
- type ResourceReferenceStore
- func (s *ResourceReferenceStore) CreateResourceReferences(tx *sql.Tx, refs []*model.ResourceReference) error
- func (s *ResourceReferenceStore) DeleteResourceReferences(tx *sql.Tx, id string, resourceType common.ResourceType) error
- func (s *ResourceReferenceStore) GetResourceReference(resourceId string, resourceType common.ResourceType, ...) (*model.ResourceReference, error)
- type ResourceReferenceStoreInterface
- type RunStore
- func (s *RunStore) AddSortByRunMetricToSelect(sqlBuilder sq.SelectBuilder, opts *list.Options) sq.SelectBuilder
- func (s *RunStore) ArchiveRun(runId string) error
- func (s *RunStore) CreateOrUpdateRun(runDetail *model.RunDetail) error
- func (s *RunStore) CreateRun(r *model.RunDetail) (*model.RunDetail, error)
- func (s *RunStore) DeleteRun(id string) error
- func (s *RunStore) GetRun(runId string) (*model.RunDetail, error)
- func (s *RunStore) ListRuns(filterContext *common.FilterContext, opts *list.Options) ([]*model.Run, int, string, error)
- func (s *RunStore) ReportMetric(metric *model.RunMetric) (err error)
- func (s *RunStore) TerminateRun(runId string) error
- func (s *RunStore) UnarchiveRun(runId string) error
- func (s *RunStore) UpdateRun(runID string, condition string, finishedAtInSec int64, ...) (err error)
- type RunStoreInterface
- type SQLDialect
- type SQLiteDialect
- type TaskStore
- type TaskStoreInterface
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NullInt64ToPointer ¶
func NullStringToPointer ¶
func NullStringToPointer(ns sql.NullString) *string
func PointerToNullInt64 ¶
func PointerToNullString ¶
func PointerToNullString(sp *string) sql.NullString
Types ¶
type DB ¶
type DB struct { *sql.DB SQLDialect }
DB a struct wrapping plain sql library with SQL dialect, to solve any feature difference between MySQL, which is used in production, and Sqlite, which is used for unit testing.
func NewFakeDbOrFatal ¶
func NewFakeDbOrFatal() *DB
type DBStatusStore ¶
type DBStatusStore struct {
// contains filtered or unexported fields
}
Implementation of a DBStatusStoreInterface. This store read/write state of the database. For now we store status like whether sample is loaded.
func NewDBStatusStore ¶
func NewDBStatusStore(db *DB) *DBStatusStore
factory function for database status store
func (*DBStatusStore) HaveSamplesLoaded ¶
func (s *DBStatusStore) HaveSamplesLoaded() (bool, error)
func (*DBStatusStore) InitializeDBStatusTable ¶
func (s *DBStatusStore) InitializeDBStatusTable() error
func (*DBStatusStore) MarkSampleLoaded ¶
func (s *DBStatusStore) MarkSampleLoaded() error
type DBStatusStoreInterface ¶
type DefaultExperimentStore ¶
type DefaultExperimentStore struct {
// contains filtered or unexported fields
}
Implementation of a DefaultExperimentStoreInterface. This stores the default experiment's ID, which is created the first time the API server is initialized.
func NewDefaultExperimentStore ¶
func NewDefaultExperimentStore(db *DB) *DefaultExperimentStore
factory function for creating default experiment store
func (*DefaultExperimentStore) GetDefaultExperimentId ¶
func (s *DefaultExperimentStore) GetDefaultExperimentId() (string, error)
func (*DefaultExperimentStore) SetDefaultExperimentId ¶
func (s *DefaultExperimentStore) SetDefaultExperimentId(id string) error
func (*DefaultExperimentStore) UnsetDefaultExperimentIdIfIdMatches ¶
func (s *DefaultExperimentStore) UnsetDefaultExperimentIdIfIdMatches(tx *sql.Tx, id string) error
Sets the default experiment ID stored in the DB to the empty string. This needs to happen if the experiment is deleted via the normal delete experiment API so that the server knows to create a new default. This is always done alongside the deletion of the actual experiment itself, so a transaction is needed as input. Update is used instead of delete so that we don't need to first check that the experiment ID is there.
type ExperimentStore ¶
type ExperimentStore struct {
// contains filtered or unexported fields
}
func NewExperimentStore ¶
func NewExperimentStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorInterface) *ExperimentStore
factory function for experiment store
func (*ExperimentStore) ArchiveExperiment ¶
func (s *ExperimentStore) ArchiveExperiment(expId string) error
func (*ExperimentStore) CreateExperiment ¶
func (s *ExperimentStore) CreateExperiment(experiment *model.Experiment) (*model.Experiment, error)
func (*ExperimentStore) DeleteExperiment ¶
func (s *ExperimentStore) DeleteExperiment(id string) error
func (*ExperimentStore) GetExperiment ¶
func (s *ExperimentStore) GetExperiment(uuid string) (*model.Experiment, error)
func (*ExperimentStore) ListExperiments ¶
func (s *ExperimentStore) ListExperiments(filterContext *common.FilterContext, opts *list.Options) ([]*model.Experiment, int, string, error)
Runs two SQL queries in a transaction to return a list of matching experiments, as well as their total_size. The total_size does not reflect the page size.
func (*ExperimentStore) UnarchiveExperiment ¶
func (s *ExperimentStore) UnarchiveExperiment(expId string) error
type ExperimentStoreInterface ¶
type ExperimentStoreInterface interface { ListExperiments(filterContext *common.FilterContext, opts *list.Options) ([]*model.Experiment, int, string, error) GetExperiment(uuid string) (*model.Experiment, error) CreateExperiment(*model.Experiment) (*model.Experiment, error) DeleteExperiment(uuid string) error ArchiveExperiment(expId string) error UnarchiveExperiment(expId string) error }
type FakeMinioClient ¶
type FakeMinioClient struct {
// contains filtered or unexported fields
}
func NewFakeMinioClient ¶
func NewFakeMinioClient() *FakeMinioClient
func (*FakeMinioClient) DeleteObject ¶
func (c *FakeMinioClient) DeleteObject(bucketName, objectName string) error
func (*FakeMinioClient) ExistObject ¶
func (c *FakeMinioClient) ExistObject(objectName string) bool
func (*FakeMinioClient) GetObject ¶
func (c *FakeMinioClient) GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)
func (*FakeMinioClient) GetObjectCount ¶
func (c *FakeMinioClient) GetObjectCount() int
type JobStore ¶
type JobStore struct {
// contains filtered or unexported fields
}
func NewJobStore ¶
func NewJobStore(db *DB, time util.TimeInterface) *JobStore
factory function for job store
func (*JobStore) ListJobs ¶
func (s *JobStore) ListJobs( filterContext *common.FilterContext, opts *list.Options) ([]*model.Job, int, string, error)
Runs two SQL queries in a transaction to return a list of matching jobs, as well as their total_size. The total_size does not reflect the page size, but it does reflect the number of jobs matching the supplied filters and resource references.
type JobStoreInterface ¶
type JobStoreInterface interface { ListJobs(filterContext *common.FilterContext, opts *list.Options) ([]*model.Job, int, string, error) GetJob(id string) (*model.Job, error) CreateJob(*model.Job) (*model.Job, error) DeleteJob(id string) error EnableJob(id string, enabled bool) error UpdateJob(swf *util.ScheduledWorkflow) error }
type MinioClient ¶
func (*MinioClient) DeleteObject ¶
func (c *MinioClient) DeleteObject(bucketName, objectName string) error
func (*MinioClient) GetObject ¶
func (c *MinioClient) GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)
type MinioClientInterface ¶
type MinioClientInterface interface { PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error) GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error) DeleteObject(bucketName, objectName string) error }
Create interface for minio client struct, making it more unit testable.
type MinioObjectStore ¶
type MinioObjectStore struct {
// contains filtered or unexported fields
}
Managing pipeline using Minio
func NewMinioObjectStore ¶
func NewMinioObjectStore(minioClient MinioClientInterface, bucketName string, baseFolder string, disableMultipart bool) *MinioObjectStore
func (*MinioObjectStore) AddAsYamlFile ¶
func (m *MinioObjectStore) AddAsYamlFile(o interface{}, filePath string) error
func (*MinioObjectStore) AddFile ¶
func (m *MinioObjectStore) AddFile(file []byte, filePath string) error
func (*MinioObjectStore) DeleteFile ¶
func (m *MinioObjectStore) DeleteFile(filePath string) error
func (*MinioObjectStore) GetFile ¶
func (m *MinioObjectStore) GetFile(filePath string) ([]byte, error)
func (*MinioObjectStore) GetFromYamlFile ¶
func (m *MinioObjectStore) GetFromYamlFile(o interface{}, filePath string) error
func (*MinioObjectStore) GetPipelineKey ¶
func (m *MinioObjectStore) GetPipelineKey(pipelineID string) string
GetPipelineKey adds the configured base folder to pipeline id.
type MySQLDialect ¶
type MySQLDialect struct{}
MySQLDialect implements SQLDialect with mysql dialect implementation.
func NewMySQLDialect ¶
func NewMySQLDialect() MySQLDialect
func (MySQLDialect) GroupConcat ¶
func (d MySQLDialect) GroupConcat(expr string, separator string) string
func (MySQLDialect) IsDuplicateError ¶
func (d MySQLDialect) IsDuplicateError(err error) bool
func (MySQLDialect) SelectForUpdate ¶
func (d MySQLDialect) SelectForUpdate(query string) string
type ObjectStoreInterface ¶
type ObjectStoreInterface interface { AddFile(template []byte, filePath string) error DeleteFile(filePath string) error GetFile(filePath string) ([]byte, error) AddAsYamlFile(o interface{}, filePath string) error GetFromYamlFile(o interface{}, filePath string) error GetPipelineKey(pipelineId string) string }
Interface for object store.
func NewFakeObjectStore ¶
func NewFakeObjectStore() ObjectStoreInterface
Return the object store with faked minio client.
type PipelineStore ¶
type PipelineStore struct {
// contains filtered or unexported fields
}
func NewPipelineStore ¶
func NewPipelineStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorInterface) *PipelineStore
factory function for pipeline store
func (*PipelineStore) CreatePipeline ¶
func (*PipelineStore) CreatePipelineVersion ¶
func (s *PipelineStore) CreatePipelineVersion(v *model.PipelineVersion, updatePipelineDefaultVersion bool) (*model.PipelineVersion, error)
func (*PipelineStore) DeletePipeline ¶
func (s *PipelineStore) DeletePipeline(id string) error
func (*PipelineStore) DeletePipelineVersion ¶
func (s *PipelineStore) DeletePipelineVersion(versionId string) error
func (*PipelineStore) GetPipeline ¶
func (s *PipelineStore) GetPipeline(id string) (*model.Pipeline, error)
func (*PipelineStore) GetPipelineVersion ¶
func (s *PipelineStore) GetPipelineVersion(versionId string) (*model.PipelineVersion, error)
func (*PipelineStore) GetPipelineVersionWithStatus ¶
func (s *PipelineStore) GetPipelineVersionWithStatus(versionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error)
func (*PipelineStore) GetPipelineWithStatus ¶
func (s *PipelineStore) GetPipelineWithStatus(id string, status model.PipelineStatus) (*model.Pipeline, error)
func (*PipelineStore) ListPipelineVersions ¶
func (s *PipelineStore) ListPipelineVersions(pipelineId string, opts *list.Options) ([]*model.PipelineVersion, int, string, error)
func (*PipelineStore) ListPipelines ¶
func (s *PipelineStore) ListPipelines(filterContext *common.FilterContext, opts *list.Options) ([]*model.Pipeline, int, string, error)
Runs two SQL queries in a transaction to return a list of matching pipelines, as well as their total_size. The total_size does not reflect the page size.
func (*PipelineStore) SetUUIDGenerator ¶
func (s *PipelineStore) SetUUIDGenerator(new_uuid util.UUIDGeneratorInterface)
SetUUIDGenerator is for unit tests in other packages who need to set uuid, since uuid is not exported.
func (*PipelineStore) UpdatePipelineAndVersionsStatus ¶
func (s *PipelineStore) UpdatePipelineAndVersionsStatus(id string, status model.PipelineStatus, pipelineVersionId string, pipelineVersionStatus model.PipelineVersionStatus) error
func (*PipelineStore) UpdatePipelineDefaultVersion ¶
func (s *PipelineStore) UpdatePipelineDefaultVersion(pipelineId string, versionId string) error
func (*PipelineStore) UpdatePipelineStatus ¶
func (s *PipelineStore) UpdatePipelineStatus(id string, status model.PipelineStatus) error
func (*PipelineStore) UpdatePipelineVersionStatus ¶
func (s *PipelineStore) UpdatePipelineVersionStatus(id string, status model.PipelineVersionStatus) error
type PipelineStoreInterface ¶
type PipelineStoreInterface interface { ListPipelines(filterContext *common.FilterContext, opts *list.Options) ([]*model.Pipeline, int, string, error) GetPipeline(pipelineId string) (*model.Pipeline, error) GetPipelineWithStatus(id string, status model.PipelineStatus) (*model.Pipeline, error) DeletePipeline(pipelineId string) error CreatePipeline(*model.Pipeline) (*model.Pipeline, error) UpdatePipelineStatus(string, model.PipelineStatus) error UpdatePipelineDefaultVersion(string, string) error CreatePipelineVersion(*model.PipelineVersion, bool) (*model.PipelineVersion, error) GetPipelineVersion(versionId string) (*model.PipelineVersion, error) GetPipelineVersionWithStatus(versionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error) ListPipelineVersions(pipelineId string, opts *list.Options) ([]*model.PipelineVersion, int, string, error) DeletePipelineVersion(pipelineVersionId string) error // Change status of a particular version. UpdatePipelineVersionStatus(pipelineVersionId string, status model.PipelineVersionStatus) error // TODO(jingzhang36): remove this temporary method after resource manager's // CreatePipeline stops using it. UpdatePipelineAndVersionsStatus(id string, status model.PipelineStatus, pipelineVersionId string, pipelineVersionStatus model.PipelineVersionStatus) error }
type ResourceReferenceStore ¶
type ResourceReferenceStore struct {
// contains filtered or unexported fields
}
func NewResourceReferenceStore ¶
func NewResourceReferenceStore(db *DB) *ResourceReferenceStore
func (*ResourceReferenceStore) CreateResourceReferences ¶
func (s *ResourceReferenceStore) CreateResourceReferences(tx *sql.Tx, refs []*model.ResourceReference) error
Create a resource reference. This is always in company with creating a parent resource so a transaction is needed as input.
func (*ResourceReferenceStore) DeleteResourceReferences ¶
func (s *ResourceReferenceStore) DeleteResourceReferences(tx *sql.Tx, id string, resourceType common.ResourceType) error
Delete all resource references for a specific resource. This is always in company with creating a parent resource so a transaction is needed as input.
func (*ResourceReferenceStore) GetResourceReference ¶
func (s *ResourceReferenceStore) GetResourceReference(resourceId string, resourceType common.ResourceType, referenceType common.ResourceType) (*model.ResourceReference, error)
type ResourceReferenceStoreInterface ¶
type ResourceReferenceStoreInterface interface { // Retrieve the resource reference for a given resource id, type and a reference type. GetResourceReference(resourceId string, resourceType common.ResourceType, referenceType common.ResourceType) (*model.ResourceReference, error) }
type RunStore ¶
type RunStore struct {
// contains filtered or unexported fields
}
func NewRunStore ¶
func NewRunStore(db *DB, time util.TimeInterface) *RunStore
NewRunStore creates a new RunStore.
func (*RunStore) AddSortByRunMetricToSelect ¶
func (s *RunStore) AddSortByRunMetricToSelect(sqlBuilder sq.SelectBuilder, opts *list.Options) sq.SelectBuilder
Add a metric as a new field to the select clause by join the passed-in SQL query with run_metrics table. With the metric as a field in the select clause enable sorting on this metric afterwards. TODO(jingzhang36): example of resulting SQL query and explanation for it.
func (*RunStore) ArchiveRun ¶
func (*RunStore) CreateOrUpdateRun ¶
func (*RunStore) ListRuns ¶
func (s *RunStore) ListRuns( filterContext *common.FilterContext, opts *list.Options) ([]*model.Run, int, string, error)
Runs two SQL queries in a transaction to return a list of matching runs, as well as their total_size. The total_size does not reflect the page size, but it does reflect the number of runs matching the supplied filters and resource references.
func (*RunStore) ReportMetric ¶
ReportMetric inserts a new metric to run_metrics table. Conflicting metrics are ignored.
func (*RunStore) TerminateRun ¶
func (*RunStore) UnarchiveRun ¶
type RunStoreInterface ¶
type RunStoreInterface interface { GetRun(runId string) (*model.RunDetail, error) ListRuns(filterContext *common.FilterContext, opts *list.Options) ([]*model.Run, int, string, error) // Create a run entry in the database CreateRun(run *model.RunDetail) (*model.RunDetail, error) // Update run table. Only condition and runtime manifest is allowed to be updated. UpdateRun(id string, condition string, finishedAtInSec int64, workflowRuntimeManifest string) (err error) // Archive a run ArchiveRun(id string) error // Unarchive a run UnarchiveRun(id string) error // Delete a run entry from the database DeleteRun(id string) error // Update the run table or create one if the run doesn't exist CreateOrUpdateRun(run *model.RunDetail) error // Store a new metric entry to run_metrics table. ReportMetric(metric *model.RunMetric) (err error) // Terminate a run TerminateRun(runId string) error }
type SQLDialect ¶
type SQLDialect interface { // GroupConcat builds query to group concatenate `expr` in each row and use `separator` // to join rows in a group. GroupConcat(expr string, separator string) string // Concat builds query to concatenete a list of `exprs` into a single string with // a separator in between. Concat(exprs []string, separator string) string // Check whether the error is a SQL duplicate entry error or not IsDuplicateError(err error) bool // Modifies the SELECT clause in query to return one that locks the selected // row for update. SelectForUpdate(query string) string }
SQLDialect abstracts common sql queries which vary in different dialect. It is used to bridge the difference between mysql (production) and sqlite (test).
type SQLiteDialect ¶
type SQLiteDialect struct{}
SQLiteDialect implements SQLDialect with sqlite dialect implementation.
func NewSQLiteDialect ¶
func NewSQLiteDialect() SQLiteDialect
func (SQLiteDialect) Concat ¶
func (d SQLiteDialect) Concat(exprs []string, separator string) string
func (SQLiteDialect) GroupConcat ¶
func (d SQLiteDialect) GroupConcat(expr string, separator string) string
func (SQLiteDialect) IsDuplicateError ¶
func (d SQLiteDialect) IsDuplicateError(err error) bool
func (SQLiteDialect) SelectForUpdate ¶
func (d SQLiteDialect) SelectForUpdate(query string) string
type TaskStore ¶
type TaskStore struct {
// contains filtered or unexported fields
}
func NewTaskStore ¶
func NewTaskStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorInterface) *TaskStore
NewTaskStore creates a new TaskStore.
func (*TaskStore) CreateTask ¶
func (*TaskStore) ListTasks ¶
func (s *TaskStore) ListTasks(filterContext *common.FilterContext, opts *list.Options) ([]*model.Task, int, string, error)
Runs two SQL queries in a transaction to return a list of matching experiments, as well as their total_size. The total_size does not reflect the page size.