Documentation ¶
Overview ¶
Package store provides the development kit for working with a database-as-a-queue, so the jobs queue can be persisted in a datastore.
At any point in time, some destinations may be in a state of failure. By using a store in tandem with a scheduler, Blacksmith applications can build a reliable system for delivering jobs at scale and keep track of successes, failures, and discards.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var AvailableAdapters = map[string]bool{
	"postgres": true,
}
AvailableAdapters is the set of available store adapters, keyed by adapter name.
var Defaults = &Options{
	Context: context.Background(),
}
Defaults are the default options for the store. When an option is not set, these values are applied automatically.
var InterfaceStore = "store"
InterfaceStore is the string representation for the store interface.
var StatusAcknowledged = "acknowledged"
StatusAcknowledged is used to mark a job status as acknowledged. This is used when receiving new jobs from the gateway into the store.
var StatusAwaiting = "awaiting"
StatusAwaiting is used to mark a job status as awaiting. This is used when a job is waiting to be run.
var StatusDiscarded = "discarded"
StatusDiscarded is used to mark a job status as discarded. This is used when a job has reached the maximum number of retries, so it will not run again.
var StatusExecuting = "executing"
StatusExecuting is used to mark a job status as executing. This is used when a job is being executed.
var StatusFailed = "failed"
StatusFailed is used to mark a job status as failed.
var StatusSucceeded = "succeeded"
StatusSucceeded is used to mark a job status as succeeded.
var StatusUnknown = "unknown"
StatusUnknown is used to mark a job status as unknown. This is used when the scheduler is not aware of a job's status. This only happens when an action does not return the job ID(s) in the "Then" channel. The scheduler then has no way to associate the job ID(s) with the error or the actions to execute, so it can only mark the status as unknown.
Functions ¶
This section is empty.
Types ¶
type Event ¶
type Event struct {

	// ID is the unique identifier of the event. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Source is the string representation of the incoming event's source.
	Source string `json:"source"`

	// Trigger is the string representation of the incoming or awaiting event.
	Trigger string `json:"trigger"`

	// Version is the version number of the source used by the event's payload
	// when triggered.
	//
	// Examples: "v1.0", "2020-10-01"
	Version string `json:"version,omitempty"`

	// Context is the marshaled representation of the "context" key present in the
	// event's payload.
	Context []byte `json:"context,omitempty"`

	// Data is the marshaled representation of the "data" key present in the event's
	// payload.
	Data []byte `json:"data,omitempty"`

	// Jobs is a list of jobs to execute related to the event. A destination should
	// have at most 1 job per event.
	Jobs []*Job `json:"jobs"`

	// SentAt is the timestamp of when the event is originally sent by the source.
	// It can be nil if none was provided.
	SentAt *time.Time `json:"sent_at,omitempty"`

	// ReceivedAt is the timestamp of when the event is received by the gateway.
	// This shall always be overridden by the gateway.
	ReceivedAt time.Time `json:"received_at"`

	// IngestedAt is a timestamp of the event creation date into the store.
	// It can be nil if none was provided.
	IngestedAt *time.Time `json:"ingested_at,omitempty"`
}
Event defines the fields stored in the datastore about an event.
type Job ¶
type Job struct {

	// ID is the unique identifier of the job. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Destination is the string representation of the destination the job needs to
	// run against.
	Destination string `json:"destination"`

	// Action is the string representation of the action to execute against the
	// destination.
	Action string `json:"action"`

	// Version is the version number of the destination used by a flow when executed.
	//
	// Examples: "v1.0", "2020-10-01"
	Version string `json:"version,omitempty"`

	// Context is the marshaled representation of the "context" key present in the
	// event's payload when the destination's event has been marshaled.
	Context []byte `json:"context,omitempty"`

	// Data is the marshaled representation of the "data" key present in the event's
	// payload when the destination's event has been marshaled.
	Data []byte `json:"data,omitempty"`

	// Transitions is an array of the job's transitions. It is used to keep track of
	// successes, failures, and errors so the store is aware of the job's status.
	//
	// Note: It is up to the adapter to only return the job's latest transition since
	// this is the only one that really matters in this context.
	Transitions [1]*Transition `json:"transitions"`

	// CreatedAt is a timestamp of the job creation date into the store.
	CreatedAt time.Time `json:"created_at"`

	// EventID is the ID of the event related to this job.
	EventID string `json:"event_id,omitempty"`

	// ParentJobID is the ID of the parent job.
	ParentJobID *string `json:"parent_job_id,omitempty"`
}
Job is the definition of a job that needs to run for a given action against a specific destination.
type Meta ¶ added in v0.11.0
type Meta struct {

	// Count is the number of entries found that match the constraints applied to
	// the query (without the limit).
	Count uint16 `json:"count"`

	// Pagination is the pagination details based on the count, offset, and limit.
	Pagination *Pagination `json:"pagination"`

	// Where is the constraints applied to the query to find events, jobs, or
	// transitions. This is included in the meta because the store can set defaults
	// or override some constraints (such as a maximum limit). This lets callers see
	// the constraints actually applied to the query.
	Where *WhereEvents `json:"where"`
}
Meta includes information about the query's result returned by the store when looking for entries (events, jobs, or transitions).
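The Where field exists because a store may apply defaults or cap some constraints. A minimal sketch of that idea follows, assuming an adapter with a default and a maximum limit. The types are trimmed local copies, and defaultLimit, maxLimit, and effectiveWhere are hypothetical names chosen for illustration; real adapters may use different values and rules.

```go
package main

import "fmt"

// WhereEvents is a trimmed local copy keeping only the paging fields.
type WhereEvents struct {
	Offset uint16 `json:"offset,omitempty"`
	Limit  uint16 `json:"limit,omitempty"`
}

// Meta is a trimmed local copy; the Pagination field is omitted.
type Meta struct {
	Count uint16       `json:"count"`
	Where *WhereEvents `json:"where"`
}

// Hypothetical adapter settings. Actual values are adapter-specific.
const (
	defaultLimit uint16 = 100
	maxLimit     uint16 = 1000
)

// effectiveWhere returns a copy of the caller's constraints with the
// hypothetical default and maximum limits applied. The input is not mutated.
func effectiveWhere(where *WhereEvents) *WhereEvents {
	applied := WhereEvents{}
	if where != nil {
		applied = *where
	}

	switch {
	case applied.Limit == 0:
		applied.Limit = defaultLimit
	case applied.Limit > maxLimit:
		applied.Limit = maxLimit
	}

	return &applied
}

func main() {
	requested := &WhereEvents{Offset: 20, Limit: 5000}

	// The adapter reports the constraints it actually used in the meta.
	meta := &Meta{Count: 42, Where: effectiveWhere(requested)}

	fmt.Println("requested limit:", requested.Limit)
	fmt.Println("applied limit:", meta.Where.Limit)
}
```

Returning the applied constraints rather than echoing the request lets a caller detect that a limit was capped without knowing the adapter's settings in advance.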
type Options ¶
type Options struct {

	// From is used to set the desired store adapter. It must be one of
	// AvailableAdapters.
	From string `json:"from,omitempty"`

	// Context is a free key-value dictionary that will be passed to the underlying
	// adapter.
	Context context.Context `json:"-"`

	// Connection is the connection string to connect to the store.
	Connection string `json:"-"`
}
Options holds the options a user can pass to configure the store adapter.
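As an illustration of how From, AvailableAdapters, and Defaults relate, the sketch below checks the requested adapter and falls back to the default context when none is set. The declarations are local copies so the example compiles on its own, and the validate function and the connection string are hypothetical; the package may validate options differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented declarations, so the sketch is self-contained.
var AvailableAdapters = map[string]bool{
	"postgres": true,
}

type Options struct {
	From       string
	Context    context.Context
	Connection string
}

var Defaults = &Options{
	Context: context.Background(),
}

// validate is a hypothetical helper. It rejects unknown adapters and returns a
// copy of the options with the default context applied when none was set.
func validate(opts *Options) (*Options, error) {
	if opts == nil {
		return nil, errors.New("store: options must not be nil")
	}
	if !AvailableAdapters[opts.From] {
		return nil, fmt.Errorf("store: unknown adapter %q", opts.From)
	}

	out := *opts
	if out.Context == nil {
		out.Context = Defaults.Context
	}

	return &out, nil
}

func main() {
	// The connection string is a placeholder, not a working DSN.
	opts, err := validate(&Options{From: "postgres", Connection: "<connection string>"})
	fmt.Println(opts != nil && opts.Context != nil, err)

	_, err = validate(&Options{From: "mysql"})
	fmt.Println(err)
}
```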
type Pagination ¶ added in v0.11.0
type Pagination struct {

	// Current is the current page.
	Current uint16 `json:"current"`

	// Previous is the previous page. It will be nil if there is no previous page.
	Previous *uint16 `json:"previous"`

	// Next is the next page. It will be nil if there is no next page.
	Next *uint16 `json:"next"`

	// First is the first page. It will always be 1.
	First uint16 `json:"first"`

	// Last is the last page.
	Last uint16 `json:"last"`
}
Pagination holds the pagination details when looking for entries in the store.
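One plausible way to derive these fields from a count, an offset, and a limit is sketched below. The formula (ceiling division for the last page, offset divided by limit for the current page) is an assumption made for illustration, not the package's documented behavior, and paginate is a hypothetical helper.

```go
package main

import "fmt"

// Pagination is a local copy of the documented struct.
type Pagination struct {
	Current  uint16  `json:"current"`
	Previous *uint16 `json:"previous"`
	Next     *uint16 `json:"next"`
	First    uint16  `json:"first"`
	Last     uint16  `json:"last"`
}

// paginate is a hypothetical helper computing page numbers from the total
// count of matching entries, the offset, and the limit.
func paginate(count, offset, limit uint16) *Pagination {
	if limit == 0 {
		limit = 1 // avoid a division by zero; an adapter would apply its default
	}

	// Intermediate math uses uint32 so the sums cannot wrap around uint16.
	last := (uint32(count) + uint32(limit) - 1) / uint32(limit)
	if last == 0 {
		last = 1 // an empty result still has one (empty) page
	}

	current := uint32(offset)/uint32(limit) + 1
	if current > last {
		current = last
	}

	p := &Pagination{Current: uint16(current), First: 1, Last: uint16(last)}
	if current > 1 {
		prev := uint16(current - 1)
		p.Previous = &prev
	}
	if current < last {
		next := uint16(current + 1)
		p.Next = &next
	}

	return p
}

func main() {
	// 95 entries, 10 per page, skipping the first 20: page 3 of 10.
	p := paginate(95, 20, 10)
	fmt.Println(p.Current, *p.Previous, *p.Next, p.Last) // prints: 3 2 4 10
}
```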
type Queue ¶
type Queue struct {

	// Events is the collection of incoming or awaiting events.
	Events []*Event `json:"events"`
}
Queue keeps track of incoming events, their jobs, and their jobs' transitions.
type Store ¶
type Store interface {

	// String returns the string representation of the adapter.
	//
	// Example: "postgres"
	String() string

	// Options returns the options originally passed to the Options struct. This
	// can be used to validate and override user's options if necessary.
	Options() *Options

	// AddEvents inserts a queue of events into the datastore given the data passed
	// in params. It returns an error if any occurred.
	AddEvents(*Toolkit, []*Event) error

	// FindEvent returns an event given the event ID passed in params.
	FindEvent(*Toolkit, string) (*Event, error)

	// FindEvents returns a list of events matching the constraints passed in params.
	// It also returns meta information about the query, such as pagination and the
	// constraints actually applied to it.
	FindEvents(*Toolkit, *WhereEvents) ([]*Event, *Meta, error)

	// AddJobs inserts a list of jobs into the datastore.
	AddJobs(*Toolkit, []*Job) error

	// FindJob returns a job given the job ID passed in params.
	FindJob(*Toolkit, string) (*Job, error)

	// FindJobs returns a list of jobs matching the constraints passed in params.
	// It also returns meta information about the query, such as pagination and the
	// constraints actually applied to it.
	FindJobs(*Toolkit, *WhereEvents) ([]*Job, *Meta, error)

	// AddTransitions inserts a list of transitions into the datastore to update
	// their related job status. We insert new transitions instead of updating the
	// job itself to keep track of the job's history.
	AddTransitions(*Toolkit, []*Transition) error

	// FindTransition returns a transition given the transition ID passed in params.
	FindTransition(*Toolkit, string) (*Transition, error)

	// FindTransitions returns a list of transitions matching the constraints passed
	// in params. It also returns meta information about the query, such as pagination
	// and the constraints actually applied to it.
	FindTransitions(*Toolkit, *WhereEvents) ([]*Transition, *Meta, error)
}
Store is the interface used to persist the jobs queue in a datastore to keep track of job states.
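To show the shape of an adapter, here is a minimal in-memory sketch covering only three of the methods. It is illustrative rather than a usable adapter: eventStore is a hypothetical narrowed interface, memoryStore and the "memory" name are invented (only "postgres" is listed in AvailableAdapters), and Toolkit and Event are trimmed local copies so the example has no external dependency.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Toolkit is a trimmed local copy; the logrus Logger is omitted here.
type Toolkit struct{}

// Event is a trimmed local copy with only a few fields.
type Event struct {
	ID      string
	Source  string
	Trigger string
}

// eventStore is a hypothetical subset of the Store interface, limited to the
// methods this sketch implements.
type eventStore interface {
	String() string
	AddEvents(*Toolkit, []*Event) error
	FindEvent(*Toolkit, string) (*Event, error)
}

// memoryStore keeps events in a map guarded by a mutex.
type memoryStore struct {
	mu     sync.RWMutex
	events map[string]*Event
}

// Compile-time check that memoryStore satisfies the narrowed interface.
var _ eventStore = (*memoryStore)(nil)

func newMemoryStore() *memoryStore {
	return &memoryStore{events: make(map[string]*Event)}
}

func (s *memoryStore) String() string { return "memory" }

// AddEvents validates the whole batch first, then inserts it, so a bad entry
// does not leave the batch half-written.
func (s *memoryStore) AddEvents(tk *Toolkit, events []*Event) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	seen := make(map[string]bool, len(events))
	for _, e := range events {
		if e == nil || e.ID == "" {
			return errors.New("store: event must have an ID")
		}
		if _, exists := s.events[e.ID]; exists || seen[e.ID] {
			return fmt.Errorf("store: event %q already exists", e.ID)
		}
		seen[e.ID] = true
	}

	for _, e := range events {
		s.events[e.ID] = e
	}
	return nil
}

// FindEvent returns the event with the given ID, or an error if it is absent.
func (s *memoryStore) FindEvent(tk *Toolkit, id string) (*Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	e, ok := s.events[id]
	if !ok {
		return nil, fmt.Errorf("store: event %q not found", id)
	}
	return e, nil
}

func main() {
	var st eventStore = newMemoryStore()
	tk := &Toolkit{}

	err := st.AddEvents(tk, []*Event{{ID: "1UYc8EebLqCAFMOSkbYZdJwNLAJ", Source: "my-source"}})
	fmt.Println(st.String(), err)

	e, err := st.FindEvent(tk, "1UYc8EebLqCAFMOSkbYZdJwNLAJ")
	fmt.Println(e.Source, err)
}
```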
type Toolkit ¶
type Toolkit struct {

	// Logger gives access to the logrus Logger passed in options when creating the
	// Blacksmith application.
	Logger *logrus.Logger
}
Toolkit contains a suite of utilities and data to help the adapter successfully run the store functions.
type Transition ¶
type Transition struct {

	// ID is the unique identifier of the transition. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Attempt represents the number of attempts the job has made before
	// succeeding.
	Attempt uint16 `json:"attempt"`

	// StateBefore is the state of the job before running the new transition. This
	// shall be nil when receiving the job from the gateway.
	StateBefore *string `json:"state_before"`

	// StateAfter is the state of the job after running the new transition.
	StateAfter string `json:"state_after"`

	// Error keeps track of the encountered error, if any.
	Error error `json:"error,omitempty"`

	// CreatedAt is a timestamp of the transition creation date into the store.
	CreatedAt time.Time `json:"created_at"`

	// EventID is the ID of the event related to this job's transition.
	EventID string `json:"event_id,omitempty"`

	// JobID is the ID of the job related to this transition.
	JobID string `json:"job_id,omitempty"`
}
Transition represents a job's transition to keep track of its states.
type WhereEvents ¶ added in v0.11.0
type WhereEvents struct {

	// SourcesIn makes sure the entries returned by the query have one of the source
	// names present in the slice.
	SourcesIn []string `json:"sources_in,omitempty"`

	// SourcesNotIn makes sure the entries returned by the query do not have any
	// of the source names present in the slice.
	SourcesNotIn []string `json:"sources_notin,omitempty"`

	// TriggersIn makes sure the entries returned by the query have one of the source's
	// event names present in the slice.
	TriggersIn []string `json:"triggers_in,omitempty"`

	// TriggersNotIn makes sure the entries returned by the query do not have any
	// of the source's event names present in the slice.
	TriggersNotIn []string `json:"triggers_notin,omitempty"`

	// CreatedBefore makes sure the entries returned by the query are related to an
	// event created before this instant.
	CreatedBefore *time.Time `json:"created_before,omitempty"`

	// CreatedAfter makes sure the entries returned by the query are related to an
	// event created after this instant.
	CreatedAfter *time.Time `json:"created_after,omitempty"`

	// AndWhereJobs lets you define additional constraints related to the jobs for
	// the entries you are looking for.
	AndWhereJobs *WhereJobs `json:"jobs,omitempty"`

	// Offset specifies the number of entries to skip before starting to return entries
	// from the query.
	Offset uint16 `json:"offset,omitempty"`

	// Limit specifies the number of entries to return after the offset clause has
	// been processed.
	Limit uint16 `json:"limit,omitempty"`
}
WhereEvents is used to set constraints on the events when looking for entries in the store.
type WhereJobs ¶ added in v0.11.0
type WhereJobs struct {

	// EventID finds every entry related to a specific event ID.
	//
	// Note: When set, other constraints are not applied (except parent offset and
	// limit).
	EventID string `json:"event_id,omitempty"`

	// DestinationsIn makes sure the entries returned by the query have one of the
	// destination names present in the slice.
	DestinationsIn []string `json:"destinations_in,omitempty"`

	// DestinationsNotIn makes sure the entries returned by the query do not have any
	// of the destination names present in the slice.
	DestinationsNotIn []string `json:"destinations_notin,omitempty"`

	// ActionsIn makes sure the entries returned by the query have one of the destination's
	// action names present in the slice.
	ActionsIn []string `json:"actions_in,omitempty"`

	// ActionsNotIn makes sure the entries returned by the query do not have any of
	// the destination's action names present in the slice.
	ActionsNotIn []string `json:"actions_notin,omitempty"`

	// CreatedBefore makes sure the entries returned by the query are related to a
	// job created before this instant.
	CreatedBefore *time.Time `json:"created_before,omitempty"`

	// CreatedAfter makes sure the entries returned by the query are related to a
	// job created after this instant.
	CreatedAfter *time.Time `json:"created_after,omitempty"`

	// AndWhereTransitions lets you define additional constraints related to the
	// transitions for the entries you are looking for.
	AndWhereTransitions *WhereTransitions `json:"transitions,omitempty"`
}
WhereJobs is used to set constraints on jobs when looking for entries in the store.
type WhereTransitions ¶ added in v0.11.0
type WhereTransitions struct {

	// JobID finds every entry related to a specific job ID.
	//
	// Note: When set, other constraints are not applied (except parent offset and
	// limit).
	JobID string `json:"job_id,omitempty"`

	// StatusIn makes sure the entries returned by the query have one of the statuses
	// present in the slice.
	StatusIn []string `json:"status_in,omitempty"`

	// StatusNotIn makes sure the entries returned by the query do not have any of
	// the statuses present in the slice.
	StatusNotIn []string `json:"status_notin,omitempty"`

	// MinAttempts makes sure the entries returned by the query have at least this
	// number of attempts.
	MinAttempts uint16 `json:"min_attempts,omitempty"`

	// MaxAttempts makes sure the entries returned by the query have at most this
	// number of attempts.
	MaxAttempts uint16 `json:"max_attempts,omitempty"`

	// CreatedBefore makes sure the entries returned by the query are related to a
	// transition created before this instant.
	CreatedBefore *time.Time `json:"created_before,omitempty"`

	// CreatedAfter makes sure the entries returned by the query are related to a
	// transition created after this instant.
	CreatedAfter *time.Time `json:"created_after,omitempty"`
}
WhereTransitions is used to set constraints on transitions when looking for entries in the store.