catalog

package
v0.0.0-...-aecf591
Warning

This package is not in the latest version of its module.

Published: May 2, 2024 License: Apache-2.0 Imports: 13 Imported by: 1

README

Catalog Implementations

Integration Testing

The Catalog implementations can be manually tested using the CLI implemented in the cmd/iceberg folder.

REST Catalog

To test the REST catalog implementation, we have a Docker configuration for a MinIO container and a tabulario/iceberg-rest container.

You can spin up the local catalog by going to the dev/ folder and running docker-compose up. You can then follow the steps of the Iceberg Quickstart tutorial, which we've summarized below.

Set up your Iceberg catalog

First, launch a PySpark console by running:

docker exec -it spark-iceberg pyspark

In the PySpark shell, create an empty table called "taxis" in the "demo.nyc" namespace:

from pyspark.sql.types import DoubleType, FloatType, LongType, StructType, StructField, StringType
schema = StructType([
  StructField("vendor_id", LongType(), True),
  StructField("trip_id", LongType(), True),
  StructField("trip_distance", FloatType(), True),
  StructField("fare_amount", DoubleType(), True),
  StructField("store_and_fwd_flag", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").create()

Finally, we append a DataFrame with four rows to the table, which adds new data files:

schema = spark.table("demo.nyc.taxis").schema
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y")
]
df = spark.createDataFrame(data, schema)
df.writeTo("demo.nyc.taxis").append()

Testing with the CLI

Now that the catalog is running and contains a table, you can use the CLI implemented in the cmd/iceberg folder. First, set the following environment variables (they can also be found in docker-compose.yml):

AWS_S3_ENDPOINT=http://localhost:9000
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=admin
AWS_SECRET_ACCESS_KEY=password

With those environment variables set, you can now run the CLI:

$ go run ./cmd/iceberg list --catalog rest --uri http://localhost:8181
┌──────┐
| IDs  |
| ---- |
| demo |
└──────┘

You can retrieve the schema of the table:

$ go run ./cmd/iceberg schema --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Current Schema, id=0
├──1: vendor_id: optional long
├──2: trip_id: optional long
├──3: trip_distance: optional float
├──4: fare_amount: optional double
└──5: store_and_fwd_flag: optional string

You can list the table's snapshots, manifests, and data files:

$ go run ./cmd/iceberg files --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Snapshots: rest.demo.nyc.taxis
└─┬Snapshot 7004656639550124164, schema 0: s3://warehouse/demo/nyc/taxis/metadata/snap-7004656639550124164-1-0d533cd4-f0c1-45a6-a691-f2be3abe5491.avro
  └─┬Manifest: s3://warehouse/demo/nyc/taxis/metadata/0d533cd4-f0c1-45a6-a691-f2be3abe5491-m0.avro
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00004-24-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00009-29-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00014-34-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    └──Datafile: s3://warehouse/demo/nyc/taxis/data/00019-39-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet

The CLI's other commands work the same way.

Documentation


Constants

This section is empty.

Variables

var (
	// ErrNoSuchTable is returned when a table does not exist in the catalog.
	ErrNoSuchTable            = errors.New("table does not exist")
	ErrNoSuchNamespace        = errors.New("namespace does not exist")
	ErrNamespaceAlreadyExists = errors.New("namespace already exists")
)
var (
	ErrorTableNotFound = fmt.Errorf("table not found")
)

Functions

func NamespaceFromIdent

func NamespaceFromIdent(ident table.Identifier) table.Identifier

func NewIcebucket

func NewIcebucket(warehouseURI string, bucket objstore.Bucket) *icebucket

NewIcebucket creates a new icebucket with the given warehouse URI and bucket. The warehouseURI is used to strip the full path of the data warehouse from the object name.
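
The page does not show how the stripping is done. The sketch below is a guess that assumes it amounts to trimming the warehouse URI (plus a separating slash) from the front of a full location; objectName is a hypothetical helper, not the package's function, and the sample path is the manifest from the files output above.

```go
package main

import (
	"fmt"
	"strings"
)

// objectName (hypothetical) converts a full location into an object name
// relative to the warehouse URI, e.g. "s3://warehouse/demo/..." -> "demo/...".
func objectName(warehouseURI, location string) string {
	// Normalize to exactly one trailing slash so "s3://warehouse" and
	// "s3://warehouse/" behave the same.
	prefix := strings.TrimSuffix(warehouseURI, "/") + "/"
	// TrimPrefix leaves the location unchanged if it lies outside the warehouse.
	return strings.TrimPrefix(location, prefix)
}

func main() {
	fmt.Println(objectName("s3://warehouse",
		"s3://warehouse/demo/nyc/taxis/metadata/0d533cd4-f0c1-45a6-a691-f2be3abe5491-m0.avro"))
	// demo/nyc/taxis/metadata/0d533cd4-f0c1-45a6-a691-f2be3abe5491-m0.avro
}
```

This sketch returns locations outside the warehouse unchanged; the real icebucket may handle them differently.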

func TableNameFromIdent

func TableNameFromIdent(ident table.Identifier) string
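
NamespaceFromIdent and TableNameFromIdent have no doc comments here. Judging by the demo.nyc.taxis identifier used throughout the walkthrough, a plausible reading is that the last part of an identifier is the table name and the parts before it form the namespace. The sketch assumes table.Identifier is a []string; namespaceFromIdent and tableNameFromIdent are illustrative stand-ins, not the package's code.

```go
package main

import "fmt"

// Identifier assumes table.Identifier is a slice of name parts,
// e.g. Identifier{"demo", "nyc", "taxis"}.
type Identifier = []string

// namespaceFromIdent returns every part except the last one.
func namespaceFromIdent(ident Identifier) Identifier {
	return ident[:len(ident)-1]
}

// tableNameFromIdent returns the last part.
func tableNameFromIdent(ident Identifier) string {
	return ident[len(ident)-1]
}

func main() {
	ident := Identifier{"demo", "nyc", "taxis"}
	fmt.Println(namespaceFromIdent(ident), tableNameFromIdent(ident)) // [demo nyc] taxis
}
```

As written, both helpers panic on an empty identifier; whether the real functions guard against that is not documented.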

Types

type Catalog

type Catalog interface {
	// CatalogType returns the type of the catalog.
	CatalogType() CatalogType

	// ListTables returns a list of table identifiers in the catalog, with the returned
	// identifiers containing the information required to load the table via that catalog.
	ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)
	// LoadTable loads a table from the catalog and returns a Table with the metadata.
	LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (table.Table, error)
	// DropTable tells the catalog to drop the table entirely
	DropTable(ctx context.Context, identifier table.Identifier) error
	// RenameTable tells the catalog to rename a given table by the identifiers
	// provided, and then loads and returns the destination table
	RenameTable(ctx context.Context, from, to table.Identifier) (table.Table, error)
	// ListNamespaces returns the list of available namespaces, optionally filtering by a
	// parent namespace
	ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)
	// CreateNamespace tells the catalog to create a new namespace with the given properties
	CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error
	// DropNamespace tells the catalog to drop the namespace and all tables in that namespace
	DropNamespace(ctx context.Context, namespace table.Identifier) error
	// LoadNamespaceProperties returns the current properties in the catalog for
	// a given namespace
	LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)
	// UpdateNamespaceProperties allows removing, adding, and/or updating properties of a namespace
	UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
		removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)

	// CreateTable tells the catalog to create a new table with the given schema and properties
	CreateTable(ctx context.Context, location string, schema *iceberg.Schema, props iceberg.Properties, options ...TableOption) (table.Table, error)
}

Catalog is the interface for Iceberg table operations such as create, drop, load, and list.

func NewHDFS

func NewHDFS(uri string, bucket objstore.Bucket) Catalog

type CatalogType

type CatalogType string
const (
	REST     CatalogType = "rest"
	Hive     CatalogType = "hive"
	Glue     CatalogType = "glue"
	DynamoDB CatalogType = "dynamodb"
	SQL      CatalogType = "sql"
	Hadoop   CatalogType = "hadoop"
)
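
The string values of CatalogType match the rest passed to --catalog in the CLI examples above. The sketch below validates such a string against the declared constants; parseCatalogType is hypothetical and not part of the package, and the constants are copied locally so the block compiles on its own.

```go
package main

import "fmt"

type CatalogType string

// Copied from the package documentation above.
const (
	REST     CatalogType = "rest"
	Hive     CatalogType = "hive"
	Glue     CatalogType = "glue"
	DynamoDB CatalogType = "dynamodb"
	SQL      CatalogType = "sql"
	Hadoop   CatalogType = "hadoop"
)

// parseCatalogType (hypothetical) accepts only the declared catalog types.
func parseCatalogType(s string) (CatalogType, error) {
	switch t := CatalogType(s); t {
	case REST, Hive, Glue, DynamoDB, SQL, Hadoop:
		return t, nil
	default:
		return "", fmt.Errorf("unknown catalog type %q", s)
	}
}

func main() {
	t, err := parseCatalogType("rest")
	fmt.Println(t, err) // rest <nil>
}
```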

type PropertiesUpdateSummary

type PropertiesUpdateSummary struct {
	Removed []string `json:"removed"`
	Updated []string `json:"updated"`
	Missing []string `json:"missing"`
}
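
The three fields appear to follow the Iceberg REST catalog's update-namespace-properties response: Removed lists removal keys that existed, Missing lists removal keys that were not found, and Updated lists keys that were set. The sketch assumes those semantics; applyUpdate is a hypothetical in-memory stand-in for UpdateNamespaceProperties, and Properties is a local placeholder for iceberg.Properties.

```go
package main

import (
	"fmt"
	"sort"
)

// Properties is a placeholder for iceberg.Properties.
type Properties map[string]string

type PropertiesUpdateSummary struct {
	Removed []string `json:"removed"`
	Updated []string `json:"updated"`
	Missing []string `json:"missing"`
}

// applyUpdate (hypothetical) mutates props in place and reports what
// happened to each requested key.
func applyUpdate(props Properties, removals []string, updates Properties) PropertiesUpdateSummary {
	var s PropertiesUpdateSummary
	for _, k := range removals {
		if _, ok := props[k]; ok {
			delete(props, k)
			s.Removed = append(s.Removed, k)
		} else {
			s.Missing = append(s.Missing, k)
		}
	}
	for k, v := range updates {
		props[k] = v
		s.Updated = append(s.Updated, k)
	}
	sort.Strings(s.Updated) // map iteration order is random; sort for stable output
	return s
}

func main() {
	props := Properties{"owner": "alice", "comment": "old"}
	s := applyUpdate(props, []string{"comment", "ttl"}, Properties{"owner": "bob"})
	fmt.Printf("%+v\n", s) // {Removed:[comment] Updated:[owner] Missing:[ttl]}
}
```

This sketch applies removals before updates, so a key listed in both ends up present; a real catalog may reject that combination instead.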

type TableOption

type TableOption func(*tableOptions)

func WithPartitionSpec

func WithPartitionSpec(spec iceberg.PartitionSpec) TableOption
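
TableOption follows Go's functional-options pattern. Because tableOptions is unexported, its fields are not documented; the sketch below guesses a single partitionSpec field and uses a placeholder PartitionSpec type in place of iceberg.PartitionSpec, so treat both as hypothetical. resolveOptions shows how a CreateTable implementation might apply its variadic options.

```go
package main

import "fmt"

// PartitionSpec is a placeholder for iceberg.PartitionSpec; the field is illustrative.
type PartitionSpec struct{ Fields []string }

// tableOptions is a guess at the unexported options struct.
type tableOptions struct {
	partitionSpec *PartitionSpec // nil means "no spec given"
}

type TableOption func(*tableOptions)

// WithPartitionSpec returns an option that records the given spec.
func WithPartitionSpec(spec PartitionSpec) TableOption {
	return func(o *tableOptions) { o.partitionSpec = &spec }
}

// resolveOptions applies options in order over zero-value defaults.
func resolveOptions(opts ...TableOption) tableOptions {
	var o tableOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := resolveOptions(WithPartitionSpec(PartitionSpec{Fields: []string{"vendor_id"}}))
	fmt.Println(o.partitionSpec.Fields) // [vendor_id]
}
```

The pattern lets CreateTable gain new optional settings later without changing its signature; callers that pass no options simply get the defaults.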
