jobs

package
v1.5.0-rc.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2023 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WhJobWaiting   string = "waiting"
	WhJobExecuting string = "executing"
	WhJobSucceeded string = "succeeded"
	WhJobAborted   string = "aborted"
	WhJobFailed    string = "failed"
	AsyncJobType   string = "async_job"
)

Variables

This section is empty.

Functions

func WithConfig added in v1.4.0

func WithConfig(a *AsyncJobWhT, config *config.Config)

Types

type AsyncJobPayloadT

type AsyncJobPayloadT struct {
	Id            string          `json:"id"`
	SourceID      string          `json:"source_id"`
	DestinationID string          `json:"destination_id"`
	TableName     string          `json:"tablename"`
	AsyncJobType  string          `json:"async_job_type"`
	MetaData      json.RawMessage `json:"metadata"`
}

AsyncJobPayloadT For creating job payload to wh_async_jobs table

type AsyncJobStatus added in v1.3.0

type AsyncJobStatus struct {
	Id     string
	Status string
	Error  error
}

type AsyncJobWhT

type AsyncJobWhT struct {
	MaxBatchSizeToProcess int
	MaxCleanUpRetries     int
	MaxQueryRetries       int
	RetryTimeInterval     time.Duration
	MaxAttemptsPerJob     int
	AsyncJobTimeOut       time.Duration
	// contains filtered or unexported fields
}

func InitWarehouseJobsAPI

func InitWarehouseJobsAPI(
	ctx context.Context,
	dbHandle *sql.DB,
	notifier *pgnotifier.PgNotifierT,
) *AsyncJobWhT

InitWarehouseJobsAPI Initializes AsyncJobWh structure with appropriate variabless

func (*AsyncJobWhT) AddWarehouseJobHandler

func (a *AsyncJobWhT) AddWarehouseJobHandler(w http.ResponseWriter, r *http.Request)

AddWarehouseJobHandler The following handler gets called for adding async

func (*AsyncJobWhT) InitAsyncJobRunner

func (a *AsyncJobWhT) InitAsyncJobRunner() error

InitAsyncJobRunner Async Job runner's main job is to 1. Scan the database for entries into wh_async_jobs 2. Publish data to pg_notifier queue 3. Move any executing jobs to waiting

func (*AsyncJobWhT) StatusWarehouseJobHandler

func (a *AsyncJobWhT) StatusWarehouseJobHandler(w http.ResponseWriter, r *http.Request)

type PGNotifierOutput

type PGNotifierOutput struct {
	Id string `json:"id"`
}

type StartJobReqPayload

type StartJobReqPayload struct {
	SourceID      string `json:"source_id"`
	Type          string `json:"type"`
	Channel       string `json:"channel"`
	DestinationID string `json:"destination_id"`
	StartTime     string `json:"start_time"`
	JobRunID      string `json:"job_run_id"`
	TaskRunID     string `json:"task_run_id"`
	AsyncJobType  string `json:"async_job_type"`
}

StartJobReqPayload For processing requests payload in handlers.go

type WhAddJobResponse

type WhAddJobResponse struct {
	JobIds []int64 `json:"jobids"`
	Err    error   `json:"error"`
}

type WhAsyncJob

type WhAsyncJob struct{}

func (*WhAsyncJob) GetFirstLastEvent

func (*WhAsyncJob) GetFirstLastEvent() (time.Time, time.Time)

func (*WhAsyncJob) GetLoadFileGenStartTIme

func (*WhAsyncJob) GetLoadFileGenStartTIme() time.Time

func (*WhAsyncJob) GetLoadFileType

func (*WhAsyncJob) GetLoadFileType() string

func (*WhAsyncJob) GetLocalSchema

func (*WhAsyncJob) GetLocalSchema() warehouseutils.SchemaT

func (*WhAsyncJob) GetSampleLoadFileLocation

func (*WhAsyncJob) GetSampleLoadFileLocation(string) (string, error)

func (*WhAsyncJob) GetSchemaInWarehouse

func (*WhAsyncJob) GetSchemaInWarehouse() warehouseutils.SchemaT

func (*WhAsyncJob) GetSingleLoadFile

func (*WhAsyncJob) GetSingleLoadFile(string) (warehouseutils.LoadFileT, error)

func (*WhAsyncJob) GetTableSchemaInUpload

func (*WhAsyncJob) GetTableSchemaInUpload(string) warehouseutils.TableSchemaT

func (*WhAsyncJob) GetTableSchemaInWarehouse

func (*WhAsyncJob) GetTableSchemaInWarehouse(string) warehouseutils.TableSchemaT

func (*WhAsyncJob) ShouldOnDedupUseNewRecord

func (*WhAsyncJob) ShouldOnDedupUseNewRecord() bool

func (*WhAsyncJob) UpdateLocalSchema

func (*WhAsyncJob) UpdateLocalSchema(warehouseutils.SchemaT) error

func (*WhAsyncJob) UseRudderStorage

func (*WhAsyncJob) UseRudderStorage() bool

type WhAsyncJobRunner

type WhAsyncJobRunner interface {
	// contains filtered or unexported methods
}

type WhJobsMetaData

type WhJobsMetaData struct {
	JobRunID  string `json:"job_run_id"`
	TaskRunID string `json:"task_run_id"`
	JobType   string `json:"jobtype"`
	StartTime string `json:"start_time"`
}

type WhStatusResponse

type WhStatusResponse struct {
	Status string
	Err    string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL