bqlog

package
v0.0.0-...-feb63dc Latest
Warning

This package is not in the latest version of its module.
Published: Jan 13, 2025 License: Apache-2.0 Imports: 25 Imported by: 20

Documentation

Overview

Package bqlog provides a mechanism to asynchronously log rows to BigQuery.

Deprecated: this package depends on Pull Task Queues which are deprecated and not available from the GAE second-gen runtime or from Kubernetes. The replacement is go.chromium.org/luci/server/tq PubSub tasks, plus a PubSub push subscription with a handler that simply inserts rows into BigQuery.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Log

type Log struct {
	// QueueName is the name of the pull queue to use as a buffer for inserts.
	//
	// Required. It must be defined in the queue.yaml file and it must not be
	// used by any other Log object.
	QueueName string

	// ProjectID is the Cloud Project that owns the dataset.
	//
	// If empty, will be derived from the current app ID.
	ProjectID string

	// DatasetID identifies the already existing dataset that contains the table.
	//
	// Required.
	DatasetID string

	// TableID identifies the name of the table in the dataset.
	//
	// Required. The table must exist already.
	TableID string

	// BatchesPerRequest is how many batches of entries to send in one BQ insert.
	//
	// A call to 'Insert' generates one batch of entries, thus BatchesPerRequest
	// essentially specifies how many 'Insert's to clump together when sending
	// data to BigQuery. If your Inserts are known to be huge, lowering this value
	// may help to avoid hitting memory limits.
	//
	// Default is 250. It assumes your batches are very small (1-3 rows), which
	// is usually the case if events are generated by online RPC handlers.
	BatchesPerRequest int

	// MaxParallelUploads is how many parallel ops to do when flushing.
	//
	// We limit it to avoid hitting OOM errors on GAE.
	//
	// Default is 64.
	MaxParallelUploads int

	// FlushTimeout is the maximum duration to spend fetching from the Pull
	// Queue in 'Flush'.
	//
	// We limit it to make sure 'Flush' has a chance to finish running before
	// GAE kills it by deadline. Next time 'Flush' is called, it will resume
	// flushing from where it left off.
	//
	// Note that 'Flush' can run for slightly longer, since it waits for all
	// pulled data to be flushed before returning.
	//
	// Default is 1 min.
	FlushTimeout time.Duration

	// DumpEntriesToLogger makes 'Insert' log all entries (at debug level).
	DumpEntriesToLogger bool

	// DryRun disables the actual uploads (keeps the local logging though).
	DryRun bool
	// contains filtered or unexported fields
}

Log can be used to insert entries into a BigQuery table.
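The documented defaults for BatchesPerRequest, MaxParallelUploads and FlushTimeout can be pictured with a small stand-alone sketch. The `settings` type and `withDefaults` function below are hypothetical and are not part of bqlog. The sketch also assumes that a zero value means "use the default", which the documentation above implies but does not state.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the three tunable Log fields.
// It is not a bqlog type.
type settings struct {
	BatchesPerRequest  int
	MaxParallelUploads int
	FlushTimeout       time.Duration
}

// withDefaults returns a copy of s in which zero-valued fields are replaced
// by the defaults documented on Log. Treating zero as "unset" is an
// assumption of this sketch.
func withDefaults(s settings) settings {
	if s.BatchesPerRequest == 0 {
		s.BatchesPerRequest = 250
	}
	if s.MaxParallelUploads == 0 {
		s.MaxParallelUploads = 64
	}
	if s.FlushTimeout == 0 {
		s.FlushTimeout = time.Minute
	}
	return s
}

func main() {
	// Lower BatchesPerRequest for large inserts; leave the rest at defaults.
	s := withDefaults(settings{BatchesPerRequest: 50})
	fmt.Println(s.BatchesPerRequest, s.MaxParallelUploads, s.FlushTimeout)
}
```

With small batches of 1-3 rows, the default of 250 batches per request corresponds to roughly 250-750 rows per BigQuery insert, which is why lowering it is suggested when individual Insert calls are large.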

func (*Log) Flush

func (l *Log) Flush(ctx context.Context) (int, error)

Flush pulls buffered rows from Pull Queue and sends them to BigQuery.

Must be called periodically from some cron job. It is okay to call 'Flush' concurrently from multiple processes to speed up the upload.
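A periodic flush endpoint might look roughly like the sketch below. Only the Flush signature comes from this page; the `flusher` interface, `flushStatus`, `cronHandler`, `runOnce` and `fakeFlusher` are hypothetical names used for illustration. Returning a non-2xx status on error is an assumption about how the cron caller should see a failed run.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
)

// flusher is the subset of *Log this sketch needs (hypothetical interface).
type flusher interface {
	Flush(ctx context.Context) (int, error)
}

// flushStatus runs one Flush and maps its result to an HTTP status and body.
func flushStatus(ctx context.Context, f flusher) (int, string) {
	n, err := f.Flush(ctx)
	if err != nil {
		// Rows may have been sent before the failure, so report both.
		return http.StatusInternalServerError,
			fmt.Sprintf("flushed %d rows, then failed: %v", n, err)
	}
	return http.StatusOK, fmt.Sprintf("flushed %d rows", n)
}

// cronHandler adapts flushStatus to net/http so a cron job can call it.
func cronHandler(f flusher) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		code, body := flushStatus(r.Context(), f)
		w.WriteHeader(code)
		fmt.Fprintln(w, body)
	}
}

// fakeFlusher stands in for a real *Log in this demo.
type fakeFlusher struct {
	rows int
	err  error
}

func (f fakeFlusher) Flush(context.Context) (int, error) { return f.rows, f.err }

var errDemo = errors.New("quota exceeded")

// runOnce performs a single flush with a background context.
func runOnce(f flusher) (int, string) {
	return flushStatus(context.Background(), f)
}

func main() {
	fmt.Println(runOnce(fakeFlusher{rows: 3}))
	fmt.Println(runOnce(fakeFlusher{rows: 3, err: errDemo}))
}
```

In a real application the handler would be registered on whatever route the cron configuration targets, with a `*Log` value passed in place of `fakeFlusher`.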

It succeeds if all entries it attempted to send were successfully handled by BigQuery. If some entries are malformed, it logs the error and skips them, so they don't get stuck in the pending buffer forever. This corresponds to 'skipInvalidRows=true' in the 'insertAll' BigQuery call.

Returns the number of rows sent to BigQuery. May return both a non-zero number of rows and an error if something went wrong midway.
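The combination of a row count with an error, together with the MaxParallelUploads limit, can be illustrated with a generic bounded-parallelism loop. This is a sketch of the general technique, not the package's actual implementation; `sendAll` and `failOn` are hypothetical helpers, and batches are modelled as plain string slices.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sendAll uploads batches using at most maxParallel concurrent calls to send.
// It returns how many rows were sent plus all upload errors joined together,
// so both values can be non-zero at once.
func sendAll(batches [][]string, maxParallel int, send func([]string) error) (int, error) {
	if maxParallel < 1 {
		maxParallel = 1
	}
	var (
		mu   sync.Mutex
		sent int
		errs []error
		wg   sync.WaitGroup
	)
	sem := make(chan struct{}, maxParallel) // counting semaphore
	for _, b := range batches {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxParallel uploads are in flight
		go func(b []string) {
			defer wg.Done()
			defer func() { <-sem }()
			err := send(b)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errs = append(errs, err)
				return
			}
			sent += len(b)
		}(b)
	}
	wg.Wait()
	return sent, errors.Join(errs...)
}

var errUpload = errors.New("upload failed")

// failOn returns a send function that fails for batches starting with marker.
func failOn(marker string) func([]string) error {
	return func(b []string) error {
		if len(b) > 0 && b[0] == marker {
			return errUpload
		}
		return nil
	}
}

func main() {
	batches := [][]string{{"a", "b"}, {"fail"}, {"c"}}
	n, err := sendAll(batches, 2, failOn("fail"))
	fmt.Println(n, err)
}
```

Callers of a function with this contract should check the error but still record the count, since a failed run may have delivered some rows.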

func (*Log) Insert

func (l *Log) Insert(ctx context.Context, rows ...bigquery.ValueSaver) (err error)

Insert adds a bunch of entries to the buffer of pending entries.

It will reuse an existing datastore transaction (if any). This allows logging entries transactionally when changing something in the datastore.

Empty insert IDs will be replaced with autogenerated ones (they start with 'bqlog:'). Entries not matching the schema are logged and skipped during the flush.
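The insert ID rule can be sketched without the real libraries. The `valueSaver` interface below mirrors the shape of bigquery.ValueSaver using only local types, and `event` and `insertIDFor` are hypothetical. Only the 'bqlog:' prefix comes from the documentation above; the random hex suffix is an assumption of this sketch, not the package's actual ID format.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// valueSaver mirrors the shape of bigquery.ValueSaver with local types only.
type valueSaver interface {
	Save() (row map[string]any, insertID string, err error)
}

// event is a hypothetical row type.
type event struct {
	User   string
	Action string
	ID     string // optional insert ID; empty means "generate one"
}

// Save returns the column values and the (possibly empty) insert ID.
func (e event) Save() (map[string]any, string, error) {
	return map[string]any{"user": e.User, "action": e.Action}, e.ID, nil
}

// insertIDFor returns the row's own insert ID, or a generated one that
// starts with "bqlog:" when the row does not supply any.
func insertIDFor(r valueSaver) (string, error) {
	_, id, err := r.Save()
	if err != nil {
		return "", err
	}
	if id != "" {
		return id, nil
	}
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	// The hex suffix is illustrative; the real format is not documented here.
	return "bqlog:" + hex.EncodeToString(buf), nil
}

func main() {
	rows := []valueSaver{
		event{User: "alice", Action: "login"},
		event{User: "bob", Action: "logout", ID: "evt-42"},
	}
	for _, r := range rows {
		id, err := insertIDFor(r)
		fmt.Println(id, err)
	}
}
```

Supplying a stable insert ID of your own is useful when the same logical event may be inserted more than once, since BigQuery uses insert IDs for best-effort deduplication of streamed rows.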
