sse

Version: v1.2.18
Published: Nov 14, 2024 License: MIT Imports: 23 Imported by: 0

README

Server-Sent Events (SSE)

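Strictly speaking, HTTP does not let a server push information on its own initiative. There is a workaround, however: the server tells the client that what follows is a stream.

In other words, the response is not a one-off packet but a data stream that keeps arriving. The client does not close the connection and keeps waiting for new data from the server; video playback works this way. In essence, this kind of communication is a very long download delivered as a stream.

SSE and WebSocket serve a similar purpose: both establish a communication channel between the browser and the server, over which the server pushes information to the browser.

Overall, WebSocket is more powerful and flexible because it is a full-duplex channel that supports two-way communication. SSE is a one-way channel: only the server can send to the browser, because a stream is essentially a download. If the browser needs to send information to the server, that is a separate HTTP request.

SSE still has advantages of its own:

  • SSE uses HTTP, which existing server software already supports. WebSocket is a separate protocol.
  • SSE is lightweight and simple to use; the WebSocket protocol is comparatively complex.
  • SSE reconnects automatically by default; with WebSocket you have to implement reconnection yourself.
  • SSE is generally used for text only, and binary data must be encoded before sending; WebSocket supports binary data by default.
  • SSE supports custom message types.

The two therefore have different strengths and suit different situations.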

Protocol description

Data format

The SSE data that the server sends to the browser must be UTF-8 encoded text, sent with the following HTTP headers:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

Of the three lines above, the first one, Content-Type, must specify the MIME type text/event-stream.

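Each transmission consists of one or more messages, separated from each other by \n\n. Each message consists of one or more lines, and every line has the following format:

[field]: value\n

The field above can take one of four values: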

  • data
  • event
  • id
  • retry

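In addition, a line that starts with a colon is a comment. Servers typically send a comment to the browser at regular intervals to keep the connection from being dropped: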

: This is a comment

Here is an example stream:

: this is a test stream\n\n

data: some text\n\n

data: another message\n
data: with two lines \n\n

data field

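The message payload is carried in the data field:

data: message\n\n

If the data is long, it can be split across several lines. The last line ends with \n\n and every preceding line ends with \n.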

data: begin message\n
data: continue message\n\n
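
The multi-line rule above can be sketched in Go as follows. The helper name formatData is hypothetical and is not part of this package; it only shows how a payload containing line breaks maps onto several data lines.

```go
package main

import (
	"fmt"
	"strings"
)

// formatData encodes payload as one SSE message: one "data:" line per
// line of payload, followed by a blank line that ends the message.
func formatData(payload string) string {
	var b strings.Builder
	for _, line := range strings.Split(payload, "\n") {
		b.WriteString("data: ")
		b.WriteString(line)
		b.WriteString("\n")
	}
	b.WriteString("\n")
	return b.String()
}

func main() {
	// Prints "data: begin message\ndata: continue message\n\n"
	fmt.Printf("%q\n", formatData("begin message\ncontinue message"))
}
```

On the receiving side, the browser joins the data lines of one message with a newline, so the payload arrives with its line breaks intact.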

Here is an example that sends JSON data:

data: {\n
data: "foo": "bar",\n
data: "baz": 555\n
data: }\n\n

id field

The id field carries the data identifier, which acts as a sequence number for each message.

id: msg1\n
data: message\n\n

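The browser reads this value through the lastEventId property. If the connection drops, the browser reconnects and sends a special Last-Event-ID HTTP header containing this value, which helps the server re-establish the stream where it left off. The header can therefore be seen as a synchronization mechanism.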

event field

The event field specifies a custom event type; the default is the message event. The browser can listen for the event with addEventListener().

event: foo\n
data: a foo event\n\n

data: an unnamed event\n\n

event: bar\n
data: a bar event\n\n

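The code above creates three messages. The first is named foo and triggers the browser's foo event; the second is unnamed, so it has the default type and triggers the browser's message event; the third is named bar and triggers the browser's bar event.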
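
To make the dispatch rule concrete, here is a simplified Go parser for the three-message stream above. It is a sketch, not a full implementation of the event stream format: the message type and parse function are invented for the example, and only the event and data fields are handled.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// message is a parsed SSE message (hypothetical type for this sketch).
type message struct {
	Event string
	Data  string
}

// parse splits an event stream into messages. An unnamed message gets
// the default event type "message".
func parse(stream string) []message {
	var out []message
	var data []string
	event := "message"

	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			// A blank line ends the current message.
			if len(data) > 0 {
				out = append(out, message{Event: event, Data: strings.Join(data, "\n")})
			}
			data, event = nil, "message"
		case strings.HasPrefix(line, ":"):
			// Comment line: ignored.
		default:
			field, value, _ := strings.Cut(line, ":")
			value = strings.TrimPrefix(value, " ")
			switch field {
			case "event":
				event = value
			case "data":
				data = append(data, value)
			}
		}
	}
	return out
}

func main() {
	stream := "event: foo\ndata: a foo event\n\n" +
		"data: an unnamed event\n\n" +
		"event: bar\ndata: a bar event\n\n"
	for _, m := range parse(stream) {
		fmt.Printf("%s: %s\n", m.Event, m.Data)
	}
}
```

Running it lists the event types foo, message, and bar in that order, matching the description above.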

Here is another example:

event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

event: userdisconnect
data: {"username": "bobby", "time": "02:34:23"}

event: usermessage
data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."}

retry field

The server can use the retry field to tell the browser how long to wait before it re-initiates the connection.

retry: 10000\n

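The value is in milliseconds. The browser waits for this interval and then reconnects whenever the connection is lost, whether the server closed it or a network error or similar problem interrupted it.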

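Simple JS client

// JS client
const source = new EventSource("http://localhost:3000/sse")
// onmessage fires for messages that have no explicit event type.
source.onmessage = (event) => {
  console.log("OnMessage Called:")
  console.log(event)
  // Assumes the server sends JSON in the data field.
  console.log(JSON.parse(event.data))
}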

Documentation

Index

Constants

const (
	FieldId      = "id"
	FieldData    = "data"
	FieldEvent   = "event"
	FieldRetry   = "retry"
	FieldComment = ":"
)
const DefaultBufferSize = 1024
const (
	KindSSE kratosTransport.Kind = "sse"
)

Variables

This section is empty.

Functions

func ClientMaxBufferSize

func ClientMaxBufferSize(s int) func(c *Client)

func LogDebug

func LogDebug(args ...interface{})

func LogDebugf

func LogDebugf(format string, args ...interface{})

func LogError

func LogError(args ...interface{})

func LogErrorf

func LogErrorf(format string, args ...interface{})

func LogFatal

func LogFatal(args ...interface{})

func LogFatalf

func LogFatalf(format string, args ...interface{})

func LogInfo

func LogInfo(args ...interface{})

func LogInfof

func LogInfof(format string, args ...interface{})

func LogWarn

func LogWarn(args ...interface{})

func LogWarnf

func LogWarnf(format string, args ...interface{})

func SetOperation

func SetOperation(ctx context.Context, op string)

SetOperation sets the transport operation.

Types

type Any

type Any interface{}

type Client

type Client struct {
	Retry             time.Time
	ReconnectStrategy backoff.BackOff

	Headers           map[string]string
	ReconnectNotify   backoff.Notify
	ResponseValidator ResponseValidator
	Connection        *http.Client
	URL               string
	LastEventID       atomic.Value

	EncodingBase64 bool
	Connected      bool
	// contains filtered or unexported fields
}

func NewClient

func NewClient(url string, opts ...func(c *Client)) *Client

func (*Client) OnConnect

func (c *Client) OnConnect(fn ConnCallback)

func (*Client) OnDisconnect

func (c *Client) OnDisconnect(fn ConnCallback)

func (*Client) Subscribe

func (c *Client) Subscribe(stream string, handler func(msg *Event)) error

func (*Client) SubscribeChan

func (c *Client) SubscribeChan(stream string, ch chan *Event) error

func (*Client) SubscribeChanRaw

func (c *Client) SubscribeChanRaw(ch chan *Event) error

func (*Client) SubscribeChanRawWithContext

func (c *Client) SubscribeChanRawWithContext(ctx context.Context, ch chan *Event) error

func (*Client) SubscribeChanWithContext

func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch chan *Event) error

func (*Client) SubscribeRaw

func (c *Client) SubscribeRaw(handler func(msg *Event)) error

func (*Client) SubscribeRawWithContext

func (c *Client) SubscribeRawWithContext(ctx context.Context, handler func(msg *Event)) error

func (*Client) SubscribeWithContext

func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handler func(msg *Event)) error

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ch chan *Event)

type ClientOption

type ClientOption func(o *Client)

func WithEndpoint

func WithEndpoint(uri string) ClientOption

type ConnCallback

type ConnCallback func(c *Client)

type Event

type Event struct {
	ID      []byte
	Data    []byte
	Event   []byte
	Retry   []byte
	Comment []byte
	// contains filtered or unexported fields
}

type EventLog

type EventLog []*Event

func (*EventLog) Add

func (e *EventLog) Add(ev *Event)

func (*EventLog) Clear

func (e *EventLog) Clear()

func (*EventLog) Replay

func (e *EventLog) Replay(s *Subscriber)

type EventStreamReader

type EventStreamReader struct {
	// contains filtered or unexported fields
}

func NewEventStreamReader

func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader

func (*EventStreamReader) ReadEvent

func (e *EventStreamReader) ReadEvent() ([]byte, error)

type MessagePayload

type MessagePayload Any

type ResponseValidator

type ResponseValidator func(c *Client, resp *http.Response) error

type Server

type Server struct {
	*http.Server
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts ...ServerOption) *Server

func (*Server) CreateStream

func (s *Server) CreateStream(streamId StreamID) *Stream

func (*Server) Endpoint

func (s *Server) Endpoint() (*url.URL, error)

func (*Server) Handle

func (s *Server) Handle(path string, h http.Handler)

func (*Server) HandleFunc

func (s *Server) HandleFunc(path string, h http.HandlerFunc)

func (*Server) HandleHeader

func (s *Server) HandleHeader(key, val string, h http.HandlerFunc)

func (*Server) HandlePrefix

func (s *Server) HandlePrefix(prefix string, h http.Handler)

func (*Server) HandleServeHTTP

func (s *Server) HandleServeHTTP(path string)

func (*Server) Name

func (s *Server) Name() string

func (*Server) Publish

func (s *Server) Publish(_ context.Context, streamId StreamID, event *Event)

func (*Server) PublishData

func (s *Server) PublishData(ctx context.Context, streamId StreamID, data MessagePayload) error

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

func (*Server) Stop

func (s *Server) Stop(ctx context.Context) error

func (*Server) TryPublish

func (s *Server) TryPublish(_ context.Context, streamId StreamID, event *Event) bool

type ServerOption

type ServerOption func(o *Server)

func WithAddress

func WithAddress(addr string) ServerOption

func WithAutoReply

func WithAutoReply(enable bool) ServerOption

func WithAutoStream

func WithAutoStream(enable bool) ServerOption

func WithBufferSize

func WithBufferSize(size int) ServerOption

func WithCodec

func WithCodec(c string) ServerOption

func WithEncodeBase64

func WithEncodeBase64(enable bool) ServerOption

func WithEventTTL

func WithEventTTL(timeout time.Duration) ServerOption

func WithHeaders

func WithHeaders(headers map[string]string) ServerOption

func WithListener

func WithListener(lis net.Listener) ServerOption

func WithNetwork

func WithNetwork(network string) ServerOption

func WithPath added in v1.2.1

func WithPath(path string) ServerOption

func WithSplitData

func WithSplitData(enable bool) ServerOption

func WithSubscriberFunction

func WithSubscriberFunction(sub SubscriberFunction, unsub SubscriberFunction) ServerOption

func WithTLSConfig

func WithTLSConfig(c *tls.Config) ServerOption

func WithTimeout

func WithTimeout(timeout time.Duration) ServerOption

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func (*Stream) StreamID

func (s *Stream) StreamID() StreamID

type StreamID

type StreamID string

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

func NewStreamManager

func NewStreamManager() *StreamManager

func (*StreamManager) Add

func (s *StreamManager) Add(stream *Stream)

func (*StreamManager) Clean

func (s *StreamManager) Clean()

func (*StreamManager) Count

func (s *StreamManager) Count() int

func (*StreamManager) Exist

func (s *StreamManager) Exist(streamId StreamID) bool

func (*StreamManager) Get

func (s *StreamManager) Get(streamId StreamID) *Stream

func (*StreamManager) Range

func (s *StreamManager) Range(fn func(*Stream))

func (*StreamManager) Remove

func (s *StreamManager) Remove(stream *Stream)

func (*StreamManager) RemoveWithID

func (s *StreamManager) RemoveWithID(streamId StreamID)

type StreamMap

type StreamMap map[StreamID]*Stream

type Subscriber

type Subscriber struct {
	URL *url.URL
	// contains filtered or unexported fields
}

type SubscriberFunction

type SubscriberFunction func(streamID StreamID, sub *Subscriber)

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is an SSE transport.

func (*Transport) Endpoint

func (tr *Transport) Endpoint() string

Endpoint returns the transport endpoint.

func (*Transport) Kind

func (tr *Transport) Kind() kratosTransport.Kind

Kind returns the transport kind.

func (*Transport) Operation

func (tr *Transport) Operation() string

Operation returns the transport operation.

func (*Transport) PathTemplate

func (tr *Transport) PathTemplate() string

PathTemplate returns the http path template.

func (*Transport) ReplyHeader

func (tr *Transport) ReplyHeader() kratosTransport.Header

ReplyHeader returns the reply header.

func (*Transport) Request

func (tr *Transport) Request() *http.Request

Request returns the HTTP request.

func (*Transport) RequestHeader

func (tr *Transport) RequestHeader() kratosTransport.Header

RequestHeader returns the request header.

type Transporter

type Transporter interface {
	kratosTransport.Transporter
	Request() *http.Request
	PathTemplate() string
}
