Documentation
¶
Index ¶
- Variables
- func FormIssueSummary(auditResults []*AuditReport) string
- func IsSpecInvalidError(err error) bool
- func IsUploadSpecValidationError(err error) bool
- type AuditGroup
- type AuditPublisher
- type AuditReport
- type AuditResult
- type AuditResultStore
- type AuditService
- type AuditStore
- type AuditSummary
- type AuditSummaryFactory
- type Auditor
- type BigqueryJob
- type BigqueryJobStore
- type Comparator
- type ConstraintStore
- type Entity
- type EntityFinder
- type EntityStore
- type Entry
- func (e Entry) Group() string
- func (e Entry) JobID() string
- func (e Entry) JobType() job.Type
- func (e Entry) Partition() string
- func (e Entry) Status() string
- func (e Entry) TableURN() string
- func (e Entry) WithGroup(group string) Entry
- func (e Entry) WithJobID(jobID string) Entry
- func (e Entry) WithJobType(jobType job.Type) Entry
- func (e Entry) WithPartition(partition string) Entry
- func (e Entry) WithStatus(status string) Entry
- func (e Entry) WithTableURN(tableURN string) Entry
- type ErrSpecInvalid
- type ErrUploadSpecValidation
- type File
- type FileStore
- type FileStoreFactory
- type GitAuth
- type GitInfo
- type GitRepository
- type GitRepositoryFactory
- type Label
- type Message
- type MessageProvider
- type MessageProviderFactory
- type MetadataStore
- type MetricGenerator
- type MetricProfiler
- type MetricQuery
- type MetricResultIdentifier
- type MetricSpecGenerator
- type MetricStore
- type PartitionScanner
- type PathResolver
- type PathType
- type ProfileBQLogger
- type ProfileConfig
- type ProfileGroup
- type ProfileMetric
- type ProfilePublisher
- type ProfileService
- type ProfileStatisticGenerator
- type ProfileStore
- type ProtoBuilder
- type Publisher
- type PublisherType
- type QueryExecutor
- type Row
- type SQLExpressionFactory
- type Sink
- type SinkConfig
- type SinkFactory
- type SpecValidator
- type Status
- type StatusLogger
- type StatusStore
- type Task
- type Tolerance
- type ToleranceRule
- type ToleranceSpec
- type ToleranceSpecStateStore
- type ToleranceStore
- type ToleranceStoreFactory
- type UploadFactory
- type ValidatedMetric
Constants ¶
This section is empty.
Variables ¶
var (
	//ErrUniqueConstraintNotFound is an error
	ErrUniqueConstraintNotFound = errors.New("unique constraint not found")
	//ErrTableMetadataNotFound should be returned when a table does not exist
	ErrTableMetadataNotFound = errors.New("table metadata not found")
)
var (
	//ErrProfileNotFound is an error
	ErrProfileNotFound = errors.New("profile not found")
	//ErrProfileInvalid is returned when a profile doesn't have any status in the status log;
	//normally a profile has at least one status in the status log
	ErrProfileInvalid = errors.New("profile invalid")
)
var (
	//ErrAuditIDInvalid is returned when accessing an audit that does not belong to a profile
	ErrAuditIDInvalid = errors.New("error audit not belong to profile")
)
var (
	//ErrAuditNotFound is an error
	ErrAuditNotFound = errors.New("audit not found")
)
var (
	//ErrAuditResultNotFound is an error
	ErrAuditResultNotFound = errors.New("audit result not found")
)
var ErrEntityNotFound = errors.New("entity not found")
var ErrFileNotFound = errors.New("file not found")
var (
	//ErrNoProfileMetricFound is returned when the metric store does not find any profile metric record
	ErrNoProfileMetricFound = errors.New("no profile metric found")
)
var ErrPartitionExpressionIsNotSupported = errors.New("partition expression is not supported for this table")
var (
	//ErrStatusNotFound is an error returned when getting a status
	ErrStatusNotFound = errors.New("status not found")
)
var (
	//ErrToleranceNotFound is returned when the tolerance for a tableID is not found
	ErrToleranceNotFound = errors.New("tolerance for tableID not found")
)
var Ext = ".yaml"
Ext is the default YAML file extension.
var GitSshUrlPattern = regexp.MustCompile(`^git@.+\.git$`)
GitSshUrlPattern is the regex pattern of the supported git SSH URL format. The supported format is the git SSH format, for example git@www.git.com:group/repo.git. It can be used to verify that a URL uses the supported format.
Functions ¶
func FormIssueSummary ¶
func FormIssueSummary(auditResults []*AuditReport) string
FormIssueSummary creates an issue summary from the given audit reports.
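func IsSpecInvalidError ¶
func IsSpecInvalidError(err error) bool
func IsUploadSpecValidationError ¶
func IsUploadSpecValidationError(err error) bool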
Types ¶
type AuditGroup ¶
type AuditGroup []*AuditReport
AuditGroup is a type for performing group-by operations on AuditReport values.
func (AuditGroup) ByFieldID ¶
func (ag AuditGroup) ByFieldID() map[string][]*AuditReport
ByFieldID groups the reports by field ID.
func (AuditGroup) ByGroupValue ¶
func (ag AuditGroup) ByGroupValue() map[string][]*AuditReport
ByGroupValue groups the reports by group value.
func (AuditGroup) ByPartitionDate ¶
func (ag AuditGroup) ByPartitionDate() map[string][]*AuditReport
ByPartitionDate groups the reports by partition date.
type AuditPublisher ¶
type AuditPublisher interface { PublishAuditResult(audit *job.Audit, auditResult []*AuditReport) error Close(ctx context.Context) error }
AuditPublisher is the publisher for audit results.
type AuditReport ¶
type AuditReport struct { AuditID string Partition string GroupValue string TableURN string FieldID string MetricName metric.Type MetricValue float64 Condition string Metadata map[string]interface{} ToleranceRules []ToleranceRule PassFlag bool EventTimestamp time.Time }
AuditReport is the result of an audit.
type AuditResult ¶
type AuditResult struct { Audit *job.Audit AuditReports []*AuditReport }
AuditResult holds an audit job and its report details.
type AuditResultStore ¶
type AuditResultStore interface {
StoreResults(results []*AuditReport) error
}
AuditResultStore stores audit results.
type AuditService ¶
type AuditService interface {
	//RunAudit starts the audit service for the given profile ID
	RunAudit(profileID string) (*AuditResult, error)
}
AuditService is the auditor service.
type AuditStore ¶
type AuditStore interface { CreateAudit(audit *job.Audit) (*job.Audit, error) UpdateAudit(audit *job.Audit) error }
AuditStore is a store for audit entities.
type AuditSummary ¶
AuditSummary is a summary of an audit.
type AuditSummaryFactory ¶
type AuditSummaryFactory interface {
Create(auditResults []*AuditReport, auditJob *job.Audit) (*AuditSummary, error)
}
type Auditor ¶
type Auditor interface {
Audit(audit *job.Audit) ([]*AuditReport, error)
}
Auditor compares quality results with tolerances.
type BigqueryJob ¶
type BigqueryJob struct {
	ID        string
	ProfileID string
	//BqID is the BigQuery job ID provided by query execution
	BqID      string
	CreatedAt time.Time
}
BigqueryJob holds BigQuery job information for a profile task.
type BigqueryJobStore ¶
type BigqueryJobStore interface {
Store(bigqueryJob *BigqueryJob) error
}
BigqueryJobStore stores BigQuery jobs created by profile jobs.
type Comparator ¶
type Comparator string
Comparator is the comparator of a tolerance rule.
const (
	//ComparatorLessThan metric < value
	ComparatorLessThan Comparator = "less_than"
	//ComparatorLessThanEq metric <= value
	ComparatorLessThanEq Comparator = "less_than_eq"
	//ComparatorMoreThan metric > value
	ComparatorMoreThan Comparator = "more_than"
	//ComparatorMoreThanEq metric >= value
	ComparatorMoreThanEq Comparator = "more_than_eq"
)
func (Comparator) String ¶
func (c Comparator) String() string
type ConstraintStore ¶
ConstraintStore is a store interface.
type Entity ¶
type Entity struct { ID string Name string Environment string GitURL string GcpProjectIDs []string CreatedAt time.Time UpdatedAt time.Time }
Entity is information about an entity
type EntityFinder ¶
type EntityFinder []*Entity
func (EntityFinder) FindByProjectID ¶
func (e EntityFinder) FindByProjectID(projectID string) (*Entity, error)
type EntityStore ¶
type EntityStore interface { Save(entity *Entity) (*Entity, error) Create(entity *Entity) (*Entity, error) Get(ID string) (*Entity, error) GetEntityByGitURL(gitURL string) (*Entity, error) GetEntityByProjectID(gcpProjectID string) (*Entity, error) GetAll() ([]*Entity, error) Update(entity *Entity) (*Entity, error) }
EntityStore is storage for Entity
type Entry ¶
type Entry struct {
// contains filtered or unexported fields
}
Entry is an entry struct used for logging.
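func (Entry) WithJobType ¶
func (e Entry) WithJobType(jobType job.Type) Entry
WithJobType returns an Entry with the job type set.
func (Entry) WithPartition ¶
func (e Entry) WithPartition(partition string) Entry
WithPartition returns an Entry with the partition set.
func (Entry) WithTableURN ¶
func (e Entry) WithTableURN(tableURN string) Entry
WithTableURN returns an Entry with the table URN set.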
type ErrSpecInvalid ¶
ErrSpecInvalid is the error returned when spec content is invalid; it contains a list of errors.
func (*ErrSpecInvalid) Error ¶
func (e *ErrSpecInvalid) Error() string
type ErrUploadSpecValidation ¶
type ErrUploadSpecValidation struct {
Errors []error
}
ErrUploadSpecValidation is the error returned when an upload fails because of an invalid spec; it contains the list of invalid spec errors.
func (*ErrUploadSpecValidation) Error ¶
func (s *ErrUploadSpecValidation) Error() string
type FileStore ¶
type FileStore interface { Get(filePath string) (*File, error) GetAll() ([]*File, error) GetPaths() ([]string, error) Create(file *File) error Delete(filePath string) error }
FileStore is storage that contains tolerance specs in files.
type FileStoreFactory ¶
FileStoreFactory is the creator of FileStore instances.
type GitAuth ¶
type GitAuth interface { transport.AuthMethod }
GitAuth describes authentication configuration for GitRepository
type GitInfo ¶
type GitInfo struct {
	//URL is a git URL; it must comply with the GitSshUrlPattern format.
	//This will be used to do a git clone using the SSH protocol.
	URL string
	//CommitID is the commit ID to be checked out;
	//an empty string means checking out the latest revision.
	CommitID string
	//PathPrefix is the path prefix in the git repository where the predator spec root directory structure is located.
	//For example, with the Default directory structure the files are placed flat in a folder:
	//if the git URL is git@github.com:username/project.git, then the spec files are placed under the PathPrefix/ directory of the repository.
	PathPrefix string
}
GitInfo holds git repository information.
type GitRepository ¶
GitRepository is a git repository.
type GitRepositoryFactory ¶
type GitRepositoryFactory interface { Create(url string) GitRepository CreateWithPrefix(url string, pathPrefix string) GitRepository }
GitRepositoryFactory is the creator of GitRepository instances.
type Label ¶
Label is a structured BigQuery resource identifier.
func ParseLabel ¶
ParseLabel creates a Label from a string-formatted, fully qualified BigQuery identifier.
type MessageProvider ¶
type MessageProviderFactory ¶
type MessageProviderFactory interface { CreateProfileMessage(profile *job.Profile, metrics []*metric.Metric) []MessageProvider CreateAuditMessage(audit *job.Audit, auditResult []*AuditReport) []MessageProvider }
type MetadataStore ¶
type MetadataStore interface {
	//GetMetadata fetches the metadata required for profiling
	GetMetadata(tableID string) (*meta.TableSpec, error)
	//GetUniqueConstraints fetches the unique constraints used to calculate the duplication metric
	GetUniqueConstraints(tableID string) ([]string, error)
}
MetadataStore is a store for getting metadata information.
type MetricGenerator ¶
type MetricGenerator interface {
	//Generate generates metrics
	Generate(entry Entry, config *job.Profile) ([]*metric.Metric, error)
}
MetricGenerator generates metrics.
type MetricProfiler ¶
type MetricProfiler interface {
Profile(entry Entry, profile *job.Profile, metricSpecs []*metric.Spec) ([]*metric.Metric, error)
}
MetricProfiler collects metrics; it performs the metric calculations to obtain the metric values.
type MetricQuery ¶
MetricQuery is a field selector used to query metrics.
type MetricResultIdentifier ¶
MetricResultIdentifier identifies a metric result by something other than its ID.
type MetricSpecGenerator ¶
type MetricSpecGenerator interface { Generate(tableSpec *meta.TableSpec, tolerances []*Tolerance) ([]*metric.Spec, error) GenerateMetricSpec(urn string) ([]*metric.Spec, error) }
MetricSpecGenerator produces the metric specifications to be collected.
type MetricStore ¶
type MetricStore interface { Store(profile *job.Profile, metrics []*metric.Metric) error GetMetricsByProfileID(ID string) ([]*metric.Metric, error) }
MetricStore stores profile results.
type PartitionScanner ¶
type PartitionScanner interface {
GetAffectedPartition(tableURN string, lastModifiedTimestamp time.Time) ([]string, error)
}
PartitionScanner gets the affected partitions using the last-modified timestamp.
type PathResolver ¶
type PathResolver interface { GetPath(urn string) (string, error) GetURN(filePath string) (string, error) }
PathResolver resolves a file path from a resource name (and a resource name from a file path), supporting multiple directory layouts.
type PathType ¶
type PathType string
PathType is a supported directory-structure type of a FileStore.
const (
	//Git is the path type used to resolve the git directory structure
	Git PathType = "git"
	//MultiTenancy is the path type used to resolve the multi-tenancy directory structure
	MultiTenancy PathType = "multi_tenancy"
	//Default is the simple path type, mapping project.dataset.table to project.dataset.table.yaml
	Default PathType = "default"
)
type ProfileBQLogger ¶
ProfileBQLogger logs the mapping between profile IDs and BigQuery job IDs.
type ProfileConfig ¶
type ProfileConfig struct { ProfileID string TableSpec *meta.TableSpec MetricSpecs []*metric.Spec Partition string }
ProfileConfig is an identifier used to perform profiling.
type ProfileGroup ¶
type ProfileGroup []*ProfileMetric
ProfileGroup is a type for performing group-by operations on ProfileMetric values.
func (ProfileGroup) ByPartitionDate ¶
func (pg ProfileGroup) ByPartitionDate() map[string][]*ProfileMetric
ByPartitionDate groups the metrics by partition date.
type ProfileMetric ¶
type ProfileMetric struct { ID string ProfileID string TableURN string Partition string FieldID string OwnerType metric.Owner Category metric.Category Condition string MetricName metric.Type MetricValue float64 EventTimestamp time.Time }
ProfileMetric is a single metric produced by the publisher.
type ProfilePublisher ¶
type ProfilePublisher interface { Publish(profileJob *job.Profile, metrics []*metric.Metric) error Close(context.Context) error }
ProfilePublisher is the publisher for the profiler.
type ProfileService ¶
type ProfileService interface {
	//CreateProfile creates a profile job
	CreateProfile(detail *job.Profile) (*job.Profile, error)
	Get(ID string) (*job.Profile, error)
	WaitAll(ctx context.Context) error
	GetLog(ID string) ([]*Status, error)
}
ProfileService is the profiler service.
type ProfileStatisticGenerator ¶
ProfileStatisticGenerator generates profile statistics.
type ProfileStore ¶
type ProfileStore interface { Create(profile *job.Profile) (*job.Profile, error) Update(profile *job.Profile) error Get(ID string) (*job.Profile, error) }
ProfileStore stores profiles.
type ProtoBuilder ¶
type Publisher ¶
type Publisher interface { Publish(provider MessageProvider) error Close(ctx context.Context) error }
type PublisherType ¶
type PublisherType string
PublisherType is the type of a supported publisher.
const (
	//Kafka publishes to Apache Kafka
	Kafka PublisherType = "kafka"
	//Console publishes to the terminal console or log (for testing purposes)
	Console PublisherType = "console"
	//Dummy publishes to nothing
	Dummy PublisherType = "none"
)
type QueryExecutor ¶
type QueryExecutor interface {
Run(profile *job.Profile, query string, queryType job.QueryType) ([]Row, error)
}
QueryExecutor executes a BigQuery SQL query script and returns a list of Row as the result.
type Row ¶
type Row map[string]interface{}
Row is a single table row; the map key is the column name and the value is the cell value.
type SQLExpressionFactory ¶
SQLExpressionFactory generates SQL expressions.
type SinkConfig ¶
type SinkConfig struct { Type PublisherType Broker []string Topic string }
type SinkFactory ¶
type SinkFactory interface {
Create(config *SinkConfig) Sink
}
type SpecValidator ¶
type SpecValidator interface {
	//Validate validates the content of a data quality spec; it should return ErrSpecInvalid when a field or table is not found
	Validate(spec *ToleranceSpec) error
}
type Status ¶
type Status struct { ID string JobID string JobType job.Type Status string Message string EventTimestamp time.Time }
Status is the status of any task.
type StatusLogger ¶
StatusLogger logs statuses.
type StatusStore ¶
type StatusStore interface { Store(status *Status) error GetLatestStatusByIDandType(jobID string, jobType job.Type) (*Status, error) GetStatusLogByIDandType(jobID string, jobType job.Type) ([]*Status, error) }
StatusStore stores the status of profile and audit processes.
type Tolerance ¶
type Tolerance struct {
	ID             string
	TableURN       string
	FieldID        string
	MetricName     metric.Type
	Condition      string //condition for invalid_pct metric
	Metadata       map[string]interface{}
	ToleranceRules []ToleranceRule
	CreatedAt      time.Time
	UpdatedAt      time.Time
}
Tolerance is the tolerance of quality metrics.
type ToleranceRule ¶
type ToleranceRule struct { Comparator Comparator `json:"comparator"` Value float64 `json:"value"` }
ToleranceRule represents tolerance comparator and its value
type ToleranceSpec ¶
type ToleranceSpecStateStore ¶
type ToleranceSpecStateStore interface { SaveTolerances(profileID string, tolerances []*Tolerance) error GetTolerancesByProfileID(profileID string) ([]*Tolerance, error) }
ToleranceSpecStateStore is the store of tolerances to be used for profiling and auditing.
type ToleranceStore ¶
type ToleranceStore interface {
	Create(spec *ToleranceSpec) error
	GetByTableID(tableID string) (*ToleranceSpec, error)
	Delete(tableID string) error
	GetAll() ([]*ToleranceSpec, error)
	GetByProjectID(projectID string) ([]*ToleranceSpec, error)
	//GetResourceNames provides all of the tableIDs in the stored specs
	GetResourceNames() ([]string, error)
}
ToleranceStore fetches the quality tolerances.
type ToleranceStoreFactory ¶
type ToleranceStoreFactory interface { Create(URL string, multiTenancyEnabled bool) (ToleranceStore, error) CreateWithOptions(store FileStore, pathType PathType) (ToleranceStore, error) }
ToleranceStoreFactory is the creator of ToleranceStore instances.
type UploadFactory ¶
UploadFactory is the creator of UploadTask instances.
type ValidatedMetric ¶
type ValidatedMetric struct { Metric *metric.Metric ToleranceRules []ToleranceRule PassFlag bool }
ValidatedMetric is an audited metric.