Documentation ¶
Index ¶
- Constants
- func DocStoreErrorf(code DocStoreErrorCode, format string, a ...any) error
- func EnsureCoreSchema(ctx context.Context, store SchemaStore) error
- func EnsureSchema(ctx context.Context, store SchemaStore, name string, version string, ...) error
- func EntityRefToRPC(ref []revisor.EntityRef) []*repository.EntityRef
- func EventToRPC(evt Event) *repository.EventlogItem
- func IsDocStoreErrorCode(err error, code DocStoreErrorCode) bool
- func ListenAndServe(ctx context.Context, addr string, h http.Handler) error
- func ReportToRPC(r Report) *repository.Report
- func RequireAnyScope(ctx context.Context, scopes ...string) (*elephantine.AuthInfo, error)
- func S3Client(ctx context.Context, opts S3Options) (*s3.Client, error)
- func SetUpRouter(router *httprouter.Router, opts ...RouterOption) error
- func Subscope(scope string, resource ...string) string
- func ToMetricAggregation(a Aggregation) (repository.MetricAggregation, error)
- func ValidateLabel(label string) error
- func ValueProcessingFromRPC(r []*repository.ReportValue) (map[string][]ReportValueProcess, error)
- func ValueProcessingToRPC(v map[string][]ReportValueProcess) []*repository.ReportValue
- type ACLEntry
- type Aggregation
- type ArchiveEventType
- type ArchiveSignature
- type ArchivedDocumentStatus
- type ArchivedDocumentVersion
- type ArchivedEvent
- type Archiver
- type ArchiverOptions
- type CheckPermissionRequest
- type CheckPermissionResult
- type DeleteRequest
- type DocStore
- type DocStoreError
- type DocStoreErrorCode
- type DocumentMeta
- type DocumentStatus
- type DocumentUpdate
- type DocumentValidator
- type DocumentsService
- func (a *DocumentsService) BulkUpdate(ctx context.Context, req *repository.BulkUpdateRequest) (*repository.BulkUpdateResponse, error)
- func (a *DocumentsService) CompactedEventlog(ctx context.Context, req *repository.GetCompactedEventlogRequest) (*repository.GetCompactedEventlogResponse, error)
- func (a *DocumentsService) Delete(ctx context.Context, req *repository.DeleteDocumentRequest) (*repository.DeleteDocumentResponse, error)
- func (a *DocumentsService) Eventlog(ctx context.Context, req *repository.GetEventlogRequest) (*repository.GetEventlogResponse, error)
- func (a *DocumentsService) ExtendLock(ctx context.Context, req *repository.ExtendLockRequest) (*repository.LockResponse, error)
- func (a *DocumentsService) Get(ctx context.Context, req *repository.GetDocumentRequest) (*repository.GetDocumentResponse, error)
- func (a *DocumentsService) GetHistory(ctx context.Context, req *repository.GetHistoryRequest) (*repository.GetHistoryResponse, error)
- func (a *DocumentsService) GetMeta(ctx context.Context, req *repository.GetMetaRequest) (*repository.GetMetaResponse, error)
- func (a *DocumentsService) GetPermissions(ctx context.Context, req *repository.GetPermissionsRequest) (*repository.GetPermissionsResponse, error)
- func (a *DocumentsService) GetStatusHistory(ctx context.Context, req *repository.GetStatusHistoryRequest) (*repository.GetStatusHistoryReponse, error)
- func (a *DocumentsService) Lock(ctx context.Context, req *repository.LockRequest) (*repository.LockResponse, error)
- func (a *DocumentsService) Unlock(ctx context.Context, req *repository.UnlockRequest) (*repository.UnlockResponse, error)
- func (a *DocumentsService) Update(ctx context.Context, req *repository.UpdateRequest) (*repository.UpdateResponse, error)
- func (a *DocumentsService) Validate(_ context.Context, req *repository.ValidateRequest) (*repository.ValidateResponse, error)
- type Event
- type EventType
- type FanOut
- type GetCompactedEventlogRequest
- type Lock
- type LockRequest
- type LockResult
- type Metric
- type MetricKind
- type MetricStore
- type MetricsService
- func (m *MetricsService) DeleteKind(ctx context.Context, req *repository.DeleteMetricKindRequest) (*repository.DeleteMetricKindResponse, error)
- func (m *MetricsService) GetKinds(ctx context.Context, _ *repository.GetMetricKindsRequest) (*repository.GetMetricKindsResponse, error)
- func (m *MetricsService) RegisterKind(ctx context.Context, req *repository.RegisterMetricKindRequest) (*repository.RegisterMetricKindResponse, error)
- func (m *MetricsService) RegisterMetric(ctx context.Context, req *repository.RegisterMetricRequest) (*repository.RegisterMetricResponse, error)
- type NotifyChannel
- type PGDocStore
- func (s *PGDocStore) ActivateSchema(ctx context.Context, name, version string) error
- func (s *PGDocStore) CheckPermission(ctx context.Context, req CheckPermissionRequest) (CheckPermissionResult, error)
- func (s *PGDocStore) DeactivateSchema(ctx context.Context, name string) error
- func (s *PGDocStore) Delete(ctx context.Context, req DeleteRequest) error
- func (s *PGDocStore) DeleteMetricKind(ctx context.Context, name string) error
- func (s *PGDocStore) DeleteStatusRule(ctx context.Context, name string) error
- func (s *PGDocStore) GetActiveSchemas(ctx context.Context) ([]*Schema, error)
- func (s *PGDocStore) GetCompactedEventlog(ctx context.Context, req GetCompactedEventlogRequest) ([]Event, error)
- func (s *PGDocStore) GetDocument(ctx context.Context, uuid uuid.UUID, version int64) (*newsdoc.Document, error)
- func (s *PGDocStore) GetDocumentACL(ctx context.Context, uuid uuid.UUID) ([]ACLEntry, error)
- func (s *PGDocStore) GetDocumentMeta(ctx context.Context, uuid uuid.UUID) (*DocumentMeta, error)
- func (s *PGDocStore) GetEventlog(ctx context.Context, after int64, limit int32) ([]Event, error)
- func (s *PGDocStore) GetLastEvent(ctx context.Context) (*Event, error)
- func (s *PGDocStore) GetLastEventID(ctx context.Context) (int64, error)
- func (s *PGDocStore) GetMetricKind(ctx context.Context, name string) (*MetricKind, error)
- func (s *PGDocStore) GetMetricKinds(ctx context.Context) ([]*MetricKind, error)
- func (s *PGDocStore) GetReport(ctx context.Context, name string) (*StoredReport, error)
- func (s *PGDocStore) GetSchema(ctx context.Context, name string, version string) (*Schema, error)
- func (s *PGDocStore) GetSchemaVersions(ctx context.Context) (map[string]string, error)
- func (s *PGDocStore) GetSinkPosition(ctx context.Context, name string) (int64, error)
- func (s *PGDocStore) GetStatusHistory(ctx context.Context, uuid uuid.UUID, name string, before int64, count int) ([]Status, error)
- func (s *PGDocStore) GetStatusRules(ctx context.Context) ([]StatusRule, error)
- func (s *PGDocStore) GetStatuses(ctx context.Context) ([]DocumentStatus, error)
- func (s *PGDocStore) GetVersion(ctx context.Context, uuid uuid.UUID, version int64) (DocumentUpdate, error)
- func (s *PGDocStore) GetVersionHistory(ctx context.Context, uuid uuid.UUID, before int64, count int) ([]DocumentUpdate, error)
- func (s *PGDocStore) Lock(ctx context.Context, req LockRequest) (LockResult, error)
- func (s *PGDocStore) OnArchivedUpdate(ctx context.Context, ch chan ArchivedEvent)
- func (s *PGDocStore) OnEventlog(ctx context.Context, ch chan int64)
- func (s *PGDocStore) OnSchemaUpdate(ctx context.Context, ch chan SchemaEvent)
- func (s *PGDocStore) OnWorkflowUpdate(ctx context.Context, ch chan WorkflowEvent)
- func (s *PGDocStore) RegisterMetricKind(ctx context.Context, name string, aggregation Aggregation) error
- func (s *PGDocStore) RegisterOrIncrementMetric(ctx context.Context, metric Metric) error
- func (s *PGDocStore) RegisterOrReplaceMetric(ctx context.Context, metric Metric) error
- func (s *PGDocStore) RegisterSchema(ctx context.Context, req RegisterSchemaRequest) error
- func (s *PGDocStore) RunCleaner(ctx context.Context, period time.Duration)
- func (s *PGDocStore) RunListener(ctx context.Context)
- func (s *PGDocStore) SetSinkPosition(ctx context.Context, name string, pos int64) error
- func (s *PGDocStore) Unlock(ctx context.Context, uuid uuid.UUID, token string) error
- func (s *PGDocStore) Update(ctx context.Context, workflows WorkflowProvider, requests []*UpdateRequest) ([]DocumentUpdate, error)
- func (s *PGDocStore) UpdateLock(ctx context.Context, req UpdateLockRequest) (LockResult, error)
- func (s *PGDocStore) UpdateReport(ctx context.Context, report Report, enabled bool) (time.Time, error)
- func (s *PGDocStore) UpdateStatus(ctx context.Context, req UpdateStatusRequest) error
- func (s *PGDocStore) UpdateStatusRule(ctx context.Context, rule StatusRule) error
- type PGDocStoreOptions
- type PGReplication
- type Permission
- type Queryer
- type RegisterSchemaRequest
- type Report
- type ReportObject
- type ReportQuery
- type ReportResult
- type ReportRunner
- type ReportRunnerOptions
- type ReportStore
- type ReportValueProcess
- type Reporter
- type ReportsService
- func (s *ReportsService) Get(ctx context.Context, req *repository.GetReportRequest) (*repository.GetReportResponse, error)
- func (s *ReportsService) Run(ctx context.Context, req *repository.RunReportRequest) (*repository.RunReportResponse, error)
- func (s *ReportsService) Test(ctx context.Context, req *repository.TestReportRequest) (*repository.TestReportResponse, error)
- func (s *ReportsService) Update(ctx context.Context, req *repository.UpdateReportRequest) (*repository.UpdateReportResponse, error)
- type RouterOption
- func WithDocumentsAPI(service repository.Documents, opts ServerOptions) RouterOption
- func WithJWKSEndpoint(jwtKey *ecdsa.PrivateKey) RouterOption
- func WithMetricsAPI(service repository.Metrics, opts ServerOptions) RouterOption
- func WithReportsAPI(service repository.Reports, opts ServerOptions) RouterOption
- func WithSchemasAPI(service repository.Schemas, opts ServerOptions) RouterOption
- func WithTokenEndpoint(jwtKey *ecdsa.PrivateKey, sharedSecret string) RouterOption
- func WithWorkflowsAPI(service repository.Workflows, opts ServerOptions) RouterOption
- type S3Options
- type Schema
- type SchemaEvent
- type SchemaEventType
- type SchemaLoader
- type SchemaStore
- type SchemasService
- func (a *SchemasService) Get(ctx context.Context, req *repository.GetSchemaRequest) (*repository.GetSchemaResponse, error)
- func (a *SchemasService) GetAllActive(ctx context.Context, req *repository.GetAllActiveSchemasRequest) (*repository.GetAllActiveSchemasResponse, error)
- func (a *SchemasService) Register(ctx context.Context, req *repository.RegisterSchemaRequest) (*repository.RegisterSchemaResponse, error)
- func (a *SchemasService) SetActive(ctx context.Context, req *repository.SetActiveSchemaRequest) (*repository.SetActiveSchemaResponse, error)
- type ServerOptions
- type SigningKey
- type SigningKeySet
- type SpreadsheetReporter
- type Status
- type StatusRule
- type StatusRuleError
- type StatusRuleInput
- type StatusRuleViolation
- type StatusUpdate
- type StoredReport
- type TableReporter
- type TokenResponse
- type TupleDecoder
- type UpdateLockRequest
- type UpdateRequest
- type UpdateStatusRequest
- type Validator
- type WorkflowEvent
- type WorkflowEventType
- type WorkflowLoader
- type WorkflowProvider
- type WorkflowStore
- type Workflows
- type WorkflowsService
- func (s *WorkflowsService) CreateStatusRule(ctx context.Context, req *repository.CreateStatusRuleRequest) (*repository.CreateStatusRuleResponse, error)
- func (s *WorkflowsService) DeleteStatusRule(ctx context.Context, req *repository.DeleteStatusRuleRequest) (*repository.DeleteStatusRuleResponse, error)
- func (s *WorkflowsService) GetStatusRules(ctx context.Context, _ *repository.GetStatusRulesRequest) (*repository.GetStatusRulesResponse, error)
- func (s *WorkflowsService) GetStatuses(ctx context.Context, _ *repository.GetStatusesRequest) (*repository.GetStatusesResponse, error)
- func (s *WorkflowsService) UpdateStatus(ctx context.Context, req *repository.UpdateStatusRequest) (*repository.UpdateStatusResponse, error)
Constants ¶
const ( PermissionCheckDenied = iota PermissionCheckAllowed PermissionCheckNoSuchDocument )
const ( ScopeDocumentAdmin = "doc_admin" ScopeDocumentReadAll = "doc_read_all" ScopeDocumentRead = "doc_read" ScopeDocumentDelete = "doc_delete" ScopeDocumentWrite = "doc_write" ScopeDocumentImport = "doc_import" ScopeEventlogRead = "eventlog_read" ScopeMetricsAdmin = "metrics_admin" ScopeMetricsWrite = "metrics_write" ScopeReportAdmin = "report_admin" ScopeReportRun = "report_run" ScopeSchemaAdmin = "schema_admin" ScopeSchemaRead = "schema_read" ScopeWorkflowAdmin = "workflow_admin" )
const ( LockSigningKeys = elephantCRC + 1 LockLogicalReplication = elephantCRC + 2 )
const (
SQLCodeObjectInUse = "55006"
)
https://www.postgresql.org/docs/current/errcodes-appendix.html#ERRCODES-TABLE
Variables ¶
This section is empty.
Functions ¶
func DocStoreErrorf ¶
func DocStoreErrorf(code DocStoreErrorCode, format string, a ...any) error
func EnsureCoreSchema ¶
func EnsureCoreSchema(ctx context.Context, store SchemaStore) error
func EnsureSchema ¶
func EnsureSchema( ctx context.Context, store SchemaStore, name string, version string, schema revisor.ConstraintSet, ) error
func EntityRefToRPC ¶
func EntityRefToRPC(ref []revisor.EntityRef) []*repository.EntityRef
func EventToRPC ¶
func EventToRPC(evt Event) *repository.EventlogItem
func IsDocStoreErrorCode ¶
func IsDocStoreErrorCode(err error, code DocStoreErrorCode) bool
func ReportToRPC ¶
func ReportToRPC(r Report) *repository.Report
func RequireAnyScope ¶
func SetUpRouter ¶
func SetUpRouter( router *httprouter.Router, opts ...RouterOption, ) error
func ToMetricAggregation ¶
func ToMetricAggregation(a Aggregation) (repository.MetricAggregation, error)
func ValidateLabel ¶
func ValueProcessingFromRPC ¶
func ValueProcessingFromRPC( r []*repository.ReportValue, ) (map[string][]ReportValueProcess, error)
func ValueProcessingToRPC ¶
func ValueProcessingToRPC( v map[string][]ReportValueProcess, ) []*repository.ReportValue
Types ¶
type Aggregation ¶
type Aggregation int16
const ( AggregationNone Aggregation = 0 AggregationReplace Aggregation = 1 AggregationIncrement Aggregation = 2 )
func ToAggregation ¶
func ToAggregation(ma repository.MetricAggregation) (Aggregation, error)
type ArchiveEventType ¶
type ArchiveEventType int
const ( ArchiveEventTypeStatus ArchiveEventType = iota ArchiveEventTypeVersion )
type ArchiveSignature ¶
func NewArchiveSignature ¶
func NewArchiveSignature( key *SigningKey, hash [sha256.Size]byte, ) (*ArchiveSignature, error)
func ParseArchiveSignature ¶
func ParseArchiveSignature(sg string) (*ArchiveSignature, error)
func (*ArchiveSignature) String ¶
func (as *ArchiveSignature) String() string
func (*ArchiveSignature) Verify ¶
func (as *ArchiveSignature) Verify(key *SigningKey) error
type ArchivedDocumentStatus ¶
type ArchivedDocumentStatus struct { UUID uuid.UUID `json:"uuid"` Name string `json:"name"` ID int64 `json:"id"` Version int64 `json:"version"` Created time.Time `json:"created"` CreatorURI string `json:"creator_uri"` Meta json.RawMessage `json:"meta,omitempty"` ParentSignature string `json:"parent_signature,omitempty"` VersionSignature string `json:"version_signature"` }
type ArchivedDocumentVersion ¶
type ArchivedDocumentVersion struct { UUID uuid.UUID `json:"uuid"` Version int64 `json:"version"` Created time.Time `json:"created"` CreatorURI string `json:"creator_uri"` Meta json.RawMessage `json:"meta,omitempty"` DocumentData json.RawMessage `json:"document_data"` ParentSignature string `json:"parent_signature,omitempty"` }
type ArchivedEvent ¶
type ArchivedEvent struct { Type ArchiveEventType `json:"type"` UUID uuid.UUID `json:"uuid"` // Version is the version of a document or the ID of a status. Version int64 `json:"version"` Name string `json:"name,omitempty"` }
type Archiver ¶
type Archiver struct {
// contains filtered or unexported fields
}
Archiver reads unarchived document versions, and statuses and writes a copy to S3. It does this using SELECT ... FOR UPDATE SKIP LOCKED.
func NewArchiver ¶
func NewArchiver(opts ArchiverOptions) (*Archiver, error)
type ArchiverOptions ¶
type ArchiverOptions struct { Logger *slog.Logger S3 *s3.Client Bucket string DB *pgxpool.Pool MetricsRegisterer prometheus.Registerer }
type CheckPermissionRequest ¶
type CheckPermissionRequest struct { UUID uuid.UUID GranteeURIs []string Permission Permission }
type CheckPermissionResult ¶
type CheckPermissionResult int
type DeleteRequest ¶
type DocStore ¶
type DocStore interface { GetDocumentMeta( ctx context.Context, uuid uuid.UUID) (*DocumentMeta, error) GetDocument( ctx context.Context, uuid uuid.UUID, version int64, ) (*newsdoc.Document, error) GetVersion( ctx context.Context, uuid uuid.UUID, version int64, ) (DocumentUpdate, error) GetVersionHistory( ctx context.Context, uuid uuid.UUID, before int64, count int, ) ([]DocumentUpdate, error) Update( ctx context.Context, workflows WorkflowProvider, update []*UpdateRequest, ) ([]DocumentUpdate, error) Delete(ctx context.Context, req DeleteRequest) error CheckPermission( ctx context.Context, req CheckPermissionRequest, ) (CheckPermissionResult, error) GetEventlog( ctx context.Context, after int64, limit int32, ) ([]Event, error) GetLastEvent( ctx context.Context, ) (*Event, error) GetLastEventID( ctx context.Context, ) (int64, error) GetCompactedEventlog( ctx context.Context, req GetCompactedEventlogRequest, ) ([]Event, error) OnEventlog( ctx context.Context, ch chan int64, ) GetStatusHistory( ctx context.Context, uuid uuid.UUID, name string, before int64, count int, ) ([]Status, error) GetDocumentACL( ctx context.Context, uuid uuid.UUID, ) ([]ACLEntry, error) Lock( ctx context.Context, req LockRequest, ) (LockResult, error) UpdateLock( ctx context.Context, req UpdateLockRequest, ) (LockResult, error) Unlock( ctx context.Context, uuid uuid.UUID, token string, ) error }
type DocStoreError ¶
type DocStoreError struct {
// contains filtered or unexported fields
}
func (DocStoreError) Error ¶
func (e DocStoreError) Error() string
func (DocStoreError) Unwrap ¶
func (e DocStoreError) Unwrap() error
type DocStoreErrorCode ¶
type DocStoreErrorCode string
DocStoreErrorCode TODO: Rename to StoreErrorCode and consistently rename all dependent types and methods.
const ( NoErrCode DocStoreErrorCode = "" ErrCodeNotFound DocStoreErrorCode = "not-found" ErrCodeNoSuchLock DocStoreErrorCode = "no-such-lock" ErrCodeOptimisticLock DocStoreErrorCode = "optimistic-lock" ErrCodeDeleteLock DocStoreErrorCode = "delete-lock" ErrCodeBadRequest DocStoreErrorCode = "bad-request" ErrCodeExists DocStoreErrorCode = "exists" ErrCodePermissionDenied DocStoreErrorCode = "permission-denied" ErrCodeFailedPrecondition DocStoreErrorCode = "failed-precondition" ErrCodeDocumentLock DocStoreErrorCode = "document-lock" )
func GetDocStoreErrorCode ¶
func GetDocStoreErrorCode(err error) DocStoreErrorCode
type DocumentMeta ¶
type DocumentStatus ¶
type DocumentUpdate ¶
type DocumentValidator ¶
type DocumentValidator interface {
ValidateDocument(document *newsdoc.Document) []revisor.ValidationResult
}
type DocumentsService ¶
type DocumentsService struct {
// contains filtered or unexported fields
}
func NewDocumentsService ¶
func NewDocumentsService( store DocStore, validator DocumentValidator, workflows WorkflowProvider, defaultLanguage string, ) *DocumentsService
func (*DocumentsService) BulkUpdate ¶ added in v0.4.0
func (a *DocumentsService) BulkUpdate( ctx context.Context, req *repository.BulkUpdateRequest, ) (*repository.BulkUpdateResponse, error)
BulkUpdate implements repository.Documents.
func (*DocumentsService) CompactedEventlog ¶ added in v0.4.0
func (a *DocumentsService) CompactedEventlog( ctx context.Context, req *repository.GetCompactedEventlogRequest, ) (*repository.GetCompactedEventlogResponse, error)
CompactedEventlog implements repository.Documents.
func (*DocumentsService) Delete ¶
func (a *DocumentsService) Delete( ctx context.Context, req *repository.DeleteDocumentRequest, ) (*repository.DeleteDocumentResponse, error)
Delete implements repository.Documents.
func (*DocumentsService) Eventlog ¶
func (a *DocumentsService) Eventlog( ctx context.Context, req *repository.GetEventlogRequest, ) (*repository.GetEventlogResponse, error)
Eventlog returns document update events, optionally waiting for new events.
func (*DocumentsService) ExtendLock ¶
func (a *DocumentsService) ExtendLock( ctx context.Context, req *repository.ExtendLockRequest, ) (*repository.LockResponse, error)
ExtendLock extends the expiration of an existing lock.
func (*DocumentsService) Get ¶
func (a *DocumentsService) Get( ctx context.Context, req *repository.GetDocumentRequest, ) (*repository.GetDocumentResponse, error)
Get implements repository.Documents.
func (*DocumentsService) GetHistory ¶
func (a *DocumentsService) GetHistory( ctx context.Context, req *repository.GetHistoryRequest, ) (*repository.GetHistoryResponse, error)
GetHistory implements repository.Documents.
func (*DocumentsService) GetMeta ¶
func (a *DocumentsService) GetMeta( ctx context.Context, req *repository.GetMetaRequest, ) (*repository.GetMetaResponse, error)
GetMeta implements repository.Documents.
func (*DocumentsService) GetPermissions ¶
func (a *DocumentsService) GetPermissions( ctx context.Context, req *repository.GetPermissionsRequest, ) (*repository.GetPermissionsResponse, error)
GetPermissions returns the permissions you have for the document.
func (*DocumentsService) GetStatusHistory ¶
func (a *DocumentsService) GetStatusHistory( ctx context.Context, req *repository.GetStatusHistoryRequest, ) (*repository.GetStatusHistoryReponse, error)
GetStatusHistory returns the history of a status for a document.
func (*DocumentsService) Lock ¶
func (a *DocumentsService) Lock( ctx context.Context, req *repository.LockRequest, ) (*repository.LockResponse, error)
func (*DocumentsService) Unlock ¶
func (a *DocumentsService) Unlock( ctx context.Context, req *repository.UnlockRequest, ) (*repository.UnlockResponse, error)
func (*DocumentsService) Update ¶
func (a *DocumentsService) Update( ctx context.Context, req *repository.UpdateRequest, ) (*repository.UpdateResponse, error)
Update implements repository.Documents.
func (*DocumentsService) Validate ¶
func (a *DocumentsService) Validate( _ context.Context, req *repository.ValidateRequest, ) (*repository.ValidateResponse, error)
Validate implements repository.Documents.
type Event ¶
type Event struct { ID int64 `json:"id"` Event EventType `json:"event"` UUID uuid.UUID `json:"uuid"` Timestamp time.Time `json:"timestamp"` Updater string `json:"updater"` Type string `json:"type,omitempty"` Version int64 `json:"version,omitempty"` StatusID int64 `json:"status_id,omitempty"` Status string `json:"status,omitempty"` ACL []ACLEntry `json:"acl,omitempty"` }
func RPCToEvent ¶
func RPCToEvent(evt *repository.EventlogItem) (Event, error)
type FanOut ¶
type FanOut[T any] struct { // contains filtered or unexported fields }
type GetCompactedEventlogRequest ¶ added in v0.4.0
type LockRequest ¶
type MetricKind ¶
type MetricKind struct { Name string Aggregation Aggregation }
type MetricStore ¶
type MetricStore interface { RegisterMetricKind( ctx context.Context, name string, aggregation Aggregation, ) error DeleteMetricKind( ctx context.Context, name string, ) error GetMetricKind( ctx context.Context, name string, ) (*MetricKind, error) GetMetricKinds( ctx context.Context, ) ([]*MetricKind, error) RegisterOrReplaceMetric( ctx context.Context, metric Metric, ) error RegisterOrIncrementMetric( ctx context.Context, metric Metric, ) error }
type MetricsService ¶
type MetricsService struct {
// contains filtered or unexported fields
}
func NewMetricsService ¶
func NewMetricsService(store MetricStore) *MetricsService
func (*MetricsService) DeleteKind ¶
func (m *MetricsService) DeleteKind( ctx context.Context, req *repository.DeleteMetricKindRequest, ) (*repository.DeleteMetricKindResponse, error)
DeleteKind implements repository.Metrics.
func (*MetricsService) GetKinds ¶
func (m *MetricsService) GetKinds( ctx context.Context, _ *repository.GetMetricKindsRequest, ) (*repository.GetMetricKindsResponse, error)
GetKinds implements repository.Metrics.
func (*MetricsService) RegisterKind ¶
func (m *MetricsService) RegisterKind( ctx context.Context, req *repository.RegisterMetricKindRequest, ) (*repository.RegisterMetricKindResponse, error)
RegisterKind implements repository.Metrics.
func (*MetricsService) RegisterMetric ¶
func (m *MetricsService) RegisterMetric( ctx context.Context, req *repository.RegisterMetricRequest, ) (*repository.RegisterMetricResponse, error)
RegisterMetric implements repository.Metrics.
type NotifyChannel ¶
type NotifyChannel string
const ( NotifySchemasUpdated NotifyChannel = "schemas" NotifyArchived NotifyChannel = "archived" NotifyWorkflowsUpdated NotifyChannel = "workflows" NotifyEventlog NotifyChannel = "eventlog" )
type PGDocStore ¶
type PGDocStore struct {
// contains filtered or unexported fields
}
func NewPGDocStore ¶
func NewPGDocStore( logger *slog.Logger, pool *pgxpool.Pool, options PGDocStoreOptions, ) (*PGDocStore, error)
func (*PGDocStore) ActivateSchema ¶
func (s *PGDocStore) ActivateSchema( ctx context.Context, name, version string, ) error
RegisterSchema implements DocStore.
func (*PGDocStore) CheckPermission ¶
func (s *PGDocStore) CheckPermission( ctx context.Context, req CheckPermissionRequest, ) (CheckPermissionResult, error)
CheckPermission implements DocStore.
func (*PGDocStore) DeactivateSchema ¶
func (s *PGDocStore) DeactivateSchema(ctx context.Context, name string) error
DeactivateSchema implements DocStore.
func (*PGDocStore) Delete ¶
func (s *PGDocStore) Delete(ctx context.Context, req DeleteRequest) error
Delete implements DocStore.
func (*PGDocStore) DeleteMetricKind ¶
func (s *PGDocStore) DeleteMetricKind( ctx context.Context, name string, ) error
func (*PGDocStore) DeleteStatusRule ¶
func (s *PGDocStore) DeleteStatusRule( ctx context.Context, name string, ) error
func (*PGDocStore) GetActiveSchemas ¶
func (s *PGDocStore) GetActiveSchemas( ctx context.Context, ) ([]*Schema, error)
GetActiveSchemas implements DocStore.
func (*PGDocStore) GetCompactedEventlog ¶ added in v0.4.0
func (s *PGDocStore) GetCompactedEventlog( ctx context.Context, req GetCompactedEventlogRequest, ) ([]Event, error)
func (*PGDocStore) GetDocument ¶
func (s *PGDocStore) GetDocument( ctx context.Context, uuid uuid.UUID, version int64, ) (*newsdoc.Document, error)
GetDocument implements DocStore.
func (*PGDocStore) GetDocumentACL ¶
func (*PGDocStore) GetDocumentMeta ¶
func (s *PGDocStore) GetDocumentMeta( ctx context.Context, uuid uuid.UUID, ) (*DocumentMeta, error)
GetDocumentMeta implements DocStore.
func (*PGDocStore) GetEventlog ¶
func (*PGDocStore) GetLastEvent ¶ added in v0.4.0
func (s *PGDocStore) GetLastEvent( ctx context.Context, ) (*Event, error)
func (*PGDocStore) GetLastEventID ¶ added in v0.4.0
func (s *PGDocStore) GetLastEventID(ctx context.Context) (int64, error)
GetLastEventID implements DocStore.
func (*PGDocStore) GetMetricKind ¶
func (s *PGDocStore) GetMetricKind( ctx context.Context, name string, ) (*MetricKind, error)
func (*PGDocStore) GetMetricKinds ¶
func (s *PGDocStore) GetMetricKinds( ctx context.Context, ) ([]*MetricKind, error)
func (*PGDocStore) GetReport ¶
func (s *PGDocStore) GetReport( ctx context.Context, name string, ) (*StoredReport, error)
func (*PGDocStore) GetSchema ¶
func (s *PGDocStore) GetSchema( ctx context.Context, name string, version string, ) (*Schema, error)
GetSchema implements DocStore.
func (*PGDocStore) GetSchemaVersions ¶
func (*PGDocStore) GetSinkPosition ¶
func (*PGDocStore) GetStatusHistory ¶
func (*PGDocStore) GetStatusRules ¶
func (s *PGDocStore) GetStatusRules(ctx context.Context) ([]StatusRule, error)
func (*PGDocStore) GetStatuses ¶
func (s *PGDocStore) GetStatuses(ctx context.Context) ([]DocumentStatus, error)
func (*PGDocStore) GetVersion ¶
func (s *PGDocStore) GetVersion( ctx context.Context, uuid uuid.UUID, version int64, ) (DocumentUpdate, error)
func (*PGDocStore) GetVersionHistory ¶
func (s *PGDocStore) GetVersionHistory( ctx context.Context, uuid uuid.UUID, before int64, count int, ) ([]DocumentUpdate, error)
func (*PGDocStore) Lock ¶
func (s *PGDocStore) Lock(ctx context.Context, req LockRequest) (LockResult, error)
func (*PGDocStore) OnArchivedUpdate ¶
func (s *PGDocStore) OnArchivedUpdate( ctx context.Context, ch chan ArchivedEvent, )
OnSchemaUpdate notifies the channel ch of all archived status updates. Subscription is automatically cancelled once the context is cancelled.
Note that we don't provide any delivery guarantees for these events. non-blocking send is used on ch, so if it's unbuffered events will be discarded if the receiver is busy.
func (*PGDocStore) OnEventlog ¶
func (s *PGDocStore) OnEventlog( ctx context.Context, ch chan int64, )
OnEventlog notifies the channel ch of all new eventlog IDs. Subscription is automatically cancelled once the context is cancelled.
Note that we don't provide any delivery guarantees for these events. non-blocking send is used on ch, so if it's unbuffered events will be discarded if the receiver is busy.
func (*PGDocStore) OnSchemaUpdate ¶
func (s *PGDocStore) OnSchemaUpdate( ctx context.Context, ch chan SchemaEvent, )
OnSchemaUpdate notifies the channel ch of all schema updates. Subscription is automatically cancelled once the context is cancelled.
Note that we don't provide any delivery guarantees for these events. non-blocking send is used on ch, so if it's unbuffered events will be discarded if the receiver is busy.
func (*PGDocStore) OnWorkflowUpdate ¶
func (s *PGDocStore) OnWorkflowUpdate( ctx context.Context, ch chan WorkflowEvent, )
OnWorkflowUpdate notifies the channel ch of all workflow updates. Subscription is automatically cancelled once the context is cancelled.
Note that we don't provide any delivery guarantees for these events. non-blocking send is used on ch, so if it's unbuffered events will be discarded if the receiver is busy.
func (*PGDocStore) RegisterMetricKind ¶
func (s *PGDocStore) RegisterMetricKind( ctx context.Context, name string, aggregation Aggregation, ) error
func (*PGDocStore) RegisterOrIncrementMetric ¶
func (s *PGDocStore) RegisterOrIncrementMetric(ctx context.Context, metric Metric) error
RegisterMetric implements MetricStore.
func (*PGDocStore) RegisterOrReplaceMetric ¶
func (s *PGDocStore) RegisterOrReplaceMetric(ctx context.Context, metric Metric) error
RegisterMetric implements MetricStore.
func (*PGDocStore) RegisterSchema ¶
func (s *PGDocStore) RegisterSchema( ctx context.Context, req RegisterSchemaRequest, ) error
RegisterSchema implements DocStore.
func (*PGDocStore) RunCleaner ¶ added in v0.3.0
func (s *PGDocStore) RunCleaner(ctx context.Context, period time.Duration)
func (*PGDocStore) RunListener ¶
func (s *PGDocStore) RunListener(ctx context.Context)
RunListener opens a connection to the database and subscribes to all store notifications.
func (*PGDocStore) SetSinkPosition ¶
func (*PGDocStore) Update ¶
func (s *PGDocStore) Update( ctx context.Context, workflows WorkflowProvider, requests []*UpdateRequest, ) ([]DocumentUpdate, error)
Update implements DocStore.
func (*PGDocStore) UpdateLock ¶
func (s *PGDocStore) UpdateLock(ctx context.Context, req UpdateLockRequest) (LockResult, error)
func (*PGDocStore) UpdateReport ¶
func (*PGDocStore) UpdateStatus ¶
func (s *PGDocStore) UpdateStatus( ctx context.Context, req UpdateStatusRequest, ) error
func (*PGDocStore) UpdateStatusRule ¶
func (s *PGDocStore) UpdateStatusRule( ctx context.Context, rule StatusRule, ) error
type PGDocStoreOptions ¶
type PGReplication ¶
type PGReplication struct {
// contains filtered or unexported fields
}
func NewPGReplication ¶
func NewPGReplication( logger *slog.Logger, pool *pgxpool.Pool, dbURI string, slotName string, metricsRegisterer prometheus.Registerer, ) (*PGReplication, error)
func (*PGReplication) Run ¶
func (pr *PGReplication) Run(ctx context.Context)
func (*PGReplication) Started ¶
func (pr *PGReplication) Started() <-chan bool
Started emits true as a signal every time replication has started.
func (*PGReplication) Stop ¶
func (pr *PGReplication) Stop()
type Permission ¶
type Permission string
const ( ReadPermission Permission = "r" WritePermission Permission = "w" )
func (Permission) Name ¶
func (p Permission) Name() string
type RegisterSchemaRequest ¶
type RegisterSchemaRequest struct { Name string Version string Specification revisor.ConstraintSet Activate bool }
type Report ¶
type Report struct { Name string `json:"name"` Title string `json:"title"` CronExpression string `json:"cron_expression"` CronTimezone string `json:"cron_timezone"` GenerateSheet bool `json:"generate_sheet,omitempty"` Queries []ReportQuery `json:"queries"` SlackChannels []string `json:"slack_channels,omitempty"` }
func ReportFromRPC ¶
func ReportFromRPC(r *repository.Report) (Report, error)
type ReportObject ¶
type ReportQuery ¶
type ReportQuery struct { Name string `json:"name"` SQL string `json:"sql"` ValueProcessing map[string][]ReportValueProcess `json:"value_processing,omitempty"` Summarise []int `json:"summarize,omitempty"` }
type ReportResult ¶
func GenerateReport ¶
type ReportRunner ¶
type ReportRunner struct {
// contains filtered or unexported fields
}
func NewReportRunner ¶
func NewReportRunner(opts ReportRunnerOptions) (*ReportRunner, error)
func (*ReportRunner) Run ¶
func (r *ReportRunner) Run(ctx context.Context)
func (*ReportRunner) Stop ¶
func (r *ReportRunner) Stop()
type ReportRunnerOptions ¶
type ReportRunnerOptions struct { Logger *slog.Logger S3 *s3.Client Bucket string // ReportQueryer should be a read-only connection to the database with // access to the tables `document`, `delete_record`, `document_version`, // `document_status`, `status_heads`, `acl`, `acl_audit`. ReportQueryer Queryer // DB should be a normal database connection with full repository // access. DB *pgxpool.Pool MetricsRegisterer prometheus.Registerer }
type ReportStore ¶
type ReportValueProcess ¶
type ReportValueProcess string
const (
ReportHTMLDecode ReportValueProcess = "html_decode"
)
type ReportsService ¶
type ReportsService struct {
// contains filtered or unexported fields
}
func NewReportsService ¶
func NewReportsService( logger *slog.Logger, store ReportStore, reportingDB Queryer, ) *ReportsService
func (*ReportsService) Get ¶
func (s *ReportsService) Get( ctx context.Context, req *repository.GetReportRequest, ) (*repository.GetReportResponse, error)
Get a report.
func (*ReportsService) Run ¶
func (s *ReportsService) Run( ctx context.Context, req *repository.RunReportRequest, ) (*repository.RunReportResponse, error)
Test a report. This will run the report and return the results instead of sending it to any outputs.
func (*ReportsService) Test ¶
func (s *ReportsService) Test( ctx context.Context, req *repository.TestReportRequest, ) (*repository.TestReportResponse, error)
func (*ReportsService) Update ¶
func (s *ReportsService) Update( ctx context.Context, req *repository.UpdateReportRequest, ) (*repository.UpdateReportResponse, error)
Update or create a report.
type RouterOption ¶
type RouterOption func(router *httprouter.Router) error
func WithDocumentsAPI ¶
func WithDocumentsAPI( service repository.Documents, opts ServerOptions, ) RouterOption
func WithJWKSEndpoint ¶
func WithJWKSEndpoint(jwtKey *ecdsa.PrivateKey) RouterOption
func WithMetricsAPI ¶
func WithMetricsAPI( service repository.Metrics, opts ServerOptions, ) RouterOption
func WithReportsAPI ¶
func WithReportsAPI( service repository.Reports, opts ServerOptions, ) RouterOption
func WithSchemasAPI ¶
func WithSchemasAPI( service repository.Schemas, opts ServerOptions, ) RouterOption
func WithTokenEndpoint ¶
func WithTokenEndpoint( jwtKey *ecdsa.PrivateKey, sharedSecret string, ) RouterOption
func WithWorkflowsAPI ¶
func WithWorkflowsAPI( service repository.Workflows, opts ServerOptions, ) RouterOption
type SchemaEvent ¶
type SchemaEvent struct { Type SchemaEventType `json:"type"` Name string `json:"name"` }
type SchemaEventType ¶
type SchemaEventType int
const ( SchemaEventTypeActivation SchemaEventType = iota SchemaEventTypeDeactivation )
type SchemaLoader ¶
type SchemaStore ¶
type SchemaStore interface { RegisterSchema( ctx context.Context, req RegisterSchemaRequest, ) error ActivateSchema( ctx context.Context, name, version string, ) error DeactivateSchema( ctx context.Context, name string, ) error GetSchema( ctx context.Context, name, version string, ) (*Schema, error) GetActiveSchemas(ctx context.Context) ([]*Schema, error) GetSchemaVersions(ctx context.Context) (map[string]string, error) OnSchemaUpdate(ctx context.Context, ch chan SchemaEvent) }
type SchemasService ¶
type SchemasService struct {
// contains filtered or unexported fields
}
func NewSchemasService ¶
func NewSchemasService(logger *slog.Logger, store SchemaStore) *SchemasService
func (*SchemasService) Get ¶
func (a *SchemasService) Get( ctx context.Context, req *repository.GetSchemaRequest, ) (*repository.GetSchemaResponse, error)
Get retrieves a schema.
func (*SchemasService) GetAllActive ¶
func (a *SchemasService) GetAllActive( ctx context.Context, req *repository.GetAllActiveSchemasRequest, ) (*repository.GetAllActiveSchemasResponse, error)
GetAllActiveSchemas returns the currently active schemas.
func (*SchemasService) Register ¶
func (a *SchemasService) Register( ctx context.Context, req *repository.RegisterSchemaRequest, ) (*repository.RegisterSchemaResponse, error)
Register register a new validation schema version.
func (*SchemasService) SetActive ¶
func (a *SchemasService) SetActive( ctx context.Context, req *repository.SetActiveSchemaRequest, ) (*repository.SetActiveSchemaResponse, error)
SetActive activates schema versions.
type ServerOptions ¶
type ServerOptions struct { Hooks *twirp.ServerHooks AuthMiddleware func( w http.ResponseWriter, r *http.Request, next http.Handler, ) error }
func (*ServerOptions) SetJWTValidation ¶
func (so *ServerOptions) SetJWTValidation(jwtKey *ecdsa.PrivateKey)
type SigningKey ¶
type SigningKeySet ¶
type SigningKeySet struct { Keys []SigningKey `json:"keys"` // contains filtered or unexported fields }
func (*SigningKeySet) CurrentKey ¶
func (s *SigningKeySet) CurrentKey(t time.Time) *SigningKey
func (*SigningKeySet) GetKeyByID ¶
func (s *SigningKeySet) GetKeyByID(kid string) *SigningKey
func (*SigningKeySet) LatestKey ¶
func (s *SigningKeySet) LatestKey() *SigningKey
func (*SigningKeySet) Replace ¶
func (s *SigningKeySet) Replace(keys []SigningKey)
type SpreadsheetReporter ¶
type SpreadsheetReporter struct { File *excelize.File // contains filtered or unexported fields }
func NewSpreadsheetReporter ¶
func NewSpreadsheetReporter() *SpreadsheetReporter
func (*SpreadsheetReporter) AddHeader ¶
func (sr *SpreadsheetReporter) AddHeader(q ReportQuery, columns []string) error
func (*SpreadsheetReporter) AddRow ¶
func (sr *SpreadsheetReporter) AddRow(values []any) error
func (*SpreadsheetReporter) QueryDone ¶
func (sr *SpreadsheetReporter) QueryDone() error
type StatusRule ¶
type StatusRuleError ¶
type StatusRuleError struct {
Violations []StatusRuleViolation
}
func (StatusRuleError) Error ¶
func (err StatusRuleError) Error() string
type StatusRuleInput ¶
type StatusRuleViolation ¶
type StatusUpdate ¶
func RPCToStatusUpdate ¶
func RPCToStatusUpdate(update []*repository.StatusUpdate) []StatusUpdate
type TableReporter ¶
func NewTableReporter ¶
func NewTableReporter() *TableReporter
func (*TableReporter) AddHeader ¶
func (tr *TableReporter) AddHeader(q ReportQuery, columns []string) error
func (*TableReporter) AddRow ¶
func (tr *TableReporter) AddRow(values []any) error
func (*TableReporter) QueryDone ¶
func (tr *TableReporter) QueryDone() error
type TokenResponse ¶
type TupleDecoder ¶
type TupleDecoder struct {
// contains filtered or unexported fields
}
func NewTupleDecoder ¶
func NewTupleDecoder() *TupleDecoder
func (*TupleDecoder) DecodeValues ¶
func (td *TupleDecoder) DecodeValues( relation uint32, tuple *pglogrepl.TupleData, ) (*pglogrepl.RelationMessage, map[string]interface{}, error)
func (*TupleDecoder) GetRelation ¶
func (td *TupleDecoder) GetRelation(id uint32) (*pglogrepl.RelationMessage, bool)
func (*TupleDecoder) RegisterRelation ¶
func (td *TupleDecoder) RegisterRelation(rel *pglogrepl.RelationMessage)
type UpdateRequest ¶
type UpdateStatusRequest ¶
type Validator ¶
type Validator struct {
// contains filtered or unexported fields
}
func NewValidator ¶
func (*Validator) GetValidator ¶
func (*Validator) ValidateDocument ¶
func (v *Validator) ValidateDocument(document *newsdoc.Document) []revisor.ValidationResult
type WorkflowEvent ¶
type WorkflowEvent struct { Type WorkflowEventType `json:"type"` Name string `json:"name"` }
type WorkflowEventType ¶
type WorkflowEventType int
const ( WorkflowEventTypeStatusChange WorkflowEventType = iota WorkflowEventTypeStatusRuleChange )
type WorkflowLoader ¶
type WorkflowLoader interface { GetStatuses(ctx context.Context) ([]DocumentStatus, error) GetStatusRules(ctx context.Context) ([]StatusRule, error) OnWorkflowUpdate(ctx context.Context, ch chan WorkflowEvent) }
type WorkflowProvider ¶
type WorkflowProvider interface { HasStatus(name string) bool EvaluateRules(input StatusRuleInput) []StatusRuleViolation }
type WorkflowStore ¶
type WorkflowStore interface { UpdateStatus( ctx context.Context, req UpdateStatusRequest, ) error GetStatuses(ctx context.Context) ([]DocumentStatus, error) UpdateStatusRule( ctx context.Context, rule StatusRule, ) error DeleteStatusRule( ctx context.Context, name string, ) error GetStatusRules(ctx context.Context) ([]StatusRule, error) }
type Workflows ¶
type Workflows struct {
// contains filtered or unexported fields
}
func NewWorkflows ¶
func (*Workflows) EvaluateRules ¶
func (w *Workflows) EvaluateRules( input StatusRuleInput, ) []StatusRuleViolation
type WorkflowsService ¶
type WorkflowsService struct {
// contains filtered or unexported fields
}
func NewWorkflowsService ¶
func NewWorkflowsService(store WorkflowStore) *WorkflowsService
func (*WorkflowsService) CreateStatusRule ¶
func (s *WorkflowsService) CreateStatusRule( ctx context.Context, req *repository.CreateStatusRuleRequest, ) (*repository.CreateStatusRuleResponse, error)
CreateStatusRule creates or updates a status rule that should be applied when setting statuses.
func (*WorkflowsService) DeleteStatusRule ¶
func (s *WorkflowsService) DeleteStatusRule( ctx context.Context, req *repository.DeleteStatusRuleRequest, ) (*repository.DeleteStatusRuleResponse, error)
DeleteStatusRule removes a status rule.
func (*WorkflowsService) GetStatusRules ¶
func (s *WorkflowsService) GetStatusRules( ctx context.Context, _ *repository.GetStatusRulesRequest, ) (*repository.GetStatusRulesResponse, error)
GetStatusRules returns all status rules.
func (*WorkflowsService) GetStatuses ¶
func (s *WorkflowsService) GetStatuses( ctx context.Context, _ *repository.GetStatusesRequest, ) (*repository.GetStatusesResponse, error)
GetStatuses lists all enabled statuses.
func (*WorkflowsService) UpdateStatus ¶
func (s *WorkflowsService) UpdateStatus( ctx context.Context, req *repository.UpdateStatusRequest, ) (*repository.UpdateStatusResponse, error)
UpdateStatus creates or updates a status that can be used for documents.
Source Files ¶
- archive.go
- docstore.go
- documents_api.go
- eventlog.go
- fanout.go
- jwt_dummy.go
- lock_cleaner.go
- metric_api.go
- notify.go
- permissions.go
- pgstore.go
- report_api.go
- report_runner.go
- reporter_table.go
- reporter_xlsx.go
- reports.go
- schema.go
- schema_api.go
- serve.go
- signature.go
- validator.go
- workflow_api.go
- workflows.go