jobs

package
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2022 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
// String constants used by the warehouse async jobs package.
const (
	// Job status values. The runner documentation mentions moving "executing"
	// jobs back to "waiting"; the exact lifecycle between the remaining
	// states is not shown here — NOTE(review): confirm against the runner.
	WhJobWaiting   string = "waiting"
	WhJobExecuting string = "executing"
	WhJobSucceeded string = "succeeded"
	WhJobAborted   string = "aborted"
	WhJobFailed    string = "failed"
	// AsyncJobType is the identifier "async_job".
	// NOTE(review): presumably the job type used when publishing to the
	// notifier queue — confirm against the caller.
	AsyncJobType   string = "async_job"
)
View Source
// Limits and timing constants for async job processing.
// NOTE(review): the per-constant meanings below are inferred from the names
// only; confirm against the code that uses them.
const (
	// MaxBatchSizeToProcess is presumably the maximum number of jobs handled per batch.
	MaxBatchSizeToProcess int = 10
	// MaxCleanUpRetries is presumably the retry limit for clean-up operations.
	MaxCleanUpRetries     int = 5
	// MaxQueryRetries is presumably the retry limit for database queries.
	MaxQueryRetries       int = 3
	// RetryTimeInterval has type time.Duration (10 seconds).
	RetryTimeInterval         = 10 * time.Second
	// MaxAttemptsPerJob is presumably the maximum number of attempts for a single job.
	MaxAttemptsPerJob     int = 3
	// WhAsyncJobTimeOut has type time.Duration (10 seconds).
	WhAsyncJobTimeOut         = 10 * time.Second
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncJobPayloadT

// AsyncJobPayloadT is the job payload used when creating entries for the
// wh_async_jobs table.
type AsyncJobPayloadT struct {
	Id            string          `json:"id"`
	SourceID      string          `json:"source_id"`
	DestinationID string          `json:"destination_id"`
	// Note the JSON key is "tablename" (no underscore), unlike the other keys.
	TableName     string          `json:"tablename"`
	AsyncJobType  string          `json:"async_job_type"`
	// MetaData is kept as raw, undecoded JSON.
	// NOTE(review): presumably an encoded WhJobsMetaData — confirm.
	MetaData      json.RawMessage `json:"metadata"`
}

AsyncJobPayloadT is used for creating the job payload for the wh_async_jobs table.

type AsyncJobWhT

// AsyncJobWhT is the type returned by InitWarehouseJobsAPI. Its exported
// methods are the HTTP handlers AddWarehouseJobHandler and
// StatusWarehouseJobHandler, plus InitAsyncJobRunner.
type AsyncJobWhT struct {
	// contains filtered or unexported fields
}

func InitWarehouseJobsAPI

func InitWarehouseJobsAPI(ctx context.Context, dbHandle *sql.DB, notifier *pgnotifier.PgNotifierT) *AsyncJobWhT

InitWarehouseJobsAPI initializes the AsyncJobWhT structure with the appropriate variables.

func (*AsyncJobWhT) AddWarehouseJobHandler

func (asyncWhJob *AsyncJobWhT) AddWarehouseJobHandler(w http.ResponseWriter, r *http.Request)

AddWarehouseJobHandler is the handler that gets called for adding async jobs.

func (*AsyncJobWhT) InitAsyncJobRunner

func (asyncWhJob *AsyncJobWhT) InitAsyncJobRunner() error

The async job runner's main job is to: 1) scan the database for entries in wh_async_jobs, 2) publish data to the pg_notifier queue, and 3) move any executing jobs to waiting.

func (*AsyncJobWhT) StatusWarehouseJobHandler

func (asyncWhJob *AsyncJobWhT) StatusWarehouseJobHandler(w http.ResponseWriter, r *http.Request)

type AsyncJobsStatusMap

// AsyncJobsStatusMap groups a job id with a status string and an error.
// Despite the name, it is a struct, not a map.
type AsyncJobsStatusMap struct {
	Id     string
	// NOTE(review): presumably one of the WhJob* status constants — confirm.
	Status string
	Error  error
}

type PGNotifierOutput

// PGNotifierOutput holds a single id, encoded under the JSON key "id".
// NOTE(review): presumably the shape of a result received from the
// pg_notifier queue — confirm against the code that decodes it.
type PGNotifierOutput struct {
	Id string `json:"id"`
}

type StartJobReqPayload

// StartJobReqPayload is the request payload processed by the handlers in
// handlers.go. All fields are plain strings mapped to snake_case JSON keys.
type StartJobReqPayload struct {
	SourceID      string `json:"source_id"`
	Type          string `json:"type"`
	Channel       string `json:"channel"`
	DestinationID string `json:"destination_id"`
	// StartTime is carried as a string; its expected time format is not
	// shown here — NOTE(review): confirm against the handler.
	StartTime     string `json:"start_time"`
	JobRunID      string `json:"job_run_id"`
	TaskRunID     string `json:"task_run_id"`
	AsyncJobType  string `json:"async_job_type"`
}

StartJobReqPayload is used for processing request payloads in handlers.go.

type WhAddJobResponse

// WhAddJobResponse carries a list of job ids and an error, encoded under the
// JSON keys "jobids" and "error".
// NOTE(review): presumably the response body of AddWarehouseJobHandler — confirm.
type WhAddJobResponse struct {
	JobIds []int64 `json:"jobids"`
	// NOTE(review): if this struct is marshalled with encoding/json, an error
	// value is encoded via its dynamic type's exported fields; errors created
	// with errors.New or fmt.Errorf have none and encode as {} — the message
	// is lost. Verify the encoded output is what clients expect.
	Err    error   `json:"error"`
}

type WhAsyncJob

// WhAsyncJob is an empty struct: it holds no state of its own and exists to
// carry the methods declared on *WhAsyncJob in this package.
type WhAsyncJob struct{}

func (*WhAsyncJob) GetFirstLastEvent

func (*WhAsyncJob) GetFirstLastEvent() (time.Time, time.Time)

func (*WhAsyncJob) GetLoadFileGenStartTIme

func (*WhAsyncJob) GetLoadFileGenStartTIme() time.Time

func (*WhAsyncJob) GetLoadFileType

func (*WhAsyncJob) GetLoadFileType() string

func (*WhAsyncJob) GetLocalSchema

func (*WhAsyncJob) GetLocalSchema() warehouseutils.SchemaT

func (*WhAsyncJob) GetSampleLoadFileLocation

func (*WhAsyncJob) GetSampleLoadFileLocation(string) (string, error)

func (*WhAsyncJob) GetSchemaInWarehouse

func (*WhAsyncJob) GetSchemaInWarehouse() warehouseutils.SchemaT

func (*WhAsyncJob) GetSingleLoadFile

func (*WhAsyncJob) GetSingleLoadFile(string) (warehouseutils.LoadFileT, error)

func (*WhAsyncJob) GetTableSchemaInUpload

func (*WhAsyncJob) GetTableSchemaInUpload(string) warehouseutils.TableSchemaT

func (*WhAsyncJob) GetTableSchemaInWarehouse

func (*WhAsyncJob) GetTableSchemaInWarehouse(string) warehouseutils.TableSchemaT

func (*WhAsyncJob) ShouldOnDedupUseNewRecord

func (*WhAsyncJob) ShouldOnDedupUseNewRecord() bool

func (*WhAsyncJob) UpdateLocalSchema

func (*WhAsyncJob) UpdateLocalSchema(warehouseutils.SchemaT) error

func (*WhAsyncJob) UseRudderStorage

func (*WhAsyncJob) UseRudderStorage() bool

type WhAsyncJobRunner

// WhAsyncJobRunner is an interface whose methods are not shown in the
// exported documentation.
// NOTE(review): if those methods are unexported, types outside this package
// cannot implement the interface directly.
type WhAsyncJobRunner interface {
	// contains filtered or unexported methods
}

type WhJobsMetaData

// WhJobsMetaData holds run identifiers, a job type and a start time, all as
// strings.
// NOTE(review): presumably the decoded form of AsyncJobPayloadT.MetaData — confirm.
type WhJobsMetaData struct {
	JobRunID  string `json:"job_run_id"`
	TaskRunID string `json:"task_run_id"`
	// Note the JSON key is "jobtype" (no underscore), unlike the other keys.
	JobType   string `json:"jobtype"`
	// StartTime is carried as a string; its time format is not shown here.
	StartTime string `json:"start_time"`
}

type WhStatusResponse

// WhStatusResponse carries a status and an error message, both as strings.
// The fields have no JSON tags, so encoding/json would use the Go field
// names "Status" and "Err" as keys.
// NOTE(review): presumably the response body of StatusWarehouseJobHandler — confirm.
type WhStatusResponse struct {
	Status string
	Err    string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL