Documentation ¶
Overview ¶
Package managedwriter provides an EXPERIMENTAL thick client around the BigQuery storage API's BigQueryWriteClient. More information about this new write client may also be found in the public documentation: https://cloud.google.com/bigquery/docs/write-api
It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha state, and breaking changes are frequent.
Currently, this client targets the BigQueryWriteClient present in the v1beta2 endpoint, and is intended as a more feature-rich successor to the classic BigQuery streaming interface, which is presented as the Inserter abstraction in cloud.google.com/go/bigquery, and the tabledata.insertAll method if you're more familiar with the BigQuery v2 REST methods.
Appending data is accomplished through the use of streams. There are four stream types, each targetting slightly different use cases and needs. See the StreamType documentation for more details about each stream type.
This API uses serialized protocol buffer messages for appending data to streams. For users who don't have predefined protocol buffer messages for sending data, the cloud.google.com/go/bigquery/storage/managedwriter/adapt subpackage includes functionality for defining protocol buffer messages dynamically using table schema information, which enables users to do things like using protojson to convert json text into a protocol buffer.
Index ¶
- Constants
- Variables
- func TableParentFromStreamName(streamName string) string
- type AppendResult
- type Client
- type ManagedStream
- func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error)
- func (ms *ManagedStream) Close() error
- func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error)
- func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error)
- func (ms *ManagedStream) StreamName() string
- func (ms *ManagedStream) StreamType() StreamType
- type StreamType
- type WriterOption
- func WithDestinationTable(destTable string) WriterOption
- func WithMaxInflightBytes(n int) WriterOption
- func WithMaxInflightRequests(n int) WriterOption
- func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption
- func WithStreamName(name string) WriterOption
- func WithTraceID(traceID string) WriterOption
- func WithType(st StreamType) WriterOption
Constants ¶
const NoStreamOffset int64 = -1
NoStreamOffset is a sentinel value for signalling we're not tracking stream offset (e.g. a default stream which allows simultaneous append streams).
Variables ¶
var ( // AppendRequests is a measure of the number of append requests sent. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless) // AppendResponses is a measure of the number of append responses received. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless) // FlushRequests is a measure of the number of FlushRows requests sent. // It is EXPERIMENTAL and subject to change or removal without notice. FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless) // AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened. // It is EXPERIMENTAL and subject to change or removal without notice. AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless) // AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried. // It is EXPERIMENTAL and subject to change or removal without notice. AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless) )
var ( // AppendRequestsView is a cumulative sum of AppendRequests. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestsView *view.View // AppendResponsesView is a cumulative sum of AppendResponses. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponsesView *view.View // FlushRequestsView is a cumulative sum of FlushRequests. // It is EXPERIMENTAL and subject to change or removal without notice. FlushRequestsView *view.View // AppendClientOpenView is a cumulative sum of AppendClientOpenCount. // It is EXPERIMENTAL and subject to change or removal without notice. AppendClientOpenView *view.View // AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount. // It is EXPERIMENTAL and subject to change or removal without notice. AppendClientOpenRetryView *view.View )
Functions ¶
func TableParentFromStreamName ¶
TableParentFromStreamName is a utility function for extracting the parent table prefix from a stream name. When an invalid stream ID is passed, this simply returns the original stream name.
Types ¶
type AppendResult ¶
type AppendResult struct {
// contains filtered or unexported fields
}
AppendResult tracks the status of a single row of data.
func (*AppendResult) GetResult ¶
func (ar *AppendResult) GetResult(ctx context.Context) (int64, error)
GetResult returns the optional offset of this row, or the associated error. It blocks until the result is ready.
func (*AppendResult) Ready ¶
func (ar *AppendResult) Ready() <-chan struct{}
Ready blocks until the append request has reached a completed state, which may be a successful append or an error.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a managed BigQuery Storage write client scoped to a single project.
func NewClient ¶
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error)
NewClient instantiates a new client.
func (*Client) BatchCommit ¶
func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error)
BatchCommit is used to commit one or more PendingStream streams belonging to the same table as a single transaction. Streams must be finalized before committing.
Format of the parentTable is: projects/{project}/datasets/{dataset}/tables/{table} and the utility function TableParentFromStreamName can be used to derive this from a Stream's name.
If the returned response contains stream errors, this indicates that the batch commit failed and no data was committed.
TODO: currently returns the raw response. Determine how we want to surface StreamErrors.
func (*Client) NewManagedStream ¶
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error)
NewManagedStream establishes a new managed stream for appending data into a table.
Context here is retained for use by the underlying streaming connections the managed stream may create.
type ManagedStream ¶
type ManagedStream struct {
// contains filtered or unexported fields
}
ManagedStream is the abstraction over a single write stream.
func (*ManagedStream) AppendRows ¶
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error)
AppendRows sends the append requests to the service, and returns one AppendResult per row. The format of the row data is binary serialized protocol buffer bytes, and and the message must adhere to the format of the schema Descriptor passed in when creating the managed stream.
func (*ManagedStream) Finalize ¶
func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error)
Finalize is used to mark a stream as complete, and thus ensure no further data can be appended to the stream. You cannot finalize a DefaultStream, as it always exists.
Finalizing does not advance the current offset of a BufferedStream, nor does it commit data in a PendingStream.
func (*ManagedStream) FlushRows ¶
FlushRows advances the offset at which rows in a BufferedStream are visible. Calling this method for other stream types yields an error.
func (*ManagedStream) StreamName ¶
func (ms *ManagedStream) StreamName() string
StreamName returns the corresponding write stream ID being managed by this writer.
func (*ManagedStream) StreamType ¶
func (ms *ManagedStream) StreamType() StreamType
StreamType returns the configured type for this stream.
type StreamType ¶
type StreamType string
StreamType indicates the type of stream this write client is managing.
var ( // DefaultStream most closely mimics the legacy bigquery // tabledata.insertAll semantics. Successful inserts are // committed immediately, and there's no tracking offsets as // all writes go into a "default" stream that always exists // for a table. DefaultStream StreamType = "DEFAULT" // CommittedStream appends data immediately, but creates a // discrete stream for the work so that offset tracking can // be used to track writes. CommittedStream StreamType = "COMMITTED" // BufferedStream is a form of checkpointed stream, that allows // you to advance the offset of visible rows via Flush operations. // // NOTE: Buffered Streams are currently in limited preview, and as such // methods like FlushRows() may yield errors for non-enrolled projects. BufferedStream StreamType = "BUFFERED" // PendingStream is a stream in which no data is made visible to // readers until the stream is finalized and committed explicitly. PendingStream StreamType = "PENDING" )
type WriterOption ¶
type WriterOption func(*ManagedStream)
WriterOption are variadic options used to configure a ManagedStream instance.
func WithDestinationTable ¶
func WithDestinationTable(destTable string) WriterOption
WithDestinationTable specifies the destination table to which a created stream will append rows. Format of the table:
projects/{projectid}/datasets/{dataset}/tables/{table}
func WithMaxInflightBytes ¶
func WithMaxInflightBytes(n int) WriterOption
WithMaxInflightBytes bounds the inflight append request bytes on the write connection.
func WithMaxInflightRequests ¶
func WithMaxInflightRequests(n int) WriterOption
WithMaxInflightRequests bounds the inflight appends on the write connection.
func WithSchemaDescriptor ¶
func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption
WithSchemaDescriptor describes the format of the serialized data being sent by AppendRows calls on the stream.
func WithStreamName ¶
func WithStreamName(name string) WriterOption
WithStreamName allows users to set the stream name this writer will append to explicitly. By default, the managed client will create the stream when instantiated if necessary.
Note: Supplying this option causes other options which affect stream construction such as WithStreamType and WithDestinationTable to be ignored.
func WithTraceID ¶
func WithTraceID(traceID string) WriterOption
WithTraceID allows instruments requests to the service with a custom trace prefix. This is generally for diagnostic purposes only.
func WithType ¶
func WithType(st StreamType) WriterOption
WithType sets the stream type for the managed stream.