Versions in this module Expand all Collapse all v1 v1.8.7 Apr 30, 2024 v1.8.6 Apr 10, 2024 v1.8.5 Mar 8, 2024 v1.8.4 Jan 31, 2024 Changes in this version + const ContentIndex + const DatasetEntityChangeLog + const DatasetLatestEntities + const EntityIDToJSONIndexID + const IDToURIIndexID + const IncomingRefIndex + const JobConfigIndex + const JobDataIndex + const JobResultIndex + const LoginProviderIndex + const NamespacesIndex + const OutgoingRefIndex + const RdfNamespaceExpansion + const RdfTypeURI + const RdfsClassURI + const RdfsLabelURI + const StorageIDFileName + const StoreMetaIndex + const StoreNextDatasetID + const SysDatasetsID + const SysDatasetsSequences + const SysJobsID + const URIToIDIndexID + var AttemptStoreEntitiesErr = func(detail error) error + var ContentIndexBytes = uint16ToBytes(ContentIndex) + var HTTPBodyMissingErr = func(detail error) error + var HTTPContentStoreErr = func(detail error) error + var HTTPFullsyncErr = func(detail error) error + var HTTPGenericErr = func(detail error) error + var HTTPJobParsingErr = func(detail error) error + var HTTPJobSchedulingErr = func(detail error) error + var HTTPJsonParsingErr = func(detail error) error + var HTTPQueryParamErr = func(detail error) error + var JobConfigsIndexBytes = uint16ToBytes(JobConfigIndex) + var JobResultIndexBytes = uint16ToBytes(JobResultIndex) + var LoginProviderIndexBytes = uint16ToBytes(LoginProviderIndex) + var SinceParseErr = func(detail error) error + var StoreNextDatasetIDBytes = uint16ToBytes(StoreNextDatasetID) + func BytesToPrivateKey(priv []byte) *rsa.PrivateKey + func BytesToPublicKey(pub []byte) *rsa.PublicKey + func DecryptWithPrivateKey(ciphertext []byte, priv *rsa.PrivateKey) []byte + func EncryptWithPublicKey(msg []byte, pub *rsa.PublicKey) []byte + func GenerateKeyPair(bits int) (*rsa.PrivateKey, *rsa.PublicKey) + func PrivateKeyToBytes(priv *rsa.PrivateKey) []byte + func PublicKeyToBytes(pub *rsa.PublicKey) []byte + func ToLegacyQueryResult(res RelatedEntitiesQueryResult) [][]any + func URLJoin(baseURL string, elem ...string) (result string, err error) + type BackupManager struct + func NewBackupManager(store *Store, env *conf.Config) (*BackupManager, error) + func (backupManager *BackupManager) DoNativeBackup() error + func (backupManager *BackupManager) DoRsyncBackup() error + func (backupManager *BackupManager) LoadLastID() (uint64, error) + func (backupManager *BackupManager) Run() + func (backupManager *BackupManager) StoreLastID() error + type BadgerAccess struct + func NewBadgerAccess(s *Store, dsm *DsManager) BadgerAccess + func (b BadgerAccess) GetDB() *badger.DB + func (b BadgerAccess) IsDatasetDeleted(datasetID types.InternalDatasetID) bool + func (b BadgerAccess) LookupDatasetID(datasetName string) (types.InternalDatasetID, bool) + func (b BadgerAccess) LookupDatasetIDs(datasetNames []string) []types.InternalDatasetID + func (b BadgerAccess) LookupDatasetName(internalDatasetID types.InternalDatasetID) (string, bool) + func (b BadgerAccess) LookupExpansionPrefix(namespaceURI types.URI) (types.Prefix, error) + func (b BadgerAccess) LookupNamespaceExpansion(prefix types.Prefix) (types.URI, error) + type BadgerLogger struct + Logger *zap.SugaredLogger + func (bl BadgerLogger) Debugf(format string, v ...interface{}) + func (bl BadgerLogger) Errorf(format string, v ...interface{}) + func (bl BadgerLogger) Infof(format string, v ...interface{}) + func (bl BadgerLogger) Warningf(format string, v ...interface{}) + type Changes struct + Context *Context + Entities []*Entity + NextToken uint64 + func NewChanges() *Changes + type CollectionIndex uint16 + type Context struct + ID string + Namespaces map[string]string + type CreateDatasetConfig struct + ProxyDatasetConfig *ProxyDatasetConfig + PublicNamespaces []string + type Dataset struct + ID string + InternalID uint32 + ProxyConfig *ProxyDatasetConfig + PublicNamespaces []string + SubjectIdentifier string + WriteLock sync.Mutex + func NewDataset(store *Store, id string, internalID uint32, subjectIdentifier string) *Dataset + func (ds *Dataset) AsProxy(auth func(req *http.Request)) *ProxyDataset + func (ds *Dataset) CompleteFullSync(ctx context.Context) error + func (ds *Dataset) FullSyncStarted() bool + func (ds *Dataset) GetChanges(since uint64, count int, latestOnly bool) (*Changes, error) + func (ds *Dataset) GetChangesWatermark() (uint64, error) + func (ds *Dataset) GetContext() *Context + func (ds *Dataset) GetEntities(from string, count int) (*EntitiesResult, error) + func (ds *Dataset) IsProxy() bool + func (ds *Dataset) MapEntities(from string, count int, processEntity func(entity *Entity) error) (string, error) + func (ds *Dataset) MapEntitiesRaw(from string, count int, processEntity func(json []byte) error) (string, error) + func (ds *Dataset) ProcessChanges(since uint64, count int, latestOnly bool, ...) (uint64, error) + func (ds *Dataset) ProcessChangesRaw(since uint64, limit int, latestOnly bool, ...) (uint64, error) + func (ds *Dataset) RefreshFullSyncLease(fullSyncID string) error + func (ds *Dataset) ReleaseFullSyncLease(fullSyncID string) error + func (ds *Dataset) StartFullSync() error + func (ds *Dataset) StartFullSyncWithLease(fullSyncID string) error + func (ds *Dataset) StoreEntities(entities []*Entity) (Error error) + func (ds *Dataset) StoreEntitiesWithTransaction(entities []*Entity, txnTime int64, txn *badger.Txn) (newitems int64, Error error) + type DatasetName struct + Name string + type DsManager struct + func NewDsManager(env *conf.Config, store *Store, eb EventBus) *DsManager + func (dsm *DsManager) CreateDataset(name string, createDatasetConfig *CreateDatasetConfig) (*Dataset, error) + func (dsm *DsManager) DeleteDataset(name string) error + func (dsm *DsManager) GetDataset(id string) *Dataset + func (dsm *DsManager) GetDatasetDetails(name string) (*Entity, bool, error) + func (dsm *DsManager) GetDatasetNames() []DatasetName + func (dsm *DsManager) IsDataset(name string) bool + func (dsm *DsManager) NewDatasetEntity(name string, proxyDatasetConfig *ProxyDatasetConfig, publicNamespaces []string) *Entity + func (dsm *DsManager) UpdateDataset(name string, config *UpdateDatasetConfig) (*Dataset, error) + type DsNsInfo struct + DatasetPrefix string + ItemsKey string + NameKey string + PublicNamespacesKey string + type EntitiesResult struct + Context *Context + ContinuationToken string + Entities []*Entity + type Entity struct + ID string + InternalID uint64 + IsDeleted bool + Properties map[string]interface{} + Recorded uint64 + References map[string]interface{} + func NewEntity(ID string, internalID uint64) *Entity + func NewEntityFromMap(data map[string]interface{}) *Entity + func (e *Entity) ExpandIdentifiers(store *Store) error + func (e *Entity) GetName() string + func (e *Entity) GetProperty(propName string) interface{} + func (e *Entity) GetStringProperty(propName string) string + type EntityStreamParser struct + func NewEntityStreamParser(store *Store) *EntityStreamParser + func (esp *EntityStreamParser) ParseStream(reader io.Reader, emitEntity func(*Entity) error) error + func (esp *EntityStreamParser) ParseTransaction(reader io.Reader) (*Transaction, error) + type EventBus interface + Emit func(ctx context.Context, topicName string, data interface{}) + Init func(datasets []DatasetName) + RegisterTopic func(ds string) + SubscribeToDataset func(id string, matcher string, f func(e *bus.Event)) + UnregisterTopic func(ds string) + UnsubscribeToDataset func(id string) + func NewBus(env *conf.Config) (EventBus, error) + func NoOpBus() EventBus + type GarbageCollector struct + func NewGarbageCollector(store *Store, env *conf.Config) *GarbageCollector + func (garbageCollector *GarbageCollector) Cleandeleted() error + func (garbageCollector *GarbageCollector) GC() error + func (garbageCollector *GarbageCollector) Start(ctx context.Context) error + func (garbageCollector *GarbageCollector) Stop(ctx context.Context) error + type Instrumented interface + type InstrumentedTransaction struct + func InstrumentedTxn(btxn *badger.Txn, store *Store) *InstrumentedTransaction + func (t *InstrumentedTransaction) Get(id []byte) (*badger.Item, error) + func (t *InstrumentedTransaction) NewIterator(options badger.IteratorOptions) *InstumentedIterator + type InstumentedIterator struct + func (i *InstumentedIterator) Close() + func (i *InstumentedIterator) Item() *badger.Item + func (i *InstumentedIterator) Next() + func (i *InstumentedIterator) Rewind() + func (i *InstumentedIterator) Seek(buffer []byte) + func (i *InstumentedIterator) ValidForPrefix(bytes []byte) bool + type MEventBus struct + Bus *bus.Bus + func (eb *MEventBus) Emit(ctx context.Context, topicName string, data interface{}) + func (eb *MEventBus) Init(datasets []DatasetName) + func (eb *MEventBus) RegisterTopic(ds string) + func (eb *MEventBus) SubscribeToDataset(id string, matcher string, f func(e *bus.Event)) + func (eb *MEventBus) UnregisterTopic(ds string) + func (eb *MEventBus) UnsubscribeToDataset(id string) + type NamespaceManager struct + func NewNamespaceManager(store *Store) *NamespaceManager + func (namespaceManager *NamespaceManager) AssertPrefixMappingForExpansion(uriExpansion string) (string, error) + func (namespaceManager *NamespaceManager) ExpandCurie(curie string) (string, error) + func (namespaceManager *NamespaceManager) GetContext(includedNamespaces []string) (context *Context) + func (namespaceManager *NamespaceManager) GetDatasetNamespaceInfo() (DsNsInfo, error) + func (namespaceManager *NamespaceManager) GetPrefixMappingForExpansion(uriExpansion string) (string, error) + func (namespaceManager *NamespaceManager) GetPrefixToExpansionMap() (result map[string]string) + type NamespacesState struct + ExpansionToPrefixMapping map[string]string + PrefixToExpansionMapping map[string]string + type NoOp struct + func (eb *NoOp) Emit(ctx context.Context, topicName string, data interface{}) + func (eb *NoOp) Init(datasets []DatasetName) + func (eb *NoOp) RegisterTopic(ds string) + func (eb *NoOp) SubscribeToDataset(id string, matcher string, f func(e *bus.Event)) + func (eb *NoOp) UnregisterTopic(ds string) + func (eb *NoOp) UnsubscribeToDataset(id string) + type ProxyDataset struct + RemoteChangesURL string + RemoteEntitiesURL string + func (d *ProxyDataset) ForwardEntities(sourceBody io.ReadCloser, sourceHeader http.Header) error + func (d *ProxyDataset) StreamChanges(since string, limit int, latestOnly bool, reverse bool, f func(*Entity) error, ...) (string, error) + func (d *ProxyDataset) StreamChangesRaw(since string, limit int, latestOnly bool, reverse bool, ...) (string, error) + func (d *ProxyDataset) StreamEntities(from string, limit int, f func(*Entity) error, preStream func() error) (string, error) + func (d *ProxyDataset) StreamEntitiesRaw(from string, limit int, f func(jsonData []byte) error, preStream func() error) (string, error) + type ProxyDatasetConfig struct + AuthProviderName string + DownstreamTransform string + RemoteURL string + UpstreamTransform string + type RelatedEntitiesQueryResult struct + Cont []*RelatedFrom + Relations []RelatedEntityResult + type RelatedEntitiesResult struct + Continuation *RelatedFrom + Relations []RelatedEntityResult + type RelatedEntityResult struct + PredicateURI string + RelatedEntity *Entity + StartURI string + type RelatedFrom struct + At int64 + Datasets []uint32 + Inverse bool + Predicate uint64 + RelationIndexFromKey []byte + type StorageError struct + func NewStorageError(msg string, innerError error) *StorageError + func (e *StorageError) Error() string + type Store struct + NamespaceManager *NamespaceManager + SlowLogThreshold time.Duration + func NewStore(env *conf.Config, statsdClient statsd.ClientInterface) *Store + func (s *Store) Close() error + func (s *Store) DatasetsToInternalIDs(datasets []string) []uint32 + func (s *Store) Delete() error + func (s *Store) DeleteObject(collection CollectionIndex, id string) error + func (s *Store) ExecuteTransaction(transaction *Transaction) error + func (s *Store) ExpandCurie(curie string) (string, error) + func (s *Store) GetEntity(uri string, datasets []string, mergePartials bool) (*Entity, error) + func (s *Store) GetEntityAtPointInTimeWithInternalID(internalID uint64, at int64, targetDatasetIds []uint32, mergePartials bool) (*Entity, error) + func (s *Store) GetEntityWithInternalID(internalID uint64, targetDatasetIds []uint32, mergePartials bool) (*Entity, error) + func (s *Store) GetGlobalContext(strict bool) *Context + func (s *Store) GetManyRelatedEntities(startPoints []string, predicate string, inverse bool, datasets []string, ...) ([][]any, error) + func (s *Store) GetManyRelatedEntitiesAtTime(from []*RelatedFrom, limit int, mergePartials bool) (RelatedEntitiesQueryResult, error) + func (s *Store) GetManyRelatedEntitiesBatch(startPoints []string, predicate string, inverse bool, datasets []string, ...) (RelatedEntitiesQueryResult, error) + func (s *Store) GetNamespacedIdentifier(val string, localNamespaces map[string]string) (string, error) + func (s *Store) GetNamespacedIdentifierFromURI(val string) (string, error) + func (s *Store) GetObject(collection CollectionIndex, id string, obj interface{}) error + func (s *Store) GetPredicateID(predicate string, txn *badger.Txn) (uint64, error) + func (s *Store) GetRelatedAtTime(from *RelatedFrom, limit int) ([]qresult, *RelatedFrom, error) + func (s *Store) IsCurie(uri string) bool + func (s *Store) IterateObjectsRaw(prefix []byte, processJSON func([]byte) error) error + func (s *Store) Open() error + func (s *Store) StoreObject(collection CollectionIndex, id string, data interface{}) error + func (s *Store) ToRelatedFrom(startPoints []string, predicate string, inverse bool, datasets []string, ...) ([]*RelatedFrom, error) + type Transaction struct + DatasetEntities map[string][]*Entity + type UpdateDatasetConfig struct + ID string