Documentation ¶
Index ¶
- Constants
- func ClientMaxBufferSize(s int) func(c *Client)
- func LogDebug(args ...interface{})
- func LogDebugf(format string, args ...interface{})
- func LogError(args ...interface{})
- func LogErrorf(format string, args ...interface{})
- func LogFatal(args ...interface{})
- func LogFatalf(format string, args ...interface{})
- func LogInfo(args ...interface{})
- func LogInfof(format string, args ...interface{})
- func LogWarn(args ...interface{})
- func LogWarnf(format string, args ...interface{})
- func SetOperation(ctx context.Context, op string)
- type Any
- type Client
- func (c *Client) OnConnect(fn ConnCallback)
- func (c *Client) OnDisconnect(fn ConnCallback)
- func (c *Client) Subscribe(stream string, handler func(msg *Event)) error
- func (c *Client) SubscribeChan(stream string, ch chan *Event) error
- func (c *Client) SubscribeChanRaw(ch chan *Event) error
- func (c *Client) SubscribeChanRawWithContext(ctx context.Context, ch chan *Event) error
- func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch chan *Event) error
- func (c *Client) SubscribeRaw(handler func(msg *Event)) error
- func (c *Client) SubscribeRawWithContext(ctx context.Context, handler func(msg *Event)) error
- func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handler func(msg *Event)) error
- func (c *Client) Unsubscribe(ch chan *Event)
- type ClientOption
- type ConnCallback
- type Event
- type EventLog
- type EventStreamReader
- type MessagePayload
- type ResponseValidator
- type Server
- func (s *Server) CreateStream(streamId StreamID) *Stream
- func (s *Server) Endpoint() (*url.URL, error)
- func (s *Server) Handle(path string, h http.Handler)
- func (s *Server) HandleFunc(path string, h http.HandlerFunc)
- func (s *Server) HandleHeader(key, val string, h http.HandlerFunc)
- func (s *Server) HandlePrefix(prefix string, h http.Handler)
- func (s *Server) HandleServeHTTP(path string)
- func (s *Server) Name() string
- func (s *Server) Publish(_ context.Context, streamId StreamID, event *Event)
- func (s *Server) PublishData(ctx context.Context, streamId StreamID, data MessagePayload) error
- func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (s *Server) Start(ctx context.Context) error
- func (s *Server) Stop(ctx context.Context) error
- func (s *Server) TryPublish(_ context.Context, streamId StreamID, event *Event) bool
- type ServerOption
- func WithAddress(addr string) ServerOption
- func WithAutoReply(enable bool) ServerOption
- func WithAutoStream(enable bool) ServerOption
- func WithBufferSize(size int) ServerOption
- func WithCodec(c string) ServerOption
- func WithEncodeBase64(enable bool) ServerOption
- func WithEventTTL(timeout time.Duration) ServerOption
- func WithHeaders(headers map[string]string) ServerOption
- func WithListener(lis net.Listener) ServerOption
- func WithNetwork(network string) ServerOption
- func WithPath(path string) ServerOption
- func WithSplitData(enable bool) ServerOption
- func WithSubscriberFunction(sub SubscriberFunction, unsub SubscriberFunction) ServerOption
- func WithTLSConfig(c *tls.Config) ServerOption
- func WithTimeout(timeout time.Duration) ServerOption
- type Stream
- type StreamID
- type StreamManager
- func (s *StreamManager) Add(stream *Stream)
- func (s *StreamManager) Clean()
- func (s *StreamManager) Count() int
- func (s *StreamManager) Exist(streamId StreamID) bool
- func (s *StreamManager) Get(streamId StreamID) *Stream
- func (s *StreamManager) Range(fn func(*Stream))
- func (s *StreamManager) Remove(stream *Stream)
- func (s *StreamManager) RemoveWithID(streamId StreamID)
- type StreamMap
- type Subscriber
- type SubscriberFunction
- type Transport
- func (tr *Transport) Endpoint() string
- func (tr *Transport) Kind() kratosTransport.Kind
- func (tr *Transport) Operation() string
- func (tr *Transport) PathTemplate() string
- func (tr *Transport) ReplyHeader() kratosTransport.Header
- func (tr *Transport) Request() *http.Request
- func (tr *Transport) RequestHeader() kratosTransport.Header
- type Transporter
Constants ¶
View Source
const ( FieldId = "id" FieldData = "data" FieldEvent = "event" FieldRetry = "retry" FieldComment = ":" )
View Source
const DefaultBufferSize = 1024
View Source
const (
KindSSE kratosTransport.Kind = "sse"
)
Variables ¶
This section is empty.
Functions ¶
func ClientMaxBufferSize ¶
func SetOperation ¶
SetOperation sets the transport operation.
Types ¶
type Client ¶
type Client struct { Retry time.Time ReconnectStrategy backoff.BackOff Headers map[string]string ReconnectNotify backoff.Notify ResponseValidator ResponseValidator Connection *http.Client URL string LastEventID atomic.Value EncodingBase64 bool Connected bool // contains filtered or unexported fields }
func (*Client) OnConnect ¶
func (c *Client) OnConnect(fn ConnCallback)
func (*Client) OnDisconnect ¶
func (c *Client) OnDisconnect(fn ConnCallback)
func (*Client) SubscribeChanRaw ¶
func (*Client) SubscribeChanRawWithContext ¶
func (*Client) SubscribeChanWithContext ¶
func (*Client) SubscribeRaw ¶
func (*Client) SubscribeRawWithContext ¶
func (*Client) SubscribeWithContext ¶
func (*Client) Unsubscribe ¶
type ClientOption ¶
type ClientOption func(o *Client)
func WithEndpoint ¶
func WithEndpoint(uri string) ClientOption
type ConnCallback ¶
type ConnCallback func(c *Client)
type EventLog ¶
type EventLog []*Event
func (*EventLog) Replay ¶
func (e *EventLog) Replay(s *Subscriber)
type EventStreamReader ¶
type EventStreamReader struct {
// contains filtered or unexported fields
}
func NewEventStreamReader ¶
func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader
func (*EventStreamReader) ReadEvent ¶
func (e *EventStreamReader) ReadEvent() ([]byte, error)
type MessagePayload ¶
type MessagePayload Any
type Server ¶
func NewServer ¶
func NewServer(opts ...ServerOption) *Server
func (*Server) CreateStream ¶
func (*Server) HandleFunc ¶
func (s *Server) HandleFunc(path string, h http.HandlerFunc)
func (*Server) HandleHeader ¶
func (s *Server) HandleHeader(key, val string, h http.HandlerFunc)
func (*Server) HandleServeHTTP ¶
func (*Server) PublishData ¶
type ServerOption ¶
type ServerOption func(o *Server)
func WithAddress ¶
func WithAddress(addr string) ServerOption
func WithAutoReply ¶
func WithAutoReply(enable bool) ServerOption
func WithAutoStream ¶
func WithAutoStream(enable bool) ServerOption
func WithBufferSize ¶
func WithBufferSize(size int) ServerOption
func WithCodec ¶
func WithCodec(c string) ServerOption
func WithEncodeBase64 ¶
func WithEncodeBase64(enable bool) ServerOption
func WithEventTTL ¶
func WithEventTTL(timeout time.Duration) ServerOption
func WithHeaders ¶
func WithHeaders(headers map[string]string) ServerOption
func WithListener ¶
func WithListener(lis net.Listener) ServerOption
func WithNetwork ¶
func WithNetwork(network string) ServerOption
func WithPath ¶ added in v1.2.1
func WithPath(path string) ServerOption
func WithSplitData ¶
func WithSplitData(enable bool) ServerOption
func WithSubscriberFunction ¶
func WithSubscriberFunction(sub SubscriberFunction, unsub SubscriberFunction) ServerOption
func WithTLSConfig ¶
func WithTLSConfig(c *tls.Config) ServerOption
func WithTimeout ¶
func WithTimeout(timeout time.Duration) ServerOption
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
func NewStreamManager ¶
func NewStreamManager() *StreamManager
func (*StreamManager) Add ¶
func (s *StreamManager) Add(stream *Stream)
func (*StreamManager) Clean ¶
func (s *StreamManager) Clean()
func (*StreamManager) Count ¶
func (s *StreamManager) Count() int
func (*StreamManager) Exist ¶
func (s *StreamManager) Exist(streamId StreamID) bool
func (*StreamManager) Get ¶
func (s *StreamManager) Get(streamId StreamID) *Stream
func (*StreamManager) Range ¶
func (s *StreamManager) Range(fn func(*Stream))
func (*StreamManager) Remove ¶
func (s *StreamManager) Remove(stream *Stream)
func (*StreamManager) RemoveWithID ¶
func (s *StreamManager) RemoveWithID(streamId StreamID)
type Subscriber ¶
type SubscriberFunction ¶
type SubscriberFunction func(streamID StreamID, sub *Subscriber)
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is a websocket transport.
func (*Transport) Kind ¶
func (tr *Transport) Kind() kratosTransport.Kind
Kind returns the transport kind.
func (*Transport) PathTemplate ¶
PathTemplate returns the http path template.
func (*Transport) ReplyHeader ¶
func (tr *Transport) ReplyHeader() kratosTransport.Header
ReplyHeader returns the reply header.
func (*Transport) RequestHeader ¶
func (tr *Transport) RequestHeader() kratosTransport.Header
RequestHeader returns the request header.
type Transporter ¶
type Transporter interface { kratosTransport.Transporter Request() *http.Request PathTemplate() string }
Click to show internal directories.
Click to hide internal directories.