dataflow

package
v0.0.0-...-e62a7ec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2024 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrRetry = errors.New("no VMs available, retry")
)

Functions

This section is empty.

Types

type BaseScheduler

type BaseScheduler struct {
	ChildScheduler Scheduler
	Scheduler
	// contains filtered or unexported fields
}

BaseScheduler ..

func NewBaseScheduler

func NewBaseScheduler(rescStates *resmngr.ResourceManagerStates) BaseScheduler

NewBaseScheduler ..

func (*BaseScheduler) CopyResources

func (s *BaseScheduler) CopyResources(source Scheduler)

CopyResources copies the fields of source into the receiver.

func (*BaseScheduler) GetBaseScheduler

func (s *BaseScheduler) GetBaseScheduler() *BaseScheduler

GetBaseScheduler ..

func (*BaseScheduler) GetDataFlow

func (s *BaseScheduler) GetDataFlow(name string) *Dataflow

GetDataFlow ..

func (*BaseScheduler) GetRescStates

func (s *BaseScheduler) GetRescStates() *resmngr.ResourceManagerStates

GetRescStates returns resource manager states

func (*BaseScheduler) RegisterDataFlow

func (s *BaseScheduler) RegisterDataFlow(name string, rawDataflow []byte) error

RegisterDataFlow ..

func (*BaseScheduler) RunDataflow

func (s *BaseScheduler) RunDataflow(name, inputData string) (string, error)

RunDataflow ..

func (*BaseScheduler) RunSingleFunction

func (s *BaseScheduler) RunSingleFunction(name, inputData string,
	schedulerHints *SchedulerHint, dfid string, nested bool) (InterStateMessage, error)

RunSingleFunction ..

func (*BaseScheduler) Schedule

func (s *BaseScheduler) Schedule(task *TaskState) error

Schedule chooses where to execute something (an IP:port) and reports failure through the returned error.

func (*BaseScheduler) SetProxyClient

func (s *BaseScheduler) SetProxyClient(pc *http.Client)

SetProxyClient ..

type BaseState

type BaseState struct {
	Name          string
	NextState     string //could be a pointer later
	DataFlowId    string // for tracing
	NumFanOut     int    // for rebalance on task state; assigned to df under map state
	NextStateIntr State
	IsEnd         bool
	In            chan InterStateMessage   //receive only channel
	Out           chan<- InterStateMessage //send only channel

	State
	// contains filtered or unexported fields
}

BaseState ..

func (*BaseState) ConnectInitial

func (b *BaseState) ConnectInitial() chan<- InterStateMessage

ConnectInitial creates a channel, sets it as the input of the state, and returns it as a send-only channel.

func (*BaseState) ConnectLast

func (b *BaseState) ConnectLast() <-chan InterStateMessage

ConnectLast creates a channel, sets it as the output of the state, and returns it as a receive-only channel.

func (*BaseState) ConnectTo

func (b *BaseState) ConnectTo(dest *BaseState)

ConnectTo connects two states by a channel.

func (*BaseState) GetBaseState

func (b *BaseState) GetBaseState() *BaseState

GetBaseState ..

func (*BaseState) GetInputChannel

func (b *BaseState) GetInputChannel() chan InterStateMessage

GetInputChannel gets the input channel, creating it if it does not exist.

func (*BaseState) SetDoneChannel

func (b *BaseState) SetDoneChannel(done <-chan struct{})

SetDoneChannel ..

func (*BaseState) SetOutputChannel

func (b *BaseState) SetOutputChannel(out chan<- InterStateMessage)

SetOutputChannel ..

type ByIdx

type ByIdx []Output

func (ByIdx) Len

func (a ByIdx) Len() int

func (ByIdx) Less

func (a ByIdx) Less(i, j int) bool

func (ByIdx) Swap

func (a ByIdx) Swap(i, j int)

type ChoiceState

type ChoiceState struct {
	Choices []numericChoice //TODO: this should be a base class Choice or something

	BaseState
	// contains filtered or unexported fields
}

ChoiceState ..

func NewChoiceState

func NewChoiceState(name string, json *gabs.Container) *ChoiceState

NewChoiceState ..

func (*ChoiceState) Clone

func (p *ChoiceState) Clone() State

Clone creates a copy

func (*ChoiceState) Execute

func (p *ChoiceState) Execute()

Execute for ChoiceState. TODO: we can make this loop in the base class State.

func (*ChoiceState) GetPossibleNextStates

func (p *ChoiceState) GetPossibleNextStates() []string

GetPossibleNextStates returns all possible states we can go to

func (*ChoiceState) GetType

func (p *ChoiceState) GetType() string

GetType ...

func (*ChoiceState) SetNextStateChannel

func (p *ChoiceState) SetNextStateChannel(state string, c chan<- InterStateMessage)

SetNextStateChannel sets the channel of a next state. The caller is responsible for setting this for all possible next states.

type Dataflow

type Dataflow struct {
	StartState string
	States     map[string]State
}

Dataflow ..

func ParseJSON

func ParseJSON(json *gabs.Container) (*Dataflow, error)

ParseJSON ..

func ParseJSONFromBytes

func ParseJSONFromBytes(input []byte) (*Dataflow, error)

ParseJSONFromBytes ..

func (*Dataflow) Execute

func (d *Dataflow) Execute(inputData string, schedulerChan chan<- SchedulableMessage,
	dfid string, numFanOut int) string

Execute creates all states and connects them. Current limitations (that might actually not be true):
  * only one final state
  * first state cannot be Choice
  * branches/choices are really sketchy to create with the original design

type InterStateMessage

type InterStateMessage struct {
	Body          string
	SchedulerChan chan<- SchedulableMessage
	Hints         SchedulerHint
}

InterStateMessage ..

type MapState

type MapState struct {
	BaseState
	// contains filtered or unexported fields
}

MapState ..

func NewMapState

func NewMapState(name string, json *gabs.Container, next string, isEnd bool) *MapState

NewMapState ..

func (*MapState) Clone

func (m *MapState) Clone() State

Clone ..

func (*MapState) Execute

func (m *MapState) Execute()

Execute for MapState. TODO: we can make this loop in the base class State.

func (*MapState) GetType

func (m *MapState) GetType() string

GetType returns type "Map"

type Output

type Output struct {
	// contains filtered or unexported fields
}

type ParallelState

type ParallelState struct {
	BaseState
	// contains filtered or unexported fields
}

ParallelState contains data for Parallel State

func NewParallelState

func NewParallelState(name string, json *gabs.Container, next string, isEnd bool) *ParallelState

NewParallelState creates a new ParallelState

func NewSucceedStateState

func NewSucceedStateState(name string, json *gabs.Container) *ParallelState

NewSucceedStateState creates a new ParallelState

func (*ParallelState) Clone

func (p *ParallelState) Clone() State

Clone creates a copy of ParallelState

func (*ParallelState) Execute

func (p *ParallelState) Execute()

Execute for ParallelState. TODO: we can make this loop in the base class State.

func (*ParallelState) GetType

func (p *ParallelState) GetType() string

GetType ...

type RebalanceMapState

type RebalanceMapState struct {
	BaseState
	// contains filtered or unexported fields
}

RebalanceMapState ..

func NewRebalanceMapState

func NewRebalanceMapState(name string, json *gabs.Container, next string, isEnd bool) *RebalanceMapState

NewRebalanceMapState ..

func (*RebalanceMapState) Clone

func (m *RebalanceMapState) Clone() State

Clone ..

func (*RebalanceMapState) Execute

func (m *RebalanceMapState) Execute()

Execute for RebalanceMapState. TODO: we can make this loop in the base class State.

func (*RebalanceMapState) GetType

func (m *RebalanceMapState) GetType() string

GetType returns type "RebalanceMap"

type SchedulableMessage

type SchedulableMessage struct {
	State  State
	Signal *sync.Cond
	End    bool
}

type Scheduler

type Scheduler interface {
	//the functions that need overloading are Schedule and RunScheduler
	RunScheduler(schedulerChan <-chan SchedulableMessage) error
	ChooseReplica(state *TaskState) (string, string, uint32, int, *models.GPUNode, error) //TODO: make hint here
	//these are implemented in BaseScheduler and don't need to be overloaded
	Schedule(state *TaskState) error
	RunSingleFunction(name, input string,
		schedulerHints *SchedulerHint, dfid string, nested bool) (InterStateMessage, error)
	RunDataflow(name, input string) (string, error)
	RegisterDataFlow(name string, rawDataflow []byte) error
	GetDataFlow(name string) *Dataflow
	SetProxyClient(pc *http.Client)
	CopyResources(Scheduler)
	GetBaseScheduler() *BaseScheduler
}

Scheduler defines the functions that a scheduler needs to implement.

type SchedulerHint

type SchedulerHint struct {
	Reads, Uploads  int
	ScheduledNodeIP string
	FunctionID      string
}

SchedulerHint ..

type State

type State interface {
	Execute()
	GetBaseState() *BaseState
	Clone() State
	GetType() string
}

State ..

func NewTaskFromJSON

func NewTaskFromJSON(key string, child *gabs.Container) State

NewTaskFromJSON: this is nice, but at the same time it requires all the parameters that all states can use.

type SucceedState

type SucceedState struct {
	BaseState
}

SucceedState contains data for the Succeed state.

func (*SucceedState) Clone

func (p *SucceedState) Clone() State

Clone ..

func (*SucceedState) Execute

func (p *SucceedState) Execute()

Execute just returns the input. Succeed doesn't need a loop since it only executes once.

func (*SucceedState) GetType

func (p *SucceedState) GetType() string

GetType ...

type TaskState

type TaskState struct {
	Resource string //name of function

	SkipScheduling bool
	IsReady        bool
	//these three things are what the scheduler needs to fill
	ExecutorAddress *url.URL
	FunctionIP      string
	Vmid            string
	ProxyClient     *http.Client
	SchedulingHints *SchedulerHint

	TaskID string
	Nested bool
	BaseState
	// contains filtered or unexported fields
}

func NewTaskState

func NewTaskState(name, fn, inputPath, wipath, uwpath, next string, isEnd bool,
	skipScheduling bool, balance bool) *TaskState

NewTaskState ..

func (*TaskState) Clone

func (t *TaskState) Clone() State

Clone ..

func (*TaskState) Execute

func (t *TaskState) Execute()

Execute .. TODO: we can make this loop in the base class State.

func (*TaskState) GetFunctionName

func (t *TaskState) GetFunctionName() string

GetFunctionName ..

func (*TaskState) GetType

func (t *TaskState) GetType() string

GetType returns "Task"

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL