Documentation ¶
Index ¶
- func ByteHash(b []byte) uint64
- func NumberLess[T sak.Number](a, b T) bool
- func StringHash(s string) uint64
- func StringLess(a, b string) bool
- type HashFunc
- type Keyed
- type LessFunc
- type MinMaxHeap
- func (pq *MinMaxHeap[T]) Len() int
- func (pq *MinMaxHeap[T]) Max() *PrioritizedItem[T]
- func (pq *MinMaxHeap[T]) Min() *PrioritizedItem[T]
- func (pq *MinMaxHeap[T]) PopMax() *PrioritizedItem[T]
- func (pq *MinMaxHeap[T]) PopMin() *PrioritizedItem[T]
- func (pq *MinMaxHeap[T]) Push(item *PrioritizedItem[T])
- func (pq *MinMaxHeap[T]) Remove(item *PrioritizedItem[T])
- func (pq *MinMaxHeap[T]) Update(item *PrioritizedItem[T])
- type Prioritizable
- type PrioritizedItem
- type ShardedTree
- type SimpleStore
- func (s *SimpleStore[T]) Delete(item T) (cle streams.ChangeLogEntry, ok bool)
- func (s *SimpleStore[T]) Get(id string) (val T, ok bool)
- func (s *SimpleStore[T]) Put(item T) streams.ChangeLogEntry
- func (s *SimpleStore[T]) ReceiveChange(record streams.IncomingRecord) (err error)
- func (s *SimpleStore[T]) Revoked()
- func (s *SimpleStore[T]) ToChangeLogEntry(item T) streams.ChangeLogEntry
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ByteHash ¶
For []byte keys, use this as the hashFunc argument to NewShardedTree. Uses "github.com/cespare/xxhash/v2".Sum64.
func NumberLess ¶
func StringHash ¶
For string keys, use this as the hashFunc argument to NewShardedTree. Uses "github.com/cespare/xxhash/v2".Sum64String.
func StringLess ¶
Types ¶
type MinMaxHeap ¶
type MinMaxHeap[T Prioritizable[T]] struct {
	// contains filtered or unexported fields
}
MinMaxHeap provides min-max heap operations for any type that implements Prioritizable[T]. A min-max heap can be used to implement a double-ended priority queue.
Min-max heap implementation from the 1986 paper "Min-Max Heaps and Generalized Priority Queues" by Atkinson et al. https://doi.org/10.1145/6617.6621.
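The layout described in the Atkinson et al. paper can be illustrated with a small standalone sketch. It does not use this package's MinMaxHeap. The helper names isMinLevel and minMax are hypothetical, and the heap is a hand-built []int rather than a slice of PrioritizedItem values. It only shows why both ends of the queue can be read in constant time: the minimum sits at the root, and the maximum is one of the root's children.

```go
package main

import (
	"fmt"
	"math/bits"
)

// isMinLevel reports whether the node at 0-based index i sits on a min level.
// Levels are counted from 0 at the root; even levels are min levels.
func isMinLevel(i int) bool {
	level := bits.Len(uint(i+1)) - 1 // floor(log2(i+1))
	return level%2 == 0
}

// minMax returns the smallest and largest values of a valid min-max heap
// stored in h, without modifying it. ok is false for an empty heap.
func minMax(h []int) (lo, hi int, ok bool) {
	switch len(h) {
	case 0:
		return 0, 0, false
	case 1:
		return h[0], h[0], true
	case 2:
		return h[0], h[1], true
	}
	// The maximum is the larger of the two nodes on the first max level.
	hi = h[1]
	if h[2] > hi {
		hi = h[2]
	}
	return h[0], hi, true
}

func main() {
	// Hand-built valid min-max heap:
	// level 0 (min): 1; level 1 (max): 9, 8; level 2 (min): 2, 3, 4, 5.
	h := []int{1, 9, 8, 2, 3, 4, 5}
	lo, hi, _ := minMax(h)
	fmt.Println(lo, hi)                                      // 1 9
	fmt.Println(isMinLevel(0), isMinLevel(1), isMinLevel(3)) // true false true
}
```

Push, PopMin, and PopMax additionally need the paper's trickle-down and bubble-up steps to restore this layout; those are omitted here.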
func NewMinMaxHeap ¶
func NewMinMaxHeap[T Prioritizable[T]](items ...T) *MinMaxHeap[T]
func (*MinMaxHeap[T]) Len ¶
func (pq *MinMaxHeap[T]) Len() int
func (*MinMaxHeap[T]) Max ¶
func (pq *MinMaxHeap[T]) Max() *PrioritizedItem[T]
func (*MinMaxHeap[T]) Min ¶
func (pq *MinMaxHeap[T]) Min() *PrioritizedItem[T]
func (*MinMaxHeap[T]) PopMax ¶
func (pq *MinMaxHeap[T]) PopMax() *PrioritizedItem[T]
func (*MinMaxHeap[T]) PopMin ¶
func (pq *MinMaxHeap[T]) PopMin() *PrioritizedItem[T]
func (*MinMaxHeap[T]) Push ¶
func (pq *MinMaxHeap[T]) Push(item *PrioritizedItem[T])
func (*MinMaxHeap[T]) Remove ¶
func (pq *MinMaxHeap[T]) Remove(item *PrioritizedItem[T])
func (*MinMaxHeap[T]) Update ¶
func (pq *MinMaxHeap[T]) Update(item *PrioritizedItem[T])
type Prioritizable ¶
type PrioritizedItem ¶
type PrioritizedItem[T Prioritizable[T]] struct {
	Value T
	// contains filtered or unexported fields
}
type ShardedTree ¶
A convenience data structure provided for storing large amounts of data in an in-memory StateStore. It simply wraps an array of github.com/google/btree#BTreeG[T]. This helps alleviate the O(log(n)) performance degradation of a single, very large tree. If you need to store more than a million items in a single tree, you might consider using a ShardedTree. There is an upfront O(1) performance hit for calculating the hash of key K when finding a tree. If you need ordering across your entire data set, this will not fit the bill.
Example (Contact) ¶
This example creates a tree with 32 shards (2<<4). Each tree shard will be sorted by LastName, FirstName in ascending order.
package main

import (
	"fmt"
	"strings"

	"github.com/aws/go-kafka-event-source/streams/stores"
)

func main() {
	type Contact struct {
		PhoneNumber string
		FirstName   string
		LastName    string
	}

	// Sort by LastName, then FirstName, in ascending order.
	contactSort := func(a, b Contact) bool {
		res := strings.Compare(a.LastName, b.LastName)
		if res != 0 {
			return res < 0
		}
		return a.FirstName < b.FirstName
	}

	shardedTree := stores.NewShardedTree(4, stores.StringHash, contactSort)
	contact := Contact{
		PhoneNumber: "+18005551213",
		FirstName:   "Billy",
		LastName:    "Bob",
	}

	tree := shardedTree.For(contact.LastName)
	tree.ReplaceOrInsert(contact)

	// Same name, new number: the existing entry is replaced.
	contact.PhoneNumber = "+18005551212"
	if oldContact, updated := tree.ReplaceOrInsert(contact); updated {
		fmt.Printf("PhoneNumber updated from %s to %s\n", oldContact.PhoneNumber, contact.PhoneNumber)
	}
}
Output: PhoneNumber updated from +18005551213 to +18005551212
Example (Number) ¶
package main

import (
	"github.com/aws/go-kafka-event-source/streams/stores"
)

func main() {
	shardedTree := stores.NewShardedTree(4, stores.StringHash, stores.NumberLess[int])
	partitionKey := "Foo"
	value := 1000
	shardedTree.For(partitionKey).ReplaceOrInsert(value)
}
Output:
Example (String) ¶
package main

import (
	"github.com/aws/go-kafka-event-source/streams/stores"
)

func main() {
	shardedTree := stores.NewShardedTree(4, stores.StringHash, stores.StringLess)
	partitionKey := "Foo"
	item := "Bar"
	shardedTree.For(partitionKey).ReplaceOrInsert(item)
}
Output:
func NewShardedTree ¶
func NewShardedTree[K any, T any](exponent int, hashFunc HashFunc[K], lessFunc LessFunc[T]) ShardedTree[K, T]
Return a ShardedTree. The exponent argument is used to produce the number of shards as follows:
shards := 2 << exponent
So an exponent of 10 will give you a ShardedTree with 2048 btree.BTreeG[T] shards. This allows faster shard lookup with a bitwise AND instead of a modulo, which requires the []tree length to be a power of 2:
mod := shards-1
tree := trees[hashFunc(key)&st.mod]
Your use case will determine the proper number of shards, but it is recommended to start small, with shard counts between 32 and 512 (exponents of 4-8). `hashFunc` is used to find the correct tree for a given key K. The `lessFunc` argument mirrors that required by the "github.com/google/btree" package. The trees in the ShardedTree will share a common btree.FreeListG[T].
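The shard arithmetic described above can be checked with a short standalone sketch. It does not call this package. The helper names shardCount, shardIndex, and fnvHash are hypothetical, and the standard library's hash/fnv is used as a stand-in for xxhash only to keep the sketch free of external dependencies.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardCount mirrors the documented formula: shards := 2 << exponent.
func shardCount(exponent int) int {
	return 2 << exponent
}

// shardIndex picks a shard with a bitwise AND. This matches a modulo
// only because the shard count is a power of 2.
func shardIndex(hash uint64, exponent int) uint64 {
	mask := uint64(shardCount(exponent) - 1)
	return hash & mask
}

// fnvHash is a stand-in hash function (assumption: the real package uses xxhash).
func fnvHash(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

func main() {
	fmt.Println(shardCount(4), shardCount(8), shardCount(10)) // 32 512 2048

	h := fnvHash("Bob")
	// The AND and the modulo select the same shard.
	fmt.Println(shardIndex(h, 4) == h%uint64(shardCount(4))) // true
}
```

Note that 2 << exponent equals 2^(exponent+1), so an exponent of 4 yields 32 shards rather than 16.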
func (ShardedTree[K, T]) For ¶
func (st ShardedTree[K, T]) For(key K) *btree.BTreeG[T]
Returns the tree for key, invoking the supplied HashFunc[K]. If you're doing multiple operations on the same tree, it makes sense to declare a variable so the hash is computed only once:
tree := shardedTree.For(item.key)
tree.Delete(item)
updateItem(item)
tree.ReplaceOrInsert(item)
func (ShardedTree[K, T]) Len ¶
func (st ShardedTree[K, T]) Len() (l int)
Iterates through all trees and sums their lengths. O(n) performance where n = 2 << exponent.
type SimpleStore ¶
type SimpleStore[T Keyed] struct {
	// contains filtered or unexported fields
}
func NewJsonSimpleStore ¶
func NewJsonSimpleStore[T Keyed](tp streams.TopicPartition) *SimpleStore[T]
func NewSimpleStore ¶
func NewSimpleStore[T Keyed](tp streams.TopicPartition, codec streams.Codec[T]) *SimpleStore[T]
func (*SimpleStore[T]) Delete ¶
func (s *SimpleStore[T]) Delete(item T) (cle streams.ChangeLogEntry, ok bool)
func (*SimpleStore[T]) Get ¶
func (s *SimpleStore[T]) Get(id string) (val T, ok bool)
func (*SimpleStore[T]) Put ¶
func (s *SimpleStore[T]) Put(item T) streams.ChangeLogEntry
func (*SimpleStore[T]) ReceiveChange ¶
func (s *SimpleStore[T]) ReceiveChange(record streams.IncomingRecord) (err error)
func (*SimpleStore[T]) Revoked ¶
func (s *SimpleStore[T]) Revoked()
func (*SimpleStore[T]) ToChangeLogEntry ¶
func (s *SimpleStore[T]) ToChangeLogEntry(item T) streams.ChangeLogEntry