frames

package module
v0.6.3-v0.9.10
Warning

This package is not in the latest version of its module.

Published: Sep 15, 2019 License: Apache-2.0 Imports: 19 Imported by: 1

README

V3IO Frames


V3IO Frames is a high-speed server and client library for accessing time-series (TSDB), NoSQL, and streaming data in the Iguazio Data Science Platform.

Documentation

Frames currently supports three backends with basic CRUD functionality for each, plus a CSV backend for testing purposes.

Supported Backends:

  1. TSDB
  2. KV
  3. Stream
  4. CSV - for testing purposes

All Frames operations are executed via the client object. To create a client object, provide the Iguazio web-api endpoint and, optionally, credentials.

import v3io_frames as v3f
client = v3f.Client('framesd:8081', user='user1', password='pass')

Note: When running from within the managed Jupyter notebook on the Iguazio platform, there is no need to add credentials, as the platform handles them.
Next, every operation takes a backend and a table parameter, plus optional operation-specific arguments.

Create

Creates a new table for the desired backend. Not all backends require a table to be created before ingestion. For example, a KV table is created automatically while ingesting new data; TSDB tables, on the other hand, have mandatory fields, so you must create the table before ingesting new data.

client.create(backend=<backend>, table=<table>, attrs=<backend_specific_attributes>)
backend specific parameters
TSDB
  • rate
  • aggregates (optional)
  • aggregation-granularity (optional)

For detailed info on these parameters please visit TSDB docs.
Example:

client.create('tsdb', '/mytable', attrs={'rate': '1/m'})
Stream
  • shards=1 (optional)
  • retention_hours=24 (optional)

For detailed info on these parameters please visit Stream docs.
Example:

client.create('stream', '/mystream', attrs={'shards': '6'})
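The create rules above can be summarized as a small client-side check, written here in Go, the language of this package. `createAttrs` is a hypothetical helper, not part of the frames API; it only encodes what this section states: TSDB requires `rate`, stream attributes default to `shards=1` and `retention_hours=24`, and KV tables need no prior creation.

```go
package main

import (
	"errors"
	"fmt"
)

// createAttrs is a hypothetical helper (not part of the frames API). It
// returns a copy of attrs with the documented defaults filled in, or an
// error when a mandatory attribute is missing.
func createAttrs(backend string, attrs map[string]interface{}) (map[string]interface{}, error) {
	out := make(map[string]interface{}, len(attrs))
	for k, v := range attrs {
		out[k] = v
	}
	switch backend {
	case "tsdb":
		// TSDB tables must be created with an ingestion rate such as "1/m".
		if _, ok := out["rate"]; !ok {
			return nil, errors.New("tsdb: missing mandatory 'rate' attribute")
		}
	case "stream":
		// Optional stream attributes fall back to the documented defaults.
		if _, ok := out["shards"]; !ok {
			out["shards"] = 1
		}
		if _, ok := out["retention_hours"]; !ok {
			out["retention_hours"] = 24
		}
	case "kv":
		// KV tables are created implicitly on first write.
	default:
		return nil, fmt.Errorf("backend %q is not covered by this sketch", backend)
	}
	return out, nil
}

func main() {
	attrs, err := createAttrs("stream", map[string]interface{}{"shards": "6"})
	fmt.Println(attrs, err) // map[retention_hours:24 shards:6] <nil>

	_, err = createAttrs("tsdb", map[string]interface{}{})
	fmt.Println(err) // tsdb: missing mandatory 'rate' attribute
}
```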
Write

Writes a DataFrame into one of the supported backends.
Common write parameters:

  • dfs - list of DataFrames to write
  • index_cols=None (optional) - index columns to use; by default, the DataFrame's index columns are used.
  • labels=None (optional)
  • max_in_message=0 (optional)
  • partition_keys=None (Not yet supported)

Example:

import pandas as pd

data = [['tom', 10], ['nick', 15], ['juli', 14]]
df = pd.DataFrame(data, columns=['name', 'age'])
df = df.set_index('name')  # set_index returns a new DataFrame; it does not modify df in place
client.write(backend='kv', table='mytable', dfs=df)
backend specific parameters
KV
  • expression=' ' (optional) - for detailed information on update expressions see docs
  • condition=' ' (optional) - for detailed information on condition expressions see docs

Example:

data = [['tom', 10, 'TLV'], ['nick', 15, 'Berlin'], ['juli', 14, 'NY']]
df = pd.DataFrame(data, columns=['name', 'age', 'city'])
df = df.set_index('name')
client.write(backend='kv', table='mytable', dfs=df, expression='city="NY"', condition='age>14')
Read

Reads data from a backend.
Common read parameters:

  • iterator: bool - If True, return an iterator of DataFrames; if False, return a single DataFrame
  • filter: string - Query filter (can't be used with query)
  • columns: []str - List of columns to pass (can't be used with query)
  • data_format: string - Data format (Not yet supported)
  • marker: string - Query marker (Not yet supported)
  • limit: int - Maximal number of rows to return (Not yet supported)
  • row_layout: bool - Whether to use row layout (vs. the default column layout) (Not yet supported)
backend specific parameters
TSDB
  • start: string
  • end: string
  • step: string
  • aggregators: string
  • aggregationWindow: string
  • query: string - Query in SQL format
  • group_by: string - Query group by (can't be used with query)
  • multi_index: bool - Get the results as a multi index data frame where the labels are used as indexes in addition to the timestamp, or if False (default behavior) only the timestamp will function as the index.

For detailed info on these parameters please visit TSDB docs.
Example:

df = client.read(backend='tsdb', query="select avg(cpu) as cpu, avg(diskio), avg(network) from mytable", start='now-1d', end='now', step='2h')
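The restriction that a SQL `query` cannot be combined with `filter`, `columns`, or `group_by` lends itself to a client-side check. The `ReadParams` type and its `validate` method below are hypothetical illustrations, not the frames client API; the server's actual validation may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// ReadParams is a hypothetical subset of the read arguments listed above.
type ReadParams struct {
	Query   string
	Filter  string
	Columns []string
	GroupBy string
}

// validate rejects the combinations this section rules out: a SQL query
// together with filter, columns, or group_by.
func (p ReadParams) validate() error {
	if p.Query == "" {
		return nil
	}
	switch {
	case p.Filter != "":
		return errors.New("filter can't be used with query")
	case len(p.Columns) > 0:
		return errors.New("columns can't be used with query")
	case p.GroupBy != "":
		return errors.New("group_by can't be used with query")
	}
	return nil
}

func main() {
	ok := ReadParams{Query: "select avg(cpu) as cpu from mytable"}
	bad := ReadParams{Query: ok.Query, Filter: "cpu>90"}
	fmt.Println(ok.validate())  // <nil>
	fmt.Println(bad.validate()) // filter can't be used with query
}
```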
KV
  • reset_index: bool - Reset the index. When set to false (default), the dataframe will have the key column of the v3io kv as the index column. When set to true, the index will be reset to a range index.
  • max_in_message: int - Maximal number of rows per message
  • sharding_keys: []string (Experimental) - list of specific sharding keys to query. For range-scan formatted tables only.
  • segments: []int64 (Not yet supported)
  • total_segments: int64 (Not yet supported)
  • sort_key_range_start: string (Not yet supported)
  • sort_key_range_end: string (Not yet supported)

For detailed info on these parameters please visit KV docs.

Example:

df = client.read(backend='kv', table='mytable', filter='col1>666')
Stream
  • seek: string - accepted values: time | seq/sequence | latest | earliest.
    If the seq seek type is requested, provide the desired sequence ID via the sequence parameter.
    If the time seek type is requested, provide the desired start time via the start parameter.
  • shard_id: string
  • sequence: int64 (optional)

For detailed info on these parameters please visit Stream docs.

Example:

df = client.read(backend='stream', table='mytable', seek='latest', shard_id='5')
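The seek rules for the stream backend can be expressed as a validation sketch. `StreamReadArgs` and `checkSeek` are hypothetical names; the `HasSeq` field is an assumption added so that an explicit sequence ID of 0 can be told apart from a missing one.

```go
package main

import "fmt"

// StreamReadArgs is a hypothetical holder for the stream read arguments.
// HasSeq distinguishes "no sequence given" from an explicit sequence ID of 0.
type StreamReadArgs struct {
	Seek     string
	Start    string
	Sequence int64
	HasSeq   bool
}

// checkSeek applies the rules stated above: "seq"/"sequence" needs a
// sequence ID, "time" needs a start time, "latest"/"earliest" need neither.
func checkSeek(a StreamReadArgs) error {
	switch a.Seek {
	case "seq", "sequence":
		if !a.HasSeq {
			return fmt.Errorf("seek %q requires the sequence parameter", a.Seek)
		}
	case "time":
		if a.Start == "" {
			return fmt.Errorf("seek %q requires the start parameter", a.Seek)
		}
	case "latest", "earliest":
		// No extra parameters are needed.
	default:
		return fmt.Errorf("unknown seek type %q", a.Seek)
	}
	return nil
}

func main() {
	fmt.Println(checkSeek(StreamReadArgs{Seek: "latest"}))                // <nil>
	fmt.Println(checkSeek(StreamReadArgs{Seek: "seq"}))                   // seek "seq" requires the sequence parameter
	fmt.Println(checkSeek(StreamReadArgs{Seek: "time", Start: "now-1h"})) // <nil>
}
```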
Delete

Deletes a table of a specific backend.

Example:

client.delete(backend='<backend>', table='mytable')
backend specific parameters
TSDB
  • start: string - delete data starting from this time
  • end: string - delete data up to this time

Note: If neither start nor end is specified, the entire TSDB table is deleted.
For detailed info on these parameters please visit TSDB docs.
Example:

client.delete(backend='tsdb', table='mytable', start='now-1d', end='now-5h')
KV
  • filter: string - Filter for selective delete

Example:

client.delete(backend='kv', table='mytable', filter='age>40')
Execute

Provides additional functions that are not covered in the basic CRUD functionality.

TSDB

Currently no execute commands are available for the TSDB backend.

KV
  • infer, inferschema - infer and create a schema file for a given KV table. Example: client.execute(backend='kv', table='mytable', command='infer')
  • update - perform an update expression for a specific key. Example: client.execute(backend='kv', table='mytable', command='update', args={'key': 'somekey', 'expression': 'col2=30', 'condition': 'col3>15'})
Stream
  • put - put a new record into a stream. Example: client.execute(backend='stream', table='mystream', command='put', args={'data': 'this is a record', 'clientinfo': 'some_info', 'partition': 'partition_key'})

Contributing

Components
  • Go server with support for both the gRPC and HTTP protocols
  • Go client
  • Python client
Development

The core is written in Go. The development is done on the development branch and then released to the master branch.

  • To execute the Go tests, run make test.
  • To execute the Python tests, run make test-python.
Adding/Changing Dependencies
  • If you add Go dependencies, run make update-go-deps.
  • If you add Python dependencies, update clients/py/Pipfile and run make update-py-deps.
Travis CI

Integration tests are run on Travis CI. See .travis.yml for details.

The following environment variables are defined in the Travis settings:

  • Docker Container Registry (Quay.io)
    • DOCKER_PASSWORD — Password for pushing images to Quay.io.
    • DOCKER_USERNAME — Username for pushing images to Quay.io.
  • Python Package Index (PyPI)
    • V3IO_PYPI_PASSWORD — Password for pushing a new release to PyPi.
    • V3IO_PYPI_USER — Username for pushing a new release to PyPi.
  • Iguazio Data Science Platform
    • V3IO_SESSION — A JSON encoded map with session information for running tests. For example:

      '{"url":"45.39.128.5:8081","container":"mitzi","user":"daffy","password":"rabbit season"}'
      

      Note: Make sure to embed the JSON object within single quotes ('{...}').

Docker Image
Building the Image

Use the following command to build the Docker image:

make build-docker
Running the Image

Use the following command to run the Docker image:

docker run \
	-v /path/to/config.yaml:/etc/framesd.yaml \
	quay.io/v3io/frames:unstable

LICENSE

Apache 2

Documentation

Overview

Package frames provides an efficient way of moving data from various sources.

The package is composed of an HTTP web server that can serve data from various sources, and of clients in Go and in Python.

Index

Constants

const (
	IgnoreError = pb.ErrorOptions_IGNORE
	FailOnError = pb.ErrorOptions_FAIL
)

Shortcuts for the fail/ignore error options

Variables

var (
	BoolType   = DType(pb.DType_BOOLEAN)
	FloatType  = DType(pb.DType_FLOAT)
	IntType    = DType(pb.DType_INTEGER)
	StringType = DType(pb.DType_STRING)
	TimeType   = DType(pb.DType_TIME)
)

Possible data types

var (
	// DefaultLogLevel is the default log verbosity
	DefaultLogLevel string
)
var ZeroTime time.Time

ZeroTime is the zero value for time

Functions

func InitBackendDefaults added in v0.3.0

func InitBackendDefaults(cfg *BackendConfig, framesConfig *Config)

InitBackendDefaults initializes the default configuration for a backend

func MarshalFrame

func MarshalFrame(frame Frame) ([]byte, error)

MarshalFrame serializes a frame to []byte

func NewLogger

func NewLogger(verbose string) (logger.Logger, error)

NewLogger returns a new logger

func SessionFromEnv

func SessionFromEnv() (*pb.Session, error)

SessionFromEnv returns a session from the V3IO_SESSION environment variable (JSON encoded)
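As a rough sketch of what decoding V3IO_SESSION involves, the following Go code parses the JSON format shown in the Travis CI section. `sessionInfo`, `parseSession`, and `sessionFromEnv` are hypothetical names; the real function returns a `*pb.Session`, whose full field set is not listed on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// sessionInfo is a hypothetical stand-in for pb.Session holding only the
// keys used in the V3IO_SESSION example from the Travis CI section.
type sessionInfo struct {
	URL       string `json:"url"`
	Container string `json:"container"`
	User      string `json:"user"`
	Password  string `json:"password"`
}

// parseSession decodes one JSON-encoded session description.
func parseSession(raw string) (*sessionInfo, error) {
	if raw == "" {
		return nil, fmt.Errorf("V3IO_SESSION is empty or not set")
	}
	var s sessionInfo
	if err := json.Unmarshal([]byte(raw), &s); err != nil {
		return nil, fmt.Errorf("invalid V3IO_SESSION: %w", err)
	}
	return &s, nil
}

// sessionFromEnv reads and decodes the V3IO_SESSION environment variable.
func sessionFromEnv() (*sessionInfo, error) {
	return parseSession(os.Getenv("V3IO_SESSION"))
}

func main() {
	os.Setenv("V3IO_SESSION", `{"url":"45.39.128.5:8081","container":"mitzi","user":"daffy","password":"rabbit season"}`)
	s, err := sessionFromEnv()
	if err != nil {
		panic(err)
	}
	fmt.Println(s.URL, s.Container, s.User) // 45.39.128.5:8081 mitzi daffy
}
```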

Types

type BackendConfig

type BackendConfig struct {
	Type    string `json:"type"` // v3io, csv, ...
	Name    string `json:"name"`
	Workers int    `json:"workers"`
	// backend specific options
	Options map[string]interface{} `json:"options"`

	// CSV backend
	RootDir string `json:"rootdir,omitempty"`
}

BackendConfig is the default backend configuration

type Client

type Client interface {
	// Read reads data from server
	Read(request *pb.ReadRequest) (FrameIterator, error)
	// Write writes data to server
	Write(request *WriteRequest) (FrameAppender, error)
	// Create creates a table
	Create(request *pb.CreateRequest) error
	// Delete deletes data or table
	Delete(request *pb.DeleteRequest) error
	// Exec executes a command on the backend
	Exec(request *pb.ExecRequest) (Frame, error)
}

Client interface

type Column

type Column interface {
	Len() int                                 // Number of elements
	Name() string                             // Column name
	DType() DType                             // Data type (e.g. IntType, FloatType ...)
	Ints() ([]int64, error)                   // Data as []int64
	IntAt(i int) (int64, error)               // Int value at index i
	Floats() ([]float64, error)               // Data as []float64
	FloatAt(i int) (float64, error)           // Float value at index i
	Strings() []string                        // Data as []string
	StringAt(i int) (string, error)           // String value at index i
	Times() ([]time.Time, error)              // Data as []time.Time
	TimeAt(i int) (time.Time, error)          // time.Time value at index i
	Bools() ([]bool, error)                   // Data as []bool
	BoolAt(i int) (bool, error)               // bool value at index i
	Slice(start int, end int) (Column, error) // Slice of data
	CopyWithName(newName string) Column       // Create a copy of the current column
}

Column is a data column
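To illustrate the contract, here is a hypothetical slice-backed column that implements only `Len`, `Name`, `IntAt`, and `Slice` for `int64` data. It is not one of the package's column types, and its `Slice` returns the concrete type rather than `Column` to keep the sketch short.

```go
package main

import "fmt"

// intColumn is a hypothetical, slice-backed column implementing a few of
// the Column methods for int64 data only.
type intColumn struct {
	name string
	data []int64
}

func (c *intColumn) Len() int     { return len(c.data) }
func (c *intColumn) Name() string { return c.name }

// IntAt returns the value at index i, or an error when i is out of range.
func (c *intColumn) IntAt(i int) (int64, error) {
	if i < 0 || i >= len(c.data) {
		return 0, fmt.Errorf("%s: index %d out of range [0:%d]", c.name, i, len(c.data))
	}
	return c.data[i], nil
}

// Slice returns rows [start, end) as a new column sharing the backing array.
func (c *intColumn) Slice(start, end int) (*intColumn, error) {
	if start < 0 || end > len(c.data) || start > end {
		return nil, fmt.Errorf("%s: bad slice [%d:%d]", c.name, start, end)
	}
	return &intColumn{name: c.name, data: c.data[start:end]}, nil
}

func main() {
	age := &intColumn{name: "age", data: []int64{10, 15, 14}}
	tail, err := age.Slice(1, 3)
	if err != nil {
		panic(err)
	}
	v, err := tail.IntAt(0)
	if err != nil {
		panic(err)
	}
	fmt.Println(age.Len(), tail.Len(), v) // 3 2 15
}
```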

func NewLabelColumn

func NewLabelColumn(name string, value interface{}, size int) (Column, error)

NewLabelColumn returns a new label column

func NewSliceColumn

func NewSliceColumn(name string, data interface{}) (Column, error)

NewSliceColumn returns a new slice column
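Judging by the `NewLabelColumn(name, value, size)` signature, the difference between the two constructors is presumably one of storage: a label column holds one value that applies to every row, while a slice column holds one value per row. The hypothetical `labelColumn` below sketches that idea for strings only; it is an illustration, not the package's implementation.

```go
package main

import "fmt"

// labelColumn is a hypothetical illustration of a label column: a single
// value logically repeated size times, but stored only once.
type labelColumn struct {
	name  string
	value string
	size  int
}

func (c labelColumn) Len() int { return c.size }

// StringAt returns the shared value for any valid index.
func (c labelColumn) StringAt(i int) (string, error) {
	if i < 0 || i >= c.size {
		return "", fmt.Errorf("%s: index %d out of range", c.name, i)
	}
	return c.value, nil
}

// Strings materializes the column into one value per row, which is the
// layout a slice column would hold directly.
func (c labelColumn) Strings() []string {
	out := make([]string, c.size)
	for i := range out {
		out[i] = c.value
	}
	return out
}

func main() {
	host := labelColumn{name: "host", value: "server-1", size: 3}
	fmt.Println(host.Len(), host.Strings()) // 3 [server-1 server-1 server-1]
}
```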

type ColumnBuilder

type ColumnBuilder interface {
	Append(value interface{}) error
	At(index int) (interface{}, error)
	Set(index int, value interface{}) error
	Delete(index int) error
	Finish() Column
}

ColumnBuilder is an interface for building columns

func NewLabelColumnBuilder

func NewLabelColumnBuilder(name string, dtype DType, size int) ColumnBuilder

NewLabelColumnBuilder returns a builder for LabelColumn

func NewSliceColumnBuilder

func NewSliceColumnBuilder(name string, dtype DType, size int) ColumnBuilder

NewSliceColumnBuilder returns a builder for SliceColumn

type Config

type Config struct {
	Log            LogConfig `json:"log"`
	DefaultLimit   int       `json:"limit,omitempty"`
	DefaultTimeout int       `json:"timeout,omitempty"`

	// default V3IO connection details
	WebAPIEndpoint string `json:"webApiEndpoint"`
	Container      string `json:"container"`
	Username       string `json:"username,omitempty"`
	Password       string `json:"password,omitempty"`
	SessionKey     string `json:"sessionKey,omitempty"`

	// Number of parallel V3IO worker routines
	Workers int `json:"workers"`

	Backends []*BackendConfig `json:"backends,omitempty"`
}

Config is the server configuration

func (*Config) InitDefaults

func (c *Config) InitDefaults() error

InitDefaults initializes the defaults for the configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration
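A minimal sketch of loading this configuration, assuming a JSON document whose keys follow the struct tags above. The local struct copies omit the credential fields, and the defaulting and validation rules in `loadConfig` (a backend's `Name` falls back to its `Type`, its `Workers` to the server-wide value, backend names must be unique) are hypothetical, not the documented behavior of `InitDefaults`, `InitBackendDefaults`, or `Validate`.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of LogConfig, BackendConfig and a subset of Config, keeping
// the JSON tags documented above.
type LogConfig struct {
	Level string `json:"level,omitempty"`
}

type BackendConfig struct {
	Type    string                 `json:"type"`
	Name    string                 `json:"name"`
	Workers int                    `json:"workers"`
	Options map[string]interface{} `json:"options"`
	RootDir string                 `json:"rootdir,omitempty"`
}

type Config struct {
	Log            LogConfig        `json:"log"`
	DefaultLimit   int              `json:"limit,omitempty"`
	DefaultTimeout int              `json:"timeout,omitempty"`
	WebAPIEndpoint string           `json:"webApiEndpoint"`
	Container      string           `json:"container"`
	Workers        int              `json:"workers"`
	Backends       []*BackendConfig `json:"backends,omitempty"`
}

// loadConfig decodes raw JSON, applies hypothetical defaults, and validates.
func loadConfig(raw string) (*Config, error) {
	var cfg Config
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		return nil, err
	}
	if len(cfg.Backends) == 0 {
		return nil, errors.New("no backends configured")
	}
	seen := map[string]bool{}
	for _, b := range cfg.Backends {
		if b.Type == "" {
			return nil, errors.New("backend without a type")
		}
		if b.Name == "" {
			b.Name = b.Type // hypothetical default
		}
		if b.Workers == 0 {
			b.Workers = cfg.Workers // hypothetical default
		}
		if seen[b.Name] {
			return nil, fmt.Errorf("duplicate backend name %q", b.Name)
		}
		seen[b.Name] = true
	}
	return &cfg, nil
}

func main() {
	cfg, err := loadConfig(`{
		"webApiEndpoint": "webapi:8081",
		"container": "mycontainer",
		"workers": 8,
		"backends": [{"type": "kv"}, {"type": "tsdb", "workers": 2}]
	}`)
	if err != nil {
		panic(err)
	}
	for _, b := range cfg.Backends {
		fmt.Println(b.Name, b.Workers) // "kv 8", then "tsdb 2"
	}
}
```

The endpoint and container values in `main` are placeholders.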

type CreateRequest

type CreateRequest struct {
	Proto    *pb.CreateRequest
	Password SecretString
	Token    SecretString
}

CreateRequest is a table creation request

type DType

type DType pb.DType

DType is a data type

type DataBackend

type DataBackend interface {
	// TODO: Expose name, type, config ... ?
	Read(request *ReadRequest) (FrameIterator, error)
	Write(request *WriteRequest) (FrameAppender, error) // TODO: use Appender for write streaming
	Create(request *CreateRequest) error
	Delete(request *DeleteRequest) error
	Exec(request *ExecRequest) (Frame, error)
}

DataBackend is an interface for read/write on backend

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

Decoder is a message decoder

func NewDecoder

func NewDecoder(r io.Reader) *Decoder

NewDecoder returns a new Decoder

func (*Decoder) Decode

func (d *Decoder) Decode(msg proto.Message) error

Decode decodes a message from d.r

type DeleteRequest

type DeleteRequest struct {
	Proto    *pb.DeleteRequest
	Password SecretString
	Token    SecretString
}

DeleteRequest is a deletion request

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

Encoder is a message encoder

func NewEncoder

func NewEncoder(w io.Writer) *Encoder

NewEncoder returns a new Encoder

func (*Encoder) Encode

func (e *Encoder) Encode(msg proto.Message) error

Encode encodes the message to e.w
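The wire format used by `Encoder` and `Decoder` is not described on this page. Purely as an illustration of framing several messages on one stream, the sketch below uses a 4-byte big-endian length prefix with raw byte payloads; the length prefix is an assumption, and a real implementation would serialize `proto.Message` values rather than plain bytes.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeMsg writes one message as a 4-byte big-endian length followed by
// the payload bytes (assumed framing, for illustration only).
func writeMsg(w io.Writer, payload []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readMsg reads one length-prefixed message. It returns io.EOF only when
// the stream ends cleanly between messages.
func readMsg(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err // io.EOF at a clean end, io.ErrUnexpectedEOF mid-header
	}
	payload := make([]byte, binary.BigEndian.Uint32(hdr[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF // header without a payload
		}
		return nil, err
	}
	return payload, nil
}

// encodeAll frames a list of messages into one in-memory stream.
func encodeAll(msgs [][]byte) ([]byte, error) {
	var buf bytes.Buffer
	for _, m := range msgs {
		if err := writeMsg(&buf, m); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

// decodeAll reads messages until the stream ends cleanly.
func decodeAll(data []byte) ([][]byte, error) {
	r := bytes.NewReader(data)
	var out [][]byte
	for {
		msg, err := readMsg(r)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, msg)
	}
}

func main() {
	data, err := encodeAll([][]byte{[]byte("frame-1"), []byte("frame-2")})
	if err != nil {
		panic(err)
	}
	msgs, err := decodeAll(data)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(string(m))
	}
}
```

A production decoder would also cap the declared length before allocating, since the prefix comes from the peer.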

type ExecRequest

type ExecRequest struct {
	Proto    *pb.ExecRequest
	Password SecretString
	Token    SecretString
}

ExecRequest is an execution request

type Frame

type Frame interface {
	Labels() map[string]interface{}          // Label set
	Names() []string                         // Column names
	Indices() []Column                       // Index columns
	Len() int                                // Number of rows
	Column(name string) (Column, error)      // Column by name
	Slice(start int, end int) (Frame, error) // Slice of Frame
	IterRows(includeIndex bool) RowIterator  // Iterate over rows
}

Frame is a collection of columns

func NewFrame

func NewFrame(columns []Column, indices []Column, labels map[string]interface{}) (Frame, error)

NewFrame returns a new Frame

func NewFrameFromMap

func NewFrameFromMap(columns map[string]interface{}, indices map[string]interface{}) (Frame, error)

NewFrameFromMap returns a new MapFrame from a map

func NewFrameFromProto

func NewFrameFromProto(msg *pb.Frame) Frame

NewFrameFromProto returns a new frame from a protobuf message

func NewFrameFromRows

func NewFrameFromRows(rows []map[string]interface{}, indices []string, labels map[string]interface{}) (Frame, error)

NewFrameFromRows creates a new frame from rows
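Conceptually, `NewFrameFromRows` turns row-oriented maps into column-oriented data. The hypothetical `rowsToColumns` helper below sketches that reshaping with plain slices; it leaves missing cells as `nil`, which may not match how frames handles them, and it ignores the `indices` and `labels` arguments.

```go
package main

import (
	"fmt"
	"sort"
)

// rowsToColumns is a hypothetical helper: row maps in, one slice per
// column out. Missing cells are left as nil.
func rowsToColumns(rows []map[string]interface{}) ([]string, map[string][]interface{}) {
	seen := map[string]bool{}
	var names []string
	for _, row := range rows {
		for name := range row {
			if !seen[name] {
				seen[name] = true
				names = append(names, name)
			}
		}
	}
	sort.Strings(names) // map iteration order is random; sort for stable output

	cols := make(map[string][]interface{}, len(names))
	for _, name := range names {
		col := make([]interface{}, len(rows))
		for i, row := range rows {
			col[i] = row[name] // nil if this row lacks the column
		}
		cols[name] = col
	}
	return names, cols
}

func main() {
	rows := []map[string]interface{}{
		{"name": "tom", "age": 10},
		{"name": "nick", "age": 15},
		{"name": "juli"},
	}
	names, cols := rowsToColumns(rows)
	fmt.Println(names)       // [age name]
	fmt.Println(cols["age"]) // [10 15 <nil>]
}
```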

func UnmarshalFrame

func UnmarshalFrame(data []byte) (Frame, error)

UnmarshalFrame deserializes a frame from []byte

type FrameAppender

type FrameAppender interface {
	Add(frame Frame) error
	WaitForComplete(timeout time.Duration) error
}

FrameAppender appends frames

type FrameIterator

type FrameIterator interface {
	Next() bool
	Err() error
	At() Frame
}

FrameIterator iterates over frames
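The three methods follow the common Go iterator idiom: call `Next` until it returns false, read the current item with `At`, and check `Err` after the loop. The sketch below uses a hypothetical in-memory iterator whose `At` returns `[]int64` chunks instead of `Frame` values, so it runs without the frames package.

```go
package main

import (
	"errors"
	"fmt"
)

// frameIterator has the same method names as FrameIterator, but At returns
// a []int64 chunk instead of a Frame to keep the sketch self-contained.
type frameIterator interface {
	Next() bool
	Err() error
	At() []int64
}

// sliceIterator is a hypothetical in-memory iterator. failAt simulates a
// read error before the chunk at that position (-1 means never fail).
type sliceIterator struct {
	chunks [][]int64
	pos    int
	failAt int
	err    error
}

func (it *sliceIterator) Next() bool {
	if it.err != nil || it.pos >= len(it.chunks) {
		return false
	}
	if it.pos == it.failAt {
		it.err = errors.New("connection lost")
		return false
	}
	it.pos++
	return true
}

func (it *sliceIterator) At() []int64 { return it.chunks[it.pos-1] }
func (it *sliceIterator) Err() error  { return it.err }

// sumAll consumes an iterator with the Next/At loop, then checks Err once.
func sumAll(it frameIterator) (int64, error) {
	var total int64
	for it.Next() {
		for _, v := range it.At() {
			total += v
		}
	}
	return total, it.Err()
}

func main() {
	ok := &sliceIterator{chunks: [][]int64{{1, 2}, {3}}, failAt: -1}
	fmt.Println(sumAll(ok)) // 6 <nil>

	bad := &sliceIterator{chunks: [][]int64{{1, 2}, {3}}, failAt: 1}
	fmt.Println(sumAll(bad)) // 3 connection lost
}
```

Checking `Err` only after the loop keeps the loop body simple while still distinguishing a clean end from a failed read.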

type JoinStruct

type JoinStruct = pb.JoinStruct

JoinStruct is a join structure

type LogConfig

type LogConfig struct {
	Level string `json:"level,omitempty"`
}

LogConfig is the logging configuration

type Query

type Query struct {
	Table   string
	Columns []string
	Filter  string
	GroupBy string
}

Query is a query structure

func ParseSQL

func ParseSQL(sql string) (*Query, error)

ParseSQL parses an SQL query into a Query struct

type ReadRequest

type ReadRequest struct {
	Proto    *pb.ReadRequest
	Password SecretString
	Token    SecretString
}

ReadRequest is a read/query request

type RowIterator

type RowIterator interface {
	Next() bool                      // Advance to next row
	Row() map[string]interface{}     // Row as map of name->value
	RowNum() int                     // Current row number
	Indices() map[string]interface{} // MultiIndex as name->value
	Err() error                      // Iteration error
}

RowIterator is an iterator over frame rows

type SchemaField

type SchemaField = pb.SchemaField

SchemaField represents a schema field for an Avro record.

type SchemaKey

type SchemaKey = pb.SchemaKey

SchemaKey is a schema key

type SecretString

type SecretString struct {
	// contains filtered or unexported fields
}

SecretString hides a string, such as a password, from both plain and JSON logs.

func InitSecretString

func InitSecretString(s string) SecretString

func (SecretString) Get

func (s SecretString) Get() string
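One common way to get this behavior in Go is to implement `fmt.Stringer`, `fmt.GoStringer`, and `json.Marshaler` on a wrapper type. The sketch below is a hypothetical re-creation under that assumption, not the package's actual implementation; the `*****` mask and the `createRequest` type are also assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const mask = "*****"

// secretString is a hypothetical re-creation of the idea behind
// SecretString: the real value is only reachable through Get.
type secretString struct {
	s string
}

func initSecretString(s string) secretString { return secretString{s: s} }

// Get returns the real value.
func (s secretString) Get() string { return s.s }

// String masks the value for %v and %s.
func (s secretString) String() string { return mask }

// GoString masks the value for %#v.
func (s secretString) GoString() string { return mask }

// MarshalJSON masks the value in encoding/json output.
func (s secretString) MarshalJSON() ([]byte, error) { return json.Marshal(mask) }

type createRequest struct {
	User     string       `json:"user"`
	Password secretString `json:"password"`
}

func main() {
	req := createRequest{User: "user1", Password: initSecretString("pass")}
	out, err := json.Marshal(req)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))        // {"user":"user1","password":"*****"}
	fmt.Printf("%v\n", req)         // {user1 *****}
	fmt.Println(req.Password.Get()) // pass
}
```

The masking relies on the field being exported: `fmt` cannot call methods on values in unexported struct fields, so a secret stored in one would be printed raw.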

type Server

type Server interface {
	Start() error
	State() ServerState
	Err() error
}

Server is the frames server interface

type ServerBase

type ServerBase struct {
	// contains filtered or unexported fields
}

ServerBase provides common functionality for servers

func NewServerBase

func NewServerBase() *ServerBase

NewServerBase returns a new server base

func (*ServerBase) Err

func (s *ServerBase) Err() error

Err returns the server error

func (*ServerBase) SetError

func (s *ServerBase) SetError(err error)

SetError sets current error and will change state to ErrorState

func (*ServerBase) SetState

func (s *ServerBase) SetState(state ServerState)

SetState sets the server state

func (*ServerBase) State

func (s *ServerBase) State() ServerState

State returns the server state

type ServerState

type ServerState string

ServerState is the state of a server

const (
	ReadyState   ServerState = "ready"
	RunningState ServerState = "running"
	ErrorState   ServerState = "error"
)

Possible server states
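The documented behavior of `ServerBase` (state tracking, with `SetError` switching to `ErrorState`) can be sketched as follows. The mutex and the initial `ReadyState` are assumptions of this sketch, and the lower-case `serverBase` name marks it as a stand-in for the real type.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ServerState and its values mirror the constants documented above.
type ServerState string

const (
	ReadyState   ServerState = "ready"
	RunningState ServerState = "running"
	ErrorState   ServerState = "error"
)

// serverBase is a hypothetical stand-in for ServerBase.
type serverBase struct {
	mu    sync.Mutex
	state ServerState
	err   error
}

// newServerBase starts in ReadyState (an assumption of this sketch).
func newServerBase() *serverBase { return &serverBase{state: ReadyState} }

// SetState sets the current state.
func (s *serverBase) SetState(state ServerState) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state = state
}

// SetError records err and moves the server to ErrorState, as documented.
func (s *serverBase) SetError(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.err = err
	s.state = ErrorState
}

// State returns the current state.
func (s *serverBase) State() ServerState {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.state
}

// Err returns the last recorded error.
func (s *serverBase) Err() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.err
}

var errListenerClosed = errors.New("listener closed")

func main() {
	srv := newServerBase()
	srv.SetState(RunningState)
	fmt.Println(srv.State()) // running
	srv.SetError(errListenerClosed)
	fmt.Println(srv.State(), srv.Err()) // error listener closed
}
```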

type Session

type Session = pb.Session

Session information

func InitSessionDefaults

func InitSessionDefaults(session *Session, framesConfig *Config) *Session

InitSessionDefaults initializes session defaults

func NewSession

func NewSession(url, container, path, user, password, token, id string) (*Session, error)

NewSession will create a new session. It will populate missing values from the V3IO_SESSION environment variable (JSON encoded)

type TableSchema

type TableSchema = pb.TableSchema

TableSchema is a table schema

type WriteRequest

type WriteRequest struct {
	Session  *Session
	Password SecretString
	Token    SecretString
	Backend  string // backend name
	Table    string // Table name (path)
	// Data message sent with the write request (in case of a stream multiple messages can follow)
	ImmidiateData Frame
	// Expression template, for update expressions generated from combining columns data with expression
	Expression string
	// Condition template, for update conditions generated from combining columns data with expression
	Condition string
	// Will we get more message chunks (in a stream), if not we can complete
	HaveMore bool
}

WriteRequest is a request for writing data. TODO: Unite with protobuf (currently the protobuf message combines both this and a frame message).

Directories

Path Synopsis
csv
kv
cmd
