Documentation ¶
Index ¶
- Constants
- Variables
- func AwaitEOF(s network.Stream) error
- func CloseStream(stream network.Stream) error
- func HandlerWithClose(f network.StreamHandler) network.StreamHandler
- func HandlerWithRW(f func(request *RequestMessage, response *ResponseMessage) (int32, string)) network.StreamHandler
- func HandlerWithRead(f func(request *RequestMessage)) network.StreamHandler
- func HandlerWithWrite(f func(request *RequestMessage) error) network.StreamHandler
- func ReadStream(stream network.Stream) ([]byte, error)
- func RegisterStreamHandler(h host.Host, p protocol.ID, handler network.StreamHandler)
- func SendErrorResponse(stream network.Stream, code int32, msg string)
- func WriteStream(msg []byte, stream network.Stream) error
- type Message
- func (*Message) Descriptor() ([]byte, []int)
- func (m *Message) GetReceiver() string
- func (m *Message) GetSender() string
- func (m *Message) GetType() string
- func (m *Message) Marshal() (dAtA []byte, err error)
- func (m *Message) MarshalTo(dAtA []byte) (int, error)
- func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Message) ProtoMessage()
- func (m *Message) Reset()
- func (m *Message) Size() (n int)
- func (m *Message) String() string
- func (m *Message) Unmarshal(dAtA []byte) error
- func (m *Message) XXX_DiscardUnknown()
- func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Message) XXX_Merge(src proto.Message)
- func (m *Message) XXX_Size() int
- func (m *Message) XXX_Unmarshal(b []byte) error
- type RequestMessage
- func (*RequestMessage) Descriptor() ([]byte, []int)
- func (m *RequestMessage) GetMessage() *Message
- func (m *RequestMessage) GetPayload() []byte
- func (m *RequestMessage) Marshal() (dAtA []byte, err error)
- func (m *RequestMessage) MarshalTo(dAtA []byte) (int, error)
- func (m *RequestMessage) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*RequestMessage) ProtoMessage()
- func (m *RequestMessage) Reset()
- func (m *RequestMessage) Size() (n int)
- func (m *RequestMessage) String() string
- func (m *RequestMessage) Unmarshal(dAtA []byte) error
- func (m *RequestMessage) XXX_DiscardUnknown()
- func (m *RequestMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *RequestMessage) XXX_Merge(src proto.Message)
- func (m *RequestMessage) XXX_Size() int
- func (m *RequestMessage) XXX_Unmarshal(b []byte) error
- type ResponseMessage
- func (*ResponseMessage) Descriptor() ([]byte, []int)
- func (m *ResponseMessage) GetCode() int32
- func (m *ResponseMessage) GetData() []byte
- func (m *ResponseMessage) GetMessage() *Message
- func (m *ResponseMessage) GetMsg() string
- func (m *ResponseMessage) Marshal() (dAtA []byte, err error)
- func (m *ResponseMessage) MarshalTo(dAtA []byte) (int, error)
- func (m *ResponseMessage) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*ResponseMessage) ProtoMessage()
- func (m *ResponseMessage) Reset()
- func (m *ResponseMessage) Size() (n int)
- func (m *ResponseMessage) String() string
- func (m *ResponseMessage) Unmarshal(dAtA []byte) error
- func (m *ResponseMessage) XXX_DiscardUnknown()
- func (m *ResponseMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ResponseMessage) XXX_Merge(src proto.Message)
- func (m *ResponseMessage) XXX_Size() int
- func (m *ResponseMessage) XXX_Unmarshal(b []byte) error
Constants ¶
View Source
const (
// DefaultSteamProtocol 默认 dep2p 流协议
DefaultSteamProtocol = "/dep2p/stream/1.0.0"
)
Variables ¶
View Source
var (
	ErrInvalidLengthStream        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowStream          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupStream = fmt.Errorf("proto: unexpected end of group")
)
View Source
var EOFTimeout = time.Second * 60
EOFTimeout 是等待成功观察流上的 EOF 的最长时间。 默认为 60 秒。
View Source
var ErrExpectedEOF = fmt.Errorf("期望 EOF 时读取数据")
ErrExpectedEOF 当我们在期望 EOF 的情况下读取数据时返回。
View Source
var (
	// 最大消息大小
	MaxBlockSize = 1 << 25 // 32MB
)
Functions ¶
func HandlerWithClose ¶
func HandlerWithClose(f network.StreamHandler) network.StreamHandler
HandlerWithClose 用关闭流和从恐慌中恢复来包装处理程序
func HandlerWithRW ¶
func HandlerWithRW(f func(request *RequestMessage, response *ResponseMessage) (int32, string)) network.StreamHandler
HandlerWithRW 用读取、写入、关闭流和从恐慌中恢复来包装处理程序。处理程序 f 接收 RequestMessage 和 ResponseMessage,可以直接在函数内部定义成功或错误的响应。
func HandlerWithRead ¶
func HandlerWithRead(f func(request *RequestMessage)) network.StreamHandler
HandlerWithRead 用读取、关闭流和从恐慌中恢复来包装处理程序
func HandlerWithWrite ¶
func HandlerWithWrite(f func(request *RequestMessage) error) network.StreamHandler
HandlerWithWrite 通过写入、关闭流和从恐慌中恢复来包装处理程序
func ReadStream ¶
ReadStream 从流中读取消息,带指数退避重试。为避免资源的过度消耗并处理网络故障情况,引入了指数退避重试策略,并设置了合理的重试次数上限。重要的改进点:(1)指数退避:每次重试的超时时间都会增加,减少在网络不稳定时的重试频率,减轻服务器压力;(2)有限的重试次数:通过 maxRetries 限制重试次数,防止在遇到持续的网络问题时无限重试;(3)清晰的错误处理:根据错误类型决定是否重试,例如,只有在遇到网络超时或其他指定的网络错误时才进行重试。
func RegisterStreamHandler ¶
RegisterStreamHandler 注册流处理程序
func SendErrorResponse ¶
SendErrorResponse 是一个辅助函数,用于向流中发送错误响应。
Types ¶
type Message ¶
type Message struct {
	Type                 string   `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
	Sender               string   `protobuf:"bytes,2,opt,name=sender,proto3" json:"sender,omitempty"`
	Receiver             string   `protobuf:"bytes,3,opt,name=receiver,proto3" json:"receiver,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
Message 消息头信息,包含消息类型(Type)、发送方(Sender)和接收方(Receiver)。
func (*Message) Descriptor ¶
func (*Message) GetReceiver ¶
func (*Message) MarshalToSizedBuffer ¶
func (*Message) ProtoMessage ¶
func (*Message) ProtoMessage()
func (*Message) XXX_DiscardUnknown ¶
func (m *Message) XXX_DiscardUnknown()
func (*Message) XXX_Marshal ¶
func (*Message) XXX_Unmarshal ¶
type RequestMessage ¶
type RequestMessage struct {
	Message              *Message `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	Payload              []byte   `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
请求信息
func (*RequestMessage) Descriptor ¶
func (*RequestMessage) Descriptor() ([]byte, []int)
func (*RequestMessage) GetMessage ¶
func (m *RequestMessage) GetMessage() *Message
func (*RequestMessage) GetPayload ¶
func (m *RequestMessage) GetPayload() []byte
func (*RequestMessage) Marshal ¶
func (m *RequestMessage) Marshal() (dAtA []byte, err error)
func (*RequestMessage) MarshalToSizedBuffer ¶
func (m *RequestMessage) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*RequestMessage) ProtoMessage ¶
func (*RequestMessage) ProtoMessage()
func (*RequestMessage) Reset ¶
func (m *RequestMessage) Reset()
func (*RequestMessage) Size ¶
func (m *RequestMessage) Size() (n int)
func (*RequestMessage) String ¶
func (m *RequestMessage) String() string
func (*RequestMessage) Unmarshal ¶
func (m *RequestMessage) Unmarshal(dAtA []byte) error
func (*RequestMessage) XXX_DiscardUnknown ¶
func (m *RequestMessage) XXX_DiscardUnknown()
func (*RequestMessage) XXX_Marshal ¶
func (m *RequestMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*RequestMessage) XXX_Merge ¶
func (m *RequestMessage) XXX_Merge(src proto.Message)
func (*RequestMessage) XXX_Size ¶
func (m *RequestMessage) XXX_Size() int
func (*RequestMessage) XXX_Unmarshal ¶
func (m *RequestMessage) XXX_Unmarshal(b []byte) error
type ResponseMessage ¶
type ResponseMessage struct {
	Message              *Message `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	Code                 int32    `protobuf:"varint,2,opt,name=code,proto3" json:"code,omitempty"`
	Msg                  string   `protobuf:"bytes,3,opt,name=msg,proto3" json:"msg,omitempty"`
	Data                 []byte   `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
回应信息
func (*ResponseMessage) Descriptor ¶
func (*ResponseMessage) Descriptor() ([]byte, []int)
func (*ResponseMessage) GetCode ¶
func (m *ResponseMessage) GetCode() int32
func (*ResponseMessage) GetData ¶
func (m *ResponseMessage) GetData() []byte
func (*ResponseMessage) GetMessage ¶
func (m *ResponseMessage) GetMessage() *Message
func (*ResponseMessage) GetMsg ¶
func (m *ResponseMessage) GetMsg() string
func (*ResponseMessage) Marshal ¶
func (m *ResponseMessage) Marshal() (dAtA []byte, err error)
func (*ResponseMessage) MarshalToSizedBuffer ¶
func (m *ResponseMessage) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*ResponseMessage) ProtoMessage ¶
func (*ResponseMessage) ProtoMessage()
func (*ResponseMessage) Reset ¶
func (m *ResponseMessage) Reset()
func (*ResponseMessage) Size ¶
func (m *ResponseMessage) Size() (n int)
func (*ResponseMessage) String ¶
func (m *ResponseMessage) String() string
func (*ResponseMessage) Unmarshal ¶
func (m *ResponseMessage) Unmarshal(dAtA []byte) error
func (*ResponseMessage) XXX_DiscardUnknown ¶
func (m *ResponseMessage) XXX_DiscardUnknown()
func (*ResponseMessage) XXX_Marshal ¶
func (m *ResponseMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ResponseMessage) XXX_Merge ¶
func (m *ResponseMessage) XXX_Merge(src proto.Message)
func (*ResponseMessage) XXX_Size ¶
func (m *ResponseMessage) XXX_Size() int
func (*ResponseMessage) XXX_Unmarshal ¶
func (m *ResponseMessage) XXX_Unmarshal(b []byte) error
Click to show internal directories.
Click to hide internal directories.