schemaless

package
v1.24.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2024 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Code generated by mkunion. DO NOT EDIT.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFound        = fmt.Errorf("not found")
	ErrEmptyCommand    = fmt.Errorf("empty command")
	ErrInvalidType     = fmt.Errorf("invalid type")
	ErrVersionConflict = fmt.Errorf("version conflict")
	ErrInternalError   = fmt.Errorf("internal error")
)

Functions

func CursorShape added in v1.24.0

func CursorShape() shape.Shape

func ExampleRecordShape added in v1.24.0

func ExampleRecordShape() shape.Shape

func FindingRecordsShape added in v1.24.0

func FindingRecordsShape() shape.Shape

func GetEnvOr

func GetEnvOr(key string, defaultValue string) string

func OpenSearchRepositoryShape added in v1.24.0

func OpenSearchRepositoryShape() shape.Shape

func OpenSearchSearchResultHitShape added in v1.21.0

func OpenSearchSearchResultHitShape() shape.Shape

func OpenSearchSearchResultHitsShape added in v1.21.0

func OpenSearchSearchResultHitsShape() shape.Shape

func OpenSearchSearchResultShape added in v1.21.0

func OpenSearchSearchResultShape() shape.Shape

func PageResultShape added in v1.21.0

func PageResultShape() shape.Shape

func RecordShape added in v1.21.0

func RecordShape() shape.Shape

func RecordTypeShape added in v1.24.0

func RecordTypeShape() shape.Shape

func SortFieldShape added in v1.21.1

func SortFieldShape() shape.Shape

func UpdateRecordsShape added in v1.24.0

func UpdateRecordsShape() shape.Shape

func UpdatingPolicyShape added in v1.24.0

func UpdatingPolicyShape() shape.Shape

Types

type Aggregator

type Aggregator[T, R any] interface {
	Append(data Record[T]) error
	Delete(data Record[T]) error
	GetVersionedIndices() map[string]Record[schema.Schema]
}

type AppendLog

type AppendLog[T any] struct {
	// contains filtered or unexported fields
}

AppendLog is a stream of events, and in context of schemaless, it is a stream of changes to records, or deleted record with past state

func NewAppendLog

func NewAppendLog[T any]() *AppendLog[T]

func (*AppendLog[T]) Append

func (a *AppendLog[T]) Append(b *AppendLog[T])

func (*AppendLog[T]) Change

func (a *AppendLog[T]) Change(from, to Record[T]) error

func (*AppendLog[T]) Close

func (a *AppendLog[T]) Close()

func (*AppendLog[T]) Delete

func (a *AppendLog[T]) Delete(data Record[T]) error

func (*AppendLog[T]) Push

func (a *AppendLog[T]) Push(x Change[T])

func (*AppendLog[T]) Subscribe

func (a *AppendLog[T]) Subscribe(ctx context.Context, fromOffset int, f func(Change[T])) error

type AppendLoger added in v1.21.0

type AppendLoger[T any] interface {
	Close()
	Change(from, to Record[T]) error
	Delete(data Record[T]) error
	Push(x Change[T])
	Append(b *AppendLog[T])
	Subscribe(ctx context.Context, fromOffset int, f func(Change[T])) error
}

type Change

type Change[T any] struct {
	Before  *Record[T]
	After   *Record[T]
	Deleted bool
	Offset  int
}

type Cursor

type Cursor = string

type DynamoDBRepository

type DynamoDBRepository[A any] struct {
	// contains filtered or unexported fields
}

func NewDynamoDBRepository

func NewDynamoDBRepository[A any](client *dynamodb.Client, tableName string) *DynamoDBRepository[A]

func (*DynamoDBRepository[A]) FindingRecords

func (d *DynamoDBRepository[A]) FindingRecords(query FindingRecords[Record[A]]) (PageResult[Record[A]], error)

func (*DynamoDBRepository[A]) Get

func (d *DynamoDBRepository[A]) Get(key, recordType string) (Record[A], error)

func (*DynamoDBRepository[A]) UpdateRecords

func (d *DynamoDBRepository[A]) UpdateRecords(command UpdateRecords[Record[A]]) error

type ExampleRecord added in v1.21.0

type ExampleRecord struct {
	Name string
	Age  int
}

func (*ExampleRecord) MarshalJSON added in v1.24.0

func (r *ExampleRecord) MarshalJSON() ([]byte, error)

func (*ExampleRecord) UnmarshalJSON added in v1.24.0

func (r *ExampleRecord) UnmarshalJSON(data []byte) error

type FindingRecords

type FindingRecords[T any] struct {
	RecordType string
	Where      *predicate.WherePredicates
	Sort       []SortField
	Limit      uint8
	After      *Cursor
	Before     *Cursor
}

type InMemoryRepository

type InMemoryRepository[A any] struct {
	// contains filtered or unexported fields
}

func NewInMemoryRepository

func NewInMemoryRepository[A any]() *InMemoryRepository[A]

func (*InMemoryRepository[A]) AppendLog

func (s *InMemoryRepository[A]) AppendLog() *AppendLog[A]

func (*InMemoryRepository[A]) FindingRecords

func (s *InMemoryRepository[A]) FindingRecords(query FindingRecords[Record[A]]) (PageResult[Record[A]], error)

func (*InMemoryRepository[A]) Get

func (s *InMemoryRepository[A]) Get(recordID, recordType string) (Record[A], error)

func (*InMemoryRepository[A]) UpdateRecords

func (s *InMemoryRepository[A]) UpdateRecords(x UpdateRecords[Record[A]]) error

type KayedAggregate

type KayedAggregate[T, R any] struct {
	// contains filtered or unexported fields
}

func NewKeyedAggregate

func NewKeyedAggregate[T, R any](
	recordTypeName string,
	supportedRecordTypes []string,
	groupByFunc func(data T) (string, R),
	combineByFunc func(a, b R) (R, error),
	storage Repository[schema.Schema],
) *KayedAggregate[T, R]

func (*KayedAggregate[T, R]) Append

func (t *KayedAggregate[T, R]) Append(data Record[T]) error

func (*KayedAggregate[T, R]) Delete

func (t *KayedAggregate[T, R]) Delete(data Record[T]) error

func (*KayedAggregate[T, R]) GetIndexByKey

func (t *KayedAggregate[T, R]) GetIndexByKey(key string) R

func (*KayedAggregate[T, R]) GetVersionedIndices

func (t *KayedAggregate[T, R]) GetVersionedIndices() map[string]Record[schema.Schema]

type KinesisStream

type KinesisStream struct {
	// contains filtered or unexported fields
}

func NewKinesisStream

func NewKinesisStream(k *kinesis.Client, streamName string) *KinesisStream

func (*KinesisStream) Process

func (s *KinesisStream) Process()

func (*KinesisStream) Pull

func (s *KinesisStream) Pull() chan Change[schema.Schema]

func (*KinesisStream) Subscribe

func (s *KinesisStream) Subscribe(ctx context.Context, fromOffset int, f func(Change[schema.Schema])) error

type NoopAggregator

type NoopAggregator[T, R any] struct{}

func NewNoopAggregator

func NewNoopAggregator[T, R any]() *NoopAggregator[T, R]

func (*NoopAggregator[T, R]) Append

func (n *NoopAggregator[T, R]) Append(data Record[T]) error

func (*NoopAggregator[T, R]) Delete

func (n *NoopAggregator[T, R]) Delete(data Record[T]) error

func (*NoopAggregator[T, R]) GetVersionedIndices

func (n *NoopAggregator[T, R]) GetVersionedIndices() map[string]Record[schema.Schema]

type OpenSearchRepository

type OpenSearchRepository[A any] struct {
	// contains filtered or unexported fields
}

func NewOpenSearchRepository

func NewOpenSearchRepository[A any](client *opensearch.Client, index string) *OpenSearchRepository[A]

func (*OpenSearchRepository[A]) FindingRecords

func (os *OpenSearchRepository[A]) FindingRecords(query FindingRecords[Record[A]]) (PageResult[Record[A]], error)

func (*OpenSearchRepository[A]) Get

func (os *OpenSearchRepository[A]) Get(recordID string, recordType RecordType) (Record[A], error)

func (*OpenSearchRepository[A]) ToSorters

func (os *OpenSearchRepository[A]) ToSorters(sort []SortField) []any

func (*OpenSearchRepository[A]) UpdateRecords

func (os *OpenSearchRepository[A]) UpdateRecords(command UpdateRecords[Record[A]]) error

type OpenSearchSearchResult added in v1.21.0

type OpenSearchSearchResult[A any] struct {
	Hits OpenSearchSearchResultHits[A] `json:"hits"`
}

func (*OpenSearchSearchResult[A]) MarshalJSON added in v1.21.0

func (r *OpenSearchSearchResult[A]) MarshalJSON() ([]byte, error)

func (*OpenSearchSearchResult[A]) UnmarshalJSON added in v1.21.0

func (r *OpenSearchSearchResult[A]) UnmarshalJSON(data []byte) error

type OpenSearchSearchResultHit added in v1.21.0

type OpenSearchSearchResultHit[A any] struct {
	Item A        `json:"_source"`
	Sort []string `json:"sort"`
}

func (*OpenSearchSearchResultHit[A]) MarshalJSON added in v1.21.0

func (r *OpenSearchSearchResultHit[A]) MarshalJSON() ([]byte, error)

func (*OpenSearchSearchResultHit[A]) UnmarshalJSON added in v1.21.0

func (r *OpenSearchSearchResultHit[A]) UnmarshalJSON(data []byte) error

type OpenSearchSearchResultHits added in v1.21.0

type OpenSearchSearchResultHits[A any] struct {
	Hits []OpenSearchSearchResultHit[A] `json:"hits"`
}

func (*OpenSearchSearchResultHits[A]) MarshalJSON added in v1.21.0

func (r *OpenSearchSearchResultHits[A]) MarshalJSON() ([]byte, error)

func (*OpenSearchSearchResultHits[A]) UnmarshalJSON added in v1.21.0

func (r *OpenSearchSearchResultHits[A]) UnmarshalJSON(data []byte) error

type PageResult

type PageResult[A any] struct {
	Items []A
	Next  *FindingRecords[A]
	Prev  *FindingRecords[A]
}

func (*PageResult[A]) HasNext

func (a *PageResult[A]) HasNext() bool

func (*PageResult[A]) HasPrev added in v1.23.0

func (a *PageResult[A]) HasPrev() bool

func (*PageResult[A]) MarshalJSON added in v1.21.0

func (r *PageResult[A]) MarshalJSON() ([]byte, error)

func (*PageResult[A]) UnmarshalJSON added in v1.21.0

func (r *PageResult[A]) UnmarshalJSON(data []byte) error

type Record

type Record[A any] struct {
	ID      string
	Type    string
	Data    A
	Version uint16
}

Record could have two types (to think about it more): data records, which is current implementation index records, which is future implementation

  • when two replicas have same aggregator rules, then during replication of logs, index can be reused

func RecordAs

func RecordAs[A any](record Record[schema.Schema]) (Record[A], error)

func (*Record[A]) MarshalJSON added in v1.21.0

func (r *Record[A]) MarshalJSON() ([]byte, error)

func (*Record[A]) UnmarshalJSON added in v1.21.0

func (r *Record[A]) UnmarshalJSON(data []byte) error

type RecordType

type RecordType = string

type Repository

type Repository[T any] interface {
	Get(recordID string, recordType RecordType) (Record[T], error)
	UpdateRecords(command UpdateRecords[Record[T]]) error
	FindingRecords(query FindingRecords[Record[T]]) (PageResult[Record[T]], error)
}

type SortField

type SortField struct {
	Field      string
	Descending bool
}

func (*SortField) MarshalJSON added in v1.21.1

func (r *SortField) MarshalJSON() ([]byte, error)

func (*SortField) UnmarshalJSON added in v1.21.1

func (r *SortField) UnmarshalJSON(data []byte) error

type Storage

type Storage[T any] interface {
	GetAs(id string, x *T) error
}

type UpdateRecords

type UpdateRecords[T any] struct {
	UpdatingPolicy UpdatingPolicy
	Saving         map[string]T
	Deleting       map[string]T
}

func Delete

func Delete[T any](xs ...Record[T]) UpdateRecords[Record[T]]

func Save

func Save[T any](xs ...Record[T]) UpdateRecords[Record[T]]

func SaveAndDelete

func SaveAndDelete(saving, deleting UpdateRecords[Record[schema.Schema]]) UpdateRecords[Record[schema.Schema]]

func (*UpdateRecords[T]) IsEmpty

func (s *UpdateRecords[T]) IsEmpty() bool

type UpdatingPolicy

type UpdatingPolicy uint
const (
	PolicyIfServerNotChanged UpdatingPolicy = iota
	PolicyOverwriteServerChanges
)

Directories

Path Synopsis
Code generated by mkunion.
Code generated by mkunion.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL