Documentation ¶
Overview ¶
Package store provides the development kit for working with a database-as-a-queue, so the jobs queue can be persisted in a datastore.
At any point in time, a destination may be in a state of failure. By using a store in tandem with a scheduler, Blacksmith applications can deliver events reliably at scale and keep track of successes, failures, errors, and job transitions.
The store shall be immutable: no rows shall be updated or removed.
A store adapter can be generated using the Blacksmith CLI:
$ blacksmith generate store
Note: Adapter generation using the Blacksmith CLI is a feature only available in Blacksmith Enterprise.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var AvailableAdapters = map[string]bool{
	"postgres": true,
}
AvailableAdapters is the set of available store adapters, keyed by adapter name.
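As a minimal sketch, a lookup against this map can guard an adapter name before it is used. The map below is a local copy made for illustration, and `isAvailable` is a hypothetical helper that is not part of the package.

```go
package main

import "fmt"

// availableAdapters is a local copy of store.AvailableAdapters so the
// example runs on its own.
var availableAdapters = map[string]bool{
	"postgres": true,
}

// isAvailable is a hypothetical helper: it reports whether an adapter name
// is registered. A missing key yields false, the zero value for bool.
func isAvailable(name string) bool {
	return availableAdapters[name]
}

func main() {
	for _, name := range []string{"postgres", "mysql"} {
		fmt.Printf("%s available: %t\n", name, isAvailable(name))
	}
}
```

Because a missing key reads as false, no separate existence check is needed.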
var Defaults = &Options{}
Defaults are the default options set for the store. When an option is not set, these values are applied automatically.
var InterfaceStore = "store"
InterfaceStore is the string representation for the store interface.
var StatusAcknowledged = "acknowledged"
StatusAcknowledged is used to mark a job as acknowledged. This is used when registering new jobs into the store.
var StatusAwaiting = "awaiting"
StatusAwaiting is used to mark a job as awaiting. This is used when a job is waiting to be run.
var StatusDiscarded = "discarded"
StatusDiscarded is used to mark a job as discarded.
var StatusExecuting = "executing"
StatusExecuting is used to mark a job as executing. This is used when a job is being executed.
var StatusFailed = "failed"
StatusFailed is used to mark a job as failed.
var StatusSucceeded = "succeeded"
StatusSucceeded is used to mark a job as succeeded.
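The six status values above form a closed set, so a caller may want to reject unknown strings before building a query or a transition. The sketch below assumes local copies of the values, and `isKnownStatus` is a hypothetical helper rather than something the package provides.

```go
package main

import "fmt"

// knownStatuses holds local copies of the status values documented above.
var knownStatuses = map[string]struct{}{
	"acknowledged": {},
	"awaiting":     {},
	"discarded":    {},
	"executing":    {},
	"failed":       {},
	"succeeded":    {},
}

// isKnownStatus is a hypothetical helper reporting whether s is one of the
// documented status values.
func isKnownStatus(s string) bool {
	_, ok := knownStatuses[s]
	return ok
}

func main() {
	for _, s := range []string{"failed", "pending"} {
		fmt.Printf("%s known: %t\n", s, isKnownStatus(s))
	}
}
```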
Functions ¶
This section is empty.
Types ¶
type Event ¶
type Event struct {

	// ID is the unique identifier of the event. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Source is the string representation of the incoming event's source.
	Source string `json:"source"`

	// Name is the string representation of the incoming or awaiting event.
	Name string `json:"name"`

	// Context is the marshaled representation of the "context" key present in the
	// event's payload.
	Context []byte `json:"context"`

	// Data is the marshaled representation of the "data" key present in the event's
	// payload.
	Data []byte `json:"data"`

	// Jobs is a list of jobs to execute related to the event. A destination should
	// have at most 2 jobs per event: a wildcard and the specific event.
	Jobs []*Job `json:"jobs,omitempty"`

	// SentAt is the timestamp of when the event is originally sent by the source.
	// It can be nil if none was provided.
	SentAt *time.Time `json:"sent_at,omitempty"`

	// ReceivedAt is the timestamp of when the event is received by the gateway.
	// This shall always be overridden by the gateway.
	ReceivedAt time.Time `json:"received_at"`

	// IngestedAt is a timestamp of the event creation date into the store.
	// It can be nil if none was provided.
	IngestedAt *time.Time `json:"ingested_at,omitempty"`
}
Event defines the fields stored in the datastore about an event.
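To illustrate how these struct tags behave with the standard `encoding/json` package, here is a minimal sketch. The `event` type is a local mirror of a subset of `Event`, the source and event names are hypothetical, and `encodeToMap` is a helper introduced only for this example. It shows two things: nil timestamp pointers tagged `omitempty` are left out, and `[]byte` fields are encoded as base64 strings.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// event mirrors a subset of store.Event's fields and JSON tags so the
// example runs without importing the package.
type event struct {
	ID         string     `json:"id"`
	Source     string     `json:"source"`
	Name       string     `json:"name"`
	Context    []byte     `json:"context"`
	Data       []byte     `json:"data"`
	SentAt     *time.Time `json:"sent_at,omitempty"`
	ReceivedAt time.Time  `json:"received_at"`
	IngestedAt *time.Time `json:"ingested_at,omitempty"`
}

// encodeToMap marshals the event, then decodes the result into a generic
// map so individual keys can be inspected.
func encodeToMap(e event) (map[string]interface{}, error) {
	raw, err := json.Marshal(e)
	if err != nil {
		return nil, err
	}
	out := map[string]interface{}{}
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	e := event{
		ID:         "1UYc8EebLqCAFMOSkbYZdJwNLAJ",
		Source:     "example-source", // hypothetical source name
		Name:       "user.created",   // hypothetical event name
		Data:       []byte(`{"a":1}`),
		ReceivedAt: time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC),
	}
	fields, err := encodeToMap(e)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	_, hasSentAt := fields["sent_at"]
	fmt.Println("sent_at present:", hasSentAt)
	fmt.Println("data:", fields["data"])
}
```

A consumer reading this JSON therefore needs to base64-decode `context` and `data` to recover the original payload bytes.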
type Job ¶
type Job struct {

	// ID is the unique identifier of the job. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Destination is the string representation of the destination the job needs to
	// run to.
	Destination string `json:"destination"`

	// Event is the string representation of the incoming or awaiting event. It is
	// also present in the job so we can make a distinction between specific events
	// and wildcard events.
	Event string `json:"event"`

	// Context is the marshaled representation of the "context" key present in the
	// event's payload when the destination's event has been marshaled.
	Context []byte `json:"context,omitempty"`

	// Data is the marshaled representation of the "data" key present in the event's
	// payload when the destination's event has been marshaled.
	Data []byte `json:"data,omitempty"`

	// Transitions is an array of the job's transitions. It is used to keep track of
	// successes, failures, and errors so the store is aware of the job's status.
	// It is up to the adapter to only return the latest job's transition since this
	// is the only one that really matters.
	Transitions [1]*Transition `json:"transitions,omitempty"`

	// CreatedAt is a timestamp of the job creation date into the store.
	CreatedAt time.Time `json:"created_at"`

	// EventID is the ID of the event related to this job. This is here for convenience
	// and should not be included in results if used in an API.
	EventID string `json:"-"`

	// ParentJobID is the ID of the parent job. This is here for convenience and
	// should not be included in results if used in an API.
	ParentJobID *string `json:"-"`
}
Job is the definition of a job that needs to run for a given event and a specific destination.
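The following sketch shows how a job with these tags serializes with the standard `encoding/json` package. The `job` and `transition` types are trimmed local mirrors, the destination name is hypothetical, and `encodeJob` exists only for this example. Two behaviors are worth noting: fields tagged `json:"-"` never appear in the output, and `omitempty` only omits arrays of length zero, so a `[1]*Transition` holding a nil pointer is still emitted as `[null]`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// transition is a trimmed local mirror of store.Transition.
type transition struct {
	ID         string `json:"id"`
	StateAfter string `json:"state_after"`
}

// job is a trimmed local mirror of store.Job with the same JSON tags.
type job struct {
	ID          string         `json:"id"`
	Destination string         `json:"destination"`
	Event       string         `json:"event"`
	Transitions [1]*transition `json:"transitions,omitempty"`
	EventID     string         `json:"-"`
}

// encodeJob returns the JSON encoding of j as a string.
func encodeJob(j job) (string, error) {
	raw, err := json.Marshal(j)
	return string(raw), err
}

func main() {
	out, err := encodeJob(job{
		ID:          "1UYc8EebLqCAFMOSkbYZdJwNLAJ",
		Destination: "warehouse", // hypothetical destination name
		Event:       "*",
		EventID:     "1UYc8EebLqCAFMOSkbYZdJwNLAJ", // dropped by the "-" tag
	})
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(out)
	// {"id":"1UYc8EebLqCAFMOSkbYZdJwNLAJ","destination":"warehouse","event":"*","transitions":[null]}
}
```

API consumers should therefore be prepared for a `null` entry in `transitions` when no transition has been loaded.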
type Options ¶
type Options struct {

	// From can be used to download, install, and use an existing adapter. This way
	// the user does not need to develop a custom store adapter.
	From string

	// Load can be used to load and use a custom store adapter developed in-house.
	Load Store

	// Context is a free key-value dictionary that will be passed to the underlying
	// adapter.
	Context context.Context

	// Connection is the connection string to connect to the store.
	Connection string
}
Options is the set of options a user can pass to create a new store.
func (*Options) ValidateAndLoad ¶
ValidateAndLoad validates the store's options and returns a valid store interface.
type Queue ¶
type Queue struct {

	// Events is the collection of incoming or awaiting events.
	Events []*Event `json:"events,omitempty"`
}
Queue keeps track of incoming events, their jobs, and their jobs' transitions.
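The `Event.Jobs` comment states that a destination should have at most 2 jobs per event (a wildcard and the specific event). A minimal sketch of checking that rule before handing a queue to a store is shown below. The types are trimmed local mirrors, the destination and event names are hypothetical, and `checkJobsPerDestination` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Trimmed local mirrors of store.Job, store.Event, and store.Queue.
type job struct {
	Destination string
	Event       string
}

type event struct {
	Name string
	Jobs []*job
}

type queue struct {
	Events []*event
}

// checkJobsPerDestination is a hypothetical helper. It returns an error as
// soon as one destination has more than 2 jobs for the same event.
func checkJobsPerDestination(q *queue) error {
	for _, e := range q.Events {
		if e == nil {
			continue
		}
		counts := map[string]int{}
		for _, j := range e.Jobs {
			if j == nil {
				continue
			}
			counts[j.Destination]++
			if counts[j.Destination] > 2 {
				return fmt.Errorf("event %q: destination %q has more than 2 jobs", e.Name, j.Destination)
			}
		}
	}
	return nil
}

func main() {
	q := &queue{Events: []*event{{
		Name: "user.created", // hypothetical event name
		Jobs: []*job{
			{Destination: "warehouse", Event: "*"},
			{Destination: "warehouse", Event: "user.created"},
		},
	}}}
	fmt.Println("valid:", checkJobsPerDestination(q) == nil)
}
```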
type Store ¶
type Store interface {

	// String returns the string representation of the adapter.
	//
	// Example: "postgres"
	String() string

	// Options returns the options originally passed to the Options struct. This
	// can be used to validate and override user's options if necessary.
	Options() *Options

	// Migrations returns the list of all migrations for the store, regardless
	// of their status.
	//
	// The adapter can use the package helper/sqlike to easily read migrations files
	// from a directory. See package helper/sqlike for more details.
	//
	// Note: Feature only available in Blacksmith Enterprise.
	Migrations() ([]*wanderer.Migration, error)

	// Migrate is the function called for running a migration for the store. This
	// function is the migration logic for running every migration of the store.
	// When being executed, the function has access to a toolkit and the desired
	// migration.
	//
	// It is important to understand that it is up to the adapter to run the migration
	// within a transaction (when applicable).
	//
	// Note: Feature only available in Blacksmith Enterprise.
	Migrate(*wanderer.Toolkit, *wanderer.Migration) error

	// InsertQueue inserts a queue of events into the datastore given the data passed
	// in params. It returns an error if any occurred. This method shall be called
	// by the gateway when a new event happens.
	InsertQueue(*Toolkit, *Queue) error

	// AddJobs inserts a list of jobs into the datastore.
	AddJobs(*Toolkit, []*Job) error

	// AddTransitions inserts a list of transitions into the datastore to update
	// their related job status. We insert new transitions instead of updating the
	// job itself to keep track of job history. This method shall be called by the
	// scheduler for registering jobs transitions.
	AddTransitions(*Toolkit, []*Transition) error

	// Find returns a list of acknowledged jobs' events given some properties passed
	// in params. Returned jobs are grouped by events.
	Find(*Toolkit, *WhereJob) ([]*Event, error)

	// Delete deletes a list of given events from the store. It is useful to clear
	// unused events so the store can be more performant.
	Delete(*Toolkit, []*Event) error
}
Store is the interface used to persist the jobs queue in a datastore to keep track of jobs states.
type Toolkit ¶
type Toolkit struct {

	// Logger gives access to the logrus Logger passed in options when creating the
	// Blacksmith application.
	Logger *logrus.Logger
}
Toolkit contains a suite of utilities and data to help the user successfully run the functions against the store.
type Transition ¶
type Transition struct {

	// ID is the unique identifier of the transition. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Attempt represents the number of attempts the job has made before
	// succeeding.
	Attempt uint16 `json:"attempt"`

	// StateBefore is the state of the job before running the new transition. See
	// status details for more info. This shall be nil when receiving the job from
	// the gateway.
	StateBefore *string `json:"state_before"`

	// StateAfter is the state of the job after running the new transition. See
	// status details for more info.
	StateAfter string `json:"state_after"`

	// Error keeps track of encountered error if any.
	Error error `json:"error,omitempty"`

	// CreatedAt is a timestamp of the transition creation date into the store.
	CreatedAt time.Time `json:"created_at"`

	// EventID is the ID of the event related to this job's transition. This is here
	// for convenience and should not be included in results if used in an API.
	EventID string `json:"-"`

	// JobID is the ID of the job related to this transition. This is here for
	// convenience and should not be included in results if used in an API.
	JobID string `json:"-"`
}
Transition represents a job's transition to keep track of its states.
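Since transitions are inserted rather than updated, a job's current status is simply the state of its most recent transition. The sketch below models that idea with an in-memory append-only log. The `history` type and its methods are hypothetical, the `transition` type is a trimmed local mirror, the job IDs are placeholders rather than valid KSUIDs, and the attempt numbering is an assumption made for the example.

```go
package main

import "fmt"

// transition mirrors a few fields of store.Transition for illustration.
type transition struct {
	JobID       string
	Attempt     uint16
	StateBefore *string
	StateAfter  string
}

// history is a hypothetical append-only log: rows are added, never updated
// or removed.
type history struct {
	rows []transition
}

// latest returns the most recently appended transition for a job.
func (h *history) latest(jobID string) (transition, bool) {
	for i := len(h.rows) - 1; i >= 0; i-- {
		if h.rows[i].JobID == jobID {
			return h.rows[i], true
		}
	}
	return transition{}, false
}

// add appends a new transition. StateBefore is nil for the first transition
// of a job and the previous StateAfter otherwise.
func (h *history) add(jobID, stateAfter string, attempt uint16) transition {
	var before *string
	if prev, ok := h.latest(jobID); ok {
		s := prev.StateAfter
		before = &s
	}
	t := transition{JobID: jobID, Attempt: attempt, StateBefore: before, StateAfter: stateAfter}
	h.rows = append(h.rows, t)
	return t
}

func main() {
	h := &history{}
	h.add("job-1", "acknowledged", 0) // placeholder job ID
	h.add("job-1", "executing", 1)
	h.add("job-1", "failed", 1)
	h.add("job-1", "executing", 2)
	h.add("job-1", "succeeded", 2)

	last, _ := h.latest("job-1")
	fmt.Printf("rows=%d status=%s attempt=%d\n", len(h.rows), last.StateAfter, last.Attempt)
}
```

The full history stays available for auditing, while readers that only care about the current status look at the last row for the job.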
type WhereJob ¶
type WhereJob struct {

	// DestinationsIn contains the destinations where the jobs destinations match
	// at least one element in the list.
	DestinationsIn []string `json:"destinations_in"`

	// EventsIn contains the events where the jobs events match at least one element
	// in the list.
	EventsIn []string `json:"events_in"`

	// StatusIn contains the status where the jobs status match at least one element
	// in the list.
	StatusIn []string `json:"status_in"`

	// StatusNotIn contains the status where the jobs status must not match any
	// element in the list.
	StatusNotIn []string `json:"status_notin"`

	// MaxAttempts defines the maximum number of attempts of the jobs being looked for.
	MaxAttempts uint16 `json:"max_attempts,omitempty"`
}
WhereJob is used to find events' jobs in the datastore.
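To make the filter semantics concrete, here is a sketch of how an adapter might evaluate a `WhereJob` against a single job in memory. Everything here is an assumption made for illustration: `jobRow`, `contains`, and `matches` are hypothetical names, an empty list is treated as "no constraint", a `MaxAttempts` of 0 is treated as "no limit", and the attempt comparison is inclusive. A real adapter (for example a SQL one) may apply different rules.

```go
package main

import "fmt"

// whereJob is a local mirror of store.WhereJob without the JSON tags.
type whereJob struct {
	DestinationsIn []string
	EventsIn       []string
	StatusIn       []string
	StatusNotIn    []string
	MaxAttempts    uint16
}

// jobRow is a hypothetical flattened view of a job and its latest transition.
type jobRow struct {
	Destination string
	Event       string
	Status      string
	Attempt     uint16
}

// contains reports whether v is an element of list.
func contains(list []string, v string) bool {
	for _, item := range list {
		if item == v {
			return true
		}
	}
	return false
}

// matches applies the assumed semantics: empty lists and a zero MaxAttempts
// impose no constraint.
func matches(w whereJob, j jobRow) bool {
	if len(w.DestinationsIn) > 0 && !contains(w.DestinationsIn, j.Destination) {
		return false
	}
	if len(w.EventsIn) > 0 && !contains(w.EventsIn, j.Event) {
		return false
	}
	if len(w.StatusIn) > 0 && !contains(w.StatusIn, j.Status) {
		return false
	}
	if contains(w.StatusNotIn, j.Status) {
		return false
	}
	if w.MaxAttempts > 0 && j.Attempt > w.MaxAttempts {
		return false
	}
	return true
}

func main() {
	w := whereJob{
		DestinationsIn: []string{"warehouse"}, // hypothetical destination
		StatusIn:       []string{"awaiting", "failed"},
		MaxAttempts:    3,
	}
	retry := jobRow{Destination: "warehouse", Event: "user.created", Status: "failed", Attempt: 2}
	exhausted := jobRow{Destination: "warehouse", Event: "user.created", Status: "failed", Attempt: 4}
	fmt.Println(matches(w, retry), matches(w, exhausted))
}
```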