Documentation ¶
Overview ¶
Package stream performs range-reads and filtering.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
	"github.com/apple/foundationdb/bindings/go/src/fdb/directory"

	"github.com/janderland/fdbq/engine/facade"
	"github.com/janderland/fdbq/engine/stream"
	"github.com/janderland/fdbq/keyval"
)

func main() {
	db := facade.NewTransactor(fdb.MustOpenDefault(), directory.Root())
	s := stream.New(context.Background())

	query := keyval.KeyValue{
		Key: keyval.Key{
			Directory: keyval.Directory{keyval.String("my"), keyval.String("dir")},
			Tuple:     keyval.Tuple{keyval.Float(22.9), keyval.MaybeMore{}},
		},
		Value: keyval.Variable{},
	}

	_, err := db.ReadTransact(func(tr facade.ReadTransaction) (interface{}, error) {
		ch1 := s.OpenDirectories(tr, query.Key.Directory)
		ch2 := s.ReadRange(tr, query.Key.Tuple, stream.RangeOpts{}, ch1)
		ch3 := s.UnpackKeys(query.Key.Tuple, true, ch2)
		for msg := range s.UnpackValues(query.Value, true, ch3) {
			if msg.Err != nil {
				return nil, msg.Err
			}
			fmt.Printf("read kv: %v\n", msg.KV)
		}
		return nil, nil
	})
	if err != nil {
		panic(err)
	}
}
Index ¶
- type DirErr
- type DirKVErr
- type KeyValErr
- type Option
- type RangeOpts
- type Stream
- func (x *Stream) OpenDirectories(tr facade.ReadTransactor, query keyval.Directory) chan DirErr
- func (x *Stream) ReadRange(tr facade.ReadTransaction, query keyval.Tuple, opts RangeOpts, in chan DirErr) chan DirKVErr
- func (x *Stream) SendDir(out chan<- DirErr, in DirErr) bool
- func (x *Stream) SendDirKV(out chan<- DirKVErr, in DirKVErr) bool
- func (x *Stream) SendKV(out chan<- KeyValErr, in KeyValErr) bool
- func (x *Stream) UnpackKeys(query keyval.Tuple, filter bool, in chan DirKVErr) chan KeyValErr
- func (x *Stream) UnpackValues(query keyval.Value, filter bool, in chan KeyValErr) chan KeyValErr
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DirErr ¶
type DirErr struct {
	Dir directory.DirectorySubspace
	Err error
}
DirErr is streamed from a call to Stream.OpenDirectories. If Err is nil, the other fields should be non-nil. If Err is non-nil, the other fields should be nil.
type DirKVErr ¶
type DirKVErr struct {
	Dir directory.DirectorySubspace
	KV  fdb.KeyValue
	Err error
}
DirKVErr is streamed from a call to Stream.ReadRange. If Err is nil, the other fields should be non-nil. If Err is non-nil, the other fields should be nil.
type KeyValErr ¶
KeyValErr is streamed from a call to Stream.UnpackKeys or Stream.UnpackValues. If Err is nil, the other fields should be non-nil. If Err is non-nil, the other fields should be nil.
type Option ¶
type Option func(*Stream)
Option can be passed as a trailing argument to the New function to modify properties of the created Stream.
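The trailing-argument style described here is commonly called the functional options pattern. The sketch below illustrates the general shape only: the pipeline type, its bufSize field, and withBufSize are hypothetical stand-ins, since Stream's fields are unexported and the options this package actually provides are not listed here.

```go
package main

import (
	"context"
	"fmt"
)

// pipeline is a hypothetical stand-in for Stream; its fields are illustrative only.
type pipeline struct {
	ctx     context.Context
	bufSize int
}

// option has the same shape as Option: a function that mutates the value being built.
type option func(*pipeline)

// withBufSize is a hypothetical option; the real package may offer nothing equivalent.
func withBufSize(n int) option {
	return func(p *pipeline) { p.bufSize = n }
}

// newPipeline sets defaults and then applies each option in order.
func newPipeline(ctx context.Context, opts ...option) *pipeline {
	p := &pipeline{ctx: ctx}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

// bufSizeOf builds a pipeline with the given options and reports its buffer size.
func bufSizeOf(opts ...option) int {
	return newPipeline(context.Background(), opts...).bufSize
}

func main() {
	fmt.Println(bufSizeOf())                // default: 0
	fmt.Println(bufSizeOf(withBufSize(16))) // overridden: 16
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.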
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream provides methods which build pipelines for reading a range of key-values.
func New ¶
New constructs a new Stream. The context provides a way to cancel any pipelines created with this Stream.
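To illustrate how a context can stop a running pipeline, here is a minimal stdlib-only sketch. It does not use this package: produce and takeThenCancel are hypothetical helpers, and the real pipelines may check the context at different points.

```go
package main

import (
	"context"
	"fmt"
)

// produce emits increasing integers until ctx is canceled, then closes its channel.
func produce(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- i:
			}
		}
	}()
	return out
}

// takeThenCancel reads n values, cancels the context, and waits for the channel to close.
func takeThenCancel(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := produce(ctx)
	var got []int
	for i := 0; i < n; i++ {
		got = append(got, <-ch)
	}

	cancel()
	// Drain any values still in flight; the loop ends once the producer closes the channel.
	for range ch {
	}
	return got
}

func main() {
	fmt.Println(takeThenCancel(3))
}
```

The consumer owns the cancel function, so it decides when the producer goroutine should stop; the closed channel is the signal that it actually has.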
func (*Stream) OpenDirectories ¶
func (x *Stream) OpenDirectories(tr facade.ReadTransactor, query keyval.Directory) chan DirErr
OpenDirectories executes the given directory query in a separate goroutine using the given transactor. When the goroutine exits, the returned channel is closed. If the associated context.Context is canceled, then the goroutine exits after the latest FDB call.
func (*Stream) ReadRange ¶
func (x *Stream) ReadRange(tr facade.ReadTransaction, query keyval.Tuple, opts RangeOpts, in chan DirErr) chan DirKVErr
ReadRange executes range-reads in a separate goroutine using the given transaction. When the goroutine exits, the returned channel is closed. Any errors read from the input channel are wrapped and forwarded. For each directory read from the input channel, a range-read is performed using the tuple prefix defined by the given keyval.Tuple. If the associated context.Context is canceled, then the goroutine exits after the latest FDB call.
func (*Stream) SendDir ¶
func (x *Stream) SendDir(out chan<- DirErr, in DirErr) bool
SendDir sends the given DirErr onto the given channel and returns true. If the context.Context associated with this Stream is canceled, then nothing is sent and false is returned.
func (*Stream) SendDirKV ¶
func (x *Stream) SendDirKV(out chan<- DirKVErr, in DirKVErr) bool
SendDirKV sends the given DirKVErr onto the given channel and returns true. If the context.Context associated with this Stream is canceled, then nothing is sent and false is returned.
func (*Stream) SendKV ¶
func (x *Stream) SendKV(out chan<- KeyValErr, in KeyValErr) bool
SendKV sends the given KeyValErr onto the given channel and returns true. If the context.Context associated with this Stream is canceled, then nothing is sent and false is returned.
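The three Send methods share one contract: deliver the message, or give up and return false if the context is canceled. A context-aware send of that kind can be sketched with a select statement. This is an assumption about the general technique, not the package's actual code; msg, send, and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// msg is a hypothetical stand-in for DirErr, DirKVErr, or KeyValErr.
type msg struct {
	Val string
	Err error
}

// send delivers in on out and returns true, or returns false if ctx is canceled first.
func send(ctx context.Context, out chan<- msg, in msg) bool {
	// Check up front so an already-canceled context never sends,
	// even when the channel has room.
	if ctx.Err() != nil {
		return false
	}
	select {
	case <-ctx.Done():
		return false
	case out <- in:
		return true
	}
}

// demo attempts one send and reports whether it succeeded and how many messages are queued.
func demo(canceled bool) (sent bool, queued int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if canceled {
		cancel()
	}
	out := make(chan msg, 1)
	sent = send(ctx, out, msg{Val: "hello"})
	return sent, len(out)
}

func main() {
	fmt.Println(demo(false)) // true 1
	fmt.Println(demo(true))  // false 0
}
```

Returning a bool lets a pipeline stage write `if !send(...) { return }` and exit promptly instead of blocking forever on a channel nobody reads.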
func (*Stream) UnpackKeys ¶
func (x *Stream) UnpackKeys(query keyval.Tuple, filter bool, in chan DirKVErr) chan KeyValErr
UnpackKeys converts the channel of DirKVErr into a channel of KeyValErr in a separate goroutine. When the goroutine exits, the returned channel is closed. Any errors read from the input channel are wrapped and forwarded. Keys are unpacked using subspace.Subspace.Unpack and then converted to FDBQ types. Values are converted to keyval.Bytes; the actual byte string remains unchanged.
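The stage behavior described for ReadRange, UnpackKeys, and UnpackValues (run in a goroutine, wrap and forward errors, close the output on exit) follows a common Go pipeline shape. The sketch below shows that shape with stdlib types only. The item type, the upper-casing transform, and the "stage:" error prefix are hypothetical, and whether the real stages keep reading after forwarding an error is not stated here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// item is a hypothetical message type: either Val is set or Err is set.
type item struct {
	Val string
	Err error
}

// stage starts a goroutine that transforms each value, wraps and forwards
// each error, and closes the returned channel when it exits.
func stage(ctx context.Context, in <-chan item) <-chan item {
	out := make(chan item)
	go func() {
		defer close(out)
		for m := range in {
			if m.Err != nil {
				m = item{Err: fmt.Errorf("stage: %w", m.Err)}
			} else {
				m.Val = strings.ToUpper(m.Val)
			}
			select {
			case <-ctx.Done():
				return
			case out <- m:
			}
		}
	}()
	return out
}

// run pushes two values and one error through the stage and collects the results.
func run() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan item, 3)
	in <- item{Val: "a"}
	in <- item{Err: errors.New("boom")}
	in <- item{Val: "b"}
	close(in)

	var got []string
	for m := range stage(ctx, in) {
		// Check Err before touching the other fields.
		if m.Err != nil {
			got = append(got, "error: "+m.Err.Error())
			continue
		}
		got = append(got, m.Val)
	}
	return got
}

func main() {
	for _, line := range run() {
		fmt.Println(line)
	}
}
```

Wrapping with %w keeps the original error reachable through errors.Is and errors.As, so a consumer at the end of the pipeline can still inspect the root cause.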
func (*Stream) UnpackValues ¶
func (x *Stream) UnpackValues(query keyval.Value, filter bool, in chan KeyValErr) chan KeyValErr
UnpackValues deserializes the values in a separate goroutine. When the goroutine exits, the returned channel is closed. Any errors read from the input channel are wrapped and forwarded. The values of the key-values provided via the input channel are expected to be of type keyval.Bytes, and are converted to the type specified in the given query.