Documentation

Overview
Package bqwriter provides a compact Streamer API in order to write data concurrently to BigQuery using the insertAll or Storage API.
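A minimal usage sketch follows, assuming the import path github.com/OTA-Insight/bqwriter as well as placeholder project, dataset and table IDs (none of which are stated on this page):

package main

import (
	"context"
	"log"

	"github.com/OTA-Insight/bqwriter" // assumed import path
)

func main() {
	ctx := context.Background()

	// A nil StreamerConfig selects the insertAll client with all defaults
	// (DefaultWorkerCount workers, DefaultBatchSize rows per batch, and so on).
	streamer, err := bqwriter.NewStreamer(ctx, "my-project", "my-dataset", "my-table", nil)
	if err != nil {
		log.Fatal(err)
	}
	// Close flushes any batched rows and stops all worker goroutines.
	defer streamer.Close()

	// Write queues a single row to be batched and written by a worker goroutine;
	// the accepted row types depend on the client used under the hood.
	type row struct{ Name string }
	if err := streamer.Write(row{Name: "example"}); err != nil {
		log.Fatal(err)
	}
}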
Index
Constants
const (
	// DefaultWorkerCount is used as the default for the WorkerCount property of
	// StreamerConfig, used in case the property is 0 (e.g. when undefined).
	DefaultWorkerCount = 2

	// DefaultMaxRetries is used as the default for the MaxRetries property of
	// the StreamerConfig's WriteRetryConfig, used in case the property is 0 (e.g. when undefined).
	DefaultMaxRetries = 3

	// DefaultInitialRetryDelay is used as the default for the InitialRetryDelay property
	// of the StreamerConfig's WriteRetryConfig, used in case the property is 0 (e.g. when undefined),
	// and only if the WriteRetryConfig is actually defined.
	//
	// Default based on suggestions made in https://cloud.google.com/bigquery/sla.
	DefaultInitialRetryDelay = time.Second * 1

	// DefaultMaxRetryDeadlineOffset is the default max amount of time the streamer will
	// allow the retry back-off logic to take, so as to ensure a goroutine isn't blocked for too long on a faulty write.
	// Used in case the property is 0 (e.g. when undefined), and only if the WriteRetryConfig is actually defined.
	//
	// Default based on suggestions made in https://cloud.google.com/bigquery/sla.
	DefaultMaxRetryDeadlineOffset = time.Second * 32

	// DefaultRetryDelayMultiplier is the default retry delay multiplier used by the streamer's
	// back-off algorithm in order to increase the delay in between each sequential write-retry of the
	// same back-off sequence. Used in case the property is < 2, as 2 is also the lowest possible multiplier accepted.
	DefaultRetryDelayMultiplier = 2

	// DefaultBatchSize defines the amount of rows of data a worker
	// will collect prior to writing them to BQ. Used in case the property is 0 (e.g. when undefined).
	DefaultBatchSize = 200

	// DefaultMaxBatchDelay defines the max amount of time a worker batches rows, prior to writing the batched rows,
	// even when the batch is not yet full. Used in case the property is 0 (e.g. when undefined).
	DefaultMaxBatchDelay = 5 * time.Second

	// DefaultWorkerQueueSize defines the default size of the job queue per worker, used
	// in order to allow the Streamer's users to write rows even if all workers are currently
	// too busy to accept new incoming rows. Used in case the property is 0 (e.g. when undefined).
	DefaultWorkerQueueSize = 100
)
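These defaults kick in per property whenever the corresponding configuration value is left at its zero value. A short sketch of that behaviour (a fragment; the value 4 is purely illustrative):

// Only WorkerCount is set explicitly here; every other property is left at its
// zero value and therefore falls back to a default listed above
// (DefaultWorkerQueueSize jobs per worker, DefaultMaxBatchDelay, and so on).
cfg := &bqwriter.StreamerConfig{
	WorkerCount: 4,
}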
Variables
This section is empty.
Functions
This section is empty.
Types
type InsertAllClientConfig
type InsertAllClientConfig struct {
	// FailOnInvalidRows causes rows containing invalid data to fail
	// if there is an attempt to insert an invalid row.
	//
	// Defaults to false, silently ignoring any invalid rows.
	FailOnInvalidRows bool

	// FailForUnknownValues causes records containing unknown values
	// to be treated as invalid records.
	//
	// Defaults to false, silently ignoring any unknown values
	// and publishing the rows with the unknown values removed from them.
	FailForUnknownValues bool

	// BatchSize defines the amount of rows (data) batched by a worker, prior to the worker
	// actually writing them to BQ. Should a worker have rows left in its local cache when closing,
	// it will flush/write these rows prior to closing.
	//
	// Defaults to DefaultBatchSize if BatchSize == 0;
	// use a negative value or an explicit value of 1
	// in case you want to write each row directly.
	BatchSize int

	// MaxRetryDeadlineOffset is the max amount of time the back-off algorithm is allowed to take
	// for its initial as well as all retry attempts. No retry should be attempted when already over this limit.
	// This offset is to be seen as a maximum, which can be stepped over, but not by too much.
	//
	// Defaults to DefaultMaxRetryDeadlineOffset if MaxRetryDeadlineOffset == 0.
	MaxRetryDeadlineOffset time.Duration
}
InsertAllClientConfig is used to configure an InsertAll client API driven Streamer Client. All properties have sane defaults as defined and used by this Go package.
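A sketch of a custom insertAll configuration, reusing the imports of the overview sketch plus the time package; all values are illustrative, not recommendations:

cfg := &bqwriter.StreamerConfig{
	InsertAllClient: &bqwriter.InsertAllClientConfig{
		FailOnInvalidRows:    true, // fail loudly instead of silently dropping invalid rows
		FailForUnknownValues: true, // treat records with unknown values as invalid
		// Flush every 50 rows rather than DefaultBatchSize (200).
		BatchSize: 50,
		// Allow each write, retries included, at most 10s rather than the 32s default.
		MaxRetryDeadlineOffset: 10 * time.Second,
	},
}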
type Logger
type Logger interface {
	// Debug logs the arguments as a single debug message.
	Debug(args ...interface{})

	// Debugf logs a formatted debug message, injecting the arguments into the template string.
	Debugf(template string, args ...interface{})

	// Error logs the arguments as a single error message.
	Error(args ...interface{})

	// Errorf logs a formatted error message, injecting the arguments into the template string.
	Errorf(template string, args ...interface{})
}
Logger is the interface used by this module in order to support logging. By default, error messages are printed to STDERR and debug messages are ignored, but you can inject any logger you wish into a Streamer.
NOTE that it is assumed by this module for a Logger implementation to be safe for concurrent use.
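Any type implementing these four methods can be injected. A minimal sketch backed by the standard library's log package, which is itself safe for concurrent use (the name stdLogger is hypothetical):

import "log"

// stdLogger forwards all Streamer log messages to the standard library logger.
type stdLogger struct{}

func (stdLogger) Debug(args ...interface{})                   { log.Println(args...) }
func (stdLogger) Debugf(template string, args ...interface{}) { log.Printf(template, args...) }
func (stdLogger) Error(args ...interface{})                   { log.Println(args...) }
func (stdLogger) Errorf(template string, args ...interface{}) { log.Printf(template, args...) }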
type StorageClientConfig
type StorageClientConfig struct {
	// MaxRetries is the max amount of times that the retry logic will retry a retryable
	// BQ write error, prior to giving up. Note that non-retryable errors will immediately stop,
	// and that MaxRetryDeadlineOffset also caps the total time these max retries can take in the worst case.
	//
	// Defaults to DefaultMaxRetries if MaxRetries == 0,
	// or use MaxRetries < 0 if you want to explicitly disable retrying,
	// but in that case you might as well not pass in a WriteRetryConfig at all.
	MaxRetries int

	// InitialRetryDelay is the initial time the back-off algorithm will wait, and which will
	// be used as the base value to be multiplied for any possible sequential retries.
	//
	// Defaults to DefaultInitialRetryDelay if InitialRetryDelay == 0.
	InitialRetryDelay time.Duration

	// MaxRetryDeadlineOffset is the max amount of time the back-off algorithm is allowed to take
	// for its initial as well as all retry attempts. No retry should be attempted when already over this limit.
	// This offset is to be seen as a maximum, which can be stepped over, but not by too much.
	//
	// Defaults to DefaultMaxRetryDeadlineOffset if MaxRetryDeadlineOffset == 0.
	MaxRetryDeadlineOffset time.Duration

	// RetryDelayMultiplier is the retry delay multiplier used by the retry
	// back-off algorithm in order to increase the delay in between each sequential write-retry of the
	// same back-off sequence.
	//
	// Defaults to DefaultRetryDelayMultiplier if RetryDelayMultiplier < 2, as 2 is also the lowest possible multiplier accepted.
	RetryDelayMultiplier float64
}
StorageClientConfig is used to configure a Storage API driven Streamer Client. All properties have sane defaults as defined and used by this Go package.

A non-nil StorageClientConfig instance has to be passed in to the StorageClient property of a StreamerConfig in order to indicate that the Streamer should be built using the Storage API Client under the hood.
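A sketch of that pattern, matching the new(StorageClientConfig) shortcut mentioned in the StreamerConfig docs below (a fragment, reusing the context and placeholder IDs of the overview sketch):

// A non-nil StorageClient selects the Storage API client under the hood;
// new(StorageClientConfig) leaves every property at its zero value,
// so all the package defaults apply.
cfg := &bqwriter.StreamerConfig{
	StorageClient: new(bqwriter.StorageClientConfig),
}
streamer, err := bqwriter.NewStreamer(ctx, "my-project", "my-dataset", "my-table", cfg)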
type Streamer
type Streamer struct {
// contains filtered or unexported fields
}
Streamer is a simple BQ stream-writer, allowing you to write data to a BQ table concurrently.
func NewStreamer
func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg *StreamerConfig) (*Streamer, error)
NewStreamer creates a new Streamer Client. The StreamerConfig is optional; all other parameters are required.

An error is returned in case the Streamer Client couldn't be created for some unexpected reason, most likely due to something going wrong in the underlying interaction with GCloud.
func (*Streamer) Close
func (s *Streamer) Close()
Close closes the streamer and all its worker goroutines.
func (*Streamer) Write

func (s *Streamer) Write(data interface{}) error

Write a row of data to a BQ table within the streamer's project. The row will be written as soon as all previous rows have been written and a worker goroutine becomes available to write it.
Jobs that failed to write but are retryable can be retried on the same goroutine using an exponential back-off approach, should the streamer be configured to do so.
type StreamerConfig
type StreamerConfig struct {
	// WorkerCount defines the amount of workers to be used,
	// each on their own goroutine and with an opt-out channel buffer per routine.
	// Use a negative value if you want just a single worker (same as defining it as 1 explicitly).
	//
	// Defaults to DefaultWorkerCount if not defined explicitly.
	WorkerCount int

	// WorkerQueueSize defines the size of the job queue per worker, used
	// in order to allow the Streamer's users to write rows even if all workers are currently
	// too busy to accept new incoming rows.
	//
	// Use a negative value in order to provide no buffer for the workers at all;
	// not recommended, but a possibility for you to choose nonetheless.
	//
	// Defaults to DefaultWorkerQueueSize if not defined explicitly.
	WorkerQueueSize int

	// MaxBatchDelay defines the max amount of time a worker batches rows,
	// prior to writing the batched rows, even when the batch is not yet full.
	//
	// Defaults to DefaultMaxBatchDelay if MaxBatchDelay == 0.
	MaxBatchDelay time.Duration

	// Logger allows you to attach a logger to be used by the streamer,
	// instead of the default built-in STDERR logging implementation,
	// with the latter being used as the default in case this logger isn't defined explicitly.
	Logger Logger

	// InsertAllClient allows you to overwrite any or all of the defaults used to configure an
	// InsertAll client API driven Streamer Client. Note that this optional configuration is ignored
	// altogether in case StorageClient is defined as a non-nil value.
	InsertAllClient *InsertAllClientConfig

	// StorageClient allows you to create a Storage API driven Streamer Client.
	// You can do so using `new(StorageClientConfig)` in order to create a StorageClient
	// with all possible configurations configured using their defaults as defined by this Go package.
	StorageClient *StorageClientConfig
}
StreamerConfig is used to build a Streamer (client). All configurations found in this structure are optional and have sane defaults defined for them. All required parameters are to be passed in as separate arguments to the NewStreamer constructor method.
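A sketch combining several of these optional properties; the values are illustrative, not recommendations, and stdLogger refers to the hypothetical Logger sketch above (the time package is also assumed to be imported):

cfg := &bqwriter.StreamerConfig{
	WorkerCount:     4,               // instead of DefaultWorkerCount (2)
	WorkerQueueSize: 256,             // instead of DefaultWorkerQueueSize (100)
	MaxBatchDelay:   2 * time.Second, // instead of DefaultMaxBatchDelay (5s)
	Logger:          stdLogger{},     // replaces the built-in STDERR logger
	InsertAllClient: &bqwriter.InsertAllClientConfig{
		BatchSize: 50, // instead of DefaultBatchSize (200)
	},
}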
type WriteRetryConfig
type WriteRetryConfig struct {
	// MaxRetries is the max amount of times that the retry logic will retry a retryable
	// BQ write error, prior to giving up. Note that non-retryable errors will immediately stop,
	// and that MaxRetryDeadlineOffset also caps the total time these max retries can take in the worst case.
	//
	// Defaults to DefaultMaxRetries if MaxRetries == 0,
	// or use MaxRetries < 0 if you want to explicitly disable retrying,
	// but in that case you might as well not pass in a WriteRetryConfig at all.
	MaxRetries int

	// InitialRetryDelay is the initial time the back-off algorithm will wait, and which will
	// be used as the base value to be multiplied for any possible sequential retries.
	//
	// Defaults to DefaultInitialRetryDelay if InitialRetryDelay == 0.
	InitialRetryDelay time.Duration

	// MaxRetryDeadlineOffset is the max amount of time the back-off algorithm is allowed to take
	// for its initial as well as all retry attempts. No retry should be attempted when already over this limit.
	// This offset is to be seen as a maximum, which can be stepped over, but not by too much.
	//
	// Defaults to DefaultMaxRetryDeadlineOffset if MaxRetryDeadlineOffset == 0.
	MaxRetryDeadlineOffset time.Duration

	// RetryDelayMultiplier is the retry delay multiplier used by the retry
	// back-off algorithm in order to increase the delay in between each sequential write-retry of the
	// same back-off sequence.
	//
	// Defaults to DefaultRetryDelayMultiplier if RetryDelayMultiplier < 2, as 2 is also the lowest possible multiplier accepted.
	RetryDelayMultiplier float64
}
WriteRetryConfig is an optional configuration that can be used to ensure retryable write errors are automatically retried using a built-in back-off algorithm.
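A sketch of such a configuration (a fragment, with the time package assumed to be imported); the values are illustrative, and per the field docs any zero-valued property falls back to the corresponding default constant:

retryCfg := &bqwriter.WriteRetryConfig{
	MaxRetries:             5,                      // give up after 5 retryable failures
	InitialRetryDelay:      500 * time.Millisecond, // base delay of the back-off sequence
	MaxRetryDeadlineOffset: 15 * time.Second,       // soft cap on the total time spent retrying
	RetryDelayMultiplier:   2,                      // the lowest (and default) accepted multiplier
}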