conveyor

package
v0.0.0-...-05bc493 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BeatData

type BeatData = managed.BeatData

type BeatDataService

type BeatDataService interface {
	LoadBeatData(context.Context, pulse.Number) (BeatData, bool)
}

type CycleState

type CycleState uint8
const (
	Scanning CycleState = iota
	ScanActive
	ScanIdle
)

type EventInputer

type EventInputer interface {
	AddInput(ctx context.Context, pn pulse.Number, event InputEvent) error
	AddInputExt(pn pulse.Number, event InputEvent, createDefaults smachine.CreateDefaultValues) error
}

type GlobalAliases

type GlobalAliases struct {
	// contains filtered or unexported fields
}

func (*GlobalAliases) GetPublishedAlias

func (p *GlobalAliases) GetPublishedAlias(key interface{}) smachine.SlotAliasValue

func (*GlobalAliases) PublishAlias

func (p *GlobalAliases) PublishAlias(key interface{}, slot smachine.SlotAliasValue) bool

func (*GlobalAliases) ReplaceAlias

func (p *GlobalAliases) ReplaceAlias(key interface{}, slot smachine.SlotAliasValue)

func (*GlobalAliases) UnpublishAlias

func (p *GlobalAliases) UnpublishAlias(key interface{})

type InputContext

type InputContext struct {
	PulseNumber pulse.Number
	PulseRange  pulse.Range
}

type InputEvent

type InputEvent = interface{}

type InputSetup

type InputSetup struct {
	TargetPulse pulse.Number
	CreateFn    smachine.CreateFunc
	PreInitFn   smachine.PreInitHandlerFunc
}

type PreparePulseCallbackFunc

type PreparePulseCallbackFunc = func(PreparedState)

type PreparedState

type PreparedState = beat.AckData

type PulseChanger

type PulseChanger interface {
	PreparePulseChange(out PreparePulseCallbackFunc)
	CancelPulseChange()
	CommitPulseChange()
}

type PulseConveyor

type PulseConveyor struct {
	// contains filtered or unexported fields
}

func NewPulseConveyor

func NewPulseConveyor(
	ctx context.Context,
	config PulseConveyorConfig,
	factoryFn PulseEventFactoryFunc,
	registry injector.DependencyRegistry,
) *PulseConveyor

func (*PulseConveyor) AddDependency

func (p *PulseConveyor) AddDependency(v interface{})

func (*PulseConveyor) AddInput

func (p *PulseConveyor) AddInput(ctx context.Context, pn pulse.Number, event InputEvent) error

func (*PulseConveyor) AddInputExt

func (p *PulseConveyor) AddInputExt(pn pulse.Number, event InputEvent,
	createDefaults smachine.CreateDefaultValues,
) error

func (*PulseConveyor) AddInterfaceDependency

func (p *PulseConveyor) AddInterfaceDependency(v interface{})

func (*PulseConveyor) AddManagedComponent

func (p *PulseConveyor) AddManagedComponent(c managed.Component)

func (*PulseConveyor) CancelPulseChange

func (p *PulseConveyor) CancelPulseChange() (err error)

func (*PulseConveyor) CommitPulseChange

func (p *PulseConveyor) CommitPulseChange(pr pulse.Range, pulseStart time.Time, online census.OnlinePopulation) error

func (*PulseConveyor) FindDependency

func (p *PulseConveyor) FindDependency(id string) (interface{}, bool)

func (*PulseConveyor) GetDataManager

func (p *PulseConveyor) GetDataManager() *PulseDataManager

func (*PulseConveyor) GetPublishedGlobalAliasAndBargeIn

func (p *PulseConveyor) GetPublishedGlobalAliasAndBargeIn(key interface{}) (smachine.SlotLink, smachine.BargeInHolder)

func (*PulseConveyor) PreparePulseChange

func (p *PulseConveyor) PreparePulseChange(out PreparePulseCallbackFunc) (err error)

func (*PulseConveyor) PutDependency

func (p *PulseConveyor) PutDependency(id string, v interface{})

func (*PulseConveyor) SetFactoryFunc

func (p *PulseConveyor) SetFactoryFunc(factory PulseEventFactoryFunc)

func (*PulseConveyor) StartWorker

func (p *PulseConveyor) StartWorker(emergencyStop <-chan struct{}, completedFn func())

func (*PulseConveyor) StartWorkerExt

func (p *PulseConveyor) StartWorkerExt(emergencyStop <-chan struct{}, completedFn func(), cycleFn PulseConveyorCycleFunc)

func (*PulseConveyor) Stop

func (p *PulseConveyor) Stop()

func (*PulseConveyor) StopNoWait

func (p *PulseConveyor) StopNoWait()

func (*PulseConveyor) TryPutDependency

func (p *PulseConveyor) TryPutDependency(id string, v interface{}) bool

func (*PulseConveyor) WakeUpWorker

func (p *PulseConveyor) WakeUpWorker()

type PulseConveyorConfig

type PulseConveyorConfig struct {
	ConveyorMachineConfig             smachine.SlotMachineConfig
	SlotMachineConfig                 smachine.SlotMachineConfig
	EventlessSleep                    time.Duration
	MinCachePulseAge, MaxPastPulseAge uint32
	PulseDataService                  PulseDataServicePrepareFunc
	PulseSlotMigration                PulseSlotPostMigrateFunc
}

type PulseConveyorCycleFunc

type PulseConveyorCycleFunc = func(CycleState)

type PulseDataCache

type PulseDataCache struct {
	// contains filtered or unexported fields
}

PulseDataCache is a cache that keeps (1) any PD younger than minRange and (2) any PD touched less than accessRotations rotations ago. It is safe for concurrent access. WARNING: the cache size is not directly limited. TODO (PLAT-19): the eviction function is not efficient for 100+ PDs and/or accessRotations > 10.

func (*PulseDataCache) Check

func (p *PulseDataCache) Check(pn pulse.Number) BeatData

func (*PulseDataCache) Contains

func (p *PulseDataCache) Contains(pn pulse.Number) bool

func (*PulseDataCache) EvictAndRotate

func (p *PulseDataCache) EvictAndRotate(currentPN pulse.Number)

func (*PulseDataCache) EvictNoRotate

func (p *PulseDataCache) EvictNoRotate(currentPN pulse.Number)

func (*PulseDataCache) Get

func (p *PulseDataCache) Get(pn pulse.Number) BeatData

func (*PulseDataCache) GetMinRange

func (p *PulseDataCache) GetMinRange() uint32

func (*PulseDataCache) Init

func (p *PulseDataCache) Init(pdm *PulseDataManager, minRange uint32, accessRotations int)

func (*PulseDataCache) Put

func (p *PulseDataCache) Put(pd BeatData)

func (*PulseDataCache) Rotate

func (p *PulseDataCache) Rotate()

func (*PulseDataCache) Touch

func (p *PulseDataCache) Touch(pn pulse.Number) bool

type PulseDataManager

type PulseDataManager struct {
	// contains filtered or unexported fields
}

func (*PulseDataManager) GetBeatData

func (p *PulseDataManager) GetBeatData(pn pulse.Number) BeatData

func (*PulseDataManager) GetPresentPulse

func (p *PulseDataManager) GetPresentPulse() (present pulse.Number, nearestFuture pulse.Number)

func (*PulseDataManager) GetPrevBeatData

func (p *PulseDataManager) GetPrevBeatData() (pulse.Number, BeatData)

func (*PulseDataManager) GetPulseData

func (p *PulseDataManager) GetPulseData(pn pulse.Number) (pulse.Data, bool)

func (*PulseDataManager) HasPulseData

func (p *PulseDataManager) HasPulseData(pn pulse.Number) bool

For the non-recent past, the result of HasPulseData() can be incorrect or incomplete.

func (*PulseDataManager) IsAllowedFutureSpan

func (p *PulseDataManager) IsAllowedFutureSpan(futurePN pulse.Number) bool

IsAllowedFutureSpan returns true when the given PN can be accepted into the Future pulse slot; otherwise the PN must be rejected.

func (*PulseDataManager) IsAllowedPastSpan

func (p *PulseDataManager) IsAllowedPastSpan(pastPN pulse.Number) bool

func (*PulseDataManager) IsRecentPastRange

func (p *PulseDataManager) IsRecentPastRange(pastPN pulse.Number) bool

func (*PulseDataManager) TouchPulseData

func (p *PulseDataManager) TouchPulseData(pn pulse.Number) bool

type PulseEventFactoryFunc

type PulseEventFactoryFunc = func(context.Context, InputEvent, InputContext) (InputSetup, error)

PulseEventFactoryFunc should return pulse.Unknown or the current pulse when the SM doesn't need to be put into a different pulse slot. The pulse.Range argument (the PulseRange field of InputContext) can be nil for a future slot.

type PulseSlot

type PulseSlot struct {
	// contains filtered or unexported fields
}

func NewPastPulseSlot

func NewPastPulseSlot(pulseManager *PulseDataManager, pr pulse.Range) PulseSlot

NewPastPulseSlot is for test use only.

func NewPresentPulseSlot

func NewPresentPulseSlot(pulseManager *PulseDataManager, pr pulse.Range) PulseSlot

NewPresentPulseSlot is for test use only.

func (*PulseSlot) BeatData

func (p *PulseSlot) BeatData() (BeatData, PulseSlotState)

func (*PulseSlot) CurrentPulseData

func (p *PulseSlot) CurrentPulseData() pulse.Data

func (*PulseSlot) CurrentPulseNumber

func (p *PulseSlot) CurrentPulseNumber() pulse.Number

func (*PulseSlot) HasPulseData

func (p *PulseSlot) HasPulseData(pn pulse.Number) bool

func (*PulseSlot) PrevOperationPulseNumber

func (p *PulseSlot) PrevOperationPulseNumber() pulse.Number

func (*PulseSlot) PulseData

func (p *PulseSlot) PulseData() pulse.Data

func (*PulseSlot) PulseNumber

func (p *PulseSlot) PulseNumber() pulse.Number

func (*PulseSlot) PulseRange

func (p *PulseSlot) PulseRange() (pulse.Range, PulseSlotState)

func (*PulseSlot) PulseRelativeDeadline

func (p *PulseSlot) PulseRelativeDeadline(portion float64) time.Time

func (*PulseSlot) PulseStartedAt

func (p *PulseSlot) PulseStartedAt() time.Time

func (*PulseSlot) SetPulseChanger

func (p *PulseSlot) SetPulseChanger(changer PulseChanger) bool

func (*PulseSlot) State

func (p *PulseSlot) State() PulseSlotState

type PulseSlotConfig

type PulseSlotConfig struct {
	// contains filtered or unexported fields
}

type PulseSlotMachine

type PulseSlotMachine struct {
	smachine.StateMachineDeclTemplate
	// contains filtered or unexported fields
}

func NewPulseSlotMachine

func NewPulseSlotMachine(config PulseSlotConfig, pulseManager *PulseDataManager) *PulseSlotMachine

func (*PulseSlotMachine) GetInitStateFor

func (p *PulseSlotMachine) GetInitStateFor(sm smachine.StateMachine) smachine.InitFunc

func (*PulseSlotMachine) GetStateMachineDeclaration

func (p *PulseSlotMachine) GetStateMachineDeclaration() smachine.StateMachineDeclaration
func (p *PulseSlotMachine) SlotLink() smachine.SlotLink

type PulseSlotPostMigrateFunc

type PulseSlotPostMigrateFunc = func(prevState PulseSlotState, slot *PulseSlot, h smachine.SlotMachineHolder)

PulseSlotPostMigrateFunc is called on migration and on creation of the slot. On creation, prevState will be zero.

type PulseSlotState

type PulseSlotState uint8
const (
	Future PulseSlotState
	Present
	Past
	Antique // non-individual past
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL