broker

package
Version: v0.0.63-rc58
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 12, 2019 License: MPL-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// String identifiers for broker events. Where these are emitted or consumed
// is not visible in this listing.
const (
	// EVENT_MESSAGE_PUBLISHED is the identifier "message_published".
	EVENT_MESSAGE_PUBLISHED = "message_published"
	// EVENT_STATE_UPDATED is the identifier "state_updated".
	EVENT_STATE_UPDATED     = "state_updated"
)

Variables

This section is empty.

Functions

func EncodeSessionToken added in v0.0.64

func EncodeSessionToken(signKey string, session sessions.Session) (string, error)

func Serve added in v0.0.51

func Serve(port int, handler broker) net.Listener

Types

type Broker

// Broker is the central type of this package. It carries an identifier and
// exposes its subscription, session, topic and peer stores as exported
// fields.
type Broker struct {
	// ID identifies this broker instance.
	// NOTE(review): presumably the id argument given to New — confirm.
	ID string

	// Subscriptions is the store of subscriptions (see SubscriptionStore).
	Subscriptions SubscriptionStore
	// Sessions is the store of sessions (see SessionStore).
	Sessions      SessionStore
	// Topics is the store of retained messages (see TopicStore).
	Topics        TopicStore
	// Peers is the store of peers (see PeerStore).
	Peers         PeerStore
	// contains filtered or unexported fields
}

func New

func New(id string, logger *zap.Logger, mesh cluster.Mesh, config Config) *Broker

func (*Broker) Authenticate

func (b *Broker) Authenticate(transport transport.Metadata, sessionID []byte, username string, password string) (tenant string, err error)

func (*Broker) CloseSession added in v0.0.32

func (b *Broker) CloseSession(ctx context.Context, token string) error

func (*Broker) Connect added in v0.0.51

func (b *Broker) Connect(ctx context.Context, metadata transport.Metadata, p *packet.Connect) (string, string, *packet.ConnAck, error)

func (*Broker) Disconnect added in v0.0.51

func (b *Broker) Disconnect(ctx context.Context, token string, p *packet.Disconnect) error

func (*Broker) DistributeMessage added in v0.0.37

func (b *Broker) DistributeMessage(message *pb.MessagePublished) error

func (*Broker) Health added in v0.0.64

func (b *Broker) Health() string

func (*Broker) JoinServiceLayer added in v0.0.52

func (b *Broker) JoinServiceLayer(layer types.ServiceLayer)

func (*Broker) ListRetainedMessages

func (b *Broker) ListRetainedMessages() (topics.RetainedMessageSet, error)

func (*Broker) ListSessions

func (b *Broker) ListSessions() (sessions.SessionSet, error)

func (*Broker) ListSubscriptions

func (b *Broker) ListSubscriptions() (subscriptions.SubscriptionSet, error)

func (*Broker) PingReq added in v0.0.62

func (b *Broker) PingReq(ctx context.Context, id string, _ *packet.PingReq) (*packet.PingResp, error)

func (*Broker) Publish added in v0.0.51

func (b *Broker) Publish(ctx context.Context, token string, p *packet.Publish) (*packet.PubAck, error)

func (*Broker) RemoteRPCProvider added in v0.0.42

func (b *Broker) RemoteRPCProvider(id, peer string) sessions.Transport

func (*Broker) Serve added in v0.0.52

func (b *Broker) Serve(port int) net.Listener

func (*Broker) Shutdown added in v0.0.52

func (b *Broker) Shutdown()

func (*Broker) SigningKey added in v0.0.64

func (b *Broker) SigningKey() string

func (*Broker) Start added in v0.0.51

func (broker *Broker) Start(layer types.ServiceLayer)

func (*Broker) Stop

func (b *Broker) Stop()

func (*Broker) Subscribe added in v0.0.51

func (b *Broker) Subscribe(ctx context.Context, token string, p *packet.Subscribe) (*packet.SubAck, error)

func (*Broker) Unsubscribe added in v0.0.51

func (b *Broker) Unsubscribe(ctx context.Context, token string, p *packet.Unsubscribe) (*packet.UnsubAck, error)

type Config

// Config groups the settings passed to New when building a Broker.
type Config struct {
	// AuthHelper is a caller-supplied authentication callback. It receives the
	// transport metadata, the session ID, and the username/password pair, and
	// returns a tenant name or an error.
	// NOTE(review): its signature is identical to (*Broker).Authenticate;
	// presumably Authenticate delegates to it — confirm.
	AuthHelper func(transport transport.Metadata, sessionID []byte, username string, password string) (tenant string, err error)
	// Session holds the session-related settings (see SessionConfig).
	Session    SessionConfig
}

func DefaultConfig

func DefaultConfig() Config

type PeerStore added in v0.0.17

// PeerStore is the set of peer-storage operations the Broker relies on:
// lookup by ID, listing, deletion, and event-handler registration.
type PeerStore interface {
	// ByID returns the peer with the given id, or an error.
	ByID(id string) (peers.Peer, error)
	// All returns every stored peer as a set, or an error.
	// NOTE(review): the result type is peers.SubscriptionSet although the
	// elements are peers; confirm the type name is intentional.
	All() (peers.SubscriptionSet, error)
	// Delete removes the peer with the given id.
	Delete(id string) error
	// On registers handler for the named event and returns a func().
	// NOTE(review): the returned func presumably unregisters the handler — confirm.
	On(event string, handler func(peers.Peer)) func()
}

type Queue added in v0.0.58

// Queue is a queue of *publishQueue.Message values with enqueue, consume and
// close operations.
type Queue interface {
	// Enqueue adds p to the queue. It reports no error.
	Enqueue(p *publishQueue.Message)
	// Consume hands queued messages to f.
	// NOTE(review): whether Consume blocks, and when f stops being called, is
	// not visible here — confirm against the implementation.
	Consume(f func(*publishQueue.Message))
	// Close closes the queue and returns any error from doing so.
	Close() error
}

type SessionConfig added in v0.0.9

// SessionConfig holds the session settings embedded in Config.Session.
type SessionConfig struct {
	// MaxInflightSize is an integer limit on in-flight items per session.
	// NOTE(review): unit (messages vs. bytes) and the meaning of zero are not
	// visible here — confirm.
	MaxInflightSize int
}

type SessionStore

// SessionStore is the set of session-storage operations the Broker relies
// on: lookups by session ID, client ID and peer, listing, existence check,
// upsert and deletion.
type SessionStore interface {
	// ByID returns the session with the given id, or an error.
	ByID(id string) (sessions.Session, error)
	// ByClientID returns the set of sessions matching the given client id.
	ByClientID(id string) (sessions.SessionSet, error)
	// ByPeer returns the set of sessions associated with the given peer.
	ByPeer(peer string) (sessions.SessionSet, error)
	// All returns every stored session.
	All() (sessions.SessionSet, error)
	// Exists reports whether a session with the given id is stored.
	Exists(id string) bool
	// Upsert stores sess together with the transport supplied for it.
	Upsert(sess sessions.Session, transport sessions.Transport) error
	// Delete removes the session with the given id; reason is passed along
	// with the deletion.
	// NOTE(review): how reason is used (logging, event payload) is not
	// visible here — confirm.
	Delete(id, reason string) error
}

type SubscriptionStore

// SubscriptionStore is the set of subscription-storage operations the Broker
// relies on: lookups by topic, ID, peer and session, listing, creation and
// deletion.
type SubscriptionStore interface {
	// ByTopic returns the subscriptions of tenant that match pattern.
	// NOTE(review): matching semantics (e.g. wildcard handling) are not
	// visible here — confirm.
	ByTopic(tenant string, pattern []byte) (subscriptions.SubscriptionSet, error)
	// ByID returns the subscription with the given id, or an error.
	ByID(id string) (subscriptions.Subscription, error)
	// All returns every stored subscription.
	All() (subscriptions.SubscriptionSet, error)
	// ByPeer returns the subscriptions associated with the given peer.
	ByPeer(peer string) (subscriptions.SubscriptionSet, error)
	// BySession returns the subscriptions associated with the given session id.
	BySession(id string) (subscriptions.SubscriptionSet, error)
	// Sessions returns a list of strings.
	// NOTE(review): presumably the IDs of sessions that own at least one
	// subscription — confirm.
	Sessions() ([]string, error)
	// Create stores the given subscription (the parameter is named message)
	// along with a sender callback that takes a context and a packet.Publish.
	// NOTE(review): sender is presumably invoked to deliver publishes to the
	// subscriber — confirm.
	Create(message subscriptions.Subscription, sender func(context.Context, packet.Publish) error) error
	// Delete removes the subscription with the given id.
	Delete(id string) error
}

type Token added in v0.0.64

// Token is the claim set returned by DecodeSessionToken. It adds session,
// peer and tenant identifiers to the embedded jwt.StandardClaims.
type Token struct {
	// SessionID is serialized under the JSON key "session_id".
	SessionID     string `json:"session_id"`
	// PeerID is serialized under the JSON key "peer_id".
	PeerID        string `json:"peer_id"`
	// SessionTenant is serialized under the JSON key "session_tenant".
	SessionTenant string `json:"session_tenant"`
	// StandardClaims is embedded, so its fields are promoted onto Token.
	jwt.StandardClaims
}

func DecodeSessionToken added in v0.0.64

func DecodeSessionToken(signKey string, signedToken string) (Token, error)

type TopicStore

// TopicStore is the set of retained-message storage operations the Broker
// relies on: creation, lookup by topic pattern, and listing.
type TopicStore interface {
	// Create stores the given retained message.
	Create(message topics.RetainedMessage) error
	// ByTopicPattern returns the retained messages of tenant that match pattern.
	// NOTE(review): matching semantics (e.g. wildcard handling) are not
	// visible here — confirm.
	ByTopicPattern(tenant string, pattern []byte) (topics.RetainedMessageSet, error)
	// All returns every stored retained message.
	All() (topics.RetainedMessageSet, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL