delta

package
v0.0.0-rc13
Published: Dec 4, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

README

Delta Provider

The Delta Provider is a Snapshot Provider for S3-compatible storage that holds Delta Lake data (see https://delta.io/ for details).

The implementation of the Delta read protocol is based on two canonical implementations:

  1. Java standalone binary: https://docs.delta.io/latest/delta-standalone.html
  2. Rust implementation: https://github.com/delta-io/delta-rs

The standalone binary ships with "Golden" Delta Lake datasets that cover all combinations of data. The tests verify this implementation against the "Golden" dataset, a copy of which is stored in a sandbox.

Apart from the main provider and storage code, the implementation is divided into several sub-packages:

Types

Contains type definitions for Delta Lake tables, which are inherited from Parquet. See the full list here: https://docs.databricks.com/sql/language-manual/sql-ref-datatypes.html

Actions

Contains models of the known Delta protocol log messages (actions). Each message is stored as a one-of JSON row: a single JSON line that carries exactly one action.
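A minimal sketch of decoding such a one-of row, assuming the standard Delta log layout in which each JSON line has a single top-level key naming the action. The `Action`, `AddFile`, and `RemoveFile` types and the `ParseAction` function are illustrative names, not this package's API, and only two action kinds are modeled.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// AddFile and RemoveFile are hypothetical, trimmed-down action models.
type AddFile struct {
	Path string `json:"path"`
	Size int64  `json:"size"`
}

type RemoveFile struct {
	Path string `json:"path"`
}

// Action is a one-of wrapper: exactly one field is expected to be non-nil.
type Action struct {
	Add    *AddFile    `json:"add,omitempty"`
	Remove *RemoveFile `json:"remove,omitempty"`
}

// ParseAction decodes one log line and enforces the one-of constraint.
// Rows holding an action this sketch does not model are reported as errors.
func ParseAction(line []byte) (*Action, error) {
	var a Action
	if err := json.Unmarshal(line, &a); err != nil {
		return nil, err
	}
	set := 0
	if a.Add != nil {
		set++
	}
	if a.Remove != nil {
		set++
	}
	if set != 1 {
		return nil, errors.New("expected exactly one known action per row")
	}
	return &a, nil
}

func main() {
	a, err := ParseAction([]byte(`{"add":{"path":"part-0000.parquet","size":1024}}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(a.Add.Path, a.Add.Size)
}
```

A real log also contains other actions, such as metadata and protocol rows, so a complete model would add one pointer field per action kind.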

Store

Provides an abstraction layer over the actual log directory storage. The interface is designed to resemble its Java counterpart: https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/LogStore.java. It has two implementations: local file system and S3-compatible storage.
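To illustrate the idea, here is a sketch of what a read-only log store abstraction might look like in Go. The `LogStore` interface, its method names, and the in-memory `memStore` backend are assumptions made for this example; the actual interface in the sub-package may differ. `ListFrom` follows the Java contract of returning paths that sort at or after the given path.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// LogStore is a hypothetical, read-only slice of a log storage abstraction.
type LogStore interface {
	// ListFrom returns the log paths that sort at or after the given path.
	ListFrom(path string) ([]string, error)
	// Read returns the lines of one log file.
	Read(path string) ([]string, error)
}

// memStore keeps log files in memory; it stands in for a local file system
// or S3 backend in this sketch.
type memStore struct {
	files map[string]string
}

func (m *memStore) ListFrom(path string) ([]string, error) {
	var out []string
	for p := range m.files {
		if p >= path {
			out = append(out, p)
		}
	}
	sort.Strings(out)
	return out, nil
}

func (m *memStore) Read(path string) ([]string, error) {
	body, ok := m.files[path]
	if !ok {
		return nil, errors.New("log file not found: " + path)
	}
	return strings.Split(strings.TrimSpace(body), "\n"), nil
}

func main() {
	var store LogStore = &memStore{files: map[string]string{
		"_delta_log/00000000000000000000.json": `{"add":{"path":"a.parquet"}}`,
		"_delta_log/00000000000000000001.json": `{"remove":{"path":"a.parquet"}}` + "\n" +
			`{"add":{"path":"b.parquet"}}`,
	}}
	paths, err := store.ListFrom("_delta_log/00000000000000000001.json")
	if err != nil {
		panic(err)
	}
	lines, err := store.Read(paths[0])
	if err != nil {
		panic(err)
	}
	fmt.Println(len(paths), len(lines))
}
```

Keeping the interface this small lets the replay logic stay unaware of whether the log lives on disk or in a bucket.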

Protocol

Contains the main abstractions for working with Delta Lake:

  • Table: Represents an actual table with a schema and multiple versions.
  • Snapshot: Represents a specific version of a table, built from a list of actual Parquet files.
  • TableLog: Represents all the table events, which can be used to calculate a snapshot for a specific version or timestamp.
  • LogSegment: Represents a single event related to data.
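As an illustration of resolving a snapshot by timestamp, the sketch below picks the latest version committed at or before a given time. The `commit` type and `versionAt` function are hypothetical, and the sketch assumes commits are sorted by version with timestamps that never decrease.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// commit is a hypothetical summary of one table log entry.
type commit struct {
	Version   int64
	Timestamp int64 // milliseconds since the Unix epoch
}

// versionAt returns the latest version committed at or before ts.
// It assumes commits are sorted by version and timestamps never decrease.
func versionAt(commits []commit, ts int64) (int64, error) {
	// Find the index of the first commit strictly newer than ts.
	i := sort.Search(len(commits), func(i int) bool {
		return commits[i].Timestamp > ts
	})
	if i == 0 {
		return 0, errors.New("no commit at or before the requested timestamp")
	}
	return commits[i-1].Version, nil
}

func main() {
	history := []commit{
		{Version: 0, Timestamp: 1000},
		{Version: 1, Timestamp: 2000},
		{Version: 2, Timestamp: 3000},
	}
	v, err := versionAt(history, 2500)
	if err != nil {
		panic(err)
	}
	fmt.Println(v) // prints 1
}
```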

Workflow

The workflow for reading a Delta folder is as follows:

  1. Check if the folder is a Delta folder (i.e., it has a _delta_log subdirectory).
  2. List the contents of the _delta_log directory, with each file representing one version.
  3. Read each file in the _delta_log directory line by line. Each line represents an Action event.
  4. Replay the Action events to determine which files remain in the table. Events may add or remove files.
  5. Read all the files that make up the table. Each one is a Parquet file that holds table data.
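Steps 2 through 4 above can be sketched as follows. This is a simplified illustration, not the provider's code: `versionOf`, `liveFiles`, `fileRef`, and `row` are hypothetical names, the log is passed in as an in-memory map, only add and remove actions are replayed, and checkpoint files are ignored. It relies on the Delta convention that commit files are named with a zero-padded version number and a `.json` suffix.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"path"
	"sort"
	"strconv"
	"strings"
)

// fileRef and row are hypothetical models limited to the two file actions.
type fileRef struct {
	Path string `json:"path"`
}

type row struct {
	Add    *fileRef `json:"add"`
	Remove *fileRef `json:"remove"`
}

// versionOf extracts the version from a commit file name such as
// 00000000000000000001.json; ok is false for any other file.
func versionOf(name string) (version int64, ok bool) {
	base := path.Base(name)
	if !strings.HasSuffix(base, ".json") {
		return 0, false
	}
	v, err := strconv.ParseInt(strings.TrimSuffix(base, ".json"), 10, 64)
	return v, err == nil
}

// liveFiles replays commit files in version order and returns the sorted
// list of data files that remain in the table.
func liveFiles(logs map[string]string) ([]string, error) {
	type commit struct {
		version int64
		body    string
	}
	var commits []commit
	for name, body := range logs {
		if v, ok := versionOf(name); ok {
			commits = append(commits, commit{v, body})
		}
	}
	sort.Slice(commits, func(i, j int) bool { return commits[i].version < commits[j].version })

	live := map[string]bool{}
	for _, c := range commits {
		// Note: bufio.Scanner has a default line-size limit; real log
		// rows can be longer and would need a larger buffer.
		sc := bufio.NewScanner(strings.NewReader(c.body))
		for sc.Scan() {
			if strings.TrimSpace(sc.Text()) == "" {
				continue
			}
			var r row
			if err := json.Unmarshal(sc.Bytes(), &r); err != nil {
				return nil, fmt.Errorf("version %d: %w", c.version, err)
			}
			if r.Add != nil {
				live[r.Add.Path] = true
			}
			if r.Remove != nil {
				delete(live, r.Remove.Path)
			}
		}
		if err := sc.Err(); err != nil {
			return nil, err
		}
	}
	out := make([]string, 0, len(live))
	for p := range live {
		out = append(out, p)
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	logs := map[string]string{
		"_delta_log/00000000000000000000.json": `{"add":{"path":"a.parquet"}}` + "\n" +
			`{"add":{"path":"b.parquet"}}`,
		"_delta_log/00000000000000000001.json": `{"remove":{"path":"a.parquet"}}` + "\n" +
			`{"add":{"path":"c.parquet"}}`,
	}
	files, err := liveFiles(logs)
	if err != nil {
		panic(err)
	}
	fmt.Println(files) // prints [b.parquet c.parquet]
}
```

The resulting list is what step 5 would then open as Parquet files.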

Documentation

Constants

const ProviderType = abstract.ProviderType("delta")

Variables

This section is empty.

Functions

This section is empty.

Types

type DeltaSource

type DeltaSource struct {
	Bucket           string
	AccessKey        string
	S3ForcePathStyle bool
	SecretKey        model.SecretString
	PathPrefix       string
	Endpoint         string
	UseSSL           bool
	VersifySSL       bool
	Region           string

	HideSystemCols bool // hides the system columns `__delta_file_name` and `__delta_row_index` from the output struct

	// A Delta Lake always holds a single table; the TableID of that table is defined by the user.
	TableName      string
	TableNamespace string
}

func (*DeltaSource) ConnectionConfig

func (d *DeltaSource) ConnectionConfig() s3_provider.ConnectionConfig

func (*DeltaSource) GetProviderType

func (d *DeltaSource) GetProviderType() abstract.ProviderType

func (*DeltaSource) IsSource

func (d *DeltaSource) IsSource()

func (*DeltaSource) Validate

func (d *DeltaSource) Validate() error

func (*DeltaSource) WithDefaults

func (d *DeltaSource) WithDefaults()

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

func (Provider) Storage

func (p Provider) Storage() (abstract.Storage, error)

func (Provider) Type

func (p Provider) Type() abstract.ProviderType

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(cfg *DeltaSource, lgr log.Logger, registry metrics.Registry) (*Storage, error)

func (*Storage) BeginSnapshot

func (s *Storage) BeginSnapshot(_ context.Context) error

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) EndSnapshot

func (s *Storage) EndSnapshot(_ context.Context) error

func (*Storage) EstimateTableRowsCount

func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) ExactTableRowsCount

func (s *Storage) ExactTableRowsCount(_ abstract.TableID) (uint64, error)

func (*Storage) LoadTable

func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, abstractPusher abstract.Pusher) error

func (*Storage) Ping

func (s *Storage) Ping() error

func (*Storage) SetShardingContext

func (s *Storage) SetShardingContext(shardedState []byte) error

func (*Storage) ShardTable

func (*Storage) ShardingContext

func (s *Storage) ShardingContext() ([]byte, error)

func (*Storage) TableExists

func (s *Storage) TableExists(table abstract.TableID) (bool, error)

func (*Storage) TableList

func (*Storage) TableSchema

func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)
