router

package
v0.1.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2022 License: AGPL-3.0 Imports: 47 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Diagnostics diagnostics.DiagnosticsI

	QueryFilters jobsdb.QueryFiltersT
)

Functions

func CleanFailedRecordsTableProcess added in v0.1.10

func CleanFailedRecordsTableProcess(ctx context.Context)

func Init added in v0.1.10

func Init()

func InitRouterAdmin added in v0.1.10

func InitRouterAdmin()

func PrepareJobRunIdAbortedEventsMap added in v0.1.10

func PrepareJobRunIdAbortedEventsMap(parameters json.RawMessage, jobRunIDAbortedEventsMap map[string][]*FailedEventRowT)

func RegisterAdminHandlers added in v0.1.10

func RegisterAdminHandlers(readonlyRouterDB, readonlyBatchRouterDB jobsdb.ReadonlyJobsDB)

Types

type Admin added in v0.1.10

type Admin struct {
	// contains filtered or unexported fields
}

func (*Admin) Status added in v0.1.10

func (ra *Admin) Status() interface{}

Status function is used for debug purposes by the admin interface

type DSStats added in v0.1.10

type DSStats struct {
	JobCountsByStateAndDestination []JobCountsByStateAndDestination
	ErrorCodeCountsByDestination   []ErrorCodeCountsByDestination
	JobCountByConnections          []JobCountByConnections
	LatestJobStatusCounts          []LatestJobStatusCounts
	UnprocessedJobCounts           int
}

type ErrorCodeCountsByDestination added in v0.1.10

type ErrorCodeCountsByDestination struct {
	Count         int
	ErrorCode     string
	Destination   string
	DestinationID string
}

type Factory added in v0.1.10

type Factory struct {
	Reporting        reporter
	Multitenant      tenantStats
	BackendConfig    backendconfig.BackendConfig
	RouterDB         jobsdb.MultiTenantJobsDB
	ProcErrorDB      jobsdb.JobsDB
	TransientSources transientsource.Service
	RsourcesService  rsources.JobService
}

func (*Factory) New added in v0.1.10

func (f *Factory) New(destinationDefinition backendconfig.DestinationDefinitionT) *HandleT

type FailedEventRowT added in v0.1.10

type FailedEventRowT struct {
	DestinationID string          `json:"destination_id"`
	RecordID      json.RawMessage `json:"record_id"`
}

type FailedEventsManagerI added in v0.1.10

type FailedEventsManagerI interface {
	SaveFailedRecordIDs(map[string][]*FailedEventRowT, *sql.Tx)
	DropFailedRecordIDs(jobRunID string)
	FetchFailedRecordIDs(jobRunID string) []*FailedEventRowT
	GetDBHandle() *sql.DB
}

func GetFailedEventsManager added in v0.1.10

func GetFailedEventsManager() FailedEventsManagerI

type FailedEventsManagerT added in v0.1.10

type FailedEventsManagerT struct {
	// contains filtered or unexported fields
}

func (*FailedEventsManagerT) DropFailedRecordIDs added in v0.1.10

func (fem *FailedEventsManagerT) DropFailedRecordIDs(taskRunID string)

func (*FailedEventsManagerT) FetchFailedRecordIDs added in v0.1.10

func (fem *FailedEventsManagerT) FetchFailedRecordIDs(taskRunID string) []*FailedEventRowT

func (*FailedEventsManagerT) GetDBHandle added in v0.1.10

func (fem *FailedEventsManagerT) GetDBHandle() *sql.DB

func (*FailedEventsManagerT) SaveFailedRecordIDs added in v0.1.10

func (fem *FailedEventsManagerT) SaveFailedRecordIDs(taskRunIDFailedEventsMap map[string][]*FailedEventRowT, txn *sql.Tx)

type HandleDestOAuthRespParamsT added in v0.1.10

type HandleDestOAuthRespParamsT struct {
	// contains filtered or unexported fields
}

type HandleT

type HandleT struct {
	MultitenantI tenantStats

	Reporting reporter
	// contains filtered or unexported fields
}

HandleT is the handle to this module.

func (*HandleT) Disable

func (rt *HandleT) Disable()

Disable disables a router:)

func (*HandleT) Enable

func (rt *HandleT) Enable()

Enable enables a router :)

func (*HandleT) ExecDisableDestination added in v0.1.10

func (rt *HandleT) ExecDisableDestination(destinationJob types.DestinationJobT, workspaceId string, destResBody string, rudderAccountId string) (int, string)

func (*HandleT) HandleOAuthDestResponse added in v0.1.10

func (rt *HandleT) HandleOAuthDestResponse(params *HandleDestOAuthRespParamsT) (int, string)

func (*HandleT) ResetSleep

func (rt *HandleT) ResetSleep()

ResetSleep this makes the workers reset their sleep

func (*HandleT) Setup

func (rt *HandleT) Setup(backendConfig backendconfig.BackendConfig, jobsDB jobsdb.MultiTenantJobsDB, errorDB jobsdb.JobsDB, destinationDefinition backendconfig.DestinationDefinitionT, transientSources transientsource.Service, rsourcesService rsources.JobService)

Setup initializes this module

