Documentation ¶
Index ¶
- type BeatData
- type BeatDataService
- type CycleState
- type EventInputer
- type GlobalAliases
- func (p *GlobalAliases) GetPublishedAlias(key interface{}) smachine.SlotAliasValue
- func (p *GlobalAliases) PublishAlias(key interface{}, slot smachine.SlotAliasValue) bool
- func (p *GlobalAliases) ReplaceAlias(key interface{}, slot smachine.SlotAliasValue)
- func (p *GlobalAliases) UnpublishAlias(key interface{})
- type InputContext
- type InputEvent
- type InputSetup
- type PreparePulseCallbackFunc
- type PreparedState
- type PulseChanger
- type PulseConveyor
- func (p *PulseConveyor) AddDependency(v interface{})
- func (p *PulseConveyor) AddInput(ctx context.Context, pn pulse.Number, event InputEvent) error
- func (p *PulseConveyor) AddInputExt(pn pulse.Number, event InputEvent, createDefaults smachine.CreateDefaultValues) error
- func (p *PulseConveyor) AddInterfaceDependency(v interface{})
- func (p *PulseConveyor) AddManagedComponent(c managed.Component)
- func (p *PulseConveyor) CancelPulseChange() (err error)
- func (p *PulseConveyor) CommitPulseChange(pr pulse.Range, pulseStart time.Time, online census.OnlinePopulation) error
- func (p *PulseConveyor) FindDependency(id string) (interface{}, bool)
- func (p *PulseConveyor) GetDataManager() *PulseDataManager
- func (p *PulseConveyor) GetPublishedGlobalAliasAndBargeIn(key interface{}) (smachine.SlotLink, smachine.BargeInHolder)
- func (p *PulseConveyor) PreparePulseChange(out PreparePulseCallbackFunc) (err error)
- func (p *PulseConveyor) PutDependency(id string, v interface{})
- func (p *PulseConveyor) SetFactoryFunc(factory PulseEventFactoryFunc)
- func (p *PulseConveyor) StartWorker(emergencyStop <-chan struct{}, completedFn func())
- func (p *PulseConveyor) StartWorkerExt(emergencyStop <-chan struct{}, completedFn func(), ...)
- func (p *PulseConveyor) Stop()
- func (p *PulseConveyor) StopNoWait()
- func (p *PulseConveyor) TryPutDependency(id string, v interface{}) bool
- func (p *PulseConveyor) WakeUpWorker()
- type PulseConveyorConfig
- type PulseConveyorCycleFunc
- type PulseDataCache
- func (p *PulseDataCache) Check(pn pulse.Number) BeatData
- func (p *PulseDataCache) Contains(pn pulse.Number) bool
- func (p *PulseDataCache) EvictAndRotate(currentPN pulse.Number)
- func (p *PulseDataCache) EvictNoRotate(currentPN pulse.Number)
- func (p *PulseDataCache) Get(pn pulse.Number) BeatData
- func (p *PulseDataCache) GetMinRange() uint32
- func (p *PulseDataCache) Init(pdm *PulseDataManager, minRange uint32, accessRotations int)
- func (p *PulseDataCache) Put(pd BeatData)
- func (p *PulseDataCache) Rotate()
- func (p *PulseDataCache) Touch(pn pulse.Number) bool
- type PulseDataManager
- func (p *PulseDataManager) GetBeatData(pn pulse.Number) BeatData
- func (p *PulseDataManager) GetPresentPulse() (present pulse.Number, nearestFuture pulse.Number)
- func (p *PulseDataManager) GetPrevBeatData() (pulse.Number, BeatData)
- func (p *PulseDataManager) GetPulseData(pn pulse.Number) (pulse.Data, bool)
- func (p *PulseDataManager) HasPulseData(pn pulse.Number) bool
- func (p *PulseDataManager) IsAllowedFutureSpan(futurePN pulse.Number) bool
- func (p *PulseDataManager) IsAllowedPastSpan(pastPN pulse.Number) bool
- func (p *PulseDataManager) IsRecentPastRange(pastPN pulse.Number) bool
- func (p *PulseDataManager) TouchPulseData(pn pulse.Number) bool
- type PulseDataServicePrepareFunc
- type PulseEventFactoryFunc
- type PulseSlot
- func (p *PulseSlot) BeatData() (BeatData, PulseSlotState)
- func (p *PulseSlot) CurrentPulseData() pulse.Data
- func (p *PulseSlot) CurrentPulseNumber() pulse.Number
- func (p *PulseSlot) HasPulseData(pn pulse.Number) bool
- func (p *PulseSlot) PrevOperationPulseNumber() pulse.Number
- func (p *PulseSlot) PulseData() pulse.Data
- func (p *PulseSlot) PulseNumber() pulse.Number
- func (p *PulseSlot) PulseRange() (pulse.Range, PulseSlotState)
- func (p *PulseSlot) PulseRelativeDeadline(portion float64) time.Time
- func (p *PulseSlot) PulseStartedAt() time.Time
- func (p *PulseSlot) SetPulseChanger(changer PulseChanger) bool
- func (p *PulseSlot) State() PulseSlotState
- type PulseSlotConfig
- type PulseSlotMachine
- type PulseSlotPostMigrateFunc
- type PulseSlotState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BeatDataService ¶
type EventInputer ¶
type EventInputer interface { AddInput(ctx context.Context, pn pulse.Number, event InputEvent) error AddInputExt(pn pulse.Number, event InputEvent, createDefaults smachine.CreateDefaultValues) error }
type GlobalAliases ¶
type GlobalAliases struct {
// contains filtered or unexported fields
}
func (*GlobalAliases) GetPublishedAlias ¶
func (p *GlobalAliases) GetPublishedAlias(key interface{}) smachine.SlotAliasValue
func (*GlobalAliases) PublishAlias ¶
func (p *GlobalAliases) PublishAlias(key interface{}, slot smachine.SlotAliasValue) bool
func (*GlobalAliases) ReplaceAlias ¶
func (p *GlobalAliases) ReplaceAlias(key interface{}, slot smachine.SlotAliasValue)
func (*GlobalAliases) UnpublishAlias ¶
func (p *GlobalAliases) UnpublishAlias(key interface{})
type InputEvent ¶
type InputEvent = interface{}
type InputSetup ¶
type InputSetup struct { TargetPulse pulse.Number CreateFn smachine.CreateFunc PreInitFn smachine.PreInitHandlerFunc }
type PreparePulseCallbackFunc ¶
type PreparePulseCallbackFunc = func(PreparedState)
type PreparedState ¶
type PulseChanger ¶
type PulseChanger interface { PreparePulseChange(out PreparePulseCallbackFunc) CancelPulseChange() CommitPulseChange() }
type PulseConveyor ¶
type PulseConveyor struct {
// contains filtered or unexported fields
}
func NewPulseConveyor ¶
func NewPulseConveyor( ctx context.Context, config PulseConveyorConfig, factoryFn PulseEventFactoryFunc, registry injector.DependencyRegistry, ) *PulseConveyor
func (*PulseConveyor) AddDependency ¶
func (p *PulseConveyor) AddDependency(v interface{})
func (*PulseConveyor) AddInput ¶
func (p *PulseConveyor) AddInput(ctx context.Context, pn pulse.Number, event InputEvent) error
func (*PulseConveyor) AddInputExt ¶
func (p *PulseConveyor) AddInputExt(pn pulse.Number, event InputEvent, createDefaults smachine.CreateDefaultValues, ) error
func (*PulseConveyor) AddInterfaceDependency ¶
func (p *PulseConveyor) AddInterfaceDependency(v interface{})
func (*PulseConveyor) AddManagedComponent ¶
func (p *PulseConveyor) AddManagedComponent(c managed.Component)
func (*PulseConveyor) CancelPulseChange ¶
func (p *PulseConveyor) CancelPulseChange() (err error)
func (*PulseConveyor) CommitPulseChange ¶
func (p *PulseConveyor) CommitPulseChange(pr pulse.Range, pulseStart time.Time, online census.OnlinePopulation) error
func (*PulseConveyor) FindDependency ¶
func (p *PulseConveyor) FindDependency(id string) (interface{}, bool)
func (*PulseConveyor) GetDataManager ¶
func (p *PulseConveyor) GetDataManager() *PulseDataManager
func (*PulseConveyor) GetPublishedGlobalAliasAndBargeIn ¶
func (p *PulseConveyor) GetPublishedGlobalAliasAndBargeIn(key interface{}) (smachine.SlotLink, smachine.BargeInHolder)
func (*PulseConveyor) PreparePulseChange ¶
func (p *PulseConveyor) PreparePulseChange(out PreparePulseCallbackFunc) (err error)
func (*PulseConveyor) PutDependency ¶
func (p *PulseConveyor) PutDependency(id string, v interface{})
func (*PulseConveyor) SetFactoryFunc ¶
func (p *PulseConveyor) SetFactoryFunc(factory PulseEventFactoryFunc)
func (*PulseConveyor) StartWorker ¶
func (p *PulseConveyor) StartWorker(emergencyStop <-chan struct{}, completedFn func())
func (*PulseConveyor) StartWorkerExt ¶
func (p *PulseConveyor) StartWorkerExt(emergencyStop <-chan struct{}, completedFn func(), cycleFn PulseConveyorCycleFunc)
func (*PulseConveyor) Stop ¶
func (p *PulseConveyor) Stop()
func (*PulseConveyor) StopNoWait ¶
func (p *PulseConveyor) StopNoWait()
func (*PulseConveyor) TryPutDependency ¶
func (p *PulseConveyor) TryPutDependency(id string, v interface{}) bool
func (*PulseConveyor) WakeUpWorker ¶
func (p *PulseConveyor) WakeUpWorker()
type PulseConveyorConfig ¶
type PulseConveyorConfig struct { ConveyorMachineConfig smachine.SlotMachineConfig SlotMachineConfig smachine.SlotMachineConfig EventlessSleep time.Duration MinCachePulseAge, MaxPastPulseAge uint32 PulseDataService PulseDataServicePrepareFunc PulseSlotMigration PulseSlotPostMigrateFunc }
type PulseConveyorCycleFunc ¶
type PulseConveyorCycleFunc = func(CycleState)
type PulseDataCache ¶
type PulseDataCache struct {
// contains filtered or unexported fields
}
Cache that keeps (1) a PD younger than minRange (2) PD touched less than accessRotations ago. Safe for concurrent access. WARNING! Cache size is not directly limited. TODO PLAT-19 eviction function is not efficient for 100+ PDs and/or accessRotations > 10
func (*PulseDataCache) EvictAndRotate ¶
func (p *PulseDataCache) EvictAndRotate(currentPN pulse.Number)
func (*PulseDataCache) EvictNoRotate ¶
func (p *PulseDataCache) EvictNoRotate(currentPN pulse.Number)
func (*PulseDataCache) GetMinRange ¶
func (p *PulseDataCache) GetMinRange() uint32
func (*PulseDataCache) Init ¶
func (p *PulseDataCache) Init(pdm *PulseDataManager, minRange uint32, accessRotations int)
func (*PulseDataCache) Put ¶
func (p *PulseDataCache) Put(pd BeatData)
func (*PulseDataCache) Rotate ¶
func (p *PulseDataCache) Rotate()
type PulseDataManager ¶
type PulseDataManager struct {
// contains filtered or unexported fields
}
func (*PulseDataManager) GetBeatData ¶
func (p *PulseDataManager) GetBeatData(pn pulse.Number) BeatData
func (*PulseDataManager) GetPresentPulse ¶
func (p *PulseDataManager) GetPresentPulse() (present pulse.Number, nearestFuture pulse.Number)
func (*PulseDataManager) GetPrevBeatData ¶
func (p *PulseDataManager) GetPrevBeatData() (pulse.Number, BeatData)
func (*PulseDataManager) GetPulseData ¶
func (*PulseDataManager) HasPulseData ¶
func (p *PulseDataManager) HasPulseData(pn pulse.Number) bool
for non-recent past HasPulseData() can be incorrect / incomplete
func (*PulseDataManager) IsAllowedFutureSpan ¶
func (p *PulseDataManager) IsAllowedFutureSpan(futurePN pulse.Number) bool
IsAllowedFutureSpan Returns true when the given PN can be accepted into Future pulse slot, otherwise must be rejected
func (*PulseDataManager) IsAllowedPastSpan ¶
func (p *PulseDataManager) IsAllowedPastSpan(pastPN pulse.Number) bool
func (*PulseDataManager) IsRecentPastRange ¶
func (p *PulseDataManager) IsRecentPastRange(pastPN pulse.Number) bool
func (*PulseDataManager) TouchPulseData ¶
func (p *PulseDataManager) TouchPulseData(pn pulse.Number) bool
type PulseDataServicePrepareFunc ¶
type PulseDataServicePrepareFunc func(smachine.ExecutionContext, func(context.Context, BeatDataService) smachine.AsyncResultFunc) smachine.AsyncCallRequester
func CreatePulseDataAdapterFn ¶
func CreatePulseDataAdapterFn(ctx context.Context, pds BeatDataService, bufMax, parallelReaders int) PulseDataServicePrepareFunc
type PulseEventFactoryFunc ¶
type PulseEventFactoryFunc = func(context.Context, InputEvent, InputContext) (InputSetup, error)
PulseEventFactoryFunc should return pulse.Unknown or current.Pulse when SM doesn't need to be put into a different pulse slot. Arg (pulse.Range) can be nil for future slot.
type PulseSlot ¶
type PulseSlot struct {
// contains filtered or unexported fields
}
func NewPastPulseSlot ¶
func NewPastPulseSlot(pulseManager *PulseDataManager, pr pulse.Range) PulseSlot
NewPastPulseSlot is for test use only
func NewPresentPulseSlot ¶
func NewPresentPulseSlot(pulseManager *PulseDataManager, pr pulse.Range) PulseSlot
NewPresentPulseSlot is for test use only
func (*PulseSlot) BeatData ¶
func (p *PulseSlot) BeatData() (BeatData, PulseSlotState)
func (*PulseSlot) CurrentPulseData ¶
func (*PulseSlot) CurrentPulseNumber ¶
func (*PulseSlot) PrevOperationPulseNumber ¶
func (*PulseSlot) PulseNumber ¶
func (*PulseSlot) PulseRange ¶
func (p *PulseSlot) PulseRange() (pulse.Range, PulseSlotState)
func (*PulseSlot) PulseRelativeDeadline ¶
func (*PulseSlot) PulseStartedAt ¶
func (*PulseSlot) SetPulseChanger ¶
func (p *PulseSlot) SetPulseChanger(changer PulseChanger) bool
func (*PulseSlot) State ¶
func (p *PulseSlot) State() PulseSlotState
type PulseSlotConfig ¶
type PulseSlotConfig struct {
// contains filtered or unexported fields
}
type PulseSlotMachine ¶
type PulseSlotMachine struct { smachine.StateMachineDeclTemplate // contains filtered or unexported fields }
func NewPulseSlotMachine ¶
func NewPulseSlotMachine(config PulseSlotConfig, pulseManager *PulseDataManager) *PulseSlotMachine
func (*PulseSlotMachine) GetInitStateFor ¶
func (p *PulseSlotMachine) GetInitStateFor(sm smachine.StateMachine) smachine.InitFunc
func (*PulseSlotMachine) GetStateMachineDeclaration ¶
func (p *PulseSlotMachine) GetStateMachineDeclaration() smachine.StateMachineDeclaration
func (*PulseSlotMachine) SlotLink ¶
func (p *PulseSlotMachine) SlotLink() smachine.SlotLink
type PulseSlotPostMigrateFunc ¶
type PulseSlotPostMigrateFunc = func(prevState PulseSlotState, slot *PulseSlot, h smachine.SlotMachineHolder)
PulseSlotPostMigrateFunc is called on migration and on creation of the slot. For creation (prevState) will be zero.
type PulseSlotState ¶
type PulseSlotState uint8
const ( Future PulseSlotState Present Past Antique // non-individual past )