Documentation ¶
Index ¶
- Variables
- func GetEnvOr(key string, defaultValue string) string
- type Aggregator
- type AppendLog
- func (a *AppendLog[T]) Append(b *AppendLog[T])
- func (a *AppendLog[T]) Change(from, to Record[T]) error
- func (a *AppendLog[T]) Close()
- func (a *AppendLog[T]) Delete(data Record[T]) error
- func (a *AppendLog[T]) Push(x Change[T])
- func (a *AppendLog[T]) Subscribe(ctx context.Context, fromOffset int, f func(Change[T])) error
- type Change
- type Cursor
- type DynamoDBRepository
- func (d *DynamoDBRepository) FindingRecords(query FindingRecords[Record[schema.Schema]]) (PageResult[Record[schema.Schema]], error)
- func (d *DynamoDBRepository) Get(key, recordType string) (Record[schema.Schema], error)
- func (d *DynamoDBRepository) UpdateRecords(command UpdateRecords[Record[schema.Schema]]) error
- type FindingRecords
- type InMemoryRepository
- func (s *InMemoryRepository) AppendLog() *AppendLog[schema.Schema]
- func (s *InMemoryRepository) FindingRecords(query FindingRecords[Record[schema.Schema]]) (PageResult[Record[schema.Schema]], error)
- func (s *InMemoryRepository) Get(recordID, recordType string) (Record[schema.Schema], error)
- func (s *InMemoryRepository) UpdateRecords(x UpdateRecords[Record[schema.Schema]]) error
- type KayedAggregate
- type KinesisStream
- type NoopAggregator
- type OpenSearchRepository
- func (os *OpenSearchRepository) FindingRecords(query FindingRecords[Record[schema.Schema]]) (PageResult[Record[schema.Schema]], error)
- func (os *OpenSearchRepository) Get(recordID string, recordType RecordType) (Record[schema.Schema], error)
- func (os *OpenSearchRepository) ToSorters(sort []SortField) []any
- func (os *OpenSearchRepository) UpdateRecords(command UpdateRecords[Record[schema.Schema]]) error
- type PageResult
- type Record
- type RecordType
- type Repository
- type SortField
- type Storage
- type UpdateRecords
- type UpdatingPolicy
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrNotFound = fmt.Errorf("not found") ErrEmptyCommand = fmt.Errorf("empty command") ErrInvalidType = fmt.Errorf("invalid type") ErrVersionConflict = fmt.Errorf("version conflict") ErrInternalError = fmt.Errorf("internal error") )
View Source
var WithOnlyRecordSchemaOptions = schema.WithExtraRules()
Functions ¶
Types ¶
type Aggregator ¶
type AppendLog ¶
type AppendLog[T any] struct { // contains filtered or unexported fields }
AppendLog is a stream of events. In the context of schemaless, it is a stream of changes to records, or of deleted records together with their past state.
func NewAppendLog ¶
type DynamoDBRepository ¶
type DynamoDBRepository struct {
// contains filtered or unexported fields
}
func NewDynamoDBRepository ¶
func NewDynamoDBRepository(client *dynamodb.Client, tableName string) *DynamoDBRepository
func (*DynamoDBRepository) FindingRecords ¶
func (d *DynamoDBRepository) FindingRecords(query FindingRecords[Record[schema.Schema]]) (PageResult[Record[schema.Schema]], error)
func (*DynamoDBRepository) UpdateRecords ¶
func (d *DynamoDBRepository) UpdateRecords(command UpdateRecords[Record[schema.Schema]]) error
type FindingRecords ¶
type FindingRecords[T any] struct { RecordType string Where *predicate.WherePredicates Sort []SortField Limit uint8 After *Cursor }
go:generate go run ../../../cmd/mkunion/main.go -name RecordsDSL
type InMemoryRepository ¶
type InMemoryRepository struct {
// contains filtered or unexported fields
}
func NewInMemoryRepository ¶
func NewInMemoryRepository() *InMemoryRepository
func (*InMemoryRepository) AppendLog ¶
func (s *InMemoryRepository) AppendLog() *AppendLog[schema.Schema]
func (*InMemoryRepository) FindingRecords ¶
func (s *InMemoryRepository) FindingRecords(query FindingRecords[Record[schema.Schema]]) (PageResult[Record[schema.Schema]], error)
func (*InMemoryRepository) UpdateRecords ¶
func (s *InMemoryRepository) UpdateRecords(x UpdateRecords[Record[schema.Schema]]) error
type KayedAggregate ¶
type KayedAggregate[T, R any] struct { // contains filtered or unexported fields }
func NewKeyedAggregate ¶
func NewKeyedAggregate[T, R any]( recordTypeName string, supportedRecordTypes []string, groupByFunc func(data T) (string, R), combineByFunc func(a, b R) (R, error), storage Repository[schema.Schema], ) *KayedAggregate[T, R]
func (*KayedAggregate[T, R]) Append ¶
func (t *KayedAggregate[T, R]) Append(data Record[T]) error
func (*KayedAggregate[T, R]) Delete ¶
func (t *KayedAggregate[T, R]) Delete(data Record[T]) error
func (*KayedAggregate[T, R]) GetIndexByKey ¶
func (t *KayedAggregate[T, R]) GetIndexByKey(key string) R
func (*KayedAggregate[T, R]) GetVersionedIndices ¶
func (t *KayedAggregate[T, R]) GetVersionedIndices() map[string]Record[schema.Schema]
type KinesisStream ¶
type KinesisStream struct {
// contains filtered or unexported fields
}
func NewKinesisStream ¶
func NewKinesisStream(k *kinesis.Client, streamName string) *KinesisStream
func (*KinesisStream) Process ¶
func (s *KinesisStream) Process()
type NoopAggregator ¶
type NoopAggregator[T, R any] struct{}
func NewNoopAggregator ¶
func NewNoopAggregator[T, R any]() *NoopAggregator[T, R]
func (*NoopAggregator[T, R]) Append ¶
func (n *NoopAggregator[T, R]) Append(data Record[T]) error
func (*NoopAggregator[T, R]) Delete ¶
func (n *NoopAggregator[T, R]) Delete(data Record[T]) error
func (*NoopAggregator[T, R]) GetVersionedIndices ¶
func (n *NoopAggregator[T, R]) GetVersionedIndices() map[string]Record[schema.Schema]
type OpenSearchRepository ¶
type OpenSearchRepository struct {
// contains filtered or unexported fields
}
func NewOpenSearchRepository ¶
func NewOpenSearchRepository(client *opensearch.Client, index string) *OpenSearchRepository
func (*OpenSearchRepository) FindingRecords ¶
func (os *OpenSearchRepository) FindingRecords(query FindingRecords[Record[schema.Schema]]) (PageResult[Record[schema.Schema]], error)
func (*OpenSearchRepository) Get ¶
func (os *OpenSearchRepository) Get(recordID string, recordType RecordType) (Record[schema.Schema], error)
func (*OpenSearchRepository) ToSorters ¶
func (os *OpenSearchRepository) ToSorters(sort []SortField) []any
func (*OpenSearchRepository) UpdateRecords ¶
func (os *OpenSearchRepository) UpdateRecords(command UpdateRecords[Record[schema.Schema]]) error
type PageResult ¶
type PageResult[A any] struct { Items []A Next *FindingRecords[A] }
func (PageResult[A]) HasNext ¶
func (a PageResult[A]) HasNext() bool
type Record ¶
Record could have two types (still to be thought through):
- data records, which is the current implementation
- index records, which is a future implementation
  - when two replicas have the same aggregator rules, the index can be reused during replication of logs
go:generate go run ../../../cmd/mkunion/main.go -name RecordDSL
type RecordType ¶
type RecordType = string
type Repository ¶
type Repository[T any] interface { Get(recordID string, recordType RecordType) (Record[T], error) UpdateRecords(command UpdateRecords[Record[T]]) error FindingRecords(query FindingRecords[Record[T]]) (PageResult[Record[T]], error) }
type UpdateRecords ¶
type UpdateRecords[T any] struct { UpdatingPolicy UpdatingPolicy Saving map[string]T Deleting map[string]T }
go:generate go run ../../../cmd/mkunion/main.go -name RecordsDSL
func SaveAndDelete ¶
func SaveAndDelete(saving, deleting UpdateRecords[Record[schema.Schema]]) UpdateRecords[Record[schema.Schema]]
func (UpdateRecords[T]) IsEmpty ¶
func (s UpdateRecords[T]) IsEmpty() bool
type UpdatingPolicy ¶
type UpdatingPolicy uint
const ( PolicyIfServerNotChanged UpdatingPolicy = iota PolicyOverwriteServerChanges )
Source Files ¶
Click to show internal directories.
Click to hide internal directories.