files

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2022 License: MIT Imports: 20 Imported by: 0

README

File Data Source

Turn files into a SQL Queryable data source.

Allows Cloud storage (Google Storage, S3, etc) files (csv, json, custom-protobuf) to be queried with traditional sql. Also allows these files to have custom serializations, compressions, encryptions.

Design Goal

  • Hackable Stores easy to add to Google Storage, local files, s3, etc.
  • Hackable File Formats Protbuf files, WAL files, mysql-bin-log's etc.

Developing new stores or file formats

  • FileStore defines file storage Factory (s3, google-storage, local files, sftp, etc)
  • StoreReader a configured, initilized instance of a specific FileStore.
    File storage reader(writer) for finding lists of files, and opening files.
  • FileHandler FileHandlers handle processing files from StoreReader, developers Register FileHandler implementations in Registry. FileHandlers will create FileScanner that iterates rows of this file.
  • FileScanner File Row Reading, how to transform contents of file into qlbridge.Message for use in query engine. Currently CSV, Json types.

Example: Query CSV Files

We are going to create a CSV database of Baseball data from http://seanlahman.com/baseball-archive/statistics/

# download files to local /tmp
mkdir -p /tmp/baseball
cd /tmp/baseball
curl -Ls http://seanlahman.com/files/database/baseballdatabank-2017.1.zip > bball.zip
unzip bball.zip

mv baseball*/core/*.csv .
rm bball.zip
rm -rf baseballdatabank-*

# run a docker container locally
docker run -e "LOGGING=debug" --rm -it -p 4000:4000 \
  -v /tmp/baseball:/tmp/baseball \
  gcr.io/dataux-io/dataux:latest


In another Console open Mysql:

# connect to the docker container you just started
mysql -h 127.0.0.1 -P4000


-- Now create a new Source
CREATE source baseball WITH {
  "type":"cloudstore", 
  "schema":"baseball", 
  "settings" : {
     "type": "localfs",
     "format": "csv",
     "path": "baseball/",
     "localpath": "/tmp"
  }
};

show databases;

use baseball;

show tables;

describe appearances

select count(*) from appearances;

select * from appearances limit 10;


TODO

Documentation

Overview

Package files is a cloud (gcs, s3) and local file datasource that translates json, csv, files into appropriate interface for qlbridge DataSource so we can run queries. Provides FileHandler interface to allow custom file type handling

Package files implements Cloud Files logic for getting, reading, and converting files into databases. It reads cloud(or local) files, gets lists of tables, and can scan through them using distributed query engine.

Index

Constants

View Source
const (
	// SourceType is the registered Source name in the qlbridge source registry
	SourceType = "cloudstore"
)

Variables

View Source
var (

	// Default file queue size to buffer by pager
	FileBufferSize = 5
)
View Source
var (
	// FileColumns are the default columns for the "file" table
	FileColumns = []string{"file", "table", "path", "partialpath", "size", "partition", "updated", "deleted", "filetype"}
)

Functions

func FileStoreLoader

func FileStoreLoader(ss *schema.Schema) (cloudstorage.StoreReader, error)

FileStoreLoader defines the interface for loading files

func RegisterFileHandler

func RegisterFileHandler(scannerType string, fh FileHandler)

RegisterFileHandler Register a FileHandler available by the provided @scannerType

func RegisterFileStore

func RegisterFileStore(storeType string, fs FileStoreCreator)

RegisterFileStore global registry for Registering implementations of FileStore factories of the provided @storeType

func SipPartitioner

func SipPartitioner(partitionCt uint64, fi *FileInfo) int

func TableFromFileAndPath

func TableFromFileAndPath(path, fileIn string) string

Find table name from full path of file, and the path of tables.

There are different "table" naming conventions we use to find table names.

  1. Support multiple partitioned files in folder which is name of table rootpath/tables/nameoftable/nameoftable1.csv rootpath/tables/nameoftable/nameoftable2.csv
  1. Suport Table as name of file inside folder rootpath/users.csv rootpath/accounts.csv

Types

type FileHandler

type FileHandler interface {
	Init(store FileStore, ss *schema.Schema) error
	// File Each time the underlying FileStore finds a new file it hands it off
	// to filehandler to determine if it is File or not (directory?), and to to extract any
	// metadata such as partition, and parse out fields that may exist in File/Folder path
	File(path string, obj cloudstorage.Object) *FileInfo
	// Create a scanner for particiular file
	Scanner(store cloudstorage.StoreReader, fr *FileReader) (schema.ConnScanner, error)
	// FileAppendColumns provides a method that this file-handler is going to provide additional
	// columns to the files list table, ie we are going to extract column info from the
	// folder paths, file-names.
	// For example:   `tables/appearances/2017/appearances.csv may extract the "2017" as "year"
	// and append that as column to all rows.
	// Optional:  may be nil
	FileAppendColumns() []string
}

FileHandler defines an interface for developers to build new File processing to allow these custom file-types to be queried with SQL.

Features of Filehandler

  • File() Converts a cloudstorage object to a FileInfo that describes the File.
  • Scanner() create a file-scanner, will allow the scanner to implement any custom file-type reading (csv, protobuf, json, enrcyption).

The File Reading, Opening, Listing is a separate layer, see FileSource for the Cloudstorage layer.

So it is a a factory to create Scanners for a speciffic format type such as csv, json

func NewJsonHandler

func NewJsonHandler(lh datasource.FileLineHandler) FileHandler

NewJsonHandler creates a json file handler for paging new-line delimited rows of json file

func NewJsonHandlerTables

func NewJsonHandlerTables(lh datasource.FileLineHandler, tables []string) FileHandler

NewJsonHandler creates a json file handler for paging new-line delimited rows of json file

type FileHandlerSchema

type FileHandlerSchema interface {
	FileHandler
	schema.SourceTableSchema
}

FileHandlerSchema - file handlers may optionally provide info about tables contained

type FileHandlerTables

type FileHandlerTables interface {
	FileHandler
	Tables() []*FileTable
}

FileHandlerTables - file handlers may optionally provide info about tables contained in store

type FileInfo

type FileInfo struct {
	Path        string         // Root path
	Name        string         // Name/Path of file
	PartialPath string         // non-file-name part of path
	Table       string         // Table name this file participates in
	FileType    string         // csv, json, etc
	Partition   int            // which partition
	Size        int            // Content-Length size in bytes
	AppendCols  []driver.Value // Additional Column info extracted from file name/folder path
	// contains filtered or unexported fields
}

FileInfo describes a single file Say a folder of "./tables" is the "root path" specified then say it has folders for "table names" underneath a redundant "tables" ./tables/

/tables/
       /appearances/appearances1.csv
       /players/players1.csv

Name = "tables/appearances/appearances1.csv" Table = "appearances" PartialPath = tables/appearances

func FileInfoFromCloudObject

func FileInfoFromCloudObject(path string, obj cloudstorage.Object) *FileInfo

Convert a cloudstorage object to a File. Interpret the table name for given full file path.

func (*FileInfo) String

func (m *FileInfo) String() string

func (*FileInfo) Values

func (m *FileInfo) Values() []driver.Value

Values as as slice, create a row of values describing this file for use in sql listing of files

type FilePager

type FilePager struct {
	Limit int

	schema.ConnScanner
	// contains filtered or unexported fields
}

FilePager acts like a Partitionied Data Source Conn, wrapping underlying FileSource and paging through list of files and only scanning those that match this pagers partition - by default the partitionct is -1 which means no partitioning

func NewFilePager

func NewFilePager(tableName string, fs *FileSource) *FilePager

NewFilePager creates default new FilePager

func (*FilePager) Close

func (m *FilePager) Close() error

Close this connection/pager

func (*FilePager) Columns

func (m *FilePager) Columns() []string

Columns part of Conn interface for providing columns for this table/conn

func (*FilePager) Next

func (m *FilePager) Next() schema.Message

Next iterator for next message, wraps the file Scanner, Next file abstractions

func (*FilePager) NextFile

func (m *FilePager) NextFile() (*FileReader, error)

NextFile gets next file

func (*FilePager) NextScanner

func (m *FilePager) NextScanner() (schema.ConnScanner, error)

NextScanner provides the next scanner assuming that each scanner represents different file, and multiple files for single source

func (*FilePager) RunFetcher

func (m *FilePager) RunFetcher()

func (*FilePager) WalkExecSource

func (m *FilePager) WalkExecSource(p *plan.Source) (exec.Task, error)

WalkExecSource Provide ability to implement a source plan for execution

type FileReader

type FileReader struct {
	*FileInfo
	F    io.ReadCloser // Actual file reader
	Exit chan bool     // exit channel to shutdown reader
}

FileReader file info and access to file to supply to ScannerMakers

type FileReaderIterator

type FileReaderIterator interface {
	// NextFile returns io.EOF on last file
	NextFile() (*FileReader, error)
}

FileReaderIterator defines a file source that can page through files getting next file from partition

type FileSource

type FileSource struct {
	Partitioner string // random, ??  (date, keyed?)
	// contains filtered or unexported fields
}

FileSource Source for reading files, and scanning them allowing the contents to be treated as a database, like doing a full table scan in mysql. But, you can partition across files.

  • readers: gcs, local-fs
  • tablesource: translate lists of files into tables. Normally we would have multiple files per table (ie partitioned, per-day, etc)
  • scanners: responsible for file-specific
  • files table: a "table" of all the files from this cloud source

func NewFileSource

func NewFileSource() *FileSource

NewFileSource provides a singleton manager for a particular Source Schema, and File-Handler to read/manage all files from a source such as gcs folder x, s3 folder y

func (*FileSource) Close

func (m *FileSource) Close() error

Close this File Source manager

func (*FileSource) File

func (m *FileSource) File(o cloudstorage.Object) *FileInfo

func (*FileSource) Init

func (m *FileSource) Init()

func (*FileSource) Open

func (m *FileSource) Open(tableName string) (schema.Conn, error)

Open a connection to given table, partition of Source interface

func (*FileSource) Setup

func (m *FileSource) Setup(ss *schema.Schema) error

Setup the filesource with schema info

func (*FileSource) Table

func (m *FileSource) Table(tableName string) (*schema.Table, error)

Table satisfys SourceSchema interface to get table schema for given table

func (*FileSource) Tables

func (m *FileSource) Tables() []string

Tables for this file-source

type FileStore

type FileStore interface {
	cloudstorage.StoreReader
}

FileStore Defines handler for reading Files, understanding folders and how to create scanners/formatters for files. Created by FileStoreCreator

FileStoreCreator(schema) -> FileStore

FileStore.Objects() -> File
         FileHandler(File) -> FileScanner
              FileScanner.Next() ->  Row

type FileStoreCreator

type FileStoreCreator func(*schema.Schema) (FileStore, error)

FileStoreCreator defines a Factory type for creating FileStore

type FileTable

type FileTable struct {
	Table       string
	PartialPath string
	FileCount   int
}

type Partitioner

type Partitioner func(uint64, *FileInfo) int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL