Documentation ¶
Overview ¶
Package bqstreamer implements a concurrent stream (bulk) inserter to Google BigQuery.
Index ¶
- func NewBigQueryService(c *jwt.Config) (service *bigquery.Service, err error)
- func NewJWTConfig(keyPath string) (c *jwt.Config, err error)
- type AllRowsRejectedError
- type MultiStreamer
- type RowError
- func (err *RowError) BQError() bigquery.ErrorProto
- func (err *RowError) DatasetID() string
- func (err *RowError) Error() string
- func (err *RowError) Index() int64
- func (err *RowError) JsonValue() map[string]bigquery.JsonValue
- func (err *RowError) ProjectID() string
- func (err *RowError) TableID() string
- type Streamer
- type TooManyFailedInsertRetriesError
- func (err *TooManyFailedInsertRetriesError) DatasetID() string
- func (err *TooManyFailedInsertRetriesError) Error() string
- func (err *TooManyFailedInsertRetriesError) NumFailedRetries() int
- func (err *TooManyFailedInsertRetriesError) ProjectID() string
- func (err *TooManyFailedInsertRetriesError) TableID() string
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewBigQueryService ¶
NewBigQueryService returns a new BigQuery service (client), authenticated via OAuth2/JWT.
NOTE: This function authenticates against Google's OAuth2 service, and is thus susceptible to network delays and blocking.
func NewJWTConfig ¶
NewJWTConfig returns a new JWT configuration from a JSON key, acquired via https://console.developers.google.com.
A config is used to authenticate with Google OAuth2.
Types ¶
type AllRowsRejectedError ¶
type AllRowsRejectedError struct {
// contains filtered or unexported fields
}
AllRowsRejectedError is returned when all rows in an insert have been rejected, meaning no insert retry will occur.
func (*AllRowsRejectedError) DatasetID ¶
func (err *AllRowsRejectedError) DatasetID() string
func (*AllRowsRejectedError) Error ¶
func (err *AllRowsRejectedError) Error() string
func (*AllRowsRejectedError) ProjectID ¶
func (err *AllRowsRejectedError) ProjectID() string
func (*AllRowsRejectedError) TableID ¶
func (err *AllRowsRejectedError) TableID() string
type MultiStreamer ¶
type MultiStreamer struct {
	// Errors are reported to this channel.
	Errors chan error

	// contains filtered or unexported fields
}
A MultiStreamer operates multiple Streamers, also called workers or sub-streamers. The MultiStreamer feeds rows to a single rowChannel, from which all sub-streamers read together. This improves scalability by allowing higher message throughput.
TODO: Improve by managing a channel of sub-streamers, notifying the multi-streamer when they're ready to read rows, and then letting them read one-by-one (the first reads a chunk and goes streaming, then the next one reads and streams, etc.). Right now they all read simultaneously, racing for messages. See here: http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html
Example ¶
This example uses a MultiStreamer. A single row is queued, and will be flushed once a time threshold has passed, or if the MultiStreamer is explicitly closed.
Starting a MultiStreamer is a non-blocking operation (unlike Streamer), so there's no need to run it in its own goroutine.
You should probably use it instead of Streamer, as it provides better concurrency and speed.
// Init OAuth2/JWT. This is required for authenticating with BigQuery.
// https://cloud.google.com/bigquery/authorization
// https://developers.google.com/console/help/new/#generatingoauth2
jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}

// Set MultiStreamer configuration.
numStreamers := 10                  // Number of concurrent sub-streamers (workers) to use.
maxRows := 500                      // Amount of rows queued before forcing insert to BigQuery.
maxDelay := 1 * time.Second         // Time to pass between forcing insert to BigQuery.
sleepBeforeRetry := 1 * time.Second // Time to wait between failed insert retries.
maxRetryInsert := 10                // Maximum amount of failed insert retries before discarding rows and moving on.

// Init a new multi-streamer.
ms, err := NewMultiStreamer(
	jwtConfig, numStreamers, maxRows, maxDelay, sleepBeforeRetry, maxRetryInsert)
if err != nil {
	log.Fatalln(err)
}

// Start multi-streamer and workers.
ms.Start()
defer ms.Stop()

// Worker errors are reported to the MultiStreamer.Errors channel.
// This inits a goroutine that reads from this channel and logs errors.
//
// It can be closed by sending "true" to the shutdownErrorChan channel.
shutdownErrorChan := make(chan bool)
go func() {
	var err error
	readErrors := true
	for readErrors {
		select {
		case <-shutdownErrorChan:
			readErrors = false
		case err = <-ms.Errors:
			log.Println(err)
		}
	}
}()
defer func() { shutdownErrorChan <- true }()

// Queue a single row.
// Insert will happen once maxDelay time has passed,
// or maxRows rows have been queued.
ms.QueueRow(
	"project-id", "dataset-id", "table-id",
	map[string]bigquery.JsonValue{"key": "value"},
)
Output:
func NewMultiStreamer ¶
func NewMultiStreamer(
	jwtConfig *jwt.Config,
	numStreamers int,
	maxRows int,
	maxDelay time.Duration,
	sleepBeforeRetry time.Duration,
	maxRetryInsert int) (*MultiStreamer, error)
NewMultiStreamer returns a new MultiStreamer.
func (*MultiStreamer) QueueRow ¶
func (b *MultiStreamer) QueueRow(projectID, datasetID, tableID string, jsonRow map[string]bigquery.JsonValue)
QueueRow queues a single row, which will be read and inserted by one of the sub-streamers.
func (*MultiStreamer) Start ¶
func (b *MultiStreamer) Start()
Start starts the sub-streamers, making them read from a common row channel, and output to the same error channel.
func (*MultiStreamer) Stop ¶
func (b *MultiStreamer) Stop()
Stop stops all sub-streamers. Note all sub-streamers will flush to BigQuery before stopping.
type RowError ¶
type RowError struct {
// contains filtered or unexported fields
}
RowError is an insert error for a specific row, returned after a bulk insert of multiple rows.
func (*RowError) BQError ¶
func (err *RowError) BQError() bigquery.ErrorProto
type Streamer ¶
type Streamer struct {
	// Max delay between flushes to BigQuery.
	MaxDelay time.Duration `validate:"min=1"`

	// Sleep delay after a rejected insert and before retry.
	SleepBeforeRetry time.Duration `validate:"min=1"`

	// Maximum retry insert attempts for non-rejected row insert errors,
	// e.g. GoogleAPI HTTP errors, generic HTTP errors, etc.
	MaxRetryInsert int `validate:"min=0"`

	// Errors are reported to this channel.
	Errors chan error

	// Start read-queue-stream loop function.
	Start func()

	// Stop read-queue-stream loop function.
	Stop func()

	// contains filtered or unexported fields
}
A Streamer is a BigQuery stream inserter, queuing rows and streaming them to BigQuery in bulk by calling InsertAll().
Example ¶
This example uses a single Streamer. A single row is queued, and will be flushed once a time threshold has passed, or if the streamer is explicitly closed.
Note starting a Streamer is a blocking operation, so it needs to run in its own goroutine.
You should probably use MultiStreamer, as it provides better concurrency and speed, but Streamer is there if you need to.
// Init OAuth2/JWT. This is required for authenticating with BigQuery.
// See the following URLs for more info:
// https://cloud.google.com/bigquery/authorization
// https://developers.google.com/console/help/new/#generatingoauth2
jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}

// Init a BigQuery client.
service, err := NewBigQueryService(jwtConfig)
if err != nil {
	log.Fatalln(err)
}

// Set streamer configuration.
maxRows := 500                      // Amount of rows queued before forcing insert to BigQuery.
maxDelay := 1 * time.Second         // Time to pass between forcing insert to BigQuery.
sleepBeforeRetry := 1 * time.Second // Time to wait between failed insert retries.
maxRetryInsert := 10                // Maximum amount of failed insert retries before discarding rows and moving on.

// Init a new streamer.
s, err := NewStreamer(service, maxRows, maxDelay, sleepBeforeRetry, maxRetryInsert)
if err != nil {
	log.Fatalln(err)
}

// Start the streamer.
// A Streamer (NOT a MultiStreamer) is blocking,
// so it needs to be started in its own goroutine.
go s.Start()
defer s.Stop()

// Queue a single row.
// Insert will happen once maxDelay time has passed,
// or maxRows rows have been queued.
s.QueueRow(
	"project-id", "dataset-id", "table-id",
	map[string]bigquery.JsonValue{"key": "value"},
)
Output:
type TooManyFailedInsertRetriesError ¶
type TooManyFailedInsertRetriesError struct {
// contains filtered or unexported fields
}
TooManyFailedInsertRetriesError is returned when an insert has failed several times, and the streamer stops retrying it.
func (*TooManyFailedInsertRetriesError) DatasetID ¶
func (err *TooManyFailedInsertRetriesError) DatasetID() string
func (*TooManyFailedInsertRetriesError) Error ¶
func (err *TooManyFailedInsertRetriesError) Error() string
func (*TooManyFailedInsertRetriesError) NumFailedRetries ¶
func (err *TooManyFailedInsertRetriesError) NumFailedRetries() int
func (*TooManyFailedInsertRetriesError) ProjectID ¶
func (err *TooManyFailedInsertRetriesError) ProjectID() string
func (*TooManyFailedInsertRetriesError) TableID ¶
func (err *TooManyFailedInsertRetriesError) TableID() string