Documentation ¶
Index ¶
- func AgeRangeKey(age int) string
- type TypedAppendLog
- func (t *TypedAppendLog[T]) Append(b *schemaless.AppendLog[T])
- func (t *TypedAppendLog[T]) Change(from, to schemaless.Record[T]) error
- func (t *TypedAppendLog[T]) Close()
- func (t *TypedAppendLog[T]) Delete(data schemaless.Record[T]) error
- func (t *TypedAppendLog[T]) Push(x schemaless.Change[T])
- func (t *TypedAppendLog[T]) Subscribe(ctx context.Context, fromOffset int, f func(schemaless.Change[T])) error
- type TypedRepoWithAggregator
- func (repo *TypedRepoWithAggregator[T, C]) FindingRecords(query FindingRecords[Record[T]]) (PageResult[Record[T]], error)
- func (repo *TypedRepoWithAggregator[T, C]) Get(recordID string, recordType RecordType) (Record[T], error)
- func (repo *TypedRepoWithAggregator[T, C]) ReindexAll()
- func (repo *TypedRepoWithAggregator[T, C]) UpdateRecords(s UpdateRecords[Record[T]]) error
- type User
- type UsersCountByAge
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AgeRangeKey ¶ added in v1.21.0
Types ¶
type TypedAppendLog ¶ added in v1.21.0
type TypedAppendLog[T any] struct { // contains filtered or unexported fields }
func NewTypedAppendLog ¶ added in v1.21.0
func NewTypedAppendLog[T any](log schemaless.AppendLoger[schema.Schema]) *TypedAppendLog[T]
func (*TypedAppendLog[T]) Append ¶ added in v1.21.0
func (t *TypedAppendLog[T]) Append(b *schemaless.AppendLog[T])
func (*TypedAppendLog[T]) Change ¶ added in v1.21.0
func (t *TypedAppendLog[T]) Change(from, to schemaless.Record[T]) error
func (*TypedAppendLog[T]) Close ¶ added in v1.21.0
func (t *TypedAppendLog[T]) Close()
func (*TypedAppendLog[T]) Delete ¶ added in v1.21.0
func (t *TypedAppendLog[T]) Delete(data schemaless.Record[T]) error
func (*TypedAppendLog[T]) Push ¶ added in v1.21.0
func (t *TypedAppendLog[T]) Push(x schemaless.Change[T])
func (*TypedAppendLog[T]) Subscribe ¶ added in v1.21.0
func (t *TypedAppendLog[T]) Subscribe(ctx context.Context, fromOffset int, f func(schemaless.Change[T])) error
type TypedRepoWithAggregator ¶
func NewTypedRepoWithAggregator ¶
func NewTypedRepoWithAggregator[T, C any]( store Repository[schema.Schema], aggregator func() Aggregator[T, C], ) *TypedRepoWithAggregator[T, C]
func NewTypedRepository ¶
func NewTypedRepository[A any]( store schemaless.Repository[schema.Schema], ) *TypedRepoWithAggregator[A, any]
func (*TypedRepoWithAggregator[T, C]) FindingRecords ¶
func (repo *TypedRepoWithAggregator[T, C]) FindingRecords(query FindingRecords[Record[T]]) (PageResult[Record[T]], error)
func (*TypedRepoWithAggregator[T, C]) Get ¶
func (repo *TypedRepoWithAggregator[T, C]) Get(recordID string, recordType RecordType) (Record[T], error)
func (*TypedRepoWithAggregator[T, C]) ReindexAll ¶
func (repo *TypedRepoWithAggregator[T, C]) ReindexAll()
ReindexAll is used to reindex all records with a provided aggregator definition Example: when aggregator is created, it's empty, so it needs to be filled with all records Example: when aggregator definition is changed, it needs to be reindexed Example: when aggregator is corrupted, it needs to be reindexed
How it works? 1. It's called by the user 2. It's called by the system when it detects that aggregator is corrupted 3. It's called by the system when it detects that aggregator definition is changed
How it's implemented?
- Create index from snapshot of all records. Because it's snapshot, changes are not applied.
- In parallel process stream of changes from give point of time.
- KayedAggregate must be idempotent, so same won't be indexed twice.
- When aggregator detects same record with new Version, it retracts old Version and accumulates new Version.
- When it's done, it's ready to be used
- When indices are set up as synchronous, then every change is indexed immediately. But, because synchronous index is from point of time, it needs to trigger reindex. Which imply that aggregator myst know when index was created, so it can know when to stop rebuilding process. This implies control plane. Versions of records should follow monotonically increasing order, that way it will be easier to detect when index is up to date.
func (*TypedRepoWithAggregator[T, C]) UpdateRecords ¶
func (repo *TypedRepoWithAggregator[T, C]) UpdateRecords(s UpdateRecords[Record[T]]) error
type UsersCountByAge ¶ added in v1.21.0
type UsersCountByAge struct {
Count int
}
Click to show internal directories.
Click to hide internal directories.