gbigquery

package
v0.6.0
Published: Jun 4, 2024 License: MIT Imports: 16 Imported by: 0

Documentation


Constants

const (
	TableUpdateBackoffTime         = 8 * time.Second
	DefaultBigQueryDatasetLocation = "EU"
)

Variables

var (
	ErrMissingTableSpec  = errors.New("table spec cannot be empty")
	ErrMissingTableName  = errors.New("table name cannot be empty")
	ErrMissingDataset    = errors.New("dataset name cannot be empty")
	ErrMissingColumnSpec = errors.New("column spec cannot be empty")
)

Functions

func NewBigQueryClient

func NewBigQueryClient(c entity.Config, client *bigquery.Client) *defaultBigQueryClient

NewBigQueryClient provides a concrete wrapper client for internal usage by the Loader

Types

type BigQueryClient

type BigQueryClient interface {
	GetDatasetMetadata(ctx context.Context, dataset *bigquery.Dataset) (*bigquery.DatasetMetadata, DatasetTableStatus, error)
	CreateDatasetRef(datasetId string) *bigquery.Dataset
	CreateDataset(ctx context.Context, id string, md *bigquery.DatasetMetadata) error
	GetTableMetadata(ctx context.Context, table *bigquery.Table) (*bigquery.TableMetadata, DatasetTableStatus, error)
	CreateTableRef(datasetId string, tableId string) *bigquery.Table
	CreateTable(ctx context.Context, datasetId string, tableId string, tm *bigquery.TableMetadata) (*bigquery.Table, error)
	GetTableInserter(table *bigquery.Table) BigQueryInserter
	UpdateTable(ctx context.Context, table *bigquery.Table, tm bigquery.TableMetadataToUpdate, etag string) (*bigquery.TableMetadata, error)
	Close() error
}
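
Because the Loader depends only on this interface, a custom or fake implementation can be injected via Config.Client (documented below), e.g. for unit testing. The following is a hedged sketch of a minimal no-op stub; the import paths are assumptions and not stated on this page.

// Hedged sketch: a no-op stub satisfying BigQueryClient, for injection via Config.Client in tests.
package gbigquery_test

import (
	"context"

	"cloud.google.com/go/bigquery"
	"github.com/zpiroux/geist-connector-bigquery/gbigquery" // assumed import path
)

type stubClient struct{}

func (s *stubClient) GetDatasetMetadata(ctx context.Context, dataset *bigquery.Dataset) (*bigquery.DatasetMetadata, gbigquery.DatasetTableStatus, error) {
	return &bigquery.DatasetMetadata{}, gbigquery.Existent, nil
}

func (s *stubClient) CreateDatasetRef(datasetId string) *bigquery.Dataset { return nil }

func (s *stubClient) CreateDataset(ctx context.Context, id string, md *bigquery.DatasetMetadata) error {
	return nil
}

func (s *stubClient) GetTableMetadata(ctx context.Context, table *bigquery.Table) (*bigquery.TableMetadata, gbigquery.DatasetTableStatus, error) {
	return &bigquery.TableMetadata{}, gbigquery.Existent, nil
}

func (s *stubClient) CreateTableRef(datasetId string, tableId string) *bigquery.Table { return nil }

func (s *stubClient) CreateTable(ctx context.Context, datasetId string, tableId string, tm *bigquery.TableMetadata) (*bigquery.Table, error) {
	return nil, nil
}

func (s *stubClient) GetTableInserter(table *bigquery.Table) gbigquery.BigQueryInserter { return nil }

func (s *stubClient) UpdateTable(ctx context.Context, table *bigquery.Table, tm bigquery.TableMetadataToUpdate, etag string) (*bigquery.TableMetadata, error) {
	return nil, nil
}

func (s *stubClient) Close() error { return nil }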

type BigQueryInserter

type BigQueryInserter interface {
	Put(ctx context.Context, src any) error
}

type Column added in v0.4.0

type Column struct {
	// Name of the column as specified at spec registration time.
	// One of Name or NameFromId needs to be present in the column spec.
	Name string `json:"name"`

	// If NameFromId is non-nil, columns will be generated dynamically based on transformation output.
	// The name of the column will be set to the value in the Transformed map, with the key as found in NameFromId.
	// Note that the field fetched from the event, to be used as the column name, needs to be of string type.
	NameFromId *NameFromId `json:"nameFromId,omitempty"`

	// Mode uses the definitions as set by BigQuery with "NULLABLE", "REQUIRED" or "REPEATED"
	Mode string `json:"mode"`

	// Type uses the BigQuery Standard SQL types.
	// The type here needs to match the one used in the Transform extract field spec.
	// For date/time/timestamp types the type used in the Transform extract field spec needs to be set to
	// "isoTimestamp" or "unixTimestamp".
	Type string `json:"type"`

	Description string   `json:"description"`
	Fields      []Column `json:"fields"` // For nested columns

	// ValueFromId is not part of the schema definition per se, but specifies which value from the incoming
	// transformed data should be inserted here.
	// A special value can be used to create a column holding the GEIST ingestion time, which can be combined
	// with the TimePartitioning config as an alternative to the default BQ insert-time partitioning.
	// To enable this, set the field to "@geistIngestionTime", with the column type set to "TIMESTAMP"
	// and mode set to "NULLABLE".
	ValueFromId string `json:"valueFromId"`
}

Column specifies how to map a transformed field to a table column.
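
A hedged example of a small column spec, written as Go struct literals (field IDs and names are illustrative), including the ingestion-time convention described above:

cols := []gbigquery.Column{
	{
		Name:        "customerId",
		Mode:        "REQUIRED",
		Type:        "STRING",
		Description: "Customer ID produced by the Transform step",
		ValueFromId: "customerId", // ID of the field in the transformed output map (illustrative)
	},
	{
		// Column filled with GEIST ingestion time; can be combined with TimePartitioning below.
		Name:        "ingestionTime",
		Mode:        "NULLABLE",
		Type:        "TIMESTAMP",
		ValueFromId: "@geistIngestionTime",
	},
}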

type Columns

type Columns map[string]Column

type Config

type Config struct {

	// ProjectId (required) specifies GCP project ID for this deployment.
	ProjectId string

	// Client (optional) enables fully customized clients to be used, e.g., for unit
	// testing.
	Client BigQueryClient

	// Creds (optional) can be used to override the default authentication method (using
	// GOOGLE_APPLICATION_CREDENTIALS) by providing externally created credentials.
	Creds *google.Credentials
}

Config contains connector config parameters that are not available for providing in the stream specs.

A general constraint with BigQuery is to keep each microbatch of transformed events (to be inserted as rows) below 5 MB, to avoid exceeding BigQuery's 10 MB HTTP request size limit for streaming inserts.
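
A minimal usage sketch, assuming the import path (not stated on this page):

package main

import (
	"context"
	"log"

	"github.com/zpiroux/geist-connector-bigquery/gbigquery" // assumed import path
)

func main() {
	ctx := context.Background()

	// Placeholder project ID; authentication defaults to GOOGLE_APPLICATION_CREDENTIALS
	// unless Creds is provided.
	cfg := gbigquery.Config{ProjectId: "my-gcp-project"}

	lf, err := gbigquery.NewLoaderFactory(ctx, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer lf.Close(ctx)

	// The factory is normally registered with GEIST, which calls NewLoader per stream
	// with that stream's entity.Config.
	log.Println("sink id:", lf.SinkId())
}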

type DatasetCreation added in v0.4.0

type DatasetCreation struct {
	Description string `json:"description"`

	// Geo location of dataset.
	// Valid values are:
	// EU
	// europe
	// US
	// plus all regional ones as described here: https://cloud.google.com/bigquery/docs/locations
	// If omitted or empty, the default location will be set to EU.
	Location string `json:"location"`
}

DatasetCreation config contains dataset creation details.

type DatasetTableStatus

type DatasetTableStatus int

const (
	Unknown DatasetTableStatus = iota
	Existent
	NonExistent
)

type LoaderFactory

type LoaderFactory struct {
	// contains filtered or unexported fields
}

func NewLoaderFactory

func NewLoaderFactory(ctx context.Context, config Config) (*LoaderFactory, error)

NewLoaderFactory creates a new BigQuery loader connector.

func (*LoaderFactory) Close

func (lf *LoaderFactory) Close(ctx context.Context) error

func (*LoaderFactory) NewLoader

func (lf *LoaderFactory) NewLoader(ctx context.Context, c entity.Config) (entity.Loader, error)

func (*LoaderFactory) NewSinkExtractor

func (lf *LoaderFactory) NewSinkExtractor(ctx context.Context, c entity.Config) (entity.Extractor, error)

func (*LoaderFactory) SinkId

func (lf *LoaderFactory) SinkId() string

type NameFromId added in v0.4.0

type NameFromId struct {
	Prefix       string `json:"prefix"`
	SuffixFromId string `json:"suffixFromId"`

	// Preset contains a list of column names that will be added to the table directly during table creation.
	Preset []string `json:"preset,omitempty"`
}

NameFromId creates a column name from ID outputs in the transloaded event map.
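
A hedged sketch of a dynamically named column (field IDs and names are illustrative):

dynCol := gbigquery.Column{
	NameFromId: &gbigquery.NameFromId{
		Prefix:       "amount_",
		SuffixFromId: "currencyCode",                        // transform output field whose string value becomes the suffix
		Preset:       []string{"amount_EUR", "amount_USD"}, // added directly at table creation time
	},
	Mode: "NULLABLE",
	Type: "NUMERIC",
}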

type Row

type Row struct {
	InsertId string
	// contains filtered or unexported fields
}

func NewRow

func NewRow() *Row

func (*Row) AddItem

func (r *Row) AddItem(item *RowItem)

func (*Row) Save

func (r *Row) Save() (map[string]bigquery.Value, string, error)

Save is required for implementing the BigQuery ValueSaver interface, as used by the bigquery.Inserter

func (*Row) Size

func (r *Row) Size() int

type RowItem

type RowItem struct {
	Name  string
	Value any
}
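
A hedged sketch of assembling a Row (item names are illustrative):

row := gbigquery.NewRow()
row.AddItem(&gbigquery.RowItem{Name: "customerId", Value: "abc123"})
row.AddItem(&gbigquery.RowItem{Name: "ingestionTime", Value: time.Now()}) // requires "time"

// Row implements bigquery.ValueSaver via Save, so it can be passed as the src
// argument to a BigQueryInserter's (or a raw *bigquery.Inserter's) Put:
//
//	err := inserter.Put(ctx, row)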

type SinkConfig added in v0.4.0

type SinkConfig struct {
	Tables []Table `json:"tables,omitempty"`

	// DiscardInvalidData specifies if invalid data should be prevented from being stored in the sink and instead
	// logged and discarded.
	// It increases CPU load somewhat but can be useful to enable when data from an unreliable source is
	// being continuously retried and the stream's HandlingOfUnretryableEvents mode is not granular enough.
	// One example is when MicroBatch mode is enabled and we want to discard individual invalid
	// events instead of retrying or DLQ:ing the whole microbatch.
	DiscardInvalidData bool `json:"discardInvalidData,omitempty"`
}

SinkConfig specifies the schema for the "customConfig" field in the "sink" section of the stream spec. It enables arbitrary connector specific fields to be present in the stream spec.

func NewSinkConfig added in v0.4.0

func NewSinkConfig(spec *entity.Spec) (sc SinkConfig, err error)

func (SinkConfig) Validate added in v0.4.0

func (sc SinkConfig) Validate() error
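
A hedged sketch of building and validating a SinkConfig programmatically; in a deployed stream this struct is instead populated by NewSinkConfig from the spec's "customConfig" section (names are illustrative):

sc := gbigquery.SinkConfig{
	Tables: []gbigquery.Table{
		{
			Name:    "purchase_events",
			Dataset: "sales",
			Columns: cols, // column specs as sketched in the Column section above
		},
	},
	DiscardInvalidData: true,
}
if err := sc.Validate(); err != nil {
	// handle invalid sink config
}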

type Table added in v0.4.0

type Table struct {

	// Name and Dataset are required
	Name    string `json:"name"`
	Dataset string `json:"dataset"`

	// DatasetCreation is only required if the dataset is meant to be created by this stream
	// *and* if other values than the default ones are required.
	// Default values are location: EU and empty description.
	DatasetCreation *DatasetCreation `json:"datasetCreation,omitempty"`

	// Columns is required
	Columns []Column `json:"columns"`

	// TableCreation is only required if non-existing tables should be created automatically
	TableCreation *TableCreation `json:"tableCreation,omitempty"`

	// InsertIdFromId defines which value in the Transformed output map will contain the insert ID,
	// as extracted from one of the input event fields.
	// The value referred to in the transloaded output map needs to be of string type.
	// This is used for BigQuery best-effort deduplication.
	InsertIdFromId string `json:"insertIdFromId"`
}

Table governs table interactions, including creation and how each transformed event should be mapped and inserted into a table.
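
A hedged sketch of a Table spec that also creates its dataset with non-default settings and enables best-effort deduplication (all names and field IDs are illustrative):

table := gbigquery.Table{
	Name:    "purchase_events",
	Dataset: "sales",
	DatasetCreation: &gbigquery.DatasetCreation{
		Description: "Sales data ingested by GEIST",
		Location:    "US",
	},
	Columns:        cols,      // column specs as sketched above
	InsertIdFromId: "eventId", // transformed output field (string) holding the insert ID
	// TableCreation (see below) is only needed if the table should be created automatically.
}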

type TableCreation added in v0.4.0

type TableCreation struct {
	Description string `json:"description"`

	// If non-nil, the table is partitioned by time. Only one of
	// time partitioning or range partitioning can be specified.
	TimePartitioning *TimePartitioning `json:"timePartitioning,omitempty"`

	// If set to true, queries that reference this table must specify a
	// partition filter (e.g. a WHERE clause) that can be used to eliminate
	// partitions. Used to prevent unintentional full data scans on large
	// partitioned tables.
	RequirePartitionFilter bool `json:"requirePartitionFilter"`

	// Clustering specifies the data clustering configuration for the table.
	Clustering []string `json:"clustering,omitempty"`
}

TableCreation config contains table creation details. Most of the fields/comments in the struct are copied directly from BQ client, with modifications to fit with the GEIST spec format.

type TimePartitioning added in v0.4.0

type TimePartitioning struct {
	// Defines the partition interval type. Supported values are "DAY" or "HOUR".
	Type string `json:"type"`

	// The number of hours to keep the storage for a partition.
	// If the duration is empty (0), the data in the partitions does not expire.
	ExpirationHours int `json:"expirationHours"`

	// If empty, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the
	// table is partitioned by this field. The field must be a top-level TIMESTAMP or
	// DATE field. Its mode must be NULLABLE or REQUIRED.
	Field string `json:"field"`
}

TimePartitioning describes the time-based date partitioning on a table. It is currently only used by BigQuery sinks and most of the fields/docs in the struct are copied directly from BQ client, with modifications to fit with the GEIST spec format. For more information see: https://cloud.google.com/bigquery/docs/creating-partitioned-tables.
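
A hedged sketch of automatic table creation with daily partitioning on the ingestion-time column sketched earlier, plus clustering (values are illustrative):

tc := gbigquery.TableCreation{
	Description:            "Created by the GEIST BigQuery connector",
	RequirePartitionFilter: true,
	Clustering:             []string{"customerId"},
	TimePartitioning: &gbigquery.TimePartitioning{
		Type:            "DAY",
		ExpirationHours: 24 * 90, // keep partition data for ~90 days; 0 means no expiry
		Field:           "ingestionTime",
	},
}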
