stream

package
v0.0.0-...-39cdb81 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2019 License: GPL-3.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Low uint8 = iota
	Mid
	High
	Top
	PriorityQueue    = 4    //优先级队列数-低、中、高、顶
	PriorityQueueCap = 4096 //队列容量
	HashSize         = 32
)
View Source
const (
	BatchSize = 128
)

Variables

View Source
var ErrMaxPeerServers = errors.New("max peer servers")

如果达到对等服务器限制,将返回errmaxpeerservers。 它将在subscriberormsg中发送。

Functions

func FormatSyncBinKey

func FormatSyncBinKey(bin uint8) string

FormatSyncBinkey返回的字符串表示形式 要用作同步流密钥的Kademlia bin号。

func ParseSyncBinKey

func ParseSyncBinKey(s string) (uint8, error)

ParseSyncBinKey分析字符串表示形式 并返回Kademlia bin编号。

func RegisterSwarmSyncerClient

func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore)

registerwarmsyncerclient为注册客户端构造函数函数 处理传入的同步流

func RegisterSwarmSyncerServer

func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore)

Types

type API

type API struct {
	// contains filtered or unexported fields
}

func NewAPI

func NewAPI(r *Registry) *API

func (*API) SubscribeStream

func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error

func (*API) UnsubscribeStream

func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error

type ChunkDeliveryMsg

type ChunkDeliveryMsg struct {
	Addr  storage.Address
	SData []byte //存储的块数据(包括大小)
	// contains filtered or unexported fields
}

区块传送总是使用相同的讯息类型….

type ChunkDeliveryMsgRetrieval

type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg

定义用于检索的块传递(使用记帐)

type ChunkDeliveryMsgSyncing

type ChunkDeliveryMsgSyncing ChunkDeliveryMsg

定义用于同步的块传递(无记帐)

type Client

type Client interface {
	NeedData(context.Context, []byte) func(context.Context) error
	BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
	Close()
}

传入对等拖缆的客户端接口

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

func NewDelivery

func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery

func (*Delivery) RequestFromPeers

func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error)

RequestFromPeers将块检索请求发送到

type Handover

type Handover struct {
	Stream     Stream //河流名称
	Start, End uint64 //散列索引
	Root       []byte //索引段包含证明的根哈希
}

移交表示上游对等方移交流段的声明。

type HandoverProof

type HandoverProof struct {
	Sig []byte //签名(哈希(串行化(移交)))
	*Handover
}

handOverfloof表示上游对等端移交流部分的签名语句

type OfferedHashesMsg

type OfferedHashesMsg struct {
	Stream         Stream //河流名称
	From, To       uint64 //对等和数据库特定条目计数
	Hashes         []byte //哈希流(128)
	*HandoverProof        //防交
}

offeredhashemsg是一个协议消息,用于提供 流段

func (OfferedHashesMsg) String

func (m OfferedHashesMsg) String() string

提供的字符串漂亮打印

type Peer

type Peer struct {
	*protocols.Peer
	// contains filtered or unexported fields
}

Peer是流协议的对等扩展

func NewPeer

func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer

newpeer是peer的构造函数

func (*Peer) Deliver

func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error

传递向对等端发送storerequestmsg协议消息 根据“同步”参数,我们发送不同的消息类型

func (*Peer) HandleMsg

func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error

handlemsg是委托传入消息的消息处理程序

func (*Peer) SendOfferedHashes

func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error

sendforferedhashes发送offeredhashemsg协议消息

func (*Peer) SendPriority

func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error

sendpriority使用传出优先级队列向对等端发送消息

type QuitMsg

type QuitMsg struct {
	Stream Stream
}

type Range

type Range struct {
	From, To uint64
}

func NewRange

func NewRange(from, to uint64) *Range

func (*Range) String

func (r *Range) String() string

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

传出和传入拖缆构造函数的注册表

func NewRegistry

func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry

NewRegistry是拖缆构造函数

func (*Registry) APIs

func (r *Registry) APIs() []rpc.API

func (*Registry) Close

func (r *Registry) Close() error

func (*Registry) GetClientFunc

func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error)

用于传入拖缆构造函数的getclient访问器

func (*Registry) GetServerFunc

func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error)

用于传入拖缆构造函数的getserver访问器

func (*Registry) GetSpec

func (r *Registry) GetSpec() *protocols.Spec

getspec将拖缆规格返回给调用者 这曾经是一个全局变量,但用于模拟 多个节点其字段(尤其是钩子)将被覆盖

func (*Registry) Protocols

func (r *Registry) Protocols() []p2p.Protocol

func (*Registry) Quit

func (r *Registry) Quit(peerId enode.ID, s Stream) error

quit将quitmsg发送到对等端以删除 流对等客户端并终止流。

func (*Registry) RegisterClientFunc

func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error))

RegisterClient注册一个传入的拖缆构造函数

func (*Registry) RegisterServerFunc

func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error))

RegisterServer注册传出拖缆构造函数

func (*Registry) RequestSubscription

func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error

func (*Registry) Run

func (r *Registry) Run(p *network.BzzPeer) error

运行协议运行函数

func (*Registry) Start

func (r *Registry) Start(server *p2p.Server) error

func (*Registry) Stop

func (r *Registry) Stop() error

func (*Registry) Subscribe

func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error

订阅启动拖缆

func (*Registry) Unsubscribe

func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error

type RegistryOptions

type RegistryOptions struct {
	SkipCheck       bool
	Syncing         SyncingOption   //定义同步行为
	Retrieval       RetrievalOption //定义检索行为
	SyncUpdateDelay time.Duration
	MaxPeerServers  int //注册表中每个对等服务器的限制
}

RegistryOptions保留NewRegistry构造函数的可选值。

type RequestSubscriptionMsg

type RequestSubscriptionMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  //通过优先渠道交付
}

request subscription msg是节点请求订阅的协议msg 特定流

type RetrievalOption

type RetrievalOption int
const (
	//检索已禁用。主要用于隔离同步功能的测试(即仅同步)
	RetrievalDisabled RetrievalOption = iota
	//仅注册检索请求的客户端。
	//(轻节点不提供检索请求)
	//一旦注册了客户端,总是发送用于检索请求流的订阅
	RetrievalClientOnly
	//客户端和服务器功能都已注册,订阅将自动发送
	RetrievalEnabled
)

type RetrieveRequestMsg

type RetrieveRequestMsg struct {
	Addr      storage.Address
	SkipCheck bool
	HopCount  uint8
}

retrieverequestmsg是块检索请求的协议msg

type Server

type Server interface {
	//初始化服务器时调用sessionindex
	//获取流数据的当前光标状态。
	//基于此索引,实时和历史流间隔
	//将在调用setnextbatch之前进行调整。
	SessionIndex() (uint64, error)
	SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
	GetData(context.Context, []byte) ([]byte, error)
	Close()
}

传出对等拖缆的服务器接口

type Stream

type Stream struct {
	//名称用于标识客户机和服务器功能。
	Name string
	//key是特定流数据的名称。
	Key string
	//Live定义流是否只传递新数据
	//对于特定流。
	Live bool
}

流定义唯一的流标识符。

func NewStream

func NewStream(name string, key string, live bool) Stream

func (Stream) String

func (s Stream) String() string

字符串基于所有流字段返回流ID。

type StreamerPrices

type StreamerPrices struct {
	// contains filtered or unexported fields
}

有责任感的信息需要附加一些元信息 为了评估正确的价格

func (*StreamerPrices) Price

func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price

price实现会计接口并返回特定消息的价格

type SubscribeErrorMsg

type SubscribeErrorMsg struct {
	Error string
}

type SubscribeMsg

type SubscribeMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  //通过优先渠道交付
}

subcribeMsg是用于请求流的协议消息(节)

type SwarmChunkServer

type SwarmChunkServer struct {
	// contains filtered or unexported fields
}

swarmchunkserver实现服务器

func NewSwarmChunkServer

func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer

newswarmchunkserver是swarmchunkserver构造函数

func (*SwarmChunkServer) Close

func (s *SwarmChunkServer) Close()

需要在流服务器上调用Close

func (*SwarmChunkServer) GetData

func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error)

GetData从数据库存储检索块数据

func (*SwarmChunkServer) SessionIndex

func (s *SwarmChunkServer) SessionIndex() (uint64, error)

对于swarmchunkserver,sessionindex在所有情况下都返回零。

func (*SwarmChunkServer) SetNextBatch

func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)

设置下一批

type SwarmSyncerClient

type SwarmSyncerClient struct {
	// contains filtered or unexported fields
}

垃圾同步机

func NewSwarmSyncerClient

func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error)

NewsWarmSyncerClient是可验证数据交换同步器的控制器

func (*SwarmSyncerClient) BatchDone

func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error)

巴奇多

func (*SwarmSyncerClient) Close

func (s *SwarmSyncerClient) Close()

func (*SwarmSyncerClient) NeedData

func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error)

需求数据

type SwarmSyncerServer

type SwarmSyncerServer struct {
	// contains filtered or unexported fields
}

swarmsyncerserver实现在存储箱上同步历史记录的服务器 提供的流: *带或不带支票的实时请求交付 (实时/非实时历史记录)每个邻近箱的块同步

func NewSwarmSyncerServer

func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error)

newswarmsyncerserver是swarmsyncerserver的构造函数

func (*SwarmSyncerServer) Close

func (s *SwarmSyncerServer) Close()

需要在流服务器上调用Close

func (*SwarmSyncerServer) GetData

func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error)

getdata从netstore检索实际块

func (*SwarmSyncerServer) SessionIndex

func (s *SwarmSyncerServer) SessionIndex() (uint64, error)

sessionindex返回当前存储箱(po)索引。

func (*SwarmSyncerServer) SetNextBatch

func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error)

getbatch从dbstore检索下一批哈希

type SyncingOption

type SyncingOption int

枚举用于同步和检索的选项

const (
	//同步已禁用
	SyncingDisabled SyncingOption = iota
	//注册客户端和服务器,但不订阅
	SyncingRegisterOnly
	//客户端和服务器功能都已注册,订阅将自动发送
	SyncingAutoSubscribe
)

同步选项

type Takeover

type Takeover Handover

接管表示下游对等方接管的语句(存储所有数据) 移交

type TakeoverProof

type TakeoverProof struct {
	Sig []byte //符号(哈希(序列化(接管)))
	*Takeover
}

takeoveroof表示下游对等方接管的签名声明 河道断面

type TakeoverProofMsg

type TakeoverProofMsg TakeoverProof

takeoveroofmsg是下游对等机发送的协议消息

func (TakeoverProofMsg) String

func (m TakeoverProofMsg) String() string

字符串漂亮打印takeoveroofmsg

type UnsubscribeMsg

type UnsubscribeMsg struct {
	Stream Stream
}

type WantedHashesMsg

type WantedHashesMsg struct {
	Stream   Stream
	Want     []byte //位向量,指示批处理中需要哪些键
	From, To uint64 //下一个间隔偏移量-如果不继续,则为空
}

WantedHashesMsg是用于发送哈希的协议消息数据 在offeredhashemsg中提供的下游对等方实际希望发送

func (WantedHashesMsg) String

func (m WantedHashesMsg) String() string

字符串漂亮打印WantedHashesMsg

type WrappedPriorityMsg

type WrappedPriorityMsg struct {
	Context context.Context
	Msg     interface{}
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL