streams

package
v2.0.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultSteamProtocol 默认 dep2p 流协议
	DefaultSteamProtocol = "/dep2p/stream/1.0.0"
)

Variables

View Source
var (
	ErrInvalidLengthStream        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowStream          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupStream = fmt.Errorf("proto: unexpected end of group")
)
View Source
var EOFTimeout = time.Second * 60

EOFTimeout 是等待成功观察流上的 EOF 的最长时间。 默认为 60 秒。

View Source
var ErrExpectedEOF = fmt.Errorf("期望 EOF 时读取数据")

ErrExpectedEOF 当我们在期望 EOF 的情况下读取数据时返回。

View Source
var (

	// 最大消息大小
	MaxBlockSize = 1 << 25 // 32MB
)

Functions

func AwaitEOF

func AwaitEOF(s network.Stream) error

AwaitEOF 等待给定流上的 EOF,如果失败则返回错误。 它最多等待 EOFTimeout(默认为 1 分钟),然后重置流。

func CloseStream

func CloseStream(stream network.Stream) error

CloseStream 写入后关闭流,并等待EOF。

func HandlerWithClose

func HandlerWithClose(f network.StreamHandler) network.StreamHandler

HandlerWithClose 包装处理程序,为其增加关闭流和从恐慌(panic)中恢复的处理。

func HandlerWithRW

func HandlerWithRW(f func(request *RequestMessage, response *ResponseMessage) (int32, string)) network.StreamHandler

HandlerWithRW 包装处理程序,为其增加读取、写入、关闭流以及从恐慌(panic)中恢复的处理。处理程序 f 接收 RequestMessage 和 ResponseMessage,可以直接在函数内部定义成功或错误的响应。

func HandlerWithRead

func HandlerWithRead(f func(request *RequestMessage)) network.StreamHandler

HandlerWithRead 包装处理程序,为其增加读取、关闭流和从恐慌(panic)中恢复的处理。

func HandlerWithWrite

func HandlerWithWrite(f func(request *RequestMessage) error) network.StreamHandler

HandlerWithWrite 包装处理程序,为其增加写入、关闭流和从恐慌(panic)中恢复的处理。

func ReadStream

func ReadStream(stream network.Stream) ([]byte, error)

ReadStream 从流中读取消息,带指数退避重试。为了避免资源的过度消耗并妥善处理网络故障,这里引入了指数退避重试策略,并设置了合理的重试次数上限。重要的改进点:(1)指数退避:每次重试的超时时间都会增加,减少在网络不稳定时的重试频率,减轻服务器压力;(2)有限的重试次数:通过 maxRetries 限制重试次数,防止在遇到持续的网络问题时无限重试;(3)清晰的错误处理:根据错误类型决定是否重试,例如,只有在遇到网络超时或其他指定的网络错误时才进行重试。

func RegisterStreamHandler

func RegisterStreamHandler(h host.Host, p protocol.ID, handler network.StreamHandler)

RegisterStreamHandler 注册流处理程序

func SendErrorResponse

func SendErrorResponse(stream network.Stream, code int32, msg string)

SendErrorResponse 是一个辅助函数,用于向流中发送错误响应。

func WriteStream

func WriteStream(msg []byte, stream network.Stream) error

WriteStream 将消息写入流。它采用类似于 ReadStream 的优化策略来增强鲁棒性,尤其是在面对网络问题时。重点包括设置写操作的超时时间,以及对潜在的写操作失败进行适当的错误处理。考虑到 WriteStream 的操作通常比读操作更简单(通常是一次性写入而非多次读取),这里通过设置整体的写超时来简化流程。

Types

type Message

type Message struct {
	Type                 string   `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
	Sender               string   `protobuf:"bytes,2,opt,name=sender,proto3" json:"sender,omitempty"`
	Receiver             string   `protobuf:"bytes,3,opt,name=receiver,proto3" json:"receiver,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

Message

func (*Message) Descriptor

func (*Message) Descriptor() ([]byte, []int)

func (*Message) GetReceiver

func (m *Message) GetReceiver() string

func (*Message) GetSender

func (m *Message) GetSender() string

func (*Message) GetType

func (m *Message) GetType() string

func (*Message) Marshal

func (m *Message) Marshal() (dAtA []byte, err error)

func (*Message) MarshalTo

func (m *Message) MarshalTo(dAtA []byte) (int, error)

func (*Message) MarshalToSizedBuffer

func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) Reset

func (m *Message) Reset()

func (*Message) Size

func (m *Message) Size() (n int)

func (*Message) String

func (m *Message) String() string

func (*Message) Unmarshal

func (m *Message) Unmarshal(dAtA []byte) error

func (*Message) XXX_DiscardUnknown

func (m *Message) XXX_DiscardUnknown()

func (*Message) XXX_Marshal

func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Message) XXX_Merge

func (m *Message) XXX_Merge(src proto.Message)

func (*Message) XXX_Size

func (m *Message) XXX_Size() int

func (*Message) XXX_Unmarshal

func (m *Message) XXX_Unmarshal(b []byte) error

type RequestMessage

type RequestMessage struct {
	Message              *Message `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	Payload              []byte   `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

请求信息

func (*RequestMessage) Descriptor

func (*RequestMessage) Descriptor() ([]byte, []int)

func (*RequestMessage) GetMessage

func (m *RequestMessage) GetMessage() *Message

func (*RequestMessage) GetPayload

func (m *RequestMessage) GetPayload() []byte

func (*RequestMessage) Marshal

func (m *RequestMessage) Marshal() (dAtA []byte, err error)

func (*RequestMessage) MarshalTo

func (m *RequestMessage) MarshalTo(dAtA []byte) (int, error)

func (*RequestMessage) MarshalToSizedBuffer

func (m *RequestMessage) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*RequestMessage) ProtoMessage

func (*RequestMessage) ProtoMessage()

func (*RequestMessage) Reset

func (m *RequestMessage) Reset()

func (*RequestMessage) Size

func (m *RequestMessage) Size() (n int)

func (*RequestMessage) String

func (m *RequestMessage) String() string

func (*RequestMessage) Unmarshal

func (m *RequestMessage) Unmarshal(dAtA []byte) error

func (*RequestMessage) XXX_DiscardUnknown

func (m *RequestMessage) XXX_DiscardUnknown()

func (*RequestMessage) XXX_Marshal

func (m *RequestMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*RequestMessage) XXX_Merge

func (m *RequestMessage) XXX_Merge(src proto.Message)

func (*RequestMessage) XXX_Size

func (m *RequestMessage) XXX_Size() int

func (*RequestMessage) XXX_Unmarshal

func (m *RequestMessage) XXX_Unmarshal(b []byte) error

type ResponseMessage

type ResponseMessage struct {
	Message              *Message `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	Code                 int32    `protobuf:"varint,2,opt,name=code,proto3" json:"code,omitempty"`
	Msg                  string   `protobuf:"bytes,3,opt,name=msg,proto3" json:"msg,omitempty"`
	Data                 []byte   `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

回应信息

func (*ResponseMessage) Descriptor

func (*ResponseMessage) Descriptor() ([]byte, []int)

func (*ResponseMessage) GetCode

func (m *ResponseMessage) GetCode() int32

func (*ResponseMessage) GetData

func (m *ResponseMessage) GetData() []byte

func (*ResponseMessage) GetMessage

func (m *ResponseMessage) GetMessage() *Message

func (*ResponseMessage) GetMsg

func (m *ResponseMessage) GetMsg() string

func (*ResponseMessage) Marshal

func (m *ResponseMessage) Marshal() (dAtA []byte, err error)

func (*ResponseMessage) MarshalTo

func (m *ResponseMessage) MarshalTo(dAtA []byte) (int, error)

func (*ResponseMessage) MarshalToSizedBuffer

func (m *ResponseMessage) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ResponseMessage) ProtoMessage

func (*ResponseMessage) ProtoMessage()

func (*ResponseMessage) Reset

func (m *ResponseMessage) Reset()

func (*ResponseMessage) Size

func (m *ResponseMessage) Size() (n int)

func (*ResponseMessage) String

func (m *ResponseMessage) String() string

func (*ResponseMessage) Unmarshal

func (m *ResponseMessage) Unmarshal(dAtA []byte) error

func (*ResponseMessage) XXX_DiscardUnknown

func (m *ResponseMessage) XXX_DiscardUnknown()

func (*ResponseMessage) XXX_Marshal

func (m *ResponseMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ResponseMessage) XXX_Merge

func (m *ResponseMessage) XXX_Merge(src proto.Message)

func (*ResponseMessage) XXX_Size

func (m *ResponseMessage) XXX_Size() int

func (*ResponseMessage) XXX_Unmarshal

func (m *ResponseMessage) XXX_Unmarshal(b []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL