transfer

package
v0.0.0-...-09a75af Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2019 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultBlockLength int = 2048

default block length

View Source
const PACKET_CHANNEL_SIZE = 100
View Source
const PACKET_CONTENT_LEN = 500

Variables

View Source
var DELTA_BUF_SIZE = 10
View Source
var FILE_INFO_BUF_SIZE = 10

TODO: make these args?

View Source
var SIGNATURE_BUF_SIZE = 10

Functions

func Daemon

func Daemon(addr string)

func Debug

func Debug(msg string)

func DecodePackets

func DecodePackets(manager Manager)

func Info

func Info(msg string)

func ProcessDeltas

func ProcessDeltas(opts *Options, manager Manager)

func ProcessPatches

func ProcessPatches(opts *Options, manager Manager)

func ProcessSignatures

func ProcessSignatures(opts *Options, manager Manager)

func Signature

func Signature(data []byte) (hash.Hash, error)

func TCPDestinationLoop

func TCPDestinationLoop(conn net.Conn, opts *Options, manager *DestinationManager)

func TCPSourceLoop

func TCPSourceLoop(conn net.Conn, opts *Options, manager *SourceManager)

The order of operations in finishing a transfer is very specific to which side is the source and which is the destination. The destination first calls PatchDone on its manager, which then sends PatchDone as part of the DestinationTransferStatus. The source then does the normal ReceiveTransferStatus and then calls its own PatchDone, which sets manager.done to true. At this point it's the responsibility of the TCPSourceLoop to send RequestDone and read another RequestDone before terminating.

func UDPReceiver

func UDPReceiver(host string, port int, opts *Options, manager Manager)

func UDPSender

func UDPSender(host string, port int, opts *Options, manager Manager)

func Walk

func Walk(opts *Options, manager Manager)

Types

type Checksum

type Checksum struct {
	TransferFile FileInfo
	SumLen       int
	Sum          hash.Hash
	Len          int
	Offset       int64
	EOF          bool
	Done         bool
}

type CommArray

type CommArray struct {
	FileInfoIn   chan FileInfo
	FileInfoOut  chan FileInfo
	SignatureIn  chan Checksum
	SignatureOut chan Checksum
	DeltaIn      chan Delta
	DeltaOut     chan Delta
	ErrorChan    chan error
	DoneChan     chan bool
	Stats        *TransferStats
}

func MakeCommArray

func MakeCommArray() *CommArray

func (CommArray) FinishDelta

func (comm CommArray) FinishDelta()

func (CommArray) FinishFileIn

func (comm CommArray) FinishFileIn()

func (CommArray) FinishSignature

func (comm CommArray) FinishSignature()

func (CommArray) LocalCommunication

func (comms CommArray) LocalCommunication()

type Delta

type Delta struct {
	Path    string
	Len     int
	Content []byte
	Offset  int64
	EOF     bool
	NoOp    bool
	Done    bool
}

Delta can be applied to the basis file to produce the desired result file

type DestinationManager

type DestinationManager struct {
	// contains filtered or unexported fields
}

func NewDestinationManager

func NewDestinationManager() *DestinationManager

func (*DestinationManager) DeltaChannel

func (manager *DestinationManager) DeltaChannel() chan Delta

func (*DestinationManager) DeltaDone

func (manager *DestinationManager) DeltaDone()

func (*DestinationManager) Done

func (manager *DestinationManager) Done() bool

func (*DestinationManager) Error

func (manager *DestinationManager) Error() error

func (*DestinationManager) FileInfoChannel

func (manager *DestinationManager) FileInfoChannel() chan FileInfo

func (*DestinationManager) FileInfoDone

func (manager *DestinationManager) FileInfoDone()

func (*DestinationManager) NetDone

func (manager *DestinationManager) NetDone() bool

func (*DestinationManager) Packeter

func (manager *DestinationManager) Packeter() *Packeter

func (*DestinationManager) PatchDone

func (manager *DestinationManager) PatchDone()

func (*DestinationManager) QueueDelta

func (manager *DestinationManager) QueueDelta(delta Delta)

func (*DestinationManager) QueueFileInfo

func (manager *DestinationManager) QueueFileInfo(fi FileInfo)

func (*DestinationManager) QueueSignature

func (manager *DestinationManager) QueueSignature(sig Checksum)

func (*DestinationManager) ReceiveStatusUpdate

func (manager *DestinationManager) ReceiveStatusUpdate(status SourceTransferStatus) DestinationTransferStatus

ReceiveStatusUpdate is called by the TCPer. It's the DestinationManager's responsibility to call the packeter's "ReceivePacketerStatusUpdate" function as well, because the packeter may need to resend some packets, or delete some sent packets.

func (*DestinationManager) ReportError

func (manager *DestinationManager) ReportError(err error)

func (*DestinationManager) SignatureChannel

func (manager *DestinationManager) SignatureChannel() chan Checksum

func (*DestinationManager) SignatureDone

func (manager *DestinationManager) SignatureDone()

func (*DestinationManager) Stats

func (manager *DestinationManager) Stats() *TransferStats

func (*DestinationManager) TCPDone

func (manager *DestinationManager) TCPDone()

type DestinationTransferStatus

type DestinationTransferStatus struct {
	LastSignaturePacket uint64
	PatchDone           bool

	DestinationPacketerStatus PacketerStatus

	Failed string
}

type Direction

type Direction uint8

Direction - a Request is either for a pull or a push

const Incoming Direction = 1

Incoming means the requester wants to read data

const Local Direction = 0

Local means requester will read and write data

const Outgoing Direction = 2

Outgoing means the requester wants to write data

type FileInfo

type FileInfo struct {
	Mode os.FileMode
	Size int64

	ModTime         time.Time
	Target          string
	SourcePath      string
	DestinationPath string
}

type LocalManager

type LocalManager struct {
	// contains filtered or unexported fields
}

func MakeLocalManager

func MakeLocalManager() *LocalManager

func (*LocalManager) DeltaChannel

func (manager *LocalManager) DeltaChannel() chan Delta

func (*LocalManager) DeltaDone

func (manager *LocalManager) DeltaDone()

func (*LocalManager) Done

func (manager *LocalManager) Done() bool

func (*LocalManager) Error

func (manager *LocalManager) Error() error

func (*LocalManager) FileInfoChannel

func (manager *LocalManager) FileInfoChannel() chan FileInfo

func (*LocalManager) FileInfoDone

func (manager *LocalManager) FileInfoDone()

func (*LocalManager) NetDone

func (manager *LocalManager) NetDone() bool

func (*LocalManager) Packeter

func (manager *LocalManager) Packeter() *Packeter

func (*LocalManager) PatchDone

func (manager *LocalManager) PatchDone()

func (*LocalManager) QueueDelta

func (manager *LocalManager) QueueDelta(delta Delta)

func (*LocalManager) QueueFileInfo

func (manager *LocalManager) QueueFileInfo(fi FileInfo)

func (*LocalManager) QueueSignature

func (manager *LocalManager) QueueSignature(sig Checksum)

func (*LocalManager) ReportError

func (manager *LocalManager) ReportError(err error)

func (*LocalManager) SignatureChannel

func (manager *LocalManager) SignatureChannel() chan Checksum

func (*LocalManager) SignatureDone

func (manager *LocalManager) SignatureDone()

func (*LocalManager) Stats

func (manager *LocalManager) Stats() *TransferStats

func (*LocalManager) TCPDone

func (manager *LocalManager) TCPDone()

type Manager

type Manager interface {
	// QueueFileInfo will queue a FileInfo that will be
	// sent to the signature processor
	QueueFileInfo(fi FileInfo)
	// FileInfoDone should be called when there are no more
	// FileInfos to be generated
	FileInfoDone()
	// FileInfoChannel returns a channel that should be used
	// by the signature processor to read FileInfos.  Will be
	// closed when all FileInfos have been put in the channel.
	FileInfoChannel() chan FileInfo
	// QueueSignature will queue a Checksum that will be
	// sent to the delta processor
	QueueSignature(checksum Checksum)
	// SignatureDone should be called when there are no more
	// Checksums to be generated
	SignatureDone()
	// SignatureChannel returns a channel that should be used
	// by the delta processor to read Checksums.  Will be
	// closed when all Checksums have been put in the channel.
	SignatureChannel() chan Checksum
	// QueueDelta will queue a Delta that will be
	// sent to the patch processor
	QueueDelta(delta Delta)
	// DeltaDone should be called when there are no more
	// Deltas to be generated
	DeltaDone()
	// DeltaChannel returns a channel that should be used
	// by the patch processor to read Delta.  Will be
	// closed when all Deltas have been put in the channel.
	DeltaChannel() chan Delta
	// PatchDone should be called when all deltas have been
	// processed by the patch processor and the transfer is
	// complete.
	PatchDone()

	Packeter() *Packeter
	// TCPDone is called by the TCP loops when they are done.
	// It tells the manager that those loops are no longer
	// running.
	TCPDone()
	// ReportError should be called when an error has been
	// reported, it will make sure all channels are closed
	// and it will also make sure that InError() will return
	// True so goroutines will stop doing anything.
	ReportError(err error)
	// Error returns whatever non-nil error that was passed by anyone
	// to ReportError
	Error() error
	// Done returns true once PatchDone has been called
	Done() bool
	// NetDone returns true when all net communication is done
	NetDone() bool
	// Stats returns the stats recorded by the manager
	Stats() *TransferStats
}

type NetStats

type NetStats struct {
	TCPLoopIterations        int64
	ResentSourcePackets      int64
	ResentDestinationPackets int64
}

type Options

type Options struct {
	Path        string
	Destination string

	FollowLinks bool
	BlockSize   int

	SourceHost    string
	SourceUDPPort int

	DestinationHost    string
	DestinationUDPPort int
}

Once a transfer is requested and responded to, the relevant information is copied into Options. The Options struct contains the request options like Path, Destination, FollowLinks, and BlockSize, as well as host/port options. There is no direction on the Options object since it's the same object at the source and destination.

func (Options) Verify

func (opts Options) Verify() error

Verify will return an error if there's anything wrong with the request. Currently only checks that Path and Destination are absolute.

type Packet

type Packet struct {
	PacketID    uint64
	IsEndPacket bool
	ContentType PacketContentType
	Content     []byte
}

func MakePackets

func MakePackets(buffer *bytes.Buffer, packetType PacketContentType) []Packet

type PacketContentType

type PacketContentType uint8
const DeltaPacket PacketContentType = 2
const FileInfoPacket PacketContentType = 0
const SignaturePacket PacketContentType = 1

type Packeter

type Packeter struct {
	PacketChannel chan Packet

	LastDeletedPacket  uint64
	LastPacketSent     uint64
	LastPacketReceived uint64
	LastPacketDecoded  uint64
	// contains filtered or unexported fields
}

Packeter manages incoming and outgoing packets. It keeps a copy of all packets sent until it's confirmed that they have been received. It also gathers incoming packets until all content groups are received so they can be decoded.

func NewPacketer

func NewPacketer() *Packeter

func (*Packeter) Close

func (packeter *Packeter) Close()

func (*Packeter) Done

func (packeter *Packeter) Done() bool

func (*Packeter) ReceievePacket

func (packeter *Packeter) ReceievePacket(packet Packet)

ReceivePacket inserts the packet into the receiveCache, which the Decoder goroutine is constantly iterating over and decoding. This function also optionally updates the LastPacketReceived.

func (*Packeter) ReceivePacketerStatusUpdate

func (packeter *Packeter) ReceivePacketerStatusUpdate(status PacketerStatus) PacketerStatus

ReceivePacketerStatusUpdate is called by a manager; it informs this packeter of the status of its counterpart packeter. With this new information this packeter must:

  • delete unneeded entries from the sendCache
  • resend any packets that the other packeter thinks needs resending
  • determine what packets the other packeter needs to resend
  • respond with this packeter's status, including resend list

TODO: should we include some timing information with the status update so that we can better determine whether or not it's appropriate to request resent packets?

func (*Packeter) ReceiverDone

func (packeter *Packeter) ReceiverDone()

func (*Packeter) SendPackets

func (packeter *Packeter) SendPackets(packets []Packet) (uint64, error)

SendPackets inserts the supplied packets into the sendCache, adds them to the PacketChannel, increments LastPacketSent and returns the number of the last packet sent

func (*Packeter) SenderDone

func (packeter *Packeter) SenderDone()

type PacketerStatus

type PacketerStatus struct {
	LastPacketReceived uint64
	ResendPackets      []uint64
	LastPacketSent     uint64
}

PacketerStatus is part of the status that is sent back and forth by the TCPer. It's source/destination agnostic so the Packeter can be used identically on both sides.

type Request

type Request struct {
	RequestID uuid.UUID

	RequesterHost    string
	RequesterUDPPort int

	Host string
	Port int

	Direction Direction

	Path        string
	Destination string

	FollowLinks bool
	BlockSize   int
}

Request - information to initiate a transfer request

type RequestDone

type RequestDone struct {
	Done bool
}

RequestDone

type RequestResponse

type RequestResponse struct {
	Accepted  bool
	Reason    string
	RequestID uuid.UUID
	UDPPort   int
}

RequestResponse - response to a TransferRequest

type SourceManager

type SourceManager struct {
	// contains filtered or unexported fields
}

func NewSourceManager

func NewSourceManager() *SourceManager

func (*SourceManager) DeltaChannel

func (manager *SourceManager) DeltaChannel() chan Delta

func (*SourceManager) DeltaDone

func (manager *SourceManager) DeltaDone()

func (*SourceManager) Done

func (manager *SourceManager) Done() bool

func (*SourceManager) Error

func (manager *SourceManager) Error() error

func (*SourceManager) FileInfoChannel

func (manager *SourceManager) FileInfoChannel() chan FileInfo

func (*SourceManager) FileInfoDone

func (manager *SourceManager) FileInfoDone()

func (*SourceManager) NetDone

func (manager *SourceManager) NetDone() bool

func (*SourceManager) Packeter

func (manager *SourceManager) Packeter() *Packeter

func (*SourceManager) PatchDone

func (manager *SourceManager) PatchDone()

func (*SourceManager) QueueDelta

func (manager *SourceManager) QueueDelta(delta Delta)

func (*SourceManager) QueueFileInfo

func (manager *SourceManager) QueueFileInfo(fi FileInfo)

func (*SourceManager) QueueSignature

func (manager *SourceManager) QueueSignature(sig Checksum)

func (*SourceManager) ReceiveStatusUpdate

func (manager *SourceManager) ReceiveStatusUpdate(status DestinationTransferStatus) SourceTransferStatus

ReceiveStatusUpdate is called by the TCPer. It's the SourceManager's responsibility to call the packeter's "ReceivePacketerStatusUpdate" function as well, because the packeter may need to resend some packets, or delete some sent packets.

func (*SourceManager) ReportError

func (manager *SourceManager) ReportError(err error)

func (*SourceManager) SignatureChannel

func (manager *SourceManager) SignatureChannel() chan Checksum

func (*SourceManager) SignatureDone

func (manager *SourceManager) SignatureDone()

func (*SourceManager) Stats

func (manager *SourceManager) Stats() *TransferStats

func (*SourceManager) TCPDone

func (manager *SourceManager) TCPDone()

type SourceTransferStatus

type SourceTransferStatus struct {
	// Last{FileInfo,Delta}Packet is the packet number of the last
	// {FileInfo,Delta} Packet that will be sent. The
	// DestinationManager will use these indexes to know when
	// they've finished receiving packets and can close various
	// channels.
	LastFileInfoPacket uint64
	LastDeltaPacket    uint64

	SourcePacketerStatus PacketerStatus

	Failed string
}

TransferStatus is a struct that represents the status of the network communication for a transfer. It's the only kind of TCP message that is sent between sides.

type TransferStats

type TransferStats struct {
	Files         int64
	Symlinks      int64
	Directories   int64
	SourceSize    int64
	BytesSent     int64
	BytesSame     int64
	BytesCopyDest int64
	SigCacheHits  int64
	NetStats      *NetStats
}

func NewTransferStats

func NewTransferStats() *TransferStats

func SyncIncoming

func SyncIncoming(conn net.Conn, opts *Options) (*TransferStats, error)

func SyncLocal

func SyncLocal(opts *Options) (*TransferStats, error)

SyncLocal does all filesystem operations locally

func SyncOutgoing

func SyncOutgoing(conn net.Conn, opts *Options) (*TransferStats, error)

func (*TransferStats) RecordDelta

func (s *TransferStats) RecordDelta(delta Delta)

func (*TransferStats) RecordFileInfo

func (s *TransferStats) RecordFileInfo(fi FileInfo)

func (*TransferStats) RecordResentDestinationPackets

func (s *TransferStats) RecordResentDestinationPackets(n int)

func (*TransferStats) RecordResentSourcePackets

func (s *TransferStats) RecordResentSourcePackets(n int)

func (*TransferStats) RecordSignature

func (s *TransferStats) RecordSignature(sig Checksum)

func (*TransferStats) RecordTCPLoopIteration

func (s *TransferStats) RecordTCPLoopIteration()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL