Documentation ¶
Index ¶
- Constants
- Variables
- func AppendIfMissing(topics []string, topic string) []string
- func CloseClientConnection(connection quic.Connection, code int, err error) error
- func DefaultAuthenticationFunc(token string) bool
- func DefaultAuthorizationFunc(token, topic string) bool
- func IsSubscribeTopicValid(topic string, topics []string) bool
- func NewLogger() *zap.Logger
- func SendError(sendStream quic.SendStream, e *Error) error
- func TopicHasWildcard(topic string) bool
- func WriteData(data any, sendStream quic.SendStream) error
- type Client
- type DistributeWork
- type Error
- type Event
- type EventSource
- type Finder
- type Metrics
- type Offer
- type Server
- func (s *Server) GenerateEventSources(topics []string)
- func (s *Server) MetricHandler() http.Handler
- func (s *Server) Publish(topic string, event []byte)
- func (s *Server) SetAuthenticator(authenticator auth.Authenticator)
- func (s *Server) SetAuthenticatorFunc(authenticator auth.AuthenticatorFunc)
- func (s *Server) SetAuthorizer(authorizer auth.Authorizer)
- func (s *Server) SetAuthorizerFunc(authorizer auth.AuthorizerFunc)
- type Subscriber
- type Worker
- type WorkerConfig
Constants ¶
const ( CodeNotAuthorized = iota + 1 CodeTopicNotAvailable CodeFailedToCreateStream CodeFailedToSendOffer CodeUnknown )
const ( DistributeEvent = "Distribute" AcceptClient = "Accept" )
const DELIMITER = '\n'
DELIMITER is the delimiter used to separate messages in streams.
const ErrorTopic = "error"
Variables ¶
var ( ErrNotAuthorized = errors.New("not authorized") ErrFailedToCreateStream = errors.New("failed to create send/receive stream to client") ErrFailedToReadOffer = errors.New("failed to read offer from client") ErrFailedToSendOffer = errors.New("failed to send offer to server") ErrFailedToMarshal = errors.New("failed to marshal/unmarshal data") )
var DefaultOnError = func(code int, data map[string]any) { log.Printf("code: %d, data: %v", code, data) }
DefaultOnError is the default handler for processing errors. It listens to the "error" topic.
var DefaultOnMessage = func(topic string, message []byte) { log.Printf("topic: %s, message: %s\n", topic, string(message)) }
DefaultOnMessage is the default handler for processing incoming events that have no handler.
Functions ¶
func AppendIfMissing ¶ added in v1.1.2
AppendIfMissing appends the item to the list if it is missing.
func CloseClientConnection ¶ added in v1.1.7
func DefaultAuthenticationFunc ¶
DefaultAuthenticationFunc is the default authentication function. It accepts all clients.
func DefaultAuthorizationFunc ¶
DefaultAuthorizationFunc is the default authorization function. It accepts all clients.
func IsSubscribeTopicValid ¶ added in v1.1.2
IsSubscribeTopicValid checks whether the subscribed topic exists in, or is matched by, the client topics.
func TopicHasWildcard ¶
TopicHasWildcard checks if the topic is a wildcard.
Types ¶
type Client ¶
type Client struct { Connection quic.Connection Token string Topics []string Logger *zap.Logger Finder Finder OnEvent map[string]func(event []byte) OnMessage func(topic string, message []byte) OnError func(code int, data map[string]any) }
func (*Client) AcceptEvents ¶
AcceptEvents reads events from the stream and calls the proper handler. The order in which handlers are called is as follows: 1. OnError, if the topic is "error"; 2. OnEvent[topic]; 3. OnMessage.
func (*Client) SetErrorHandler ¶
SetErrorHandler sets the handler for "error" topic.
func (*Client) SetEventHandler ¶
SetEventHandler sets the handler for the given topic.
func (*Client) SetMessageHandler ¶
SetMessageHandler sets the handler for all topics without a handler and the "error" topic.
type DistributeWork ¶ added in v1.2.0
type DistributeWork struct { Event []byte EventSource *EventSource }
func NewDistributeWork ¶ added in v1.2.0
func NewDistributeWork(event []byte, eventSource *EventSource) *DistributeWork
type Error ¶
func UnmarshalError ¶
type EventSource ¶
type EventSource struct { Topic string DataChannel chan []byte Subscribers []Subscriber IncomingSubscribers chan Subscriber SubscriberWaitingList []Subscriber Metrics Metrics Cleaning *atomic.Bool CleaningInterval time.Duration }
EventSource is a struct that holds a topic's channel and its subscribers.
func NewEventSource ¶
func NewEventSource( topic string, dataChannel chan []byte, subscribers []Subscriber, metric Metrics, cleaningInterval time.Duration, ) *EventSource
func (*EventSource) CleanCorruptSubscribers ¶ added in v1.2.0
func (e *EventSource) CleanCorruptSubscribers()
func (*EventSource) DistributeEvents ¶ added in v1.2.0
func (e *EventSource) DistributeEvents(worker Worker)
DistributeEvents distributes events from the channel among the subscribers.
func (*EventSource) HandleNewSubscriber ¶ added in v1.2.0
func (e *EventSource) HandleNewSubscriber()
type Finder ¶ added in v1.2.0
Finder for topics.
func (*Finder) FindRelatedWildcardTopics ¶ added in v1.2.0
FindRelatedWildcardTopics finds the topic patterns that are applicable to the given topic.
type Metrics ¶ added in v1.1.4
type Metrics struct { EventCounter *prometheus.GaugeVec SubscriberCounter *prometheus.GaugeVec }
func NewMetrics ¶ added in v1.1.4
func (Metrics) DecSubscriber ¶ added in v1.1.7
func (Metrics) IncSubscriber ¶ added in v1.1.7
type Offer ¶
type Offer struct { Token string `json:"token,omitempty"` Topics []string `json:"topics,omitempty"` }
func AcceptOffer ¶ added in v1.2.0
type Server ¶
type Server struct { Worker Worker Listener quic.Listener EventSources map[string]*EventSource Topics []string Logger *zap.Logger Finder Finder Authenticator auth.Authenticator Authorizer auth.Authorizer Metrics Metrics CleaningInterval time.Duration }
Server is the main struct for the server.
func (*Server) GenerateEventSources ¶
GenerateEventSources generates eventSources for each topic.
func (*Server) MetricHandler ¶ added in v1.1.8
func (*Server) SetAuthenticator ¶
func (s *Server) SetAuthenticator(authenticator auth.Authenticator)
SetAuthenticator replaces the authentication function.
func (*Server) SetAuthenticatorFunc ¶
func (s *Server) SetAuthenticatorFunc(authenticator auth.AuthenticatorFunc)
SetAuthenticatorFunc replaces the authentication function.
func (*Server) SetAuthorizer ¶
func (s *Server) SetAuthorizer(authorizer auth.Authorizer)
SetAuthorizer replaces the authorization function.
func (*Server) SetAuthorizerFunc ¶
func (s *Server) SetAuthorizerFunc(authorizer auth.AuthorizerFunc)
SetAuthorizerFunc replaces the authorization function.
type Subscriber ¶
func NewSubscriber ¶
func NewSubscriber(stream quic.SendStream) Subscriber
type Worker ¶
func (*Worker) AddAcceptClientWork ¶ added in v1.2.0
func (*Worker) AddDistributeWork ¶ added in v1.2.0
func (w *Worker) AddDistributeWork(work *DistributeWork)