Documentation ¶
Overview ¶
Package leech contains an Apache Beam data pipeline that reads workflow event records from BigQuery and ingests any available logs into Cloud Storage. A mapping from the original GitHub event to the Cloud Storage location is persisted in BigQuery along with an indicator for the status of the copy. The pipeline acts as a GitHub App for authentication purposes.
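At a high level, each run formats SourceQuery, reads the matching events from BigQuery, fans them out through IngestLogsFn to copy the logs, and writes the resulting LeechRecord rows back to BigQuery. The sketch below shows that wiring with the Beam Go SDK's bigqueryio connector, written as if it lived in this package; the helper name and parameters are illustrative, not part of the package's API.

import (
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
)

// buildIngest is a hypothetical helper showing how the exported pieces fit together.
func buildIngest(s beam.Scope, project, query, bucket string) beam.PCollection {
	// Read unprocessed workflow_run events into EventRecord rows.
	events := bigqueryio.Query(s, project, query,
		reflect.TypeOf(EventRecord{}), bigqueryio.UseStandardSQL())

	// Download the logs for each event and record where each copy landed.
	// The GitHub App credential fields on IngestLogsFn are omitted for brevity.
	return beam.ParDo(s, &IngestLogsFn{LogsBucketName: bucket}, events)
}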
Index ¶
Constants ¶
const SourceQuery = `
SELECT
delivery_id,
JSON_VALUE(payload, "$.repository.full_name") repo_slug,
JSON_VALUE(payload, "$.repository.name") repo_name,
JSON_VALUE(payload, "$.repository.owner.login") org_name,
JSON_VALUE(payload, "$.workflow_run.logs_url") logs_url,
JSON_VALUE(payload, "$.workflow_run.actor.login") github_actor,
JSON_VALUE(payload, "$.workflow_run.html_url") workflow_url
FROM ` + "`%s`" + `
WHERE
event = "workflow_run"
AND JSON_VALUE(payload, "$.workflow_run.status") = "completed"
AND delivery_id NOT IN (
SELECT
delivery_id
FROM ` + "`%s`" + `)
LIMIT %d
`
SourceQuery is the driving BigQuery query that selects events that need to be processed.
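The two %s placeholders are the fully qualified events table and the pipeline's own output table (used as an anti-join so already-processed deliveries are skipped), and %d caps the number of rows per run. A small sketch with hypothetical table names:

import "fmt"

// eventsQuery is a hypothetical helper that fills SourceQuery's placeholders.
func eventsQuery(eventsTable, leechTable string, limit int) string {
	// The first %s is the raw webhook events table, the second is the leech
	// output table used to skip already-processed deliveries.
	return fmt.Sprintf(SourceQuery, eventsTable, leechTable, limit)
}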
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EventRecord ¶
type EventRecord struct {
	DeliveryID       string `bigquery:"delivery_id" json:"delivery_id"`
	RepositorySlug   string `bigquery:"repo_slug" json:"repo_slug"`
	RepositoryName   string `bigquery:"repo_name" json:"repo_name"`
	OrganizationName string `bigquery:"org_name" json:"org_name"`
	LogsURL          string `bigquery:"logs_url" json:"logs_url"`
	GitHubActor      string `bigquery:"github_actor" json:"github_actor"`
	WorkflowURL      string `bigquery:"workflow_url" json:"workflow_url"`
}
EventRecord maps the columns from the driving BigQuery query to a usable structure.
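The bigquery struct tags match the column aliases produced by SourceQuery, so a result row decodes directly into the struct. As a brief illustration with the standard cloud.google.com/go/bigquery client (an assumption for this sketch; inside the pipeline the rows arrive through Beam's BigQuery connector):

import (
	"context"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)

// readEvents is a hypothetical helper that decodes query rows into EventRecord values.
func readEvents(ctx context.Context, client *bigquery.Client, query string) ([]EventRecord, error) {
	it, err := client.Query(query).Read(ctx)
	if err != nil {
		return nil, err
	}
	var events []EventRecord
	for {
		var rec EventRecord
		err := it.Next(&rec) // populated via the bigquery:"..." tags
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		events = append(events, rec)
	}
	return events, nil
}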
type IngestLogsFn ¶
type IngestLogsFn struct {
	LogsBucketName   string `beam:"logsBucketName"`
	GitHubAppID      string `beam:"githubAppID"`
	GitHubInstallID  string `beam:"githubInstallID"`
	GitHubPrivateKey string `beam:"githubPrivateKey"`
	// contains filtered or unexported fields
}
IngestLogsFn implements Beam's "DoFn" interface and provides the main processing of each event.
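Because the struct is serialized out to Dataflow workers (see StartBundle), it also needs to be registered so that workers can decode and invoke it. A minimal sketch, assuming the Beam Go SDK's register package is used from an init function in this package:

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
	// Register the DoFn and its element types for execution on remote workers.
	register.DoFn2x1[context.Context, EventRecord, LeechRecord](&IngestLogsFn{})
}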
func (*IngestLogsFn) ProcessElement ¶
func (f *IngestLogsFn) ProcessElement(ctx context.Context, event EventRecord) LeechRecord
ProcessElement is a DoFn implementation that reads workflow logs from GitHub and stores them in Cloud Storage.
func (*IngestLogsFn) StartBundle ¶
func (f *IngestLogsFn) StartBundle(ctx context.Context) error
StartBundle is called by Beam when the DoFn is initialized. With a local runner it is called from the running application; on Dataflow it is called on each worker node after the binary is provisioned. Remote Dataflow workers do not share the launcher process's environment or runtime arguments: the IngestLogsFn struct is serialized to the worker along with any public attributes that can be serialized. The object store, GitHub App client, and HTTP client therefore have to be initialized in this method once the struct has materialized on the remote host.
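A sketch of what that initialization might look like is below. It assumes a GitHub App installation transport from github.com/bradleyfalzon/ghinstallation/v2 and invents the helper name; the unexported fields that would hold the resulting clients are not part of the documented API.

import (
	"context"
	"fmt"
	"net/http"
	"strconv"

	"github.com/bradleyfalzon/ghinstallation/v2"
)

// startBundleSketch is a hypothetical stand-in for the work StartBundle performs
// once the DoFn has materialized on a worker.
func (f *IngestLogsFn) startBundleSketch(ctx context.Context) error {
	// Build the Cloud Storage backed object store on the worker.
	store, err := NewObjectStore(ctx)
	if err != nil {
		return fmt.Errorf("failed to create object store: %w", err)
	}
	_ = store // would be kept in an unexported field

	// Only the exported string fields survive serialization, so the GitHub App
	// credentials are parsed here and turned into an authenticated HTTP client.
	appID, err := strconv.ParseInt(f.GitHubAppID, 10, 64)
	if err != nil {
		return fmt.Errorf("invalid app id: %w", err)
	}
	installID, err := strconv.ParseInt(f.GitHubInstallID, 10, 64)
	if err != nil {
		return fmt.Errorf("invalid install id: %w", err)
	}
	tr, err := ghinstallation.New(http.DefaultTransport, appID, installID, []byte(f.GitHubPrivateKey))
	if err != nil {
		return fmt.Errorf("failed to create github transport: %w", err)
	}
	_ = &http.Client{Transport: tr} // would be kept in an unexported field
	return nil
}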
type LeechRecord ¶
type LeechRecord struct {
	DeliveryID       string    `bigquery:"delivery_id" json:"delivery_id"`
	ProcessedAt      time.Time `bigquery:"processed_at" json:"processed_at"`
	Status           string    `bigquery:"status" json:"status"`
	WorkflowURI      string    `bigquery:"workflow_uri" json:"workflow_uri"`
	LogsURI          string    `bigquery:"logs_uri" json:"logs_uri"`
	GitHubActor      string    `bigquery:"github_actor" json:"github_actor"`
	OrganizationName string    `bigquery:"organization_name" json:"organization_name"`
	RepositoryName   string    `bigquery:"repository_name" json:"repository_name"`
	RepositorySlug   string    `bigquery:"repository_slug" json:"repository_slug"`
	JobName          string    `bigquery:"job_name" json:"job_name"`
}
LeechRecord is the output data structure that maps to the leech pipeline's output table schema.
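The LeechRecord collection produced by the ParDo over IngestLogsFn can be written back to the status table with the same connector used for reading. A short sketch, with a hypothetical project and table name:

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
)

// writeResults is a hypothetical helper that persists the pipeline's output.
func writeResults(s beam.Scope, results beam.PCollection) {
	// results is the PCollection of LeechRecord returned by beam.ParDo.
	bigqueryio.Write(s, "my-project", "my-project:github.leech_status", results)
}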
type ObjectStore ¶
type ObjectStore struct {
// contains filtered or unexported fields
}
ObjectStore is an implementation of the ObjectWriter interface that writes to Cloud Storage.
func NewObjectStore ¶
func NewObjectStore(ctx context.Context) (*ObjectStore, error)
NewObjectStore creates an ObjectWriter implementation that uses Cloud Storage to store its objects.
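The unexported fields are not documented, but a plausible shape for the constructor and the kind of write it enables is sketched below, assuming the standard cloud.google.com/go/storage client underneath; the helper names are illustrative only.

import (
	"context"
	"fmt"

	"cloud.google.com/go/storage"
)

// newObjectStoreSketch mirrors what NewObjectStore likely does: create a
// Cloud Storage client to be held for later object writes.
func newObjectStoreSketch(ctx context.Context) (*storage.Client, error) {
	client, err := storage.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to create storage client: %w", err)
	}
	return client, nil
}

// writeObjectSketch shows the kind of write the store performs for a set of logs.
func writeObjectSketch(ctx context.Context, client *storage.Client, bucket, name string, data []byte) error {
	w := client.Bucket(bucket).Object(name).NewWriter(ctx)
	if _, err := w.Write(data); err != nil {
		w.Close()
		return err
	}
	return w.Close()
}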