func (*HandleT) Shutdown added in v0.1.10

func (rt *HandleT) Shutdown()

func (*HandleT) Start added in v0.1.10

func (rt *HandleT) Start()

type JSONResponseHandler added in v0.1.10

type JSONResponseHandler struct {
	// contains filtered or unexported fields
}

JSONResponseHandler handler for json response

func (*JSONResponseHandler) IsSuccessStatus added in v0.1.10

func (handler *JSONResponseHandler) IsSuccessStatus(respCode int, respBody string) (returnCode int)

IsSuccessStatus - returns the status code based on the response code and body

type JobCountByConnections added in v0.1.10

type JobCountByConnections struct {
	Count         int
	SourceId      string
	DestinationId string
}

type JobCountsByStateAndDestination added in v0.1.10

type JobCountsByStateAndDestination struct {
	Count       int
	State       string
	Destination string
}

type JobParametersT added in v0.1.10

type JobParametersT struct {
	SourceID                string      `json:"source_id"`
	DestinationID           string      `json:"destination_id"`
	ReceivedAt              string      `json:"received_at"`
	TransformAt             string      `json:"transform_at"`
	SourceBatchID           string      `json:"source_batch_id"`
	SourceTaskID            string      `json:"source_task_id"`
	SourceTaskRunID         string      `json:"source_task_run_id"`
	SourceJobID             string      `json:"source_job_id"`
	SourceJobRunID          string      `json:"source_job_run_id"`
	SourceDefinitionID      string      `json:"source_definition_id"`
	DestinationDefinitionID string      `json:"destination_definition_id"`
	SourceCategory          string      `json:"source_category"`
	RecordID                interface{} `json:"record_id"`
	MessageID               string      `json:"message_id"`
	WorkspaceId             string      `json:"workspaceId"`
	RudderAccountId         string      `json:"rudderAccountId"`
}

JobParametersT struct holds source id and destination id of a job

type LatestJobStatusCounts added in v0.1.10

type LatestJobStatusCounts struct {
	Count int
	State string
	Rank  int
}

type NetHandleI added in v0.1.10

type NetHandleI interface {
	SendPost(ctx context.Context, structData integrations.PostParametersT) *utils.SendPostResponse
}

Network interface

type NetHandleT

type NetHandleT struct {
	// contains filtered or unexported fields
}

NetHandleT is the wrapper holding private variables

func (*NetHandleT) SendPost added in v0.1.10

func (network *NetHandleT) SendPost(ctx context.Context, structData integrations.PostParametersT) *utils.SendPostResponse

SendPost takes the EventPayload of a transformed job, gets the necessary values from the payload and makes a call to destination to push the event to it this returns the statusCode, status and response body from the response of the destination call

func (*NetHandleT) Setup

func (network *NetHandleT) Setup(destID string, netClientTimeout time.Duration)

Setup initializes the module

type PauseT added in v0.1.10

type PauseT struct {
	// contains filtered or unexported fields
}

type ResponseHandlerI added in v0.1.10

type ResponseHandlerI interface {
	IsSuccessStatus(respCode int, respBody string) (returnCode int)
}

ResponseHandlerI - handle destination response

func New added in v0.1.10

func New(responseRules map[string]interface{}) ResponseHandlerI

New returns a destination response handler. Can be nil(Check before using this)

type RouterJobResponse added in v0.1.10

type RouterJobResponse struct {
	// contains filtered or unexported fields
}

type RouterRpcHandler added in v0.1.10

type RouterRpcHandler struct {
	// contains filtered or unexported fields
}

func (*RouterRpcHandler) GetDSFailedJobs added in v0.1.10

func (r *RouterRpcHandler) GetDSFailedJobs(arg string, result *string) (err error)

func (*RouterRpcHandler) GetDSJobCount added in v0.1.10

func (r *RouterRpcHandler) GetDSJobCount(arg string, result *string) (err error)

func (*RouterRpcHandler) GetDSList added in v0.1.10

func (r *RouterRpcHandler) GetDSList(_ string, result *string) (err error)

func (*RouterRpcHandler) GetDSStats added in v0.1.10

func (r *RouterRpcHandler) GetDSStats(dsName string, result *string) (err error)

GetDSStats group_by job_status group by custom_val Get all errors = distinct (error), count(*) where state=failed Distinct (src_id, dst_id) Router jobs status flow ⇒ ordered by rank unprocessed_params ⇒ Num jobs not yet picked

func (*RouterRpcHandler) GetJobByID added in v0.1.10

func (r *RouterRpcHandler) GetJobByID(arg string, result *string) (err error)

func (*RouterRpcHandler) GetJobIDStatus added in v0.1.10

func (r *RouterRpcHandler) GetJobIDStatus(arg string, result *string) (err error)

type TXTResponseHandler added in v0.1.10

type TXTResponseHandler struct {
	// contains filtered or unexported fields
}

TXTResponseHandler handler for text response

func (*TXTResponseHandler) IsSuccessStatus added in v0.1.10

func (handler *TXTResponseHandler) IsSuccessStatus(respCode int, _ string) (returnCode int)

IsSuccessStatus - returns the status code based on the response code and body

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL