Documentation ¶
Index ¶
- Constants
- func CheckIndexingJobsImageConfig(jobConfigs []JobConfig) error
- func GetCleanupTime() int
- func GetMaxJobConfig() int
- func GetRandString(n int) string
- func GetValueFromJSON(jsonBytes []byte, keys []string) (interface{}, error)
- func LookupCredFile() string
- func NewJobHandler() *jobHandler
- func NewSQSClient() (sqsiface.SQSAPI, error)
- func ReadFile(path string) ([]byte, error)
- func RegisterJob()
- func RegisterSystem()
- func StringContainsPrefixInSlice(s string, prefixList []string) bool
- type AWSCredentials
- type JobConfig
- type JobInfo
- type JobsArray
- type RetryMessage
- type SQSHandler
- func (handler *SQSHandler) GetIndexingJobStatus(w http.ResponseWriter, r *http.Request)
- func (handler *SQSHandler) HandleDispatchJob(w http.ResponseWriter, r *http.Request)
- func (handler *SQSHandler) HandleJobConfig(w http.ResponseWriter, r *http.Request)
- func (handler *SQSHandler) HandleSQSMessage(message *sqs.Message) error
- func (handler *SQSHandler) RegisterSQSHandler()
- func (handler *SQSHandler) RemoveCompletedJobsProcess()
- func (handler *SQSHandler) RemoveSQSMessage(message *sqs.Message) error
- func (handler *SQSHandler) RetryCreateIndexingJob(jsonBytes []byte) error
- func (handler *SQSHandler) StartConsumingProcess() error
- func (handler *SQSHandler) StartMonitoringProcess()
- func (handler *SQSHandler) StartServer() error
Constants ¶
const (
GRACE_PERIOD int64 = 1 // grace period in seconds before a job is deleted
)
const (
MAX_RETRIES int = 3
)
Variables ¶
This section is empty.
Functions ¶
func CheckIndexingJobsImageConfig ¶
Check that all "indexing" jobs have both Indexd and Metadata Service creds configured. If not, return an error.
func GetCleanupTime ¶
func GetCleanupTime() int
GetCleanupTime returns the cleanuo time for completed jobs
func GetMaxJobConfig ¶
func GetMaxJobConfig() int
GetMaxJobConfig returns maximum number of jobs allowed to run simultanously
func GetRandString ¶
GetRandString returns a random string of lenght N
func GetValueFromJSON ¶
func NewJobHandler ¶
func NewJobHandler() *jobHandler
func NewSQSClient ¶
NewSQSClient create new SQSAPI client
func RegisterJob ¶
func RegisterJob()
Types ¶
type AWSCredentials ¶
type AWSCredentials struct {
// contains filtered or unexported fields
}
S3Credentials contains AWS credentials
type JobInfo ¶
type JobInfo struct { UID string `json:"uid"` Name string `json:"name"` Status string `json:"status"` URL string `json:"url"` SQSMessage *sqs.Message // contains filtered or unexported fields }
func CreateK8sJob ¶
CreateK8sJob creates a k8s job to handle s3 object
func (*JobInfo) DetailedStatus ¶
type RetryMessage ¶
type SQSHandler ¶
type SQSHandler struct { QueueURL string Start bool JobConfigs []JobConfig // contains filtered or unexported fields }
func NewSQSHandler ¶
func NewSQSHandler(queueURL string) *SQSHandler
NewSQSHandler creates new SQSHandler instance
func (*SQSHandler) GetIndexingJobStatus ¶
func (handler *SQSHandler) GetIndexingJobStatus(w http.ResponseWriter, r *http.Request)
GetIndexingJobStatus get indexing job status
func (*SQSHandler) HandleDispatchJob ¶
func (handler *SQSHandler) HandleDispatchJob(w http.ResponseWriter, r *http.Request)
HandleDispatchJob dispatch an job
func (*SQSHandler) HandleJobConfig ¶
func (handler *SQSHandler) HandleJobConfig(w http.ResponseWriter, r *http.Request)
HandleJobConfig handles job config endpoints to add a jobConfig
curl -X POST http://localhost/jobConfig -d `{"name": "usersync", "pattern": "s3://bucket/usersync.yaml", "image": "quay.io/cdis/fence:master", "imageConfig":{}}`
to delete jobConfig
curl -X DELETE http://localhost/jobConfig?pattern=s3://bucket/usersync.yaml
func (*SQSHandler) HandleSQSMessage ¶
func (handler *SQSHandler) HandleSQSMessage(message *sqs.Message) error
HandleSQSMessage handles SQS message
The function takes a sqs message as input, extract the object urls and decide which image need to be pulled to handle the s3 object based on the object url
A SQS message may contains multiple records. The service goes though all the records and compute the number of records need to be processed base on their url and jobConfig list. If the number is larger than the availbility of jobpool, the service will take a sleep until the resource is available.
If the function returns an error other than nil, the message is put back to the queue and retry later (handled by `md` library). That makes sure the message is properly handle before it actually deleted
func (*SQSHandler) RegisterSQSHandler ¶
func (handler *SQSHandler) RegisterSQSHandler()
RegisterSQSHandler registers endpoints
func (*SQSHandler) RemoveCompletedJobsProcess ¶
func (handler *SQSHandler) RemoveCompletedJobsProcess()
RemoveCompletedJobsProcess starts the process to remove completed jobs
func (*SQSHandler) RemoveSQSMessage ¶
func (handler *SQSHandler) RemoveSQSMessage(message *sqs.Message) error
RemoveSQSMessage removes SQS message
func (*SQSHandler) RetryCreateIndexingJob ¶
func (handler *SQSHandler) RetryCreateIndexingJob(jsonBytes []byte) error
RetryCreateIndexingJob creates manually job
func (*SQSHandler) StartConsumingProcess ¶
func (handler *SQSHandler) StartConsumingProcess() error
StartConsumingProcess starts consuming the queue
func (*SQSHandler) StartMonitoringProcess ¶
func (handler *SQSHandler) StartMonitoringProcess()
StartMonitoringProcess starts the process to monitor the created job
func (*SQSHandler) StartServer ¶
func (handler *SQSHandler) StartServer() error
StartServer starts a server