datatransfer

v1.15.1
Published: Mar 30, 2022 License: Apache-2.0, MIT Imports: 13 Imported by: 104

README

go-data-transfer

A go module to perform data transfers over ipfs/go-graphsync

Description

This module encapsulates protocols for exchanging piece data between storage clients and miners, both when consummating a storage deal and when retrieving the piece later.

Usage

Requires go 1.13

Install the module in your package or app with go get "github.com/filecoin-project/go-data-transfer/datatransfer"

Initialize a data transfer module
  1. Set up imports. You need, minimally, the following imports:

    package mypackage
    
    import (
        "github.com/ipfs/go-graphsync" // provides the graphsync.GraphExchange type used below
        gsimpl "github.com/ipfs/go-graphsync/impl"
        datatransfer "github.com/filecoin-project/go-data-transfer/impl"
        gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
        "github.com/libp2p/go-libp2p-core/host"
    )
    
    
  2. Provide or create a libp2p host.Host

  3. You will need a transport protocol. The current default transport is graphsync, which requires a go-graphsync GraphExchange.

  4. Create a data transfer by building a transport interface and then initializing a new data transfer instance

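    func NewGraphsyncDataTransfer(h host.Host, gs graphsync.GraphExchange) {
        // Build the graphsync transport first, then pass it to the data transfer implementation.
        tp := gstransport.NewTransport(h.ID(), gs)
        // "datatransfer" is the alias given to the impl package in the imports above.
        dt := datatransfer.NewDataTransfer(h, tp)
        _ = dt // use dt to register vouchers and open channels
    }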
    
  5. If needed, build out your voucher struct and its validator.

    A push or pull request must include a voucher. The voucher's type must have been registered with the node receiving the request before it's sent, otherwise the request will be rejected.

    datatransfer.Voucher and datatransfer.Validator are the interfaces used for validation of graphsync datatransfer messages. Voucher types plus a Validator for them must be registered with the peer to whom requests will be sent.

Example Toy Voucher and Validator
type myVoucher struct {
	data string
}

func (v *myVoucher) ToBytes() ([]byte, error) {
	return []byte(v.data), nil
}

func (v *myVoucher) FromBytes(data []byte) error {
	v.data = string(data)
	return nil
}

func (v *myVoucher) Type() string {
	return "FakeDTType"
}

type myValidator struct {
	ctx                 context.Context
	ValidationsReceived chan receivedValidation
}

func (vl *myValidator) ValidatePush(
	sender peer.ID,
	voucher datatransfer.Voucher,
	baseCid cid.Cid,
	selector ipld.Node) error {

	// Reject vouchers of an unexpected concrete type instead of panicking.
	v, ok := voucher.(*myVoucher)
	if !ok || v.data != "validpush" {
		return errors.New("invalid")
	}
	return nil
}

func (vl *myValidator) ValidatePull(
	receiver peer.ID,
	voucher datatransfer.Voucher,
	baseCid cid.Cid,
	selector ipld.Node) error {

	// Reject vouchers of an unexpected concrete type instead of panicking.
	v, ok := voucher.(*myVoucher)
	if !ok || v.data != "validpull" {
		return errors.New("invalid")
	}
	return nil
}

Please see go-data-transfer/blob/master/types.go for more detail.

Register a validator

Before sending push or pull requests, you must register the datatransfer.Voucher type together with a datatransfer.RequestValidator for vouchers of that type. Using the trivial examples above:

    func NewGraphsyncDatatransfer(h host.Host, gs graphsync.GraphExchange) {
        tp := gstransport.NewTransport(h.ID(), gs)
        dt := datatransfer.NewDataTransfer(h, tp)

        vouch := &myVoucher{}
        mv := &myValidator{}
        // Per the Manager interface documented below, RegisterVoucherType takes
        // a voucher value and a validator, and returns an error.
        if err := dt.RegisterVoucherType(vouch, mv); err != nil {
            // handle the registration failure
        }
    }

For more detail, please see the unit tests.
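
The registration rule can be illustrated without the module itself. The following is a minimal sketch, not the module's implementation: registry, toyVoucher, and the reduced Registerable interface are hypothetical stand-ins (the real Registerable also embeds encoding.Encodable). It shows a validator table keyed by TypeIdentifier that refuses a second registration under the same identifier.

```go
package main

import (
	"errors"
	"fmt"
)

// TypeIdentifier mirrors the documented string identifier type.
type TypeIdentifier string

// Registerable is a reduced stand-in for the documented interface.
type Registerable interface {
	Type() TypeIdentifier
}

// toyVoucher is a hypothetical voucher used only for this sketch.
type toyVoucher struct{ data string }

func (v *toyVoucher) Type() TypeIdentifier { return "FakeDTType" }

// registry is a hypothetical in-memory table of validators keyed by voucher type.
type registry struct {
	validators map[TypeIdentifier]func(Registerable) error
}

func newRegistry() *registry {
	return &registry{validators: make(map[TypeIdentifier]func(Registerable) error)}
}

// Register stores the validator and rejects a duplicate identifier.
func (r *registry) Register(v Registerable, validate func(Registerable) error) error {
	id := v.Type()
	if _, exists := r.validators[id]; exists {
		return fmt.Errorf("voucher type %q already registered", id)
	}
	r.validators[id] = validate
	return nil
}

// Validate looks up the validator for the voucher's type and runs it.
func (r *registry) Validate(v Registerable) error {
	validate, ok := r.validators[v.Type()]
	if !ok {
		return errors.New("unknown voucher type")
	}
	return validate(v)
}

func main() {
	r := newRegistry()
	check := func(v Registerable) error {
		if tv, ok := v.(*toyVoucher); !ok || tv.data != "validpush" {
			return errors.New("invalid")
		}
		return nil
	}
	fmt.Println("first registration error:", r.Register(&toyVoucher{}, check))
	fmt.Println("duplicate rejected:", r.Register(&toyVoucher{}, check) != nil)
	fmt.Println("validation error:", r.Validate(&toyVoucher{data: "validpush"}))
}
```

Keying on the identifier rather than the Go type means two distinct structs that return the same Type() value would collide, which is why the identifier needs to be unique per voucher type.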

Open a Push or Pull Request

For a push or pull request, provide a context, a recipient peer.ID, a datatransfer.Voucher, a baseCid cid.Cid, and a selector ipld.Node. These calls return a datatransfer.ChannelID and any error:

    channelID, err := dtm.OpenPullDataChannel(ctx, recipient, voucher, baseCid, selector)
    // OR
    channelID, err := dtm.OpenPushDataChannel(ctx, recipient, voucher, baseCid, selector)

Subscribe to Events

The module allows the consumer to be notified when a graphsync Request is sent or a datatransfer push or pull request response is received:

    func ToySubscriberFunc(event datatransfer.Event, channelState datatransfer.ChannelState) {
        if event.Code == datatransfer.Error {
            // log error, flail about helplessly
            return
        }
        // our.PeerID is a placeholder for this node's own peer ID
        if channelState.Recipient() == our.PeerID && channelState.Received() > 0 {
            // log some stuff, update some state somewhere, send data to a channel, etc.
        }
    }

    dtm := SetupDataTransferManager(ctx, h, gs, baseCid, snode)
    unsubFunc := dtm.SubscribeToEvents(ToySubscriberFunc)

    // . . . later, when you don't need to know about events any more:
    unsubFunc()
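
The subscribe/unsubscribe contract can be sketched in isolation. This is an illustrative stand-in, not the module's implementation: notifier and publish are hypothetical names, and Subscriber is reduced to a single Event argument (the real callback also receives a ChannelState).

```go
package main

import (
	"fmt"
	"sync"
)

// EventCode and Event are reduced copies of the documented types.
type EventCode int

const (
	Open EventCode = iota
	Accept
)

type Event struct {
	Code    EventCode
	Message string
}

// Subscriber is simplified here; the real one also takes a ChannelState.
type Subscriber func(event Event)

// Unsubscribe removes the subscriber it was returned for.
type Unsubscribe func()

// notifier is a hypothetical event fan-out used only for this sketch.
type notifier struct {
	mu   sync.Mutex
	next int
	subs map[int]Subscriber
}

func newNotifier() *notifier { return &notifier{subs: make(map[int]Subscriber)} }

// SubscribeToEvents registers s and returns a function that unregisters it.
func (n *notifier) SubscribeToEvents(s Subscriber) Unsubscribe {
	n.mu.Lock()
	defer n.mu.Unlock()
	id := n.next
	n.next++
	n.subs[id] = s
	return func() {
		n.mu.Lock()
		defer n.mu.Unlock()
		delete(n.subs, id)
	}
}

// publish copies the subscriber list under the lock, then calls each
// subscriber outside it so a callback may safely unsubscribe itself.
func (n *notifier) publish(evt Event) {
	n.mu.Lock()
	subs := make([]Subscriber, 0, len(n.subs))
	for _, s := range n.subs {
		subs = append(subs, s)
	}
	n.mu.Unlock()
	for _, s := range subs {
		s(evt)
	}
}

func main() {
	n := newNotifier()
	seen := 0
	unsub := n.SubscribeToEvents(func(Event) { seen++ })
	n.publish(Event{Code: Open})
	unsub()
	n.publish(Event{Code: Accept}) // not delivered: already unsubscribed
	fmt.Println("events seen:", seen)
}
```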

Contributing

PRs are welcome! Please first read the design docs and look over the current code. PRs against master require approval of at least two maintainers. For the rest, please see our CONTRIBUTING guide.

License

This repository is dual-licensed under Apache 2.0 and MIT terms.

Copyright 2019. Protocol Labs, Inc.

Documentation

Constants

const EmptyTypeIdentifier = TypeIdentifier("")

EmptyTypeIdentifier means there is no voucher present

const ErrChannelNotFound = errorType("channel not found")

ErrChannelNotFound means the channel this command was issued for does not exist

const ErrHandlerAlreadySet = errorType("already set event handler")

ErrHandlerAlreadySet means an event handler was already set for this instance of hooks

const ErrHandlerNotSet = errorType("event handler has not been set")

ErrHandlerNotSet means you cannot issue commands to this interface because the handler has not been set

const ErrPause = errorType("pause channel")

ErrPause is a special error that the DataReceived / DataSent hooks can use to pause the channel

const ErrRejected = errorType("response rejected")

ErrRejected indicates a request was not accepted

const ErrResume = errorType("resume channel")

ErrResume is a special error that the RequestReceived / ResponseReceived hooks can use to resume the channel

const ErrUnsupported = errorType("unsupported")

ErrUnsupported indicates an operation is not supported by the transport protocol

Variables

var Events = map[EventCode]string{
	Open:                        "Open",
	Accept:                      "Accept",
	Restart:                     "Restart",
	DataReceived:                "DataReceived",
	DataSent:                    "DataSent",
	Cancel:                      "Cancel",
	Error:                       "Error",
	CleanupComplete:             "CleanupComplete",
	NewVoucher:                  "NewVoucher",
	NewVoucherResult:            "NewVoucherResult",
	PauseInitiator:              "PauseInitiator",
	ResumeInitiator:             "ResumeInitiator",
	PauseResponder:              "PauseResponder",
	ResumeResponder:             "ResumeResponder",
	FinishTransfer:              "FinishTransfer",
	ResponderCompletes:          "ResponderCompletes",
	ResponderBeginsFinalization: "ResponderBeginsFinalization",
	BeginFinalizing:             "BeginFinalizing",
	Disconnected:                "Disconnected",
	Complete:                    "Complete",
	CompleteCleanupOnRestart:    "CompleteCleanupOnRestart",
	DataQueued:                  "DataQueued",
	DataQueuedProgress:          "DataQueuedProgress",
	DataSentProgress:            "DataSentProgress",
	DataReceivedProgress:        "DataReceivedProgress",
	RequestTimedOut:             "RequestTimedOut",
	SendDataError:               "SendDataError",
	ReceiveDataError:            "ReceiveDataError",
	TransferRequestQueued:       "TransferRequestQueued",
	RequestCancelled:            "RequestCancelled",
}

Events are human readable names for data transfer events
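
When logging event codes it helps to guard against codes that have no entry in the map. A minimal sketch, assuming a hypothetical helper eventName and a shortened local copy of the map:

```go
package main

import "fmt"

// EventCode and Events are shortened local copies of the documented declarations.
type EventCode int

const (
	Open EventCode = iota
	Accept
	Restart
)

var Events = map[EventCode]string{
	Open:    "Open",
	Accept:  "Accept",
	Restart: "Restart",
}

// eventName is a hypothetical helper: it returns the mapped name, or a
// numeric placeholder when the code has no entry.
func eventName(code EventCode) string {
	if name, ok := Events[code]; ok {
		return name
	}
	return fmt.Sprintf("Unknown(%d)", int(code))
}

func main() {
	fmt.Println(eventName(Accept))
	fmt.Println(eventName(EventCode(99)))
}
```

A plain index expression such as Events[code] would return the empty string for an unmapped code, which is easy to miss in log output.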

var (
	// ProtocolDataTransfer1_2 is the protocol identifier for the latest
	// version of data-transfer (supports do-not-send-first-blocks extension)
	ProtocolDataTransfer1_2 protocol.ID = "/fil/datatransfer/1.2.0"
)
var Statuses = map[Status]string{

	Requested:                           "Requested",
	Ongoing:                             "Ongoing",
	TransferFinished:                    "TransferFinished",
	ResponderCompleted:                  "ResponderCompleted",
	Finalizing:                          "Finalizing",
	Completing:                          "Completing",
	Completed:                           "Completed",
	Failing:                             "Failing",
	Failed:                              "Failed",
	Cancelling:                          "Cancelling",
	Cancelled:                           "Cancelled",
	InitiatorPaused:                     "InitiatorPaused",
	ResponderPaused:                     "ResponderPaused",
	BothPaused:                          "BothPaused",
	ResponderFinalizing:                 "ResponderFinalizing",
	ResponderFinalizingTransferFinished: "ResponderFinalizingTransferFinished",
	ChannelNotFoundError:                "ChannelNotFoundError",
}

Statuses are human readable names for data transfer states

Functions

This section is empty.

Types

type Channel

type Channel interface {
	// TransferID returns the transfer id for this channel
	TransferID() TransferID

	// BaseCID returns the CID that is at the root of this data transfer
	BaseCID() cid.Cid

	// Selector returns the IPLD selector for this data transfer (represented as
	// an IPLD node)
	Selector() ipld.Node

	// Voucher returns the voucher for this data transfer
	Voucher() Voucher

	// Sender returns the peer id for the node that is sending data
	Sender() peer.ID

	// Recipient returns the peer id for the node that is receiving data
	Recipient() peer.ID

	// TotalSize returns the total size for the data being transferred
	TotalSize() uint64

	// IsPull returns whether this is a pull request
	IsPull() bool

	// ChannelID returns the ChannelID for this request
	ChannelID() ChannelID

	// OtherPeer returns the counter party peer for this channel
	OtherPeer() peer.ID
}

Channel represents all the parameters for a single data transfer

type ChannelID

type ChannelID struct {
	Initiator peer.ID
	Responder peer.ID
	ID        TransferID
}

ChannelID is a unique identifier for a channel, distinct by both the other party's peer ID + the transfer ID

func (*ChannelID) MarshalCBOR added in v0.5.0

func (t *ChannelID) MarshalCBOR(w io.Writer) error

func (ChannelID) OtherParty added in v0.4.0

func (c ChannelID) OtherParty(thisPeer peer.ID) peer.ID

OtherParty returns the peer on the other side of the request, depending on whether this peer is the initiator or responder
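
The behaviour described for OtherParty can be sketched as follows. This is an assumed implementation for illustration only; PeerID is a string stand-in for libp2p's peer.ID.

```go
package main

import "fmt"

// PeerID is a stand-in for peer.ID so the sketch needs no external modules.
type PeerID string

type TransferID uint64

// ChannelID mirrors the documented struct layout.
type ChannelID struct {
	Initiator PeerID
	Responder PeerID
	ID        TransferID
}

// OtherParty returns the responder when called by the initiator,
// and the initiator otherwise.
func (c ChannelID) OtherParty(thisPeer PeerID) PeerID {
	if thisPeer == c.Initiator {
		return c.Responder
	}
	return c.Initiator
}

func main() {
	chid := ChannelID{Initiator: "alice", Responder: "bob", ID: 7}
	fmt.Println(chid.OtherParty("alice"))
	fmt.Println(chid.OtherParty("bob"))

	// All fields are comparable, so a ChannelID value can be used as a map key.
	sent := map[ChannelID]uint64{chid: 1024}
	fmt.Println(sent[ChannelID{Initiator: "alice", Responder: "bob", ID: 7}])
}
```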

func (ChannelID) String added in v0.4.0

func (c ChannelID) String() string

func (*ChannelID) UnmarshalCBOR added in v0.5.0

func (t *ChannelID) UnmarshalCBOR(r io.Reader) error

type ChannelStage added in v1.4.1

type ChannelStage struct {
	// Human-readable fields.
	// TODO: these _will_ need to be converted to canonical representations, so
	//  they are machine readable.
	Name        string
	Description string

	// Timestamps.
	// TODO: may be worth adding an exit timestamp. It _could_ be inferred from
	//  the start of the next stage, or from the timestamp of the last log line
	//  if this is a terminal stage. But that's non-deterministic and it relies on
	//  assumptions.
	CreatedTime cbg.CborTime
	UpdatedTime cbg.CborTime

	// Logs contains a detailed timeline of events that occurred inside
	// this stage.
	Logs []*Log
}

ChannelStage traces the execution of a data transfer channel stage.

EXPERIMENTAL; subject to change.

func (*ChannelStage) MarshalCBOR added in v1.4.1

func (t *ChannelStage) MarshalCBOR(w io.Writer) error

func (*ChannelStage) UnmarshalCBOR added in v1.4.1

func (t *ChannelStage) UnmarshalCBOR(r io.Reader) error

type ChannelStages added in v1.4.1

type ChannelStages struct {
	// Stages contains an entry for every stage the channel has gone through.
	// Each stage then contains logs.
	Stages []*ChannelStage
}

ChannelStages captures a timeline of the progress of a data transfer channel, grouped by stages.

EXPERIMENTAL; subject to change.

func (*ChannelStages) AddLog added in v1.4.1

func (cs *ChannelStages) AddLog(stage, msg string)

AddLog adds a log to the specified stage, creating the stage if it doesn't exist yet.

EXPERIMENTAL; subject to change.

func (*ChannelStages) GetStage added in v1.4.1

func (cs *ChannelStages) GetStage(stage string) *ChannelStage

GetStage returns the ChannelStage object for a named stage, or nil if not found.

TODO: the input should be a strongly-typed enum instead of a free-form string.

TODO: drop Get from GetStage to make this code more idiomatic. Return a second ok boolean to make it even more idiomatic.

EXPERIMENTAL; subject to change.
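
The documented semantics of AddLog and GetStage can be sketched like this. It is an assumed implementation for illustration: time.Time replaces cbg.CborTime, and the Description field is omitted.

```go
package main

import (
	"fmt"
	"time"
)

// Log, ChannelStage and ChannelStages are simplified copies of the documented types.
type Log struct {
	Log         string
	UpdatedTime time.Time
}

type ChannelStage struct {
	Name        string
	CreatedTime time.Time
	UpdatedTime time.Time
	Logs        []*Log
}

type ChannelStages struct {
	Stages []*ChannelStage
}

// GetStage returns the named stage, or nil if it has not been created.
func (cs *ChannelStages) GetStage(stage string) *ChannelStage {
	if cs == nil {
		return nil
	}
	for _, s := range cs.Stages {
		if s.Name == stage {
			return s
		}
	}
	return nil
}

// AddLog appends a log line to the named stage, creating the stage on first use.
func (cs *ChannelStages) AddLog(stage, msg string) {
	now := time.Now()
	st := cs.GetStage(stage)
	if st == nil {
		st = &ChannelStage{Name: stage, CreatedTime: now}
		cs.Stages = append(cs.Stages, st)
	}
	st.UpdatedTime = now
	st.Logs = append(st.Logs, &Log{Log: msg, UpdatedTime: now})
}

func main() {
	cs := &ChannelStages{}
	cs.AddLog("Requested", "opened channel")
	cs.AddLog("Requested", "voucher sent")
	cs.AddLog("Ongoing", "first block received")
	fmt.Println("stages:", len(cs.Stages))
	fmt.Println("logs in Requested:", len(cs.GetStage("Requested").Logs))
	fmt.Println("missing stage is nil:", cs.GetStage("Completed") == nil)
}
```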

func (*ChannelStages) MarshalCBOR added in v1.4.1

func (t *ChannelStages) MarshalCBOR(w io.Writer) error

func (*ChannelStages) UnmarshalCBOR added in v1.4.1

func (t *ChannelStages) UnmarshalCBOR(r io.Reader) error

type ChannelState

type ChannelState interface {
	Channel

	// SelfPeer returns the peer this channel belongs to
	SelfPeer() peer.ID

	// Status is the current status of this channel
	Status() Status

	// Sent returns the number of bytes sent
	Sent() uint64

	// Received returns the number of bytes received
	Received() uint64

	// Message offers additional information about the current status
	Message() string

	// Vouchers returns all vouchers sent on this channel
	Vouchers() []Voucher

	// VoucherResults are results of vouchers sent on the channel
	VoucherResults() []VoucherResult

	// LastVoucher returns the last voucher sent on the channel
	LastVoucher() Voucher

	// LastVoucherResult returns the last voucher result sent on the channel
	LastVoucherResult() VoucherResult

	// ReceivedCidsTotal returns the number of (non-unique) cids received so far
	// on the channel - note that a block can exist in more than one place in the DAG
	ReceivedCidsTotal() int64

	// QueuedCidsTotal returns the number of (non-unique) cids queued so far
	// on the channel - note that a block can exist in more than one place in the DAG
	QueuedCidsTotal() int64

	// SentCidsTotal returns the number of (non-unique) cids sent so far
	// on the channel - note that a block can exist in more than one place in the DAG
	SentCidsTotal() int64

	// Queued returns the number of bytes read from the node and queued for sending
	Queued() uint64

	// Stages returns the timeline of events this data transfer has gone through,
	// for observability purposes.
	//
	// It is unsafe for the caller to modify the return value, and changes
	// may not be persisted. It should be treated as immutable.
	Stages() *ChannelStages
}

ChannelState is channel parameters plus its current state
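
A common use of these accessors is progress reporting. The sketch below is hypothetical: progressSource is a two-method subset of ChannelState, fakeState is test data, and it assumes TotalSize may be zero when the size is not known, in which case it reports 0 rather than dividing by zero.

```go
package main

import "fmt"

// progressSource is a hypothetical subset of the ChannelState interface.
type progressSource interface {
	TotalSize() uint64
	Received() uint64
}

// fakeState is a fixed-value stand-in used to exercise the helper.
type fakeState struct{ total, received uint64 }

func (f fakeState) TotalSize() uint64 { return f.total }
func (f fakeState) Received() uint64  { return f.received }

// percentReceived reports received bytes as a percentage of the total size.
func percentReceived(p progressSource) float64 {
	total := p.TotalSize()
	if total == 0 {
		return 0 // size unknown or empty transfer
	}
	return 100 * float64(p.Received()) / float64(total)
}

func main() {
	fmt.Printf("%.1f%%\n", percentReceived(fakeState{total: 200, received: 50}))
}
```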

type Event

type Event struct {
	Code      EventCode // What type of event it is
	Message   string    // Any clarifying information about the event
	Timestamp time.Time // when the event happened
}

Event is a struct containing information about a data transfer event

type EventCode

type EventCode int

EventCode is a name for an event that occurs on a data transfer channel

const (
	// Open is an event that occurs when a channel is first opened
	Open EventCode = iota

	// Accept is an event that emits when the data transfer is first accepted
	Accept

	// Restart is an event that emits when the data transfer is restarted
	Restart

	// DataReceived is emitted when data is received on the channel from a remote peer
	DataReceived

	// DataSent is emitted when data is sent on the channel to the remote peer
	DataSent

	// Cancel indicates one side has cancelled the transfer
	Cancel

	// Error is an event that emits when an error occurs in a data transfer
	Error

	// CleanupComplete emits when a request is cleaned up
	CleanupComplete

	// NewVoucher means we have a new voucher on this channel
	NewVoucher

	// NewVoucherResult means we have a new voucher result on this channel
	NewVoucherResult

	// PauseInitiator emits when the data sender pauses transfer
	PauseInitiator

	// ResumeInitiator emits when the data sender resumes transfer
	ResumeInitiator

	// PauseResponder emits when the data receiver pauses transfer
	PauseResponder

	// ResumeResponder emits when the data receiver resumes transfer
	ResumeResponder

	// FinishTransfer emits when the initiator has completed sending/receiving data
	FinishTransfer

	// ResponderCompletes emits when the initiator receives a message that the responder is finished
	ResponderCompletes

	// ResponderBeginsFinalization emits when the initiator receives a message that the responder is finalizing
	ResponderBeginsFinalization

	// BeginFinalizing emits when the responder completes its operations but awaits a response from the
	// initiator
	BeginFinalizing

	// Disconnected emits when we are not able to connect to the other party
	Disconnected

	// Complete is emitted when a data transfer is complete
	Complete

	// CompleteCleanupOnRestart is emitted when a data transfer channel is restarted to signal
	// that channels that were cleaning up should finish cleanup
	CompleteCleanupOnRestart

	// DataQueued is emitted when data is read and queued for sending to the remote peer
	DataQueued

	// DataQueuedProgress is emitted when a block is queued for sending to the
	// remote peer. It is not emitted when the block is resent.
	// It is used to measure progress of how much of the total data has been
	// queued.
	DataQueuedProgress

	// DataSentProgress is emitted when a block is sent to the remote peer.
	// It is not emitted when the block is resent.
	// It is used to measure progress of how much of the total data has
	// been sent.
	DataSentProgress

	// DataReceivedProgress is emitted the first time a block is received from
	// the remote peer. It is used to measure progress of how much of the total
	// data has been received.
	DataReceivedProgress

	// Deprecated in favour of RequestCancelled
	RequestTimedOut

	// SendDataError indicates that the transport layer had an error trying
	// to send data to the remote peer
	SendDataError

	// ReceiveDataError indicates that the transport layer had an error
	// receiving data from the remote peer
	ReceiveDataError

	// TransferRequestQueued indicates that a new data transfer request has been queued in the transport layer
	TransferRequestQueued

	// RequestCancelled indicates that a transport layer request was cancelled by the request opener
	RequestCancelled

	// Opened is fired when a request for data is sent from this node to a peer
	Opened
)

type EventsHandler added in v0.5.1

type EventsHandler interface {
	// OnChannelOpened is called when we send a request for data to the other
	// peer on the given channel ID
	// return values are:
	// - error = ignore incoming data for this channel
	OnChannelOpened(chid ChannelID) error
	// OnResponseReceived is called when we receive a response to a request
	// - nil = continue receiving data
	// - error = cancel this request
	OnResponseReceived(chid ChannelID, msg Response) error
	// OnDataReceived is called when we receive data for the given channel ID
	// return values are:
	// - nil = proceed with sending data
	// - error = cancel this request
	// - err == ErrPause - pause this request
	OnDataReceived(chid ChannelID, link ipld.Link, size uint64, index int64, unique bool) error

	// OnDataQueued is called when data is queued for sending for the given channel ID
	// return values are:
	// message = data transfer message along with data
	// err = error
	// - nil = proceed with sending data
	// - error = cancel this request
	// - err == ErrPause - pause this request
	OnDataQueued(chid ChannelID, link ipld.Link, size uint64, index int64, unique bool) (Message, error)

	// OnDataSent is called when we send data for the given channel ID
	OnDataSent(chid ChannelID, link ipld.Link, size uint64, index int64, unique bool) error

	// OnTransferQueued is called when a new data transfer request is queued in the transport layer.
	OnTransferQueued(chid ChannelID)

	// OnRequestReceived is called when we receive a new request to send data
	// for the given channel ID
	// return values are:
	// message = data transfer message along with reply
	// err = error
	// - nil = proceed with sending data
	// - error = cancel this request
	// - err == ErrPause - pause this request (only for new requests)
	// - err == ErrResume - resume this request (only for update requests)
	OnRequestReceived(chid ChannelID, msg Request) (Response, error)
	// OnChannelCompleted is called when we finish transferring data for the given channel ID
	// Error returns are logged but otherwise have no effect
	OnChannelCompleted(chid ChannelID, err error) error

	// OnRequestCancelled is called when a request we opened (with the given channel ID) to
	// receive data is cancelled by us.
	// Error returns are logged but otherwise have no effect
	OnRequestCancelled(chid ChannelID, err error) error

	// OnRequestDisconnected is called when a network error occurs trying to send a request
	OnRequestDisconnected(chid ChannelID, err error) error

	// OnSendDataError is called when a network error occurs sending data
	// at the transport layer
	OnSendDataError(chid ChannelID, err error) error

	// OnReceiveDataError is called when a network error occurs receiving data
	// at the transport layer
	OnReceiveDataError(chid ChannelID, err error) error

	// OnContextAugment allows the transport to attach data transfer tracing information
	// to its local context, in order to create a hierarchical trace
	OnContextAugment(chid ChannelID) func(context.Context) context.Context
}

EventsHandler are semantic data transfer events that happen as a result of graphsync hooks
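
The return-value conventions in the comments above (nil, ErrPause, ErrResume, any other error) can be sketched as a small classifier. This is illustrative only: action and classify are hypothetical names, and errorType is assumed to be a string type with an Error method, which is consistent with the constant declarations but not confirmed by them.

```go
package main

import (
	"errors"
	"fmt"
)

// errorType is assumed to be a string-backed error so sentinels can be constants.
type errorType string

func (e errorType) Error() string { return string(e) }

const (
	ErrPause  = errorType("pause channel")
	ErrResume = errorType("resume channel")
)

// action is a hypothetical label for what a transport does with a hook result.
type action string

const (
	actionContinue action = "continue"
	actionPause    action = "pause"
	actionResume   action = "resume"
	actionCancel   action = "cancel"
)

// classify maps a hook's error return to the action described in the
// EventsHandler comments. errors.Is also matches wrapped sentinels.
func classify(err error) action {
	switch {
	case err == nil:
		return actionContinue
	case errors.Is(err, ErrPause):
		return actionPause
	case errors.Is(err, ErrResume):
		return actionResume
	default:
		return actionCancel
	}
}

func main() {
	wrapped := fmt.Errorf("hook: %w", ErrPause)
	fmt.Println(classify(nil), classify(wrapped), classify(ErrResume), classify(errors.New("boom")))
}
```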

type Log added in v1.4.1

type Log struct {
	// Log is a human readable message.
	//
	// TODO: this _may_ need to be converted to a canonical data model so it
	//  is machine-readable.
	Log string

	UpdatedTime cbg.CborTime
}

Log represents a point-in-time event that occurred inside a channel stage.

EXPERIMENTAL; subject to change.

func (*Log) MarshalCBOR added in v1.4.1

func (t *Log) MarshalCBOR(w io.Writer) error

func (*Log) UnmarshalCBOR added in v1.4.1

func (t *Log) UnmarshalCBOR(r io.Reader) error

type Manager

type Manager interface {

	// Start initializes data transfer processing
	Start(ctx context.Context) error

	// OnReady registers a listener for when the data transfer comes on line
	OnReady(ReadyFunc)

	// Stop terminates all data transfers and ends processing
	Stop(ctx context.Context) error

	// RegisterVoucherType registers a validator for the given voucher type
	// will error if voucher type does not implement voucher
	// or if there is a voucher type registered with an identical identifier
	RegisterVoucherType(voucherType Voucher, validator RequestValidator) error

	// RegisterRevalidator registers a revalidator for the given voucher type
	// Note: this is the voucher type used to revalidate. It can share a name
	// with the initial validator type and CAN be the same type, or a different type.
	// The revalidator can simply be the same as the original request validator,
	// or a different validator that satisfies the revalidator interface.
	RegisterRevalidator(voucherType Voucher, revalidator Revalidator) error

	// RegisterVoucherResultType allows deserialization of a voucher result,
	// so that a listener can read the metadata
	RegisterVoucherResultType(resultType VoucherResult) error

	// RegisterTransportConfigurer registers the given transport configurer to be run on requests with the given voucher
	// type
	RegisterTransportConfigurer(voucherType Voucher, configurer TransportConfigurer) error

	// open a data transfer that will send data to the recipient peer and
	// transfer parts of the piece that match the selector
	OpenPushDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error)

	// open a data transfer that will request data from the sending peer and
	// transfer parts of the piece that match the selector
	OpenPullDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error)

	// send an intermediate voucher as needed when the receiver sends a request for revalidation
	SendVoucher(ctx context.Context, chid ChannelID, voucher Voucher) error

	// close an open channel (effectively a cancel)
	CloseDataTransferChannel(ctx context.Context, chid ChannelID) error

	// pause a data transfer channel (only allowed if transport supports it)
	PauseDataTransferChannel(ctx context.Context, chid ChannelID) error

	// resume a data transfer channel (only allowed if transport supports it)
	ResumeDataTransferChannel(ctx context.Context, chid ChannelID) error

	// get status of a transfer
	TransferChannelStatus(ctx context.Context, x ChannelID) Status

	// get channel state
	ChannelState(ctx context.Context, chid ChannelID) (ChannelState, error)

	// get notified when certain types of events happen
	SubscribeToEvents(subscriber Subscriber) Unsubscribe

	// get all in progress transfers
	InProgressChannels(ctx context.Context) (map[ChannelID]ChannelState, error)

	// RestartDataTransferChannel restarts an existing data transfer channel
	RestartDataTransferChannel(ctx context.Context, chid ChannelID) error
}

Manager is the core interface presented by all implementations of the data transfer subsystem

type Message added in v0.5.1

type Message interface {
	IsRequest() bool
	IsRestart() bool
	IsNew() bool
	IsUpdate() bool
	IsPaused() bool
	IsCancel() bool
	TransferID() TransferID
	ToNet(w io.Writer) error
	ToIPLD() (datamodel.Node, error)
	MessageForProtocol(targetProtocol protocol.ID) (newMsg Message, err error)
}

Message is a message for the data transfer protocol (either request or response) that can serialize to a protobuf

type PauseableTransport added in v0.5.1

type PauseableTransport interface {
	Transport
	// PauseChannel pauses the given channel
	PauseChannel(ctx context.Context,
		chid ChannelID,
	) error
	// ResumeChannel resumes the given channel
	ResumeChannel(ctx context.Context,
		msg Message,
		chid ChannelID,
	) error
}

PauseableTransport is a transport that can also pause and resume channels
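
Because pausing is an optional capability, a caller can probe for it with a type assertion and fall back to ErrUnsupported. A minimal sketch with reduced, hypothetical interfaces (the context and Message parameters of the real methods are omitted, and ChannelID is replaced by a string):

```go
package main

import "fmt"

// errorType is assumed to be a string-backed error, as the constants suggest.
type errorType string

func (e errorType) Error() string { return string(e) }

const ErrUnsupported = errorType("unsupported")

// Transport and PauseableTransport are reduced stand-ins for the documented interfaces.
type Transport interface {
	CleanupChannel(chid string)
}

type PauseableTransport interface {
	Transport
	PauseChannel(chid string) error
}

// basicTransport implements only the base interface.
type basicTransport struct{}

func (basicTransport) CleanupChannel(string) {}

// pausingTransport adds pause support on top of basicTransport.
type pausingTransport struct {
	basicTransport
	paused map[string]bool
}

func (p *pausingTransport) PauseChannel(chid string) error {
	p.paused[chid] = true
	return nil
}

// pauseChannel pauses only when the transport supports it.
func pauseChannel(t Transport, chid string) error {
	pt, ok := t.(PauseableTransport)
	if !ok {
		return ErrUnsupported
	}
	return pt.PauseChannel(chid)
}

func main() {
	fmt.Println(pauseChannel(basicTransport{}, "ch-1"))
	fmt.Println(pauseChannel(&pausingTransport{paused: map[string]bool{}}, "ch-1"))
}
```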

type ReadyFunc added in v0.9.0

type ReadyFunc func(error)

ReadyFunc is a function that gets called once when the data transfer module is ready

type Registerable added in v0.3.0

type Registerable interface {
	encoding.Encodable
	// Type is a unique string identifier for this voucher type
	Type() TypeIdentifier
}

Registerable is a type of object in a registry. It must be encodable and must have a single method that uniquely identifies its type

type Request added in v0.5.1

type Request interface {
	Message
	IsPull() bool
	IsVoucher() bool
	VoucherType() TypeIdentifier
	Voucher(decoder encoding.Decoder) (encoding.Encodable, error)
	BaseCid() cid.Cid
	Selector() (ipld.Node, error)
	IsRestartExistingChannelRequest() bool
	RestartChannelId() (ChannelID, error)
}

Request is a request message for the data transfer protocol

type RequestValidator

type RequestValidator interface {
	// ValidatePush validates a push request received from the peer that will send data
	ValidatePush(
		isRestart bool,
		chid ChannelID,
		sender peer.ID,
		voucher Voucher,
		baseCid cid.Cid,
		selector ipld.Node) (VoucherResult, error)
	// ValidatePull validates a pull request received from the peer that will receive data
	ValidatePull(
		isRestart bool,
		chid ChannelID,
		receiver peer.ID,
		voucher Voucher,
		baseCid cid.Cid,
		selector ipld.Node) (VoucherResult, error)
}

RequestValidator is an interface implemented by the client of the data transfer module to validate requests

type Response added in v0.5.1

type Response interface {
	Message
	IsVoucherResult() bool
	IsComplete() bool
	Accepted() bool
	VoucherResultType() TypeIdentifier
	VoucherResult(decoder encoding.Decoder) (encoding.Encodable, error)
	EmptyVoucherResult() bool
}

Response is a response message for the data transfer protocol

type Revalidator added in v0.4.0

type Revalidator interface {
	// Revalidate revalidates a request with a new voucher
	Revalidate(channelID ChannelID, voucher Voucher) (VoucherResult, error)
	// OnPullDataSent is called on the responder side when more bytes are sent
	// for a given pull request. The first value indicates whether the request was
	// recognized by this revalidator and should be considered 'handled'. If true,
	// the remaining two values are interpreted. If 'false' the request is passed on
	// to the next revalidators.
	// It should return a VoucherResult + ErrPause to
	// request revalidation or nil to continue uninterrupted,
	// other errors will terminate the request.
	OnPullDataSent(chid ChannelID, additionalBytesSent uint64) (bool, VoucherResult, error)
	// OnPushDataReceived is called on the responder side when more bytes are received
	// for a given push request. The first value indicates whether the request was
	// recognized by this revalidator and should be considered 'handled'. If true,
	// the remaining two values are interpreted. If 'false' the request is passed on
	// to the next revalidators. It should return a VoucherResult + ErrPause to
	// request revalidation or nil to continue uninterrupted,
	// other errors will terminate the request
	OnPushDataReceived(chid ChannelID, additionalBytesReceived uint64) (bool, VoucherResult, error)
	// OnComplete is called to make a final request for revalidation -- often for the
	// purpose of settlement. The first value indicates whether the request was
	// recognized by this revalidator and should be considered 'handled'. If true,
	// the remaining two values are interpreted. If 'false' the request is passed on
	// to the next revalidators.
	// if VoucherResult is non nil, the request will enter a settlement phase awaiting
	// a final update
	OnComplete(chid ChannelID) (bool, VoucherResult, error)
}

Revalidator is a request validator that revalidates in-progress requests by requesting additional vouchers and resuming when it receives them
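
The "handled" convention in the comments above implies a chain: each revalidator is asked in turn until one claims the channel. The sketch below is a hypothetical illustration of that idea, not the module's dispatch code. Channel IDs and voucher results are plain strings, and intervalRevalidator is an invented example that asks for revalidation every interval bytes.

```go
package main

import "fmt"

// errorType is assumed to be a string-backed error, as the constants suggest.
type errorType string

func (e errorType) Error() string { return string(e) }

const ErrPause = errorType("pause channel")

// revalidator is a one-method, simplified subset of the documented interface.
type revalidator interface {
	OnPullDataSent(chid string, additionalBytesSent uint64) (bool, string, error)
}

// intervalRevalidator is hypothetical: it owns one channel and requests
// revalidation each time at least interval bytes have accumulated.
type intervalRevalidator struct {
	channel  string
	interval uint64
	pending  uint64
}

func (r *intervalRevalidator) OnPullDataSent(chid string, n uint64) (bool, string, error) {
	if chid != r.channel {
		return false, "", nil // not ours: let the next revalidator decide
	}
	r.pending += n
	if r.pending >= r.interval {
		r.pending = 0
		return true, "voucher required", ErrPause
	}
	return true, "", nil
}

// dispatch stops at the first revalidator that reports the request as handled.
func dispatch(rvs []revalidator, chid string, n uint64) (string, error) {
	for _, rv := range rvs {
		if handled, result, err := rv.OnPullDataSent(chid, n); handled {
			return result, err
		}
	}
	return "", nil // nobody claimed the channel: continue uninterrupted
}

func main() {
	rvs := []revalidator{
		&intervalRevalidator{channel: "a", interval: 100},
		&intervalRevalidator{channel: "b", interval: 10},
	}
	for i := 0; i < 2; i++ {
		result, err := dispatch(rvs, "b", 5)
		fmt.Printf("result=%q err=%v\n", result, err)
	}
}
```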

type Status

type Status uint64

Status is the status of transfer for a given channel

const (
	// Requested means a data transfer was requested but has not yet been approved
	Requested Status = iota

	// Ongoing means the data transfer is in progress
	Ongoing

	// TransferFinished indicates the initiator is done sending/receiving
	// data but is awaiting confirmation from the responder
	TransferFinished

	// ResponderCompleted indicates the initiator received a message from the
	// responder that it's completed
	ResponderCompleted

	// Finalizing means the responder is awaiting a final message from the initiator to
	// consider the transfer done
	Finalizing

	// Completing just means we have some final cleanup for a completed request
	Completing

	// Completed means the data transfer is completed successfully
	Completed

	// Failing just means we have some final cleanup for a failed request
	Failing

	// Failed means the data transfer failed
	Failed

	// Cancelling just means we have some final cleanup for a cancelled request
	Cancelling

	// Cancelled means the data transfer ended prematurely
	Cancelled

	// InitiatorPaused means the data sender has paused the channel (only the sender can unpause this)
	InitiatorPaused

	// ResponderPaused means the data receiver has paused the channel (only the receiver can unpause this)
	ResponderPaused

	// BothPaused means both sender and receiver have paused the channel separately (both must unpause)
	BothPaused

	// ResponderFinalizing is a unique state where the responder is awaiting a final voucher
	ResponderFinalizing

	// ResponderFinalizingTransferFinished is a unique state where the responder is awaiting a final voucher
	// and we have received all data
	ResponderFinalizingTransferFinished

	// ChannelNotFoundError means the searched for data transfer does not exist
	ChannelNotFoundError
)
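
Callers often need to group these statuses. The helpers below are hypothetical and not part of the package; in particular, treating Completed, Failed, and Cancelled as the terminal set is an assumption based on the comments above.

```go
package main

import "fmt"

// Status and its constants are a local copy in the documented order.
type Status uint64

const (
	Requested Status = iota
	Ongoing
	TransferFinished
	ResponderCompleted
	Finalizing
	Completing
	Completed
	Failing
	Failed
	Cancelling
	Cancelled
	InitiatorPaused
	ResponderPaused
	BothPaused
	ResponderFinalizing
	ResponderFinalizingTransferFinished
	ChannelNotFoundError
)

// isTerminal reports whether no further progress is expected (assumed grouping).
func isTerminal(s Status) bool {
	switch s {
	case Completed, Failed, Cancelled:
		return true
	}
	return false
}

// isPaused reports whether either side, or both, has paused the channel.
func isPaused(s Status) bool {
	switch s {
	case InitiatorPaused, ResponderPaused, BothPaused:
		return true
	}
	return false
}

func main() {
	for _, s := range []Status{Ongoing, BothPaused, Completed} {
		fmt.Printf("status=%d terminal=%t paused=%t\n", s, isTerminal(s), isPaused(s))
	}
}
```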

type Subscriber

type Subscriber func(event Event, channelState ChannelState)

Subscriber is a callback that is called when events are emitted

type TransferID

type TransferID uint64

TransferID is an identifier for a data transfer, shared between requester and responder and unique to the requester

type Transport added in v0.5.1

type Transport interface {
	// OpenChannel initiates an outgoing request for the other peer to send data
	// to us on this channel
	// Note: from a data transfer semantic standpoint, it doesn't matter if the
	// request is push or pull -- OpenChannel is called by the party that is
	// intending to receive data
	OpenChannel(
		ctx context.Context,
		dataSender peer.ID,
		channelID ChannelID,
		root ipld.Link,
		stor ipld.Node,
		channel ChannelState,
		msg Message,
	) error

	// CloseChannel closes the given channel
	CloseChannel(ctx context.Context, chid ChannelID) error
	// SetEventHandler sets the handler for events on channels
	SetEventHandler(events EventsHandler) error
	// CleanupChannel is called on the other side of a cancel - removes any associated
	// data for the channel
	CleanupChannel(chid ChannelID)
	Shutdown(ctx context.Context) error
}

Transport defines the interface for a transport layer for data transfer. Where the data transfer manager will coordinate setting up push and pull requests, validation, etc, the transport layer is responsible for moving data back and forth, and may be medium specific. For example, some transports may have the ability to pause and resume requests, while others may not. Some may support individual data events, while others may only support message events. Some transport layers may opt to use the actual data transfer network protocols directly while others may be able to encode messages in their own data protocol.

Transport is the minimum interface that must be satisfied to serve as a datatransfer transport layer. Transports must be able to open (open is always called by the receiving peer) and close channels, and set an event handler
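
Since open is always called by the receiving peer, it helps to see how sender and recipient follow from the request direction. A minimal sketch, assuming a hypothetical helper roles and a string stand-in for peer.ID: in a pull the initiator asks for data and therefore receives it, while in a push the initiator sends it.

```go
package main

import "fmt"

// PeerID is a stand-in for peer.ID so the sketch needs no external modules.
type PeerID string

// ChannelID is reduced to the two fields this sketch needs.
type ChannelID struct {
	Initiator PeerID
	Responder PeerID
}

// roles is a hypothetical helper that derives the data sender and recipient
// from the channel and the request direction.
func roles(chid ChannelID, isPull bool) (sender, recipient PeerID) {
	if isPull {
		return chid.Responder, chid.Initiator
	}
	return chid.Initiator, chid.Responder
}

func main() {
	chid := ChannelID{Initiator: "client", Responder: "miner"}

	sender, recipient := roles(chid, true)
	fmt.Printf("pull: %s sends, %s receives and opens the channel\n", sender, recipient)

	sender, recipient = roles(chid, false)
	fmt.Printf("push: %s sends, %s receives and opens the channel\n", sender, recipient)
}
```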

type TransportConfigurer added in v0.5.1

type TransportConfigurer func(chid ChannelID, voucher Voucher, transport Transport)

TransportConfigurer provides a mechanism to provide transport specific configuration for a given voucher type

type TypeIdentifier added in v0.3.0

type TypeIdentifier string

TypeIdentifier is a unique string identifier for a type of encodable object in a registry

type Unsubscribe

type Unsubscribe func()

Unsubscribe is a function that gets called to unsubscribe from data transfer events

type Voucher

type Voucher Registerable

Voucher is used to validate a data transfer request against the underlying storage or retrieval deal that precipitated it. The only requirements are that a voucher can be read from and written to bytes, and that it has a string type identifier

type VoucherResult added in v0.4.0

type VoucherResult Registerable

VoucherResult is used to provide optional additional information about a voucher being rejected or accepted
