stores

package
v1.0.6
Warning

This package is not in the latest version of its module.

Published: Nov 2, 2022 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func ByteHash

func ByteHash(b []byte) uint64

For []byte keys, use this as the hashFunc argument to NewShardedTree. Uses "github.com/cespare/xxhash/v2".Sum64.

func NumberLess

func NumberLess[T sak.Number](a, b T) bool

func StringHash

func StringHash(s string) uint64

For string keys, use this as the hashFunc argument to NewShardedTree. Uses "github.com/cespare/xxhash/v2".Sum64String.

func StringLess

func StringLess(a, b string) bool

Types

type HashFunc

type HashFunc[T any] func(T) uint64
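
For key types other than string or []byte, a custom HashFunc is needed. The sketch below shows one possible shape for a struct key. It uses the standard library's hash/fnv as a stand-in for xxhash, and redeclares HashFunc locally so it compiles without the stores package. TenantKey and tenantHash are hypothetical names, not part of this package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// HashFunc mirrors the documented signature; redeclared here so the
// sketch is self-contained.
type HashFunc[T any] func(T) uint64

// TenantKey is a hypothetical composite key.
type TenantKey struct {
	Tenant string
	ID     string
}

// tenantHash hashes both fields with FNV-1a (an assumption; the package
// itself uses xxhash for its built-in hash functions).
func tenantHash(k TenantKey) uint64 {
	h := fnv.New64a()
	h.Write([]byte(k.Tenant))
	h.Write([]byte{0}) // separator between the two fields
	h.Write([]byte(k.ID))
	return h.Sum64()
}

func main() {
	var hf HashFunc[TenantKey] = tenantHash
	a := hf(TenantKey{Tenant: "acme", ID: "42"})
	b := hf(TenantKey{Tenant: "acme", ID: "42"})
	fmt.Println(a == b) // equal keys always produce the same hash
}
```

The only hard requirement illustrated here is determinism: equal keys must hash to the same value so that they always land in the same shard.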

type Keyed

type Keyed interface {
	Key() string
}
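
A minimal sketch of a type that satisfies Keyed, assuming the key should be a stable, unique identifier for the item. The Keyed interface is redeclared locally so the snippet compiles on its own; Contact and the choice of PhoneNumber as the key are illustrative assumptions.

```go
package main

import "fmt"

// Keyed mirrors the documented interface; redeclared for self-containment.
type Keyed interface {
	Key() string
}

// Contact is a hypothetical item type.
type Contact struct {
	PhoneNumber string
	FirstName   string
	LastName    string
}

// Key returns the phone number as the item's identifier (an assumption
// about what makes a Contact unique).
func (c Contact) Key() string { return c.PhoneNumber }

// Compile-time check that Contact implements Keyed.
var _ Keyed = Contact{}

func main() {
	c := Contact{PhoneNumber: "+18005551212", FirstName: "Billy", LastName: "Bob"}
	fmt.Println(c.Key())
}
```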

type LessFunc

type LessFunc[T any] btree.LessFunc[T]

type MinMaxHeap

type MinMaxHeap[T Prioritizable[T]] struct {
	// contains filtered or unexported fields
}

MinMaxHeap provides min-max heap operations for any type that implements Prioritizable[T]. A min-max heap can be used to implement a double-ended priority queue.

Min-max heap implementation from the 1986 paper "Min-Max Heaps and Generalized Priority Queues" by Atkinson et al. https://doi.org/10.1145/6617.6621.
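
To illustrate the structure described in the paper, the sketch below checks the min-max heap property on an array-backed binary tree of ints: nodes on even levels (the root is level 0) are no larger than any descendant, and nodes on odd levels are no smaller than any descendant. This is a standalone illustration of the data structure, not this package's implementation; level, isMinLevel, and isMinMaxHeap are hypothetical helpers.

```go
package main

import (
	"fmt"
	"math/bits"
)

// level returns the depth of index i in an array-backed binary tree (root = 0).
func level(i int) int { return bits.Len(uint(i+1)) - 1 }

// isMinLevel reports whether index i sits on a min level (even depth).
func isMinLevel(i int) bool { return level(i)%2 == 0 }

// isMinMaxHeap checks every node against each of its ancestors.
func isMinMaxHeap(h []int) bool {
	for j := 1; j < len(h); j++ {
		for a := (j - 1) / 2; ; a = (a - 1) / 2 {
			if isMinLevel(a) && h[a] > h[j] {
				return false // a min-level ancestor must be <= its descendants
			}
			if !isMinLevel(a) && h[a] < h[j] {
				return false // a max-level ancestor must be >= its descendants
			}
			if a == 0 {
				break
			}
		}
	}
	return true
}

func main() {
	fmt.Println(isMinMaxHeap([]int{1, 9, 8, 3, 4, 5, 6})) // true
	fmt.Println(isMinMaxHeap([]int{1, 2, 8, 3}))          // false: 3 sits under 2 on a max level
}
```

With this layout the minimum is always at the root and the maximum is one of the root's children, which is what makes both ends of the queue cheap to read.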

func NewMinMaxHeap

func NewMinMaxHeap[T Prioritizable[T]](items ...T) *MinMaxHeap[T]

func (*MinMaxHeap[T]) Len

func (pq *MinMaxHeap[T]) Len() int

func (*MinMaxHeap[T]) Max

func (pq *MinMaxHeap[T]) Max() *PrioritizedItem[T]

func (*MinMaxHeap[T]) Min

func (pq *MinMaxHeap[T]) Min() *PrioritizedItem[T]

func (*MinMaxHeap[T]) PopMax

func (pq *MinMaxHeap[T]) PopMax() *PrioritizedItem[T]

func (*MinMaxHeap[T]) PopMin

func (pq *MinMaxHeap[T]) PopMin() *PrioritizedItem[T]

func (*MinMaxHeap[T]) Push

func (pq *MinMaxHeap[T]) Push(item *PrioritizedItem[T])

func (*MinMaxHeap[T]) Remove

func (pq *MinMaxHeap[T]) Remove(item *PrioritizedItem[T])

func (*MinMaxHeap[T]) Update

func (pq *MinMaxHeap[T]) Update(item *PrioritizedItem[T])

type Prioritizable

type Prioritizable[T any] interface {
	HasPriorityOver(item T) bool
}
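
A minimal sketch of a type that implements Prioritizable, with the interface redeclared locally so the snippet compiles on its own. Job and its Deadline field are hypothetical. Whether an item that "has priority" surfaces through Min or Max is not stated in this documentation, so the example makes no claim about that.

```go
package main

import "fmt"

// Prioritizable mirrors the documented interface; redeclared for self-containment.
type Prioritizable[T any] interface {
	HasPriorityOver(item T) bool
}

// Job is a hypothetical work item ordered by deadline.
type Job struct {
	Name     string
	Deadline int // smaller means more urgent (an assumption of this sketch)
}

// HasPriorityOver reports whether j should be treated as higher priority than other.
func (j Job) HasPriorityOver(other Job) bool { return j.Deadline < other.Deadline }

// Compile-time check that Job satisfies the self-referential constraint.
var _ Prioritizable[Job] = Job{}

func main() {
	urgent := Job{Name: "deploy", Deadline: 1}
	later := Job{Name: "cleanup", Deadline: 5}
	fmt.Println(urgent.HasPriorityOver(later), later.HasPriorityOver(urgent)) // true false
}
```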

type PrioritizedItem

type PrioritizedItem[T Prioritizable[T]] struct {
	Value T
	// contains filtered or unexported fields
}

type ShardedTree

type ShardedTree[K any, T any] struct {
	// contains filtered or unexported fields
}

A convenience data structure provided for storing large amounts of data in an in-memory StateStore. It simply wraps an array of github.com/google/btree#BTreeG[T]. This helps alleviate the O(log(n)) performance degradation of a single, very large tree. If you need to store more than a million items in a single tree, you might consider using a ShardedTree. There is an upfront O(1) performance hit for calculating the hash of key K when finding a tree. If you need ordering across your entire data set, this will not fit the bill.

Example (Contact)

This example creates a tree with 32 shards (2<<4). Each tree shard will be sorted by LastName, FirstName in ascending order.

package main

import (
	"fmt"
	"strings"

	"github.com/aws/go-kafka-event-source/streams/stores"
)

func main() {
	type Contact struct {
		PhoneNumber string
		FirstName   string
		LastName    string
	}

	contactSort := func(a, b Contact) bool {
		res := strings.Compare(a.LastName, b.LastName)
		if res != 0 {
			return res < 0
		}
		return a.FirstName < b.FirstName
	}

	shardedTree := stores.NewShardedTree(4, stores.StringHash, contactSort)
	contact := Contact{
		PhoneNumber: "+18005551213",
		FirstName:   "Billy",
		LastName:    "Bob",
	}

	tree := shardedTree.For(contact.LastName)
	tree.ReplaceOrInsert(contact)

	contact.PhoneNumber = "+18005551212"

	if oldContact, updated := tree.ReplaceOrInsert(contact); updated {
		fmt.Printf("PhoneNumber updated from %s to %s\n", oldContact.PhoneNumber, contact.PhoneNumber)
	}
}
Output:

PhoneNumber updated from +18005551213 to +18005551212
Example (Number)
package main

import (
	"github.com/aws/go-kafka-event-source/streams/stores"
)

func main() {
	shardedTree := stores.NewShardedTree(4, stores.StringHash, stores.NumberLess[int])
	partitionKey := "Foo"
	value := 1000
	shardedTree.For(partitionKey).ReplaceOrInsert(value)
}
Output:

Example (String)
package main

import (
	"github.com/aws/go-kafka-event-source/streams/stores"
)

func main() {
	shardedTree := stores.NewShardedTree(4, stores.StringHash, stores.StringLess)
	partitionKey := "Foo"
	item := "Bar"
	shardedTree.For(partitionKey).ReplaceOrInsert(item)
}
Output:

func NewShardedTree

func NewShardedTree[K any, T any](exponent int, hashFunc HashFunc[K], lessFunc LessFunc[T]) ShardedTree[K, T]

Return a ShardedTree. The exponent argument is used to produce the number of shards as follows:

shards := 2 << exponent

So an exponent of 10 will give you a ShardedTree with 2048 btree.BTreeG[T] shards. This facilitates quicker shard lookup with a bitwise AND as opposed to a modulo, which requires a []tree whose length is a power of 2:

mod := shards-1
tree := trees[hashFunc(key)&mod]

Your use case will determine the proper number of shards, but it is recommended to start small: shard counts between 32 and 512 (exponents of 4-8). `hashFunc` is used to find the correct tree for a given key K. The `lessFunc` argument mirrors that required by the "github.com/google/btree" package. The trees in the ShardedTree will share a common btree.FreeListG[T].
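
The shard arithmetic described above can be sketched in isolation. The snippet below computes the shard count from an exponent and shows that masking with shards-1 picks the same index as a modulo when the count is a power of two. shardCount and shardIndex are hypothetical helpers written for this illustration, not functions exported by the package.

```go
package main

import "fmt"

// shardCount follows the documented formula: shards := 2 << exponent.
func shardCount(exponent int) int { return 2 << exponent }

// shardIndex selects a shard with a bitwise AND. This is only equivalent
// to hash % shards when shards is a power of two.
func shardIndex(hash uint64, shards int) int {
	mod := uint64(shards - 1)
	return int(hash & mod)
}

func main() {
	fmt.Println(shardCount(4), shardCount(8), shardCount(10)) // 32 512 2048

	shards := shardCount(4)
	h := uint64(0xDEADBEEF)
	fmt.Println(shardIndex(h, shards))                          // 15
	fmt.Println(shardIndex(h, shards) == int(h%uint64(shards))) // true
}
```

Note that an exponent of 0 still yields 2 shards under this formula, since 2 << 0 is 2.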

func (ShardedTree[K, T]) For

func (st ShardedTree[K, T]) For(key K) *btree.BTreeG[T]

Return the tree for key, invoking the supplied HashFunc[K]. If you're doing multiple operations on the same tree, it makes sense to declare a variable:

tree := shardedTree.For(item.key)
tree.Delete(item)
updateItem(item)
tree.ReplaceOrInsert(item)

func (ShardedTree[K, T]) Len

func (st ShardedTree[K, T]) Len() (l int)

Iterates through all trees and sums their lengths. O(n) performance, where n is the number of shards (2 << exponent), not the number of stored items.
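
As a rough illustration of why the cost depends on the shard count, the sketch below sums the lengths of a slice of shards. The shard type is a hypothetical stand-in for one btree.BTreeG[T], and totalLen is not the package's implementation.

```go
package main

import "fmt"

// shard is a hypothetical stand-in for a single tree.
type shard struct{ items []string }

// Len returns the number of items held by this shard.
func (s *shard) Len() int { return len(s.items) }

// totalLen visits every shard exactly once, so its cost grows with the
// number of shards rather than the number of stored items.
func totalLen(shards []*shard) (l int) {
	for _, s := range shards {
		l += s.Len()
	}
	return l
}

func main() {
	shards := make([]*shard, 2<<4)
	for i := range shards {
		shards[i] = &shard{}
	}
	shards[3].items = append(shards[3].items, "a", "b")
	shards[17].items = append(shards[17].items, "c")
	fmt.Println(len(shards), totalLen(shards)) // 32 3
}
```

If the total is needed on a hot path with a large shard count, it may be worth tracking it separately in the calling code.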

type SimpleStore

type SimpleStore[T Keyed] struct {
	// contains filtered or unexported fields
}

func NewJsonSimpleStore

func NewJsonSimpleStore[T Keyed](tp streams.TopicPartition) *SimpleStore[T]

func NewSimpleStore

func NewSimpleStore[T Keyed](tp streams.TopicPartition, codec streams.Codec[T]) *SimpleStore[T]

func (*SimpleStore[T]) Delete

func (s *SimpleStore[T]) Delete(item T) (cle streams.ChangeLogEntry, ok bool)

func (*SimpleStore[T]) Get

func (s *SimpleStore[T]) Get(id string) (val T, ok bool)

func (*SimpleStore[T]) Put

func (s *SimpleStore[T]) Put(item T) streams.ChangeLogEntry

func (*SimpleStore[T]) ReceiveChange

func (s *SimpleStore[T]) ReceiveChange(record streams.IncomingRecord) (err error)

func (*SimpleStore[T]) Revoked

func (s *SimpleStore[T]) Revoked()

func (*SimpleStore[T]) ToChangeLogEntry

func (s *SimpleStore[T]) ToChangeLogEntry(item T) streams.ChangeLogEntry
