bqin

v0.1.1

Note: this package is not in the latest version of its module.
Published: Feb 10, 2020 License: MIT Imports: 23 Imported by: 0

README

BQin

BQin is a BigQuery data importer driven by AWS S3 event notifications delivered through SQS.
Inspired by http://github.com/fujiwara/Rin.

Architecture

  1. (Someone) creates an S3 object.
  2. An S3 event notification sends a message to SQS.
  3. BQin fetches messages from SQS.
  4. BQin copies the S3 object to a temporary Google Cloud Storage bucket and creates a BigQuery load job.
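Steps 2 and 3 depend on reading the S3 event notification that arrives as the SQS message body. The following is a minimal sketch, not BQin's own code, that extracts bucket/key pairs from such a body using only the standard library. The names `s3Event`, `objectRef` and `parseS3Event` are hypothetical, and only the notification fields needed here are modelled.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// s3Event models only the parts of an S3 event notification needed to
// locate the newly created object (hypothetical, trimmed-down type).
type s3Event struct {
	Records []struct {
		S3 struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// objectRef identifies one S3 object referenced by a notification.
type objectRef struct {
	Bucket string
	Key    string
}

// parseS3Event extracts bucket/key pairs from an SQS message body.
// Object keys in S3 notifications are URL-encoded, so each key is decoded
// with url.QueryUnescape (which also turns "+" into a space).
func parseS3Event(body string) ([]objectRef, error) {
	var ev s3Event
	if err := json.Unmarshal([]byte(body), &ev); err != nil {
		return nil, err
	}
	refs := make([]objectRef, 0, len(ev.Records))
	for _, r := range ev.Records {
		key, err := url.QueryUnescape(r.S3.Object.Key)
		if err != nil {
			return nil, err
		}
		refs = append(refs, objectRef{Bucket: r.S3.Bucket.Name, Key: key})
	}
	return refs, nil
}

func main() {
	body := `{"Records":[{"s3":{"bucket":{"name":"bqin.bucket.test"},"object":{"key":"data/user/part-0001.csv"}}}]}`
	refs, err := parseS3Event(body)
	if err != nil {
		panic(err)
	}
	for _, ref := range refs {
		fmt.Printf("s3://%s/%s\n", ref.Bucket, ref.Key)
	}
}
```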

Configuration

Configuring Amazon S3 Event Notifications.

  1. Create an SQS queue.
  2. Attach an access policy to the queue that allows S3 to send messages to it (see "Example Walkthrough 1" in the Amazon S3 documentation).
  3. Enable event notifications on the S3 bucket.
  4. Create a temporary bucket on Google Cloud Storage and create the target dataset on BigQuery.
  5. Run the bqin process with a configuration that points to the SQS queue and S3 bucket.
config.yaml
queue_name: my_queue_name    # SQS queue name
gcs_temporary_bucket: my_bucket_name # temporary Google Cloud Storage bucket

cloud:
  aws:
    region: ap-northeast-1

s3:
  bucket: bqin.bucket.test
  region: ap-northeast-1

big_query:
  project_id: bqin-test
  dataset: test

# define load rule
rules:
  - big_query: # standard rule
      table: user
    s3:
      key_prefix: data/user

  - big_query:  # $1, $2 expand to key_regexp capture groups; useful for date-sharded tables
      table: $1_$2
    s3:
      key_regexp: data/(.+)/part-([0-9]+).csv

  - big_query: # override the top-level defaults in this rule
      project_id: hoge
      dataset: bqin_test
      table: role
    s3:
      bucket: bqin.bucket.test
      key_prefix: data/role

The configuration file is parsed by kayac/go-config.

go-config expands environment variables in the configuration file using the syntax {{ env "FOO" }} or {{ must_env "FOO" }}.

Credentials

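BQin requires credentials for both AWS (to receive SQS messages and read S3 objects) and Google Cloud (to write to Cloud Storage and run BigQuery load jobs).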

Run

Normal operation

BQin waits for new SQS messages and processes them continuously.

$ bqin run -config config.yaml [-debug]
Manual load with a request file (JSON)

BQin reads the request file and processes it.

$ bqin request -config config.yaml [-debug] request.json

The request file format is:

{
    "records":[
        {
            "source":{
                "bucket":"bqin.bucket.test",
                "object":"data/user/part-0001.csv"
            },
            "target":{
                "dataset":"bqin",
                "table":"user_20200101"
            }
        }
    ]
}
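A request file maps directly onto the `ImportRequest` types in the package documentation. The sketch below redeclares those types so it is self-contained and decodes a request with `encoding/json`. `loadRequest` and its empty-records check are hypothetical, not part of the package API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// These types mirror ImportRequest, ImportRequestRecord, ImportSource and
// ImportTarget as documented for the package.
type ImportSource struct {
	Bucket string `json:"bucket"`
	Object string `json:"object"`
}

type ImportTarget struct {
	ProjectID string `json:"project_id"`
	Dataset   string `json:"dataset"`
	Table     string `json:"table"`
}

type ImportRequestRecord struct {
	Source *ImportSource `json:"source"`
	Target *ImportTarget `json:"target"`
}

type ImportRequest struct {
	ID            string                 `json:"id,omitempty"`
	ReceiptHandle string                 `json:"receipt_handle,omitempty"`
	Records       []*ImportRequestRecord `json:"records"`
}

// loadRequest decodes a request file and rejects one without records
// (the rejection is a hypothetical check added for this sketch).
func loadRequest(data []byte) (*ImportRequest, error) {
	var req ImportRequest
	if err := json.Unmarshal(data, &req); err != nil {
		return nil, err
	}
	if len(req.Records) == 0 {
		return nil, fmt.Errorf("request has no records")
	}
	return &req, nil
}

func main() {
	data := []byte(`{"records":[{"source":{"bucket":"bqin.bucket.test","object":"data/user/part-0001.csv"},"target":{"dataset":"bqin","table":"user_20200101"}}]}`)
	req, err := loadRequest(data)
	if err != nil {
		panic(err)
	}
	for _, rec := range req.Records {
		fmt.Printf("%s/%s -> %s.%s\n", rec.Source.Bucket, rec.Source.Object, rec.Target.Dataset, rec.Target.Table)
	}
}
```

To read an actual file, pass the bytes from `os.ReadFile("request.json")`. Because the sample omits `project_id`, `ProjectID` decodes as an empty string.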

LICENCE

MIT

Author

KAYAC Inc.

Documentation


Constants

const (
	S3URITemplate         = "s3://%s/%s"
	BigQueryTableTemplate = "%s.%s"
)
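The constants are ordinary `fmt` format strings. A small sketch of how they expand; the helper names `s3URI` and `tableRef` are hypothetical, and reading the two `%s` verbs of `BigQueryTableTemplate` as dataset and table is an assumption, since the documentation does not say.

```go
package main

import "fmt"

// Copies of the package constants so the example is self-contained.
const (
	S3URITemplate         = "s3://%s/%s"
	BigQueryTableTemplate = "%s.%s"
)

// s3URI formats a bucket and key as an s3:// URI.
func s3URI(bucket, key string) string {
	return fmt.Sprintf(S3URITemplate, bucket, key)
}

// tableRef joins two identifiers with a dot (assumed: dataset and table).
func tableRef(dataset, table string) string {
	return fmt.Sprintf(BigQueryTableTemplate, dataset, table)
}

func main() {
	fmt.Println(s3URI("bqin.bucket.test", "data/user/part-0001.csv")) // s3://bqin.bucket.test/data/user/part-0001.csv
	fmt.Println(tableRef("test", "user"))                               // test.user
}
```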

Variables

var (
	ErrMaxRetry = errors.New("max retry count reached")
)
var (
	ErrNoRequest = errors.New("no import request")
)

Functions

This section is empty.

Types

type App

type App struct {
	Receiver
	Processor
	// contains filtered or unexported fields
}

func NewApp

func NewApp(conf *Config) (*App, error)

func (*App) ReceiveAndProcess

func (app *App) ReceiveAndProcess() error

func (*App) Shutdown

func (app *App) Shutdown(ctx context.Context) error

type BigQueryDestination

type BigQueryDestination struct {
	ProjectID string `yaml:"project_id"`
	Table     string `yaml:"table"`
	Dataset   string `yaml:"dataset"`
}

func (BigQueryDestination) String

func (bq BigQueryDestination) String() string

type BigQueryTransporter

type BigQueryTransporter struct {
	// contains filtered or unexported fields
}

func NewBigQueryTransporter

func NewBigQueryTransporter(conf *Config, c *cloud.Cloud) *BigQueryTransporter

func (*BigQueryTransporter) Process

func (t *BigQueryTransporter) Process(ctx context.Context, req *ImportRequest) error

type Config

type Config struct {
	QueueName          string               `yaml:"queue_name"`
	GCSTemporaryBucket string               `yaml:"gcs_temporary_bucket"`
	Cloud              *cloud.Config        `yaml:"cloud"`
	S3                 *S3Soruce            `yaml:"s3"`
	BigQuery           *BigQueryDestination `yaml:"big_query"`
	Rules              []*Rule              `yaml:"rules"`
}

func LoadConfig

func LoadConfig(path string) (*Config, error)

func (*Config) GetMergedRules

func (c *Config) GetMergedRules() ([]*Rule, error)
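The configuration comments describe rules that override the top-level `s3` and `big_query` sections. A minimal sketch of such a merge, assuming that empty rule fields inherit the defaults while key prefixes, key regexps and table names stay per-rule. `mergeRule` and `pick` are hypothetical, and this is not the actual `GetMergedRules` logic.

```go
package main

import "fmt"

// Trimmed copies of the documented config types (S3Soruce keeps the
// package's own spelling).
type S3Soruce struct {
	Region, Bucket, KeyPrefix, KeyRegexp string
}

type BigQueryDestination struct {
	ProjectID, Table, Dataset string
}

type Rule struct {
	S3       *S3Soruce
	BigQuery *BigQueryDestination
}

// pick returns v unless it is empty, in which case it returns def.
func pick(v, def string) string {
	if v != "" {
		return v
	}
	return def
}

// mergeRule returns a new Rule whose empty region, bucket, project and
// dataset are filled from the defaults; the input rule is not modified.
func mergeRule(r Rule, defS3 S3Soruce, defBQ BigQueryDestination) Rule {
	s3, bq := S3Soruce{}, BigQueryDestination{}
	if r.S3 != nil {
		s3 = *r.S3
	}
	if r.BigQuery != nil {
		bq = *r.BigQuery
	}
	s3.Region = pick(s3.Region, defS3.Region)
	s3.Bucket = pick(s3.Bucket, defS3.Bucket)
	bq.ProjectID = pick(bq.ProjectID, defBQ.ProjectID)
	bq.Dataset = pick(bq.Dataset, defBQ.Dataset)
	return Rule{S3: &s3, BigQuery: &bq}
}

func main() {
	defS3 := S3Soruce{Region: "ap-northeast-1", Bucket: "bqin.bucket.test"}
	defBQ := BigQueryDestination{ProjectID: "bqin-test", Dataset: "test"}
	user := Rule{S3: &S3Soruce{KeyPrefix: "data/user"}, BigQuery: &BigQueryDestination{Table: "user"}}
	m := mergeRule(user, defS3, defBQ)
	fmt.Println(m.S3.Bucket, m.BigQuery.ProjectID, m.BigQuery.Dataset, m.BigQuery.Table)
	// bqin.bucket.test bqin-test test user
}
```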

type ImportRequest

type ImportRequest struct {
	ID            string                 `json:"id,omitempty"`
	ReceiptHandle string                 `json:"receipt_handle,omitempty"`
	Records       []*ImportRequestRecord `json:"records"`
}

type ImportRequestRecord

type ImportRequestRecord struct {
	Source *ImportSource `json:"source"`
	Target *ImportTarget `json:"target"`
}

type ImportSource

type ImportSource struct {
	Bucket string `json:"bucket"`
	Object string `json:"object"`
}

func (ImportSource) String

func (s ImportSource) String() string

type ImportTarget

type ImportTarget struct {
	ProjectID string `json:"project_id"`
	Dataset   string `json:"dataset"`
	Table     string `json:"table"`
}

type Processor

type Processor interface {
	Process(context.Context, *ImportRequest) error
}

type Receiver

type Receiver interface {
	Receive(context.Context) (*ImportRequest, error)
	Complete(context.Context, *ImportRequest) error
}

type Rule

type Rule struct {
	S3       *S3Soruce            `yaml:"s3"`
	BigQuery *BigQueryDestination `yaml:"big_query"`
	// contains filtered or unexported fields
}

func (*Rule) BuildImportTarget

func (r *Rule) BuildImportTarget(capture []string) *ImportTarget

func (*Rule) Match

func (r *Rule) Match(bucket, key string) (bool, []string)

func (*Rule) MatchEventRecord

func (r *Rule) MatchEventRecord(record events.S3EventRecord) (bool, []string)

func (*Rule) String

func (r *Rule) String() string

type S3Soruce

type S3Soruce struct {
	Region    string `yaml:"region"`
	Bucket    string `yaml:"bucket"`
	KeyPrefix string `yaml:"key_prefix"`
	KeyRegexp string `yaml:"key_regexp"`
}

func (S3Soruce) String

func (s3 S3Soruce) String() string

type SQSReceiver

type SQSReceiver struct {
	// contains filtered or unexported fields
}

func NewSQSReceiver

func NewSQSReceiver(conf *Config, c *cloud.Cloud) (*SQSReceiver, error)

func (*SQSReceiver) Complete

func (r *SQSReceiver) Complete(_ context.Context, req *ImportRequest) error

func (*SQSReceiver) Receive

func (r *SQSReceiver) Receive(ctx context.Context) (*ImportRequest, error)

Directories

Path Synopsis
cmd
internal
