channels

package
v1.15.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2022 License: Apache-2.0, MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ChannelEvents = fsm.Events{

	fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.TransferRequestQueued).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.Message = ""
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.Restart).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.Message = ""
		chst.AddLog("")
		return nil
	}),
	fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.Opened).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.Message = ""
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.DataReceived).FromAny().ToNoChange().
		Action(func(chst *internal.ChannelState, rcvdBlocksTotal int64) error {
			if rcvdBlocksTotal > chst.ReceivedBlocksTotal {
				chst.ReceivedBlocksTotal = rcvdBlocksTotal
			}
			chst.AddLog("")
			return nil
		}),
	fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange().
		Action(func(chst *internal.ChannelState, delta uint64) error {
			chst.Received += delta
			chst.AddLog("received data")
			return nil
		}),

	fsm.Event(datatransfer.DataSent).
		FromMany(transferringStates...).ToNoChange().
		From(datatransfer.TransferFinished).ToNoChange().
		Action(func(chst *internal.ChannelState, sentBlocksTotal int64) error {
			if sentBlocksTotal > chst.SentBlocksTotal {
				chst.SentBlocksTotal = sentBlocksTotal
			}
			chst.AddLog("")
			return nil
		}),

	fsm.Event(datatransfer.DataSentProgress).FromMany(transferringStates...).ToNoChange().
		Action(func(chst *internal.ChannelState, delta uint64) error {
			chst.Sent += delta
			chst.AddLog("sending data")
			return nil
		}),

	fsm.Event(datatransfer.DataQueued).
		FromMany(transferringStates...).ToNoChange().
		From(datatransfer.TransferFinished).ToNoChange().
		Action(func(chst *internal.ChannelState, queuedBlocksTotal int64) error {
			if queuedBlocksTotal > chst.QueuedBlocksTotal {
				chst.QueuedBlocksTotal = queuedBlocksTotal
			}
			chst.AddLog("")
			return nil
		}),
	fsm.Event(datatransfer.DataQueuedProgress).FromMany(transferringStates...).ToNoChange().
		Action(func(chst *internal.ChannelState, delta uint64) error {
			chst.Queued += delta
			chst.AddLog("")
			return nil
		}),
	fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer disconnected: %s", chst.Message)
		return nil
	}),
	fsm.Event(datatransfer.SendDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer send error: %s", chst.Message)
		return nil
	}),
	fsm.Event(datatransfer.ReceiveDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer receive error: %s", chst.Message)
		return nil
	}),
	fsm.Event(datatransfer.RequestCancelled).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer request cancelled: %s", chst.Message)
		return nil
	}),
	fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
		chst.Message = err.Error()
		chst.AddLog("data transfer erred: %s", chst.Message)
		return nil
	}),

	fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
		Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error {
			chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}})
			chst.AddLog("got new voucher")
			return nil
		}),
	fsm.Event(datatransfer.NewVoucherResult).FromAny().ToNoChange().
		Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherResultBytes []byte) error {
			chst.VoucherResults = append(chst.VoucherResults,
				internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}})
			chst.AddLog("got new voucher result")
			return nil
		}),

	fsm.Event(datatransfer.PauseInitiator).
		FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.InitiatorPaused).
		From(datatransfer.ResponderPaused).To(datatransfer.BothPaused).
		FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.PauseResponder).
		FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.ResponderPaused).
		From(datatransfer.InitiatorPaused).To(datatransfer.BothPaused).
		FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.ResumeInitiator).
		From(datatransfer.InitiatorPaused).To(datatransfer.Ongoing).
		From(datatransfer.BothPaused).To(datatransfer.ResponderPaused).
		FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.ResumeResponder).
		From(datatransfer.ResponderPaused).To(datatransfer.Ongoing).
		From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused).
		From(datatransfer.Finalizing).To(datatransfer.Completing).
		FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.FinishTransfer).
		FromAny().To(datatransfer.TransferFinished).
		FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
		From(datatransfer.ResponderCompleted).To(datatransfer.Completing).
		From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished).
		From(datatransfer.Requested).To(datatransfer.Completing).
		Action(func(chst *internal.ChannelState) error {
			chst.AddLog("")
			return nil
		}),

	fsm.Event(datatransfer.ResponderBeginsFinalization).
		FromAny().To(datatransfer.ResponderFinalizing).
		FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
		From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.ResponderCompletes).
		FromAny().To(datatransfer.ResponderCompleted).
		FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
		From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing).
		From(datatransfer.TransferFinished).To(datatransfer.Completing).
		From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
		From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.CleanupComplete).
		From(datatransfer.Cancelling).To(datatransfer.Cancelled).
		From(datatransfer.Failing).To(datatransfer.Failed).
		From(datatransfer.Completing).To(datatransfer.Completed).Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),

	fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
		chst.AddLog("")
		return nil
	}),
}

ChannelEvents describe the events taht can

ChannelFinalityStates are the final states for a channel

View Source
var ChannelStateEntryFuncs = fsm.StateEntryFuncs{
	datatransfer.Cancelling: cleanupConnection,
	datatransfer.Failing:    cleanupConnection,
	datatransfer.Completing: cleanupConnection,
}

ChannelStateEntryFuncs are handlers called as we enter different states (currently unused for this fsm)

CleanupStates are the penultimate states for a channel

View Source
var EmptyChannelState = channelState{}

EmptyChannelState is the zero value for channel state, meaning not present

View Source
var ErrWrongType = errors.New("Cannot change type of implementation specific data after setting it")

ErrWrongType is returned when a caller attempts to change the type of implementation data after setting it

Functions

func IsChannelCleaningUp added in v0.9.0

func IsChannelCleaningUp(st datatransfer.Status) bool

IsChannelCleaningUp returns true if channel was being cleaned up and finished

func IsChannelTerminated added in v0.9.0

func IsChannelTerminated(st datatransfer.Status) bool

IsChannelTerminated returns true if the channel is in a finality state

func NewErrNotFound added in v1.2.7

func NewErrNotFound(chid datatransfer.ChannelID) error

Types

type ChannelEnvironment added in v0.4.0

type ChannelEnvironment interface {
	Protect(id peer.ID, tag string)
	Unprotect(id peer.ID, tag string) bool
	ID() peer.ID
	CleanupChannel(chid datatransfer.ChannelID)
}

ChannelEnvironment -- just a proxy for DTNetwork for now

type Channels

type Channels struct {
	// contains filtered or unexported fields
}

Channels is a thread safe list of channels

func New

func New(ds datastore.Batching,
	notifier Notifier,
	voucherDecoder DecoderByTypeFunc,
	voucherResultDecoder DecoderByTypeFunc,
	env ChannelEnvironment,
	selfPeer peer.ID) (*Channels, error)

New returns a new thread safe list of channels

func (*Channels) Accept added in v0.4.0

func (c *Channels) Accept(chid datatransfer.ChannelID) error

Accept marks a data transfer as accepted

func (*Channels) BeginFinalizing added in v0.4.0

func (c *Channels) BeginFinalizing(chid datatransfer.ChannelID) error

BeginFinalizing indicates a responder has finished processing but is awaiting confirmation from the initiator

func (*Channels) Cancel added in v0.4.0

func (c *Channels) Cancel(chid datatransfer.ChannelID) error

Cancel indicates a channel was cancelled prematurely

func (*Channels) ChannelOpened added in v1.11.4

func (c *Channels) ChannelOpened(chid datatransfer.ChannelID) error

func (*Channels) Complete added in v0.4.0

func (c *Channels) Complete(chid datatransfer.ChannelID) error

Complete indicates responder has completed sending/receiving data

func (*Channels) CompleteCleanupOnRestart added in v0.9.0

func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error

func (*Channels) CreateNew

func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error)

CreateNew creates a new channel id and channel state and saves to channels. returns error if the channel exists already.

func (*Channels) DataQueued added in v1.0.0

func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error)

func (*Channels) DataReceived added in v0.9.0

func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error)

Returns true if this is the first time the block has been received

func (*Channels) DataSent added in v0.9.0

func (c *Channels) DataSent(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error)

func (*Channels) Disconnected added in v0.9.0

func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error

Disconnected indicates that the connection went down and it was not possible to restart it

func (*Channels) Error added in v0.4.0

func (c *Channels) Error(chid datatransfer.ChannelID, err error) error

Error indicates something that went wrong on a channel

func (*Channels) FinishTransfer added in v0.4.0

func (c *Channels) FinishTransfer(chid datatransfer.ChannelID) error

FinishTransfer an initiator has finished sending/receiving data

func (*Channels) GetByID added in v0.4.0

GetByID searches for a channel in the slice of channels with id `chid`. Returns datatransfer.EmptyChannelState if there is no channel with that id

func (*Channels) HasChannel added in v0.4.0

func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error)

HasChannel returns true if the given channel id is being tracked

func (*Channels) InProgress

InProgress returns a list of in progress channels

func (*Channels) NewVoucher added in v0.4.0

func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.Voucher) error

NewVoucher records a new voucher for this channel

func (*Channels) NewVoucherResult added in v0.4.0

func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.VoucherResult) error

NewVoucherResult records a new voucher result for this channel

func (*Channels) PauseInitiator added in v0.4.0

func (c *Channels) PauseInitiator(chid datatransfer.ChannelID) error

PauseInitiator pauses the initator of this channel

func (*Channels) PauseResponder added in v0.4.0

func (c *Channels) PauseResponder(chid datatransfer.ChannelID) error

PauseResponder pauses the responder of this channel

func (*Channels) ReceiveDataError added in v1.5.0

func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) error

ReceiveDataError indicates that the transport layer had an error receiving data from the remote peer

func (*Channels) RequestCancelled added in v1.7.3

func (c *Channels) RequestCancelled(chid datatransfer.ChannelID, err error) error

RequestCancelled indicates that a transport layer request was cancelled by the request opener

func (*Channels) ResponderBeginsFinalization added in v0.4.0

func (c *Channels) ResponderBeginsFinalization(chid datatransfer.ChannelID) error

ResponderBeginsFinalization indicates a responder has finished processing but is awaiting confirmation from the initiator

func (*Channels) ResponderCompletes added in v0.4.0

func (c *Channels) ResponderCompletes(chid datatransfer.ChannelID) error

ResponderCompletes indicates an initator has finished receiving data from a responder

func (*Channels) Restart added in v0.9.0

func (c *Channels) Restart(chid datatransfer.ChannelID) error

Restart marks a data transfer as restarted

func (*Channels) ResumeInitiator added in v0.4.0

func (c *Channels) ResumeInitiator(chid datatransfer.ChannelID) error

ResumeInitiator resumes the initator of this channel

func (*Channels) ResumeResponder added in v0.4.0

func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error

ResumeResponder resumes the responder of this channel

func (*Channels) SendDataError added in v1.2.9

func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error

SendDataError indicates that the transport layer had an error trying to send data to the remote peer

func (*Channels) Start added in v0.9.0

func (c *Channels) Start(ctx context.Context) error

Start migrates the channel data store as needed

func (*Channels) TransferRequestQueued added in v1.7.0

func (c *Channels) TransferRequestQueued(chid datatransfer.ChannelID) error

type DecoderByTypeFunc added in v0.4.0

type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)

type ErrNotFound added in v0.4.0

type ErrNotFound struct {
	ChannelID datatransfer.ChannelID
}

ErrNotFound is returned when a channel cannot be found with a given channel ID

func (*ErrNotFound) Error added in v1.2.7

func (e *ErrNotFound) Error() string

type Notifier added in v0.4.0

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL