Documentation ¶
Overview ¶
Package mercure helps implement the Mercure protocol (https://mercure.rocks) in Go projects. It provides an implementation of a Mercure hub as an HTTP handler.
Example ¶
package main import ( "log" "net/http" "github.com/dunglas/mercure" ) func main() { h, err := mercure.NewHub( mercure.WithPublisherJWT([]byte("!ChangeMe!"), "HS256"), mercure.WithSubscriberJWT([]byte("!ChangeMe!"), "HS256"), ) if err != nil { log.Fatal(err) } defer h.Stop() http.Handle("/.well-known/mercure", h) log.Panic(http.ListenAndServe(":8080", nil)) }
Output:
Index ¶
- Constants
- Variables
- func AssignUUID(u *Update)
- func Demo(w http.ResponseWriter, r *http.Request)
- func InitConfig(v *viper.Viper)deprecated
- func RegisterTransportFactory(scheme string, factory TransportFactory)
- func SetConfigDefaults(v *viper.Viper)deprecated
- func SetFlags(fs *pflag.FlagSet, v *viper.Viper)deprecated
- func Start()deprecated
- func ValidateConfig(v *viper.Viper) errordeprecated
- type BoltTransport
- type Event
- type Hub
- func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request)
- func (h *Hub) Serve()deprecated
- func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (h *Hub) Stop() error
- func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request)
- func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request)
- func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request)
- type LocalTransport
- type LogField
- type Logger
- type Metrics
- type NopMetrics
- type Option
- func WithAllowedHosts(hosts []string) Option
- func WithAnonymous() Option
- func WithCORSOrigins(origins []string) Option
- func WithDebug() Option
- func WithDemo() Option
- func WithDispatchTimeout(timeout time.Duration) Option
- func WithHeartbeat(interval time.Duration) Option
- func WithPublishOrigins(origins []string) Option
- func WithPublisherJWT(key []byte, alg string) Option
- func WithSubscriberJWT(key []byte, alg string) Option
- func WithSubscriptions() Option
- func WithTopicSelectorStore(tss *TopicSelectorStore) Option
- func WithTransport(t Transport) Option
- func WithUI() Option
- func WithWriteTimeout(timeout time.Duration) Option
- type PrometheusMetrics
- type Subscriber
- type TopicSelectorStore
- type Transport
- type TransportError
- type TransportFactory
- type TransportSubscribers
- type Update
Examples ¶
Constants ¶
const ( TopicSelectorStoreDefaultCacheNumCounters = int64(6e7) TopicSelectorStoreCacheMaxCost = int64(1e8) // 100 MB )
Gather stats to find the best default values.
const BoltDefaultCleanupFrequency = 0.3
const EarliestLastEventID = "earliest"
EarliestLastEventID is the reserved value representing the earliest available event id.
Variables ¶
var ( // ErrInvalidAuthorizationHeader is returned when the Authorization header is invalid. ErrInvalidAuthorizationHeader = errors.New(`invalid "Authorization" HTTP header`) // ErrNoOrigin is returned when the cookie authorization mechanism is used and neither an Origin nor a Referer header is present. ErrNoOrigin = errors.New(`an "Origin" or a "Referer" HTTP header must be present to use the cookie-based authorization mechanism`) // ErrOriginNotAllowed is returned when the Origin is not allowed to post updates. ErrOriginNotAllowed = errors.New("origin not allowed to post updates") // ErrUnexpectedSigningMethod is returned when the signing JWT method is not supported. ErrUnexpectedSigningMethod = errors.New("unexpected signing method") // ErrInvalidJWT is returned when the JWT is invalid. ErrInvalidJWT = errors.New("invalid JWT") // ErrPublicKey is returned when there is an error with the public key. ErrPublicKey = errors.New("public key error") )
var ErrClosedTransport = errors.New("hub: read/write on closed Transport")
ErrClosedTransport is returned by the Transport's Dispatch and AddSubscriber methods after a call to Close.
var ErrInvalidConfig = errors.New("invalid config")
ErrInvalidConfig is returned when the configuration is invalid.
Deprecated: use the Caddy server module or the standalone library instead.
Functions ¶
func AssignUUID ¶
func AssignUUID(u *Update)
AssignUUID generates a new UUID and assigns it to the given update if no ID is already set.
func Demo ¶
func Demo(w http.ResponseWriter, r *http.Request)
Demo exposes INSECURE Demo endpoints to test discovery and authorization mechanisms. Add a query parameter named "body" to define the content to return in the response's body. Add a query parameter named "jwt" to set a "mercureAuthorization" cookie containing this token. The Content-Type header will automatically be set according to the URL's extension.
func InitConfig
deprecated
func RegisterTransportFactory ¶
func RegisterTransportFactory(scheme string, factory TransportFactory)
func SetConfigDefaults
deprecated
func Start
deprecated
func Start()
Start is a helper method to start the Mercure Hub.
Deprecated: use the Caddy server module or the standalone library instead.
func ValidateConfig
deprecated
Types ¶
type BoltTransport ¶
BoltTransport implements the Transport interface using the Bolt database.
func (*BoltTransport) AddSubscriber ¶
func (t *BoltTransport) AddSubscriber(s *Subscriber) error
AddSubscriber adds a new subscriber to the transport.
func (*BoltTransport) Close ¶
func (t *BoltTransport) Close() (err error)
Close closes the Transport.
func (*BoltTransport) Dispatch ¶
func (t *BoltTransport) Dispatch(update *Update) error
Dispatch dispatches an update to all subscribers and persists it in Bolt DB.
func (*BoltTransport) GetSubscribers ¶
func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error)
GetSubscribers gets the list of active subscribers.
type Event ¶
type Event struct { // The updates' data, encoded in the server-sent event format: every line starts with the string "data: " // https://www.w3.org/TR/eventsource/#dispatchMessage Data string // The globally unique identifier corresponding to update ID string // The event type, will be attached to the "event" field Type string // The reconnection time Retry uint64 }
Event is the actual Server Sent Event that will be dispatched.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub stores channels with clients currently subscribed and allows to dispatch updates.
func NewHubFromViper
deprecated
func (*Hub) PublishHandler ¶
func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request)
PublishHandler allows publisher to broadcast updates to all subscribers.
func (*Hub) SubscribeHandler ¶
func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request)
SubscribeHandler creates a keep alive connection and sends the events to the subscribers.
func (*Hub) SubscriptionHandler ¶
func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request)
func (*Hub) SubscriptionsHandler ¶
func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request)
type LocalTransport ¶
LocalTransport implements the Transport interface without a database and simply broadcasts the live Updates.
func (*LocalTransport) AddSubscriber ¶
func (t *LocalTransport) AddSubscriber(s *Subscriber) error
AddSubscriber adds a new subscriber to the transport.
func (*LocalTransport) Close ¶
func (t *LocalTransport) Close() (err error)
Close closes the Transport.
func (*LocalTransport) Dispatch ¶
func (t *LocalTransport) Dispatch(update *Update) error
Dispatch dispatches an update to all subscribers.
func (*LocalTransport) GetSubscribers ¶
func (t *LocalTransport) GetSubscribers() (string, []*Subscriber, error)
GetSubscribers gets the list of active subscribers.
type LogField ¶
LogField is an alias of zapcore.Field; it could be replaced by a custom contract once Go supports generics.
type Logger ¶
type Logger interface { Debug(msg string, fields ...LogField) Info(msg string, fields ...LogField) Warn(msg string, fields ...LogField) Error(msg string, fields ...LogField) }
Logger defines the Mercure logger.
type Metrics ¶
type Metrics interface { // SubscriberConnected collects metrics about subscriber connections. SubscriberConnected(s *Subscriber) // SubscriberDisconnected collects metrics about subscriber disconnections. SubscriberDisconnected(s *Subscriber) // UpdatePublished collects metrics about update publications. UpdatePublished(u *Update) }
type NopMetrics ¶
type NopMetrics struct{}
func (NopMetrics) SubscriberConnected ¶
func (NopMetrics) SubscriberConnected(s *Subscriber)
func (NopMetrics) SubscriberDisconnected ¶
func (NopMetrics) SubscriberDisconnected(s *Subscriber)
func (NopMetrics) UpdatePublished ¶
func (NopMetrics) UpdatePublished(s *Update)
type Option ¶
type Option func(h *opt) error
Option instances allow configuring the library.
func WithAllowedHosts ¶
WithAllowedHosts sets the allowed hosts.
func WithAnonymous ¶
func WithAnonymous() Option
WithAnonymous allows subscribers with no valid JWT.
func WithCORSOrigins ¶
WithCORSOrigins sets the allowed CORS origins.
func WithDispatchTimeout ¶
WithDispatchTimeout sets the maximum dispatch duration of an update.
func WithHeartbeat ¶
WithHeartbeat sets the frequency of the heartbeat, disabled by default.
func WithPublishOrigins ¶
WithPublishOrigins sets the origins allowed to publish updates.
func WithPublisherJWT ¶
WithPublisherJWT sets the JWT key and the signing algorithm to use for publishers.
func WithSubscriberJWT ¶
WithSubscriberJWT sets the JWT key and the signing algorithm to use for subscribers.
func WithSubscriptions ¶
func WithSubscriptions() Option
WithSubscriptions allows dispatching updates when subscriptions are created or terminated.
func WithTopicSelectorStore ¶
func WithTopicSelectorStore(tss *TopicSelectorStore) Option
WithTopicSelectorStore sets the TopicSelectorStore instance to use.
func WithTransport ¶
WithTransport sets the transport to use.
func WithWriteTimeout ¶
WithWriteTimeout sets the maximum duration before closing the connection, defaults to 600s, set to 0 to disable.
type PrometheusMetrics ¶
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics stores the metrics collected by the Hub.
func NewPrometheusMetrics ¶
func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics
NewPrometheusMetrics creates a Prometheus metrics collector. This method must be called only once or it will panic.
func (*PrometheusMetrics) Register
deprecated
func (m *PrometheusMetrics) Register(r *mux.Router)
Register configures the Prometheus registry with all collected metrics.
Deprecated: use the Caddy server module or the standalone library instead.
func (*PrometheusMetrics) SubscriberConnected ¶
func (m *PrometheusMetrics) SubscriberConnected(s *Subscriber)
func (*PrometheusMetrics) SubscriberDisconnected ¶
func (m *PrometheusMetrics) SubscriberDisconnected(s *Subscriber)
func (*PrometheusMetrics) UpdatePublished ¶
func (m *PrometheusMetrics) UpdatePublished(u *Update)
type Subscriber ¶
type Subscriber struct { ID string EscapedID string Claims *claims Topics []string EscapedTopics []string RequestLastEventID string RemoteAddr string TopicSelectors []string Debug bool // contains filtered or unexported fields }
Subscriber represents a client subscribed to a list of topics.
func NewSubscriber ¶
func NewSubscriber(lastEventID string, tss *TopicSelectorStore) *Subscriber
NewSubscriber creates a new subscriber.
func (*Subscriber) CanDispatch ¶
func (s *Subscriber) CanDispatch(u *Update) bool
CanDispatch checks if an update can be dispatched to this subscriber.
func (*Subscriber) Disconnect ¶
func (s *Subscriber) Disconnect()
Disconnect disconnects the subscriber.
func (*Subscriber) Dispatch ¶
func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool
Dispatch an update to the subscriber.
func (*Subscriber) HistoryDispatched ¶
func (s *Subscriber) HistoryDispatched(responseLastEventID string)
HistoryDispatched must be called when all messages coming from the history have been dispatched.
func (*Subscriber) Receive ¶
func (s *Subscriber) Receive() <-chan *Update
Receive returns a chan where incoming updates are dispatched.
type TopicSelectorStore ¶
type TopicSelectorStore struct {
// contains filtered or unexported fields
}
TopicSelectorStore caches compiled templates to improve memory and CPU usage.
func NewTopicSelectorStore ¶
func NewTopicSelectorStore(cacheNumCounters, cacheMaxCost int64) (*TopicSelectorStore, error)
NewTopicSelectorStore creates a TopicSelectorStore instance. See https://github.com/dgraph-io/ristretto, set values to 0 to disable.
type Transport ¶
type Transport interface { // Dispatch dispatches an update to all subscribers. Dispatch(update *Update) error // AddSubscriber adds a new subscriber to the transport. AddSubscriber(s *Subscriber) error // Close closes the Transport. Close() error }
Transport provides methods to dispatch and persist updates.
func NewBoltTransport ¶
func NewBoltTransport(u *url.URL, tss *TopicSelectorStore) (Transport, error)
NewBoltTransport creates a new BoltTransport.
func NewLocalTransport ¶
func NewLocalTransport(u *url.URL, tss *TopicSelectorStore) (Transport, error)
NewLocalTransport creates a new LocalTransport.
func NewTransport ¶
func NewTransport(u *url.URL, tss *TopicSelectorStore) (Transport, error)
type TransportError ¶
type TransportError struct {
// contains filtered or unexported fields
}
TransportError is returned when the Transport's DSN is invalid.
func (*TransportError) Error ¶
func (e *TransportError) Error() string
func (*TransportError) Unwrap ¶
func (e *TransportError) Unwrap() error
type TransportFactory ¶
type TransportFactory = func(u *url.URL, tss *TopicSelectorStore) (Transport, error)
TransportFactory is the factory to initialize a new transport.
type TransportSubscribers ¶
type TransportSubscribers interface { // GetSubscribers gets the last event ID and the list of active subscribers at this time. GetSubscribers() (string, []*Subscriber, error) }
TransportSubscribers provides a method to retrieve the list of active subscribers.
type Update ¶
type Update struct { // The topics' Internationalized Resource Identifier (RFC3987) (will most likely be URLs). // The first one is the canonical IRI, while next ones are alternate IRIs. Topics []string // Private updates can only be dispatched to subscribers authorized to receive them. Private bool // To print debug information Debug bool // The Server-Sent Event to send. Event }
Update represents an update to send to subscribers.