statemachine

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrFatal     = errors.New("fatal error")
	ErrTransient = errors.New("transient error")
)
View Source
var ContractStates = contractstatesContainer{
	INITIAL: ContractState{
		// contains filtered or unexported fields
	},
	REQUESTED: ContractState{
		// contains filtered or unexported fields
	},
	OFFERED: ContractState{
		// contains filtered or unexported fields
	},
	AGREED: ContractState{
		// contains filtered or unexported fields
	},
	ACCEPTED: ContractState{
		// contains filtered or unexported fields
	},
	VERIFIED: ContractState{
		// contains filtered or unexported fields
	},
	FINALIZED: ContractState{
		// contains filtered or unexported fields
	},
	TERMINATED: ContractState{
		// contains filtered or unexported fields
	},
}
View Source
var ErrNotFound = errors.New("not found")
View Source
var ErrNotImplemented = errors.New("not implemented")
View Source
var TransferRequestStates = transferrequeststatesContainer{
	TRANSFERINITIAL: TransferRequestState{
		// contains filtered or unexported fields
	},
	TRANSFERREQUESTED: TransferRequestState{
		// contains filtered or unexported fields
	},
	STARTED: TransferRequestState{
		// contains filtered or unexported fields
	},
	SUSPENDED: TransferRequestState{
		// contains filtered or unexported fields
	},
	COMPLETED: TransferRequestState{
		// contains filtered or unexported fields
	},
	TRANSFERTERMINATED: TransferRequestState{
		// contains filtered or unexported fields
	},
}

Functions

func ExhaustiveContractStates

func ExhaustiveContractStates(f func(ContractState))

func ExhaustiveTransferRequestStates

func ExhaustiveTransferRequestStates(f func(TransferRequestState))

Types

type Archiver

type Archiver interface {
	GetProviderContract(ctx context.Context, providerPID uuid.UUID) (*Contract, error)
	PutProviderContract(ctx context.Context, contract *Contract) error
	GetConsumerContract(ctx context.Context, consumerPID uuid.UUID) (*Contract, error)
	PutConsumerContract(ctx context.Context, contract *Contract) error

	GetAgreement(ctx context.Context, agreementID uuid.UUID) (*odrl.Agreement, error)
	DelAgreement(ctx context.Context, agreementID uuid.UUID) error
	PutAgreement(ctx context.Context, agreement *odrl.Agreement) error

	GetProviderTransfer(ctx context.Context, providerPID uuid.UUID) (*TransferRequest, error)
	PutProviderTransfer(ctx context.Context, contract *TransferRequest) error
	GetConsumerTransfer(ctx context.Context, consumerPID uuid.UUID) (*TransferRequest, error)
	PutConsumerTransfer(ctx context.Context, contract *TransferRequest) error
}

type Contract

type Contract struct {
	// contains filtered or unexported fields
}

Contract represents a contract negotiation.

func (*Contract) Copy

func (cn *Contract) Copy() *Contract

Copy makes a deep copy of a contract. It exists mostly as a workaround that will go away once we implement a reconciliation loop.

func (*Contract) GetAgreement

func (cn *Contract) GetAgreement() odrl.Agreement

func (*Contract) GetCallback

func (cn *Contract) GetCallback() *url.URL

func (*Contract) GetConsumerPID

func (cn *Contract) GetConsumerPID() uuid.UUID

func (*Contract) GetContract

func (cn *Contract) GetContract() *Contract

func (*Contract) GetContractNegotiation

func (cn *Contract) GetContractNegotiation() shared.ContractNegotiation

GetContractNegotiation returns a ContractNegotiation message.

func (*Contract) GetOffer

func (cn *Contract) GetOffer() odrl.Offer

func (*Contract) GetProviderPID

func (cn *Contract) GetProviderPID() uuid.UUID

func (*Contract) GetRole

func (cn *Contract) GetRole() DataspaceRole

func (*Contract) GetSelf

func (cn *Contract) GetSelf() *url.URL

func (*Contract) GetState

func (cn *Contract) GetState() ContractState

func (*Contract) SetCallback

func (cn *Contract) SetCallback(u string) error

SetCallback sets the remote callback root.

func (*Contract) SetConsumerPID

func (cn *Contract) SetConsumerPID(u uuid.UUID)

func (*Contract) SetProviderPID

func (cn *Contract) SetProviderPID(u uuid.UUID)

func (*Contract) SetState

func (cn *Contract) SetState(state ContractState) error

type ContractNegotiationAccepted

type ContractNegotiationAccepted struct {
	*Contract
	// contains filtered or unexported fields
}

func (*ContractNegotiationAccepted) GetArchiver

func (cd *ContractNegotiationAccepted) GetArchiver() Archiver

func (*ContractNegotiationAccepted) GetProvider

func (cd *ContractNegotiationAccepted) GetProvider() providerv1.ProviderServiceClient

func (*ContractNegotiationAccepted) GetReconciler

func (cd *ContractNegotiationAccepted) GetReconciler() *Reconciler

func (*ContractNegotiationAccepted) Recv

Recv gets called on the consumer when the provider sends a contract agreement message.

func (*ContractNegotiationAccepted) Send

func (cn *ContractNegotiationAccepted) Send(ctx context.Context) (func(), error)

type ContractNegotiationAgreed

type ContractNegotiationAgreed struct {
	*Contract
	// contains filtered or unexported fields
}

func (*ContractNegotiationAgreed) GetArchiver

func (cd *ContractNegotiationAgreed) GetArchiver() Archiver

func (*ContractNegotiationAgreed) GetProvider

func (cd *ContractNegotiationAgreed) GetProvider() providerv1.ProviderServiceClient

func (*ContractNegotiationAgreed) GetReconciler

func (cd *ContractNegotiationAgreed) GetReconciler() *Reconciler

func (*ContractNegotiationAgreed) Recv

func (*ContractNegotiationAgreed) Send

func (cn *ContractNegotiationAgreed) Send(ctx context.Context) (func(), error)

type ContractNegotiationFinalized

type ContractNegotiationFinalized struct {
	*Contract
	// contains filtered or unexported fields
}

func (*ContractNegotiationFinalized) GetArchiver

func (cd *ContractNegotiationFinalized) GetArchiver() Archiver

func (*ContractNegotiationFinalized) GetProvider

func (cd *ContractNegotiationFinalized) GetProvider() providerv1.ProviderServiceClient

func (*ContractNegotiationFinalized) GetReconciler

func (cd *ContractNegotiationFinalized) GetReconciler() *Reconciler

func (*ContractNegotiationFinalized) Recv

func (*ContractNegotiationFinalized) Send

func (cn *ContractNegotiationFinalized) Send(ctx context.Context) (func(), error)

type ContractNegotiationInitial

type ContractNegotiationInitial struct {
	*Contract
	// contains filtered or unexported fields
}

ContractNegotiationInitial is an initial state for a contract that hasn't actually been submitted yet.

func (*ContractNegotiationInitial) GetArchiver

func (cd *ContractNegotiationInitial) GetArchiver() Archiver

func (*ContractNegotiationInitial) GetProvider

func (cd *ContractNegotiationInitial) GetProvider() providerv1.ProviderServiceClient

func (*ContractNegotiationInitial) GetReconciler

func (cd *ContractNegotiationInitial) GetReconciler() *Reconciler

func (*ContractNegotiationInitial) Recv

Recv on the initial state gets called on both the provider and the consumer. It is only called when a consumer receives an initial request message or a provider receives an initial offer message. It will set the desired states but will not generate the missing PID.

func (*ContractNegotiationInitial) Send

func (cn *ContractNegotiationInitial) Send(ctx context.Context) (func(), error)

Send progresses to the next state for the INITIAL state. This needs either the contract's consumer or provider PID set, but not both. If the provider PID is set, it will send out a contract offer to the callback. If the consumer PID is set, it will send out a contract request to the callback.

type ContractNegotiationOffered

type ContractNegotiationOffered struct {
	*Contract
	// contains filtered or unexported fields
}

func (*ContractNegotiationOffered) GetArchiver

func (cd *ContractNegotiationOffered) GetArchiver() Archiver

func (*ContractNegotiationOffered) GetProvider

func (cd *ContractNegotiationOffered) GetProvider() providerv1.ProviderServiceClient

func (*ContractNegotiationOffered) GetReconciler

func (cd *ContractNegotiationOffered) GetReconciler() *Reconciler

func (*ContractNegotiationOffered) Recv

Recv gets called when a provider receives a request message. It will verify it and set the proper status for the next step.

func (*ContractNegotiationOffered) Send

func (cn *ContractNegotiationOffered) Send(ctx context.Context) (func(), error)

type ContractNegotiationRequested

type ContractNegotiationRequested struct {
	*Contract
	// contains filtered or unexported fields
}

ContractNegotiationRequested represents the requested state.

func (*ContractNegotiationRequested) GetArchiver

func (cd *ContractNegotiationRequested) GetArchiver() Archiver

func (*ContractNegotiationRequested) GetProvider

func (cd *ContractNegotiationRequested) GetProvider() providerv1.ProviderServiceClient

func (*ContractNegotiationRequested) GetReconciler

func (cd *ContractNegotiationRequested) GetReconciler() *Reconciler

func (*ContractNegotiationRequested) Recv

Recv gets called when a consumer receives a request message. It will verify the PIDs and forcefully set the callback. After that it will set the status of the contract to OFFERED.

func (*ContractNegotiationRequested) Send

func (cn *ContractNegotiationRequested) Send(ctx context.Context) (func(), error)

Send determines if an offer or agreement has to be sent.

type ContractNegotiationState

type ContractNegotiationState interface {
	Contracter
	Recv(ctx context.Context, message any) (context.Context, ContractNegotiationState, error)
	Send(ctx context.Context) (func(), error)
	GetArchiver() Archiver
	GetProvider() providerv1.ProviderServiceClient
	GetReconciler() *Reconciler
}

Archiver is an interface for retrieving and saving contract negotiations. ContractNegotiationState represents a contract in a certain state.

func NewContract

func NewContract(
	ctx context.Context,
	store Archiver,
	provider providerv1.ProviderServiceClient,
	reconciler *Reconciler,
	providerPID, consumerPID uuid.UUID,
	state ContractState,
	offer odrl.Offer,
	callback, self *url.URL,
	role DataspaceRole,
) (context.Context, ContractNegotiationState, error)

type ContractNegotiationTerminated

type ContractNegotiationTerminated struct {
	*Contract
	// contains filtered or unexported fields
}

func (*ContractNegotiationTerminated) GetArchiver

func (cd *ContractNegotiationTerminated) GetArchiver() Archiver

func (*ContractNegotiationTerminated) GetProvider

func (cd *ContractNegotiationTerminated) GetProvider() providerv1.ProviderServiceClient

func (*ContractNegotiationTerminated) GetReconciler

func (cd *ContractNegotiationTerminated) GetReconciler() *Reconciler

func (*ContractNegotiationTerminated) Recv

func (*ContractNegotiationTerminated) Send

func (cn *ContractNegotiationTerminated) Send(ctx context.Context) (func(), error)

type ContractNegotiationVerified

type ContractNegotiationVerified struct {
	*Contract
	// contains filtered or unexported fields
}

func (*ContractNegotiationVerified) GetArchiver

func (cd *ContractNegotiationVerified) GetArchiver() Archiver

func (*ContractNegotiationVerified) GetProvider

func (cd *ContractNegotiationVerified) GetProvider() providerv1.ProviderServiceClient

func (*ContractNegotiationVerified) GetReconciler

func (cd *ContractNegotiationVerified) GetReconciler() *Reconciler

func (*ContractNegotiationVerified) Recv

func (*ContractNegotiationVerified) Send

func (cn *ContractNegotiationVerified) Send(ctx context.Context) (func(), error)

type ContractState

type ContractState struct {
	// contains filtered or unexported fields
}

func ParseContractState

func ParseContractState(a any) (ContractState, error)

func (ContractState) IsValid

func (p ContractState) IsValid() bool

func (ContractState) MarshalJSON

func (p ContractState) MarshalJSON() ([]byte, error)

func (*ContractState) Scan

func (p *ContractState) Scan(value any) error

func (ContractState) String

func (i ContractState) String() string

func (*ContractState) UnmarshalJSON

func (p *ContractState) UnmarshalJSON(b []byte) error

func (ContractState) Value

func (p ContractState) Value() (driver.Value, error)

type Contracter

type Contracter interface {
	GetProviderPID() uuid.UUID
	GetConsumerPID() uuid.UUID
	GetState() ContractState
	GetCallback() *url.URL
	SetCallback(u string) error
	GetSelf() *url.URL
	SetState(state ContractState) error
	GetContract() *Contract
	GetOffer() odrl.Offer
	GetContractNegotiation() shared.ContractNegotiation
}

type DataspaceRole

type DataspaceRole int8
const (
	DataspaceConsumer DataspaceRole = iota
	DataspaceProvider
)

type MemoryArchiver

type MemoryArchiver struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewMemoryArchiver

func NewMemoryArchiver() *MemoryArchiver

func (*MemoryArchiver) DelAgreement

func (ma *MemoryArchiver) DelAgreement(ctx context.Context, agreementID uuid.UUID) error

func (*MemoryArchiver) GetAgreement

func (ma *MemoryArchiver) GetAgreement(ctx context.Context, agreementID uuid.UUID) (*odrl.Agreement, error)

func (*MemoryArchiver) GetConsumerContract

func (ma *MemoryArchiver) GetConsumerContract(ctx context.Context, pid uuid.UUID) (*Contract, error)

func (*MemoryArchiver) GetConsumerTransfer

func (ma *MemoryArchiver) GetConsumerTransfer(ctx context.Context, pid uuid.UUID) (*TransferRequest, error)

func (*MemoryArchiver) GetProviderContract

func (ma *MemoryArchiver) GetProviderContract(ctx context.Context, pid uuid.UUID) (*Contract, error)

func (*MemoryArchiver) GetProviderTransfer

func (ma *MemoryArchiver) GetProviderTransfer(ctx context.Context, pid uuid.UUID) (*TransferRequest, error)

func (*MemoryArchiver) PutAgreement

func (ma *MemoryArchiver) PutAgreement(ctx context.Context, agreement *odrl.Agreement) error

func (*MemoryArchiver) PutConsumerContract

func (ma *MemoryArchiver) PutConsumerContract(ctx context.Context, contract *Contract) error

func (*MemoryArchiver) PutConsumerTransfer

func (ma *MemoryArchiver) PutConsumerTransfer(ctx context.Context, transfer *TransferRequest) error

func (*MemoryArchiver) PutProviderContract

func (ma *MemoryArchiver) PutProviderContract(ctx context.Context, contract *Contract) error

func (*MemoryArchiver) PutProviderTransfer

func (ma *MemoryArchiver) PutProviderTransfer(ctx context.Context, transfer *TransferRequest) error

type Reconciler

type Reconciler struct {
	// contains filtered or unexported fields
}

Reconciler is a naive reconciler loop. This should eventually be in its own package. TODO: This needs to be much more robust; right now it doesn't really do anything with the next attempt time or the number of attempts. It also doesn't support any persistence.

func NewReconciler

func NewReconciler(ctx context.Context, r shared.Requester, a Archiver) *Reconciler

func (*Reconciler) Add

func (r *Reconciler) Add(entry ReconciliationEntry)

func (*Reconciler) Run

func (r *Reconciler) Run()

type ReconciliationEntry

type ReconciliationEntry struct {
	EntityID    uuid.UUID
	Type        ReconciliationType
	Role        DataspaceRole
	TargetState string
	NextAttempt time.Time
	Method      string
	URL         *url.URL
	Body        []byte
	Attempts    int
	Context     context.Context
}

type ReconciliationType

type ReconciliationType uint
const (
	ReconciliationUndefined ReconciliationType = iota
	ReconciliationContract
	ReconciliationTransferRequest
)

type TransferDirection

type TransferDirection uint8
const (
	DirectionUnknown TransferDirection = iota
	DirectionPull
	DirectionPush
)

type TransferRequest

type TransferRequest struct {
	// contains filtered or unexported fields
}

TransferRequest represents a transfer request and its state.

func (*TransferRequest) GetAgreementID

func (tr *TransferRequest) GetAgreementID() uuid.UUID

func (*TransferRequest) GetCallback

func (tr *TransferRequest) GetCallback() *url.URL

func (*TransferRequest) GetConsumerPID

func (tr *TransferRequest) GetConsumerPID() uuid.UUID

func (*TransferRequest) GetFormat

func (tr *TransferRequest) GetFormat() string

func (*TransferRequest) GetProviderPID

func (tr *TransferRequest) GetProviderPID() uuid.UUID

func (*TransferRequest) GetPublishInfo

func (tr *TransferRequest) GetPublishInfo() *providerv1.PublishInfo

func (*TransferRequest) GetRole

func (tr *TransferRequest) GetRole() DataspaceRole

func (*TransferRequest) GetSelf

func (tr *TransferRequest) GetSelf() *url.URL

func (*TransferRequest) GetState

func (tr *TransferRequest) GetState() TransferRequestState

func (*TransferRequest) GetTarget

func (tr *TransferRequest) GetTarget() string

func (*TransferRequest) GetTransferDirection

func (tr *TransferRequest) GetTransferDirection() TransferDirection

func (*TransferRequest) GetTransferProcess

func (tr *TransferRequest) GetTransferProcess() shared.TransferProcess

func (*TransferRequest) GetTransferRequest

func (tr *TransferRequest) GetTransferRequest() *TransferRequest

func (*TransferRequest) SetState

func (tr *TransferRequest) SetState(state TransferRequestState) error

type TransferRequestNegotiationCompleted

type TransferRequestNegotiationCompleted struct {
	*TransferRequest
	// contains filtered or unexported fields
}

func (*TransferRequestNegotiationCompleted) GetArchiver

func (cd *TransferRequestNegotiationCompleted) GetArchiver() Archiver

func (*TransferRequestNegotiationCompleted) GetProvider

func (cd *TransferRequestNegotiationCompleted) GetProvider() providerv1.ProviderServiceClient

func (*TransferRequestNegotiationCompleted) GetReconciler

func (cd *TransferRequestNegotiationCompleted) GetReconciler() *Reconciler

func (*TransferRequestNegotiationCompleted) Recv

func (*TransferRequestNegotiationCompleted) Send

func (tr *TransferRequestNegotiationCompleted) Send(ctx context.Context) (func(), error)

type TransferRequestNegotiationInitial

type TransferRequestNegotiationInitial struct {
	*TransferRequest
	// contains filtered or unexported fields
}

func (*TransferRequestNegotiationInitial) GetArchiver

func (cd *TransferRequestNegotiationInitial) GetArchiver() Archiver

func (*TransferRequestNegotiationInitial) GetProvider

func (cd *TransferRequestNegotiationInitial) GetProvider() providerv1.ProviderServiceClient

func (*TransferRequestNegotiationInitial) GetReconciler

func (cd *TransferRequestNegotiationInitial) GetReconciler() *Reconciler

func (*TransferRequestNegotiationInitial) Recv

func (*TransferRequestNegotiationInitial) Send

func (tr *TransferRequestNegotiationInitial) Send(ctx context.Context) (func(), error)

type TransferRequestNegotiationRequested

type TransferRequestNegotiationRequested struct {
	*TransferRequest
	// contains filtered or unexported fields
}

func (*TransferRequestNegotiationRequested) GetArchiver

func (cd *TransferRequestNegotiationRequested) GetArchiver() Archiver

func (*TransferRequestNegotiationRequested) GetProvider

func (cd *TransferRequestNegotiationRequested) GetProvider() providerv1.ProviderServiceClient

func (*TransferRequestNegotiationRequested) GetReconciler

func (cd *TransferRequestNegotiationRequested) GetReconciler() *Reconciler

func (*TransferRequestNegotiationRequested) Recv

func (*TransferRequestNegotiationRequested) Send

func (tr *TransferRequestNegotiationRequested) Send(ctx context.Context) (func(), error)

type TransferRequestNegotiationStarted

type TransferRequestNegotiationStarted struct {
	*TransferRequest
	// contains filtered or unexported fields
}

func (*TransferRequestNegotiationStarted) GetArchiver

func (cd *TransferRequestNegotiationStarted) GetArchiver() Archiver

func (*TransferRequestNegotiationStarted) GetProvider

func (cd *TransferRequestNegotiationStarted) GetProvider() providerv1.ProviderServiceClient

func (*TransferRequestNegotiationStarted) GetReconciler

func (cd *TransferRequestNegotiationStarted) GetReconciler() *Reconciler

func (*TransferRequestNegotiationStarted) Recv

func (*TransferRequestNegotiationStarted) Send

func (tr *TransferRequestNegotiationStarted) Send(ctx context.Context) (func(), error)

type TransferRequestNegotiationState

type TransferRequestNegotiationState interface {
	TransferRequester
	Recv(ctx context.Context, message any) (TransferRequestNegotiationState, error)
	Send(ctx context.Context) (func(), error)
	GetArchiver() Archiver
	GetProvider() providerv1.ProviderServiceClient
	GetReconciler() *Reconciler
}

func NewTransferRequest

func NewTransferRequest(
	ctx context.Context,
	store Archiver,
	provider providerv1.ProviderServiceClient,
	reconciler *Reconciler,
	consumerPID, agreementID uuid.UUID,
	format string,
	callback, self *url.URL,
	role DataspaceRole,
	state TransferRequestState,
	publishInfo *providerv1.PublishInfo,
) (TransferRequestNegotiationState, error)

type TransferRequestNegotiationSuspended

type TransferRequestNegotiationSuspended struct {
	*TransferRequest
	// contains filtered or unexported fields
}

func (*TransferRequestNegotiationSuspended) GetArchiver

func (cd *TransferRequestNegotiationSuspended) GetArchiver() Archiver

func (*TransferRequestNegotiationSuspended) GetProvider

func (cd *TransferRequestNegotiationSuspended) GetProvider() providerv1.ProviderServiceClient

func (*TransferRequestNegotiationSuspended) GetReconciler

func (cd *TransferRequestNegotiationSuspended) GetReconciler() *Reconciler

type TransferRequestNegotiationTerminated

type TransferRequestNegotiationTerminated struct {
	*TransferRequest
	// contains filtered or unexported fields
}

func (*TransferRequestNegotiationTerminated) GetArchiver

func (cd *TransferRequestNegotiationTerminated) GetArchiver() Archiver

func (*TransferRequestNegotiationTerminated) GetProvider

func (cd *TransferRequestNegotiationTerminated) GetProvider() providerv1.ProviderServiceClient

func (*TransferRequestNegotiationTerminated) GetReconciler

func (cd *TransferRequestNegotiationTerminated) GetReconciler() *Reconciler

type TransferRequestState

type TransferRequestState struct {
	// contains filtered or unexported fields
}

func ParseTransferRequestState

func ParseTransferRequestState(a any) (TransferRequestState, error)

func (TransferRequestState) IsValid

func (p TransferRequestState) IsValid() bool

func (TransferRequestState) MarshalJSON

func (p TransferRequestState) MarshalJSON() ([]byte, error)

func (*TransferRequestState) Scan

func (p *TransferRequestState) Scan(value any) error

func (TransferRequestState) String

func (i TransferRequestState) String() string

func (*TransferRequestState) UnmarshalJSON

func (p *TransferRequestState) UnmarshalJSON(b []byte) error

func (TransferRequestState) Value

func (p TransferRequestState) Value() (driver.Value, error)

type TransferRequester

type TransferRequester interface {
	GetProviderPID() uuid.UUID
	GetConsumerPID() uuid.UUID
	GetAgreementID() uuid.UUID
	GetTarget() string
	GetFormat() string
	GetCallback() *url.URL
	GetSelf() *url.URL
	GetState() TransferRequestState
	GetRole() DataspaceRole
	SetState(state TransferRequestState) error
	GetTransferRequest() *TransferRequest
	GetPublishInfo() *providerv1.PublishInfo
	GetTransferDirection() TransferDirection
	GetTransferProcess() shared.TransferProcess
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL