Documentation ¶
Index ¶
- Variables
- func IsChannelCleaningUp(st datatransfer.Status) bool
- func IsChannelTerminated(st datatransfer.Status) bool
- func NewErrNotFound(chid datatransfer.ChannelID) error
- type ChannelEnvironment
- type Channels
- func (c *Channels) Accept(chid datatransfer.ChannelID) error
- func (c *Channels) BeginFinalizing(chid datatransfer.ChannelID) error
- func (c *Channels) Cancel(chid datatransfer.ChannelID) error
- func (c *Channels) ChannelOpened(chid datatransfer.ChannelID) error
- func (c *Channels) Complete(chid datatransfer.ChannelID) error
- func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error
- func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, ...) (datatransfer.ChannelID, error)
- func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error)
- func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error)
- func (c *Channels) DataSent(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error)
- func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error
- func (c *Channels) Error(chid datatransfer.ChannelID, err error) error
- func (c *Channels) FinishTransfer(chid datatransfer.ChannelID) error
- func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (datatransfer.ChannelState, error)
- func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error)
- func (c *Channels) InProgress() (map[datatransfer.ChannelID]datatransfer.ChannelState, error)
- func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.Voucher) error
- func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.VoucherResult) error
- func (c *Channels) PauseInitiator(chid datatransfer.ChannelID) error
- func (c *Channels) PauseResponder(chid datatransfer.ChannelID) error
- func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) error
- func (c *Channels) RequestCancelled(chid datatransfer.ChannelID, err error) error
- func (c *Channels) ResponderBeginsFinalization(chid datatransfer.ChannelID) error
- func (c *Channels) ResponderCompletes(chid datatransfer.ChannelID) error
- func (c *Channels) Restart(chid datatransfer.ChannelID) error
- func (c *Channels) ResumeInitiator(chid datatransfer.ChannelID) error
- func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error
- func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error
- func (c *Channels) Start(ctx context.Context) error
- func (c *Channels) TransferRequestQueued(chid datatransfer.ChannelID) error
- type DecoderByTypeFunc
- type ErrNotFound
- type Notifier
Constants ¶
This section is empty.
Variables ¶
var ChannelEvents = fsm.Events{ fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested).Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing).Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.TransferRequestQueued).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { chst.Message = "" chst.AddLog("") return nil }), fsm.Event(datatransfer.Restart).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { chst.Message = "" chst.AddLog("") return nil }), fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling).Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.Opened).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { chst.Message = "" chst.AddLog("") return nil }), fsm.Event(datatransfer.DataReceived).FromAny().ToNoChange(). Action(func(chst *internal.ChannelState, rcvdBlocksTotal int64) error { if rcvdBlocksTotal > chst.ReceivedBlocksTotal { chst.ReceivedBlocksTotal = rcvdBlocksTotal } chst.AddLog("") return nil }), fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange(). Action(func(chst *internal.ChannelState, delta uint64) error { chst.Received += delta chst.AddLog("received data") return nil }), fsm.Event(datatransfer.DataSent). FromMany(transferringStates...).ToNoChange(). From(datatransfer.TransferFinished).ToNoChange(). Action(func(chst *internal.ChannelState, sentBlocksTotal int64) error { if sentBlocksTotal > chst.SentBlocksTotal { chst.SentBlocksTotal = sentBlocksTotal } chst.AddLog("") return nil }), fsm.Event(datatransfer.DataSentProgress).FromMany(transferringStates...).ToNoChange(). Action(func(chst *internal.ChannelState, delta uint64) error { chst.Sent += delta chst.AddLog("sending data") return nil }), fsm.Event(datatransfer.DataQueued). FromMany(transferringStates...).ToNoChange(). From(datatransfer.TransferFinished).ToNoChange(). Action(func(chst *internal.ChannelState, queuedBlocksTotal int64) error { if queuedBlocksTotal > chst.QueuedBlocksTotal { chst.QueuedBlocksTotal = queuedBlocksTotal } chst.AddLog("") return nil }), fsm.Event(datatransfer.DataQueuedProgress).FromMany(transferringStates...).ToNoChange(). Action(func(chst *internal.ChannelState, delta uint64) error { chst.Queued += delta chst.AddLog("") return nil }), fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() chst.AddLog("data transfer disconnected: %s", chst.Message) return nil }), fsm.Event(datatransfer.SendDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() chst.AddLog("data transfer send error: %s", chst.Message) return nil }), fsm.Event(datatransfer.ReceiveDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() chst.AddLog("data transfer receive error: %s", chst.Message) return nil }), fsm.Event(datatransfer.RequestCancelled).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() chst.AddLog("data transfer request cancelled: %s", chst.Message) return nil }), fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() chst.AddLog("data transfer erred: %s", chst.Message) return nil }), fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange(). Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error { chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}}) chst.AddLog("got new voucher") return nil }), fsm.Event(datatransfer.NewVoucherResult).FromAny().ToNoChange(). Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherResultBytes []byte) error { chst.VoucherResults = append(chst.VoucherResults, internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}}) chst.AddLog("got new voucher result") return nil }), fsm.Event(datatransfer.PauseInitiator). FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.InitiatorPaused). From(datatransfer.ResponderPaused).To(datatransfer.BothPaused). FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.PauseResponder). FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.ResponderPaused). From(datatransfer.InitiatorPaused).To(datatransfer.BothPaused). FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.ResumeInitiator). From(datatransfer.InitiatorPaused).To(datatransfer.Ongoing). From(datatransfer.BothPaused).To(datatransfer.ResponderPaused). FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.ResumeResponder). From(datatransfer.ResponderPaused).To(datatransfer.Ongoing). From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused). From(datatransfer.Finalizing).To(datatransfer.Completing). FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.FinishTransfer). FromAny().To(datatransfer.TransferFinished). FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord(). From(datatransfer.ResponderCompleted).To(datatransfer.Completing). From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished). From(datatransfer.Requested).To(datatransfer.Completing). Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.ResponderBeginsFinalization). FromAny().To(datatransfer.ResponderFinalizing). FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord(). From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished).Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.ResponderCompletes). FromAny().To(datatransfer.ResponderCompleted). FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord(). From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing). From(datatransfer.TransferFinished).To(datatransfer.Completing). From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted). From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing).Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.CleanupComplete). From(datatransfer.Cancelling).To(datatransfer.Cancelled). From(datatransfer.Failing).To(datatransfer.Failed). From(datatransfer.Completing).To(datatransfer.Completed).Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error { chst.AddLog("") return nil }), }
ChannelEvents describe the events taht can
var ChannelFinalityStates = []fsm.StateKey{ datatransfer.Cancelled, datatransfer.Completed, datatransfer.Failed, }
ChannelFinalityStates are the final states for a channel
var ChannelStateEntryFuncs = fsm.StateEntryFuncs{ datatransfer.Cancelling: cleanupConnection, datatransfer.Failing: cleanupConnection, datatransfer.Completing: cleanupConnection, }
ChannelStateEntryFuncs are handlers called as we enter different states (currently unused for this fsm)
var CleanupStates = []fsm.StateKey{ datatransfer.Cancelling, datatransfer.Completing, datatransfer.Failing, }
CleanupStates are the penultimate states for a channel
var EmptyChannelState = channelState{}
EmptyChannelState is the zero value for channel state, meaning not present
var ErrWrongType = errors.New("Cannot change type of implementation specific data after setting it")
ErrWrongType is returned when a caller attempts to change the type of implementation data after setting it
Functions ¶
func IsChannelCleaningUp ¶ added in v0.9.0
func IsChannelCleaningUp(st datatransfer.Status) bool
IsChannelCleaningUp returns true if channel was being cleaned up and finished
func IsChannelTerminated ¶ added in v0.9.0
func IsChannelTerminated(st datatransfer.Status) bool
IsChannelTerminated returns true if the channel is in a finality state
func NewErrNotFound ¶ added in v1.2.7
func NewErrNotFound(chid datatransfer.ChannelID) error
Types ¶
type ChannelEnvironment ¶ added in v0.4.0
type ChannelEnvironment interface { Protect(id peer.ID, tag string) Unprotect(id peer.ID, tag string) bool ID() peer.ID CleanupChannel(chid datatransfer.ChannelID) }
ChannelEnvironment -- just a proxy for DTNetwork for now
type Channels ¶
type Channels struct {
// contains filtered or unexported fields
}
Channels is a thread safe list of channels
func New ¶
func New(ds datastore.Batching, notifier Notifier, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, env ChannelEnvironment, selfPeer peer.ID) (*Channels, error)
New returns a new thread safe list of channels
func (*Channels) Accept ¶ added in v0.4.0
func (c *Channels) Accept(chid datatransfer.ChannelID) error
Accept marks a data transfer as accepted
func (*Channels) BeginFinalizing ¶ added in v0.4.0
func (c *Channels) BeginFinalizing(chid datatransfer.ChannelID) error
BeginFinalizing indicates a responder has finished processing but is awaiting confirmation from the initiator
func (*Channels) Cancel ¶ added in v0.4.0
func (c *Channels) Cancel(chid datatransfer.ChannelID) error
Cancel indicates a channel was cancelled prematurely
func (*Channels) ChannelOpened ¶ added in v1.11.4
func (c *Channels) ChannelOpened(chid datatransfer.ChannelID) error
func (*Channels) Complete ¶ added in v0.4.0
func (c *Channels) Complete(chid datatransfer.ChannelID) error
Complete indicates responder has completed sending/receiving data
func (*Channels) CompleteCleanupOnRestart ¶ added in v0.9.0
func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error
func (*Channels) CreateNew ¶
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error)
CreateNew creates a new channel id and channel state and saves to channels. returns error if the channel exists already.
func (*Channels) DataQueued ¶ added in v1.0.0
func (*Channels) DataReceived ¶ added in v0.9.0
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error)
Returns true if this is the first time the block has been received
func (*Channels) Disconnected ¶ added in v0.9.0
func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error
Disconnected indicates that the connection went down and it was not possible to restart it
func (*Channels) Error ¶ added in v0.4.0
func (c *Channels) Error(chid datatransfer.ChannelID, err error) error
Error indicates something that went wrong on a channel
func (*Channels) FinishTransfer ¶ added in v0.4.0
func (c *Channels) FinishTransfer(chid datatransfer.ChannelID) error
FinishTransfer an initiator has finished sending/receiving data
func (*Channels) GetByID ¶ added in v0.4.0
func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (datatransfer.ChannelState, error)
GetByID searches for a channel in the slice of channels with id `chid`. Returns datatransfer.EmptyChannelState if there is no channel with that id
func (*Channels) HasChannel ¶ added in v0.4.0
func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error)
HasChannel returns true if the given channel id is being tracked
func (*Channels) InProgress ¶
func (c *Channels) InProgress() (map[datatransfer.ChannelID]datatransfer.ChannelState, error)
InProgress returns a list of in progress channels
func (*Channels) NewVoucher ¶ added in v0.4.0
func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.Voucher) error
NewVoucher records a new voucher for this channel
func (*Channels) NewVoucherResult ¶ added in v0.4.0
func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.VoucherResult) error
NewVoucherResult records a new voucher result for this channel
func (*Channels) PauseInitiator ¶ added in v0.4.0
func (c *Channels) PauseInitiator(chid datatransfer.ChannelID) error
PauseInitiator pauses the initator of this channel
func (*Channels) PauseResponder ¶ added in v0.4.0
func (c *Channels) PauseResponder(chid datatransfer.ChannelID) error
PauseResponder pauses the responder of this channel
func (*Channels) ReceiveDataError ¶ added in v1.5.0
func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) error
ReceiveDataError indicates that the transport layer had an error receiving data from the remote peer
func (*Channels) RequestCancelled ¶ added in v1.7.3
func (c *Channels) RequestCancelled(chid datatransfer.ChannelID, err error) error
RequestCancelled indicates that a transport layer request was cancelled by the request opener
func (*Channels) ResponderBeginsFinalization ¶ added in v0.4.0
func (c *Channels) ResponderBeginsFinalization(chid datatransfer.ChannelID) error
ResponderBeginsFinalization indicates a responder has finished processing but is awaiting confirmation from the initiator
func (*Channels) ResponderCompletes ¶ added in v0.4.0
func (c *Channels) ResponderCompletes(chid datatransfer.ChannelID) error
ResponderCompletes indicates an initator has finished receiving data from a responder
func (*Channels) Restart ¶ added in v0.9.0
func (c *Channels) Restart(chid datatransfer.ChannelID) error
Restart marks a data transfer as restarted
func (*Channels) ResumeInitiator ¶ added in v0.4.0
func (c *Channels) ResumeInitiator(chid datatransfer.ChannelID) error
ResumeInitiator resumes the initator of this channel
func (*Channels) ResumeResponder ¶ added in v0.4.0
func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error
ResumeResponder resumes the responder of this channel
func (*Channels) SendDataError ¶ added in v1.2.9
func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error
SendDataError indicates that the transport layer had an error trying to send data to the remote peer
func (*Channels) TransferRequestQueued ¶ added in v1.7.0
func (c *Channels) TransferRequestQueued(chid datatransfer.ChannelID) error
type DecoderByTypeFunc ¶ added in v0.4.0
type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)
type ErrNotFound ¶ added in v0.4.0
type ErrNotFound struct {
ChannelID datatransfer.ChannelID
}
ErrNotFound is returned when a channel cannot be found with a given channel ID
func (*ErrNotFound) Error ¶ added in v1.2.7
func (e *ErrNotFound) Error() string
type Notifier ¶ added in v0.4.0
type Notifier func(datatransfer.Event, datatransfer.ChannelState)