Documentation ¶
Index ¶
- type AccountStorage
- func (p AccountStorage) Create(account api.Account) (api.Account, error)
- func (p AccountStorage) Delete(account api.Account)
- func (p AccountStorage) Get(id api.UUID) (api.Account, bool)
- func (p AccountStorage) GetByAPIKey(key string) (account api.Account, exists bool)
- func (p AccountStorage) GetByCert(cert string) (account api.Account, exists bool)
- func (p AccountStorage) GetByName(name string) (api.Account, bool)
- func (p AccountStorage) List() []api.Account
- func (p AccountStorage) Update(account api.Account) (api.Account, error)
- type ActorStorage
- func (p ActorStorage) Create(actor api.Actor) (api.Actor, error)
- func (p ActorStorage) Delete(id api.UUID)
- func (p ActorStorage) Get(id api.UUID) (api.Actor, bool)
- func (p ActorStorage) GetByName(name string) (api.Actor, bool)
- func (p ActorStorage) JustSeen(id api.UUID) error
- func (p ActorStorage) List() []api.Actor
- func (p ActorStorage) ListAccount(accountID api.UUID) []api.Actor
- func (p ActorStorage) ListKind(actorKind api.Actor_Kind) []api.Actor
- func (p ActorStorage) ListRole(actorKind api.Actor_Kind, role string) []api.Actor
- func (p ActorStorage) MarkUnresponsive(id api.UUID, decisionTime time.Time) error
- func (p *ActorStorage) SetEventHandlers(onActorUp, onActorDown storage.ActorEventHandler)
- func (p ActorStorage) SetOnline(id api.UUID, online bool) error
- func (p ActorStorage) Update(actor api.Actor) (api.Actor, error)
- type Base
- type EnvelopeStorage
- func (es EnvelopeStorage) CheckStalled(id api.UUID, delay time.Duration) bool
- func (es EnvelopeStorage) GetEventTypes(id api.UUID) (types []string, exists bool)
- func (es EnvelopeStorage) Purge(filter storage.EnvelopeFilter)
- func (es EnvelopeStorage) ReadEnvelope(id api.UUID, position storage.EnvelopeReadPos, maxSize int) (api.Envelope, storage.EnvelopeReadPos, error)
- func (es *EnvelopeStorage) SetEventHandler(handler storage.EnvelopeStorageEventHandler)
- func (es EnvelopeStorage) StoreEnvelope(envelope api.Envelope) (storage.EnvelopeState, error)
- type JobStorage
- func (s *JobStorage) GetState(id storage.JobID) storage.JobState
- func (s *JobStorage) ListDetachedJobByActor(actorID api.UUID) (storage.JobList, error)
- func (s *JobStorage) ListJobByProcess(processID api.UUID) []storage.Job
- func (s *JobStorage) ListJobByStatus(status api.ActorProcessingState_Status) []storage.Job
- func (s *JobStorage) ListPendingJobByActor(actorID api.UUID, after *storage.JobID, limit int) []storage.Job
- func (s *JobStorage) ListRunningJobByEnvelopeID(envelopeID api.UUID) []storage.Job
- func (s *JobStorage) NewJob(id storage.JobID, state storage.JobState)
- func (s *JobStorage) Purge(processIDs []api.UUID) error
- func (s *JobStorage) SetState(id storage.JobID, state storage.JobState)
- type LogStorage
- type PasswordStorage
- type PipelineStorage
- func (s *PipelineStorage) Load(id api.UUID) (storage.Pipeline, bool)
- func (s *PipelineStorage) LoadActivePipeline() []storage.Pipeline
- func (s *PipelineStorage) Query(name, version string, activeOnly bool) (graphs []storage.Pipeline)
- func (s *PipelineStorage) Save(pipeline storage.Pipeline) (api.UUID, error)
- func (s *PipelineStorage) SetStatus(id api.UUID, status api.PipelineInfo_Status) error
- type ProcessStorage
- func (s *ProcessStorage) AckResult(processID api.UUID)
- func (s *ProcessStorage) CreateProcess(p storage.Process) (api.UUID, error)
- func (s *ProcessStorage) FindProcessForEnvelope(envelopeIDs []api.UUID) map[api.UUID]api.UUID
- func (s *ProcessStorage) GetInputs(processID api.UUID, targetNode storage.ProcessNode) (inputs []storage.ProcessNodeInput)
- func (s *ProcessStorage) GetPostMortemState(id api.UUID) storage.PostMortemState
- func (s *ProcessStorage) GetProcess(id api.UUID) (storage.Process, bool)
- func (s *ProcessStorage) GetProcessByTrigger(emitter api.Actor, envelopeID api.UUID) (storage.Process, bool)
- func (s *ProcessStorage) GetProcessLogEntries(id api.UUID) (entries []storage.ProcessLogEntry)
- func (s *ProcessStorage) GetState(processID api.UUID) storage.ProcessState
- func (s *ProcessStorage) GetStateUpdater(processID api.UUID) storage.StateUpdater
- func (s *ProcessStorage) GetTargets(processID api.UUID, source storage.NodeOutputRef, envelopeID api.UUID, ...) (targets []api.EnvelopeTarget, calcErr error)
- func (s *ProcessStorage) GetUnprocessedProcessLogEntries() (entries []storage.ProcessLogEntry)
- func (s *ProcessStorage) ProcessNodeEnd(processID api.UUID, node storage.ProcessNode, ...) error
- func (s *ProcessStorage) PurgeProcess(ids []api.UUID) ([]api.UUID, error)
- func (s *ProcessStorage) Query(filter api.ProcessFilter) []storage.Process
- func (s *ProcessStorage) QueryPostMortem(level api.LogLevel, statusList ...api.PMProcess_Status) []storage.Process
- func (s *ProcessStorage) SetEventHandlers(onStatusChanged storage.ProcessEventStatusHandler)
- func (s *ProcessStorage) SetPipeline(processID api.UUID, pipelineID api.UUID, sourceOutput storage.NodeOutputRef)
- func (s *ProcessStorage) SetPostMortemState(id api.UUID, pmState storage.PostMortemState)
- func (s *ProcessStorage) SetProcessLogEntryHandler(handler storage.ProcessLogEntryHandler)
- func (s *ProcessStorage) SetStatus(processID api.UUID, status api.Process_Status, reason string) error
- func (s *ProcessStorage) UpdateReceiveStatus(reception storage.Reception, status storage.ReceptionStatus) error
- type RawSQLBase
- type RawSQLTxHelper
- type SessionStorage
- func (store *SessionStorage) Delete(token string) error
- func (store *SessionStorage) DeleteOlderThan(time time.Time) error
- func (store *SessionStorage) Get(token string) (*storage.Session, error)
- func (store *SessionStorage) List() ([]storage.Session, error)
- func (store *SessionStorage) Set(session storage.Session) error
- type StateUpdater
- type TxHelper
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccountStorage ¶
type AccountStorage struct {
Base
}
AccountStorage implements all the persistor interfaces
func NewAccountStorage ¶
func NewAccountStorage(model *model.Model, db *yago.DB, logger xbus.Logger) *AccountStorage
NewAccountStorage initializes a new AccountStorage
func (AccountStorage) Delete ¶
func (p AccountStorage) Delete(account api.Account)
Delete deletes an account
func (AccountStorage) GetByAPIKey ¶
func (p AccountStorage) GetByAPIKey(key string) (account api.Account, exists bool)
GetByAPIKey returns an account with the given api key if exists
func (AccountStorage) GetByCert ¶
func (p AccountStorage) GetByCert(cert string) (account api.Account, exists bool)
GetByCert returns an account with the given cert if exists
func (AccountStorage) GetByName ¶
func (p AccountStorage) GetByName(name string) (api.Account, bool)
GetByName returns an account with the given name if exists
func (AccountStorage) List ¶
func (p AccountStorage) List() []api.Account
List returns a list of all the accounts
type ActorStorage ¶
type ActorStorage struct { Base // contains filtered or unexported fields }
ActorStorage implements all the persistor interfaces
func NewActorStorage ¶
func NewActorStorage(model *model.Model, db *yago.DB, logger xbus.Logger) *ActorStorage
NewActorStorage initializes a new ActorStorage
func (ActorStorage) GetByName ¶
func (p ActorStorage) GetByName(name string) (api.Actor, bool)
GetByName returns an actor from its name
func (ActorStorage) JustSeen ¶
func (p ActorStorage) JustSeen(id api.UUID) error
JustSeen updates the actor's lastSeen timestamp
func (ActorStorage) ListAccount ¶
func (p ActorStorage) ListAccount(accountID api.UUID) []api.Actor
ListAccount returns a list of all actors the specified account provides access to.
func (ActorStorage) ListKind ¶
func (p ActorStorage) ListKind(actorKind api.Actor_Kind) []api.Actor
ListKind returns a list of all actors of one type
func (ActorStorage) ListRole ¶
func (p ActorStorage) ListRole(actorKind api.Actor_Kind, role string) []api.Actor
ListRole returns a list of all actors of one role
func (ActorStorage) MarkUnresponsive ¶
MarkUnresponsive marks the actor as unresponsive if the actor was not seen since decisionTime
func (*ActorStorage) SetEventHandlers ¶
func (p *ActorStorage) SetEventHandlers(onActorUp, onActorDown storage.ActorEventHandler)
SetEventHandlers sets the event handlers
type Base ¶
type Base struct {
// contains filtered or unexported fields
}
Base provides a helpful base for sql-based storage implementation
type EnvelopeStorage ¶
type EnvelopeStorage struct { Base // contains filtered or unexported fields }
EnvelopeStorage implements the storage.EnvelopeStorage interface
func NewEnvelopeStorage ¶
NewEnvelopeStorage initializes an EnvelopeStorage
func (EnvelopeStorage) CheckStalled ¶
CheckStalled marks the given envelope as stalled if no fragment was received in the given time _and_ the envelope is in state "Receiving". Returns true if the envelope is indeed stalled. If the envelope is already marked stalled, the given time is ignored.
func (EnvelopeStorage) GetEventTypes ¶
func (es EnvelopeStorage) GetEventTypes(id api.UUID) (types []string, exists bool)
GetEventTypes returns the event types of an envelope, or nil if unknown. exists is false if the envelope is unknown, true if it exists. Will panic on any SQL error.
func (EnvelopeStorage) Purge ¶
func (es EnvelopeStorage) Purge(filter storage.EnvelopeFilter)
Purge removes all envelopes selected by the given filter
func (EnvelopeStorage) ReadEnvelope ¶
func (es EnvelopeStorage) ReadEnvelope(id api.UUID, position storage.EnvelopeReadPos, maxSize int) (api.Envelope, storage.EnvelopeReadPos, error)
ReadEnvelope reads an envelope from the given position
func (*EnvelopeStorage) SetEventHandler ¶
func (es *EnvelopeStorage) SetEventHandler(handler storage.EnvelopeStorageEventHandler)
SetEventHandler sets the current handler
func (EnvelopeStorage) StoreEnvelope ¶
func (es EnvelopeStorage) StoreEnvelope(envelope api.Envelope) (storage.EnvelopeState, error)
StoreEnvelope stores an envelope chunk and returns the envelope state in the storage
type JobStorage ¶
type JobStorage struct {
Base
}
JobStorage implements storage.JobStorage
func NewJobStorage ¶
func NewJobStorage(db *yago.DB, logger xbus.Logger) *JobStorage
NewJobStorage initializes a new JobStorage
func (*JobStorage) GetState ¶
func (s *JobStorage) GetState(id storage.JobID) storage.JobState
GetState returns the job state
func (*JobStorage) ListDetachedJobByActor ¶
ListDetachedJobByActor ...
func (*JobStorage) ListJobByProcess ¶
func (s *JobStorage) ListJobByProcess(processID api.UUID) []storage.Job
ListJobByProcess returns the active jobs of a given process
func (*JobStorage) ListJobByStatus ¶
func (s *JobStorage) ListJobByStatus( status api.ActorProcessingState_Status, ) []storage.Job
ListJobByStatus ...
func (*JobStorage) ListPendingJobByActor ¶
func (s *JobStorage) ListPendingJobByActor( actorID api.UUID, after *storage.JobID, limit int, ) []storage.Job
ListPendingJobByActor returns the active jobs of a given actor
func (*JobStorage) ListRunningJobByEnvelopeID ¶
func (s *JobStorage) ListRunningJobByEnvelopeID(envelopeID api.UUID) []storage.Job
ListRunningJobByEnvelopeID returns the active jobs that reference a given envelope
func (*JobStorage) NewJob ¶
func (s *JobStorage) NewJob( id storage.JobID, state storage.JobState, )
NewJob ...
type LogStorage ¶
type LogStorage struct {
Base
}
LogStorage implements storage.LogStorage
func NewLogStorage ¶
func NewLogStorage(model *model.Model, db *yago.DB, logger xbus.Logger) LogStorage
NewLogStorage initializes a LogStorage
func (LogStorage) GetProcessMessages ¶
GetProcessMessages returns all the messages related to the given envelope
func (LogStorage) Log ¶
func (s LogStorage) Log(entry api.LogEntry)
Log adds a log entry to the storage
func (LogStorage) PurgeBefore ¶
func (s LogStorage) PurgeBefore(t time.Time) int64
PurgeBefore removes all logs older than the given date
type PasswordStorage ¶
type PasswordStorage struct {
RawSQLBase
}
PasswordStorage provides a SQL based storage.PasswordStorage implementation
func NewPasswordStorage ¶
func NewPasswordStorage(db *yago.DB, logger xbus.Logger) *PasswordStorage
NewPasswordStorage returns a PasswordStorage instance
func (*PasswordStorage) Reset ¶
func (s *PasswordStorage) Reset(id api.UUID)
Reset removes any password for the given id
type PipelineStorage ¶
type PipelineStorage struct {
Base
}
PipelineStorage implements storage.PipelineStorage
func NewPipelineStorage ¶
func NewPipelineStorage(model *model.Model, db *yago.DB, logger xbus.Logger) *PipelineStorage
NewPipelineStorage initializes a PipelineStorage
func (*PipelineStorage) LoadActivePipeline ¶
func (s *PipelineStorage) LoadActivePipeline() []storage.Pipeline
LoadActivePipeline loads all the active graphs
func (*PipelineStorage) Query ¶
func (s *PipelineStorage) Query(name, version string, activeOnly bool) (graphs []storage.Pipeline)
Query searches for matching graphs
func (*PipelineStorage) SetStatus ¶
func (s *PipelineStorage) SetStatus(id api.UUID, status api.PipelineInfo_Status) error
SetStatus changes a graph status
type ProcessStorage ¶
type ProcessStorage struct { Base // contains filtered or unexported fields }
ProcessStorage implements storage.ProcessStorage
func NewProcessStorage ¶
NewProcessStorage initializes a new ProcessStorage
func (*ProcessStorage) AckResult ¶
func (s *ProcessStorage) AckResult(processID api.UUID)
AckResult ...
func (*ProcessStorage) CreateProcess ¶
CreateProcess saves a process
func (*ProcessStorage) FindProcessForEnvelope ¶
FindProcessForEnvelope finds the process(es) that reference a bunch of envelopes
func (*ProcessStorage) GetInputs ¶
func (s *ProcessStorage) GetInputs( processID api.UUID, targetNode storage.ProcessNode, ) (inputs []storage.ProcessNodeInput)
GetInputs returns the incoming routes for a processnode
func (*ProcessStorage) GetPostMortemState ¶
func (s *ProcessStorage) GetPostMortemState(id api.UUID) storage.PostMortemState
GetPostMortemState returns the postmortem state of a process
func (*ProcessStorage) GetProcess ¶
GetProcess returns a process data
func (*ProcessStorage) GetProcessByTrigger ¶
func (s *ProcessStorage) GetProcessByTrigger(emitter api.Actor, envelopeID api.UUID) (storage.Process, bool)
GetProcessByTrigger returns a process matching a trigger, or an empty value if not found. TODO: in the future we should support multiple processes per trigger.
func (*ProcessStorage) GetProcessLogEntries ¶
func (s *ProcessStorage) GetProcessLogEntries(id api.UUID) (entries []storage.ProcessLogEntry)
GetProcessLogEntries returns the unprocessed log entries
func (*ProcessStorage) GetState ¶
func (s *ProcessStorage) GetState(processID api.UUID) storage.ProcessState
GetState returns an instant copy of the state of a process
func (*ProcessStorage) GetStateUpdater ¶
func (s *ProcessStorage) GetStateUpdater(processID api.UUID) storage.StateUpdater
GetStateUpdater returns a StateUpdater for a process
func (*ProcessStorage) GetTargets ¶
func (s *ProcessStorage) GetTargets( processID api.UUID, source storage.NodeOutputRef, envelopeID api.UUID, noRouteTableUpdate bool, calcTargets func() ([]api.EnvelopeTarget, bool, error), ) (targets []api.EnvelopeTarget, calcErr error)
GetTargets returns the targets for an envelope if already known, or 'nil' if not
func (*ProcessStorage) GetUnprocessedProcessLogEntries ¶
func (s *ProcessStorage) GetUnprocessedProcessLogEntries() (entries []storage.ProcessLogEntry)
GetUnprocessedProcessLogEntries returns the unprocessed log entries
func (*ProcessStorage) ProcessNodeEnd ¶
func (s *ProcessStorage) ProcessNodeEnd( processID api.UUID, node storage.ProcessNode, status api.ActorProcessingState_Status, ) error
ProcessNodeEnd signals the end of a ProcessNode.
func (*ProcessStorage) PurgeProcess ¶
PurgeProcess removes all data of a process. The process must be terminated. TODO: Make a version of the function that takes several ids, and returns the list of actually purged processes.
func (*ProcessStorage) Query ¶
func (s *ProcessStorage) Query(filter api.ProcessFilter) []storage.Process
Query looks up processes matching the given filter
func (*ProcessStorage) QueryPostMortem ¶
func (s *ProcessStorage) QueryPostMortem(level api.LogLevel, statusList ...api.PMProcess_Status) []storage.Process
QueryPostMortem returns a list of dead processes
func (*ProcessStorage) SetEventHandlers ¶
func (s *ProcessStorage) SetEventHandlers(onStatusChanged storage.ProcessEventStatusHandler)
SetEventHandlers sets the event handlers
func (*ProcessStorage) SetPipeline ¶
func (s *ProcessStorage) SetPipeline( processID api.UUID, pipelineID api.UUID, sourceOutput storage.NodeOutputRef, )
SetPipeline updates the pipeline & sourceOutput of a process
func (*ProcessStorage) SetPostMortemState ¶
func (s *ProcessStorage) SetPostMortemState(id api.UUID, pmState storage.PostMortemState)
SetPostMortemState sets the postmortem state of a process
func (*ProcessStorage) SetProcessLogEntryHandler ¶
func (s *ProcessStorage) SetProcessLogEntryHandler( handler storage.ProcessLogEntryHandler, )
SetProcessLogEntryHandler sets the process logentry handler
func (*ProcessStorage) SetStatus ¶
func (s *ProcessStorage) SetStatus( processID api.UUID, status api.Process_Status, reason string, ) error
SetStatus changes the current status. Only Running->Paused and Paused->Running are allowed at this point
func (*ProcessStorage) UpdateReceiveStatus ¶
func (s *ProcessStorage) UpdateReceiveStatus(reception storage.Reception, status storage.ReceptionStatus) error
UpdateReceiveStatus updates a reception status
type RawSQLBase ¶
type RawSQLBase struct {
// contains filtered or unexported fields
}
RawSQLBase provides a base type for implementing pure-sql storages
func (*RawSQLBase) MustBegin ¶
func (sb *RawSQLBase) MustBegin() *RawSQLTxHelper
MustBegin begins a transaction or panics
type RawSQLTxHelper ¶
RawSQLTxHelper wraps a SQL tx and makes it easier to enforce commit or rollback
func RawSQLMustBegin ¶
func RawSQLMustBegin(db *yago.DB, logger xbus.Logger) *RawSQLTxHelper
RawSQLMustBegin starts a SQL transaction and wraps it
func (*RawSQLTxHelper) Commit ¶
func (tx *RawSQLTxHelper) Commit() error
Commit commits the transaction
func (*RawSQLTxHelper) MustCommit ¶
func (tx *RawSQLTxHelper) MustCommit()
MustCommit commits or rolls back
func (*RawSQLTxHelper) Rollback ¶
func (tx *RawSQLTxHelper) Rollback() error
Rollback rolls back the transaction and logs the error, if any
func (*RawSQLTxHelper) RollbackIfOpened ¶
func (tx *RawSQLTxHelper) RollbackIfOpened()
RollbackIfOpened rolls back the transaction if it is still open
type SessionStorage ¶
type SessionStorage struct {
Base
}
SessionStorage implements storage.SessionStorage
func NewSessionStorage ¶
func NewSessionStorage(db *yago.DB, logger xbus.Logger) *SessionStorage
NewSessionStorage implements session storage
func (*SessionStorage) Delete ¶
func (store *SessionStorage) Delete(token string) error
Delete deletes a token
func (*SessionStorage) DeleteOlderThan ¶
func (store *SessionStorage) DeleteOlderThan(time time.Time) error
DeleteOlderThan deletes the sessions based on their 'validUntil'
func (*SessionStorage) Get ¶
func (store *SessionStorage) Get(token string) (*storage.Session, error)
Get returns a Session from its token
type StateUpdater ¶
type StateUpdater struct {
// contains filtered or unexported fields
}
StateUpdater implements storage.StateUpdater
func NewStateUpdater ¶
func NewStateUpdater( processStorage *ProcessStorage, tx *TxHelper, processID api.UUID) (*StateUpdater, error)
NewStateUpdater creates a new StateUpdater
func (*StateUpdater) Get ¶
func (u *StateUpdater) Get() (storage.ProcessState, error)
Get returns the current state
func (*StateUpdater) MarkLogEntry ¶
func (u *StateUpdater) MarkLogEntry(id string, withError bool) error
MarkLogEntry marks a ProcessLogEntry as processed
func (*StateUpdater) Set ¶
func (u *StateUpdater) Set(state storage.ProcessState) error
Set updates the state
func (*StateUpdater) Unlock ¶
func (u *StateUpdater) Unlock() error
Unlock releases the lock on the data
type TxHelper ¶
TxHelper wraps a tx and makes it easier to enforce commit or rollback
func (*TxHelper) RollbackIfOpened ¶
func (tx *TxHelper) RollbackIfOpened()
RollbackIfOpened rolls back the transaction if it is still open