broker

package
v0.0.64 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2019 | License: MPL-2.0 | Imports: 30 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	EVENT_MESSAGE_PUBLISHED = "message_published"
	EVENT_STATE_UPDATED     = "state_updated"
)

Variables

This section is empty.

Functions

func EncodeSessionToken added in v0.0.64

func EncodeSessionToken(signKey string, session *sessions.Session) (string, error)

func Serve added in v0.0.51

func Serve(port int, handler broker) net.Listener

Types

type Broker

type Broker struct {
	ID string

	Subscriptions SubscriptionStore
	Sessions      SessionStore
	Topics        TopicStore
	Peers         PeerStore
	// contains filtered or unexported fields
}

func New

func New(id string, logger *zap.Logger, mesh cluster.DiscoveryLayer, config Config) *Broker

func (*Broker) Authenticate

func (b *Broker) Authenticate(transport transport.Metadata, sessionID []byte, username string, password string) (tenant string, err error)

func (*Broker) CloseSession added in v0.0.32

func (b *Broker) CloseSession(ctx context.Context, token string) error

func (*Broker) Connect added in v0.0.51

func (b *Broker) Connect(ctx context.Context, metadata transport.Metadata, p *packet.Connect) (string, string, *packet.ConnAck, error)

func (*Broker) Disconnect added in v0.0.51

func (b *Broker) Disconnect(ctx context.Context, token string, p *packet.Disconnect) error

func (*Broker) DistributeMessage added in v0.0.37

func (b *Broker) DistributeMessage(message *pb.MessagePublished) error

func (*Broker) Health added in v0.0.64

func (b *Broker) Health() string

func (*Broker) JoinServiceLayer added in v0.0.52

func (b *Broker) JoinServiceLayer(name string, logger *zap.Logger, config cluster.ServiceConfig, rpcConfig cluster.ServiceConfig, mesh cluster.DiscoveryLayer)

func (*Broker) ListRetainedMessages

func (b *Broker) ListRetainedMessages() (topics.RetainedMessageSet, error)

func (*Broker) PingReq added in v0.0.62

func (b *Broker) PingReq(ctx context.Context, id string, _ *packet.PingReq) (*packet.PingResp, error)

func (*Broker) Publish added in v0.0.51

func (b *Broker) Publish(ctx context.Context, token string, p *packet.Publish) (*packet.PubAck, error)

func (*Broker) Serve added in v0.0.52

func (b *Broker) Serve(port int) net.Listener

func (*Broker) Shutdown added in v0.0.52

func (b *Broker) Shutdown()

func (*Broker) SigningKey added in v0.0.64

func (b *Broker) SigningKey() string

func (*Broker) Start added in v0.0.51

func (broker *Broker) Start(layer types.GossipServiceLayer)

func (*Broker) Stop

func (b *Broker) Stop()

func (*Broker) Subscribe added in v0.0.51

func (b *Broker) Subscribe(ctx context.Context, token string, p *packet.Subscribe) (*packet.SubAck, error)

func (*Broker) Unsubscribe added in v0.0.51

func (b *Broker) Unsubscribe(ctx context.Context, token string, p *packet.Unsubscribe) (*packet.UnsubAck, error)

type Config

type Config struct {
	AuthHelper func(transport transport.Metadata, sessionID []byte, username string, password string) (tenant string, err error)
	Session    SessionConfig
}

func DefaultConfig

func DefaultConfig() Config

type PeerStore added in v0.0.17

type PeerStore interface {
	ByID(id string) (peers.Peer, error)
	All() (peers.SubscriptionSet, error)
	Delete(id string) error
	On(event string, handler func(peers.Peer)) func()
}

type Queue added in v0.0.58

type Queue interface {
	Enqueue(p *publishQueue.Message)
	Consume(f func(*publishQueue.Message))
	Close() error
}

type SessionConfig added in v0.0.9

type SessionConfig struct {
	MaxInflightSize int
}

type SessionStore

type SessionStore interface {
	ByID(ctx context.Context, id string) (*sessions.Session, error)
	ByClientID(ctx context.Context, id string) ([]*sessions.Session, error)
	ByPeer(ctx context.Context, peer string) ([]*sessions.Session, error)
	All(ctx context.Context) ([]*sessions.Session, error)
	Create(ctx context.Context, sess sessions.SessionCreateInput) error
	RefreshKeepAlive(ctx context.Context, id string, timestamp int64) error
	Delete(ctx context.Context, id string) error
}

type SubscriptionStore

type SubscriptionStore interface {
	ByTopic(ctx context.Context, tenant string, pattern []byte) ([]*subscriptions.Metadata, error)
	ByID(ctx context.Context, id string) (*subscriptions.Metadata, error)
	All(ctx context.Context) ([]*subscriptions.Metadata, error)
	ByPeer(ctx context.Context, peer string) ([]*subscriptions.Metadata, error)
	BySession(ctx context.Context, id string) ([]*subscriptions.Metadata, error)
	Create(ctx context.Context, message subscriptions.SubscriptionCreateInput) error
	Delete(ctx context.Context, id string) error
}

type Token added in v0.0.64

type Token struct {
	SessionID     string `json:"session_id"`
	PeerID        string `json:"peer_id"`
	SessionTenant string `json:"session_tenant"`
	jwt.StandardClaims
}

func DecodeSessionToken added in v0.0.64

func DecodeSessionToken(signKey string, signedToken string) (Token, error)

type TopicStore

type TopicStore interface {
	Create(message topics.RetainedMessage) error
	ByTopicPattern(tenant string, pattern []byte) (topics.RetainedMessageSet, error)
	All() (topics.RetainedMessageSet, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL