Documentation ¶
Index ¶
- Constants
- Variables
- func NewBigQueryClient(c entity.Config, client *bigquery.Client) *defaultBigQueryClient
- type BigQueryClient
- type BigQueryInserter
- type Column
- type Columns
- type Config
- type DatasetCreation
- type DatasetTableStatus
- type LoaderFactory
- func (lf *LoaderFactory) Close(ctx context.Context) error
- func (lf *LoaderFactory) NewLoader(ctx context.Context, c entity.Config) (entity.Loader, error)
- func (lf *LoaderFactory) NewSinkExtractor(ctx context.Context, c entity.Config) (entity.Extractor, error)
- func (lf *LoaderFactory) SinkId() string
- type NameFromId
- type Row
- type RowItem
- type SinkConfig
- type Table
- type TableCreation
- type TimePartitioning
Constants ¶
const (
    TableUpdateBackoffTime         = 8 * time.Second
    DefaultBigQueryDatasetLocation = "EU"
)
Variables ¶
Functions ¶
Types ¶
type BigQueryClient ¶
type BigQueryClient interface {
    GetDatasetMetadata(ctx context.Context, dataset *bigquery.Dataset) (*bigquery.DatasetMetadata, DatasetTableStatus, error)
    CreateDatasetRef(datasetId string) *bigquery.Dataset
    CreateDataset(ctx context.Context, id string, md *bigquery.DatasetMetadata) error
    GetTableMetadata(ctx context.Context, table *bigquery.Table) (*bigquery.TableMetadata, DatasetTableStatus, error)
    CreateTableRef(datasetId string, tableId string) *bigquery.Table
    CreateTable(ctx context.Context, datasetId string, tableId string, tm *bigquery.TableMetadata) (*bigquery.Table, error)
    GetTableInserter(table *bigquery.Table) BigQueryInserter
    UpdateTable(ctx context.Context, table *bigquery.Table, tm bigquery.TableMetadataToUpdate, etag string) (*bigquery.TableMetadata, error)
    Close() error
}
type Column ¶ added in v0.4.0
type Column struct {
    // Name of the column as specified at spec registration time.
    // One of Name or NameFromId needs to be present in the column spec.
    Name string `json:"name"`

    // If NameFromId is non-nil, columns will be generated dynamically based on transformation output.
    // The name of the column will be set to the value in the Transformed map, with the key as found in NameFromId.
    // Note that the field fetched from the event, to be used as the column name, needs to be of string type.
    NameFromId *NameFromId `json:"nameFromId,omitempty"`

    // Mode uses the definitions as set by BigQuery with "NULLABLE", "REQUIRED" or "REPEATED".
    Mode string `json:"mode"`

    // Type uses the BigQuery Standard SQL types.
    // The type here needs to match the one used in the Transform extract field spec.
    // For date/time/timestamp types the type used in the Transform extract field spec needs to be set to
    // "isoTimestamp" or "unixTimestamp".
    Type string `json:"type"`

    Description string `json:"description"`

    Fields []Column `json:"fields"` // For nested columns

    // ValueFromId is not part of the schema definition per se, but specifies which value from the incoming
    // transformed data should be inserted here.
    // A special value can be set to have a column with GEIST ingestion time, which could be used together
    // with TimePartitioning config, as an alternative to the also available default BQ insert partitioning.
    // To enable this, the field should be set to "@geistIngestionTime", with column type set to "TIMESTAMP"
    // and mode set to "NULLABLE".
    ValueFromId string `json:"valueFromId"`
}
Columns specifies how to map transformed fields to rows/columns, etc.
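As a sketch only (column names and transform ids below are hypothetical), a spec could combine a regular column populated from the transformed output with a column holding the GEIST ingestion time:

cols := []Column{
    {
        // Regular column, populated from the (hypothetical) transformed output field "userId".
        // The Transform extract type needs to match the column Type, as described above.
        Name:        "user_id",
        Type:        "STRING",
        Mode:        "REQUIRED",
        ValueFromId: "userId",
    },
    {
        // Column filled with GEIST ingestion time, using the special ValueFromId value.
        Name:        "ingestion_time",
        Type:        "TIMESTAMP",
        Mode:        "NULLABLE",
        ValueFromId: "@geistIngestionTime",
    },
}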
type Config ¶
type Config struct {
    // ProjectId (required) specifies the GCP project ID for this deployment.
    ProjectId string

    // Client (optional) enables fully customized clients to be used, e.g., for unit
    // testing.
    Client BigQueryClient

    // Creds (optional) can be used to override the default authentication method (using
    // GOOGLE_APPLICATION_CREDENTIALS) by providing externally created credentials.
    Creds *google.Credentials
}
Config contains connector config parameters that cannot be provided in the stream specs.
A general constraint with BigQuery is to keep each microbatch of transformed events (to be inserted as rows) below 5 MB, to avoid hitting BigQuery's 10 MB HTTP request size limit for streaming inserts.
type DatasetCreation ¶ added in v0.4.0
type DatasetCreation struct {
    Description string `json:"description"`

    // Geo location of dataset.
    // Valid values are:
    //   EU
    //   europe
    //   US
    // plus all regional ones as described here: https://cloud.google.com/bigquery/docs/locations
    // If omitted or empty the default location will be set to EU.
    Location string `json:"location"`
}
DatasetCreation config contains dataset creation details.
type DatasetTableStatus ¶
type DatasetTableStatus int
const (
    Unknown DatasetTableStatus = iota
    Existent
    NonExistent
)
type LoaderFactory ¶
type LoaderFactory struct {
// contains filtered or unexported fields
}
func NewLoaderFactory ¶
func NewLoaderFactory(ctx context.Context, config Config) (*LoaderFactory, error)
NewLoaderFactory creates a new BigQuery loader connector.
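A minimal usage sketch (the project id is a placeholder; in a typical deployment the factory is handed to GEIST, which calls NewLoader and NewSinkExtractor per stream spec):

ctx := context.Background()

lf, err := NewLoaderFactory(ctx, Config{ProjectId: "my-gcp-project"})
if err != nil {
    // handle error
}
defer lf.Close(ctx)

// GEIST then uses the factory to create one loader per stream using this sink,
// e.g. lf.NewLoader(ctx, streamConfig) with the entity.Config provided by GEIST.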
func (*LoaderFactory) NewSinkExtractor ¶
func (lf *LoaderFactory) NewSinkExtractor(ctx context.Context, c entity.Config) (entity.Extractor, error)
func (*LoaderFactory) SinkId ¶
func (lf *LoaderFactory) SinkId() string
type NameFromId ¶ added in v0.4.0
type NameFromId struct {
    Prefix       string `json:"prefix"`
    SuffixFromId string `json:"suffixFromId"`

    // Preset contains a list of Column names that will be added to the table directly during table creation.
    Preset []string `json:"preset,omitempty"`
}
NameFromId creates a Column name from id outputs in the transloaded event map.
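A sketch, under the reading that the generated name is Prefix concatenated with the string value found under SuffixFromId in the transloaded event map (ids and values below are hypothetical): if the transformed field "featureId" holds the value "clicks", the column would be named "feat_clicks".

col := Column{
    NameFromId: &NameFromId{
        Prefix:       "feat_",
        SuffixFromId: "featureId", // key in the transloaded event map (hypothetical)
    },
    Type: "STRING",
    Mode: "NULLABLE",
}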
type Row ¶
type Row struct {
    InsertId string
    // contains filtered or unexported fields
}
type SinkConfig ¶ added in v0.4.0
type SinkConfig struct {
    Tables []Table `json:"tables,omitempty"`

    // DiscardInvalidData specifies if invalid data should be prevented from being stored in the sink and instead
    // logged and discarded.
    // It increases CPU load somewhat, but can be useful to enable in case data from an unreliable source is
    // being continuously retried and the stream's HandlingOfUnretryableEvents mode is not granular enough.
    // One example is when the MicroBatch mode is enabled and we want to just discard individual invalid
    // events, instead of retrying or DLQ:ing the whole micro batch.
    DiscardInvalidData bool `json:"discardInvalidData,omitempty"`
}
SinkConfig specifies the schema for the "customConfig" field in the "sink" section of the stream spec. It enables arbitrary connector-specific fields to be present in the stream spec.
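As an illustrative sketch only (table and field names are hypothetical), the "customConfig" JSON could look as below; it is unmarshaled here straight into SinkConfig with encoding/json, whereas a real stream would go through NewSinkConfig with the full spec:

rawCustomConfig := []byte(`{
    "tables": [
        {
            "name": "events",
            "dataset": "mydataset",
            "columns": [
                { "name": "event_name", "type": "STRING", "mode": "REQUIRED", "valueFromId": "name" }
            ]
        }
    ],
    "discardInvalidData": true
}`)

var sc SinkConfig
if err := json.Unmarshal(rawCustomConfig, &sc); err != nil {
    // handle error
}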
func NewSinkConfig ¶ added in v0.4.0
func NewSinkConfig(spec *entity.Spec) (sc SinkConfig, err error)
func (SinkConfig) Validate ¶ added in v0.4.0
func (sc SinkConfig) Validate() error
type Table ¶ added in v0.4.0
type Table struct {
    // Name and Dataset are required
    Name    string `json:"name"`
    Dataset string `json:"dataset"`

    // DatasetCreation is only required if the dataset is meant to be created by this stream
    // *and* if other values than the default ones are required.
    // Default values are location: EU and empty description.
    DatasetCreation *DatasetCreation `json:"datasetCreation,omitempty"`

    // Columns is required
    Columns []Column `json:"columns"`

    // TableCreation is only required if non-existing tables should be created automatically
    TableCreation *TableCreation `json:"tableCreation,omitempty"`

    // InsertIdFromId defines which value in the Transformed output map will contain the insert ID,
    // as extracted from one of the input event fields.
    // The value referred to in the transloaded output map needs to be of string type.
    // This is used for BigQuery best-effort deduplication.
    InsertIdFromId string `json:"insertIdFromId"`
}
Table governs table interactions, including creation and how each transformed event should be mapped and inserted into a table.
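A sketch of a Table that asks the connector to create both dataset and table, and enables best-effort deduplication via InsertIdFromId (all names and ids are hypothetical):

table := Table{
    Name:    "orders",
    Dataset: "sales",
    DatasetCreation: &DatasetCreation{
        Description: "Sales data",
        Location:    "EU",
    },
    Columns: []Column{
        {Name: "order_id", Type: "STRING", Mode: "REQUIRED", ValueFromId: "orderId"},
    },
    TableCreation: &TableCreation{
        Description: "Incoming orders",
    },
    // "orderId" (hypothetical) must be present as a string in the Transformed output map.
    InsertIdFromId: "orderId",
}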
type TableCreation ¶ added in v0.4.0
type TableCreation struct {
    Description string `json:"description"`

    // If non-nil, the table is partitioned by time. Only one of
    // time partitioning or range partitioning can be specified.
    TimePartitioning *TimePartitioning `json:"timePartitioning,omitempty"`

    // If set to true, queries that reference this table must specify a
    // partition filter (e.g. a WHERE clause) that can be used to eliminate
    // partitions. Used to prevent unintentional full data scans on large
    // partitioned tables.
    RequirePartitionFilter bool `json:"requirePartitionFilter"`

    // Clustering specifies the data clustering configuration for the table.
    Clustering []string `json:"clustering,omitempty"`
}
TableCreation config contains table creation details. Most of the fields/comments in the struct are copied directly from BQ client, with modifications to fit with the GEIST spec format.
type TimePartitioning ¶ added in v0.4.0
type TimePartitioning struct {
    // Defines the partition interval type. Supported values are "DAY" or "HOUR".
    Type string `json:"type"`

    // The number of hours to keep the storage for a partition.
    // If the duration is empty (0), the data in the partitions does not expire.
    ExpirationHours int `json:"expirationHours"`

    // If empty, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the
    // table is partitioned by this field. The field must be a top-level TIMESTAMP or
    // DATE field. Its mode must be NULLABLE or REQUIRED.
    Field string `json:"field"`
}
TimePartitioning describes the time-based date partitioning on a table. It is currently only used by BigQuery sinks and most of the fields/docs in the struct are copied directly from BQ client, with modifications to fit with the GEIST spec format. For more information see: https://cloud.google.com/bigquery/docs/creating-partitioned-tables.
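A sketch of a TableCreation combining daily time partitioning on a TIMESTAMP column with a required partition filter and clustering (column names and retention are hypothetical):

tc := TableCreation{
    Description: "Events partitioned per day on ingestion time",
    TimePartitioning: &TimePartitioning{
        Type:            "DAY",
        ExpirationHours: 24 * 90,          // roughly 90 days of retention; 0 means no expiration
        Field:           "ingestion_time", // must be a top-level TIMESTAMP or DATE column
    },
    RequirePartitionFilter: true,
    Clustering:             []string{"event_name"},
}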