batbq

README

Batched BigQuery Inserter

The batbq package implements batching of messages for the bigquery.Inserter and provides the following features:

  1. batching of messages from a channel into a slice to be sent to BigQuery,
  2. time-based flushing of partially filled batches,
  3. row-wise handling of insert errors,
  4. confirmation of messages at the sender (Ack/Nack),
  5. comprehensive pipeline metrics for one or more batchers,
  6. basic autoscaling to create batches in parallel from an input channel.

Usage

package main

import (
	"context"
	"flag"
	"log"
	"os"
	"time"

	"cloud.google.com/go/bigquery"
	"github.com/ubntc/go/batching/batbq"

	custom "github.com/ubntc/go/batching/batbq/_examples/simple/dummy"
)

var schema bigquery.Schema

func init() { schema, _ = bigquery.InferSchema(custom.Message{}) }

// Msg wraps the received data and implements batbq.Message.
type Msg struct {
	m *custom.Message // custom type providing data values and confirmation handlers
}

// Ack acknowledges the original message.
func (msg *Msg) Ack() { msg.m.ConfirmMessage() }

// Nack handles insert errors.
func (msg *Msg) Nack(err error) {
	if err != nil {
		log.Print(err)
	}
}

// Data returns the message as bigquery.StructSaver.
func (msg *Msg) Data() bigquery.ValueSaver {
	return &bigquery.StructSaver{InsertID: msg.m.ID, Struct: msg.m, Schema: schema}
}

func main() {
	source := custom.NewSource("src_name") // custom data source

	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
	if err != nil {
		log.Fatal(err)
	}
	output := client.Dataset("tmp").Table("batbq_test").Inserter()

	cfg := batbq.Config{
		Capacity:      1000,
		FlushInterval: time.Second,
	}

	input := make(chan batbq.Message, cfg.Capacity)
	batcher := batbq.NewInsertBatcher("batbq_test", cfg)

	go func() {
		source.Receive(ctx, func(m *custom.Message) { input <- &Msg{m} })
		close(input)
	}()

	if err := batcher.Process(ctx, input, output); err != nil {
		log.Fatal(err)
	}
}

Also see the PubSub to BigQuery example.

Batcher Design

The package provides an InsertBatcher that requires an input <-chan batbq.Message to collect individual messages from a streaming data source, as shown in the examples. The InsertBatcher also requires a Putter that implements Put(ctx context.Context, src interface{}) error, as provided by the regular bigquery.Inserter. Messages that are Put into the bigquery.Inserter need to implement the bigquery.ValueSaver interface.

Batbq defines the batbq.Message interface to meet the following requirements:

  • Provide bigquery.ValueSaver data.
  • Handle insert errors.
  • Implement message confirmation using Ack and Nack.

// Message defines an (n)ackable message that contains the data for BigQuery.
type Message interface {
	Data() bigquery.ValueSaver // Data returns a ValueSaver for the bigquery.Inserter
	Ack()                      // Ack confirms successful processing of the message at the sender.
	Nack(err error)            // Nack reports unsuccessful processing and errors to the sender.
}

Setting up a batch pipeline requires the following steps.

  1. Create a custom message type that implements batbq.Message providing Ack(), Nack(error), and Data() bigquery.ValueSaver.

  2. Create a Putter to receive the batches from the InsertBatcher.

  3. Create a chan batbq.Message to pass data to the InsertBatcher.

  4. In a goroutine, receive and wrap messages from a data source and send them to the channel as batbq.Message.

  5. Start the batcher using its Process(context.Context, <-chan batbq.Message, Putter) method.

For instance, if your data source is PubSub, first register a message handler using subscription.Receive(ctx, handler) in a goroutine, with the handler wrapping the pubsub.Message in a batbq.Message and sending it to the input channel. Then start the batcher to receive and batch these messages. The batcher will stop if the context is canceled or the input channel is closed; there is no "stop" method. See the full PubSub to BigQuery example for more details and options.
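
For illustration, here is a minimal sketch of such a PubSub handler. The Row type, the JSON payload format, and all names are assumptions for this sketch, not part of batbq; see the full PubSub example for a working setup.

package pipeline

import (
	"context"
	"encoding/json"
	"log"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/pubsub"
	"github.com/ubntc/go/batching/batbq"
)

// Row is a placeholder for the target table's row type.
type Row struct {
	ID   string
	Text string
}

var schema, _ = bigquery.InferSchema(Row{})

// psMsg wraps a pubsub.Message as a batbq.Message.
type psMsg struct {
	m   *pubsub.Message
	row *Row
}

func (p *psMsg) Ack() { p.m.Ack() }

func (p *psMsg) Nack(err error) {
	if err != nil {
		log.Print(err)
	}
	p.m.Nack() // let PubSub redeliver the message
}

func (p *psMsg) Data() bigquery.ValueSaver {
	return &bigquery.StructSaver{InsertID: p.m.ID, Struct: p.row, Schema: schema}
}

// receive registers the message handler and closes the input channel
// when Receive returns, which also stops the batcher.
func receive(ctx context.Context, sub *pubsub.Subscription, input chan<- batbq.Message) {
	defer close(input)
	err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		var row Row
		if err := json.Unmarshal(m.Data, &row); err != nil {
			m.Nack() // reject malformed payloads
			return
		}
		input <- &psMsg{m: m, row: &row}
	})
	if err != nil {
		log.Print(err)
	}
}

Start receive in a goroutine and then call batcher.Process, as in the usage example above.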

Scaling Parameters

Internally, batbq uses a blocking worker(...) function to process data from the input channel into the current batch. Filled batches are sent to the Putter asynchronously. This introduces a concurrency level that can be controlled on the sender side by limiting the number of pending (unconfirmed) messages.

For PubSub, this can be done by setting MaxOutstandingMessages on a pubsub.Subscription.
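
For example (a sketch; the subscription name and limit are placeholders, and client is assumed to be a pubsub.Client):

sub := client.Subscription("my-subscription")
// Cap the number of unconfirmed messages; this also bounds the
// number of batches the batcher can have in flight.
sub.ReceiveSettings.MaxOutstandingMessages = 1000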

For most use cases this should be sufficient, and you can leave BatcherConfig.AutoScale = false (the default). See SCALING.md for more details and benchmarks.

Multi Batching

The package also provides a MultiBatcher that can be set up to batch data from multiple inputs and outputs in parallel. Please consult the corresponding test case to see how to set it up.

Documentation

Index

Constants

const (
	BATBQ   = "batbq"
	BATCHER = "batcher"
)

Metrics prefix and label name.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatcherOption added in v0.0.15

type BatcherOption interface {
	// contains filtered or unexported methods
}

BatcherOption configures the batcher.

type Config added in v0.0.16

type Config config.BatcherConfig

Config wraps a config.BatcherConfig to be used as BatcherOption.
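
A Config can be passed directly as an option to NewInsertBatcher, for example (the batcher id is a placeholder):

batcher := batbq.NewInsertBatcher("my_batcher", batbq.Config{
	Capacity:      1000,
	FlushInterval: time.Second,
})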

type InsertBatcher

type InsertBatcher struct {
	// contains filtered or unexported fields
}

InsertBatcher implements automatic batching with a batch capacity and flushInterval.

func NewInsertBatcher

func NewInsertBatcher(id string, opt ...BatcherOption) *InsertBatcher

NewInsertBatcher returns an InsertBatcher.

func (*InsertBatcher) Metrics added in v0.0.5

func (ins *InsertBatcher) Metrics() *Metrics

Metrics returns the batcher's Metrics.

func (*InsertBatcher) Process

func (ins *InsertBatcher) Process(ctx context.Context, input <-chan Message, output Putter) error

Process starts the batcher.

type LogMessage

type LogMessage struct {
	bigquery.StructSaver
}

LogMessage implements the `Message` interface. A LogMessage ignores `Ack()` and logs the error passed to `Nack(err error)`. Use it for testing and for naive data pipelines that do not require acknowledging messages.
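
A minimal sketch of sending a LogMessage to the input channel; the Row struct and the pre-inferred schema are assumptions for this sketch:

input <- &batbq.LogMessage{
	StructSaver: bigquery.StructSaver{InsertID: "row-1", Struct: &Row{ID: "row-1"}, Schema: schema},
}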

func (*LogMessage) Ack

func (m *LogMessage) Ack()

Ack does nothing.

func (*LogMessage) Data

func (m *LogMessage) Data() bigquery.ValueSaver

Data returns the embedded StructSaver.

func (*LogMessage) Nack

func (m *LogMessage) Nack(err error)

Nack logs the error.

type Message

type Message interface {
	Data() bigquery.ValueSaver // Data returns a ValueSaver for the bigquery.Inserter
	Ack()                      // Ack confirms successful processing of the message at the sender.
	Nack(err error)            // Nack reports unsuccessful processing and errors to the sender.
}

Message defines an (n)ackable message that contains the data for BigQuery.

type Metrics added in v0.0.5

type Metrics struct {
	// State
	NumWorkers      *prometheus.GaugeVec
	PendingMessages *prometheus.GaugeVec

	// Results
	ReceivedMessages  *prometheus.CounterVec
	ProcessedMessages *prometheus.CounterVec
	ProcessedBatches  *prometheus.CounterVec
	InsertErrors      *prometheus.CounterVec

	// Latencies
	InsertLatency *prometheus.HistogramVec
	AckLatency    *prometheus.HistogramVec
}

Metrics stores Batcher Metrics.

func NewMetrics added in v0.0.16

func NewMetrics(prefix ...string) *Metrics

NewMetrics returns a new Metrics object.

func (*Metrics) Register added in v0.0.9

func (m *Metrics) Register(reg prometheus.Registerer)

Register registers all metrics.
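
For example, to expose a batcher's metrics over HTTP (a sketch, assuming a batcher created with NewInsertBatcher and the prometheus and promhttp packages imported):

reg := prometheus.NewRegistry()
batcher.Metrics().Register(reg)
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))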

type Putter

type Putter interface {
	Put(ctx context.Context, src interface{}) error
}

Putter provides a `Put` func as used by the `bigquery.Inserter`.
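
The bigquery.Inserter satisfies this interface directly, as in the usage example above:

var output batbq.Putter = client.Dataset("tmp").Table("batbq_test").Inserter()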

Directories

_examples
