Documentation ¶
Constants ¶
const (
TRIGGER_STATE_NEW = "new"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Job ¶
type Job[JC any] struct {
    Id          string              // Id is a unique identifier for the job
    C           JC                  // C holds the job specific context
    State       string              // State represents the current processing state of the job
    StateErrors map[string][]string // StateErrors is a map of errors that occurred in the current state
    LastUpdate  *time.Time          // The last time this job was fetched
}
Job represents the current processing state of any job.
func (Job[JC]) UpdateLastEvent ¶ added in v0.0.7
UpdateLastEvent updates the LastUpdate field of the Job struct to the current time.
type JsonSerializer ¶
JsonSerializer is a struct that implements Serializer. It stores a run to, and loads it from, the file specified in the File field. An anonymous variable type check verifies that it satisfies the Serializer interface.
func NewJsonSerializer ¶
func NewJsonSerializer[OC any, JC any](file string) *JsonSerializer[OC, JC]
NewJsonSerializer creates a new instance of the JsonSerializer struct. It takes a single argument `file` of type string, which is the file path where the serialized run data will be stored or loaded from.
func (JsonSerializer[OC, JC]) Deserialize ¶
func (js JsonSerializer[OC, JC]) Deserialize() (*Run[OC, JC], error)
Deserialize reads the serialized Run[OC, JC] data from the file specified when creating the JsonSerializer instance, deserializes the JSON data into a Run[OC, JC] instance, and returns the deserialized Run instance.
If any error occurs during the process, such as opening the file or decoding the JSON data, the function returns a zero-value Run[OC, JC] instance and the error.
Returns:
*Run[OC, JC]: The deserialized Run instance.
error: An error value if the deserialization or file reading operation fails, otherwise nil.
func (JsonSerializer[OC, JC]) Serialize ¶
func (js JsonSerializer[OC, JC]) Serialize(run Run[OC, JC]) error
Serialize takes a Run[OC, JC] instance and serializes it to JSON format, writing the serialized data to the file specified when creating the JsonSerializer instance. It creates the parent directory for the file if it doesn't exist, and creates the file if it doesn't exist.
If any error occurs during the process, such as creating the directory, creating the file, or encoding the Run instance, the function returns the error.
Parameters:
run Run[OC, JC]: The Run instance to be serialized.
Returns:
error: An error value if the serialization or file writing operation fails, otherwise nil.
type KickRequest ¶
KickRequest is a job context paired with a requested state; the framework expands it into an actual job.
type NilSerializer ¶
NilSerializer implements the Serializer interface with a no-op Serialize method and a Deserialize method that is not meant to be called. It is useful when you don't need to persist or load Run instances, and NewProcessor uses it as the default if you don't specify a serializer.
func (*NilSerializer[OC, JC]) Deserialize ¶
func (n *NilSerializer[OC, JC]) Deserialize() (*Run[OC, JC], error)
Deserialize is not implemented: it panics with a "not implemented" message. It satisfies the Serializer interface's Deserialize method requirement, but it should never be called in practice when using the NilSerializer.
func (*NilSerializer[OC, JC]) Serialize ¶
func (n *NilSerializer[OC, JC]) Serialize(run Run[OC, JC]) error
Serialize is a no-op implementation that does nothing and always returns nil. It satisfies the Serializer interface's Serialize method requirement.
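The pattern described for NilSerializer (a Serialize that discards its input and a Deserialize that panics) can be sketched as follows. This is an illustration under assumptions, not the package's source: `Run` is a simplified local stand-in, `Serializer` is copied from the interface documented on this page, and `discardSerializer` and `didPanic` are hypothetical names.

```go
package main

import "fmt"

// Run is a simplified local stand-in, not the package's type.
type Run[OC any, JC any] struct {
	Name    string
	Jobs    map[string]JC
	Overall OC
}

// Serializer mirrors the interface documented on this page.
type Serializer[OC any, JC any] interface {
	Serialize(r Run[OC, JC]) error
	Deserialize() (*Run[OC, JC], error)
}

// discardSerializer (hypothetical name) persists nothing.
type discardSerializer[OC any, JC any] struct{}

// Compile-time check that the pointer type satisfies Serializer,
// written as an anonymous variable declaration.
var _ Serializer[string, int] = (*discardSerializer[string, int])(nil)

// Serialize ignores the run and reports success.
func (d *discardSerializer[OC, JC]) Serialize(run Run[OC, JC]) error {
	return nil
}

// Deserialize has nothing to load, so it panics if called.
func (d *discardSerializer[OC, JC]) Deserialize() (*Run[OC, JC], error) {
	panic("not implemented")
}

// didPanic (hypothetical helper) reports whether calling f panics.
func didPanic(f func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	f()
	return false
}

func main() {
	var s Serializer[string, int] = &discardSerializer[string, int]{}
	fmt.Println(s.Serialize(Run[string, int]{Name: "demo"})) // prints <nil>
	fmt.Println(didPanic(func() { s.Deserialize() }))        // prints true
}
```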
type NilStatusListener ¶
type NilStatusListener struct { }
NilStatusListener is a struct that implements the StatusListener interface with a no-op implementation of the StatusUpdate method. It is useful when you don't need to receive status updates or when you want to use a dummy status listener.
func (NilStatusListener) StatusUpdate ¶
func (n NilStatusListener) StatusUpdate(status []StatusCount)
StatusUpdate is a no-op implementation that does nothing. It satisfies the StatusListener interface's StatusUpdate method requirement.
type Processor ¶
Processor executes a job.
func NewProcessor ¶
func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) (*Processor[AC, OC, JC], error)
type Return ¶
type Return[JC any] struct {
    PriorState   string
    Job          Job[JC]
    KickRequests []KickRequest[JC]
}
Return is a struct that contains a job and a list of kick requests. It is used for returning job updates to the system.
type Run ¶
type Run[OC any, JC any] struct {
    Name    string             // Name of the run
    Jobs    map[string]Job[JC] // Map of jobs, where keys are job ids and values are Job states
    Overall OC                 // Overall state that is useful to all jobs, basically context for the overall batch
    // contains filtered or unexported fields
}
Run is the overall state of a given run (batch) in the processing framework. It is meant to be re-entrant: if you kill the processor and you have a serializer, you can restart from the serialized run at any time.
func NewRun ¶
NewRun creates a new Run instance with the given name and overall context.
Use the overall context to store any state that all of the jobs need access to, instead of storing it in the individual job contexts.
func (*Run[OC, JC]) AddJob ¶
func (r *Run[OC, JC]) AddJob(jc JC)
AddJob adds a job to the pool. It should not be called once the run is in progress.
func (*Run[OC, JC]) AddJobWithState ¶ added in v0.0.9
type Serializer ¶
type Serializer[OC any, JC any] interface {
    Serialize(r Run[OC, JC]) error
    Deserialize() (*Run[OC, JC], error)
}
Serializer is an interface for job run serialization.
type State ¶
type State[AC any, OC any, JC any] struct {
    // TriggerState is the string identifier for this state.
    TriggerState string
    // Exec is a function that executes the logic for jobs in this state.
    // It takes the application context (AC), overall context (OC), and job context (JC) as input,
    // and returns the updated job context (JC), the next state string,
    // a slice of kick requests ([]KickRequest[JC]) for triggering other jobs,
    // and an error (if any).
    Exec func(ctx context.Context, ac AC, oc OC, jc JC) (JC, string, []KickRequest[JC], error)
    // Terminal indicates whether this state is a terminal state,
    // meaning that no further state transitions should occur after reaching this state.
    Terminal bool
    // Concurrency specifies the maximum number of concurrent executions allowed for this state.
    Concurrency int
    // RateLimit is an optional rate limiter for controlling the execution rate of this state.
    // Useful when calling rate limited apis.
    RateLimit *rate.Limiter
}
State represents a state in a state machine for job processing. It defines the behavior and configuration for a particular state.
type StatusCount ¶
type StatusListener ¶
type StatusListener interface {
    // StatusUpdate is called by the processor to provide an update on the current
    // status of job processing. The `status` parameter is a slice of StatusCount
    // instances, where each instance represents the count of jobs in a particular state.
    //
    // The status counts will be in the same order as the states passed to the processor
    StatusUpdate(status []StatusCount)
}
StatusListener is an interface that defines a method for receiving status updates. It is used by the processor to notify interested parties about the current status of job processing.
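A custom listener only needs the single StatusUpdate method. The sketch below is illustrative and rests on assumptions: this page does not show the fields of StatusCount, so the `State` and `Count` fields on the local stand-in are guesses, and `lineListener` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// StatusCount is a hypothetical stand-in. The real type's fields are not
// documented on this page, so State and Count are assumed names.
type StatusCount struct {
	State string
	Count int
}

// StatusListener mirrors the interface documented on this page.
type StatusListener interface {
	StatusUpdate(status []StatusCount)
}

// lineListener (hypothetical name) records each update as one text line.
type lineListener struct {
	Lines []string
}

// Compile-time check that *lineListener satisfies StatusListener.
var _ StatusListener = (*lineListener)(nil)

// StatusUpdate formats the counts in the order received, which the docs
// say matches the order of the states passed to the processor.
func (l *lineListener) StatusUpdate(status []StatusCount) {
	parts := make([]string, 0, len(status))
	for _, sc := range status {
		parts = append(parts, fmt.Sprintf("%s=%d", sc.State, sc.Count))
	}
	l.Lines = append(l.Lines, strings.Join(parts, " "))
}

func main() {
	l := &lineListener{}
	l.StatusUpdate([]StatusCount{
		{State: "new", Count: 3},
		{State: "done", Count: 1},
	})
	fmt.Println(l.Lines[0]) // prints new=3 done=1
}
```

This sketch is not safe for concurrent use. If the processor can call StatusUpdate from more than one goroutine, which this page does not say, the slice would need a mutex.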