Documentation ¶
Index ¶
- Constants
- func SetOperation(ctx context.Context, op string)
- type Any
- type Event
- type EventLog
- type MessagePayload
- type Server
- func (s *Server) CreateStream(streamId StreamID) *Stream
- func (s *Server) Endpoint() (*url.URL, error)
- func (s *Server) Handle(path string, h http.Handler)
- func (s *Server) HandleFunc(path string, h http.HandlerFunc)
- func (s *Server) HandleHeader(key, val string, h http.HandlerFunc)
- func (s *Server) HandlePrefix(prefix string, h http.Handler)
- func (s *Server) HandleServeHTTP(path string)
- func (s *Server) Name() string
- func (s *Server) Publish(_ context.Context, streamId StreamID, event *Event)
- func (s *Server) PublishData(ctx context.Context, streamId StreamID, data MessagePayload) error
- func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (s *Server) Start(ctx context.Context) error
- func (s *Server) Stop(ctx context.Context) error
- func (s *Server) TryPublish(_ context.Context, streamId StreamID, event *Event) bool
- type ServerOption
- func WithAddress(addr string) ServerOption
- func WithAutoReply(enable bool) ServerOption
- func WithAutoStream(enable bool) ServerOption
- func WithBufferSize(size int) ServerOption
- func WithCodec(c string) ServerOption
- func WithEncodeBase64(enable bool) ServerOption
- func WithEventTTL(timeout time.Duration) ServerOption
- func WithHeaders(headers map[string]string) ServerOption
- func WithListener(lis net.Listener) ServerOption
- func WithNetwork(network string) ServerOption
- func WithPath(path string) ServerOption
- func WithSplitData(enable bool) ServerOption
- func WithSubscriberFunction(sub SubscriberFunction, unsub SubscriberFunction) ServerOption
- func WithTLSConfig(c *tls.Config) ServerOption
- func WithTimeout(timeout time.Duration) ServerOption
- type Stream
- type StreamID
- type StreamManager
- func (s *StreamManager) Add(stream *Stream)
- func (s *StreamManager) Clean()
- func (s *StreamManager) Count() int
- func (s *StreamManager) Exist(streamId StreamID) bool
- func (s *StreamManager) Get(streamId StreamID) *Stream
- func (s *StreamManager) Range(fn func(*Stream))
- func (s *StreamManager) Remove(stream *Stream)
- func (s *StreamManager) RemoveWithID(streamId StreamID)
- type StreamMap
- type Subscriber
- type SubscriberFunction
- type Transport
- func (tr *Transport) Endpoint() string
- func (tr *Transport) Kind() transport.Kind
- func (tr *Transport) Operation() string
- func (tr *Transport) PathTemplate() string
- func (tr *Transport) ReplyHeader() transport.Header
- func (tr *Transport) Request() *http.Request
- func (tr *Transport) RequestHeader() transport.Header
- type Transporter
Constants ¶
const ( FieldId = "id" FieldData = "data" FieldEvent = "event" FieldRetry = "retry" FieldComment = ":" )
const DefaultBufferSize = 1024
const (
KindSSE transport.Kind = "sse"
)
Variables ¶
This section is empty.
Functions ¶
func SetOperation ¶
SetOperation sets the transport operation.
Types ¶
type Event ¶
type Event struct { ID []byte Data []byte Event []byte Retry []byte Comment []byte // contains filtered or unexported fields }
Event holds all the event source fields
type MessagePayload ¶
type MessagePayload Any
type Server ¶
func NewServer ¶
func NewServer(opts ...ServerOption) *Server
NewServer creates a server and sets up its defaults.
func (*Server) CreateStream ¶
CreateStream will create a new stream and register it
func (*Server) HandleFunc ¶
func (s *Server) HandleFunc(path string, h http.HandlerFunc)
HandleFunc registers a new route with a matcher for the URL path.
func (*Server) HandleHeader ¶
func (s *Server) HandleHeader(key, val string, h http.HandlerFunc)
HandleHeader registers a new route with a matcher for request header values.
func (*Server) HandlePrefix ¶
HandlePrefix registers a new route with a matcher for the URL path prefix.
func (*Server) HandleServeHTTP ¶
HandleServeHTTP registers a new route with a matcher for the URL path.
func (*Server) Publish ¶
Publish sends a message to every client in a stream. If the stream's buffer is full, it blocks until the message has been sent out to all subscribers (though it has not necessarily arrived at the clients), or until the stream is closed.
func (*Server) PublishData ¶
PublishData encodes the message and publishes it to every client in a stream.
func (*Server) ServeHTTP ¶
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP serves new connections with events for a given stream ...
func (*Server) TryPublish ¶
TryPublish is the same as Publish except that when the operation would cause the call to be blocked, it simply drops the message and returns false. Together with a small BufferSize, it can be useful when publishing the latest message ASAP is more important than reliable delivery.
type ServerOption ¶
type ServerOption func(o *Server)
func WithAutoReply ¶
func WithAutoReply(enable bool) ServerOption
WithAutoReply enables or disables server auto reply.
func WithAutoStream ¶
func WithAutoStream(enable bool) ServerOption
WithAutoStream enables or disables server auto stream.
func WithBufferSize ¶
func WithBufferSize(size int) ServerOption
WithBufferSize sets the server buffer size.
func WithEncodeBase64 ¶
func WithEncodeBase64(enable bool) ServerOption
WithEncodeBase64 enables or disables base64 encoding on the server.
func WithEventTTL ¶
func WithEventTTL(timeout time.Duration) ServerOption
WithEventTTL sets the server event TTL.
func WithHeaders ¶
func WithHeaders(headers map[string]string) ServerOption
WithHeaders sets custom headers on the server.
func WithListener ¶
func WithListener(lis net.Listener) ServerOption
WithListener sets the server listener.
func WithSplitData ¶
func WithSplitData(enable bool) ServerOption
WithSplitData enables or disables server split data.
func WithSubscriberFunction ¶
func WithSubscriberFunction(sub SubscriberFunction, unsub SubscriberFunction) ServerOption
WithSubscriberFunction sets the server subscriber functions (sub and unsub).
func WithTLSConfig ¶
func WithTLSConfig(c *tls.Config) ServerOption
WithTLSConfig sets the server TLS config.
func WithTimeout ¶
func WithTimeout(timeout time.Duration) ServerOption
WithTimeout sets the server timeout.
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
func NewStreamManager ¶
func NewStreamManager() *StreamManager
NewStreamManager returns a new stream manager
func (*StreamManager) Add ¶
func (s *StreamManager) Add(stream *Stream)
Add adds a new stream if it does not already exist.
func (*StreamManager) Count ¶
func (s *StreamManager) Count() int
Count returns the current total number of streams.
func (*StreamManager) Exist ¶
func (s *StreamManager) Exist(streamId StreamID) bool
Exist reports whether a stream with the given streamID exists.
func (*StreamManager) Get ¶
func (s *StreamManager) Get(streamId StreamID) *Stream
Get returns an existing stream.
func (*StreamManager) Range ¶
func (s *StreamManager) Range(fn func(*Stream))
Range walks through the list of streams.
func (*StreamManager) Remove ¶
func (s *StreamManager) Remove(stream *Stream)
Remove deletes a stream
func (*StreamManager) RemoveWithID ¶
func (s *StreamManager) RemoveWithID(streamId StreamID)
RemoveWithID deletes a stream by streamID
type Subscriber ¶
Subscriber holds subscriber properties
type SubscriberFunction ¶
type SubscriberFunction func(streamID StreamID, sub *Subscriber)
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is an SSE transport.
func (*Transport) PathTemplate ¶
PathTemplate returns the http path template.
func (*Transport) ReplyHeader ¶
ReplyHeader returns the reply header.
func (*Transport) RequestHeader ¶
RequestHeader returns the request header.
type Transporter ¶
type Transporter interface { transport.Transporter Request() *http.Request PathTemplate() string }