Documentation ¶
Overview ¶
Events are used internally to trigger an action when data is stored in a dataset. They can possibly be used for other events as well, if we have any.
Index ¶
- Constants
- Variables
- func BytesToPrivateKey(priv []byte) *rsa.PrivateKey
- func BytesToPublicKey(pub []byte) *rsa.PublicKey
- func DecryptWithPrivateKey(ciphertext []byte, priv *rsa.PrivateKey) []byte
- func EncryptWithPublicKey(msg []byte, pub *rsa.PublicKey) []byte
- func GenerateKeyPair(bits int) (*rsa.PrivateKey, *rsa.PublicKey)
- func PrivateKeyToBytes(priv *rsa.PrivateKey) []byte
- func PublicKeyToBytes(pub *rsa.PublicKey) []byte
- func ToLegacyQueryResult(res RelatedEntitiesQueryResult) [][]any
- func URLJoin(baseURL string, elem ...string) (result string, err error)
- type BackupManager
- type BadgerAccess
- func (b BadgerAccess) GetDB() *badger.DB
- func (b BadgerAccess) IsDatasetDeleted(datasetID types.InternalDatasetID) bool
- func (b BadgerAccess) LookupDatasetID(datasetName string) (types.InternalDatasetID, bool)
- func (b BadgerAccess) LookupDatasetIDs(datasetNames []string) []types.InternalDatasetID
- func (b BadgerAccess) LookupDatasetName(internalDatasetID types.InternalDatasetID) (string, bool)
- func (b BadgerAccess) LookupExpansionPrefix(namespaceURI types.URI) (types.Prefix, error)
- func (b BadgerAccess) LookupNamespaceExpansion(prefix types.Prefix) (types.URI, error)
- type BadgerLogger
- type Changes
- type CollectionIndex
- type Context
- type CreateDatasetConfig
- type Dataset
- func (ds *Dataset) AsProxy(auth func(req *http.Request)) *ProxyDataset
- func (ds *Dataset) AsVirtualDataset(datasetManager *DsManager, ...) *VirtualDataset
- func (ds *Dataset) CompleteFullSync(ctx context.Context) error
- func (ds *Dataset) FullSyncStarted() bool
- func (ds *Dataset) GetChanges(since uint64, count int, latestOnly bool) (*Changes, error)
- func (ds *Dataset) GetChangesWatermark() (uint64, error)
- func (ds *Dataset) GetContext() *Context
- func (ds *Dataset) GetEntities(from string, count int) (*EntitiesResult, error)
- func (ds *Dataset) IsProxy() bool
- func (ds *Dataset) IsVirtual() bool
- func (ds *Dataset) MapEntities(from string, count int, processEntity func(entity *Entity) error) (string, error)
- func (ds *Dataset) MapEntitiesRaw(from string, count int, processEntity func(json []byte) error) (string, error)
- func (ds *Dataset) ProcessChanges(since uint64, count int, latestOnly bool, ...) (uint64, error)
- func (ds *Dataset) ProcessChangesRaw(since uint64, limit int, latestOnly bool, ...) (uint64, error)
- func (ds *Dataset) RefreshFullSyncLease(fullSyncID string) error
- func (ds *Dataset) ReleaseFullSyncLease(fullSyncID string) error
- func (ds *Dataset) StartFullSync() error
- func (ds *Dataset) StartFullSyncWithLease(fullSyncID string) error
- func (ds *Dataset) StoreEntities(entities []*Entity) (Error error)
- func (ds *Dataset) StoreEntitiesWithTransaction(entities []*Entity, txnTime int64, txn *badger.Txn) (newitems int64, Error error)
- type DatasetName
- type DsManager
- func (dsm *DsManager) CreateDataset(name string, createDatasetConfig *CreateDatasetConfig) (*Dataset, error)
- func (dsm *DsManager) DeleteDataset(name string) error
- func (dsm *DsManager) GetDataset(id string) *Dataset
- func (dsm *DsManager) GetDatasetDetails(name string) (*Entity, bool, error)
- func (dsm *DsManager) GetDatasetNames() []DatasetName
- func (dsm *DsManager) IsDataset(name string) bool
- func (dsm *DsManager) NewDatasetEntity(name string, proxyDatasetConfig *ProxyDatasetConfig, ...) *Entity
- func (dsm *DsManager) UpdateDataset(name string, config *UpdateDatasetConfig) (*Dataset, error)
- type DsNsInfo
- type EntitiesResult
- type Entity
- type EntityStreamParser
- type EventBus
- type GarbageCollector
- type Instrumented
- type InstrumentedTransaction
- type InstumentedIterator
- type MEventBus
- func (eb *MEventBus) Emit(ctx context.Context, topicName string, data interface{})
- func (eb *MEventBus) Init(datasets []DatasetName)
- func (eb *MEventBus) RegisterTopic(ds string)
- func (eb *MEventBus) SubscribeToDataset(id string, matcher string, f func(e *bus.Event))
- func (eb *MEventBus) UnregisterTopic(ds string)
- func (eb *MEventBus) UnsubscribeToDataset(id string)
- type NamespaceManager
- func (namespaceManager *NamespaceManager) AssertPrefixMappingForExpansion(uriExpansion string) (string, error)
- func (namespaceManager *NamespaceManager) ExpandCurie(curie string) (string, error)
- func (namespaceManager *NamespaceManager) GetContext(includedNamespaces []string) (context *Context)
- func (namespaceManager *NamespaceManager) GetDatasetNamespaceInfo() (DsNsInfo, error)
- func (namespaceManager *NamespaceManager) GetPrefixMappingForExpansion(uriExpansion string) (string, error)
- func (namespaceManager *NamespaceManager) GetPrefixToExpansionMap() (result map[string]string)
- type NamespacesState
- type NoOp
- func (eb *NoOp) Emit(ctx context.Context, topicName string, data interface{})
- func (eb *NoOp) Init(datasets []DatasetName)
- func (eb *NoOp) RegisterTopic(ds string)
- func (eb *NoOp) SubscribeToDataset(id string, matcher string, f func(e *bus.Event))
- func (eb *NoOp) UnregisterTopic(ds string)
- func (eb *NoOp) UnsubscribeToDataset(id string)
- type ProxyDataset
- func (d *ProxyDataset) ForwardEntities(sourceBody io.ReadCloser, sourceHeader http.Header) error
- func (d *ProxyDataset) StreamChanges(since string, limit int, latestOnly bool, reverse bool, f func(*Entity) error, ...) (string, error)
- func (d *ProxyDataset) StreamChangesRaw(since string, limit int, latestOnly bool, reverse bool, ...) (string, error)
- func (d *ProxyDataset) StreamEntities(from string, limit int, f func(*Entity) error, preStream func() error) (string, error)
- func (d *ProxyDataset) StreamEntitiesRaw(from string, limit int, f func(jsonData []byte) error, preStream func() error) (string, error)
- type ProxyDatasetConfig
- type RelatedEntitiesQueryResult
- type RelatedEntitiesResult
- type RelatedEntityResult
- type RelatedFrom
- type Statistics
- type StorageError
- type Store
- func (s *Store) Close() error
- func (s *Store) DatasetsToInternalIDs(datasets []string) []uint32
- func (s *Store) Delete() error
- func (s *Store) DeleteObject(collection CollectionIndex, id string) error
- func (s *Store) ExecuteTransaction(transaction *Transaction) error
- func (s *Store) ExpandCurie(curie string) (string, error)
- func (s *Store) GetEntity(uri string, datasets []string, mergePartials bool) (*Entity, error)
- func (s *Store) GetEntityAtPointInTimeWithInternalID(internalID uint64, at int64, targetDatasetIds []uint32, mergePartials bool) (*Entity, error)
- func (s *Store) GetEntityWithInternalID(internalID uint64, targetDatasetIds []uint32, mergePartials bool) (*Entity, error)
- func (s *Store) GetGlobalContext(strict bool) *Context
- func (s *Store) GetManyRelatedEntities(startPoints []string, predicate string, inverse bool, datasets []string, ...) ([][]any, error)
- func (s *Store) GetManyRelatedEntitiesAtTime(from []*RelatedFrom, limit int, mergePartials bool) (RelatedEntitiesQueryResult, error)
- func (s *Store) GetManyRelatedEntitiesBatch(startPoints []string, predicate string, inverse bool, datasets []string, ...) (RelatedEntitiesQueryResult, error)
- func (s *Store) GetNamespacedIdentifier(val string, localNamespaces map[string]string) (string, error)
- func (s *Store) GetNamespacedIdentifierFromURI(val string) (string, error)
- func (s *Store) GetObject(collection CollectionIndex, id string, obj interface{}) error
- func (s *Store) GetPredicateID(predicate string, txn *badger.Txn) (uint64, error)
- func (s *Store) GetRelatedAtTime(from *RelatedFrom, limit int) ([]qresult, *RelatedFrom, error)
- func (s *Store) IsCurie(uri string) bool
- func (s *Store) IterateObjectsRaw(prefix []byte, processJSON func([]byte) error) error
- func (s *Store) Open() error
- func (s *Store) StoreObject(collection CollectionIndex, id string, data interface{}) error
- func (s *Store) ToRelatedFrom(startPoints []string, predicate string, inverse bool, datasets []string, ...) ([]*RelatedFrom, error)
- type Transaction
- type UpdateDatasetConfig
- type VirtualDataset
- type VirtualDatasetConfig
Constants ¶
const ( URIToIDIndexID uint16 = 0 EntityIDToJSONIndexID uint16 = 1 IncomingRefIndex uint16 = 2 OutgoingRefIndex uint16 = 3 DatasetEntityChangeLog uint16 = 4 SysDatasetsID uint16 = 5 SysJobsID uint16 = 6 SysDatasetsSequences uint16 = 7 DatasetLatestEntities uint16 = 8 IDToURIIndexID uint16 = 9 StoreMetaIndex CollectionIndex = 10 NamespacesIndex CollectionIndex = 11 JobResultIndex CollectionIndex = 12 JobDataIndex CollectionIndex = 13 JobConfigIndex CollectionIndex = 14 ContentIndex CollectionIndex = 15 StoreNextDatasetID CollectionIndex = 16 LoginProviderIndex CollectionIndex = 17 )
const ( RdfNamespaceExpansion = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" // rdf and rdfs core uris RdfTypeURI string = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" RdfsClassURI string = "http://www.w3.org/2000/01/rdf-schema#Class" RdfsLabelURI string = "http://www.w3.org/2000/01/rdf-schema#label" )
const StorageIDFileName = "DATAHUB_BACKUPID"
Variables ¶
var ( JobResultIndexBytes = uint16ToBytes(JobResultIndex) JobConfigsIndexBytes = uint16ToBytes(JobConfigIndex) ContentIndexBytes = uint16ToBytes(ContentIndex) StoreNextDatasetIDBytes = uint16ToBytes(StoreNextDatasetID) LoginProviderIndexBytes = uint16ToBytes(LoginProviderIndex) )
var ( AttemptStoreEntitiesErr = func(detail error) error { return fmt.Errorf("failed when attempting to store entities: %w", detail) } SinceParseErr = func(detail error) error { return fmt.Errorf("since should be an integer number: %w", detail) } HTTPBodyMissingErr = func(detail error) error { return fmt.Errorf("body is missing or could not read: %w", detail) } HTTPJobParsingErr = func(detail error) error { return fmt.Errorf("failed at parsing the job definition: %w", detail) } HTTPJobSchedulingErr = func(detail error) error { return fmt.Errorf("failed at scheduling the job definition: %w", detail) } HTTPJsonParsingErr = func(detail error) error { return fmt.Errorf("failed parsing the json body: %w", detail) } HTTPContentStoreErr = func(detail error) error { return fmt.Errorf("failed updating the content: %w", detail) } HTTPQueryParamErr = func(detail error) error { return fmt.Errorf("one or more of the query parameters failed its validation: %w", detail) } HTTPGenericErr = func(detail error) error { return fmt.Errorf("internal failure: %w", detail) } HTTPFullsyncErr = func(detail error) error { return fmt.Errorf("an error occured trying to start or update a full sync: %w", detail) } )
We define these errors to prevent internal details from leaking through the API.
Functions ¶
func BytesToPrivateKey ¶
func BytesToPrivateKey(priv []byte) *rsa.PrivateKey
BytesToPrivateKey converts bytes to a private key.
func BytesToPublicKey ¶
BytesToPublicKey converts bytes to a public key.
func DecryptWithPrivateKey ¶
func DecryptWithPrivateKey(ciphertext []byte, priv *rsa.PrivateKey) []byte
DecryptWithPrivateKey decrypts data with private key
func EncryptWithPublicKey ¶
EncryptWithPublicKey encrypts data with public key
func GenerateKeyPair ¶
func GenerateKeyPair(bits int) (*rsa.PrivateKey, *rsa.PublicKey)
GenerateKeyPair generates a new key pair
func PrivateKeyToBytes ¶
func PrivateKeyToBytes(priv *rsa.PrivateKey) []byte
PrivateKeyToBytes converts a private key to bytes.
func PublicKeyToBytes ¶
PublicKeyToBytes converts a public key to bytes.
func ToLegacyQueryResult ¶
func ToLegacyQueryResult(res RelatedEntitiesQueryResult) [][]any
Types ¶
type BackupManager ¶
type BackupManager struct {
// contains filtered or unexported fields
}
func NewBackupManager ¶
func NewBackupManager(store *Store, env *conf.Config) (*BackupManager, error)
func (*BackupManager) DoNativeBackup ¶
func (backupManager *BackupManager) DoNativeBackup() error
func (*BackupManager) DoRsyncBackup ¶
func (backupManager *BackupManager) DoRsyncBackup() error
func (*BackupManager) LoadLastID ¶
func (backupManager *BackupManager) LoadLastID() (uint64, error)
func (*BackupManager) Run ¶
func (backupManager *BackupManager) Run()
This is the function called by the cron job scheduler
func (*BackupManager) StoreLastID ¶
func (backupManager *BackupManager) StoreLastID() error
type BadgerAccess ¶
type BadgerAccess struct {
// contains filtered or unexported fields
}
BadgerAccess implements service/store.BadgerStore and bridges badger access without cyclic dependencies
func NewBadgerAccess ¶
func NewBadgerAccess(s *Store, dsm *DsManager) BadgerAccess
func (BadgerAccess) GetDB ¶
func (b BadgerAccess) GetDB() *badger.DB
func (BadgerAccess) IsDatasetDeleted ¶
func (b BadgerAccess) IsDatasetDeleted(datasetID types.InternalDatasetID) bool
func (BadgerAccess) LookupDatasetID ¶
func (b BadgerAccess) LookupDatasetID(datasetName string) (types.InternalDatasetID, bool)
func (BadgerAccess) LookupDatasetIDs ¶
func (b BadgerAccess) LookupDatasetIDs(datasetNames []string) []types.InternalDatasetID
func (BadgerAccess) LookupDatasetName ¶
func (b BadgerAccess) LookupDatasetName(internalDatasetID types.InternalDatasetID) (string, bool)
func (BadgerAccess) LookupExpansionPrefix ¶
func (BadgerAccess) LookupNamespaceExpansion ¶
type BadgerLogger ¶
type BadgerLogger struct {
Logger *zap.SugaredLogger
}
func (BadgerLogger) Debugf ¶
func (bl BadgerLogger) Debugf(format string, v ...interface{})
func (BadgerLogger) Errorf ¶
func (bl BadgerLogger) Errorf(format string, v ...interface{})
func (BadgerLogger) Infof ¶
func (bl BadgerLogger) Infof(format string, v ...interface{})
func (BadgerLogger) Warningf ¶
func (bl BadgerLogger) Warningf(format string, v ...interface{})
type CollectionIndex ¶
type CollectionIndex uint16
type CreateDatasetConfig ¶
type CreateDatasetConfig struct { ProxyDatasetConfig *ProxyDatasetConfig `json:"ProxyDatasetConfig"` VirtualDatasetConfig *VirtualDatasetConfig `json:"VirtualDatasetConfig"` PublicNamespaces []string `json:"publicNamespaces"` }
type Dataset ¶
type Dataset struct { ID string `json:"id"` InternalID uint32 `json:"internalId"` SubjectIdentifier string `json:"subjectIdentifier"` WriteLock sync.Mutex PublicNamespaces []string `json:"publicNamespaces"` ProxyConfig *ProxyDatasetConfig `json:"proxyConfig"` VirtualDatasetConfig *VirtualDatasetConfig `json:"virtualDatasetConfig"` // contains filtered or unexported fields }
Dataset data structure
func NewDataset ¶
NewDataset Create a new dataset from the params provided
func (*Dataset) AsVirtualDataset ¶ added in v1.9.0
func (*Dataset) CompleteFullSync ¶
CompleteFullSync Full sync completed - mark unseen entities as deleted
func (*Dataset) FullSyncStarted ¶
func (*Dataset) GetChanges ¶
func (*Dataset) GetChangesWatermark ¶
func (*Dataset) GetContext ¶
func (*Dataset) GetEntities ¶
func (ds *Dataset) GetEntities(from string, count int) (*EntitiesResult, error)
GetEntities returns a batch of entities
func (*Dataset) MapEntities ¶
func (ds *Dataset) MapEntities(from string, count int, processEntity func(entity *Entity) error) (string, error)
MapEntities applies a function to all entities in the dataset. It returns the id of the last entity so that it can be used as a continuation token.
func (*Dataset) MapEntitiesRaw ¶
func (ds *Dataset) MapEntitiesRaw(from string, count int, processEntity func(json []byte) error) (string, error)
MapEntitiesRaw applies a function to all entities in the dataset. The entities are provided as raw JSON bytes. It returns the id of the last entity so that it can be used as a continuation token.
func (*Dataset) ProcessChanges ¶
func (*Dataset) ProcessChangesRaw ¶
func (*Dataset) RefreshFullSyncLease ¶
func (*Dataset) ReleaseFullSyncLease ¶
func (*Dataset) StartFullSync ¶
StartFullSync Indicates that a full sync is starting
func (*Dataset) StartFullSyncWithLease ¶
func (*Dataset) StoreEntities ¶
type DatasetName ¶
type DatasetName struct {
Name string `json:"Name"`
}
type DsManager ¶
type DsManager struct {
// contains filtered or unexported fields
}
func (*DsManager) CreateDataset ¶
func (dsm *DsManager) CreateDataset(name string, createDatasetConfig *CreateDatasetConfig) (*Dataset, error)
func (*DsManager) DeleteDataset ¶
DeleteDataset deletes dataset if it exists
func (*DsManager) GetDataset ¶
func (*DsManager) GetDatasetDetails ¶
func (*DsManager) GetDatasetNames ¶
func (dsm *DsManager) GetDatasetNames() []DatasetName
GetDatasetNames returns a list of the dataset names
func (*DsManager) NewDatasetEntity ¶
func (dsm *DsManager) NewDatasetEntity( name string, proxyDatasetConfig *ProxyDatasetConfig, virtualDatasetConfig *VirtualDatasetConfig, publicNamespaces []string, ) *Entity
func (*DsManager) UpdateDataset ¶
func (dsm *DsManager) UpdateDataset(name string, config *UpdateDatasetConfig) (*Dataset, error)
type EntitiesResult ¶
type Entity ¶
type Entity struct { References map[string]interface{} `json:"refs"` Properties map[string]interface{} `json:"props"` ID string `json:"id,omitempty"` InternalID uint64 `json:"internalId,omitempty"` Recorded uint64 `json:"recorded,omitempty"` IsDeleted bool `json:"deleted,omitempty"` }
Entity data structure
func NewEntityFromMap ¶
func (*Entity) ExpandIdentifiers ¶
func (*Entity) GetProperty ¶
GetProperty returns the value of the named property as an interface
func (*Entity) GetStringProperty ¶
GetStringProperty returns the string value of the requested property
type EntityStreamParser ¶
type EntityStreamParser struct {
// contains filtered or unexported fields
}
func NewEntityStreamParser ¶
func NewEntityStreamParser(store *Store) *EntityStreamParser
func (*EntityStreamParser) ParseStream ¶
func (*EntityStreamParser) ParseTransaction ¶
func (esp *EntityStreamParser) ParseTransaction(reader io.Reader) (*Transaction, error)
type EventBus ¶
type GarbageCollector ¶
type GarbageCollector struct {
// contains filtered or unexported fields
}
func NewGarbageCollector ¶
func NewGarbageCollector(store *Store, env *conf.Config) *GarbageCollector
func (*GarbageCollector) Cleandeleted ¶
func (garbageCollector *GarbageCollector) Cleandeleted() error
func (*GarbageCollector) GC ¶
func (garbageCollector *GarbageCollector) GC() error
type Instrumented ¶
type Instrumented interface {
// contains filtered or unexported methods
}
type InstrumentedTransaction ¶
type InstrumentedTransaction struct {
// contains filtered or unexported fields
}
func InstrumentedTxn ¶
func InstrumentedTxn(btxn *badger.Txn, store *Store) *InstrumentedTransaction
func (*InstrumentedTransaction) Get ¶
func (t *InstrumentedTransaction) Get(id []byte) (*badger.Item, error)
func (*InstrumentedTransaction) NewIterator ¶
func (t *InstrumentedTransaction) NewIterator(options badger.IteratorOptions) *InstumentedIterator
type InstumentedIterator ¶
type InstumentedIterator struct {
// contains filtered or unexported fields
}
func (*InstumentedIterator) Close ¶
func (i *InstumentedIterator) Close()
func (*InstumentedIterator) Item ¶
func (i *InstumentedIterator) Item() *badger.Item
func (*InstumentedIterator) Next ¶
func (i *InstumentedIterator) Next()
func (*InstumentedIterator) Rewind ¶
func (i *InstumentedIterator) Rewind()
func (*InstumentedIterator) Seek ¶
func (i *InstumentedIterator) Seek(buffer []byte)
func (*InstumentedIterator) ValidForPrefix ¶
func (i *InstumentedIterator) ValidForPrefix(bytes []byte) bool
type MEventBus ¶
func (*MEventBus) Init ¶
func (eb *MEventBus) Init(datasets []DatasetName)
Init adds the list of existing datasets that can be subscribed to. Datasets are always prefixed with "dataset." to separate them from other possible future events.
func (*MEventBus) RegisterTopic ¶
RegisterTopic registers a topic for publishing. "dataset." is prefixed in front of the topic
func (*MEventBus) SubscribeToDataset ¶
SubscribeToDataset adds a subscription to an already registered topic. The id should be unique. The matcher is a regexp to match against the registered topics, e.g. dataset.*, dataset.sdb.* and dataset.sdb.Animal are all valid registrations. f is the func to be called.
func (*MEventBus) UnregisterTopic ¶
UnregisterTopic removes a topic from subscription
func (*MEventBus) UnsubscribeToDataset ¶
UnsubscribeToDataset removes a dataset subscription
type NamespaceManager ¶
type NamespaceManager struct {
// contains filtered or unexported fields
}
func NewNamespaceManager ¶
func NewNamespaceManager(store *Store) *NamespaceManager
func (*NamespaceManager) AssertPrefixMappingForExpansion ¶
func (namespaceManager *NamespaceManager) AssertPrefixMappingForExpansion(uriExpansion string) (string, error)
func (*NamespaceManager) ExpandCurie ¶
func (namespaceManager *NamespaceManager) ExpandCurie(curie string) (string, error)
func (*NamespaceManager) GetContext ¶
func (namespaceManager *NamespaceManager) GetContext(includedNamespaces []string) (context *Context)
GetContext returns a context instance
containing namespace mappings for the given list of namespace prefixes. If the given list of namespace prefixes is empty, all namespace mappings are returned.
func (*NamespaceManager) GetDatasetNamespaceInfo ¶
func (namespaceManager *NamespaceManager) GetDatasetNamespaceInfo() (DsNsInfo, error)
func (*NamespaceManager) GetPrefixMappingForExpansion ¶
func (namespaceManager *NamespaceManager) GetPrefixMappingForExpansion(uriExpansion string) (string, error)
func (*NamespaceManager) GetPrefixToExpansionMap ¶
func (namespaceManager *NamespaceManager) GetPrefixToExpansionMap() (result map[string]string)
type NamespacesState ¶
type NoOp ¶
type NoOp struct{}
func (*NoOp) Init ¶
func (eb *NoOp) Init(datasets []DatasetName)
func (*NoOp) RegisterTopic ¶
func (*NoOp) SubscribeToDataset ¶
func (*NoOp) UnregisterTopic ¶
func (*NoOp) UnsubscribeToDataset ¶
type ProxyDataset ¶
type ProxyDataset struct { *ProxyDatasetConfig RemoteChangesURL string RemoteEntitiesURL string // contains filtered or unexported fields }
func (*ProxyDataset) ForwardEntities ¶
func (d *ProxyDataset) ForwardEntities(sourceBody io.ReadCloser, sourceHeader http.Header) error
func (*ProxyDataset) StreamChanges ¶
func (d *ProxyDataset) StreamChanges(since string, limit int, latestOnly bool, reverse bool, f func(*Entity) error, preStream func() error) (string, error)
StreamChanges streams through the dataset's changes and calls `f` for each entity. A `preStream` function can be provided if StreamChanges is used in a web handler. It allows the http response to stay uncommitted until `f` is called, so that an http error handler can still modify the status code while the response is uncommitted.
func (*ProxyDataset) StreamChangesRaw ¶
func (d *ProxyDataset) StreamChangesRaw( since string, limit int, latestOnly bool, reverse bool, f func(jsonData []byte) error, preStream func() error, ) (string, error)
StreamChangesRaw streams through the dataset's changes and calls `f` for each entity. A `preStream` function can be provided if StreamChangesRaw is used in a web handler. It allows the http response to stay uncommitted until `f` is called, so that an http error handler can still modify the status code while the response is uncommitted.
func (*ProxyDataset) StreamEntities ¶
func (*ProxyDataset) StreamEntitiesRaw ¶
type ProxyDatasetConfig ¶
type RelatedEntitiesQueryResult ¶
type RelatedEntitiesQueryResult struct { Cont []*RelatedFrom Relations []RelatedEntityResult }
type RelatedEntitiesResult ¶
type RelatedEntitiesResult struct { Continuation *RelatedFrom Relations []RelatedEntityResult }
type RelatedEntityResult ¶
type RelatedFrom ¶
type Statistics ¶ added in v1.9.0
type Statistics struct { Store *Store Logger *zap.SugaredLogger }
func (Statistics) GetStatistics ¶ added in v1.9.0
func (stats Statistics) GetStatistics(writer io.Writer) error
func (Statistics) GetStatisticsForDs ¶ added in v1.9.0
func (stats Statistics) GetStatisticsForDs(datasetName string, writer io.Writer) error
type StorageError ¶
type StorageError struct {
// contains filtered or unexported fields
}
StorageError custom error from storage
func NewStorageError ¶
func NewStorageError(msg string, innerError error) *StorageError
NewStorageError Creates a new storage error with optional innerError
func (*StorageError) Error ¶
func (e *StorageError) Error() string
type Store ¶
type Store struct { NamespaceManager *NamespaceManager // namespace manager for consistent short expansions SlowLogThreshold time.Duration // contains filtered or unexported fields }
Store data structure
func NewStore ¶
func NewStore(env *conf.Config, statsdClient statsd.ClientInterface) *Store
NewStore Create a new store
func (*Store) DatasetsToInternalIDs ¶
DatasetsToInternalIDs maps dataset IDs (strings) to internal IDs (uint32).
func (*Store) DeleteObject ¶
func (s *Store) DeleteObject(collection CollectionIndex, id string) error
DeleteObject is a slightly less low-level variation of the deleteValue method. You should probably use this instead of deleteValue if you need to delete stuff. It takes a collection and an object id, and attempts to delete that object from the store.
func (*Store) ExecuteTransaction ¶
func (s *Store) ExecuteTransaction(transaction *Transaction) error
func (*Store) GetEntityAtPointInTimeWithInternalID ¶
func (*Store) GetEntityWithInternalID ¶
func (*Store) GetGlobalContext ¶
func (*Store) GetManyRelatedEntities ¶
func (s *Store) GetManyRelatedEntities( startPoints []string, predicate string, inverse bool, datasets []string, mergePartials bool) ([][]any, error)
Backwards compatibility function
func (*Store) GetManyRelatedEntitiesAtTime ¶
func (s *Store) GetManyRelatedEntitiesAtTime(from []*RelatedFrom, limit int, mergePartials bool) (RelatedEntitiesQueryResult, error)
func (*Store) GetManyRelatedEntitiesBatch ¶
func (*Store) GetNamespacedIdentifier ¶
func (*Store) GetNamespacedIdentifierFromURI ¶
func (*Store) GetObject ¶
func (s *Store) GetObject(collection CollectionIndex, id string, obj interface{}) error
func (*Store) GetPredicateID ¶
func (*Store) GetRelatedAtTime ¶
func (s *Store) GetRelatedAtTime(from *RelatedFrom, limit int) ([]qresult, *RelatedFrom, error)
func (*Store) IterateObjectsRaw ¶
func (*Store) StoreObject ¶
func (s *Store) StoreObject(collection CollectionIndex, id string, data interface{}) error
func (*Store) ToRelatedFrom ¶
type Transaction ¶
type UpdateDatasetConfig ¶
type UpdateDatasetConfig struct {
ID string // update id/name
}
type VirtualDataset ¶ added in v1.9.0
type VirtualDataset struct { *VirtualDatasetConfig Store *Store DsManager *DsManager Logger *zap.SugaredLogger Wrap func(d *VirtualDataset, params map[string]any, since string, f func(entity *Entity) error) (string, error) }
func (*VirtualDataset) StreamChanges ¶ added in v1.9.0
func (d *VirtualDataset) StreamChanges(since string, body io.Reader, f func(entity *Entity) error) (string, error)
StreamChanges streams out entities produced by the configured transform script. It returns a continuation token.
type VirtualDatasetConfig ¶ added in v1.9.0
type VirtualDatasetConfig struct {
Transform string
}