Documentation
¶
Index ¶
- Variables
- func EncodePosition(id api.UUID, pos EnvelopeReadPos) *api.EnvelopePosition
- func NoOpProcessEventStatusHandler(Process, api.Process_Status, api.Process_Status)
- func NoopActorEventHandler(api.Actor)
- func VoidProcessLogEntryHandler([]ProcessLogEntry)
- type AccountStorage
- type ActorEventHandler
- type ActorStorage
- type Edge
- type Emission
- type EnvelopeFilter
- type EnvelopeReadPos
- type EnvelopeState
- type EnvelopeStorage
- type EnvelopeStorageEventHandler
- type EnvelopeStorageReceptionStatus
- type EventReadPos
- type EventState
- type Graph
- type Job
- type JobID
- type JobList
- type JobState
- type JobStorage
- type LogStorage
- type Node
- type NodeInputRef
- type NodeOutputRef
- type NodeType
- type PasswordStorage
- type Pipeline
- type PipelineStorage
- type PostMortemState
- type Process
- type ProcessEventStatusHandler
- type ProcessLogEntry
- type ProcessLogEntryHandler
- type ProcessNode
- type ProcessNodeEnd
- type ProcessNodeIOStatus
- type ProcessNodeIOStatusMap
- type ProcessNodeInput
- type ProcessNodeState
- type ProcessNodeStateMap
- type ProcessNodeStatus
- type ProcessState
- type ProcessStorage
- type Reception
- type ReceptionStatus
- type Session
- type SessionStorage
- type SourceMatch
- type StateUpdater
Constants ¶
This section is empty.
Variables ¶
var ( // ErrInvalidPipelineStatus is returned when the pipeline status does not // allow the operation ErrInvalidPipelineStatus = errors.New("Invalid Pipeline Status") )
var ( // ErrNoSuchID is returned when a non-existing id is passed to some function ErrNoSuchID = errors.New("No such ID") )
Functions ¶
func EncodePosition ¶
func EncodePosition(id api.UUID, pos EnvelopeReadPos) *api.EnvelopePosition
EncodePosition translate a storage.EnvelopeReadPos into a api.EnvelopePosition
func NoOpProcessEventStatusHandler ¶
func NoOpProcessEventStatusHandler(Process, api.Process_Status, api.Process_Status)
NoOpProcessEventStatusHandler is a noop handler
func NoopActorEventHandler ¶
NoopActorEventHandler is an ActorEventHandler that does nothing
func VoidProcessLogEntryHandler ¶
func VoidProcessLogEntryHandler([]ProcessLogEntry)
VoidProcessLogEntryHandler is a noop ProcessLogEntryHandler
Types ¶
type AccountStorage ¶
type AccountStorage interface { Get(id api.UUID) (api.Account, bool) GetByName(name string) (api.Account, bool) GetByCert(cert string) (api.Account, bool) GetByAPIKey(key string) (api.Account, bool) Create(api.Account) (api.Account, error) Update(api.Account) (api.Account, error) Delete(api.Account) List() []api.Account }
AccountStorage is in charge of storing account definitions. When multiple instances are running, CRUD operations be reflected from one to the others
type ActorEventHandler ¶
ActorEventHandler is the type for callbacks
type ActorStorage ¶
type ActorStorage interface { Get(api.UUID) (api.Actor, bool) GetByName(name string) (api.Actor, bool) List() []api.Actor ListAccount(accountID api.UUID) []api.Actor ListKind(actorKind api.Actor_Kind) []api.Actor ListRole(actorKind api.Actor_Kind, role string) []api.Actor Create(api.Actor) (api.Actor, error) Delete(api.UUID) Update(api.Actor) (api.Actor, error) MarkUnresponsive(id api.UUID, decisionTime time.Time) error JustSeen(api.UUID) error SetOnline(api.UUID, bool) error SetEventHandlers(onActorUp, onActorDown ActorEventHandler) }
ActorStorage is in charge of storing the actor definitions When multiple instances are running, CRUD operations be reflected from one to the others
type Edge ¶
type Edge struct { From NodeOutputRef To NodeInputRef }
Edge is a Graph oriented edge
func EdgeFromString ¶
EdgeFromString unserialize a Edge
func (Edge) MarshalYAML ¶
MarshalYAML marshals Edge to string
func (*Edge) UnmarshalYAML ¶
UnmarshalYAML unmarshals Edge from string
type Emission ¶
type Emission struct { ProcessID api.UUID EnvelopeID api.UUID TargetActor api.UUID TargetInputRef NodeInputRef }
Emission identifies the emission of an envelope to a target
type EnvelopeFilter ¶
EnvelopeFilter filters a batch of envelope ids at once
type EnvelopeReadPos ¶
type EnvelopeReadPos struct { Events map[api.UUID]EventReadPos Start bool Complete bool }
EnvelopeReadPos contains the current reading position of each event of an envelope It can be used to retrieve the next part of an envelope later
func DecodePosition ¶
func DecodePosition(apiPos api.EnvelopePosition) EnvelopeReadPos
DecodePosition translate a api.EnvelopePosition into a storage.EnvelopeReadPos
func EnvelopeStartPos ¶
func EnvelopeStartPos() EnvelopeReadPos
EnvelopeStartPos returns a new EnvelopeReadPos pointing the beginning of the envelope
func (EnvelopeReadPos) Clone ¶
func (p EnvelopeReadPos) Clone() EnvelopeReadPos
Clone duplicate the position
func (EnvelopeReadPos) Equals ¶
func (p EnvelopeReadPos) Equals(other EnvelopeReadPos) bool
Equals returns true if postions are equals
type EnvelopeState ¶
type EnvelopeState struct { ID api.UUID TypesKnown bool // true if the type of all the events is known Status EnvelopeStorageReceptionStatus Err error Events []EventState }
EnvelopeState contains the detailled status of an envelope, allowing the broker to make decisions
func (EnvelopeState) Error ¶
func (s EnvelopeState) Error() string
Error implements the error interface in case State is "error"
func (EnvelopeState) EventTypes ¶
func (s EnvelopeState) EventTypes() []string
EventTypes returns the event types or nil if not fully known yet
type EnvelopeStorage ¶
type EnvelopeStorage interface { // Set the event handler. Any current handler gets overridden. // nil can be passed to remove the current handler. SetEventHandler(EnvelopeStorageEventHandler) // Store part or all fo an envelope // The returned EnvelopeState reflect the final state of the complete // envelope, once the given part is merge to it. // // Concurrency & ordering: // The envelope parts can be passed in any order, from different // goroutines, and to different instance of the Storage in different // processes all pointed to the same backend. // // Error handling: // It any error occur, including validity errors (which are reported // within the EnvelopeState), the envelope should be marked as 'error' // in the storage, and no other part of the envelope should be accepted. StoreEnvelope(api.Envelope) (EnvelopeState, error) // CheckStalled mark the given envelope as stalled if no fragment was // received in the given time _and_ the envelope is in state "Receiving" // Returns true if the envelope is indeed stalled // If the envelope is already marked stalled, the given time is ignored. CheckStalled(api.UUID, time.Duration) bool // GetEventTypes returns the eventtypes of an envelope, or nil if unknown // exists is false if the envelope is unknown, true if exists. // should panic on any internal error. GetEventTypes(id api.UUID) (types []string, exists bool) ReadEnvelope(id api.UUID, position EnvelopeReadPos, maxsize int) (api.Envelope, EnvelopeReadPos, error) Purge(EnvelopeFilter) }
EnvelopeStorage can store partial envelopes, track the completions of the events inside it, serve back the data in chunks
type EnvelopeStorageEventHandler ¶
type EnvelopeStorageEventHandler func( envelopeID api.UUID, status EnvelopeStorageReceptionStatus, newData bool)
EnvelopeStorageEventHandler is a handler for EnvelopeStorage events. newData is true is more data is now available on the envelope
type EnvelopeStorageReceptionStatus ¶
type EnvelopeStorageReceptionStatus int
EnvelopeStorageReceptionStatus is the reception status of an envelope or an event
const ( // EnvelopeStorageReceptionUnknown is when the reception status is unknwown EnvelopeStorageReceptionUnknown EnvelopeStorageReceptionStatus = iota // EnvelopeStorageReceptionReceiving is when the reception is not finished and // a piece of data was received not long ago EnvelopeStorageReceptionReceiving // EnvelopeStorageReceptionComplete is when the envelope or event is fully arrived EnvelopeStorageReceptionComplete // EnvelopeStorageReceptionError is when an error occured EnvelopeStorageReceptionError // EnvelopeStorageReceptionStalled is when an envelope misses some fragment // and nothing comes in for a given time and some storage client is waiting EnvelopeStorageReceptionStalled )
func (EnvelopeStorageReceptionStatus) MarshalText ¶
func (i EnvelopeStorageReceptionStatus) MarshalText() ([]byte, error)
func (EnvelopeStorageReceptionStatus) String ¶
func (i EnvelopeStorageReceptionStatus) String() string
func (*EnvelopeStorageReceptionStatus) UnmarshalText ¶
func (i *EnvelopeStorageReceptionStatus) UnmarshalText(text []byte) error
type EventReadPos ¶
EventReadPos containts the current reading position of an event
type EventState ¶
type EventState struct { ID api.UUID Type string AvailableItems uint64 // number of items that can be read in a sequence TotalItems uint64 // total number of items or 0 if unknown Status EnvelopeStorageReceptionStatus Err error }
EventState contains the detailled reception state of an event
func (EventState) Error ¶
func (s EventState) Error() string
Error implements the error interface in case Status is "error"
type JobState ¶
type JobState struct { Detached bool Status api.ActorProcessingState_Status EnvelopeIDs []api.UUID }
JobState contains all the job state
type JobStorage ¶
type JobStorage interface { NewJob(JobID, JobState) SetState(JobID, JobState) GetState(JobID) JobState ListPendingJobByActor(actor api.UUID, after *JobID, limit int) []Job ListJobByProcess(api.UUID) []Job ListRunningJobByEnvelopeID(api.UUID) []Job ListDetachedJobByActor(actorID api.UUID) (JobList, error) Purge([]api.UUID) error }
JobStorage persists the jobs
type LogStorage ¶
type LogStorage interface { Log(api.LogEntry) GetProcessMessages(processID api.UUID, level api.LogLevel) []api.LogEntry PurgeBefore(time.Time) int64 }
A LogStorage stores log entries and can return them given some filters
type Node ¶
type Node struct { ID string Type NodeType ActorIDs []api.UUID // Explicit actors. Deprecated (will be removed in a later version) Actors []string // Explicit actors, by name or UUID Roles []string // Actor roles RoleBroadcast bool // If true, all actors matching each role will receive the message, instead of only one actor per role SourceMatch *SourceMatch // define only if Type==EmitterNode Inputs []string Outputs []string }
Node is a graph vertex
type NodeInputRef ¶
NodeInputRef is a reference to a node input
func NodeInputRefFromString ¶
func NodeInputRefFromString(str string) (ref NodeInputRef, err error)
NodeInputRefFromString loads a NodeInputRef
func (NodeInputRef) MarshalYAML ¶
func (r NodeInputRef) MarshalYAML() (interface{}, error)
MarshalYAML marshals NodeInputRef to string
func (*NodeInputRef) UnmarshalYAML ¶
func (r *NodeInputRef) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML unmarshals NodeInputRef from string
type NodeOutputRef ¶
NodeOutputRef is a reference to a node input
func NodeOutputRefFromString ¶
func NodeOutputRefFromString(str string) (ref NodeOutputRef, err error)
NodeOutputRefFromString loads a NodeInputRef
func (NodeOutputRef) MarshalYAML ¶
func (r NodeOutputRef) MarshalYAML() (interface{}, error)
MarshalYAML marshals NodeOutputRef to string
func (NodeOutputRef) String ¶
func (r NodeOutputRef) String() string
String serialize a NodeOutputRef
func (*NodeOutputRef) UnmarshalYAML ¶
func (r *NodeOutputRef) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML unmarshals NodeOutputRef from string
type NodeType ¶
type NodeType int
NodeType is the type of Node
func (NodeType) MarshalText ¶
func (*NodeType) UnmarshalText ¶
type PasswordStorage ¶
type PasswordStorage interface { Set(id api.UUID, password string) Reset(id api.UUID) Verify(id api.UUID, password string) bool }
PasswordStorage stores passwords in a safe manner
type Pipeline ¶
type Pipeline struct { api.PipelineInfo Graph Graph }
Pipeline defines how an envelope should be processed
type PipelineStorage ¶
type PipelineStorage interface { Load(api.UUID) (Pipeline, bool) Save(Pipeline) (api.UUID, error) SetStatus(api.UUID, api.PipelineInfo_Status) error LoadActivePipeline() []Pipeline Query(name, version string, activeOnly bool) []Pipeline }
PipelineStorage handle Pipeline persistence
type PostMortemState ¶
type PostMortemState struct { Level api.LogLevel Status api.PMProcess_Status Comment string Changed time.Time }
PostMortemState describes the post-mortem state of a Process
type Process ¶
type Process struct { ID api.UUID TriggerEmitterID api.UUID TriggerEnvelopeID api.UUID CreatedAt time.Time PipelineID api.UUID SourceOutput NodeOutputRef }
Process contains the data necessary for process persistence
type ProcessEventStatusHandler ¶
type ProcessEventStatusHandler func(p Process, old api.Process_Status, new api.Process_Status)
ProcessEventStatusHandler is the type of function that handle status changes
type ProcessLogEntry ¶
type ProcessLogEntry struct { ID string ProcessID api.UUID Timestamp time.Time Emission *Emission `json:",omitempty"` EmissionPosition *EnvelopeReadPos `json:",omitempty"` Reception *Reception `json:",omitempty"` ReceptionStatus ReceptionStatus `json:",omitempty"` ProcessNodeEnd *ProcessNodeEnd `json:",omitempty"` ResultAck bool `json:",omitempty"` }
ProcessLogEntry is the unique source of information for process tracking.
type ProcessLogEntryHandler ¶
type ProcessLogEntryHandler func(entries []ProcessLogEntry)
ProcessLogEntryHandler is the type of functions that handle process log entries
type ProcessNode ¶
A ProcessNode is a live pipeline graph node. One graph node can correspond to several ProcessNode (especially consumer nodes)
func ProcessNodeFromString ¶
func ProcessNodeFromString(s string) (ProcessNode, error)
ProcessNodeFromString deserialize a ProcessNode
func (ProcessNode) MarshalText ¶
func (n ProcessNode) MarshalText() ([]byte, error)
MarshalText implements encoding.TextMarshaler for ProcessNode
func (*ProcessNode) UnmarshalText ¶
func (n *ProcessNode) UnmarshalText(text []byte) (err error)
UnmarshalText implements encoding.TextUnmarshaler for ProcessNode
type ProcessNodeEnd ¶
type ProcessNodeEnd struct { Node ProcessNode Status api.ActorProcessingState_Status }
ProcessNodeEnd signals the end of processing by a process node
type ProcessNodeIOStatus ¶
type ProcessNodeIOStatus int
ProcessNodeIOStatus reflects the status of the inputs or outputs of a node
const ( // ProcessNodeIOPending is when no io started yet ProcessNodeIOPending ProcessNodeIOStatus = iota // ProcessNodeIOStarted is when one or more io started emitting ProcessNodeIOStarted // ProcessNodeIOAllStarted is when all io started emitting ProcessNodeIOAllStarted // ProcessNodeIOClosed is when all the io is closed without any transmission ProcessNodeIOClosed // ProcessNodeIODone is when all io finished emitting ProcessNodeIODone // ProcessNodeIOError is when an error occured on one of the io ProcessNodeIOError )
func (ProcessNodeIOStatus) MarshalText ¶
func (i ProcessNodeIOStatus) MarshalText() ([]byte, error)
func (ProcessNodeIOStatus) String ¶
func (i ProcessNodeIOStatus) String() string
func (*ProcessNodeIOStatus) UnmarshalText ¶
func (i *ProcessNodeIOStatus) UnmarshalText(text []byte) error
type ProcessNodeIOStatusMap ¶
type ProcessNodeIOStatusMap map[string]ProcessNodeIOStatus
ProcessNodeIOStatusMap is a map of emission or reception statuses
type ProcessNodeInput ¶
ProcessNodeInput describes which envelope does on a given input
type ProcessNodeState ¶
type ProcessNodeState struct { Status ProcessNodeStatus Error string Input ProcessNodeIOStatus InputStatuses ProcessNodeIOStatusMap Output ProcessNodeIOStatus OutputStatuses ProcessNodeIOStatusMap }
ProcessNodeState reflect the current state of a process node.
type ProcessNodeStateMap ¶
type ProcessNodeStateMap map[ProcessNode]ProcessNodeState
ProcessNodeStateMap contains states of process nodes
func (ProcessNodeStateMap) MarshalYAML ¶
func (m ProcessNodeStateMap) MarshalYAML() (interface{}, error)
MarshalYAML as an ordered dict
type ProcessNodeStatus ¶
type ProcessNodeStatus int
ProcessNodeStatus is the process node status
const ( // ProcessNodeStatusPending is before node activation ProcessNodeStatusPending ProcessNodeStatus = iota // ProcessNodeActive is when any input or output is active ProcessNodeActive // ProcessNodeDone is when all input/output are done transmitting ProcessNodeDone // ProcessNodeUnreachable is when every path leading to the node // are closed and the node will never be active ProcessNodeUnreachable // ProcessNodeError is when an error occured on the node ProcessNodeError )
func (ProcessNodeStatus) MarshalText ¶
func (i ProcessNodeStatus) MarshalText() ([]byte, error)
func (ProcessNodeStatus) String ¶
func (i ProcessNodeStatus) String() string
func (*ProcessNodeStatus) UnmarshalText ¶
func (i *ProcessNodeStatus) UnmarshalText(text []byte) error
type ProcessState ¶
type ProcessState struct { Status api.Process_Status StatusChanged time.Time StatusReason string NodeStates ProcessNodeStateMap NodeStatesChanged time.Time ResponseEnvelopeID api.UUID ResultAcked bool }
ProcessState reflect the current state of a process execution. It is completely updated by applying incoming logentries
func (ProcessState) Fmt ¶
func (s ProcessState) Fmt() string
Fmt returns a formatted string of the state
type ProcessStorage ¶
type ProcessStorage interface { GetProcess(api.UUID) (Process, bool) GetProcessByTrigger(emitter api.Actor, envelopeID api.UUID) (Process, bool) Query(filter api.ProcessFilter) []Process FindProcessForEnvelope(envelopeIDs []api.UUID) map[api.UUID]api.UUID CreateProcess(Process) (api.UUID, error) SetPipeline(processID api.UUID, pipelineID api.UUID, sourceOutput NodeOutputRef) GetTargets( processID api.UUID, source NodeOutputRef, envelopeID api.UUID, noRouteTableUpdate bool, calcTargets func() ([]api.EnvelopeTarget, bool, error), ) ([]api.EnvelopeTarget, error) GetInputs( processID api.UUID, targetNode ProcessNode, ) []ProcessNodeInput UpdateReceiveStatus(reception Reception, status ReceptionStatus) error ProcessNodeEnd( process api.UUID, node ProcessNode, status api.ActorProcessingState_Status) error GetState(api.UUID) ProcessState GetStateUpdater(api.UUID) StateUpdater SetStatus(api.UUID, api.Process_Status, string) error // Generate a AckResult process log entry AckResult(api.UUID) SetEventHandlers(onStatusChanged ProcessEventStatusHandler) SetProcessLogEntryHandler(ProcessLogEntryHandler) GetUnprocessedProcessLogEntries() []ProcessLogEntry GetProcessLogEntries(api.UUID) []ProcessLogEntry QueryPostMortem(level api.LogLevel, status ...api.PMProcess_Status) []Process GetPostMortemState(api.UUID) PostMortemState SetPostMortemState(api.UUID, PostMortemState) // PurgeProcess removes all data of some processes. // The process must be terminated // Returns the list of process purged. If some ids are missing, the process // were non-existant or they had a non terminal status PurgeProcess([]api.UUID) ([]api.UUID, error) }
ProcessStorage persists process data
type Reception ¶
type Reception struct { ProcessID api.UUID EnvelopeID api.UUID SourceActor api.UUID SourceOutputRef NodeOutputRef }
Reception identifies a reception of an envelop from an actor
type ReceptionStatus ¶
type ReceptionStatus int
ReceptionStatus describes the status of an envelope reception in the context of a process
const ( // ReceptionPending is when envelop reception is not yet started ReceptionPending ReceptionStatus = iota // ReceptionOngoing is when part(s) of the envelope were received, but not all ReceptionOngoing // ReceptionDone is when all the complete envelop has been receiced ReceptionDone // ReceptionError is when a reception error occured ReceptionError // ReceptionClosed is when no envelope is to be expected ReceptionClosed )
func (ReceptionStatus) MarshalText ¶
func (s ReceptionStatus) MarshalText() ([]byte, error)
MarshalText serialize a ReceptionStatus to text
func (ReceptionStatus) String ¶
func (s ReceptionStatus) String() string
func (*ReceptionStatus) UnmarshalText ¶
func (s *ReceptionStatus) UnmarshalText(data []byte) error
UnmarshalText parses a ReceptionStatus
type Session ¶
type Session struct { api.SessionToken AccountID api.UUID }
Session links a SessionToken and a Account
type SessionStorage ¶
type SessionStorage interface { List() ([]Session, error) Get(token string) (*Session, error) Set(Session) error Delete(token string) error DeleteOlderThan(time time.Time) error }
SessionStorage stores Sessions
type SourceMatch ¶
type SourceMatch struct {
EventTypes []string // All event types envelope must contain exactly
}
SourceMatch are rules to match incoming envelope with
type StateUpdater ¶
type StateUpdater interface { Get() (ProcessState, error) Set(ProcessState) error MarkLogEntry(id string, asError bool) error Unlock() error }
StateUpdater provides exclusive access to a process state
Source Files
¶
- account_storage.go
- actor_storage.go
- envelope_storage.go
- envelopestoragereceptionstatus_string.go
- graphtypes.go
- job_storage.go
- logs.go
- nodetype_string.go
- password_storage.go
- pipeline_storage.go
- process.go
- processnodeiostatus_string.go
- processnodestatus_string.go
- processtypes.go
- session_storage.go
- storage.go