bq

package
v0.0.0-...-16534be Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: Apache-2.0 Imports: 46 Imported by: 30

Documentation

Overview

Package bq is a library for working with BigQuery.

Limits

Please see BigQuery docs: https://cloud.google.com/bigquery/quotas#streaminginserts for the most updated limits for streaming inserts. It is expected that the client is responsible for ensuring their usage will not exceed these limits through bq usage. A note on maximum rows per request: Put() batches rows per request, ensuring that no more than 10,000 rows are sent per request, and allowing for custom batch size. BigQuery recommends using 500 as a practical limit (so we use this as a default), and experimenting with your specific schema and data sizes to determine the batch size with the ideal balance of throughput and latency for your use case.

Authentication

Authentication for the Cloud projects happens during client creation: https://godoc.org/cloud.google.com/go#pkg-examples. What form this takes depends on the application.

Monitoring

You can use tsmon (https://godoc.org/go.chromium.org/luci/common/tsmon) to track upload latency and errors.

If Uploader.UploadsMetricName field is not zero, Uploader will create a counter metric to track successes and failures.

Index

Constants

View Source
const MetadataVersionKey = "metadata_version"

MetadataVersionKey is the label key used to version table metadata. Increment the integer assigned to this label to push updated table metadata.

This label must be used in conjunction with the UpdateMetadata() EnsureTable(...) option to have effect.

The value assigned to this label must be a positive integer, like "1" or "9127".

See UpdateMetadata option for usage.

View Source
const RowMaxBytes = 9 * 1000 * 1000 // 9 MB

RowMaxBytes is the maximum number of row bytes to send in one BigQuery Storage Write API - AppendRows request. As at writing, the request size limit for this RPC is 10 MB: https://cloud.google.com/bigquery/quotas#write-api-limits. The maximum size of rows must be less than this as there are some overheads in each request. This also includes the overhead (16 bytes) for each row when calculating the row size. Please see RowSize for details.

View Source
const RowSizeOverhead = 16 // 16 bytes

RowSizeOverhead is the overhead for each row. It got added to proto.Size for the overhead not being captured.

Variables

View Source
var ErrWrongTableKind = errors.New("cannot change a regular table into a view table or vice-versa")

ErrWrongTableKind represents a mismatch in BigQuery table type.

View Source
var InvalidRowTagKey = errors.NewTagKey("InvalidRow")

InvalidRowTagKey will be attached to error when appending rows if the rows are invalid (e.g. not UTF-8 compliance, or too large).

Functions

func AddMissingFields

func AddMissingFields(dest *bigquery.Schema, src bigquery.Schema)

AddMissingFields copies fields from src to dest if they are not present in dest.

func EnsureTable

func EnsureTable(ctx context.Context, t Table, spec *bigquery.TableMetadata, options ...EnsureTableOption) error

EnsureTable creates a BigQuery table if it doesn't exist and updates its schema if it is stale.

By default, non-schema metadata, like View Definition, Partitioning and Clustering settings, will be applied if the table is being created but will not be synchronised after creation.

To synchronise more of the table metadata, including view definition, description and labels, see the MetadataVersioned option.

Existing fields will not be deleted.

func GenerateSchema

func GenerateSchema(fdset *desc.FileDescriptorSet, message string) (schema bigquery.Schema, err error)

GenerateSchema generates BigQuery schema for the given proto message using the given set of message definitions.

func NewClient

func NewClient(ctx context.Context, gcpProject string) (*bigquery.Client, error)

NewClient returns a new BigQuery client for use with the given GCP project, that authenticates as the LUCI service itself.

func NewWriterClient

func NewWriterClient(ctx context.Context, gcpProject string) (*managedwriter.Client, error)

NewWriterClient returns a new BigQuery managedwriter client for use with the given GCP project, that authenticates as the project itself.

func RowSize

func RowSize(row proto.Message) int

RowSize return size of row when we do batching.

func SchemaDiff

func SchemaDiff(before, after bigquery.Schema) string

SchemaDiff returns unified diff of two schemas. Returns "" if there is no difference.

func SchemaString

func SchemaString(s bigquery.Schema) string

SchemaString returns schema in string format.

func WaitForJob

func WaitForJob(ctx context.Context, job *bigquery.Job) (*bigquery.JobStatus, error)

WaitForJob waits for a BigQuery job to finish. If after timeout and the job has not finished, it will attempt to cancel the job. The cancellation is based on best-effort, so if there is an error, we just log instead of throwing the error. This is to avoid jobs overrunning each other and triggering a death spiral of write contention / starving each other of resources. The actual timeout for bigquery job will be context timeout reduced by 5 seconds. It is for the cancelling job to execute. If the context does not have a deadline, the bigquery job will have no timeout.

Types

type EnsureTableOption

type EnsureTableOption func(opts *ensureTableOpts)

EnsureTableOption defines an option passed to EnsureTable(...).

func RefreshViewInterval

func RefreshViewInterval(d time.Duration) EnsureTableOption

RefreshViewInterval ensures the BigQuery view definition is updated if it has not been updated for duration d. This is to ensure indirect schema changes are propogated.

Scenario: You have a view defined with the SQL:

`SELECT * FROM base_table WHERE project = 'chromium'`.

By default, schema changes to base_table will not be reflected in the schema for the view (e.g. as seen in BigQuery UI). This is a usability issue for users of the view.

To cause indirect schema changes to propogate, when this option is set, the view definition will be prefixed with a one line comment like: -- Indirect schema version: 2023-05-01T12:34:56Z

The view (including comment) will be periodically refreshed if duration d has elapsed, triggering BigQuery to refresh the view schema.

If this option is set but the table definition is not for a view, an error will be returned by EnsureTable(...).

func UpdateMetadata

func UpdateMetadata() EnsureTableOption

UpdateMetadata allows the non-schema metadata to be updated in EnsureTable(...), namely the view definition, clustering settings, description and labels.

This option requires the caller to use the `MetadataVersionKey` label to control metadata update rollouts.

Usage:

The table definition passed to EnsureTable(...) must define a label with the key `MetadataVersionKey`. The value of the label must be a positive integer. Incrementing the integer will trigger an update of table metadata.

table := client.Dataset("my_dataset").Table("my_table")
spec :=	&bigquery.TableMetadata{
   ...
   Labels: map[string]string {
      // Increment to update table metadata.
      MetadataVersionKey: "2",
   }
}
err := EnsureTable(ctx, table, spec, UpdateMetadata())

Rationale: Without a system to control rollouts, if there are multiple versions of the table metadata in production simultaneously (e.g. in canary and stable deployments), an edit war may ensue.

Such an edit war scenario is not an issue when we update schema only as columns are only added, never removed, so schema will always converge to the union of all columns.

type InsertIDGenerator

type InsertIDGenerator struct {
	// Counter is an atomically-managed counter used to differentiate Insert
	// IDs produced by the same process.
	Counter int64
	// Prefix should be able to uniquely identify this specific process,
	// to differentiate Insert IDs produced by different processes.
	//
	// If empty, prefix will be derived from system and process specific
	// properties.
	Prefix string
}

InsertIDGenerator generates unique Insert IDs.

BigQuery uses Insert IDs to deduplicate rows in the streaming insert buffer. The association between Insert ID and row persists only for the time the row is in the buffer.

InsertIDGenerator is safe for concurrent use.

ID is the global InsertIDGenerator

func (*InsertIDGenerator) Generate

func (id *InsertIDGenerator) Generate() string

Generate returns a unique Insert ID.

type Row

type Row struct {
	proto.Message // embedded

	// InsertID is unique per insert operation to handle deduplication.
	InsertID string
}

Row implements bigquery.ValueSaver

func (*Row) Save

func (r *Row) Save() (map[string]bigquery.Value, string, error)

Save is used by bigquery.Inserter.Put when inserting values into a table.

type SchemaApplyer

type SchemaApplyer struct {
	// contains filtered or unexported fields
}

SchemaApplyer provides methods to synchronise BigQuery schema to match a desired state.

func NewSchemaApplyer

func NewSchemaApplyer(cache SchemaApplyerCache) *SchemaApplyer

NewSchemaApplyer initialises a new schema applyer, using the given cache to cache BQ schema to avoid making duplicate BigQuery calls.

func (*SchemaApplyer) EnsureTable

func (s *SchemaApplyer) EnsureTable(ctx context.Context, t Table, spec *bigquery.TableMetadata, options ...EnsureTableOption) error

EnsureTable creates a BigQuery table if it doesn't exist and updates its schema (or a view query for view tables) if it is stale. Non-schema options, like Partitioning and Clustering settings, will be applied if the table is being created but will not be synchronized after creation.

Existing fields will not be deleted.

Example usage: // At top of file var schemaApplyer = bq.NewSchemaApplyer(

bq.RegisterSchemaApplyerCache(50 ) // depending on how many different
                                   // tables will be used.

)

... // In method. table := client.Dataset("my_dataset").Table("my_table") schema := ... // e.g. from SchemaConverter.

spec := &bigquery.TableMetadata{
   TimePartitioning: &bigquery.TimePartitioning{
     Field:      "partition_time",
     Expiration: 540 * time.Day,
   },
   Schema: schema.Relax(), // Ensure no mandatory fields.
}

err := schemaApplyer.EnsureBQTable(ctx, table, spec)

if err != nil {
   if transient.Tag.In(err) {
      // Handle retriable error.
   } else {
      // Handle fatal error.
   }
}

type SchemaApplyerCache

type SchemaApplyerCache struct {
	// contains filtered or unexported fields
}

SchemaApplyerCache is used by SchemaApplyer to avoid making redundant BQ calls.

Instantiate it with RegisterSchemaApplyerCache(capacity) during init time.

func RegisterSchemaApplyerCache

func RegisterSchemaApplyerCache(capacity int) SchemaApplyerCache

RegisterSchemaApplyerCache allocates a process cached used by SchemaApplier.

The capacity should roughly match expected number of tables the schema applier will work on.

Must be called during init time.

type SchemaConverter

type SchemaConverter struct {
	Desc           *descriptorpb.FileDescriptorSet
	SourceCodeInfo map[*descriptorpb.FileDescriptorProto]SourceCodeInfoMap
}

func (*SchemaConverter) Schema

func (c *SchemaConverter) Schema(messageName string) (schema bigquery.Schema, description string, err error)

Schema constructs a bigquery.Schema from a named message.

type SourceCodeInfoMap

type SourceCodeInfoMap map[any]*descriptorpb.SourceCodeInfo_Location

SourceCodeInfoMap maps descriptor proto messages to source code info, if available. See also descutil.IndexSourceCodeInfo.

type Table

type Table interface {
	FullyQualifiedName() string
	Metadata(ctx context.Context, opts ...bigquery.TableMetadataOption) (md *bigquery.TableMetadata, err error)
	Create(ctx context.Context, md *bigquery.TableMetadata) error
	Update(ctx context.Context, md bigquery.TableMetadataToUpdate, etag string, opts ...bigquery.TableUpdateOption) (*bigquery.TableMetadata, error)
}

Table is implemented by *bigquery.Table. See its documentation for description of the methods below.

type Uploader

type Uploader struct {
	*bigquery.Inserter
	// Uploader is bound to a specific table. DatasetID and Table ID are
	// provided for reference.
	DatasetID string
	TableID   string
	// UploadsMetricName is a string used to create a tsmon Counter metric
	// for event upload attempts via Put, e.g.
	// "/chrome/infra/commit_queue/events/count". If unset, no metric will
	// be created.
	UploadsMetricName string

	// BatchSize is the max number of rows to send to BigQuery at a time.
	// The default is 500.
	BatchSize int
	// contains filtered or unexported fields
}

Uploader contains the necessary data for streaming data to BigQuery.

func NewUploader

func NewUploader(ctx context.Context, c *bigquery.Client, datasetID, tableID string) *Uploader

NewUploader constructs a new Uploader struct.

DatasetID and TableID are provided to the BigQuery client to gain access to a particular table.

You may want to change the default configuration of the bigquery.Inserter. Check the documentation for more details.

Set UploadsMetricName on the resulting Uploader to use the default counter metric.

Set BatchSize to set a custom batch size.

func (*Uploader) Put

func (u *Uploader) Put(ctx context.Context, messages ...proto.Message) error

Put uploads one or more rows to the BigQuery service. Put takes care of adding InsertIDs, used by BigQuery to deduplicate rows.

If any rows do now match one of the expected types, Put will not attempt to upload any rows and returns an InvalidTypeError.

Put returns a PutMultiError if one or more rows failed to be uploaded. The PutMultiError contains a RowInsertionError for each failed row.

Put will retry on temporary errors. If the error persists, the call will run indefinitely. Because of this, if ctx does not have a timeout, Put will add one.

See bigquery documentation and source code for detailed information on how struct values are mapped to rows.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is used to export rows to BigQuery table.

func NewWriter

func NewWriter(
	client *managedwriter.Client,
	tableName string,
	tableSchemaDescriptor *descriptorpb.DescriptorProto,
) *Writer

NewWriter creates a writer for exporting rows to the provided BigQuery table via the provided managedWriter client.

func (*Writer) AppendRowsWithDefaultStream

func (s *Writer) AppendRowsWithDefaultStream(ctx context.Context, rows []proto.Message) error

AppendRowsWithDefaultStream write to the default stream. This does not provide exactly-once semantics (it provides at least once). The at least once semantic is similar to the legacy streaming API.

func (*Writer) AppendRowsWithPendingStream

func (s *Writer) AppendRowsWithPendingStream(ctx context.Context, rows []proto.Message) error

AppendRowsWithPendingStream append rows to BigQuery table via the pending stream. This provides all-or-nothing semantics for insertion.

Directories

Path Synopsis
Package pb contains helper protobuf messages used to define BQ schemas.
Package pb contains helper protobuf messages used to define BQ schemas.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL