streamlet

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2022 License: Apache-2.0 Imports: 10 Imported by: 1

Documentation

Index

Constants

View Source
const (
	ElementCreating    = "Creating"
	ElementCreated     = "Created"
	ElementActivating  = "Activating"
	ElementActivated   = "Activated"
	ElementCompleting  = "Completing"
	ElementCompleted   = "Completed"
	ElementTerminating = "Terminating"
	ElementTerminated  = "Terminated"
)

Variables

View Source
var (
	ErrBPMNEngineNotStarted                 = NewBPMNEngineError(nil, "engine: not started.")
	ErrBPMNEngineCancelled                  = NewBPMNEngineError(nil, "engine: cancelled.")
	ErrBPMNEngineIdGeneratorNotSet          = NewBPMNEngineError(nil, "engine: Id Generator not set.")
	ErrBPMNEngineProcessInstanceStoreNotSet = NewBPMNEngineError(nil, "engine: Process Instance Store not set.")
	ErrBPMNEngineTokenStoreNotSet           = NewBPMNEngineError(nil, "engine: Token Store not set.")
	ErrBPMNEngineStateStoreNotSet           = NewBPMNEngineError(nil, "engine: State Store not set.")
	ErrBPMNEngineStateHistoryStoreNotSet    = NewBPMNEngineError(nil, "engine: State History Store not set.")
	ErrBPMNEngineSendCommandTimeout         = NewBPMNEngineError(nil, "engine: send command timeout.")
	ErrBPMNEngineReceiveCommandTimeout      = NewBPMNEngineError(nil, "engine: receive command timeout.")
	ErrBPMNEngineProcessInstanceNotFound    = NewBPMNEngineError(nil, "engine: process instance not found.")
	ErrBPMNEngineProcessInstanceNotRunning  = NewBPMNEngineError(nil, "engine: process instance not running.")
)
View Source
var (
	ErrProcessStateMachineRequired  = NewBPMNEngineError(nil, "process: process instance implementation must implement ProcessStateMachine")
	ErrProcessInstanceNotRunning    = NewBPMNEngineError(nil, "process: process instance is completed")
	ErrProcessSendCommandTimeout    = NewBPMNEngineError(nil, "process: send command timeout.")
	ErrProcessReceiveCommandTimeout = NewBPMNEngineError(nil, "process: receive command timeout.")
)
View Source
var (
	ErrStoreElementNotFound = NewBPMNEngineError(nil, "element not found")
)

Functions

This section is empty.

Types

type BPMNEngine

type BPMNEngine struct {
	IdGenerator          IdGenerator           // Generates ids for elements
	StartEventProcessors []StartEventProcessor // Start Event Processors to run on startup
	ProcessInstanceStore ProcessInstanceStore  // Stores process instance states received from state stream
	TokenStore           TokenStore            // Stores token states received from state stream
	StateStore           StateStore            // Stores states received from stateStream
	StateHistoryStore    StateHistoryStore     // Stores history of states
	// contains filtered or unexported fields
}

func NewBPMNEngine

NewBPMNEngine creates a new BPMNEngine with given IdGenerator, StateStore, and StateHistoryStore

func NewMemoryBPMNEngine

func NewMemoryBPMNEngine() *BPMNEngine

NewMemoryBPMNEngine creates a new BPMNEngine with an in-memory id generator, state store, and history store.

func (*BPMNEngine) AddStartEventProcesser

func (b *BPMNEngine) AddStartEventProcesser(processor StartEventProcessor)

func (*BPMNEngine) GenerateId

func (b *BPMNEngine) GenerateId(elementType ElementType) string

GenerateId generates a Pid for a given element type

func (*BPMNEngine) ProcessInstanceCount

func (b *BPMNEngine) ProcessInstanceCount() int

func (*BPMNEngine) SendCommand

func (b *BPMNEngine) SendCommand(cmd any) (any, error)

SendCommand sends any command to a process instance

func (*BPMNEngine) SendCommandWithTimeout

func (b *BPMNEngine) SendCommandWithTimeout(cmd any, timeout time.Duration) (any, error)

func (*BPMNEngine) Start

func (b *BPMNEngine) Start() error

Start starts the engine, and starts routines to receive state and commands

func (*BPMNEngine) Stop

func (b *BPMNEngine) Stop()

Stop stops the engine

func (*BPMNEngine) Wait

func (b *BPMNEngine) Wait()

Wait waits for all process instances to complete

func (*BPMNEngine) WriteProcessInstanceState

func (b *BPMNEngine) WriteProcessInstanceState(state ProcessInstanceState)

func (*BPMNEngine) WriteState

func (b *BPMNEngine) WriteState(state ElementState)

WriteState writes element state to the state store and history store

func (*BPMNEngine) WriteToken

func (b *BPMNEngine) WriteToken(state TokenState)

type BPMNEngineError

type BPMNEngineError struct {
	Inner      error
	Message    string
	StackTrace string
	Misc       map[string]any
}

func NewBPMNEngineError

func NewBPMNEngineError(err error, messagef string, msgArgs ...any) BPMNEngineError

func (BPMNEngineError) Error

func (err BPMNEngineError) Error() string

type Comments

type Comments struct {
	Comments string
}

type CompleteTasksResp

type CompleteTasksResp struct {
	Errors []error
}

func NewCompleteTasksResp

func NewCompleteTasksResp() *CompleteTasksResp

type CompleteUserTaskCmd

type CompleteUserTaskCmd struct {
	CompletedBy
	Comments
	ProcessInstanceId string
	TaskId            string
}

type CompleteUserTasksCmd

type CompleteUserTasksCmd struct {
	Tasks []CompleteUserTaskCmd
}

type CompletedBy

type CompletedBy struct {
	CompletedById   string
	CompletedByUser string
}

type ElementState

type ElementState struct {
	ElementType ElementType
	Key         string
	Id          string
	Status      string
	Object      any `json:"-"`
}

type ElementStateProvider

type ElementStateProvider interface {
	GetElementState() ElementState
}

type ElementType

type ElementType string
const (
	ProcessInstanceType            ElementType = "ProcessInstance"
	TokenType                      ElementType = "Token"
	SequenceFlowType               ElementType = "SequenceFlow"
	ExclusiveGatewayType           ElementType = "ExclusiveGateway"
	ParallelGatewayType            ElementType = "ParallelGateway"
	NoneStartEventType             ElementType = "NoneStartEvent"
	NoneIntermediateThrowEventType ElementType = "NoneIntermediateThrowEvent"
	NoneEndEventType               ElementType = "NoneEndEvent"
	TimerStartEventType            ElementType = "TimerStartEventType"
	UserTaskType                   ElementType = "UserTaskType"
	ScriptTaskType                 ElementType = "ScriptTaskType"
)

func (ElementType) String

func (e ElementType) String() string

type ExclusiveGateway

type ExclusiveGateway struct {
	ElementState
	Handler     func() ElementStateProvider
	NextElement ElementStateProvider
}

func NewExclusiveGateway

func NewExclusiveGateway(key, id string, handler func() ElementStateProvider) *ExclusiveGateway

func (*ExclusiveGateway) Completed

func (eg *ExclusiveGateway) Completed(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ExclusiveGateway) Completing

func (eg *ExclusiveGateway) Completing(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ExclusiveGateway) GetElementState

func (eg *ExclusiveGateway) GetElementState() ElementState

func (*ExclusiveGateway) RefElement

func (eg *ExclusiveGateway) RefElement() any

func (*ExclusiveGateway) RunLifecycle

func (eg *ExclusiveGateway) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type ExecuteCmd

type ExecuteCmd interface {
	ExecuteCmd(cmd any) any
}

type GetProcessInstanceInfoCmd

type GetProcessInstanceInfoCmd struct{}

type GetProcessInstancesCmd

type GetProcessInstancesCmd struct {
}

type GetProcessInstancesResp

type GetProcessInstancesResp struct {
	Data   []ProcessInstanceState
	Errors []error
}

func NewGetProcessInstancesResp

func NewGetProcessInstancesResp() *GetProcessInstancesResp

type GetTasksCmd

type GetTasksCmd struct{}

type GetTasksInfoCmd

type GetTasksInfoCmd struct{}

type GetTasksResp

type GetTasksResp struct {
	Data   []TaskState
	Errors []error
}

func NewGetTasksResp

func NewGetTasksResp() *GetTasksResp

type GetUserTaskCmd

type GetUserTaskCmd struct {
}

type IdGenerator

type IdGenerator interface {
	GenerateId(elementType ElementType) string
}

type LifeCycleRunner

type LifeCycleRunner interface {
	RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)
}

type MemoryIdGenerator

type MemoryIdGenerator struct {
	// contains filtered or unexported fields
}

func NewMemoryIdGenerator

func NewMemoryIdGenerator() *MemoryIdGenerator

func (*MemoryIdGenerator) GenerateId

func (g *MemoryIdGenerator) GenerateId(_ ElementType) string

func (*MemoryIdGenerator) Start

func (g *MemoryIdGenerator) Start(ctx context.Context)

type MemoryStateHistoryStore

type MemoryStateHistoryStore struct {
	// contains filtered or unexported fields
}

func NewMemoryStateHistoryStore

func NewMemoryStateHistoryStore() *MemoryStateHistoryStore

func (MemoryStateHistoryStore) GetStateHistories

func (m MemoryStateHistoryStore) GetStateHistories() []ElementState

func (MemoryStateHistoryStore) GetStateHistory

func (m MemoryStateHistoryStore) GetStateHistory(elementType ElementType, id string) ([]ElementState, bool)

func (MemoryStateHistoryStore) WriteStateHistory

func (m MemoryStateHistoryStore) WriteStateHistory(state ElementState)

type MemoryStateStore

type MemoryStateStore struct {
	// contains filtered or unexported fields
}

MemoryStateStore stores states in memory

func NewMemoryStateStore

func NewMemoryStateStore() *MemoryStateStore

NewMemoryStateStore creates a new in memory state store

func (*MemoryStateStore) GetState

func (m *MemoryStateStore) GetState(elementType ElementType, id string) (ElementState, error)

GetState returns the current state for a given elementType and id.

func (*MemoryStateStore) GetStates

func (m *MemoryStateStore) GetStates() ([]ElementState, error)

GetStates returns all states in the store

func (*MemoryStateStore) ReadProcessInstance

func (m *MemoryStateStore) ReadProcessInstance(id string) (ProcessInstanceState, error)

func (*MemoryStateStore) ReadProcessInstances

func (m *MemoryStateStore) ReadProcessInstances() ([]ProcessInstanceState, error)

func (*MemoryStateStore) ReadTokens

func (m *MemoryStateStore) ReadTokens() ([]TokenState, error)

func (*MemoryStateStore) WriteProcessInstanceState

func (m *MemoryStateStore) WriteProcessInstanceState(info ProcessInstanceState) error

func (*MemoryStateStore) WriteState

func (m *MemoryStateStore) WriteState(state ElementState) error

WriteState writes a StateEntry to the store

func (*MemoryStateStore) WriteToken

func (m *MemoryStateStore) WriteToken(info TokenState) error

type NoneEndEvent

type NoneEndEvent struct {
	ElementState
}

func NewNoneEndEvent

func NewNoneEndEvent(key, id string) *NoneEndEvent

func (*NoneEndEvent) GetElementState

func (e *NoneEndEvent) GetElementState() ElementState

func (*NoneEndEvent) RefElement

func (e *NoneEndEvent) RefElement() any

func (*NoneEndEvent) RunLifecycle

func (e *NoneEndEvent) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type NoneIntermediateThrowEvent

type NoneIntermediateThrowEvent struct {
	ElementState
}

func NewNoneIntermediateThrowEvent

func NewNoneIntermediateThrowEvent(key, id string) *NoneIntermediateThrowEvent

func (*NoneIntermediateThrowEvent) GetElementState

func (e *NoneIntermediateThrowEvent) GetElementState() ElementState

func (*NoneIntermediateThrowEvent) RefElement

func (e *NoneIntermediateThrowEvent) RefElement() any

func (*NoneIntermediateThrowEvent) RunLifecycle

func (e *NoneIntermediateThrowEvent) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type ParallelGateway

type ParallelGateway struct {
	ElementState
	CurrentSeqKey string
	InSeqKeys     []string
	OutSeqKeys    []string
}

func NewParallelGateway

func NewParallelGateway(key, id, currentSeqKey string, inSeqKeys, outSeqKeys []string) *ParallelGateway

func (*ParallelGateway) Completed

func (pg *ParallelGateway) Completed(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ParallelGateway) Completing

func (pg *ParallelGateway) Completing(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ParallelGateway) GetElementState

func (pg *ParallelGateway) GetElementState() ElementState

func (*ParallelGateway) RefElement

func (pg *ParallelGateway) RefElement() any

func (*ParallelGateway) RunLifecycle

func (pg *ParallelGateway) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type ParallelGatewayCmd

type ParallelGatewayCmd struct {
	// contains filtered or unexported fields
}

type PingProcessInstanceCmd

type PingProcessInstanceCmd struct{}

type ProcessInstance

type ProcessInstance struct {
	Mu           *sync.RWMutex
	Key          string
	Id           string
	Version      string
	Status       string
	Created      time.Time
	StartElement ElementStateProvider
	Tokens       []*Token
	BpmnEngine   *BPMNEngine
	Impl         any // Implementation of BPMN Process
	// contains filtered or unexported fields
}

func (*ProcessInstance) Activated

func (p *ProcessInstance) Activated()

func (*ProcessInstance) Activating

func (p *ProcessInstance) Activating()

func (*ProcessInstance) CompleteTask

func (p *ProcessInstance) CompleteTask(cmd CompleteUserTaskCmd) any

func (*ProcessInstance) Completed

func (p *ProcessInstance) Completed()

func (*ProcessInstance) Completing

func (p *ProcessInstance) Completing()

func (*ProcessInstance) GetProcessInstanceInfo

func (p *ProcessInstance) GetProcessInstanceInfo() ProcessInstanceState

func (*ProcessInstance) GetTasksInfo

func (p *ProcessInstance) GetTasksInfo() []TaskState

func (*ProcessInstance) ProcessParallelGateway

func (p *ProcessInstance) ProcessParallelGateway(v ParallelGateway) <-chan any

func (*ProcessInstance) RefElement

func (p *ProcessInstance) RefElement() any

func (*ProcessInstance) Run

func (*ProcessInstance) Running

func (p *ProcessInstance) Running() bool

func (*ProcessInstance) SendCommand

func (p *ProcessInstance) SendCommand(cmd any) (any, error)

SendCommand Called from external client to communicate with process instance

func (*ProcessInstance) SendCommandWithTimeout

func (p *ProcessInstance) SendCommandWithTimeout(cmd any, timeout time.Duration) (any, error)

func (*ProcessInstance) String

func (p *ProcessInstance) String() string

func (*ProcessInstance) Terminated

func (p *ProcessInstance) Terminated()

func (*ProcessInstance) Terminating

func (p *ProcessInstance) Terminating()

func (*ProcessInstance) WriteState

func (p *ProcessInstance) WriteState()

type ProcessInstanceCmd

type ProcessInstanceCmd struct {
	Id  string
	Cmd any
}

type ProcessInstanceDataRetriever

type ProcessInstanceDataRetriever interface {
	ProcessInstanceData() any
}

type ProcessInstanceReader

type ProcessInstanceReader interface {
	ReadProcessInstance(id string) (ProcessInstanceState, error) // Get rid of?
	ReadProcessInstances() ([]ProcessInstanceState, error)
}

type ProcessInstanceState

type ProcessInstanceState struct {
	Key     string
	Id      string
	Version string
	Status  string
	Created time.Time
	Data    any
}

type ProcessInstanceStore

type ProcessInstanceStore interface {
	ProcessInstanceReader
	ProcessInstanceWriter
}

type ProcessInstanceWriter

type ProcessInstanceWriter interface {
	WriteProcessInstanceState(info ProcessInstanceState) error
}

type ProcessStateMachine

type ProcessStateMachine interface {
	GetNextElement(g IdGenerator, currentElement ElementStateProvider) ElementStateProvider
}

type ScriptTask

type ScriptTask struct {
	ElementState
	// contains filtered or unexported fields
}

func NewScriptTask

func NewScriptTask(key, id string, script func(ctx context.Context, bpmnEngine *BPMNEngine, token *Token, task *ScriptTask)) *ScriptTask

func (*ScriptTask) Completed

func (st *ScriptTask) Completed(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ScriptTask) Completing

func (st *ScriptTask) Completing(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ScriptTask) GetElementState

func (st *ScriptTask) GetElementState() ElementState

func (*ScriptTask) RunLifecycle

func (st *ScriptTask) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type SendTask

type SendTask struct {
	ElementState
}

func (*SendTask) RunLifecycle

func (s *SendTask) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type SequenceFlow

type SequenceFlow struct {
	ElementState
}

func NewSequenceFlow

func NewSequenceFlow(key, id string) *SequenceFlow

func (*SequenceFlow) GetElementState

func (s *SequenceFlow) GetElementState() ElementState

func (*SequenceFlow) RunLifecycle

func (s *SequenceFlow) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type StartEvent

type StartEvent struct {
	ElementState
}

func NewNoneStartEvent

func NewNoneStartEvent(key, id string) *StartEvent

func (*StartEvent) GetElementState

func (e *StartEvent) GetElementState() ElementState

func (*StartEvent) RefElement

func (e *StartEvent) RefElement() any

func (*StartEvent) RunLifecycle

func (e *StartEvent) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type StartEventProcessor

type StartEventProcessor interface {
	StartEventProcesses(ctx context.Context)
}

type StartProcessInstanceCmd

type StartProcessInstanceCmd struct {
	Instance *ProcessInstance
}

type StartProcessInstanceResp

type StartProcessInstanceResp struct {
	Err error
}

type StateHistoryStore

type StateHistoryStore interface {
	WriteStateHistory(state ElementState)
	GetStateHistory(elementType ElementType, id string) ([]ElementState, bool)
	GetStateHistories() []ElementState
}

type StateReader

type StateReader interface {
	// GetState returns the current state for a given elementType and id.
	GetState(elementType ElementType, id string) (ElementState, error)
	// GetStates returns all states in the store
	GetStates() ([]ElementState, error)
}

StateReader reads a StateEntry based on filters

type StateStore

StateStore combines a state reader and writer

type StateWriter

type StateWriter interface {
	// WriteState writes a StateEntry
	WriteState(state ElementState) error
}

StateWriter writes a StateEntry

type TaskState

type TaskState struct {
	ProcessInstanceKey     string
	ProcessInstanceId      string
	ProcessInstanceVersion string
	Key                    string
	Id                     string
	Status                 string
}

type TimerStartEvent

type TimerStartEvent struct {
	ElementState
}

func NewTimerStartEvent

func NewTimerStartEvent(key, id string) *TimerStartEvent

func (*TimerStartEvent) GetElementState

func (e *TimerStartEvent) GetElementState() ElementState

func (*TimerStartEvent) RefElement

func (e *TimerStartEvent) RefElement() any

func (*TimerStartEvent) RunLifecycle

func (e *TimerStartEvent) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type Token

type Token struct {
	Id                          string
	PrevElement, CurrentElement ElementStateProvider
	BpmnEngine                  *BPMNEngine
	ProcessInstance             *ProcessInstance
	// contains filtered or unexported fields
}

func NewToken

func NewToken(startElement ElementStateProvider, processInstance *ProcessInstance, engine *BPMNEngine) *Token

func (*Token) AcceptingCommands

func (t *Token) AcceptingCommands() bool

func (*Token) CurrentState

func (t *Token) CurrentState() ElementStateProvider

func (*Token) IsComplete

func (t *Token) IsComplete() bool

func (*Token) PreviousState

func (t *Token) PreviousState() ElementStateProvider

func (*Token) Run

func (t *Token) Run(ctx context.Context, wg *sync.WaitGroup)

func (*Token) SendCommand

func (t *Token) SendCommand(ctx context.Context, cmd any, timeout time.Duration) any

func (*Token) WriteState

func (t *Token) WriteState()

type TokenReader

type TokenReader interface {
	ReadTokens() ([]TokenState, error)
}

type TokenState

type TokenState struct {
	Id                string
	ProcessInstanceId string
	CurrentElementId  string
	Complete          bool
}

type TokenStore

type TokenStore interface {
	TokenReader
	TokenWriter
}

type TokenWriter

type TokenWriter interface {
	WriteToken(info TokenState) error
}

type UserTask

type UserTask struct {
	ElementState
	CompletedBy
	Comments

	CompletedTime time.Time
	// contains filtered or unexported fields
}

func NewUserTask

func NewUserTask(key, id string) *UserTask

func (*UserTask) Clone

func (t *UserTask) Clone() *UserTask

func (*UserTask) Completed

func (t *UserTask) Completed(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*UserTask) Completing

func (t *UserTask) Completing(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*UserTask) GetElementState

func (t *UserTask) GetElementState() ElementState

func (*UserTask) RefElement

func (t *UserTask) RefElement() any

func (*UserTask) RunLifecycle

func (t *UserTask) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*UserTask) RunTask

func (t *UserTask) RunTask(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

type WaitingParallelGateway

type WaitingParallelGateway struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
web

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL