repository

package
v0.3.66-rc-347635d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func JobStateStrToJSRState added in v0.3.63

func JobStateStrToJSRState(jobState string) (js.JobServiceResponse_State, error)

Types

type JSRepoPostgres added in v0.3.63

type JSRepoPostgres struct {
	// contains filtered or unexported fields
}

func NewJSRepoPostgres added in v0.3.63

func NewJSRepoPostgres(cfg *configuration.JobServiceConfiguration, log *log.Entry) (error, *JSRepoPostgres, func())

func (*JSRepoPostgres) AddMessageIdAndClearSubscriptionError added in v0.3.63

func (s *JSRepoPostgres) AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string,
	jobSet string, fromMessageId string,
) error

Clear subscription error if present

func (*JSRepoPostgres) CheckToUnSubscribe added in v0.3.63

func (s *JSRepoPostgres) CheckToUnSubscribe(ctx context.Context, queue string, jobSet string,
	configTimeWithoutUpdates int64,
) (bool, error)

Checks the JobSet table to determine if we should unsubscribe from the JobSet. configTimeWithoutUpdates is a configurable value that is read from the config. We allow unsubscribing if the jobset hasn't been updated in configTime. TODO: implement this.

func (*JSRepoPostgres) CleanupJobSetAndJobs added in v0.3.63

func (s *JSRepoPostgres) CleanupJobSetAndJobs(ctx context.Context, queue string, jobSet string) (int64, error)

Unsubscribe from the JobSet and delete all the jobs in the database.

func (*JSRepoPostgres) DeleteJobsInJobSet added in v0.3.63

func (s *JSRepoPostgres) DeleteJobsInJobSet(ctx context.Context, queue string, jobSet string) (int64, error)

Delete Jobs in the database

func (*JSRepoPostgres) GetJobStatus added in v0.3.63

func (s *JSRepoPostgres) GetJobStatus(ctx context.Context, jobId string) (*js.JobServiceResponse, error)

Get the JobStatus given the jobId.

func (*JSRepoPostgres) GetSubscribedJobSets added in v0.3.63

func (s *JSRepoPostgres) GetSubscribedJobSets(ctx context.Context) ([]SubscribedTuple, error)

func (*JSRepoPostgres) GetSubscriptionError added in v0.3.63

func (s *JSRepoPostgres) GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error)

Get subscription error if present

func (*JSRepoPostgres) HealthCheck added in v0.3.63

func (s *JSRepoPostgres) HealthCheck(ctx context.Context) (bool, error)

func (*JSRepoPostgres) IsJobSetSubscribed added in v0.3.63

func (s *JSRepoPostgres) IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error)

Check if JobSet is in our map.

func (*JSRepoPostgres) SetSubscriptionError added in v0.3.63

func (s *JSRepoPostgres) SetSubscriptionError(ctx context.Context, queue string, jobSet string,
	connErr string, fromMessageId string,
) error

Set subscription error if present

func (*JSRepoPostgres) Setup added in v0.3.63

func (s *JSRepoPostgres) Setup(ctx context.Context)

Set up the DB for use, create tables

func (*JSRepoPostgres) SubscribeJobSet added in v0.3.63

func (s *JSRepoPostgres) SubscribeJobSet(ctx context.Context, queue string, jobSet string,
	fromMessageId string,
) error

Mark our JobSet as being subscribed. SubscribeTable contains Queue, JobSet and the time when it was created.

func (*JSRepoPostgres) UnsubscribeJobSet added in v0.3.63

func (s *JSRepoPostgres) UnsubscribeJobSet(ctx context.Context, queue, jobSet string) (int64, error)

func (*JSRepoPostgres) UpdateJobServiceDb added in v0.3.63

func (s *JSRepoPostgres) UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error

Update database with JobTable.

func (*JSRepoPostgres) UpdateJobSetDb added in v0.3.63

func (s *JSRepoPostgres) UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error

type JSRepoSQLite added in v0.3.63

type JSRepoSQLite struct {
	// contains filtered or unexported fields
}

JSRepoSQLite for persisting to DB.

func NewJSRepoSQLite added in v0.3.63

func NewJSRepoSQLite(config *configuration.JobServiceConfiguration, log *log.Entry) (error, *JSRepoSQLite, func())

func (*JSRepoSQLite) AddMessageIdAndClearSubscriptionError added in v0.3.63

func (s *JSRepoSQLite) AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string,
	jobSet string, fromMessageId string,
) error

Clear subscription error if present

func (*JSRepoSQLite) CheckToUnSubscribe added in v0.3.63

func (s *JSRepoSQLite) CheckToUnSubscribe(ctx context.Context, queue string, jobSet string,
	configTimeWithoutUpdates int64,
) (bool, error)

Checks the JobSet table to determine if we should unsubscribe from the JobSet. configTimeWithoutUpdates is a configurable value that is read from the config. We allow unsubscribing if the jobset hasn't been updated in configTime. TODO: implement this.

func (*JSRepoSQLite) CleanupJobSetAndJobs added in v0.3.63

func (s *JSRepoSQLite) CleanupJobSetAndJobs(ctx context.Context, queue string, jobSet string) (int64, error)

Unsubscribe from the JobSet and delete all the jobs in the database.

func (*JSRepoSQLite) DeleteJobsInJobSet added in v0.3.63

func (s *JSRepoSQLite) DeleteJobsInJobSet(ctx context.Context, queue string, jobSet string) (int64, error)

Delete Jobs in the database

func (*JSRepoSQLite) GetJobStatus added in v0.3.63

func (s *JSRepoSQLite) GetJobStatus(ctx context.Context, jobId string) (*js.JobServiceResponse, error)

Get the JobStatus given the jobId.

func (*JSRepoSQLite) GetSubscribedJobSets added in v0.3.63

func (s *JSRepoSQLite) GetSubscribedJobSets(ctx context.Context) ([]SubscribedTuple, error)

func (*JSRepoSQLite) GetSubscriptionError added in v0.3.63

func (s *JSRepoSQLite) GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error)

Get subscription error if present

func (*JSRepoSQLite) HealthCheck added in v0.3.63

func (s *JSRepoSQLite) HealthCheck(ctx context.Context) (bool, error)

func (*JSRepoSQLite) IsJobSetSubscribed added in v0.3.63

func (s *JSRepoSQLite) IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error)

Check if JobSet is in our map.

func (*JSRepoSQLite) SetSubscriptionError added in v0.3.63

func (s *JSRepoSQLite) SetSubscriptionError(ctx context.Context, queue string, jobSet string,
	connErr string, fromMessageId string,
) error

Set subscription error if present

func (*JSRepoSQLite) Setup added in v0.3.63

func (s *JSRepoSQLite) Setup(ctx context.Context)

Set up the DB for use, create tables

func (*JSRepoSQLite) SubscribeJobSet added in v0.3.63

func (s *JSRepoSQLite) SubscribeJobSet(ctx context.Context, queue string, jobSet string, fromMessageId string) error

Mark our JobSet as being subscribed. SubscribeTable contains Queue, JobSet and the time when it was created.

func (*JSRepoSQLite) UnsubscribeJobSet added in v0.3.63

func (s *JSRepoSQLite) UnsubscribeJobSet(ctx context.Context, queue, jobSet string) (int64, error)

func (*JSRepoSQLite) UpdateJobServiceDb added in v0.3.63

func (s *JSRepoSQLite) UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error

Update database with JobTable.

func (*JSRepoSQLite) UpdateJobSetDb added in v0.3.63

func (s *JSRepoSQLite) UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error

type JobStatus

type JobStatus struct {
	// contains filtered or unexported fields
}

JobStatus represents a job status

func NewJobStatus

func NewJobStatus(queue string, jobSetId string, jobId string, jobResponse js.JobServiceResponse) *JobStatus

NewJobStatus combines params into struct JobStatus and adds a timestamp

type JobTableUpdater

type JobTableUpdater interface {
	SubscribeJobSet(ctx context.Context, queue string, jobSet string, fromMessageId string) error
	IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error)
	UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error
	UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error
	SetSubscriptionError(ctx context.Context, queue string, jobSet string, err string, fromMessageId string) error
	GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error)
	AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string, jobSet string, messageId string) error
	UnsubscribeJobSet(ctx context.Context, queue string, jobSet string) (int64, error)
}

type JobTableUpdaterMock

type JobTableUpdaterMock struct {
	// AddMessageIdAndClearSubscriptionErrorFunc mocks the AddMessageIdAndClearSubscriptionError method.
	AddMessageIdAndClearSubscriptionErrorFunc func(ctx context.Context, queue string, jobSet string, messageId string) error

	// GetSubscriptionErrorFunc mocks the GetSubscriptionError method.
	GetSubscriptionErrorFunc func(ctx context.Context, queue string, jobSet string) (string, error)

	// IsJobSetSubscribedFunc mocks the IsJobSetSubscribed method.
	IsJobSetSubscribedFunc func(ctx context.Context, queue string, jobSet string) (bool, string, error)

	// SetSubscriptionErrorFunc mocks the SetSubscriptionError method.
	SetSubscriptionErrorFunc func(ctx context.Context, queue string, jobSet string, err string, fromMessageId string) error

	// SubscribeJobSetFunc mocks the SubscribeJobSet method.
	SubscribeJobSetFunc func(ctx context.Context, queue string, jobSet string, fromMessageId string) error

	// UnsubscribeJobSetFunc mocks the UnsubscribeJobSet method.
	UnsubscribeJobSetFunc func(ctx context.Context, queue string, jobSet string) (int64, error)

	// UpdateJobServiceDbFunc mocks the UpdateJobServiceDb method.
	UpdateJobServiceDbFunc func(ctx context.Context, jobTable *JobStatus) error

	// UpdateJobSetDbFunc mocks the UpdateJobSetDb method.
	UpdateJobSetDbFunc func(ctx context.Context, queue string, jobSet string, fromMessageId string) error
	// contains filtered or unexported fields
}

JobTableUpdaterMock is a mock implementation of JobTableUpdater.

func TestSomethingThatUsesJobTableUpdater(t *testing.T) {

	// make and configure a mocked JobTableUpdater
	mockedJobTableUpdater := &JobTableUpdaterMock{
		AddMessageIdAndClearSubscriptionErrorFunc: func(ctx context.Context, queue string, jobSet string, messageId string) error {
			panic("mock out the AddMessageIdAndClearSubscriptionError method")
		},
		GetSubscriptionErrorFunc: func(ctx context.Context, queue string, jobSet string) (string, error) {
			panic("mock out the GetSubscriptionError method")
		},
		IsJobSetSubscribedFunc: func(ctx context.Context, queue string, jobSet string) (bool, string, error) {
			panic("mock out the IsJobSetSubscribed method")
		},
		SetSubscriptionErrorFunc: func(ctx context.Context, queue string, jobSet string, err string, fromMessageId string) error {
			panic("mock out the SetSubscriptionError method")
		},
		SubscribeJobSetFunc: func(ctx context.Context, queue string, jobSet string, fromMessageId string) error {
			panic("mock out the SubscribeJobSet method")
		},
		UnsubscribeJobSetFunc: func(ctx context.Context, queue string, jobSet string) (int64, error) {
			panic("mock out the UnsubscribeJobSet method")
		},
		UpdateJobServiceDbFunc: func(ctx context.Context, jobTable *JobStatus) error {
			panic("mock out the UpdateJobServiceDb method")
		},
		UpdateJobSetDbFunc: func(ctx context.Context, queue string, jobSet string, fromMessageId string) error {
			panic("mock out the UpdateJobSetDb method")
		},
	}

	// use mockedJobTableUpdater in code that requires JobTableUpdater
	// and then make assertions.

}

