Documentation ¶
Overview ¶
Package managedwriter provides an EXPERIMENTAL thick client around the BigQuery storage API's BigQueryWriteClient. More information about this new write client may also be found in the public documentation: https://cloud.google.com/bigquery/docs/write-api
It is EXPERIMENTAL and subject to change or removal without notice. This is primarily to signal that this package may still make breaking changes to existing methods and functionality.
Currently, this client targets the BigQueryWriteClient present in the v1 endpoint. It is intended as a more feature-rich successor to the classic BigQuery streaming interface, which is exposed as the Inserter abstraction in cloud.google.com/go/bigquery and as the tabledata.insertAll method in the BigQuery v2 REST API.
Creating a Client ¶
To start working with this package, create a client:
ctx := context.Background()
client, err := managedwriter.NewClient(ctx, projectID)
if err != nil {
	// TODO: Handle error.
}
Defining the Protocol Buffer Schema ¶
The write functionality of BigQuery Storage requires data to be sent as encoded protocol buffer messages in proto2 wire format. Because protocol buffer messages are not self-describing, you must also provide the message schema. This is communicated using a DescriptorProto message, defined within the protocol buffer libraries: https://pkg.go.dev/google.golang.org/protobuf/types/descriptorpb#DescriptorProto
More information about protocol buffers can be found in the proto2 language guide: https://developers.google.com/protocol-buffers/docs/proto
Details about data type conversions between BigQuery and protocol buffers can be found in the public documentation: https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
For cases where the protocol buffer is compiled from a static ".proto" definition, this process is straightforward. Instantiate an example message, then convert the descriptor into a descriptor proto:
m := &myprotopackage.MyCompiledMessage{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
If the message uses advanced protocol buffer features like nested messages/groups, or enums, the cloud.google.com/go/bigquery/storage/managedwriter/adapt subpackage contains functionality to normalize the descriptor into a self-contained definition:
m := &myprotopackage.MyCompiledMessage{}
descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
if err != nil {
	// TODO: Handle error.
}
The adapt subpackage also contains functionality for generating a DescriptorProto using a BigQuery table's schema directly.
Constructing a ManagedStream ¶
The ManagedStream handles management of the underlying write connection to the BigQuery Storage service. You can either create a write stream explicitly and pass in its name, or have the write stream created while setting up the ManagedStream.
It's easiest to register the protocol buffer descriptor you'll be using to send data when setting up the managed stream using the WithSchemaDescriptor option, though you can also set/change the schema as part of an append request once the ManagedStream is created.
// Create a ManagedStream using an explicit stream identifier, either a default
// stream or one explicitly created by CreateWriteStream.
managedStream, err := client.NewManagedStream(ctx,
	managedwriter.WithStreamName(streamName),
	managedwriter.WithSchemaDescriptor(descriptorProto))
if err != nil {
	// TODO: Handle error.
}
In addition, NewManagedStream can create new streams implicitly:
// Alternatively, allow the ManagedStream to handle stream construction by supplying
// additional options.
tableName := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", myProject, myDataset, myTable)
managedStream, err := client.NewManagedStream(ctx,
	managedwriter.WithDestinationTable(tableName),
	managedwriter.WithType(managedwriter.BufferedStream),
	managedwriter.WithSchemaDescriptor(descriptorProto))
if err != nil {
	// TODO: Handle error.
}
Writing Data ¶
Use the AppendRows function to write one or more serialized proto messages to a stream. You can choose to specify an offset in the stream to handle de-duplication for user-created streams, but a "default" stream neither accepts nor reports offsets.
AppendRows returns a future-like object that blocks until the write is successful or yields an error.
// Define a couple of messages.
mesgs := []*myprotopackage.MyCompiledMessage{
	{
		UserName:        proto.String("johndoe"),
		EmailAddress:    proto.String("jd@mycompany.mydomain"),
		FavoriteNumbers: []int64{1, 42, 12345},
	},
	{
		UserName:        proto.String("janesmith"),
		EmailAddress:    proto.String("smith@othercompany.otherdomain"),
		FavoriteNumbers: []int64{1, 3, 5, 7, 9},
	},
}

// Encode the messages into binary format.
encoded := make([][]byte, len(mesgs))
for k, v := range mesgs {
	b, err := proto.Marshal(v)
	if err != nil {
		// TODO: Handle error.
	}
	encoded[k] = b
}

// Send the rows to the service, and specify an offset for managing deduplication.
result, err := managedStream.AppendRows(ctx, encoded, managedwriter.WithOffset(0))
if err != nil {
	// TODO: Handle error.
}

// Block until the write is complete and return the result.
returnedOffset, err := result.GetResult(ctx)
if err != nil {
	// TODO: Handle error.
}
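When several batches are appended to a user-created stream, each call needs its own offset. The following is a minimal sketch of that bookkeeping, assuming offsets count rows from the start of the stream so that each batch begins where the previous one ended. The offsetTracker type is hypothetical and not part of this package; the value it returns is what you would pass to WithOffset.

```go
package main

import "fmt"

// offsetTracker is a hypothetical helper (not part of managedwriter) that
// hands out the starting row offset for each batch written to one stream.
type offsetTracker struct {
	next int64 // offset the next batch should start at
}

// reserve returns the starting offset for a batch of n rows and advances
// the tracker past that batch.
func (t *offsetTracker) reserve(n int) int64 {
	start := t.next
	t.next += int64(n)
	return start
}

func main() {
	var tracker offsetTracker

	// Two batches of already-encoded rows (placeholder bytes here).
	batches := [][][]byte{
		{[]byte("row-a"), []byte("row-b")},
		{[]byte("row-c")},
	}

	for _, batch := range batches {
		offset := tracker.reserve(len(batch))
		// With the real client this is where the append would happen, e.g.
		// managedStream.AppendRows(ctx, batch, managedwriter.WithOffset(offset)).
		fmt.Printf("append %d rows at offset %d\n", len(batch), offset)
	}
}
```

If an append fails and is retried, reuse the offset originally reserved for that batch rather than reserving a new one; that is what lets the offset act as a de-duplication key.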
Buffered Stream Management ¶
For Buffered streams, users control when data is made visible in the destination table/stream independently of when it is written. Use FlushRows on the ManagedStream to advance the flush point ahead in the stream.
// We've written 1500+ rows in the stream, and want to advance the flush point
// ahead to make the first 1000 rows available.
flushOffset, err := managedStream.FlushRows(ctx, 1000)
if err != nil {
	// TODO: Handle error.
}
Pending Stream Management ¶
Pending streams allow users to commit data from multiple streams together once the streams have been finalized, meaning they'll no longer allow further data writes.
// First, finalize the stream we're writing into.
totalRows, err := managedStream.Finalize(ctx)
if err != nil {
	// TODO: Handle error.
}

req := &storagepb.BatchCommitWriteStreamsRequest{
	Parent:       parentName,
	WriteStreams: []string{managedStream.StreamName()},
}
// Using the client, we can commit data from multiple streams to the same
// table atomically.
resp, err := client.BatchCommitWriteStreams(ctx, req)
if err != nil {
	// TODO: Handle error.
}
Index ¶
- Constants
- Variables
- func TableParentFromParts(projectID, datasetID, tableID string) string
- func TableParentFromStreamName(streamName string) string
- type AppendOption
- type AppendResult
- type Client
- func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, ...) (*storagepb.BatchCommitWriteStreamsResponse, error)
- func (c *Client) Close() error
- func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, ...) (*storagepb.WriteStream, error)
- func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error)
- type ManagedStream
- func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error)
- func (ms *ManagedStream) Close() error
- func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error)
- func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error)
- func (ms *ManagedStream) StreamName() string
- func (ms *ManagedStream) StreamType() StreamType
- type StreamType
- type WriterOption
- func WithAppendRowsCallOption(o gax.CallOption) WriterOption
- func WithDataOrigin(dataOrigin string) WriterOption
- func WithDestinationTable(destTable string) WriterOption
- func WithMaxInflightBytes(n int) WriterOption
- func WithMaxInflightRequests(n int) WriterOption
- func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption
- func WithStreamName(name string) WriterOption
- func WithTraceID(traceID string) WriterOption
- func WithType(st StreamType) WriterOption
Constants ¶
const DetectProjectID = "*detect-project-id*"
DetectProjectID is a sentinel value that instructs NewClient to detect the project ID. It is given in place of the projectID argument. NewClient will use the project ID from the given credentials or the default credentials (https://developers.google.com/accounts/docs/application-default-credentials) if no credentials were provided. When providing credentials, not all options will allow NewClient to extract the project ID. Specifically a JWT does not have the project ID encoded.
const NoStreamOffset int64 = -1
NoStreamOffset is a sentinel value for signalling we're not tracking stream offset (e.g. a default stream which allows simultaneous append streams).
Variables ¶
var (
	// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)

	// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)

	// AppendRequests is a measure of the number of append requests sent.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)

	// AppendRequestBytes is a measure of the bytes sent as append requests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)

	// AppendRequestErrors is a measure of the number of append requests that errored on send.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)

	// AppendRequestRows is a measure of the number of append rows sent.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)

	// AppendResponses is a measure of the number of append responses received.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)

	// AppendResponseErrors is a measure of the number of append responses received with an error attached.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)

	// FlushRequests is a measure of the number of FlushRows requests sent.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
)
var (
	// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenView *view.View

	// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendClientOpenRetryView *view.View

	// AppendRequestsView is a cumulative sum of AppendRequests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestsView *view.View

	// AppendRequestBytesView is a cumulative sum of AppendRequestBytes.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestBytesView *view.View

	// AppendRequestErrorsView is a cumulative sum of AppendRequestErrors.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestErrorsView *view.View

	// AppendRequestRowsView is a cumulative sum of AppendRequestRows.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendRequestRowsView *view.View

	// AppendResponsesView is a cumulative sum of AppendResponses.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponsesView *view.View

	// AppendResponseErrorsView is a cumulative sum of AppendResponseErrors.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AppendResponseErrorsView *view.View

	// FlushRequestsView is a cumulative sum of FlushRequests.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	FlushRequestsView *view.View
)
var DefaultOpenCensusViews []*view.View
DefaultOpenCensusViews retains the set of all OpenCensus views that this library has instrumented, to simplify view registration for exporters.
Functions ¶
func TableParentFromParts ¶ added in v1.29.0
TableParentFromParts constructs a table identifier using individual identifiers and returns a string in the form "projects/{project}/datasets/{dataset}/tables/{table}".
func TableParentFromStreamName ¶
TableParentFromStreamName is a utility function for extracting the parent table prefix from a stream name. When an invalid stream ID is passed, this simply returns the original stream name.
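Both helpers are string manipulations over BigQuery resource names. The sketch below imitates the documented behaviour with the standard library only. It is not the package's implementation: the lowercase function names are hypothetical stand-ins, and it assumes stream names have the form projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}.

```go
package main

import (
	"fmt"
	"strings"
)

// tableParentFromParts is a hypothetical stand-in that builds the documented
// "projects/{project}/datasets/{dataset}/tables/{table}" form.
func tableParentFromParts(projectID, datasetID, tableID string) string {
	return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
}

// tableParentFromStreamName is a hypothetical stand-in that keeps the first
// six path segments of a stream name. It only checks the segment count, and
// returns the input unchanged when there are too few segments.
func tableParentFromStreamName(streamName string) string {
	parts := strings.SplitN(streamName, "/", 7)
	if len(parts) < 7 {
		return streamName
	}
	return strings.Join(parts[:6], "/")
}

func main() {
	table := tableParentFromParts("my-project", "my_dataset", "my_table")
	fmt.Println(table)

	// A stream name under that table maps back to the table identifier.
	fmt.Println(tableParentFromStreamName(table + "/streams/_default"))

	// An invalid stream name is passed through as-is.
	fmt.Println(tableParentFromStreamName("not-a-stream-name"))
}
```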
Types ¶
type AppendOption ¶ added in v1.25.0
type AppendOption func(*pendingWrite)
AppendOption are options that can be passed when appending data with a managed stream instance.
func UpdateSchemaDescriptor ¶ added in v1.25.0
func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption
UpdateSchemaDescriptor is used to update the descriptor message schema associated with a given stream.
func WithOffset ¶ added in v1.25.0
func WithOffset(offset int64) AppendOption
WithOffset sets an explicit offset value for this append request.
type AppendResult ¶
type AppendResult struct {
// contains filtered or unexported fields
}
AppendResult tracks the status of a batch of data rows.
func (*AppendResult) GetResult ¶
func (ar *AppendResult) GetResult(ctx context.Context) (int64, error)
GetResult returns the optional offset of this row, or the associated error. It blocks until the result is ready.
func (*AppendResult) Ready ¶
func (ar *AppendResult) Ready() <-chan struct{}
Ready returns a channel that can be used to block until the append request has reached a completed state, which may be a successful append or an error.
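Because Ready exposes a channel, it can be combined with other cases in a select statement, for example to stop waiting after a deadline. The sketch below shows that pattern under stated assumptions: readier is a hypothetical interface that mirrors the Ready signature above (so an *AppendResult should satisfy it), fakeResult is a test double, and completion is assumed to be signalled by the channel becoming readable.

```go
package main

import (
	"fmt"
	"time"
)

// readier is a hypothetical interface mirroring the Ready method signature.
type readier interface {
	Ready() <-chan struct{}
}

// waitReady reports whether r reached a completed state before the timeout
// elapsed. It does not say whether the append succeeded; call GetResult
// afterwards to learn the outcome.
func waitReady(r readier, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-r.Ready():
		return true
	case <-timer.C:
		return false
	}
}

// fakeResult is a test double standing in for an append result.
type fakeResult struct {
	done chan struct{}
}

func (f fakeResult) Ready() <-chan struct{} { return f.done }

func main() {
	res := fakeResult{done: make(chan struct{})}

	// Simulate the service acknowledging the append shortly afterwards.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(res.done)
	}()

	fmt.Println("completed in time:", waitReady(res, time.Second))
}
```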
func (*AppendResult) UpdatedSchema ¶ added in v1.26.0
func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSchema, error)
UpdatedSchema returns the updated schema for a table if supplied by the backend as part of the append response. It blocks until the result is ready.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a managed BigQuery Storage write client scoped to a single project.
func NewClient ¶
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error)
NewClient instantiates a new client.
func (*Client) BatchCommitWriteStreams ¶ added in v1.25.0
func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error)
BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same parent table.
Streams must be finalized before commit and cannot be committed multiple times. Once a stream is committed, data in the stream becomes available for read operations.
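The ordering rules for pending streams (write, then finalize, then commit exactly once) can be pictured as a small state machine. The following is a toy model of those rules only; it does not call the service, every name in it is hypothetical, and treating a second finalize as an error is a modelling choice rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

type streamState int

const (
	stateWritable streamState = iota
	stateFinalized
	stateCommitted
)

// pendingStream is a toy model of a PENDING stream's lifecycle.
type pendingStream struct {
	state streamState
	rows  int64
}

// appendRows records n rows; only allowed before the stream is finalized.
func (p *pendingStream) appendRows(n int) error {
	if p.state != stateWritable {
		return errors.New("stream no longer accepts writes")
	}
	p.rows += int64(n)
	return nil
}

// finalize closes the stream to further writes and returns the row count.
func (p *pendingStream) finalize() (int64, error) {
	if p.state != stateWritable {
		return 0, errors.New("stream is already finalized")
	}
	p.state = stateFinalized
	return p.rows, nil
}

// commit makes the rows visible; requires a finalized, uncommitted stream.
func (p *pendingStream) commit() error {
	switch p.state {
	case stateWritable:
		return errors.New("stream must be finalized before commit")
	case stateCommitted:
		return errors.New("stream was already committed")
	}
	p.state = stateCommitted
	return nil
}

// visibleRows is the number of rows a reader of the table would see.
func (p *pendingStream) visibleRows() int64 {
	if p.state == stateCommitted {
		return p.rows
	}
	return 0
}

func main() {
	s := &pendingStream{}
	_ = s.appendRows(3)
	fmt.Println("commit before finalize:", s.commit())

	total, _ := s.finalize()
	fmt.Println("finalized rows:", total, "visible:", s.visibleRows())

	fmt.Println("commit:", s.commit(), "visible:", s.visibleRows())
	fmt.Println("second commit:", s.commit())
}
```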
func (*Client) CreateWriteStream ¶ added in v1.25.0
func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error)
CreateWriteStream creates a write stream to the given table. Additionally, every table has a special stream named ‘_default’ to which data can be written. This stream doesn’t need to be created using CreateWriteStream. It is a stream that can be used simultaneously by any number of clients. Data written to this stream is considered committed as soon as an acknowledgement is received.
func (*Client) NewManagedStream ¶
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error)
NewManagedStream establishes a new managed stream for appending data into a table.
Context here is retained for use by the underlying streaming connections the managed stream may create.
type ManagedStream ¶
type ManagedStream struct {
// contains filtered or unexported fields
}
ManagedStream is the abstraction over a single write stream.
func (*ManagedStream) AppendRows ¶
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error)
AppendRows sends the append requests to the service, and returns a single AppendResult for tracking the set of data.
The format of the row data is binary serialized protocol buffer bytes. The message must be compatible with the schema currently set for the stream.
Use the WithOffset() AppendOption to set an explicit offset for this append. Setting an offset for a default stream is unsupported.
func (*ManagedStream) Finalize ¶
func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error)
Finalize is used to mark a stream as complete, and thus ensure no further data can be appended to the stream. You cannot finalize a DefaultStream, as it always exists.
Finalizing does not advance the current offset of a BufferedStream, nor does it commit data in a PendingStream.
func (*ManagedStream) FlushRows ¶
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error)
FlushRows advances the offset at which rows in a BufferedStream are visible. Calling this method for other stream types yields an error.
func (*ManagedStream) StreamName ¶
func (ms *ManagedStream) StreamName() string
StreamName returns the corresponding write stream ID being managed by this writer.
func (*ManagedStream) StreamType ¶
func (ms *ManagedStream) StreamType() StreamType
StreamType returns the configured type for this stream.
type StreamType ¶
type StreamType string
StreamType indicates the type of stream this write client is managing.
var (
	// DefaultStream most closely mimics the legacy bigquery
	// tabledata.insertAll semantics. Successful inserts are
	// committed immediately, and there's no tracking offsets as
	// all writes go into a "default" stream that always exists
	// for a table.
	DefaultStream StreamType = "DEFAULT"

	// CommittedStream appends data immediately, but creates a
	// discrete stream for the work so that offset tracking can
	// be used to track writes.
	CommittedStream StreamType = "COMMITTED"

	// BufferedStream is a form of checkpointed stream, that allows
	// you to advance the offset of visible rows via Flush operations.
	//
	// NOTE: Buffered Streams are currently in limited preview, and as such
	// methods like FlushRows() may yield errors for non-enrolled projects.
	BufferedStream StreamType = "BUFFERED"

	// PendingStream is a stream in which no data is made visible to
	// readers until the stream is finalized and committed explicitly.
	PendingStream StreamType = "PENDING"
)
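The differences between stream types described on this page can be summarized as a capability table. The sketch below redeclares StreamType and its values locally so that it is self-contained; the helper functions are hypothetical and simply restate the rules given in the AppendRows, FlushRows, Finalize, and BatchCommitWriteStreams descriptions.

```go
package main

import "fmt"

// StreamType and its values are redeclared locally, mirroring the strings
// documented above, so the sketch compiles without the package.
type StreamType string

const (
	DefaultStream   StreamType = "DEFAULT"
	CommittedStream StreamType = "COMMITTED"
	BufferedStream  StreamType = "BUFFERED"
	PendingStream   StreamType = "PENDING"
)

// acceptsOffset: explicit offsets apply to user-created streams only.
func acceptsOffset(st StreamType) bool {
	switch st {
	case CommittedStream, BufferedStream, PendingStream:
		return true
	}
	return false
}

// canFlush: FlushRows is only meaningful for buffered streams.
func canFlush(st StreamType) bool { return st == BufferedStream }

// canFinalize: the default stream always exists and cannot be finalized.
func canFinalize(st StreamType) bool {
	switch st {
	case CommittedStream, BufferedStream, PendingStream:
		return true
	}
	return false
}

// needsCommit: only pending streams require an explicit batch commit
// before their rows become visible.
func needsCommit(st StreamType) bool { return st == PendingStream }

func main() {
	types := []StreamType{DefaultStream, CommittedStream, BufferedStream, PendingStream}
	fmt.Printf("%-10s %-7s %-6s %-9s %s\n", "type", "offset", "flush", "finalize", "commit")
	for _, st := range types {
		fmt.Printf("%-10s %-7t %-6t %-9t %t\n",
			st, acceptsOffset(st), canFlush(st), canFinalize(st), needsCommit(st))
	}
}
```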
type WriterOption ¶
type WriterOption func(*ManagedStream)
WriterOption are variadic options used to configure a ManagedStream instance.
func WithAppendRowsCallOption ¶ added in v1.25.0
func WithAppendRowsCallOption(o gax.CallOption) WriterOption
WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when it opens the underlying append stream.
func WithDataOrigin ¶ added in v1.21.0
func WithDataOrigin(dataOrigin string) WriterOption
WithDataOrigin is used to attach an origin context to the instrumentation metrics emitted by the library.
func WithDestinationTable ¶
func WithDestinationTable(destTable string) WriterOption
WithDestinationTable specifies the destination table to which a created stream will append rows. Format of the table:
projects/{projectid}/datasets/{dataset}/tables/{table}
func WithMaxInflightBytes ¶
func WithMaxInflightBytes(n int) WriterOption
WithMaxInflightBytes bounds the inflight append request bytes on the write connection.
func WithMaxInflightRequests ¶
func WithMaxInflightRequests(n int) WriterOption
WithMaxInflightRequests bounds the inflight appends on the write connection.
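To illustrate what bounding in-flight appends means, here is a sketch of one common way to cap concurrent work in Go: a counting semaphore built on a buffered channel. This is not the package's implementation, and inflightLimiter and its methods are hypothetical names used only for illustration.

```go
package main

import "fmt"

// inflightLimiter is a hypothetical counting semaphore that allows at most
// n operations to be outstanding at once. n must be positive.
type inflightLimiter struct {
	slots chan struct{}
}

func newInflightLimiter(n int) *inflightLimiter {
	return &inflightLimiter{slots: make(chan struct{}, n)}
}

// acquire blocks until a slot is free.
func (l *inflightLimiter) acquire() { l.slots <- struct{}{} }

// tryAcquire takes a slot if one is free and reports whether it did.
func (l *inflightLimiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot; call it once per successful acquire, typically
// when the response for that request arrives.
func (l *inflightLimiter) release() { <-l.slots }

func main() {
	lim := newInflightLimiter(2)

	fmt.Println("request 1 admitted:", lim.tryAcquire())
	fmt.Println("request 2 admitted:", lim.tryAcquire())
	fmt.Println("request 3 admitted:", lim.tryAcquire())

	// A response arrives for one request, freeing a slot.
	lim.release()
	fmt.Println("request 3 admitted after release:", lim.tryAcquire())
}
```

A byte-based bound such as the one WithMaxInflightBytes describes follows the same idea, except that each request would consume capacity proportional to its size rather than a single slot.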
func WithSchemaDescriptor ¶
func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption
WithSchemaDescriptor describes the format of the serialized data being sent by AppendRows calls on the stream.
func WithStreamName ¶
func WithStreamName(name string) WriterOption
WithStreamName allows users to set the stream name this writer will append to explicitly. By default, the managed client will create the stream when instantiated if necessary.
Note: Supplying this option causes other options which affect stream construction, such as WithType and WithDestinationTable, to be ignored.
func WithTraceID ¶
func WithTraceID(traceID string) WriterOption
WithTraceID instruments requests to the service with a custom trace prefix. This is generally for diagnostic purposes only.
func WithType ¶
func WithType(st StreamType) WriterOption
WithType sets the stream type for the managed stream.