stream

package
v0.2.0
Published: Jun 9, 2023 | License: MIT | Imports: 11 | Imported by: 0

Documentation

Overview

Package stream performs range-reads and filtering.

Example
package main

import (
	"context"
	"fmt"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
	"github.com/apple/foundationdb/bindings/go/src/fdb/directory"

	"github.com/janderland/fdbq/engine/facade"
	"github.com/janderland/fdbq/engine/stream"
	"github.com/janderland/fdbq/keyval"
)

func main() {
	db := facade.NewTransactor(fdb.MustOpenDefault(), directory.Root())
	s := stream.New(context.Background())

	query := keyval.KeyValue{
		Key: keyval.Key{
			Directory: keyval.Directory{keyval.String("my"), keyval.String("dir")},
			Tuple:     keyval.Tuple{keyval.Float(22.9), keyval.MaybeMore{}},
		},
		Value: keyval.Variable{},
	}

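	_, err := db.ReadTransact(func(tr facade.ReadTransaction) (interface{}, error) {
		// Stage 1: open the directories matched by the directory query.
		ch1 := s.OpenDirectories(tr, query.Key.Directory)
		// Stage 2: range-read each opened directory using the tuple prefix.
		ch2 := s.ReadRange(tr, query.Key.Tuple, stream.RangeOpts{}, ch1)
		// Stage 3: unpack the raw keys into FDBQ types.
		ch3 := s.UnpackKeys(query.Key.Tuple, true, ch2)
		// Stage 4: deserialize the values and consume the results.
		for msg := range s.UnpackValues(query.Value, true, ch3) {
			if msg.Err != nil {
				return nil, msg.Err
			}
			fmt.Printf("read kv: %v\n", msg.KV)
		}
		return nil, nil
	})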
	if err != nil {
		panic(err)
	}
}

Types

type DirErr

type DirErr struct {
	Dir directory.DirectorySubspace
	Err error
}

DirErr is streamed from a call to Stream.OpenDirectories. If Err is nil, the other fields should be non-nil. If Err is non-nil, the other fields should be nil.

type DirKVErr

type DirKVErr struct {
	Dir directory.DirectorySubspace
	KV  fdb.KeyValue
	Err error
}

DirKVErr is streamed from a call to Stream.ReadRange. If Err is nil, the other fields should be non-nil. If Err is non-nil, the other fields should be nil.

type KeyValErr

type KeyValErr struct {
	KV  keyval.KeyValue
	Err error
}

KeyValErr is streamed from a call to Stream.UnpackKeys or Stream.UnpackValues. If Err is nil, the other fields should be non-nil. If Err is non-nil, the other fields should be nil.

type Option

type Option func(*Stream)

Option can be passed as a trailing argument to the New function to modify properties of the created Stream.
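
The Option type follows the common "functional options" pattern. The following is a minimal sketch of that pattern, not the package's implementation: the pipeline struct, its fields, the defaults, and the withOrder and withName helpers are all hypothetical stand-ins.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// pipeline is a hypothetical stand-in for Stream; its fields are assumptions.
type pipeline struct {
	order binary.ByteOrder
	name  string
}

// option has the same shape as Option: a function that mutates the value
// being constructed.
type option func(*pipeline)

// withOrder is a hypothetical option that sets the byte order.
func withOrder(o binary.ByteOrder) option {
	return func(p *pipeline) { p.order = o }
}

// withName is a hypothetical option that sets a label.
func withName(n string) option {
	return func(p *pipeline) { p.name = n }
}

// newPipeline sets defaults first, then applies each option in order, so
// later options override earlier ones. The defaults here are arbitrary.
func newPipeline(opts ...option) pipeline {
	p := pipeline{order: binary.BigEndian, name: "default"}
	for _, opt := range opts {
		opt(&p)
	}
	return p
}

func main() {
	p := newPipeline(withOrder(binary.LittleEndian))
	fmt.Println(p.order, p.name)
}
```

Because options are applied after the defaults, callers only name the properties they want to change.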

func ByteOrder

func ByteOrder(order binary.ByteOrder) Option

ByteOrder sets the endianness used for encoding and decoding values. The returned Option modifies the Stream, so it must not be applied concurrently with other Stream methods.

func Logger

func Logger(log zerolog.Logger) Option

Logger configures the logger that a Stream will use.

type RangeOpts

type RangeOpts struct {
	Reverse bool
	Limit   int
}

RangeOpts configures how a Stream performs a range read.
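
As a rough mental model of the two fields, the sketch below applies them to an in-memory sorted slice instead of a database. The rangeOpts type and applyOpts function are hypothetical, and treating a Limit of zero as "no limit" is an assumption borrowed from how FoundationDB range options usually behave, not something this page states.

```go
package main

import "fmt"

// rangeOpts mirrors the fields of RangeOpts for this sketch.
type rangeOpts struct {
	Reverse bool
	Limit   int
}

// applyOpts returns the keys a range read over sorted keys might yield.
// Assumption: a Limit of zero means "no limit".
func applyOpts(sorted []string, opts rangeOpts) []string {
	out := make([]string, len(sorted))
	copy(out, sorted)
	if opts.Reverse {
		for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
			out[i], out[j] = out[j], out[i]
		}
	}
	if opts.Limit > 0 && opts.Limit < len(out) {
		out = out[:opts.Limit]
	}
	return out
}

func main() {
	keys := []string{"a", "b", "c", "d"}
	fmt.Println(applyOpts(keys, rangeOpts{}))                        // [a b c d]
	fmt.Println(applyOpts(keys, rangeOpts{Reverse: true, Limit: 2})) // [d c]
}
```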

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream provides methods which build pipelines for reading a range of key-values.

func New

func New(ctx context.Context, opts ...Option) Stream

New constructs a new Stream. The context provides a way to cancel any pipelines created with this Stream.

func (*Stream) OpenDirectories

func (x *Stream) OpenDirectories(tr facade.ReadTransactor, query keyval.Directory) chan DirErr

OpenDirectories executes the given directory query in a separate goroutine using the given transactor. When the goroutine exits, the returned channel is closed. If the associated context.Context is canceled, then the goroutine exits after the latest FDB call.

func (*Stream) ReadRange

func (x *Stream) ReadRange(tr facade.ReadTransaction, query keyval.Tuple, opts RangeOpts, in chan DirErr) chan DirKVErr

ReadRange executes range-reads in a separate goroutine using the given transaction. When the goroutine exits, the returned channel is closed. Any errors read from the input channel are wrapped and forwarded. For each directory read from the input channel, a range-read is performed using the tuple prefix defined by the given keyval.Tuple. If the associated context.Context is canceled, then the goroutine exits after the latest FDB call.

func (*Stream) SendDir

func (x *Stream) SendDir(out chan<- DirErr, in DirErr) bool

SendDir sends the given DirErr onto the given channel and returns true. If the context.Context associated with this Stream is canceled, then nothing is sent and false is returned.

func (*Stream) SendDirKV

func (x *Stream) SendDirKV(out chan<- DirKVErr, in DirKVErr) bool

SendDirKV sends the given DirKVErr onto the given channel and returns true. If the context.Context associated with this Stream is canceled, then nothing is sent and false is returned.

func (*Stream) SendKV

func (x *Stream) SendKV(out chan<- KeyValErr, in KeyValErr) bool

SendKV sends the given KeyValErr onto the given channel and returns true. If the context.Context associated with this Stream is canceled, then nothing is sent and false is returned.

func (*Stream) UnpackKeys

func (x *Stream) UnpackKeys(query keyval.Tuple, filter bool, in chan DirKVErr) chan KeyValErr

UnpackKeys converts the channel of DirKVErr into a channel of KeyValErr in a separate goroutine. When the goroutine exits, the returned channel is closed. Any errors read from the input channel are wrapped and forwarded. Keys are unpacked using subspace.Subspace.Unpack and then converted to FDBQ types. Values are converted to keyval.Bytes; the actual byte string remains unchanged.
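
The stage shape described above (convert each message, wrap and forward upstream errors, close the output on exit) can be sketched with standard-library types. The rawMsg and intMsg types and the parseStage function are hypothetical stand-ins for DirKVErr, KeyValErr, and UnpackKeys. Stopping at the first error is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// rawMsg and intMsg are hypothetical stand-ins for DirKVErr and KeyValErr.
type rawMsg struct {
	Raw string
	Err error
}

type intMsg struct {
	Val int
	Err error
}

// parseStage converts each rawMsg into an intMsg in its own goroutine.
// Upstream errors are wrapped and forwarded, and the output channel is
// closed when the goroutine exits.
func parseStage(in <-chan rawMsg) <-chan intMsg {
	out := make(chan intMsg)
	go func() {
		defer close(out)
		for msg := range in {
			if msg.Err != nil {
				out <- intMsg{Err: fmt.Errorf("parse stage: %w", msg.Err)}
				return
			}
			n, err := strconv.Atoi(msg.Raw)
			if err != nil {
				out <- intMsg{Err: fmt.Errorf("parse stage: %w", err)}
				return
			}
			out <- intMsg{Val: n}
		}
	}()
	return out
}

func main() {
	errUpstream := errors.New("read failed")
	in := make(chan rawMsg, 3)
	in <- rawMsg{Raw: "7"}
	in <- rawMsg{Err: errUpstream}
	in <- rawMsg{Raw: "9"} // never reached: the stage stops at the first error
	close(in)

	for msg := range parseStage(in) {
		if msg.Err != nil {
			// Wrapping with %w keeps the original error discoverable.
			fmt.Println("error:", msg.Err, errors.Is(msg.Err, errUpstream))
			continue
		}
		fmt.Println("value:", msg.Val)
	}
}
```

This sketch omits context handling for brevity. In a longer pipeline, a stage that returns early can leave its upstream sender blocked, which is the problem a cancellation-aware send is meant to avoid.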

func (*Stream) UnpackValues

func (x *Stream) UnpackValues(query keyval.Value, filter bool, in chan KeyValErr) chan KeyValErr

UnpackValues deserializes the values in a separate goroutine. When the goroutine exits, the returned channel is closed. Any errors read from the input channel are wrapped and forwarded. The values of the key-values provided via the input channel are expected to be of type keyval.Bytes, and are converted to the type specified by the given query.
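
The sketch below gives a rough idea of what deserializing raw bytes into a typed value involves. It rests on several assumptions that this page does not confirm: the target type is a big-endian uint64, and the filter flag means "skip values that do not match" when true and "report an error" when false. The decodeAll function is hypothetical and works on a slice instead of a channel to keep the example short.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeAll interprets each value as a big-endian uint64.
// Assumption: when filter is true, values of the wrong length are skipped;
// when it is false, the first such value is reported as an error.
func decodeAll(values [][]byte, filter bool) ([]uint64, error) {
	var out []uint64
	for i, v := range values {
		if len(v) != 8 {
			if filter {
				continue
			}
			return nil, fmt.Errorf("value %d: want 8 bytes, got %d", i, len(v))
		}
		out = append(out, binary.BigEndian.Uint64(v))
	}
	return out, nil
}

func main() {
	values := [][]byte{
		{0, 0, 0, 0, 0, 0, 0, 42},
		{1, 2, 3}, // does not match the expected type
	}
	fmt.Println(decodeAll(values, true))  // [42] <nil>
	fmt.Println(decodeAll(values, false)) // [] value 1: want 8 bytes, got 3
}
```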
