qsse

Version: v1.2.0
Published: Jun 26, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

README

SSE Over QUIC

An implementation of Server-Sent Events (SSE) over QUIC, intended as a faster replacement for traditional SSE over HTTP/2.

Installation

go get github.com/snapp-incubator/qsse

Basic Usage

Server

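// Server

import (
	"encoding/json"
	"log"

	"github.com/snapp-incubator/qsse"
)

var (
	people   = []Person{...}
	accounts = []Account{...}
)

// mustJSON encodes v for Publish, which takes a []byte payload.
// JSON is an assumption here; any byte encoding the client understands works.
func mustJSON(v any) []byte {
	data, err := json.Marshal(v)
	if err != nil {
		log.Fatal(err)
	}
	return data
}

func main() {
	topics := []string{"people", "account"}

	server, err := qsse.NewServer("localhost:4242", topics, nil)
	if err != nil {
		panic(err)
	}

	// publish events; "account" matches the topic name declared above
	server.Publish("people", mustJSON(people[0]))
	server.Publish("account", mustJSON(accounts[0]))
	// ...

	// more code
}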

Client

import "github.com/snapp-incubator/qsse"

func main() {
	topics := []string{"people", "account"}

	client, err := qsse.NewClient("localhost:4242", topics, nil)
	if err != nil {
		panic(err)
	}

	client.SetEventHandler("people", func(data []byte) {
		// handle data received on the "people" topic
	})

	client.SetErrorHandler(func(code int, data map[string]any) {
		// handle the error according to its code
	})

	// more code
}

Security

By default, all clients are accepted. Use authentication to check whether a new client is valid, and use authorization to check a client's access to each topic.

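// authentication

// func
server.SetAuthenticatorFunc(func(token string) bool {
	// report whether the client is accepted;
	// isValid is a placeholder for your own check
	return isValid(token)
})
// interface: authenticator is your own value implementing auth.Authenticator
server.SetAuthenticator(authenticator)

// authorization on each topic

// func
server.SetAuthorizerFunc(func(token, topic string) bool {
	// report whether the client may access the topic;
	// canAccess is a placeholder for your own check
	return canAccess(token, topic)
})
// interface: authorizer is your own value implementing auth.Authorizer
server.SetAuthorizer(authorizer)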
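
As a rough illustration of the two func shapes above, the following sketch checks tokens against an in-memory table. The tokens, the topic prefixes, and the helper names `authenticate` and `authorize` are all made up for this example; a real deployment would likely verify tokens against its own identity system.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"strings"
)

// allowedTopics maps a client token to the topic prefixes it may access.
// Both the tokens and the prefixes are hypothetical illustration data.
var allowedTopics = map[string][]string{
	"passenger-token": {"ride.passenger."},
	"admin-token":     {"ride."},
}

// authenticate reports whether token is one of the known tokens.
// It has the func(token string) bool shape shown above.
func authenticate(token string) bool {
	ok := false
	for known := range allowedTopics {
		// Constant-time comparison avoids leaking how many bytes matched.
		if subtle.ConstantTimeCompare([]byte(known), []byte(token)) == 1 {
			ok = true
		}
	}
	return ok
}

// authorize reports whether token may access topic.
// It has the func(token, topic string) bool shape shown above.
func authorize(token, topic string) bool {
	for _, prefix := range allowedTopics[token] {
		if strings.HasPrefix(topic, prefix) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(authenticate("admin-token"))
	fmt.Println(authorize("passenger-token", "ride.passenger.start"))
	fmt.Println(authorize("passenger-token", "ride.driver.start"))
}
```

Assuming auth.AuthenticatorFunc and auth.AuthorizerFunc have the func types the snippet above suggests, these helpers could be passed as server.SetAuthenticatorFunc(authenticate) and server.SetAuthorizerFunc(authorize).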

Topic Patterns

Topics can be split into logical segments with a dot (.), and * can be used as a wildcard placeholder. For example, these are valid topics:

  • ride
  • ride.*.start
  • ride.passenger.*

Note: Putting * at the end of a topic publishes or subscribes to every topic that starts with the prefix before the *. For example, ride.passenger.* is equivalent to subscribing to ride.passenger.start, ride.passenger.account.name, and so on.
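
The matching rules described above could be sketched as follows. This is not the library's matcher: `matchTopic` is a hypothetical helper, and it assumes that a * in the middle of a pattern stands for exactly one segment while a trailing * stands for one or more remaining segments.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern.
// Assumed rules: segments are separated by "."; a "*" in the middle matches
// exactly one segment; a trailing "*" matches one or more remaining segments.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")

	for i, seg := range p {
		last := i == len(p)-1
		if seg == "*" && last {
			// Trailing wildcard: needs at least one remaining topic segment.
			return len(t) > i
		}
		if i >= len(t) {
			return false // topic is shorter than the pattern
		}
		if seg != "*" && seg != t[i] {
			return false
		}
	}
	// Without a trailing wildcard, both must have the same number of segments.
	return len(p) == len(t)
}

func main() {
	fmt.Println(matchTopic("ride.passenger.*", "ride.passenger.account.name"))
	fmt.Println(matchTopic("ride.*.start", "ride.driver.start"))
	fmt.Println(matchTopic("ride.*.start", "ride.driver.stop"))
}
```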

Server Configurations

| config | description | default |
| --- | --- | --- |
| Metric.Namespace, Metric.Subsystem | namespace and subsystem parameters of the Prometheus metrics | "qsse", "qsse" |
| TLSConfig | TLS config of the server | qsse.GetDefaultTLSConfig() |
| Worker.CleaningInterval | interval between cleaning idle clients | 10 sec |
| Worker.ClientAcceptorCount | number of goroutines accepting new clients | 1 |
| Worker.ClientAcceptorQueueSize | queue size of client acceptors (usually equal to ClientAcceptorCount) | 1 |
| Worker.EventDistributorCount | number of concurrent goroutines distributing events to subscribers for each EventSource[topic] | 1 |
| Worker.EventDistributorQueueSize | queue size of event distribution work | 10 |
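
To give a feel for how a distributor count and a queue size interact, here is a generic worker-pool sketch. It is not taken from the QSSE source: `distribute` is a hypothetical function, and it only assumes that the count is the number of goroutines and the queue size is the capacity of a buffered channel feeding them.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute starts `workers` goroutines that drain a queue holding at most
// `queueSize` pending events, and returns how many events were delivered.
func distribute(events []string, workers, queueSize int) int {
	queue := make(chan string, queueSize)

	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		delivered int
	)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range queue {
				// A real distributor would write the event to each subscriber here.
				mu.Lock()
				delivered++
				mu.Unlock()
			}
		}()
	}

	for _, e := range events {
		queue <- e // blocks while queueSize events are already waiting
	}
	close(queue)
	wg.Wait()

	return delivered
}

func main() {
	events := []string{"a", "b", "c", "d", "e"}
	// One worker and a queue of 10 mirror the defaults in the table above.
	fmt.Println(distribute(events, 1, 10))
}
```

In this model, a larger queue lets the publisher run ahead of slow subscribers, while more workers drain the queue faster.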

Client Configurations

| config | description | default |
| --- | --- | --- |
| Token | token sent to the server on the initial connection to verify the client | "" |
| TLSConfig | TLS config of the client | qsse.GetSimpleTLS() |
| ReconnectPolicy.Retry | whether the client should retry if it cannot connect to the server on the first try | false |
| ReconnectPolicy.RetryTimes | number of reconnect attempts | 5 |
| ReconnectPolicy.RetryInterval | interval between reconnect attempts | 5 sec |

Examples

Documentation

Constants

const (
	CodeNotAuthorized = iota + 1
	CodeTopicNotAvailable
	CodeFailedToCreateStream
	CodeFailedToSendOffer
	CodeUnknown
)

error codes.
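
A handler with the SetErrorHandler shape might branch on these codes as sketched below. The constants are local copies so the example compiles on its own; the messages, the `describe` and `handleError` helpers, and the "topic" key in the data map are assumptions made for illustration.

```go
package main

import "fmt"

// Local copies of the error codes listed above.
const (
	CodeNotAuthorized = iota + 1
	CodeTopicNotAvailable
	CodeFailedToCreateStream
	CodeFailedToSendOffer
	CodeUnknown
)

// describe turns an error code into a short message (wording is illustrative).
func describe(code int) string {
	switch code {
	case CodeNotAuthorized:
		return "not authorized"
	case CodeTopicNotAvailable:
		return "topic not available"
	case CodeFailedToCreateStream:
		return "failed to create stream"
	case CodeFailedToSendOffer:
		return "failed to send offer"
	case CodeUnknown:
		return "unknown error"
	default:
		return fmt.Sprintf("unrecognized code %d", code)
	}
}

// handleError has the func(code int, data map[string]any) shape
// expected by SetErrorHandler.
func handleError(code int, data map[string]any) {
	fmt.Printf("qsse error %d (%s): %v\n", code, describe(code), data)
}

func main() {
	// The "topic" key is a hypothetical example of what data might carry.
	handleError(CodeTopicNotAvailable, map[string]any{"topic": "ride.start"})
}
```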

const (
	DefCleaningInterval          = 10 * time.Second
	DefClientAcceptorCount       = 1
	DefClientAcceptorQueueSize   = 1
	DefEventDistributorCount     = 1
	DefEVentDistributorQueueSize = 10
)

Variables

This section is empty.

Functions

func GetDefaultTLSConfig

func GetDefaultTLSConfig() *tls.Config

GetDefaultTLSConfig returns a tls.Config with the default settings for server.

func GetSimpleTLS

func GetSimpleTLS() *tls.Config

GetSimpleTLS returns a tls.Config with the default settings for client.

Types

type Client

type Client interface {
	SetEventHandler(topic string, handler func([]byte))

	SetErrorHandler(handler func(code int, data map[string]any))

	SetMessageHandler(handler func(topic string, event []byte))
}

func NewClient

func NewClient(address string, topics []string, config *ClientConfig) (Client, error)

type ClientConfig

type ClientConfig struct {
	Token           string
	TLSConfig       *tls.Config
	ReconnectPolicy *ReconnectPolicy
}

type MetricConfig added in v1.1.7

type MetricConfig struct {
	Namespace string
	Subsystem string
}

type ReconnectPolicy

type ReconnectPolicy struct {
	Retry         bool
	RetryTimes    int
	RetryInterval int // duration between retry intervals in milliseconds
}

type Server

type Server interface {
	Publish(topic string, event []byte)

	SetAuthenticator(auth.Authenticator)
	SetAuthenticatorFunc(auth.AuthenticatorFunc)

	SetAuthorizer(auth.Authorizer)
	SetAuthorizerFunc(auth.AuthorizerFunc)

	MetricHandler() http.Handler
}

func NewServer

func NewServer(address string, topics []string, config *ServerConfig) (Server, error)

NewServer creates a new server and listens for connections on the given address.

type ServerConfig added in v1.1.7

type ServerConfig struct {
	Metric    *MetricConfig
	TLSConfig *tls.Config
	Worker    *WorkerConfig
}

type WorkerConfig added in v1.2.0

type WorkerConfig struct {
	CleaningInterval          time.Duration
	ClientAcceptorCount       int64
	ClientAcceptorQueueSize   int
	EventDistributorCount     int64
	EventDistributorQueueSize int
}