func (*JobTableUpdaterMock) AddMessageIdAndClearSubscriptionError added in v0.3.60

func (mock *JobTableUpdaterMock) AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string, jobSet string, messageId string) error

AddMessageIdAndClearSubscriptionError calls AddMessageIdAndClearSubscriptionErrorFunc.

func (*JobTableUpdaterMock) AddMessageIdAndClearSubscriptionErrorCalls added in v0.3.63

func (mock *JobTableUpdaterMock) AddMessageIdAndClearSubscriptionErrorCalls() []struct {
	Ctx       context.Context
	Queue     string
	JobSet    string
	MessageId string
}

AddMessageIdAndClearSubscriptionErrorCalls gets all the calls that were made to AddMessageIdAndClearSubscriptionError. Check the length with:

len(mockedJobTableUpdater.AddMessageIdAndClearSubscriptionErrorCalls())

func (*JobTableUpdaterMock) GetSubscriptionError

func (mock *JobTableUpdaterMock) GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error)

GetSubscriptionError calls GetSubscriptionErrorFunc.

func (*JobTableUpdaterMock) GetSubscriptionErrorCalls

func (mock *JobTableUpdaterMock) GetSubscriptionErrorCalls() []struct {
	Ctx    context.Context
	Queue  string
	JobSet string
}

GetSubscriptionErrorCalls gets all the calls that were made to GetSubscriptionError. Check the length with:

len(mockedJobTableUpdater.GetSubscriptionErrorCalls())

func (*JobTableUpdaterMock) IsJobSetSubscribed

func (mock *JobTableUpdaterMock) IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error)

IsJobSetSubscribed calls IsJobSetSubscribedFunc.

func (*JobTableUpdaterMock) IsJobSetSubscribedCalls

func (mock *JobTableUpdaterMock) IsJobSetSubscribedCalls() []struct {
	Ctx    context.Context
	Queue  string
	JobSet string
}

IsJobSetSubscribedCalls gets all the calls that were made to IsJobSetSubscribed. Check the length with:

len(mockedJobTableUpdater.IsJobSetSubscribedCalls())

func (*JobTableUpdaterMock) SetSubscriptionError

func (mock *JobTableUpdaterMock) SetSubscriptionError(ctx context.Context, queue string, jobSet string, err string, fromMessageId string) error

SetSubscriptionError calls SetSubscriptionErrorFunc.

func (*JobTableUpdaterMock) SetSubscriptionErrorCalls

func (mock *JobTableUpdaterMock) SetSubscriptionErrorCalls() []struct {
	Ctx           context.Context
	Queue         string
	JobSet        string
	Err           string
	FromMessageId string
}

SetSubscriptionErrorCalls gets all the calls that were made to SetSubscriptionError. Check the length with:

len(mockedJobTableUpdater.SetSubscriptionErrorCalls())

func (*JobTableUpdaterMock) SubscribeJobSet

func (mock *JobTableUpdaterMock) SubscribeJobSet(ctx context.Context, queue string, jobSet string, fromMessageId string) error

SubscribeJobSet calls SubscribeJobSetFunc.

func (*JobTableUpdaterMock) SubscribeJobSetCalls

func (mock *JobTableUpdaterMock) SubscribeJobSetCalls() []struct {
	Ctx           context.Context
	Queue         string
	JobSet        string
	FromMessageId string
}

SubscribeJobSetCalls gets all the calls that were made to SubscribeJobSet. Check the length with:

len(mockedJobTableUpdater.SubscribeJobSetCalls())

func (*JobTableUpdaterMock) UnsubscribeJobSet added in v0.3.60

func (mock *JobTableUpdaterMock) UnsubscribeJobSet(ctx context.Context, queue string, jobSet string) (int64, error)

UnsubscribeJobSet calls UnsubscribeJobSetFunc.

func (*JobTableUpdaterMock) UnsubscribeJobSetCalls added in v0.3.63

func (mock *JobTableUpdaterMock) UnsubscribeJobSetCalls() []struct {
	Ctx    context.Context
	Queue  string
	JobSet string
}

UnsubscribeJobSetCalls gets all the calls that were made to UnsubscribeJobSet. Check the length with:

len(mockedJobTableUpdater.UnsubscribeJobSetCalls())

func (*JobTableUpdaterMock) UpdateJobServiceDb

func (mock *JobTableUpdaterMock) UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error

UpdateJobServiceDb calls UpdateJobServiceDbFunc.

func (*JobTableUpdaterMock) UpdateJobServiceDbCalls

func (mock *JobTableUpdaterMock) UpdateJobServiceDbCalls() []struct {
	Ctx      context.Context
	JobTable *JobStatus
}

UpdateJobServiceDbCalls gets all the calls that were made to UpdateJobServiceDb. Check the length with:

len(mockedJobTableUpdater.UpdateJobServiceDbCalls())

func (*JobTableUpdaterMock) UpdateJobSetDb added in v0.3.60

func (mock *JobTableUpdaterMock) UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error

UpdateJobSetDb calls UpdateJobSetDbFunc.

func (*JobTableUpdaterMock) UpdateJobSetDbCalls added in v0.3.60

func (mock *JobTableUpdaterMock) UpdateJobSetDbCalls() []struct {
	Ctx           context.Context
	Queue         string
	JobSet        string
	FromMessageId string
}

UpdateJobSetDbCalls gets all the calls that were made to UpdateJobSetDb. Check the length with:

len(mockedJobTableUpdater.UpdateJobSetDbCalls())

type SQLJobService

type SQLJobService interface {
	AddMessageIdAndClearSubscriptionError(ctx context.Context, queue string, jobSet string, fromMessageId string) error
	CheckToUnSubscribe(ctx context.Context, queue string, jobSet string, configTimeWithoutUpdates int64) (bool, error)
	CleanupJobSetAndJobs(ctx context.Context, queue string, jobSet string) (int64, error)
	DeleteJobsInJobSet(ctx context.Context, queue string, jobSet string) (int64, error)
	GetJobStatus(ctx context.Context, jobId string) (*js.JobServiceResponse, error)
	GetSubscribedJobSets(ctx context.Context) ([]SubscribedTuple, error)
	GetSubscriptionError(ctx context.Context, queue string, jobSet string) (string, error)
	HealthCheck(ctx context.Context) (bool, error)
	IsJobSetSubscribed(ctx context.Context, queue string, jobSet string) (bool, string, error)
	SetSubscriptionError(ctx context.Context, queue string, jobSet string, connErr string, fromMessageId string) error
	Setup(ctx context.Context)
	SubscribeJobSet(ctx context.Context, queue string, jobSet string, fromMessageId string) error
	UnsubscribeJobSet(ctx context.Context, queue, jobSet string) (int64, error)
	UpdateJobServiceDb(ctx context.Context, jobTable *JobStatus) error
	UpdateJobSetDb(ctx context.Context, queue string, jobSet string, fromMessageId string) error
}

SQLJobService for persisting to DB.

func NewSQLJobService

func NewSQLJobService(cfg *configuration.JobServiceConfiguration, log *log.Entry) (error, SQLJobService, func())

type SubscribeTable

type SubscribeTable struct {
	// contains filtered or unexported fields
}

func NewSubscribeTable

func NewSubscribeTable(queue string, jobSet string) *SubscribeTable

type SubscribedTuple

type SubscribedTuple struct {
	Queue         string
	JobSet        string
	FromMessageId string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL