catalog

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

README

Catalog Implementations

Integration Testing

The Catalog implementations can be manually tested using the CLI implemented in the cmd/iceberg folder.

REST Catalog

To test the REST catalog implementation, we have a Docker configuration for a MinIO container and a tabulario/iceberg-rest container.

You can spin up the local catalog by going to the dev/ folder and running docker-compose up. You can then follow the steps of the Iceberg Quickstart tutorial, which we've summarized below.

Set up your Iceberg catalog

First, launch a PySpark console by running:

docker exec -it spark-iceberg pyspark

Once in the pyspark shell, we create a simple table named "taxis" in the "demo.nyc" namespace:

from pyspark.sql.types import DoubleType, FloatType, LongType, StructType, StructField, StringType

# Five columns. The third StructField argument (True) marks each column as
# nullable, which is why the CLI later reports every field as "optional".
schema = StructType([
  StructField("vendor_id", LongType(), True),
  StructField("trip_id", LongType(), True),
  StructField("trip_distance", FloatType(), True),
  StructField("fare_amount", DoubleType(), True),
  StructField("store_and_fwd_flag", StringType(), True)
])

# `spark` is the SparkSession predefined by the pyspark shell. Creating the
# table from an empty DataFrame gives it a schema but no rows yet.
df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").create()

Finally, we append a DataFrame with some rows to the table, which adds new data files:

# Reuse the table's own schema so the rows below are typed to match its columns.
schema = spark.table("demo.nyc.taxis").schema
# Column order: vendor_id, trip_id, trip_distance, fare_amount, store_and_fwd_flag.
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y"),
]
# Appending writes new data files and commits a new snapshot of the table.
df = spark.createDataFrame(data, schema)
df.writeTo("demo.nyc.taxis").append()
Testing with the CLI

Now that the catalog is running and contains a table, you can use the CLI implemented in the cmd/iceberg folder. You will need to export the following environment variables (their values can also be found in docker-compose.yml):

# `export` is required so that child processes such as `go run` inherit these
# values; a bare VAR=value line only sets a shell-local variable.
export AWS_S3_ENDPOINT=http://localhost:9000
export AWS_REGION=us-east-1
export AWS_ACCESS_KEY_ID=admin
export AWS_SECRET_ACCESS_KEY=password

With those environment variables set you can now run the CLI:

$ go run ./cmd/iceberg list --catalog rest --uri http://localhost:8181
┌──────┐
| IDs  |
| ---- |
| demo |
└──────┘

You can retrieve the schema of the table:

$ go run ./cmd/iceberg schema --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Current Schema, id=0
├──1: vendor_id: optional long
├──2: trip_id: optional long
├──3: trip_distance: optional float
├──4: fare_amount: optional double
└──5: store_and_fwd_flag: optional string

You can get the file list:

$ go run ./cmd/iceberg files --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Snapshots: rest.demo.nyc.taxis
└─┬Snapshot 7004656639550124164, schema 0: s3://warehouse/demo/nyc/taxis/metadata/snap-7004656639550124164-1-0d533cd4-f0c1-45a6-a691-f2be3abe5491.avro
  └─┬Manifest: s3://warehouse/demo/nyc/taxis/metadata/0d533cd4-f0c1-45a6-a691-f2be3abe5491-m0.avro
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00004-24-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00009-29-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    ├──Datafile: s3://warehouse/demo/nyc/taxis/data/00014-34-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
    └──Datafile: s3://warehouse/demo/nyc/taxis/data/00019-39-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet

And so on for the other commands and options available in the CLI.

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors shared by catalog implementations. Compare against them
// with errors.Is.
var (
	// ErrNoSuchTable is returned when a table does not exist in the catalog.
	ErrNoSuchTable = errors.New("table does not exist")

	// ErrNoSuchNamespace reports that the requested namespace does not exist
	// in the catalog.
	ErrNoSuchNamespace = errors.New("namespace does not exist")

	// ErrNamespaceAlreadyExists reports that a namespace with the requested
	// identifier already exists in the catalog.
	ErrNamespaceAlreadyExists = errors.New("namespace already exists")
)
View Source
// Errors reported by the REST catalog. ErrRESTError is the root: every other
// error in this block wraps it with %w, so errors.Is(err, ErrRESTError)
// matches any of them, while errors.Is(err, ErrForbidden) (for example)
// matches only that specific condition.
var (
	// ErrRESTError is the base error wrapped by all other REST catalog errors.
	ErrRESTError = errors.New("REST error")

	// ErrBadRequest reports a "bad request" response from the REST catalog.
	ErrBadRequest = fmt.Errorf("%w: bad request", ErrRESTError)

	// ErrForbidden reports a "forbidden" response from the REST catalog.
	ErrForbidden = fmt.Errorf("%w: forbidden", ErrRESTError)

	// ErrUnauthorized reports an "unauthorized" response from the REST catalog.
	ErrUnauthorized = fmt.Errorf("%w: unauthorized", ErrRESTError)

	// ErrAuthorizationExpired reports that the authorization used for the
	// request has expired.
	ErrAuthorizationExpired = fmt.Errorf("%w: authorization expired", ErrRESTError)

	// ErrServiceUnavailable reports that the REST catalog service is unavailable.
	ErrServiceUnavailable = fmt.Errorf("%w: service unavailable", ErrRESTError)

	// ErrServerError reports a server-side error from the REST catalog.
	ErrServerError = fmt.Errorf("%w: server error", ErrRESTError)

	// ErrCommitFailed reports a rejected commit; as its message says, the
	// caller should refresh the table and try again.
	ErrCommitFailed = fmt.Errorf("%w: commit failed, refresh and try again", ErrRESTError)

	// ErrCommitStateUnknown reports that a commit did not succeed for an
	// unknown reason.
	// NOTE(review): the name suggests the commit's outcome may be unknown
	// (i.e. it might have been applied); confirm before retrying blindly.
	ErrCommitStateUnknown = fmt.Errorf("%w: commit failed due to unknown reason", ErrRESTError)

	// ErrOAuthError reports an OAuth failure while talking to the REST catalog.
	ErrOAuthError = fmt.Errorf("%w: oauth error", ErrRESTError)
)

Functions

func GlueDatabaseIdentifier

func GlueDatabaseIdentifier(database string) table.Identifier

GlueDatabaseIdentifier returns a database identifier for a Glue database in the format [database].

func GlueTableIdentifier

func GlueTableIdentifier(database string, tableName string) table.Identifier

GlueTableIdentifier returns a glue table identifier for an Iceberg table in the format [database, table].

func NamespaceFromIdent

func NamespaceFromIdent(ident table.Identifier) table.Identifier

func TableNameFromIdent

func TableNameFromIdent(ident table.Identifier) string

func ToRestIdentifier

func ToRestIdentifier(ident ...string) table.Identifier

Types

type AwsProperties

// AwsProperties holds string key/value settings for the Glue catalog,
// supplied through WithAwsProperties. The recognized keys are defined by the
// Glue catalog implementation.
type AwsProperties map[string]string

type Catalog

type Catalog interface {
	// CatalogType reports which kind of catalog this implementation is.
	CatalogType() CatalogType

	// ListTables returns the identifiers of the tables in namespace. Each
	// identifier carries what this catalog needs to load the table later.
	ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)

	// LoadTable fetches the table named by identifier from the catalog and
	// returns it together with its metadata.
	LoadTable(
		ctx context.Context,
		identifier table.Identifier,
		props iceberg.Properties,
	) (*table.Table, error)

	// DropTable removes the table named by identifier from the catalog entirely.
	DropTable(ctx context.Context, identifier table.Identifier) error

	// RenameTable moves the table at from to the identifier to, then loads
	// and returns the table under its new identifier.
	RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)

	// ListNamespaces returns the namespaces available in the catalog,
	// optionally filtered by the parent namespace.
	ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)

	// CreateNamespace creates namespace in the catalog with the given properties.
	CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error

	// DropNamespace drops namespace together with every table it contains.
	DropNamespace(ctx context.Context, namespace table.Identifier) error

	// LoadNamespaceProperties returns the properties the catalog currently
	// stores for namespace.
	LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)

	// UpdateNamespaceProperties deletes the keys listed in removals and adds or
	// overwrites the entries in updates, returning a summary of the changes.
	UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)
}

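Catalog is the interface for Iceberg catalog operations, such as listing, loading, dropping, and renaming tables, and creating, listing, dropping, and managing the properties of namespaces.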

type CatalogType

// CatalogType identifies the kind of backend a catalog uses, as reported by
// Catalog.CatalogType.
type CatalogType string

// Known catalog types. In this version of the package, only REST
// (RestCatalog) and Glue (GlueCatalog) have implementations. The other
// values are declared but not implemented here.
const (
	REST     CatalogType = "rest"
	Hive     CatalogType = "hive"
	Glue     CatalogType = "glue"
	DynamoDB CatalogType = "dynamodb"
	SQL      CatalogType = "sql"
)

type GlueCatalog

// GlueCatalog is a Catalog implementation backed by AWS Glue, in which a
// namespace corresponds to a Glue database. Create one with NewGlueCatalog.
type GlueCatalog struct {
	// contains filtered or unexported fields
}

func NewGlueCatalog

func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog

NewGlueCatalog creates a new instance of GlueCatalog with the given options.

func (*GlueCatalog) CatalogType

func (c *GlueCatalog) CatalogType() CatalogType

func (*GlueCatalog) CreateNamespace

func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error

CreateNamespace creates a new Iceberg namespace in the Glue catalog.

func (*GlueCatalog) DropNamespace

func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error

DropNamespace deletes an Iceberg namespace from the Glue catalog.

func (*GlueCatalog) DropTable

func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error

DropTable deletes an Iceberg table from the Glue catalog.

func (*GlueCatalog) ListNamespaces

func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)

ListNamespaces returns a list of Iceberg namespaces from the given Glue catalog.

func (*GlueCatalog) ListTables

func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)

ListTables returns a list of Iceberg tables in the given Glue database.

The namespace should just contain the Glue database name.

func (*GlueCatalog) LoadNamespaceProperties

func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)

LoadNamespaceProperties loads the properties of an Iceberg namespace from the Glue catalog.

func (*GlueCatalog) LoadTable

func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)

LoadTable loads an Iceberg table using the table details stored in the Glue catalog.

The identifier should contain the Glue database name, then Glue table name.

func (*GlueCatalog) RenameTable

func (c *GlueCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)

RenameTable renames an Iceberg table in the Glue catalog.

func (*GlueCatalog) UpdateNamespaceProperties

func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
	removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)

UpdateNamespaceProperties updates the properties of an Iceberg namespace in the Glue catalog. The removals list contains the keys to remove, and the updates map contains the keys and values to update.

type Option

// Option configures a catalog when it is constructed. The type parameter T
// does not change the option's shape, because every Option mutates the same
// *options value. It does tie each option to the catalog it applies to, so an
// Option[RestCatalog] cannot be passed to NewGlueCatalog, and vice versa.
type Option[T GlueCatalog | RestCatalog] func(*options)

func WithAuthURI

func WithAuthURI(uri *url.URL) Option[RestCatalog]

func WithAwsConfig

func WithAwsConfig(cfg aws.Config) Option[GlueCatalog]

WithAwsConfig sets the AWS configuration for the catalog.

func WithAwsProperties

func WithAwsProperties(props AwsProperties) Option[GlueCatalog]

func WithCredential

func WithCredential(cred string) Option[RestCatalog]

func WithMetadataLocation

func WithMetadataLocation(loc string) Option[RestCatalog]

func WithOAuthToken

func WithOAuthToken(token string) Option[RestCatalog]

func WithPrefix

func WithPrefix(prefix string) Option[RestCatalog]

func WithSigV4

func WithSigV4() Option[RestCatalog]

func WithSigV4RegionSvc

func WithSigV4RegionSvc(region, service string) Option[RestCatalog]

func WithTLSConfig

func WithTLSConfig(config *tls.Config) Option[RestCatalog]

func WithWarehouseLocation

func WithWarehouseLocation(loc string) Option[RestCatalog]

type PropertiesUpdateSummary

// PropertiesUpdateSummary describes the outcome of UpdateNamespaceProperties
// as lists of property keys.
type PropertiesUpdateSummary struct {
	// Removed lists the property keys that were removed.
	Removed []string `json:"removed"`
	// Updated lists the property keys that were set from the updates map.
	Updated []string `json:"updated"`
	// Missing lists property keys that could not be found.
	// NOTE(review): presumably keys requested for removal that were not
	// present on the namespace; confirm against the implementations.
	Missing []string `json:"missing"`
}

type RestCatalog

// RestCatalog is a Catalog implementation that talks to an Iceberg REST
// catalog server. Create one with NewRestCatalog, passing the server URI and
// any Option[RestCatalog] values (credentials, TLS configuration, SigV4
// signing, and so on).
type RestCatalog struct {
	// contains filtered or unexported fields
}

func NewRestCatalog

func NewRestCatalog(name, uri string, opts ...Option[RestCatalog]) (*RestCatalog, error)

func (*RestCatalog) CatalogType

func (r *RestCatalog) CatalogType() CatalogType

func (*RestCatalog) CreateNamespace

func (r *RestCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error

func (*RestCatalog) DropNamespace

func (r *RestCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error

func (*RestCatalog) DropTable

func (r *RestCatalog) DropTable(ctx context.Context, identifier table.Identifier) error

func (*RestCatalog) ListNamespaces

func (r *RestCatalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)

func (*RestCatalog) ListTables

func (r *RestCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)

func (*RestCatalog) LoadNamespaceProperties

func (r *RestCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)

func (*RestCatalog) LoadTable

func (r *RestCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)

func (*RestCatalog) RenameTable

func (r *RestCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)

func (*RestCatalog) UpdateNamespaceProperties

func (r *RestCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
	removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL