schemaless

package
v1.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2023 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values. Each one is built with fmt.Errorf from a fixed
// message that contains no formatting verbs.
var (
	ErrNotFound        = fmt.Errorf("not found")
	ErrEmptyCommand    = fmt.Errorf("empty command")
	ErrInvalidType     = fmt.Errorf("invalid type")
	ErrVersionConflict = fmt.Errorf("version conflict")
	ErrInternalError   = fmt.Errorf("internal error")
)
View Source
// WithOnlyRecordSchemaOptions holds the value returned by calling
// schema.WithExtraRules with no arguments.
var WithOnlyRecordSchemaOptions = schema.WithExtraRules()

Functions

func GetEnvOr

func GetEnvOr(key string, defaultValue string) string

Types

type Aggregator

// Aggregator is the method set for types that are fed records of T through
// Append and Delete and that expose a map of string keys to
// Record[schema.Schema] through GetVersionedIndices.
//
// The type parameter R does not appear in any method of this interface.
type Aggregator[T, R any] interface {
	// Append takes a record and reports an error on failure.
	Append(data Record[T]) error
	// Delete takes a record and reports an error on failure.
	Delete(data Record[T]) error
	// GetVersionedIndices returns records keyed by string.
	// NOTE(review): the meaning of the key is not visible here; confirm.
	GetVersionedIndices() map[string]Record[schema.Schema]
}

type AppendLog

type AppendLog[T any] struct {
	// contains filtered or unexported fields
}

AppendLog is a stream of events. In the context of schemaless, it is a stream of changes to records, or of deleted records together with their past state.

func NewAppendLog

func NewAppendLog[T any]() *AppendLog[T]

func (*AppendLog[T]) Append

func (a *AppendLog[T]) Append(b *AppendLog[T])

func (*AppendLog[T]) Change

func (a *AppendLog[T]) Change(from, to Record[T]) error

func (*AppendLog[T]) Close

func (a *AppendLog[T]) Close()

func (*AppendLog[T]) Delete

func (a *AppendLog[T]) Delete(data Record[T]) error

func (*AppendLog[T]) Push

func (a *AppendLog[T]) Push(x Change[T])

func (*AppendLog[T]) Subscribe

func (a *AppendLog[T]) Subscribe(ctx context.Context, fromOffset int, f func(Change[T])) error

type Change

// Change is the value passed to AppendLog.Push and to the callback given to
// Subscribe.
type Change[T any] struct {
	// Before is a pointer to a record.
	// NOTE(review): presumably the state prior to the change; confirm.
	Before  *Record[T]
	// After is a pointer to a record.
	// NOTE(review): presumably the state following the change; confirm.
	After   *Record[T]
	// Deleted is a boolean flag.
	// NOTE(review): presumably marks a deletion entry; confirm.
	Deleted bool
	// Offset is an int.
	// NOTE(review): presumably the position in the log, comparable to the
	// fromOffset argument of Subscribe; confirm.
	Offset  int
}

type Cursor

// Cursor is an alias for string. FindingRecords.After is declared as *Cursor.
type Cursor = string

type DynamoDBRepository

type DynamoDBRepository struct {
	// contains filtered or unexported fields
}

func NewDynamoDBRepository

func NewDynamoDBRepository(client *dynamodb.Client, tableName string) *DynamoDBRepository

func (*DynamoDBRepository) FindingRecords

func (d *DynamoDBRepository) FindingRecords(query FindingRecords[Record[schema.Schema]]) (PageResult[Record[schema.Schema]], error)

func (*DynamoDBRepository) Get

func (d *DynamoDBRepository) Get(key, recordType string) (Record[schema.Schema], error)

func (*DynamoDBRepository) UpdateRecords

func (d *DynamoDBRepository) UpdateRecords(command UpdateRecords[Record[schema.Schema]]) error

type FindingRecords

// FindingRecords is the query value accepted by the FindingRecords method of
// Repository. The type parameter T is not used by any field.
type FindingRecords[T any] struct {
	// RecordType is a plain string.
	RecordType string
	// Where is an optional pointer to predicate.WherePredicates.
	Where      *predicate.WherePredicates
	// Sort is a slice of SortField values.
	Sort       []SortField
	// Limit is a uint8, so it cannot exceed 255.
	Limit      uint8
	// After is an optional pointer to a Cursor.
	// NOTE(review): presumably the position to resume from; confirm.
	After      *Cursor
}

