Documentation ¶
Overview ¶
Package bqloader is a simple ETL framework running on Cloud Functions to load data from Cloud Storage into BigQuery.
Getting started with pre-configured handlers ¶
See the example for full instructions. https://github.com/nownabe/go-bqloader/tree/main/examples/pre_configured_handlers
You can use pre-configured handlers to load several well-known CSV formats. See the full list on GitHub. https://github.com/nownabe/go-bqloader/tree/main/contrib/handlers
package myfunc

import (
    "context"
    "os"

    "go.nownabe.dev/bqloader"
    "go.nownabe.dev/bqloader/contrib/handlers"
)

var loader bqloader.BQLoader

func init() {
    loader, _ = bqloader.New()

    t := handlers.TableGenerator(os.Getenv("BIGQUERY_PROJECT_ID"), os.Getenv("BIGQUERY_DATASET_ID"))
    n := &bqloader.SlackNotifier{
        Token:   os.Getenv("SLACK_TOKEN"),
        Channel: os.Getenv("SLACK_CHANNEL"),
    }

    handlers.MustAddHandlers(context.Background(), loader,
        // These build handlers to load CSVs, given four arguments: a handler name,
        // a pattern to match file paths on Cloud Storage, a BigQuery table, and a notifier.
        handlers.SBISumishinNetBankStatement("SBI Bank", `^sbi_bank/`, t("sbi_bank"), n),
        handlers.SMBCCardStatement("SMBC Card", `^smbc_card/`, t("smbc_card"), n),
    )
}

// BQLoad is the entrypoint for Cloud Functions.
func BQLoad(ctx context.Context, e bqloader.Event) error {
    return loader.Handle(ctx, e)
}
Getting started with custom handlers ¶
See the Quickstart example for full instructions. https://github.com/nownabe/go-bqloader/tree/main/examples/quickstart
For simple CSV transformation and loading, import the package `go.nownabe.dev/bqloader` and write your own handler.
package myfunc

import (
    "context"
    "os"
    "regexp"
    "time"

    "golang.org/x/text/encoding/japanese"
    "golang.org/x/xerrors"

    "go.nownabe.dev/bqloader"
)

var loader bqloader.BQLoader

func init() {
    loader, _ = bqloader.New()
    loader.MustAddHandler(context.Background(), newHandler())
}

func newHandler() *bqloader.Handler {
    // This projector converts date fields formatted as "2006/01/02"
    // in the first column into strings like "2006-01-02" that satisfy
    // the BigQuery DATE type.
    projector := func(_ context.Context, r []string) ([]string, error) {
        t, err := time.Parse("2006/01/02", r[0])
        if err != nil {
            return nil, xerrors.Errorf("column 0 cannot be parsed as a date: %w", err)
        }
        r[0] = t.Format("2006-01-02")
        return r, nil
    }

    return &bqloader.Handler{
        Name:     "quickstart",                         // Handler name used in logs and notifications.
        Pattern:  regexp.MustCompile("^example_bank/"), // This handler processes files matching this pattern.
        Encoding: japanese.ShiftJIS,                    // Source file encoding.
        Parser:   bqloader.CSVParser(),                 // Parser parses the source file into records.
        Notifier: &bqloader.SlackNotifier{
            Token:   os.Getenv("SLACK_TOKEN"),
            Channel: os.Getenv("SLACK_CHANNEL"),
        },
        Projector:       projector, // Projector transforms each row.
        SkipLeadingRows: 1,         // Skip the header row.

        // Destination.
        Project: os.Getenv("BIGQUERY_PROJECT_ID"),
        Dataset: os.Getenv("BIGQUERY_DATASET_ID"),
        Table:   os.Getenv("BIGQUERY_TABLE_ID"),
    }
}

// BQLoad is the entrypoint for Cloud Functions.
func BQLoad(ctx context.Context, e bqloader.Event) error {
    return loader.Handle(ctx, e)
}
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BQLoader ¶
type BQLoader interface {
    AddHandler(context.Context, *Handler) error
    Handle(context.Context, Event) error
    MustAddHandler(context.Context, *Handler)
}
BQLoader loads data from Cloud Storage into BigQuery tables.
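As a hedged sketch of typical usage (mirroring the examples above, with the error handling they omit; newHandler here stands for the function defined in the custom-handler example):

package myfunc

import (
    "context"
    "log"

    "go.nownabe.dev/bqloader"
)

var loader bqloader.BQLoader

func init() {
    var err error
    loader, err = bqloader.New() // See Option below for available configuration.
    if err != nil {
        log.Fatalf("failed to create BQLoader: %v", err)
    }

    // AddHandler returns an error; MustAddHandler panics instead.
    // newHandler is the handler constructor from the custom-handler example above.
    if err := loader.AddHandler(context.Background(), newHandler()); err != nil {
        log.Fatalf("failed to add handler: %v", err)
    }
}

// BQLoad is the Cloud Functions entrypoint; Handle dispatches the event
// to the handler whose Pattern matches the object path.
func BQLoad(ctx context.Context, e bqloader.Event) error {
    return loader.Handle(ctx, e)
}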
type Event ¶
type Event struct {
    Name        string    `json:"name"`
    Bucket      string    `json:"bucket"`
    TimeCreated time.Time `json:"timeCreated"`
    // contains filtered or unexported fields
}
Event is an event from Cloud Storage.
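The exported fields correspond to a finalized Cloud Storage object. As a hedged sketch, an Event can also be constructed by hand, for example to exercise a loader outside Cloud Functions; handleLocally, the bucket name, and the object path below are hypothetical placeholders, not part of the package:

package myfunc

import (
    "context"
    "time"

    "go.nownabe.dev/bqloader"
)

// handleLocally is an illustrative helper showing how an Event corresponds
// to a finalized Cloud Storage object: Bucket and Name identify the file,
// and Handle dispatches it to a matching handler.
func handleLocally(ctx context.Context, loader bqloader.BQLoader) error {
    e := bqloader.Event{
        Name:        "example_bank/2021-01.csv", // object path within the bucket
        Bucket:      "my-source-bucket",         // hypothetical bucket name
        TimeCreated: time.Now(),
    }
    return loader.Handle(ctx, e)
}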
type Handler ¶
type Handler struct {
    // Name is the handler's name.
    Name      string
    Pattern   *regexp.Regexp
    Encoding  encoding.Encoding
    Parser    Parser
    Notifier  Notifier
    Projector Projector

    SkipLeadingRows uint

    Preprocessor Preprocessor

    // BatchSize specifies how many records are processed in a goroutine.
    // Default is 10000.
    BatchSize int

    // Project specifies the GCP project of the destination BigQuery table.
    Project string
    // Dataset specifies the BigQuery dataset ID of the destination table.
    Dataset string
    // Table specifies the BigQuery table ID of the destination.
    Table string

    Extractor Extractor
    Loader    Loader
    // contains filtered or unexported fields
}
Handler defines how to handle events that match the specified pattern.
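As a hedged sketch, a compact Handler might set only the pattern, parser, and destination, assuming the omitted fields (Encoding, Projector, Notifier, BatchSize, and so on) fall back to package defaults; that assumption should be verified against the package before relying on it. The quickstart example above shows a fuller configuration.

package myfunc

import (
    "os"
    "regexp"

    "go.nownabe.dev/bqloader"
)

// newMinimalHandler is an illustrative sketch: it configures only the
// pattern, the parser, and the destination table, leaving the remaining
// fields at their zero values.
func newMinimalHandler() *bqloader.Handler {
    return &bqloader.Handler{
        Name:            "minimal",
        Pattern:         regexp.MustCompile(`^example_bank/`),
        Parser:          bqloader.CSVParser(),
        SkipLeadingRows: 1,
        Project:         os.Getenv("BIGQUERY_PROJECT_ID"),
        Dataset:         os.Getenv("BIGQUERY_DATASET_ID"),
        Table:           os.Getenv("BIGQUERY_TABLE_ID"),
    }
}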
func (*Handler) SetConcurrency ¶ added in v1.3.0
SetConcurrency sets the handler's concurrency directly. Normally, set concurrency on BQLoader with the WithConcurrency option.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option configures BQLoader.
func WithConcurrency ¶ added in v0.5.0
WithConcurrency configures the concurrency of projectors. Check GOMAXPROCS before setting this.
func WithLogLevel ¶
WithLogLevel configures the log level. Allowed values are trace, debug, info, warn, error, fatal, and panic.
func WithPrettyLogging ¶
func WithPrettyLogging() Option
WithPrettyLogging configures BQLoader to print human-friendly logs.
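As a hedged sketch of passing options to New: WithPrettyLogging() Option is documented above, but the parameter types of WithConcurrency and WithLogLevel (an int and a string here) are assumptions, so verify them against the package before copying.

package myfunc

import (
    "log"

    "go.nownabe.dev/bqloader"
)

var loader bqloader.BQLoader

func init() {
    var err error
    // Assumed signatures: WithConcurrency(int) Option and WithLogLevel(string) Option.
    loader, err = bqloader.New(
        bqloader.WithConcurrency(4),    // e.g. number of projector goroutines
        bqloader.WithLogLevel("debug"), // trace, debug, info, warn, error, fatal, or panic
        bqloader.WithPrettyLogging(),   // human-friendly log output
    )
    if err != nil {
        log.Fatalf("failed to create BQLoader: %v", err)
    }
}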
type Preprocessor ¶ added in v1.2.0
Preprocessor preprocesses the event and stores data into a map.
type SlackNotifier ¶ added in v0.4.0
type SlackNotifier struct {
    Channel string
    Token   string

    // Optional.
    IconEmoji string
    // Optional.
    Username string
    // Optional.
    HTTPClient *http.Client
    // contains filtered or unexported fields
}
SlackNotifier is a notifier for Slack. It requires a bot token with appropriate permissions; recommended permissions are chat:write, chat:write.customize, and chat:write.public.
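As a hedged sketch, a SlackNotifier is built from a bot token and a channel and then attached to a Handler via its Notifier field, as in the examples above; the emoji, username, and timeout values below are placeholders.

package myfunc

import (
    "net/http"
    "os"
    "time"

    "go.nownabe.dev/bqloader"
)

// newNotifier builds a SlackNotifier. The optional fields customize how
// messages appear in Slack and which HTTP client performs the requests.
func newNotifier() *bqloader.SlackNotifier {
    return &bqloader.SlackNotifier{
        Token:      os.Getenv("SLACK_TOKEN"),   // bot token with chat:write
        Channel:    os.Getenv("SLACK_CHANNEL"), // destination channel
        IconEmoji:  ":bar_chart:",              // optional
        Username:   "bqloader",                 // optional
        HTTPClient: &http.Client{Timeout: 10 * time.Second}, // optional
    }
}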