leech

package
v0.0.25 Published: Mar 21, 2024 License: Apache-2.0

Documentation

Overview

Package leech contains an Apache Beam data pipeline that reads workflow event records from BigQuery and ingests any available logs into Cloud Storage. A mapping from the original GitHub event to the Cloud Storage location is persisted in BigQuery along with an indicator for the status of the copy. The pipeline acts as a GitHub App for authentication purposes.
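A minimal sketch of how such a pipeline might be wired together, assuming the Beam Go SDK's bigqueryio connector; the import path, project, table names, and row limit below are illustrative, not the package's actual configuration:

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"

	leech "example.com/leech" // illustrative import path
)

func main() {
	flag.Parse()
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()

	// Fill SourceQuery's placeholders: the source events table, the
	// output table used to skip processed deliveries, and a row limit.
	query := fmt.Sprintf(leech.SourceQuery,
		"my-project.github.event_records",
		"my-project.github.leech_status",
		100)

	// Read candidate events, ingest their logs, write status records back.
	// (A production pipeline would also register the DoFn for remote runners.)
	events := bigqueryio.Query(s, "my-project", query,
		reflect.TypeOf(leech.EventRecord{}), bigqueryio.UseStandardSQL())
	results := beam.ParDo(s, &leech.IngestLogsFn{LogsBucketName: "my-logs-bucket"}, events)
	bigqueryio.Write(s, "my-project", "my-project:github.leech_status", results)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("pipeline failed: %v", err)
	}
}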

Index

Constants

const SourceQuery = `
SELECT
	delivery_id,
	JSON_VALUE(payload, "$.repository.full_name") repo_slug,
	JSON_VALUE(payload, "$.repository.name") repo_name,
	JSON_VALUE(payload, "$.repository.owner.login") org_name,
	JSON_VALUE(payload, "$.workflow_run.logs_url") logs_url,
	JSON_VALUE(payload, "$.workflow_run.actor.login") github_actor,
	JSON_VALUE(payload, "$.workflow_run.html_url") workflow_url
FROM ` + "`%s`" + `
WHERE
event = "workflow_run"
AND JSON_VALUE(payload, "$.workflow_run.status") = "completed"
AND delivery_id NOT IN (
SELECT
  delivery_id
FROM ` + "`%s`" + `)
LIMIT %d
`

SourceQuery is the driving BigQuery query that selects the events that need to be processed.
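The two %s placeholders take the fully qualified source events table and, presumably, the pipeline's own output table (used to skip deliveries that were already processed), while %d caps the rows returned per run. A hypothetical usage:

package leech_test

import (
	"fmt"

	leech "example.com/leech" // illustrative import path
)

func ExampleSourceQuery() {
	// Illustrative table names and limit.
	query := fmt.Sprintf(leech.SourceQuery,
		"my-project.github.event_records", // events table read by the pipeline
		"my-project.github.leech_status",  // output table; filters processed deliveries
		100)
	fmt.Println(query)
}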

Variables

This section is empty.

Functions

This section is empty.

Types

type EventRecord

type EventRecord struct {
	DeliveryID       string `bigquery:"delivery_id" json:"delivery_id"`
	RepositorySlug   string `bigquery:"repo_slug" json:"repo_slug"`
	RepositoryName   string `bigquery:"repo_name" json:"repo_name"`
	OrganizationName string `bigquery:"org_name" json:"org_name"`
	LogsURL          string `bigquery:"logs_url" json:"logs_url"`
	GitHubActor      string `bigquery:"github_actor" json:"github_actor"`
	WorkflowURL      string `bigquery:"workflow_url" json:"workflow_url"`
}

EventRecord maps the columns from the driving BigQuery query to a usable structure.

type IngestLogsFn

type IngestLogsFn struct {
	LogsBucketName   string `beam:"logsBucketName"`
	GitHubAppID      string `beam:"githubAppID"`
	GitHubInstallID  string `beam:"githubInstallID"`
	GitHubPrivateKey string `beam:"githubPrivateKey"`
	// contains filtered or unexported fields
}

IngestLogsFn is an object that implements Beam's "DoFn" interface to provide the main processing of each event.

func (*IngestLogsFn) ProcessElement

func (f *IngestLogsFn) ProcessElement(ctx context.Context, event EventRecord) LeechRecord

ProcessElement is a DoFn implementation that reads workflow logs from GitHub and stores them in Cloud Storage.
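The method's internals are unexported, but the per-element flow is roughly: fetch the logs archive from the GitHub API, stream it into the logs bucket, and emit a LeechRecord describing the outcome. Below is a hypothetical in-package helper showing that shape; the descriptor layout, the "SUCCESS" status value, and the error return are assumptions (ProcessElement itself returns only a LeechRecord):

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// ingestLogs is a hypothetical helper mirroring the work ProcessElement
// performs per event. client must already carry GitHub App credentials.
func ingestLogs(ctx context.Context, client *http.Client, w ObjectWriter, event EventRecord, bucket string) (LeechRecord, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, event.LogsURL, nil)
	if err != nil {
		return LeechRecord{}, fmt.Errorf("building request: %w", err)
	}
	// GitHub answers with a redirect to a short-lived archive URL,
	// which the default client follows automatically.
	resp, err := client.Do(req)
	if err != nil {
		return LeechRecord{}, fmt.Errorf("fetching logs: %w", err)
	}
	defer resp.Body.Close()

	// Assumed object layout: <bucket>/<repo_slug>/<delivery_id>.zip
	descriptor := fmt.Sprintf("%s/%s/%s.zip", bucket, event.RepositorySlug, event.DeliveryID)
	if err := w.Write(ctx, resp.Body, descriptor); err != nil {
		return LeechRecord{}, fmt.Errorf("writing logs: %w", err)
	}

	return LeechRecord{
		DeliveryID:       event.DeliveryID,
		ProcessedAt:      time.Now().UTC(),
		Status:           "SUCCESS", // assumed status value
		WorkflowURI:      event.WorkflowURL,
		LogsURI:          descriptor,
		GitHubActor:      event.GitHubActor,
		OrganizationName: event.OrganizationName,
		RepositoryName:   event.RepositoryName,
		RepositorySlug:   event.RepositorySlug,
	}, nil
}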

func (*IngestLogsFn) StartBundle

func (f *IngestLogsFn) StartBundle(ctx context.Context) error

StartBundle is called by Beam when the DoFn is initialized. With a local runner it is called from the running application; on Dataflow it is called on each worker node after the binary is provisioned. Remote Dataflow workers do not share the launcher process's environment or runtime arguments: the IngestLogsFn struct is serialized to the worker along with any public attributes that can be serialized, and nothing else survives the trip. The object store, GitHub App, and HTTP client therefore have to be initialized from this method once the struct has materialized on the remote host.
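A sketch of the lazy initialization this implies. The use of the ghinstallation transport for GitHub App authentication and the unexported field names (client, storage) are assumptions, not confirmed by this package:

import (
	"context"
	"fmt"
	"net/http"
	"strconv"

	"github.com/bradleyfalzon/ghinstallation/v2"
)

// Hypothetical shape of the worker-side initialization: only the exported,
// serializable fields survive the trip to the worker, so everything else
// is rebuilt here.
func (f *IngestLogsFn) StartBundle(ctx context.Context) error {
	appID, err := strconv.ParseInt(f.GitHubAppID, 10, 64)
	if err != nil {
		return fmt.Errorf("parsing app id: %w", err)
	}
	installID, err := strconv.ParseInt(f.GitHubInstallID, 10, 64)
	if err != nil {
		return fmt.Errorf("parsing install id: %w", err)
	}
	tr, err := ghinstallation.New(http.DefaultTransport, appID, installID, []byte(f.GitHubPrivateKey))
	if err != nil {
		return fmt.Errorf("creating GitHub App transport: %w", err)
	}
	f.client = &http.Client{Transport: tr} // hypothetical unexported field
	f.storage, err = NewObjectStore(ctx)   // hypothetical unexported field
	return err
}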

type LeechRecord

type LeechRecord struct {
	DeliveryID       string    `bigquery:"delivery_id" json:"delivery_id"`
	ProcessedAt      time.Time `bigquery:"processed_at" json:"processed_at"`
	Status           string    `bigquery:"status" json:"status"`
	WorkflowURI      string    `bigquery:"workflow_uri" json:"workflow_uri"`
	LogsURI          string    `bigquery:"logs_uri" json:"logs_uri"`
	GitHubActor      string    `bigquery:"github_actor" json:"github_actor"`
	OrganizationName string    `bigquery:"organization_name" json:"organization_name"`
	RepositoryName   string    `bigquery:"repository_name" json:"repository_name"`
	RepositorySlug   string    `bigquery:"repository_slug" json:"repository_slug"`
	JobName          string    `bigquery:"job_name" json:"job_name"`
}

LeechRecord is the output data structure that maps to the leech pipeline's output table schema.

type ObjectStore

type ObjectStore struct {
	// contains filtered or unexported fields
}

ObjectStore is an implementation of the ObjectWriter interface that writes to Cloud Storage.

func NewObjectStore

func NewObjectStore(ctx context.Context) (*ObjectStore, error)

NewObjectStore creates an ObjectWriter implementation that uses Cloud Storage to store its objects.

func (*ObjectStore) Write

func (s *ObjectStore) Write(ctx context.Context, content io.Reader, objectDescriptor string) error

Write writes an object to Google Cloud Storage.
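The descriptor presumably encodes both the bucket and the object path, since Write receives no separate bucket argument. A minimal sketch of an equivalent writer using cloud.google.com/go/storage (which NewObjectStore presumably wraps via storage.NewClient), assuming a "bucket/path/to/object" descriptor layout:

import (
	"context"
	"fmt"
	"io"
	"strings"

	"cloud.google.com/go/storage"
)

// writeObject is a hypothetical equivalent of ObjectStore.Write.
func writeObject(ctx context.Context, client *storage.Client, content io.Reader, descriptor string) error {
	bucket, object, ok := strings.Cut(descriptor, "/")
	if !ok {
		return fmt.Errorf("descriptor %q is not in bucket/object form", descriptor)
	}
	w := client.Bucket(bucket).Object(object).NewWriter(ctx)
	if _, err := io.Copy(w, content); err != nil {
		w.Close()
		return fmt.Errorf("copying content: %w", err)
	}
	// Close flushes the upload; write errors can also surface here.
	return w.Close()
}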

type ObjectWriter

type ObjectWriter interface {
	Write(ctx context.Context, content io.Reader, descriptor string) error
}

ObjectWriter is an interface for writing an object/blob to a storage medium.
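Because the pipeline depends on this interface rather than on Cloud Storage directly, tests can substitute an in-memory implementation. A minimal sketch:

import (
	"context"
	"io"
)

// memWriter is a hypothetical in-memory ObjectWriter for tests.
type memWriter struct {
	objects map[string][]byte
}

func (m *memWriter) Write(ctx context.Context, content io.Reader, descriptor string) error {
	b, err := io.ReadAll(content)
	if err != nil {
		return err
	}
	if m.objects == nil {
		m.objects = make(map[string][]byte)
	}
	m.objects[descriptor] = b
	return nil
}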
