Documentation ¶
Index ¶
- Constants
- func ExtractJobIdFromKey(path string, key *stream.FlowKeyValue) (string, error)
- func NewEtcdStreamConsumer(ctx context.Context, cli *clientv3.Client, config *StreamConsumerConfiguration) (*streamConsumer, error)
- func NewStreamConsumer(ctx context.Context, config *StreamConsumerConfiguration, jobStore JobStore, ...) *streamConsumer
- func RunJobFlowUntil(ctx context.Context, cli *clientv3.Client, path string, ...) error
- type Job
- type JobHandler
- type JobParameters
- type JobStore
- type Producer
- type Progress
- type RunOption
- type StreamConsumerConfiguration
Constants ¶
View Source
const (
	StateFinished = "FINISHED"
	StateErrored  = "ERRORED"
	StateRunning  = "RUNNING"
)
Variables ¶
This section is empty.
Functions ¶
func ExtractJobIdFromKey ¶
func ExtractJobIdFromKey(path string, key *stream.FlowKeyValue) (string, error)
func NewEtcdStreamConsumer ¶
func NewStreamConsumer ¶
func NewStreamConsumer(ctx context.Context, config *StreamConsumerConfiguration, jobStore JobStore, flow horizontal.Flow) *streamConsumer
Types ¶
type Job ¶
type Job struct {
	Id           string        `json:"Id"`
	TaskName     string        `json:"TaskName"`
	Args         JobParameters `json:"Args"`
	State        string        `json:"State"`
	CreatedAt    time.Time     `json:"CreatedAt"`
	UpdatedAt    time.Time     `json:"UpdatedAt"`
	Result       string        `json:"Result"`
	MaxRetry     int64         `json:"MaxRetry"`
	CurrentRetry int64         `json:"CurrentRetry"`
	Errors       []string      `json:"Errors"`
	// attached to the job when picked up by the consumer
	ConsumerName string `json:"ConsumerName"`
}
func NewJobWithRetry ¶
func NewJobWithRetry(taskName string, args JobParameters, maxRetry int) (string, *Job)
type JobHandler ¶
type JobParameters ¶
type JobParameters map[string]interface{}
type JobStore ¶
type JobStore interface {
	Get(ctx context.Context, jobId string) (*Job, error)
	MarkFinished(ctx context.Context, jobId string) error
	MarkFailed(ctx context.Context, jobId string, err error) error
	SaveResult(ctx context.Context, jobId string, result interface{}) error
	MarkRunning(ctx context.Context, jobId string) error
	// Touch only updates the UpdatedAt field of the job; it doesn't affect any other fields.
	Touch(ctx context.Context, jobId string) error
}
func NewEtcdJobStore ¶
type Producer ¶
type Progress ¶
type Progress struct {
// contains filtered or unexported fields
}
func (*Progress) RunWithProgressResult ¶
type StreamConsumerConfiguration ¶
type StreamConsumerConfiguration struct {
	Path         string `validate:"required" yaml:"Path"`
	ConsumerName string `validate:"required" yaml:"ConsumerName"`
	Concurrency  int    `validate:"required" yaml:"Concurrency"`
	// optional, defaults to 5 seconds
	NextRetry       time.Duration `yaml:"NextRetry"`
	RunningNoUpdate time.Duration `yaml:"RunningNoUpdate"`
	FromEnd         bool          `yaml:"FromEnd"`
	// how often to set the job.UpdatedAt field to keep the job running
	HeartBeatInterval time.Duration `yaml:"HeartBeatInterval"`
	RetentionPeriod   time.Duration `yaml:"RetentionPeriod"`
}
Click to show internal directories.
Click to hide internal directories.