Documentation
Overview
Package bqstreamer implements synchronous and asynchronous stream-inserters for Google BigQuery.
https://cloud.google.com/bigquery/
Stream-insert is performed using InsertAll(): https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll
This package provides two inserter types that can be used to stream rows into tables:
1. SyncWorker
- A single blocking (synchronous) worker.
- Enqueues rows and performs insert operations in a blocking manner.
2. AsyncWorkerGroup
- Wraps multiple SyncWorkers.
- Enqueues rows and performs insert operations in the background.
- Background workers execute insert operations when either a row-count threshold or a time threshold is reached.
- Errors are reported to an error channel for processing by the user.
- This provides a higher insert throughput for larger scale scenarios.
Index
- Constants
- func NewJWTConfig(keyPath string) (c *jwt.Config, err error)
- type AsyncOptionFunc
- func SetAsyncErrorChannel(errChan chan *InsertErrors) AsyncOptionFunc
- func SetAsyncIgnoreUnknownValues(ignore bool) AsyncOptionFunc
- func SetAsyncMaxDelay(delay time.Duration) AsyncOptionFunc
- func SetAsyncMaxRetries(retries int) AsyncOptionFunc
- func SetAsyncMaxRows(rowLen int) AsyncOptionFunc
- func SetAsyncNumWorkers(workers int) AsyncOptionFunc
- func SetAsyncRetryInterval(sleep time.Duration) AsyncOptionFunc
- func SetAsyncSkipInvalidRows(skip bool) AsyncOptionFunc
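- type AsyncWorkerGroup
- func NewAsyncWorkerGroup(jwtConfig *jwt.Config, options ...AsyncOptionFunc) (*AsyncWorkerGroup, error)
- func (s *AsyncWorkerGroup) Close()
- func (s *AsyncWorkerGroup) Enqueue(row Row)
- func (s *AsyncWorkerGroup) Start()
- type InsertErrors
- func (insert *InsertErrors) All() []*TableInsertErrors
- func (insert *InsertErrors) Next() (*TableInsertErrors, bool)
- type Row
- type RowErrors
- func (row *RowErrors) All() []*bigquery.ErrorProto
- func (row *RowErrors) Next() (*bigquery.ErrorProto, bool)
- type SyncOptionFunc
- func SetSyncIgnoreUnknownValues(ignore bool) SyncOptionFunc
- func SetSyncMaxRetries(retries int) SyncOptionFunc
- func SetSyncRetryInterval(sleep time.Duration) SyncOptionFunc
- func SetSyncSkipInvalidRows(skip bool) SyncOptionFunc
- type SyncWorker
- func NewSyncWorker(client *http.Client, options ...SyncOptionFunc) (*SyncWorker, error)
- func (w *SyncWorker) Enqueue(row Row)
- func (w *SyncWorker) Insert() *InsertErrors
- func (w *SyncWorker) InsertWithRetry() *InsertErrors
- func (w *SyncWorker) RowLen() int
- type TableInsertAttemptErrors
- func (table *TableInsertAttemptErrors) All() []*RowErrors
- func (table *TableInsertAttemptErrors) Error() error
- func (table *TableInsertAttemptErrors) Next() (*RowErrors, bool)
- type TableInsertErrors
- func (table *TableInsertErrors) Attempts() []*TableInsertAttemptErrors
- type TooManyFailedInsertRetriesError
- func (err *TooManyFailedInsertRetriesError) Error() string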
Constants
const (
	DefaultAsyncNumWorkerss = 10
	DefaultAsyncMaxRows     = 500
	DefaultAsyncMaxDelay    = 5 * time.Second
)
const (
	// BigQuery has a quota policy regarding how big and often inserts should
	// be. See the following article for more info:
	//
	// https://cloud.google.com/bigquery/quota-policy#streaminginserts
	DefaultSyncMaxRetries    = 3
	DefaultSyncRetryInterval = 5 * time.Second
)
Variables
This section is empty.
Functions
func NewJWTConfig
func NewJWTConfig(keyPath string) (c *jwt.Config, err error)
NewJWTConfig returns a new JWT configuration from a JSON key, acquired via https://console.developers.google.com.
It returns a jwt.Config, used to authenticate with Google OAuth2.
Types
type AsyncOptionFunc
type AsyncOptionFunc func(*AsyncWorkerGroup) error
func SetAsyncErrorChannel
func SetAsyncErrorChannel(errChan chan *InsertErrors) AsyncOptionFunc
SetAsyncErrorChannel sets the asynchronous workers' error channel.
Use this option when you want all workers to report errors to a unified channel.
NOTE the error channel is not closed when the AsyncWorkerGroup closes. It is the responsibility of the user to close it.
func SetAsyncIgnoreUnknownValues
func SetAsyncIgnoreUnknownValues(ignore bool) AsyncOptionFunc
SetAsyncIgnoreUnknownValues sets whether to accept rows that contain values that do not match the table schema. The unknown values are ignored.
Default is false, which treats unknown values as errors.
func SetAsyncMaxDelay
func SetAsyncMaxDelay(delay time.Duration) AsyncOptionFunc
SetAsyncMaxDelay sets the maximum time a worker waits before executing an insert operation.
NOTE value must be a positive time.Duration.
func SetAsyncMaxRetries
func SetAsyncMaxRetries(retries int) AsyncOptionFunc
SetAsyncMaxRetries sets the maximum number of times a failed insert operation is retried before its rows are dropped and the insert operation is abandoned entirely.
NOTE value must be a non-negative int.
func SetAsyncMaxRows
func SetAsyncMaxRows(rowLen int) AsyncOptionFunc
SetAsyncMaxRows sets the maximum number of rows a worker can enqueue before an insert operation is executed.
NOTE this threshold is not per table; it applies to the total number of rows enqueued by a single worker across all tables.
NOTE value must be a non-negative int.
func SetAsyncNumWorkers
func SetAsyncNumWorkers(workers int) AsyncOptionFunc
SetAsyncNumWorkers sets the number of background workers.
NOTE value must be a positive int.
func SetAsyncRetryInterval
func SetAsyncRetryInterval(sleep time.Duration) AsyncOptionFunc
SetAsyncRetryInterval sets the time delay before retrying a failed insert operation (if required).
NOTE value must be a positive time.Duration.
func SetAsyncSkipInvalidRows
func SetAsyncSkipInvalidRows(skip bool) AsyncOptionFunc
SetAsyncSkipInvalidRows sets whether to insert all valid rows of a request, even if invalid rows exist.
The default value is false, which causes the entire request to fail if any invalid rows exist.
type AsyncWorkerGroup
type AsyncWorkerGroup struct {
// contains filtered or unexported fields
}
AsyncWorkerGroup asynchronously streams rows to BigQuery in bulk.
Example
This example initializes an AsyncWorkerGroup, sets up an error handling goroutine, and enqueues a single row.
An insert operation to BigQuery will be executed once the maximum delay has elapsed, the maximum number of rows has been enqueued, or the AsyncWorkerGroup has been closed.
// Initialize an error channel, into which all workers in the group
// will report their errors.
//
// NOTE this channel must be read from, otherwise the workers will block and hang.
errChan := make(chan *InsertErrors)

// Error handling goroutine. It logs every failed table insert attempt
// and returns once errChan has been closed.
errHandlerDone := make(chan struct{})
go func() {
	defer close(errHandlerDone)
	for insertErrs := range errChan {
		for _, table := range insertErrs.All() {
			for _, attempt := range table.Attempts() {
				if err := attempt.Error(); err != nil {
					log.Printf("%s.%s.%s: %s\n", attempt.Project, attempt.Dataset, attempt.Table, err)
				}
			}
		}
	}
}()

jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}

// Initialize a worker group.
g, err := NewAsyncWorkerGroup(
	jwtConfig,
	SetAsyncNumWorkers(10),               // Number of background workers in the group.
	SetAsyncMaxRows(500),                 // Rows a worker must enqueue before executing an insert operation.
	SetAsyncMaxDelay(1*time.Second),      // Maximum time to wait before executing an insert operation.
	SetAsyncRetryInterval(1*time.Second), // Time to wait between failed insert retries.
	SetAsyncMaxRetries(10),               // Maximum number of times a failed insert is retried.
	SetAsyncIgnoreUnknownValues(true),    // Ignore unknown fields when inserting rows.
	SetAsyncSkipInvalidRows(true),        // Skip bad rows when inserting.
	SetAsyncErrorChannel(errChan),        // Set unified error channel.
)
if err != nil {
	log.Fatalln(err)
}

// Start() starts the background workers and returns immediately.
g.Start()

// On return, Close() blocks until all workers have inserted any remaining
// rows into BigQuery and closed. The group does not close errChan, so close
// it afterwards and wait for the error handling goroutine to finish.
defer func() {
	g.Close()
	close(errChan)
	<-errHandlerDone
}()

// Enqueue a single row.
//
// An insert operation will be executed once the time delay defined by
// SetAsyncMaxDelay is reached,
// or enough rows have been enqueued (not shown in this example).
g.Enqueue(
	NewRow(
		"my-project",
		"my-dataset",
		"my-table",
		map[string]bigquery.JsonValue{"key": "value"},
	))
func NewAsyncWorkerGroup
func NewAsyncWorkerGroup(jwtConfig *jwt.Config, options ...AsyncOptionFunc) (*AsyncWorkerGroup, error)
NewAsyncWorkerGroup returns a new AsyncWorkerGroup using the given OAuth2/JWT configuration.
func (*AsyncWorkerGroup) Close
func (s *AsyncWorkerGroup) Close()
Close inserts any remaining rows enqueued by all workers, then closes them.
NOTE that the AsyncWorkerGroup cannot be restarted. If you wish to perform any additional inserts to BigQuery, a new one must be initialized.
func (*AsyncWorkerGroup) Enqueue
func (s *AsyncWorkerGroup) Enqueue(row Row)
Enqueue enqueues a row to be inserted into BigQuery by the group's background workers.
func (*AsyncWorkerGroup) Start
func (s *AsyncWorkerGroup) Start()
Start starts all background workers.
Workers read enqueued rows and insert them into BigQuery whenever one of the following happens:
- The maximum delay (see SetAsyncMaxDelay) has elapsed.
- A worker has enqueued the maximum number of rows (see SetAsyncMaxRows).
Insert errors will be reported to the error channel if set.
type InsertErrors
type InsertErrors struct {
Tables []*TableInsertErrors
}
InsertErrors is returned from an insert attempt. It provides functions to iterate over errors relating to an insert operation.
BigQuery has a complex error hierarchy:
Insert --> Tables --> Table Insert Attempt --> Rows --> Row Error
During an insert operation, multiple tables are inserted in bulk into BigQuery using a separate request for each table. Each table-specific insert operation consists of one or more insert attempts (requests). A table insert can be retried until an attempt succeeds or too many attempts have failed. Failures can occur for various reasons, e.g. server errors or a malformed payload.
InsertErrors allows iterating over all tables, attempts, rows, and row errors associated with an insert operation.
Example
This example demonstrates how to handle insert errors, returned from an insert operation.
jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}
w, err := NewSyncWorker(jwtConfig.Client(oauth2.NoContext))
if err != nil {
	log.Fatalln(err)
}

// Enqueue rows for multiple tables.
w.Enqueue(NewRow("my-project", "my-dataset", "my-table-1", map[string]bigquery.JsonValue{"key-1": "value-1"}))
w.Enqueue(NewRow("my-project", "my-dataset", "my-table-2", map[string]bigquery.JsonValue{"key-2": "value-2"}))

// Perform an insert operation.
insertErrs := w.Insert()

// Go over all tables' insert attempts, and log all errors.
for _, table := range insertErrs.All() {
	for _, attempt := range table.Attempts() {
		// Log the insert attempt error, if any.
		if err := attempt.Error(); err != nil {
			log.Printf("%s.%s.%s: %s\n", attempt.Project, attempt.Dataset, attempt.Table, err)
		}

		// Iterate over all rows in the attempt.
		for _, row := range attempt.All() {
			// Iterate over all errors in the row and log them.
			for _, rowErr := range row.All() {
				log.Printf("%s.%s.%s[%s]: %s: %s\n", attempt.Project, attempt.Dataset, attempt.Table, row.InsertID, rowErr.Reason, rowErr.Message)
			}
		}
	}
}
func (*InsertErrors) All
func (insert *InsertErrors) All() []*TableInsertErrors
All returns all remaining tables (those that have not been iterated over using Next()).
Calling Next() or All() again afterwards will yield a failed (empty) result.
func (*InsertErrors) Next
func (insert *InsertErrors) Next() (*TableInsertErrors, bool)
Next iterates over all inserted tables once, returning a single TableInsertErrors on every call. Each subsequent call returns the next table, until all tables have been returned.
The function returns true if a non-nil value was fetched. Once the iterator has been exhausted, (nil, false) will be returned on every subsequent call.
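InsertErrors, TableInsertAttemptErrors and RowErrors share the same Next()/All() contract: a one-shot cursor where All() returns only what Next() has not yet returned, and both yield nothing afterwards. A minimal sketch of those semantics with a hypothetical generic `cursor` type (the package's types are not generic; this only reproduces the documented behavior). For pointer element types the zero value is nil, matching the documented (nil, false).

```go
package main

import "fmt"

// cursor is a hypothetical one-shot iterator over a slice.
type cursor[T any] struct {
	items []T
	pos   int
}

// Next returns the next item and true, or the zero value and false once
// every item has been returned.
func (c *cursor[T]) Next() (T, bool) {
	if c.pos >= len(c.items) {
		var zero T
		return zero, false
	}
	item := c.items[c.pos]
	c.pos++
	return item, true
}

// All returns the items not yet returned by Next and exhausts the cursor,
// so later calls to Next or All yield nothing.
func (c *cursor[T]) All() []T {
	rest := c.items[c.pos:]
	c.pos = len(c.items)
	return rest
}

func main() {
	c := &cursor[string]{items: []string{"t1", "t2", "t3"}}
	first, _ := c.Next()
	fmt.Println(first)        // t1
	fmt.Println(c.All())      // [t2 t3]
	fmt.Println(len(c.All())) // 0
	_, ok := c.Next()
	fmt.Println(ok) // false
}
```

The practical consequence is that you should pick one style per value: either loop with Next() or take the slice from All(), but not both expecting to see every element.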
type Row
type Row struct {
	ProjectID, DatasetID, TableID string
	Data                          map[string]bigquery.JsonValue

	// Used for deduplication:
	// https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency
	InsertID string
}
Row associates a single BigQuery table row to a project, dataset and table.
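InsertID only helps BigQuery's best-effort de-duplication if the same logical row always carries the same ID, including when your application re-sends it. How NewRow chooses an ID is not documented here. If you supply your own, one hypothetical approach is to derive the ID from a natural key of the row (for example a source event ID) with a hash. `insertIDFor` is an illustrative helper, not part of bqstreamer.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// insertIDFor derives a stable insert ID from a row's natural key, so that
// re-sending the same event yields the same ID. Hypothetical helper, not
// part of bqstreamer. It keeps the first 16 bytes of a SHA-256 digest and
// hex-encodes them into a 32-character string.
func insertIDFor(naturalKey string) string {
	sum := sha256.Sum256([]byte(naturalKey))
	return hex.EncodeToString(sum[:16])
}

func main() {
	id := insertIDFor("order-1234")
	fmt.Println(len(id)) // 32
}
```

The result could be passed as the ID argument of NewRowWithID, as in the commented-out NewRowWithID call in the SyncWorker example. A random ID generated per send would defeat de-duplication of application-level re-sends.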
type RowErrors
type RowErrors struct {
	// A table insert operation can be split into multiple requests
	// if too many rows have been queued. This means that rows
	// containing errors cannot be identified by their table index.
	// Therefore, each row can be identified by its insert ID instead.
	InsertID string
	// contains filtered or unexported fields
}
RowErrors contains errors relating to a single row. Each row can have multiple errors associated with it.
func (*RowErrors) All
func (row *RowErrors) All() []*bigquery.ErrorProto
All returns all remaining row errors (those that have not been iterated over using Next()).
Calling Next() or All() again afterwards will yield a failed (empty) result.
func (*RowErrors) Next
func (row *RowErrors) Next() (*bigquery.ErrorProto, bool)
Next iterates over all row errors once, returning a single row error on every call. Each subsequent call returns the next row error, until all row errors have been returned.
The function returns true if a non-nil value was fetched. Once the iterator has been exhausted, (nil, false) will be returned on every subsequent call.
type SyncOptionFunc
type SyncOptionFunc func(*SyncWorker) error
func SetSyncIgnoreUnknownValues
func SetSyncIgnoreUnknownValues(ignore bool) SyncOptionFunc
SetSyncIgnoreUnknownValues sets whether to accept rows that contain values that do not match the table schema. The unknown values are ignored. Default is false, which treats unknown values as errors.
func SetSyncMaxRetries
func SetSyncMaxRetries(retries int) SyncOptionFunc
SetSyncMaxRetries sets the maximum number of times a failed insert operation is retried before its rows are dropped and the insert operation is abandoned entirely.
NOTE value must be a non-negative int.
func SetSyncRetryInterval
func SetSyncRetryInterval(sleep time.Duration) SyncOptionFunc
SetSyncRetryInterval sets the time delay before retrying a failed insert operation (if required).
NOTE value must be a positive time.Duration.
func SetSyncSkipInvalidRows
func SetSyncSkipInvalidRows(skip bool) SyncOptionFunc
SetSyncSkipInvalidRows sets whether to insert all valid rows of a request, even if invalid rows exist. The default value is false, which causes the entire request to fail if any invalid rows exist.
type SyncWorker
type SyncWorker struct {
// contains filtered or unexported fields
}
SyncWorker streams rows to BigQuery in bulk using synchronous calls.
Example
This example initializes a single SyncWorker, enqueues a single row, and executes an insert operation, which inserts this row into its associated table in BigQuery.
// Init OAuth2/JWT. This is required for authenticating with BigQuery.
// See the following URLs for more info:
// https://cloud.google.com/bigquery/authorization
// https://developers.google.com/console/help/new/#generatingoauth2
jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}

// Init a new SyncWorker.
w, err := NewSyncWorker(
	jwtConfig.Client(oauth2.NoContext),  // http.Client authenticated via OAuth2.
	SetSyncRetryInterval(1*time.Second), // Time to wait between failed insert retries.
	SetSyncMaxRetries(10),               // Maximum number of times a failed insert is retried.
	SetSyncIgnoreUnknownValues(true),    // Ignore unknown fields when inserting rows.
	SetSyncSkipInvalidRows(true),        // Skip bad rows when inserting.
)
if err != nil {
	log.Fatalln(err)
}

// Enqueue a single row.
w.Enqueue(
	NewRow(
		"my-project",
		"my-dataset",
		"my-table",
		map[string]bigquery.JsonValue{"key": "value"},
	))

// Alternatively, you can supply your own unique identifier to the row,
// for de-duplication purposes.
//
// See the following article for more info:
// https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency
//
// w.Enqueue(
// 	NewRowWithID(
// 		"my-project",
// 		"my-dataset",
// 		"my-table",
// 		"my-unique-row-id",
// 		map[string]bigquery.JsonValue{"key": "value"},
// 	))

// Execute an insert operation.
//
// NOTE this function returns insert errors,
// demonstrated in another example.
w.Insert()

// Alternatively, you can use InsertWithRetry,
// which will retry failed inserts in case of BigQuery server errors.
//
// insertErrs := w.InsertWithRetry()
func NewSyncWorker
func NewSyncWorker(client *http.Client, options ...SyncOptionFunc) (*SyncWorker, error)
NewSyncWorker returns a new SyncWorker.
func (*SyncWorker) Enqueue
func (w *SyncWorker) Enqueue(row Row)
Enqueue enqueues a row to be inserted in bulk by the next call to Insert() or InsertWithRetry().
func (*SyncWorker) Insert
func (w *SyncWorker) Insert() *InsertErrors
Insert executes an insert operation in bulk. It groups rows by table and inserts each table's rows using a separate insert request. It also splits a table's rows into multiple requests if too many rows have been queued, according to BigQuery quota policy.
The insert blocks until a response is returned. The response contains insert and row errors for the inserted tables.
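The grouping and splitting that Insert() performs can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `row` is a hypothetical trimmed-down Row, and `maxPerRequest` is a caller-chosen limit, since the exact per-request limit the library uses is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// row is a hypothetical, trimmed-down stand-in for bqstreamer's Row.
type row struct {
	Project, Dataset, Table string
	InsertID                string
}

// tableKey identifies a single destination table.
type tableKey struct{ Project, Dataset, Table string }

// planRequests groups rows by destination table and splits each table's rows
// into chunks of at most maxPerRequest rows, one chunk per insert request.
// maxPerRequest must be positive.
func planRequests(rows []row, maxPerRequest int) map[tableKey][][]row {
	if maxPerRequest <= 0 {
		panic("maxPerRequest must be positive")
	}
	byTable := make(map[tableKey][]row)
	for _, r := range rows {
		k := tableKey{r.Project, r.Dataset, r.Table}
		byTable[k] = append(byTable[k], r)
	}
	plan := make(map[tableKey][][]row, len(byTable))
	for k, rs := range byTable {
		for start := 0; start < len(rs); start += maxPerRequest {
			end := start + maxPerRequest
			if end > len(rs) {
				end = len(rs)
			}
			plan[k] = append(plan[k], rs[start:end])
		}
	}
	return plan
}

func main() {
	rows := []row{
		{"p", "d", "t1", "1"}, {"p", "d", "t1", "2"}, {"p", "d", "t1", "3"},
		{"p", "d", "t2", "4"},
	}
	plan := planRequests(rows, 2)

	// Sort table keys so the printed plan is deterministic.
	keys := make([]tableKey, 0, len(plan))
	for k := range plan {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i].Table < keys[j].Table })
	for _, k := range keys {
		for _, chunk := range plan[k] {
			fmt.Printf("%s.%s.%s: %d rows\n", k.Project, k.Dataset, k.Table, len(chunk))
		}
	}
}
```

Because one table's rows may end up in several requests, a row's position in the table's queue no longer identifies it in a response, which is why RowErrors identifies rows by InsertID.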
func (*SyncWorker) InsertWithRetry
func (w *SyncWorker) InsertWithRetry() *InsertErrors
InsertWithRetry is similar to Insert(), but retries a failed insert operation on BigQuery server errors, up to the configured maximum number of retries (see SetSyncMaxRetries).
See the following article for more info: https://cloud.google.com/bigquery/troubleshooting-errors
func (*SyncWorker) RowLen
func (w *SyncWorker) RowLen() int
RowLen returns the number of enqueued rows in the worker, which haven't been inserted into BigQuery yet.
type TableInsertAttemptErrors
type TableInsertAttemptErrors struct {
	// The table name associated with the insert attempt.
	Table string

	// The dataset name associated with the insert attempt.
	Dataset string

	// The project associated with the insert attempt.
	Project string
	// contains filtered or unexported fields
}
TableInsertAttemptErrors contains errors relating to a single table insert attempt.
NOTE its Error method returns an error value (the attempt's failure, if any) rather than a string, so TableInsertAttemptErrors does not itself implement the error interface.
func (*TableInsertAttemptErrors) All
func (table *TableInsertAttemptErrors) All() []*RowErrors
All returns all remaining row errors (those that have not been iterated over using Next()).
Calling Next() or All() again afterwards will yield a failed (empty) result.
func (*TableInsertAttemptErrors) Error
func (table *TableInsertAttemptErrors) Error() error
Error returns a non-nil value when the table's insert attempt has failed completely.
NOTE that an attempt can have no error but still not insert the rows. This can happen, for example, if the request includes malformed rows and SkipInvalidRows is set to false.
func (*TableInsertAttemptErrors) Next
func (table *TableInsertAttemptErrors) Next() (*RowErrors, bool)
Next iterates over the attempt's rows once, returning a single row on every call. Each subsequent call returns the next row, until all rows have been returned.
The function returns true if a non-nil value was fetched. Once the iterator has been exhausted, (nil, false) will be returned on every subsequent call.
type TableInsertErrors
type TableInsertErrors struct {
InsertAttempts []*TableInsertAttemptErrors
}
TableInsertErrors contains errors relating to a specific table from a bulk insert operation.
func (*TableInsertErrors) Attempts
func (table *TableInsertErrors) Attempts() []*TableInsertAttemptErrors
Attempts returns all insert attempts for a single table, in the order they were executed. All attempts except the last have failed; the last one may have succeeded or failed as well, as indicated by its Error().
type TooManyFailedInsertRetriesError
type TooManyFailedInsertRetriesError struct {
	// Number of failed retries.
	NumFailedRetries int

	// The table name associated with the insert attempt.
	Table string

	// The dataset name associated with the insert attempt.
	Dataset string

	// The project associated with the insert attempt.
	Project string
}
TooManyFailedInsertRetriesError is returned when a specific insert attempt has been retried and failed multiple times, causing the worker to stop retrying and drop that table's insert operation entirely.
It implements the error interface.
func (*TooManyFailedInsertRetriesError) Error
func (err *TooManyFailedInsertRetriesError) Error() string