bbq

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2019 License: BSD-3-Clause Imports: 13 Imported by: 0

README

BBQ

BBQ is a library that writes messages from Kafka topics directly into BigQuery. To use it, define a list of TableOptions. Each entry specifies an input topic to write, the topic's object type and codec, and the time partitioning of the destination table (for example, how long its partitions expire after).

Each table is created automatically and is named after its topic. If the structure of the written object changes, the table's schema is updated automatically. However, because of a BigQuery limitation, removing a field from the object does not update the table's schema; instead, writing produces an error.

Example


The following code is a working example of BBQ using Goka as our Kafka processing library.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"cloud.google.com/go/bigquery"
	"github.com/lovoo/goka"
	"github.com/lovoo/goka-tools/bbq"
	"github.com/lovoo/goka/codec"
)

// tableOptions is a named slice of *bbq.TableOptions. Declaring it as a
// local type lets us attach the edges helper below, which derives the goka
// input edges needed to define the processor's group graph.
type tableOptions []*bbq.TableOptions

// edges returns one goka.Input edge per table option. Each edge subscribes
// consumer to the option's Input topic, decoded with the option's Codec.
func (tt tableOptions) edges(consumer goka.ProcessCallback) []goka.Edge {
	// Exactly one edge per option, so allocate the result once up front.
	edges := make([]goka.Edge, 0, len(tt))
	for _, option := range tt {
		edges = append(edges, goka.Input(option.Input, option.Codec, consumer))
	}
	return edges
}

// main wires BBQ into a goka processor. It consumes every configured topic
// until the process receives SIGINT/SIGTERM or the processor stops on its
// own, then flushes pending BigQuery batches before exiting.
func main() {
	tables := []*bbq.TableOptions{
		{
			Input:            "topic_name",
			Obj:              []byte{},
			TimePartitioning: &bigquery.TimePartitioning{Expiration: 14 * 24 * time.Hour},
			Codec:            new(codec.Bytes),
		},
	}

	// Named "writer" rather than "bbq" so the package name is not shadowed.
	writer, err := bbq.NewBbq("gcp-project", "target-dataset", tables)
	if err != nil {
		log.Fatalf("Unable to create new BBQ: %v", err)
	}

	proc, err := goka.NewProcessor([]string{"kafka", "brokers"}, goka.DefineGroup(
		"bbq-group",
		tableOptions(tables).edges(writer.Consume)...,
	), goka.WithClientID("bbq"))
	if err != nil {
		log.Fatalf("Unable to create Goka processor: %v", err)
	}

	// A cancellable context lets us stop the processor on shutdown.
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		// Use a local err (not the outer one) and avoid log.Fatalf here:
		// os.Exit would skip the final Stop and drop pending batches.
		if err := proc.Run(ctx); err != nil {
			log.Printf("error running processor: %v", err)
		}
	}()

	// Block until asked to shut down or until the processor exits by itself.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	select {
	case <-sigs:
	case <-done:
	}

	// Stop the processor and wait for it to return before draining BBQ,
	// so no new messages are handed to the uploaders while they flush.
	cancel()
	<-done

	writer.Stop(10 * time.Second)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bbq

type Bbq struct {
	// contains filtered or unexported fields
}

Bbq writes the contents of Kafka topics to BigQuery.

func NewBbq

func NewBbq(gcpproject string, datesetName string, tables []*TableOptions) (*Bbq, error)

NewBbq creates a new Bbq struct.

func (*Bbq) Consume

func (b *Bbq) Consume(ctx goka.Context, msg interface{})

Consume is the goka process callback that consumes messages from the input streams.

func (*Bbq) Stop

func (b *Bbq) Stop(timeout time.Duration)

Stop drains the batches in the bbq-uploaders and blocks until they're done

type TableOptions

// TableOptions configures how one input topic is written to BigQuery.
//
// Fields:
//   - Obj: the object type carried by the topic. NOTE(review): presumably a
//     sample value whose structure defines the table schema — confirm.
//   - TimePartitioning: the BigQuery time partitioning of the destination
//     table, e.g. its partition Expiration.
//   - Input: the Kafka topic to consume; the table is named after it.
//   - Codec: the goka codec used to decode messages from Input.
type TableOptions struct {
	Obj              interface{}
	TimePartitioning *bigquery.TimePartitioning
	Input            goka.Stream
	Codec            goka.Codec
}

TableOptions represents one Kafka topic and its associated codec.

func (*TableOptions) Name

func (to *TableOptions) Name() string

Name returns the name of the topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL