broker

package
v0.0.63-rc8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2019 License: MPL-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EVENT_MESSAGE_PUBLISHED = "message_published"
	EVENT_STATE_UPDATED     = "state_updated"
)

Variables

This section is empty.

Functions

func Serve added in v0.0.51

func Serve(port int, handler broker) net.Listener

Types

type Broker

type Broker struct {
	ID string

	Subscriptions SubscriptionStore
	Sessions      SessionStore
	Topics        TopicStore
	Peers         PeerStore
	STANOutput    chan STANMessage
	// contains filtered or unexported fields
}

func New

func New(id string, mesh cluster.Mesh, config Config) *Broker

func (*Broker) Authenticate

func (b *Broker) Authenticate(transport transport.Metadata, sessionID []byte, username string, password string) (tenant string, err error)

func (*Broker) CloseSession added in v0.0.32

func (b *Broker) CloseSession(ctx context.Context, id string) error

func (*Broker) Connect added in v0.0.51

func (b *Broker) Connect(ctx context.Context, metadata transport.Metadata, p *packet.Connect) (string, *packet.ConnAck, error)

func (*Broker) Disconnect added in v0.0.51

func (b *Broker) Disconnect(ctx context.Context, id string, p *packet.Disconnect) error

func (*Broker) DistributeMessage added in v0.0.37

func (b *Broker) DistributeMessage(message *pb.MessagePublished) error

func (*Broker) Health added in v0.0.64

func (b *Broker) Health() string

func (*Broker) Join

func (b *Broker) Join(hosts []string)

func (*Broker) JoinServiceLayer added in v0.0.52

func (b *Broker) JoinServiceLayer(layer types.ServiceLayer)

func (*Broker) ListRetainedMessages

func (b *Broker) ListRetainedMessages() (topics.RetainedMessageSet, error)

func (*Broker) ListSessions

func (b *Broker) ListSessions() (sessions.SessionSet, error)

func (*Broker) ListSubscriptions

func (b *Broker) ListSubscriptions() (subscriptions.SubscriptionSet, error)

func (*Broker) PingReq added in v0.0.62

func (b *Broker) PingReq(ctx context.Context, id string, _ *packet.PingReq) (*packet.PingResp, error)

func (*Broker) Publish added in v0.0.51

func (b *Broker) Publish(ctx context.Context, id string, p *packet.Publish) (*packet.PubAck, error)

func (*Broker) RemoteRPCProvider added in v0.0.42

func (b *Broker) RemoteRPCProvider(id, peer string) sessions.Transport

func (*Broker) Serve added in v0.0.52

func (b *Broker) Serve(port int) net.Listener

func (*Broker) Shutdown added in v0.0.52

func (b *Broker) Shutdown()

func (*Broker) Start added in v0.0.51

func (broker *Broker) Start(layer types.ServiceLayer)

func (*Broker) Stop

func (b *Broker) Stop()

func (*Broker) Subscribe added in v0.0.51

func (b *Broker) Subscribe(ctx context.Context, id string, p *packet.Subscribe) (*packet.SubAck, error)

func (*Broker) Unsubscribe added in v0.0.51

func (b *Broker) Unsubscribe(ctx context.Context, id string, p *packet.Unsubscribe) (*packet.UnsubAck, error)

type Config

type Config struct {
	NATSURL    string
	AuthHelper func(transport transport.Metadata, sessionID []byte, username string, password string) (tenant string, err error)
	Session    SessionConfig
}

func DefaultConfig

func DefaultConfig() Config

type PeerStore added in v0.0.17

type PeerStore interface {
	ByID(id string) (peers.Peer, error)
	All() (peers.SubscriptionSet, error)
	Delete(id string) error
	On(event string, handler func(peers.Peer)) func()
}

type Queue added in v0.0.58

type Queue interface {
	Enqueue(p *publishQueue.Message)
	Consume(f func(*publishQueue.Message))
	Close() error
}

type STANMessage added in v0.0.30

type STANMessage struct {
	Timestamp time.Time `json:"timestamp"`
	Tenant    string    `json:"tenant"`
	Payload   []byte    `json:"payload"`
	Topic     []byte    `json:"topic"`
}

type SessionConfig added in v0.0.9

type SessionConfig struct {
	MaxInflightSize int
}

type SessionStore

type SessionStore interface {
	ByID(id string) (sessions.Session, error)
	ByClientID(id string) (sessions.SessionSet, error)
	ByPeer(peer string) (sessions.SessionSet, error)
	All() (sessions.SessionSet, error)
	Exists(id string) bool
	Upsert(sess sessions.Session, transport sessions.Transport) error
	Delete(id, reason string) error
}

type SubscriptionStore

type SubscriptionStore interface {
	ByTopic(tenant string, pattern []byte) (subscriptions.SubscriptionSet, error)
	ByID(id string) (subscriptions.Subscription, error)
	All() (subscriptions.SubscriptionSet, error)
	ByPeer(peer string) (subscriptions.SubscriptionSet, error)
	BySession(id string) (subscriptions.SubscriptionSet, error)
	Sessions() ([]string, error)
	Create(message subscriptions.Subscription, sender func(context.Context, packet.Publish) error) error
	Delete(id string) error
}

type TopicStore

type TopicStore interface {
	Create(message topics.RetainedMessage) error
	ByTopicPattern(tenant string, pattern []byte) (topics.RetainedMessageSet, error)
	All() (topics.RetainedMessageSet, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL