Documentation ¶
Index ¶
- type TypedRepoWithAggregator
- func (r *TypedRepoWithAggregator[T, C]) FindingRecords(query FindingRecords[Record[T]]) (PageResult[Record[T]], error)
- func (r *TypedRepoWithAggregator[T, C]) Get(recordID string, recordType RecordType) (Record[T], error)
- func (r *TypedRepoWithAggregator[T, C]) ReindexAll()
- func (r *TypedRepoWithAggregator[T, C]) UpdateRecords(s UpdateRecords[Record[T]]) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type TypedRepoWithAggregator ¶
func NewTypedRepoWithAggregator ¶
func NewTypedRepoWithAggregator[T, C any]( store Repository[schema.Schema], aggregator func() Aggregator[T, C], ) *TypedRepoWithAggregator[T, C]
func NewTypedRepository ¶
func NewTypedRepository[A any]( store schemaless.Repository[schema.Schema], ) *TypedRepoWithAggregator[A, any]
func (*TypedRepoWithAggregator[T, C]) FindingRecords ¶
func (r *TypedRepoWithAggregator[T, C]) FindingRecords(query FindingRecords[Record[T]]) (PageResult[Record[T]], error)
func (*TypedRepoWithAggregator[T, C]) Get ¶
func (r *TypedRepoWithAggregator[T, C]) Get(recordID string, recordType RecordType) (Record[T], error)
func (*TypedRepoWithAggregator[T, C]) ReindexAll ¶
func (r *TypedRepoWithAggregator[T, C]) ReindexAll()
ReindexAll reindexes all records using the provided aggregator definition. Examples of when this is needed:
- When an aggregator is created, it is empty, so it needs to be filled with all records.
- When the aggregator definition is changed, it needs to be reindexed.
- When the aggregator is corrupted, it needs to be reindexed.
When is it called?
1. By the user.
2. By the system, when it detects that the aggregator is corrupted.
3. By the system, when it detects that the aggregator definition has changed.
How is it implemented?
- Create the index from a snapshot of all records. Because it is a snapshot, changes made after it was taken are not included.
- In parallel, process the stream of changes from a given point in time.
- KayedAggregate must be idempotent, so the same record version won't be indexed twice.
- When the aggregator detects the same record with a new Version, it retracts the old Version and accumulates the new Version.
- When both steps are done, the index is ready to be used.
- When indices are set up as synchronous, every change is indexed immediately. But because a synchronous index only covers changes from a given point in time, it still needs to trigger a reindex. This implies that the aggregator must know when the index was created, so it knows when to stop the rebuilding process, which in turn implies a control plane. Versions of records should increase monotonically, which makes it easier to detect when the index is up to date.
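The retract-and-accumulate rule described above can be illustrated with a minimal, self-contained sketch. It is not this package's implementation: `record`, `sumIndex`, `newSumIndex`, and `apply` are hypothetical names, and the aggregate is assumed to be a simple running sum. The sketch only shows why monotonically increasing versions make it safe to replay the snapshot and the change stream over the same records.

```go
package main

import "fmt"

// record is a hypothetical stand-in for a stored record (assumption, not the package's Record[T]).
type record struct {
	ID      string
	Version int
	Value   int
}

// sumIndex is a hypothetical aggregator that keeps a running sum of Value.
type sumIndex struct {
	seen map[string]record // last indexed version of each record, by ID
	sum  int
}

func newSumIndex() *sumIndex {
	return &sumIndex{seen: map[string]record{}}
}

// apply indexes r idempotently: an already-seen or older version is ignored,
// and a newer version retracts the old value before accumulating the new one.
func (ix *sumIndex) apply(r record) {
	old, ok := ix.seen[r.ID]
	if ok && r.Version <= old.Version {
		return // same or stale version: nothing to do
	}
	if ok {
		ix.sum -= old.Value // retract the old version
	}
	ix.sum += r.Value // accumulate the new version
	ix.seen[r.ID] = r
}

func main() {
	ix := newSumIndex()

	// Snapshot of all records at some point in time.
	snapshot := []record{{"a", 1, 10}, {"b", 1, 5}}
	// Change stream that overlaps the snapshot and carries one update.
	changes := []record{{"a", 1, 10}, {"a", 2, 7}, {"b", 1, 5}}

	for _, r := range snapshot {
		ix.apply(r)
	}
	for _, r := range changes {
		ix.apply(r)
	}

	fmt.Println(ix.sum) // prints 12
}
```

Because duplicates are skipped by comparing versions, the order in which the snapshot and the overlapping part of the change stream are applied does not affect the final sum in this sketch.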
func (*TypedRepoWithAggregator[T, C]) UpdateRecords ¶
func (r *TypedRepoWithAggregator[T, C]) UpdateRecords(s UpdateRecords[Record[T]]) error