Documentation ¶
Overview ¶
Package bq is a library for working with BigQuery.
Limits ¶
Please see the BigQuery docs: https://cloud.google.com/bigquery/quotas#streaminginserts for the most up-to-date limits for streaming inserts. The client is responsible for ensuring that its usage of bq stays within these limits. A note on maximum rows per request: Put() batches rows per request, ensuring that no more than 10,000 rows are sent per request, and allows a custom batch size. BigQuery recommends 500 as a practical limit (so we use this as the default); experiment with your specific schema and data sizes to determine the batch size with the ideal balance of throughput and latency for your use case.
Authentication ¶
Authentication for the Cloud projects happens during client creation: https://godoc.org/cloud.google.com/go#pkg-examples. What form this takes depends on the application.
Monitoring ¶
You can use tsmon (https://godoc.org/go.chromium.org/luci/common/tsmon) to track upload latency and errors.
If the Uploader.UploadsMetricName field is not empty, Uploader will create a counter metric to track successes and failures.
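For example (a minimal sketch, assuming an Uploader `up` created via NewUploader; the metric name is illustrative, not one defined by this package):

up.UploadsMetricName = "/chrome/infra/my_service/events/count"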
Index ¶
- Constants
- Variables
- func AddMissingFields(dest *bigquery.Schema, src bigquery.Schema)
- func EnsureTable(ctx context.Context, t Table, spec *bigquery.TableMetadata, ...) error
- func GenerateSchema(fdset *desc.FileDescriptorSet, message string) (schema bigquery.Schema, err error)
- func NewClient(ctx context.Context, gcpProject string) (*bigquery.Client, error)
- func NewWriterClient(ctx context.Context, gcpProject string) (*managedwriter.Client, error)
- func RowSize(row proto.Message) int
- func SchemaDiff(before, after bigquery.Schema) string
- func SchemaString(s bigquery.Schema) string
- func WaitForJob(ctx context.Context, job *bigquery.Job) (*bigquery.JobStatus, error)
- type EnsureTableOption
- type InsertIDGenerator
- type Row
- type SchemaApplyer
- type SchemaApplyerCache
- type SchemaConverter
- type SourceCodeInfoMap
- type Table
- type Uploader
- type Writer
Constants ¶
const MetadataVersionKey = "metadata_version"
MetadataVersionKey is the label key used to version table metadata. Increment the integer assigned to this label to push updated table metadata.
This label must be used in conjunction with the UpdateMetadata() EnsureTable(...) option to take effect.
The value assigned to this label must be a positive integer, like "1" or "9127".
See UpdateMetadata option for usage.
const RowMaxBytes = 9 * 1000 * 1000 // 9 MB
RowMaxBytes is the maximum number of row bytes to send in one BigQuery Storage Write API AppendRows request. As of writing, the request size limit for this RPC is 10 MB: https://cloud.google.com/bigquery/quotas#write-api-limits. The total size of rows must be less than this because each request carries some overhead. The per-row overhead (16 bytes) is also included when calculating the row size. Please see RowSize for details.
const RowSizeOverhead = 16 // 16 bytes
RowSizeOverhead is the per-row overhead. It is added to proto.Size to account for overhead that proto.Size does not capture.
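For illustration, a caller batching rows for AppendRows could keep the accounted bytes per request under RowMaxBytes (a minimal sketch, not part of this package; `msgs` and `flush` are placeholders):

var batch []proto.Message
batchBytes := 0
for _, msg := range msgs {
	size := bq.RowSize(msg) // proto.Size(msg) + RowSizeOverhead.
	if batchBytes+size > bq.RowMaxBytes && len(batch) > 0 {
		flush(batch) // Placeholder for sending the accumulated rows.
		batch, batchBytes = nil, 0
	}
	batch = append(batch, msg)
	batchBytes += size
}
if len(batch) > 0 {
	flush(batch)
}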
Variables ¶
var ErrWrongTableKind = errors.New("cannot change a regular table into a view table or vice-versa")
ErrWrongTableKind represents a mismatch in BigQuery table type.
var InvalidRowTagKey = errors.NewTagKey("InvalidRow")
InvalidRowTagKey will be attached to errors returned when appending rows if the rows are invalid (e.g. not UTF-8 compliant, or too large).
Functions ¶
func AddMissingFields ¶
AddMissingFields copies fields from src to dest if they are not present in dest.
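For example (a minimal sketch; the field names are illustrative):

dest := bigquery.Schema{
	{Name: "id", Type: bigquery.StringFieldType},
}
src := bigquery.Schema{
	{Name: "id", Type: bigquery.StringFieldType},
	{Name: "created_at", Type: bigquery.TimestampFieldType},
}
bq.AddMissingFields(&dest, src)
// dest now also contains the "created_at" field.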
func EnsureTable ¶
func EnsureTable(ctx context.Context, t Table, spec *bigquery.TableMetadata, options ...EnsureTableOption) error
EnsureTable creates a BigQuery table if it doesn't exist and updates its schema if it is stale.
By default, non-schema metadata, like View Definition, Partitioning and Clustering settings, will be applied if the table is being created but will not be synchronised after creation.
To synchronise more of the table metadata, including view definition, description and labels, see the MetadataVersioned option.
Existing fields will not be deleted.
func GenerateSchema ¶
func GenerateSchema(fdset *desc.FileDescriptorSet, message string) (schema bigquery.Schema, err error)
GenerateSchema generates BigQuery schema for the given proto message using the given set of message definitions.
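For example (a minimal sketch; how the FileDescriptorSet is obtained is up to the caller, here it is assumed to be read from a file produced by protoc --include_imports --descriptor_set_out and unmarshalled with the proto runtime matching the desc package; the message name is a placeholder):

blob, err := os.ReadFile("testdata/messages.desc")
if err != nil {
	return err
}
fdset := &desc.FileDescriptorSet{}
if err := proto.Unmarshal(blob, fdset); err != nil {
	return err
}
schema, err := bq.GenerateSchema(fdset, "mypackage.MyRow")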
func NewClient ¶
NewClient returns a new BigQuery client for use with the given GCP project. The client authenticates as the LUCI service itself.
func NewWriterClient ¶
NewWriterClient returns a new BigQuery managedwriter client for use with the given GCP project. The client authenticates as the project itself.
func SchemaDiff ¶
SchemaDiff returns unified diff of two schemas. Returns "" if there is no difference.
func SchemaString ¶
SchemaString returns schema in string format.
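For example (a minimal sketch; `md` is the table's current *bigquery.TableMetadata and `newSchema` is the desired schema):

if diff := bq.SchemaDiff(md.Schema, newSchema); diff != "" {
	fmt.Printf("schema will change:\n%s\n", diff)
	fmt.Printf("new schema:\n%s\n", bq.SchemaString(newSchema))
}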
func WaitForJob ¶
WaitForJob waits for a BigQuery job to finish. If the job has not finished by the timeout, WaitForJob attempts to cancel it. Cancellation is best-effort: if it fails, the error is logged rather than returned. This is to avoid jobs overrunning each other and triggering a death spiral of write contention / starving each other of resources. The actual timeout for the BigQuery job is the context timeout reduced by 5 seconds, leaving time for the cancellation to execute. If the context does not have a deadline, the BigQuery job has no timeout.
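For example (a minimal sketch; the query is a placeholder and `client` is a *bigquery.Client):

ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
job, err := client.Query("SELECT COUNT(*) FROM my_dataset.my_table").Run(ctx)
if err != nil {
	return err
}
status, err := bq.WaitForJob(ctx, job)
if err != nil {
	return err
}
if err := status.Err(); err != nil {
	return err
}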
Types ¶
type EnsureTableOption ¶
type EnsureTableOption func(opts *ensureTableOpts)
EnsureTableOption defines an option passed to EnsureTable(...).
func RefreshViewInterval ¶
func RefreshViewInterval(d time.Duration) EnsureTableOption
RefreshViewInterval ensures the BigQuery view definition is updated if it has not been updated for duration d. This is to ensure indirect schema changes are propagated.
Scenario: You have a view defined with the SQL:
`SELECT * FROM base_table WHERE project = 'chromium'`.
By default, schema changes to base_table will not be reflected in the schema for the view (e.g. as seen in BigQuery UI). This is a usability issue for users of the view.
To cause indirect schema changes to propagate, when this option is set, the view definition will be prefixed with a one-line comment like:

-- Indirect schema version: 2023-05-01T12:34:56Z
The view (including comment) will be periodically refreshed if duration d has elapsed, triggering BigQuery to refresh the view schema.
If this option is set but the table definition is not for a view, an error will be returned by EnsureTable(...).
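For example (a minimal sketch; `table` is a *bigquery.Table obtained via client.Dataset(...).Table(...), and the view query matches the scenario above):

spec := &bigquery.TableMetadata{
	ViewQuery: "SELECT * FROM base_table WHERE project = 'chromium'",
}
err := bq.EnsureTable(ctx, table, spec, bq.RefreshViewInterval(24*time.Hour))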
func UpdateMetadata ¶
func UpdateMetadata() EnsureTableOption
UpdateMetadata allows the non-schema metadata to be updated in EnsureTable(...), namely the view definition, clustering settings, description and labels.
This option requires the caller to use the `MetadataVersionKey` label to control metadata update rollouts.
Usage:
The table definition passed to EnsureTable(...) must define a label with the key `MetadataVersionKey`. The value of the label must be a positive integer. Incrementing the integer will trigger an update of table metadata.
table := client.Dataset("my_dataset").Table("my_table")
spec := &bigquery.TableMetadata{
	...
	Labels: map[string]string{
		// Increment to update table metadata.
		MetadataVersionKey: "2",
	},
}
err := EnsureTable(ctx, table, spec, UpdateMetadata())
Rationale: Without a system to control rollouts, if there are multiple versions of the table metadata in production simultaneously (e.g. in canary and stable deployments), an edit war may ensue.
Such an edit war scenario is not an issue when we update schema only as columns are only added, never removed, so schema will always converge to the union of all columns.
type InsertIDGenerator ¶
type InsertIDGenerator struct {
	// Counter is an atomically-managed counter used to differentiate Insert
	// IDs produced by the same process.
	Counter int64

	// Prefix should be able to uniquely identify this specific process,
	// to differentiate Insert IDs produced by different processes.
	//
	// If empty, prefix will be derived from system and process specific
	// properties.
	Prefix string
}
InsertIDGenerator generates unique Insert IDs.
BigQuery uses Insert IDs to deduplicate rows in the streaming insert buffer. The association between Insert ID and row persists only for the time the row is in the buffer.
InsertIDGenerator is safe for concurrent use.
var ID InsertIDGenerator
ID is the global InsertIDGenerator
func (*InsertIDGenerator) Generate ¶
func (id *InsertIDGenerator) Generate() string
Generate returns a unique Insert ID.
type Row ¶
type Row struct {
	proto.Message // embedded

	// InsertID is unique per insert operation to handle deduplication.
	InsertID string
}
Row implements bigquery.ValueSaver
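For example (a minimal sketch; mypb.MyRow is a placeholder proto message, and note that Uploader.Put fills in InsertIDs for you):

row := &bq.Row{
	Message:  &mypb.MyRow{Name: "example"},
	InsertID: bq.ID.Generate(),
}
ins := client.Dataset("my_dataset").Table("my_table").Inserter()
err := ins.Put(ctx, row)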
type SchemaApplyer ¶
type SchemaApplyer struct {
// contains filtered or unexported fields
}
SchemaApplyer provides methods to synchronise BigQuery schema to match a desired state.
func NewSchemaApplyer ¶
func NewSchemaApplyer(cache SchemaApplyerCache) *SchemaApplyer
NewSchemaApplyer initialises a new schema applyer, using the given cache to cache BQ schema to avoid making duplicate BigQuery calls.
func (*SchemaApplyer) EnsureTable ¶
func (s *SchemaApplyer) EnsureTable(ctx context.Context, t Table, spec *bigquery.TableMetadata, options ...EnsureTableOption) error
EnsureTable creates a BigQuery table if it doesn't exist and updates its schema (or a view query for view tables) if it is stale. Non-schema options, like Partitioning and Clustering settings, will be applied if the table is being created but will not be synchronized after creation.
Existing fields will not be deleted.
Example usage:

// At top of file.
var schemaApplyer = bq.NewSchemaApplyer(
	// Capacity depends on how many different tables will be used.
	bq.RegisterSchemaApplyerCache(50),
)

...

// In method.
table := client.Dataset("my_dataset").Table("my_table")
schema := ... // e.g. from SchemaConverter.
spec := &bigquery.TableMetadata{
	TimePartitioning: &bigquery.TimePartitioning{
		Field:      "partition_time",
		Expiration: 540 * 24 * time.Hour,
	},
	Schema: schema.Relax(), // Ensure no mandatory fields.
}
err := schemaApplyer.EnsureTable(ctx, table, spec)
if err != nil {
	if transient.Tag.In(err) {
		// Handle retriable error.
	} else {
		// Handle fatal error.
	}
}
type SchemaApplyerCache ¶
type SchemaApplyerCache struct {
// contains filtered or unexported fields
}
SchemaApplyerCache is used by SchemaApplyer to avoid making redundant BQ calls.
Instantiate it with RegisterSchemaApplyerCache(capacity) during init time.
func RegisterSchemaApplyerCache ¶
func RegisterSchemaApplyerCache(capacity int) SchemaApplyerCache
RegisterSchemaApplyerCache allocates a process-wide cache used by SchemaApplyer.
The capacity should roughly match expected number of tables the schema applier will work on.
Must be called during init time.
type SchemaConverter ¶
type SchemaConverter struct {
	Desc           *descriptorpb.FileDescriptorSet
	SourceCodeInfo map[*descriptorpb.FileDescriptorProto]SourceCodeInfoMap
}
type SourceCodeInfoMap ¶
type SourceCodeInfoMap map[any]*descriptorpb.SourceCodeInfo_Location
SourceCodeInfoMap maps descriptor proto messages to source code info, if available. See also descutil.IndexSourceCodeInfo.
type Table ¶
type Table interface {
	FullyQualifiedName() string
	Metadata(ctx context.Context, opts ...bigquery.TableMetadataOption) (md *bigquery.TableMetadata, err error)
	Create(ctx context.Context, md *bigquery.TableMetadata) error
	Update(ctx context.Context, md bigquery.TableMetadataToUpdate, etag string, opts ...bigquery.TableUpdateOption) (*bigquery.TableMetadata, error)
}
Table is implemented by *bigquery.Table. See its documentation for description of the methods below.
type Uploader ¶
type Uploader struct {
	*bigquery.Inserter
	// Uploader is bound to a specific table. DatasetID and TableID are
	// provided for reference.
	DatasetID string
	TableID   string
	// UploadsMetricName is a string used to create a tsmon Counter metric
	// for event upload attempts via Put, e.g.
	// "/chrome/infra/commit_queue/events/count". If unset, no metric will
	// be created.
	UploadsMetricName string
	// BatchSize is the max number of rows to send to BigQuery at a time.
	// The default is 500.
	BatchSize int
	// contains filtered or unexported fields
}
Uploader contains the necessary data for streaming data to BigQuery.
func NewUploader ¶
NewUploader constructs a new Uploader struct.
DatasetID and TableID are provided to the BigQuery client to gain access to a particular table.
You may want to change the default configuration of the bigquery.Inserter. Check the documentation for more details.
Set UploadsMetricName on the resulting Uploader to use the default counter metric.
Set BatchSize to set a custom batch size.
func (*Uploader) Put ¶
Put uploads one or more rows to the BigQuery service. Put takes care of adding InsertIDs, used by BigQuery to deduplicate rows.
If any rows do not match one of the expected types, Put will not attempt to upload any rows and returns an InvalidTypeError.
Put returns a PutMultiError if one or more rows failed to be uploaded. The PutMultiError contains a RowInsertionError for each failed row.
Put will retry on temporary errors. If the error persists, the call will run indefinitely. Because of this, if ctx does not have a timeout, Put will add one.
See bigquery documentation and source code for detailed information on how struct values are mapped to rows.
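A minimal end-to-end sketch (the exact NewUploader and Put signatures are assumed from the descriptions above; the project, dataset, table and metric names and mypb.MyRow are placeholders):

client, err := bq.NewClient(ctx, "my-gcp-project")
if err != nil {
	return err
}
up := bq.NewUploader(ctx, client, "my_dataset", "my_table")
up.UploadsMetricName = "/chrome/infra/my_service/events/count"
up.BatchSize = 200
if err := up.Put(ctx, &mypb.MyRow{Name: "example"}); err != nil {
	return err
}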
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is used to export rows to BigQuery table.
func NewWriter ¶
func NewWriter(
	client *managedwriter.Client,
	tableName string,
	tableSchemaDescriptor *descriptorpb.DescriptorProto,
) *Writer
NewWriter creates a writer for exporting rows to the provided BigQuery table via the provided managedWriter client.
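For example, constructing a Writer might look like this (a minimal sketch; the table path format and mypb.MyRow are placeholders, and adapt is cloud.google.com/go/bigquery/storage/managedwriter/adapt):

mwClient, err := bq.NewWriterClient(ctx, "my-gcp-project")
if err != nil {
	return err
}
rowDesc, err := adapt.NormalizeDescriptor((&mypb.MyRow{}).ProtoReflect().Descriptor())
if err != nil {
	return err
}
w := bq.NewWriter(mwClient, "projects/my-gcp-project/datasets/my_dataset/tables/my_table", rowDesc)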
func (*Writer) AppendRowsWithDefaultStream ¶
AppendRowsWithDefaultStream writes to the default stream. This does not provide exactly-once semantics (it provides at-least-once). The at-least-once semantics are similar to the legacy streaming API.