bqwriter

package module v0.2.0
Published: Oct 19, 2021 License: Apache-2.0 Imports: 10 Imported by: 1

README

bqwriter

A Go package to write data into Google BigQuery concurrently with high throughput. By default the InsertAll() API is used (REST API under the hood), but you can configure the streamer to use the Storage Write API (gRPC under the hood) as well.

The InsertAll API is easier to configure and works pretty much out of the box without any configuration. It is nonetheless recommended to use the Storage API, as it is faster and comes at a lower cost. The latter does however require a bit more configuration on your side, including a Proto schema file. See the Storage example below on how to do this (TODO).

import "github.com/OTA-Insight/bqwriter"

To install the package on your system, do not clone the repo. Instead:

  1. Change to your project directory:
cd /path/to/my/project
  2. Get the package using the official Go tooling, which will also add it to your go.mod file for you:
go get github.com/OTA-Insight/bqwriter

NOTE: This package is under development, and may occasionally make backwards-incompatible changes.

Go Versions Supported

We currently support Go versions 1.13 and newer.

Examples

In this section you'll find some quick examples to help you get started, together with the official documentation, which you can find at https://pkg.go.dev/github.com/OTA-Insight/bqwriter.

The Streamer client is safe for concurrent use and can be used from as many goroutines as you wish. No external locking or other concurrency-safe mechanism is required on your side. To keep these examples as small as possible, they are written in a linear, synchronous fashion, but you are encouraged to use the Streamer client from multiple goroutines in order to write rows at a sufficiently high throughput.

Please also note that errors are not handled gracefully in these examples, to keep them small and narrow in scope.

Basic InsertAll Streamer
import (
    "context"
    "time"

    "github.com/OTA-Insight/bqwriter"
)

// TODO: use more specific context
ctx := context.Background()

// create a BQ (stream) writer thread-safe client,
bqWriter, err := bqwriter.NewStreamer(
    ctx,
    "my-gcloud-project",
    "my-bq-dataset",
    "my-bq-table",
    nil, // use default config
)
if err != nil {
    // TODO: handle error gracefully
    panic(err)
}
// do not forget to close, to close all background resources opened
// when creating the BQ (stream) writer client
defer bqWriter.Close()

// You can now start writing data to your BQ table
bqWriter.Write(&myRow{Timestamp: time.Now().UTC(), Username: "test"})
// NOTE: write only one row at a time using `(*Streamer).Write`;
// to write multiple rows, use one `Write` call per row.

You build a Streamer client with an optional StreamerConfig, as you can see in the example above. The entire config is optional and has sane defaults, but note that there is a lot you can configure in it prior to actually building the streamer. Please consult https://pkg.go.dev/github.com/OTA-Insight/bqwriter#StreamerConfig for more information.
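
As mentioned above, the Streamer client is safe for concurrent use. Below is a minimal sketch, reusing the bqWriter client and myRow type from this example, of how you could write rows from multiple goroutines; it assumes the additional "fmt", "log", "sync" and "time" imports, and the row values are purely illustrative:

// write rows concurrently from several goroutines,
// reusing the bqWriter client created above
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
    wg.Add(1)
    go func(worker int) {
        defer wg.Done()
        for j := 0; j < 100; j++ {
            // (*Streamer).Write is safe for concurrent use,
            // no extra locking is required
            err := bqWriter.Write(&myRow{
                Timestamp: time.Now().UTC(),
                Username:  fmt.Sprintf("user-%d-%d", worker, j),
            })
            if err != nil {
                // TODO: handle error gracefully
                log.Println("write failed:", err)
            }
        }
    }(i)
}
wg.Wait()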

The myRow structure used in this example is one way to pass in the information of a single row to the (*Streamer).Write method. This structure implements the ValueSaver interface. An example of this:

import (
	"time"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/civil"
)

type myRow struct {
	Timestamp time.Time
	Username  string
}

func (mr *myRow) Save() (row map[string]bigquery.Value, insertID string, err error) {
	return map[string]bigquery.Value{
		"timestamp": civil.DateTimeOf(rr.Timestamp),
		"username":  mr.Username,
	}, "", nil
}

You can also pass in a struct directly, in which case the schema will be inferred automatically based on its exported fields. This flexibility comes at a runtime cost, as reflection has to be applied.
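
As a minimal sketch of this (the visit type and its values are purely illustrative, and the "time" import is assumed), you can hand a plain struct to Write and let the schema be inferred for you:

// visit is a plain struct, its schema is inferred via reflection
type visit struct {
    Timestamp time.Time
    Username  string
}

// bqWriter is the Streamer client created in the example above
if err := bqWriter.Write(visit{Timestamp: time.Now().UTC(), Username: "test"}); err != nil {
    // TODO: handle error gracefully
    panic(err)
}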

A raw struct can also be stored using the bigquery.StructSaver type, in which case you get the benefit of being able to write any kind of struct while also passing in the schema to be used, so that it doesn't have to be inferred per write, and giving you exact control over each field on top of that.
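
As a rough sketch of that approach, reusing the myRow type and bqWriter client from the examples above (inferring the schema once up front is just one option, you can also hand-write a bigquery.Schema):

import "cloud.google.com/go/bigquery"

// infer the schema only once up front (or hand-write a bigquery.Schema),
// so it doesn't have to be re-inferred on every single write
myRowSchema, err := bigquery.InferSchema(myRow{})
if err != nil {
    // TODO: handle error gracefully
    panic(err)
}

// wrap the raw struct in a bigquery.StructSaver when writing it
bqWriter.Write(&bigquery.StructSaver{
    Schema: myRowSchema,
    Struct: &myRow{Timestamp: time.Now().UTC(), Username: "test"},
})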

If you have the choice, however, we do recommend implementing ValueSaver for your row struct, as this gives you the best of both worlds, while also giving you the easy built-in ability to define a unique insertID per row, which helps prevent the potential duplicates that can otherwise occur when rows that failed temporarily are retried.
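
For instance, a variant of the Save method shown above could derive a deterministic insertID from the row's own content (the scheme below is just an illustration, any unique value will do; the "fmt" import is assumed):

func (mr *myRow) Save() (row map[string]bigquery.Value, insertID string, err error) {
	// a deterministic insertID allows BigQuery to deduplicate
	// rows that get written again during a retry
	insertID = fmt.Sprintf("%s-%d", mr.Username, mr.Timestamp.UnixNano())
	return map[string]bigquery.Value{
		"timestamp": civil.DateTimeOf(mr.Timestamp),
		"username":  mr.Username,
	}, insertID, nil
}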

Custom InsertAll Streamer

Using the same myRow structure from the previous example, here is how to create a Streamer client with a more custom configuration:

import (
    "context"
    "time"

    "github.com/OTA-Insight/bqwriter"
)

// TODO: use more specific context
ctx := context.Background()

// create a BQ (stream) writer thread-safe client,
bqWriter, err := bqwriter.NewStreamer(
    ctx,
    "my-gcloud-project",
    "my-bq-dataset",
    "my-bq-table",
    &bqwriter.StreamerConfig{
        // use 5 background worker goroutines
        WorkerCount: 5,
        // make writes fail for invalid rows and for unknown values,
        // rather than the default behaviour of silently ignoring
        // these errors and dropping the faulty rows/values
        InsertAllClient: &bqwriter.InsertAllClientConfig{
            FailOnInvalidRows:    true,
            FailForUnknownValues: true,
        },
    },
)
if err != nil {
    // TODO: handle error gracefully
    panic(err)
}
// do not forget to close, to close all background resources opened
// when creating the BQ (stream) writer client
defer bqWriter.Close()

// You can now start writing data to your BQ table
bqWriter.Write(&myRow{Timestamp: time.Now().UTC(), Username: "test"})
// NOTE: write only one row at a time using `(*Streamer).Write`;
// to write multiple rows, use one `Write` call per row.
Storage Streamer

TODO

Authorization

The streamer client will use Google Application Default Credentials for authorization credentials used in calling the API endpoints. This will allow your application to run in many environments without requiring explicit configuration.

Please open an issue should you require more advanced forms of authorization. The issue should come with an example, and a clear statement of intention and motivation on why this would be a useful contribution to this package. Even if you wish to contribute by implementing the patch yourself, it is nonetheless best to create an issue first, so that we can all be aligned on the specifics. Good communication is key here.

It was a deliberate choice not to support these advanced authorization methods for now: the package authors didn't have a need for them, and it kept the API as simple and small as possible. Some advanced forms of authorization are nonetheless still possible, e.g. through the environments already supported by the Application Default Credentials flow.

To conclude: we currently do not support advanced forms of authorization, but we are open to adding support for them if there is sufficient interest.

Contributing

Contributions are welcome. Please see the CONTRIBUTING document for details.

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Contributor Code of Conduct for more information.

Documentation

Overview

Package bqwriter provides a compact Streamer API in order to write data concurrently to BigQuery using the insertAll or Storage API.

Index

Constants

const (
	// DefaultWorkerCount is used as the default for the WorkerCount property of
	// StreamerConfig, used in case the property is 0 (e.g. when undefined).
	DefaultWorkerCount = 2

	// DefaultMaxRetries is used as the default for the MaxRetries property of
	// the StreamerConfig's WriteRetryConfig, used in case the property is 0 (e.g. when undefined).
	DefaultMaxRetries = 3

	// DefaultInitialRetryDelay is used as the default for the InitialRetryDuration property
	// of the StreamerConfig's WriteRetryConfig, used in case the property is 0 (e.g. when undefined),
	// and only if the WriteRetryConfig is actually defined.
	//
	// Default based on suggestions made in https://cloud.google.com/bigquery/sla.
	DefaultInitialRetryDelay = time.Second * 1

	// DefaultMaxRetryDeadlineOffset is the default max amount of time the streamer will
	// allow the retry back off logic to retry, so as to ensure a goroutine isn't blocked for too long on a faulty write.
	// Used in case the property is 0 (e.g. when undefined), and only if the WriteRetryConfig is actually defined.
	//
	// Default based on suggestions made in https://cloud.google.com/bigquery/sla.
	DefaultMaxRetryDeadlineOffset = time.Second * 32

	// DefaultRetryDelayMultiplier is the default retry delay multiplier used by the streamer's
	// back off algorithm in order to increase the delay in between each sequential write-retry of the
	// same back off sequence. Used in case the property is < 2, as 2 is also the lowest possible multiplier accepted.
	DefaultRetryDelayMultiplier = 2

	// DefaultBatchSize defines the amount of rows of data a worker
	// will collect prior to writing it to BQ. Used in case the property is 0 (e.g. when undefined).
	DefaultBatchSize = 200

	// DefaultMaxBatchDelay defines the max amount of time a worker batches rows, prior to writing the batched rows,
	// even when not yet full. Used in case the property is 0 (e.g. when undefined).
	DefaultMaxBatchDelay = 5 * time.Second

	// DefaultWorkerQueueSize defines the default size of the job queue per worker used
	// in order to allow the Streamer's users to write rows even if all workers are currently
	// too busy to accept new incoming rows. Used in case the property is 0 (e.g. when undefined).
	DefaultWorkerQueueSize = 100
)

Variables

This section is empty.

Functions

This section is empty.

Types

type InsertAllClientConfig

type InsertAllClientConfig struct {
	// FailOnInvalidRows causes rows containing invalid data to fail
	// if there is an attempt to insert an invalid row.
	//
	// Defaults to false, silently ignoring these errors and skipping the invalid rows.
	FailOnInvalidRows bool

	// FailForUnknownValues causes records containing such values
	// to be treated as invalid records.
	//
	// Defaults to false, silently ignoring these errors
	// and publishing the rows with the unknown values removed from them.
	FailForUnknownValues bool

	// BatchSize defines the amount of rows (data) batched by a worker prior to the worker
	// actually writing it to BQ. Should a worker have rows left in its local cache when closing,
	// it will flush/write these rows prior to closing.
	//
	// Defaults to DefaultBatchSize if BatchSize == 0,
	// use a negative value or an explicit value of 1
	// in case you want to write each row directly.
	BatchSize int

	// MaxRetryDeadlineOffset is the max amount of time the back off algorithm is allowed to take
	// for its initial as well as all retry attempts. No retry should be attempted when already over this limit.
	// This Offset is to be seen as a maximum, which can be stepped over but not by too much.
	//
	// Defaults to DefaultMaxRetryDeadlineOffset if MaxRetryDeadlineOffset == 0.
	MaxRetryDeadlineOffset time.Duration
}

InsertAllClientConfig is used to configure an InsertAll client API driven Streamer Client. All properties have sane defaults as defined and used by this Go package.
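
For example, a minimal sketch (assuming the "time" import) of a config that makes each row be written directly and caps the per-write retry time at a hypothetical 15 seconds:

cfg := &bqwriter.StreamerConfig{
	InsertAllClient: &bqwriter.InsertAllClientConfig{
		// an explicit value of 1 (or any negative value)
		// makes each row be written directly, without batching
		BatchSize: 1,
		// cap the total time the back off retry logic may take per write
		MaxRetryDeadlineOffset: 15 * time.Second,
	},
}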

type Logger

type Logger interface {
	// Debug logs the arguments as a single debug message.
	Debug(args ...interface{})
	// Debugf logs a formatted debug message, injecting the arguments into the template string.
	Debugf(template string, args ...interface{})
	// Error logs the arguments as a single error message.
	Error(args ...interface{})
	// Errorf logs a formatted error message, injecting the arguments into the template string.
	Errorf(template string, args ...interface{})
}

Logger is the interface used by this module in order to support logging. By default the error messages are printed to the STDERR and debug messages are ignored, but you can inject any logger you wish into a Streamer.

NOTE that it is assumed by this module for a Logger implementation to be safe for concurrent use.
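
As an illustrative sketch only (this adapter is not part of the package), a Logger implementation could wrap the standard library log package, which serializes its writes and is therefore safe for concurrent use:

import "log"

// stdLogger adapts the standard library logger to the bqwriter.Logger interface.
type stdLogger struct{}

func (stdLogger) Debug(args ...interface{}) {
	log.Println(append([]interface{}{"DEBUG:"}, args...)...)
}
func (stdLogger) Debugf(template string, args ...interface{}) {
	log.Printf("DEBUG: "+template, args...)
}
func (stdLogger) Error(args ...interface{}) {
	log.Println(append([]interface{}{"ERROR:"}, args...)...)
}
func (stdLogger) Errorf(template string, args ...interface{}) {
	log.Printf("ERROR: "+template, args...)
}

// attach it via the StreamerConfig:
// cfg := &bqwriter.StreamerConfig{Logger: stdLogger{}}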

type StorageClientConfig

type StorageClientConfig struct {
	// MaxRetries is the max amount of times that the retry logic will retry a retryable
	// BQ write error, prior to giving up. Note that non-retryable errors will immediately stop
	// and that there is also an upper limit of MaxTotalElapsedRetryTime within which these retries have to execute in the worst case.
	//
	// Defaults to DefaultMaxRetries if MaxRetries == 0,
	// or use MaxRetries < 0 if you want to explicitly disable Retrying,
	// but in that case you might as well not pass in a WriteRetryConfig at all.
	MaxRetries int

	// InitialRetryDelay is the initial time the back off algorithm will wait and which will
	// be used as the base value to be multiplied for any possible sequential retries.
	//
	// Defaults to DefaultInitialRetryDelay if InitialRetryDelay == 0.
	InitialRetryDelay time.Duration

	// MaxRetryDeadlineOffset is the max amount of time the back off algorithm is allowed to take
	// for its initial as well as all retry attempts. No retry should be attempted when already over this limit.
	// This Offset is to be seen as a maximum, which can be stepped over but not by too much.
	//
	// Defaults to DefaultMaxRetryDeadlineOffset if MaxRetryDeadlineOffset == 0.
	MaxRetryDeadlineOffset time.Duration

	// RetryDelayMultiplier is the retry delay multiplier used by the retry
	// back off algorithm in order to increase the delay in between each sequential write-retry of the
	// same back off sequence.
	//
	// Defaults to DefaultRetryDelayMultiplier if RetryDelayMultiplier < 1, as 2 is also the lowest possible multiplier accepted.
	RetryDelayMultiplier float64
}

StorageClientConfig is used to configure a storage client API driven Streamer Client. All properties have sane defaults as defined and used by this Go package.

A non-nil StorageClientConfig instance has to be passed in to the StorageClient property of a StreamerConfig in order to indicate the Streamer should be built using the Storage API Client under the hood.
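
For instance, a minimal sketch of building such a Storage API driven Streamer with all StorageClientConfig defaults (ctx, project, dataset and table values are placeholders; any Proto schema setup mentioned in the README is not covered here):

// new(StorageClientConfig) is non-nil, so the Streamer is built
// on top of the Storage API client, with all defaults applied
bqWriter, err := bqwriter.NewStreamer(
	ctx,
	"my-gcloud-project",
	"my-bq-dataset",
	"my-bq-table",
	&bqwriter.StreamerConfig{
		StorageClient: new(bqwriter.StorageClientConfig),
	},
)
if err != nil {
	// TODO: handle error gracefully
	panic(err)
}
defer bqWriter.Close()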

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer is a simple BQ stream-writer, allowing you to write data to a BQ table concurrently.

func NewStreamer

func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg *StreamerConfig) (*Streamer, error)

NewStreamer creates a new Streamer Client. StreamerConfig is optional, all other parameters are required.

An error is returned in case the Streamer Client couldn't be created for some unexpected reason, most likely something going wrong within the layer of actually interacting with GCloud.

func (*Streamer) Close

func (s *Streamer) Close()

Close closes the streamer and all its worker goroutines.

func (*Streamer) Write

func (s *Streamer) Write(data interface{}) error

Write a row of data to a BQ table within the streamer's project. The row will be written as soon as all previous rows have been written and a worker goroutine becomes available to write it.

Jobs that failed to write but which are retryable can be retried on the same goroutine in an exponential back-off approach, should the streamer be configured to do so.

type StreamerConfig

type StreamerConfig struct {
	// WorkerCount defines the amount of workers to be used,
	// each on their own goroutine and with an opt-out channel buffer per goroutine.
	// Use a negative value if you want just a single worker (same as defining it as 1 explicitly).
	//
	// Defaults to DefaultWorkerCount if not defined explicitly.
	WorkerCount int

	// WorkerQueueSize defines the size of the job queue per worker used
	// in order to allow the Streamer's users to write rows even if all workers are currently
	// too busy to accept new incoming rows.
	//
	// Use a negative value in order to provide no buffer for the workers at all,
	// not recommended, but nonetheless a possibility for you to choose.
	//
	// Defaults to DefaultWorkerQueueSize if not defined explicitly.
	WorkerQueueSize int

	// MaxBatchDelay defines the max amount of time a worker batches rows,
	// prior to writing the batched rows, even when not yet full.
	//
	// Defaults to DefaultMaxBatchDelay if MaxBatchDelay == 0.
	MaxBatchDelay time.Duration

	// Logger allows you to attach a logger to be used by the streamer,
	// instead of the default built-in STDERR logging implementation,
	// with the latter being used as the default in case this logger isn't defined explicitly.
	Logger Logger

	// InsertAllClient allows you to overwrite any or all of the defaults used to configure an
	// InsertAll client API driven Streamer Client. Note that this optional configuration is ignored
	// all together in case StorageClient is defined as a non-nil value.
	InsertAllClient *InsertAllClientConfig

	// StorageClient allows you to create a Storage API driven Streamer Client.
	// You can do so using `new(StorageClientConfig)` in order to create a StorageClient
	// with all possible configurations configured using their defaults as defined by this Go package.
	StorageClient *StorageClientConfig
}

StreamerConfig is used to build a Streamer (client). All configurations found in this structure are optional and have sane defaults defined for them. All required parameters are to be passed in as separate arguments to the NewStreamer constructor method.

type WriteRetryConfig

type WriteRetryConfig struct {
	// MaxRetries is the max amount of times that the retry logic will retry a retryable
	// BQ write error, prior to giving up. Note that non-retryable errors will immediately stop
	// and that there is also an upper limit of MaxTotalElapsedRetryTime within which these retries have to execute in the worst case.
	//
	// Defaults to DefaultMaxRetries if MaxRetries == 0,
	// or use MaxRetries < 0 if you want to explicitly disable Retrying,
	// but in that case you might as well not pass in a WriteRetryConfig at all.
	MaxRetries int

	// InitialRetryDelay is the initial time the back off algorithm will wait and which will
	// be used as the base value to be multiplied for any possible sequential retries.
	//
	// Defaults to DefaultInitialRetryDelay if InitialRetryDelay == 0.
	InitialRetryDelay time.Duration

	// MaxRetryDeadlineOffset is the max amount of time the back off algorithm is allowed to take
	// for its initial as well as all retry attempts. No retry should be attempted when already over this limit.
	// This Offset is to be seen as a maximum, which can be stepped over but not by too much.
	//
	// Defaults to DefaultMaxRetryDeadlineOffset if MaxRetryDeadlineOffset == 0.
	MaxRetryDeadlineOffset time.Duration

	// RetryDelayMultiplier is the retry delay multiplier used by the retry
	// back off algorithm in order to increase the delay in between each sequential write-retry of the
	// same back off sequence.
	//
	// Defaults to DefaultRetryDelayMultiplier if RetryDelayMultiplier < 1, as 2 is also the lowest possible multiplier accepted.
	RetryDelayMultiplier float64
}

WriteRetryConfig is an optional configuration that can be used in order to ensure retry-able write errors are automatically retried using a built-in back off algorithm.