type InMemoryRepository

type InMemoryRepository struct {
	// contains filtered or unexported fields
}

func NewInMemoryRepository

func NewInMemoryRepository() *InMemoryRepository

func (*InMemoryRepository) AppendLog

func (s *InMemoryRepository) AppendLog() *AppendLog[schema.Schema]

func (*InMemoryRepository) FindingRecords

func (s *InMemoryRepository) FindingRecords(query FindingRecords[Record[schema.Schema]]) (PageResult[Record[schema.Schema]], error)

func (*InMemoryRepository) Get

func (s *InMemoryRepository) Get(recordID, recordType string) (Record[schema.Schema], error)

func (*InMemoryRepository) UpdateRecords

func (s *InMemoryRepository) UpdateRecords(x UpdateRecords[Record[schema.Schema]]) error

type KayedAggregate

type KayedAggregate[T, R any] struct {
	// contains filtered or unexported fields
}

func NewKeyedAggregate

func NewKeyedAggregate[T, R any](
	recordTypeName string,
	supportedRecordTypes []string,
	groupByFunc func(data T) (string, R),
	combineByFunc func(a, b R) (R, error),
	storage Repository[schema.Schema],
) *KayedAggregate[T, R]

func (*KayedAggregate[T, R]) Append

func (t *KayedAggregate[T, R]) Append(data Record[T]) error

func (*KayedAggregate[T, R]) Delete

func (t *KayedAggregate[T, R]) Delete(data Record[T]) error

func (*KayedAggregate[T, R]) GetIndexByKey

func (t *KayedAggregate[T, R]) GetIndexByKey(key string) R

func (*KayedAggregate[T, R]) GetVersionedIndices

func (t *KayedAggregate[T, R]) GetVersionedIndices() map[string]Record[schema.Schema]

type KinesisStream

type KinesisStream struct {
	// contains filtered or unexported fields
}

func NewKinesisStream

func NewKinesisStream(k *kinesis.Client, streamName string) *KinesisStream

func (*KinesisStream) Process

func (s *KinesisStream) Process()

func (*KinesisStream) Pull

func (s *KinesisStream) Pull() chan Change[schema.Schema]

func (*KinesisStream) Subscribe

func (s *KinesisStream) Subscribe(ctx context.Context, fromOffset int, f func(Change[schema.Schema])) error

type NoopAggregator

// NoopAggregator is an empty struct. It declares Append, Delete, and
// GetVersionedIndices methods with the same signatures as Aggregator.
type NoopAggregator[T, R any] struct{}

func NewNoopAggregator

func NewNoopAggregator[T, R any]() *NoopAggregator[T, R]

func (*NoopAggregator[T, R]) Append

func (n *NoopAggregator[T, R]) Append(data Record[T]) error

func (*NoopAggregator[T, R]) Delete

func (n *NoopAggregator[T, R]) Delete(data Record[T]) error

func (*NoopAggregator[T, R]) GetVersionedIndices

func (n *NoopAggregator[T, R]) GetVersionedIndices() map[string]Record[schema.Schema]

type OpenSearchRepository

type OpenSearchRepository struct {
	// contains filtered or unexported fields
}

func NewOpenSearchRepository

func NewOpenSearchRepository(client *opensearch.Client, index string) *OpenSearchRepository

func (*OpenSearchRepository) FindingRecords

func (os *OpenSearchRepository) FindingRecords(query FindingRecords[Record[schema.Schema]]) (PageResult[Record[schema.Schema]], error)

func (*OpenSearchRepository) Get

func (os *OpenSearchRepository) Get(recordID string, recordType RecordType) (Record[schema.Schema], error)

func (*OpenSearchRepository) ToSorters

func (os *OpenSearchRepository) ToSorters(sort []SortField) []any

func (*OpenSearchRepository) UpdateRecords

func (os *OpenSearchRepository) UpdateRecords(command UpdateRecords[Record[schema.Schema]]) error

type PageResult

// PageResult is the value returned by the FindingRecords method of
// Repository.
type PageResult[A any] struct {
	// Items is a slice of A.
	Items []A
	// Next is an optional pointer to a FindingRecords[A] query.
	// NOTE(review): presumably the query that fetches the following page, and
	// what HasNext inspects; confirm.
	Next  *FindingRecords[A]
}

func (PageResult[A]) HasNext

func (a PageResult[A]) HasNext() bool

type Record

// Record is a generic envelope that pairs a payload of type A with an ID,
// a Type, and a Version.
type Record[A any] struct {
	// ID is a string identifier.
	ID      string
	// Type is a string.
	// NOTE(review): presumably a RecordType value; confirm.
	Type    string
	// Data is the payload of type A.
	Data    A
	// Version is a uint16, so it cannot exceed 65535.
	Version uint16
}

Record could have two kinds (this needs more thought): data records, which are the current implementation, and index records, which are a future implementation.

  • When two replicas have the same aggregator rules, the index can be reused during log replication.

func RecordAs

func RecordAs[A any](record Record[schema.Schema]) (Record[A], error)

type RecordType

// RecordType is an alias for string. It is the type of the recordType
// parameter of Repository.Get.
type RecordType = string

type Repository

// Repository is the method set for record stores holding Record[T] values.
type Repository[T any] interface {
	// Get takes a record ID and a record type and returns one record or an
	// error.
	Get(recordID string, recordType RecordType) (Record[T], error)
	// UpdateRecords takes an UpdateRecords command and returns an error on
	// failure.
	UpdateRecords(command UpdateRecords[Record[T]]) error
	// FindingRecords takes a FindingRecords query and returns a PageResult
	// of records or an error.
	FindingRecords(query FindingRecords[Record[T]]) (PageResult[Record[T]], error)
}

type SortField

// SortField is the element type of FindingRecords.Sort.
type SortField struct {
	// Field is a string.
	// NOTE(review): presumably the name or path of the field to sort by;
	// confirm.
	Field      string
	// Descending is a boolean flag whose zero value is false.
	// NOTE(review): presumably false means ascending order; confirm.
	Descending bool
}

type Storage

// Storage is a single-method interface parameterised by T.
type Storage[T any] interface {
	// GetAs takes an id and a pointer to T and returns an error.
	// NOTE(review): presumably loads the value for id into x; confirm.
	GetAs(id string, x *T) error
}

type UpdateRecords

// UpdateRecords is the command value accepted by the UpdateRecords method of
// Repository. The package functions Save, Delete, and SaveAndDelete return
// values of this type.
type UpdateRecords[T any] struct {
	// UpdatingPolicy is an UpdatingPolicy value.
	UpdatingPolicy UpdatingPolicy
	// Saving is a map from string keys to T.
	// NOTE(review): presumably keyed by record ID; confirm.
	Saving         map[string]T
	// Deleting is a map from string keys to T.
	// NOTE(review): presumably keyed by record ID; confirm.
	Deleting       map[string]T
}

func Delete

func Delete[T any](xs ...Record[T]) UpdateRecords[Record[T]]

func Save

func Save[T any](xs ...Record[T]) UpdateRecords[Record[T]]

func SaveAndDelete

func SaveAndDelete(saving, deleting UpdateRecords[Record[schema.Schema]]) UpdateRecords[Record[schema.Schema]]

func (UpdateRecords[T]) IsEmpty

func (s UpdateRecords[T]) IsEmpty() bool

type UpdatingPolicy

// UpdatingPolicy is an unsigned integer type used by the UpdatingPolicy
// field of UpdateRecords.
type UpdatingPolicy uint
// The UpdatingPolicy values are assigned with iota, starting at 0.
const (
	// PolicyIfServerNotChanged has the value 0, which is also the zero value
	// of UpdatingPolicy.
	// NOTE(review): presumably an update is rejected when the stored record
	// has changed; confirm.
	PolicyIfServerNotChanged UpdatingPolicy = iota
	// PolicyOverwriteServerChanges has the value 1.
	// NOTE(review): presumably an update is applied regardless of the stored
	// state; confirm.
	PolicyOverwriteServerChanges
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL