store

package
v0.10.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2020 License: Apache-2.0 Imports: 7 Imported by: 7

Documentation

Overview

Package store provides the development kit for working with a database-as-a-queue, so the jobs queue can be persisted in a datastore.

At any point in time, destinations will be in a state of failure. By using a store, in tandem with a scheduler, Blacksmith applications are sure to build a reliable system for delivering events at scale and keep track of successes, failures, errors, and jobs' transitions.

The store shall be immutable: no rows shall be updated or removed.

A store adapter can be generated using the Blacksmith CLI:

$ blacksmith generate store

Note: Adapter generation using the Blacksmith CLI is a feature only available in Blacksmith Enterprise.

Index

Constants

This section is empty.

Variables

View Source
var AvailableAdapters = map[string]bool{
	"postgres": true,
}

AvailableAdapters is a list of available store adapters.

View Source
var Defaults = &Options{}

Defaults are the defaults options set for the store. When not set, these values will automatically be applied.

View Source
var InterfaceStore = "store"

InterfaceStore is the string representation for the store interface.

View Source
var StatusAcknowledged = "acknowledged"

StatusAcknowledged is used to mark a job as acknowledged. This is used when registering new jobs into the store.

View Source
var StatusAwaiting = "awaiting"

StatusAwaiting is used to mark a job as awaiting. This is used when a job is awaiting to be run.

View Source
var StatusDiscarded = "discarded"

StatusDiscarded is used to mark a job as discarded.

View Source
var StatusExecuting = "executing"

StatusExecuting is used to mark a job as executing. This is used when a job is being executed.

View Source
var StatusFailed = "failed"

StatusFailed is used to mark a job as failed.

View Source
var StatusSucceeded = "succeeded"

StatusSucceeded is used to mark a job as succeeded.

Functions

This section is empty.

Types

type Event

type Event struct {

	// ID is the unique identifier of the event. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Source is the string representation of the incoming event's source.
	Source string `json:"source"`

	// Name is the string representation of the incoming or awaiting event.
	Name string `json:"name"`

	// Context is the marshaled representation of the "context" key presents in the
	// event's payload.
	Context []byte `json:"context"`

	// Data is the marshaled representation of the "data" key presents in the event's
	// payload.
	Data []byte `json:"data"`

	// Jobs is a list of jobs to execute related to the event. A destination should
	// have at most 2 jobs per event: a wildcard and the specific event.
	Jobs []*Job `json:"jobs,omitempty"`

	// SentAt is the timestamp of when the event is originally sent by the source.
	// It can be nil if none was provided.
	SentAt *time.Time `json:"sent_at,omitempty"`

	// ReceivedAt is the timestamp of when the event is received by the gateway.
	// This shall always be overridden by the gateway.
	ReceivedAt time.Time `json:"received_at"`

	// IngestedAt is a timestamp of the event creation date into the store.
	// It can be nil if none was provided.
	IngestedAt *time.Time `json:"ingested_at,omitempty"`
}

Event define the fields stores in the datastore about an event.

type Job

type Job struct {

	// ID is the unique identifier of the job. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Destination is the string representation of the destination the job needs to
	// run to.
	Destination string `json:"destination"`

	// Event is the string representation of the incoming or awaiting event. It is
	// also present in the job so we can make a distinction between specific events
	// and wildcard events.
	Event string `json:"event"`

	// Context is the marshaled representation of the "context" key presents in the
	// event's payload when the destination's event has been marshaled.
	Context []byte `json:"context,omitempty"`

	// Data is the marshaled representation of the "data" key presents in the event's
	// payload when the destination's event has been marshaled.
	Data []byte `json:"data,omitempty"`

	// Transitions is an array of the job's transitions. It is used to keep track of
	// successes, failures, and errors so the store is aware of the job's status.
	// It is up to the adapter to only return the latest job's transition since this
	// is the only one that really matters.
	Transitions [1]*Transition `json:"transitions,omitempty"`

	// CreatedAt is a timestamp of the job creation date into the store.
	CreatedAt time.Time `json:"created_at"`

	// EventID is the ID of the event related to this job. This is here for convenience
	// and should not be included in results if used in an API.
	EventID string `json:"-"`

	// ParentJobID is the ID of the parent job ID. This is here for convenience and
	// should not be included in results if used in an API.
	ParentJobID *string `json:"-"`
}

Job is the definition of a job that needs to run for a given event and a specific destination.

type Options

type Options struct {

	// From can be used to download, install, and use an existing adapter. This way
	// the user does not need to develop a custom store adapter.
	From string

	// Load can be used to load and use a custom store adapter developed in-house.
	Load Store

	// Context is a free key-value dictionary that will be passed to the underlying
	// adapter.
	Context context.Context

	// Connection is the connection string to connect to the store.
	Connection string
}

Options is the options a user can pass to create a new store.

func (*Options) ValidateAndLoad

func (opts *Options) ValidateAndLoad() (Store, error)

ValidateAndLoad validates the store's options and returns a valid store interface.

type Queue

type Queue struct {

	// Events is the collection of incoming or awaiting events.
	Events []*Event `json:"events,omitempty"`
}

Queue keeps track of incoming events, their jobs, and their jobs' transitions.

type Store

type Store interface {

	// String returns the string representation of the adapter.
	//
	// Example: "postgres"
	String() string

	// Options returns the options originally passed to the Options struct. This
	// can be used to validate and override user's options if necessary.
	Options() *Options

	// Migrations returns the list of all migrations for the store, regardless
	// their status.
	//
	// The adapter can use the package helper/sqlike to easily read migrations files
	// from a directory. See package helper/sqlike for more details.
	//
	// Note: Feature only available in Blacksmith Enterprise.
	Migrations() ([]*wanderer.Migration, error)

	// Migrate is the function called for running a migration for the store. This
	// function is the migration logic for running every migrations of the store.
	// When being executed, the function has access to a toolkit and the desired
	// migration.
	//
	// It is important to understand that it is up to the adapter to run the migration
	// within a transaction (when applicable).
	//
	// Note: Feature only available in Blacksmith Enterprise.
	Migrate(*wanderer.Toolkit, *wanderer.Migration) error

	// InsertQueue inserts a queue of events into the datastore given the data passed
	// in params. It returns an error if any occurred. This method shall be called
	// by the gateway when a new event happens.
	InsertQueue(*Toolkit, *Queue) error

	// AddJobs inserts a list of jobs into the datastore.
	AddJobs(*Toolkit, []*Job) error

	// AddTransitions inserts a list of transitions into the datastore to update
	// their related job status. We insert new transitions instead of updating the
	// job itself to keep track of job history. This method shall be called by the
	// scheduler for registering jobs transitions.
	AddTransitions(*Toolkit, []*Transition) error

	// Find returns a list of acknowledged jobs' events given some properties passed
	// in params. Returned jobs are grouped by events.
	Find(*Toolkit, *WhereJob) ([]*Event, error)

	// Delete deletes a list of given events from the store. It is useful to clear
	// unused events so the store can be more performant.
	Delete(*Toolkit, []*Event) error
}

Store is the interface used to persist the jobs queue in a datastore to keep track of jobs states.

type Toolkit

type Toolkit struct {

	// Logger gives access to the logrus Logger passed in options when creating the
	// Blacksmith application.
	Logger *logrus.Logger
}

Toolkit contains a suite of utilities and data to help the user successfully run the functions against the store.

type Transition

type Transition struct {

	// ID is the unique identifier of the transition. It must be a valid KSUID.
	//
	// Example: "1UYc8EebLqCAFMOSkbYZdJwNLAJ"
	ID string `json:"id"`

	// Attempt represents the number of tentatives that the job has run before
	// succeeded.
	Attempt uint16 `json:"attempt"`

	// StateBefore is the state of the job before running the new transition. See
	// status details for more info. This shall be nil when receiving the job from
	// the gateway.
	StateBefore *string `json:"state_before"`

	// StateAfter is the state of the job after running the new transition. See
	// status details for more info.
	StateAfter string `json:"state_after"`

	// Error keeps track of encountered error if any.
	Error error `json:"error,omitempty"`

	// CreatedAt is a timestamp of the transition creation date into the store.
	CreatedAt time.Time `json:"created_at"`

	// EventID is the ID of the event related to this job's transition. This is here
	// for convenience and should not be included in results if used in an API.
	EventID string `json:"-"`

	// JobID is the ID of the job related to this transition. This is here for
	// convenience and should not be included in results if used in an API.
	JobID string `json:"-"`
}

Transition represents a job's transition to keep track of its states.

type WhereJob

type WhereJob struct {

	// DestinationsIn contains the destinations where the jobs destinations match
	// at least one element in the list.
	DestinationsIn []string `json:"destinations_in"`

	// EventsIn contains the events where the jobs events match at least one element
	// in the list.
	EventsIn []string `json:"events_in"`

	// StatusIn contains the status where the jobs status match at least one element
	// in the list.
	StatusIn []string `json:"status_in"`

	// StatusNotIn contains the status where the jobs status must not match at least
	// one element in the list.
	StatusNotIn []string `json:"status_notin"`

	// MaxAttempts defines the maximum number of attempts of the jobs looking for.
	MaxAttempts uint16 `json:"max_attempts,omitempty"`
}

WhereJob is used to find events' jobs in the datastore.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL