Documentation ¶
Index ¶
- Constants
- Variables
- func EncodeSessionToken(signKey string, session *sessions.Session) (string, error)
- func Serve(port int, handler broker) net.Listener
- type Broker
- func (b *Broker) Authenticate(transport transport.Metadata, sessionID []byte, username string, ...) (tenant string, err error)
- func (b *Broker) CloseSession(ctx context.Context, token string) error
- func (b *Broker) Connect(ctx context.Context, metadata transport.Metadata, p *packet.Connect) (string, string, *packet.ConnAck, error)
- func (b *Broker) Disconnect(ctx context.Context, token string, p *packet.Disconnect) error
- func (b *Broker) DistributeMessage(message *pb.MessagePublished) error
- func (b *Broker) Health() string
- func (b *Broker) JoinServiceLayer(name string, logger *zap.Logger, config cluster.ServiceConfig, ...)
- func (b *Broker) ListRetainedMessages() (topics.RetainedMessageSet, error)
- func (b *Broker) PingReq(ctx context.Context, id string, _ *packet.PingReq) (*packet.PingResp, error)
- func (b *Broker) Publish(ctx context.Context, token string, p *packet.Publish) (*packet.PubAck, error)
- func (b *Broker) Serve(port int) net.Listener
- func (b *Broker) Shutdown()
- func (b *Broker) SigningKey() string
- func (broker *Broker) Start(layer types.GossipServiceLayer)
- func (b *Broker) Stop()
- func (b *Broker) Subscribe(ctx context.Context, token string, p *packet.Subscribe) (*packet.SubAck, error)
- func (b *Broker) Unsubscribe(ctx context.Context, token string, p *packet.Unsubscribe) (*packet.UnsubAck, error)
- type Config
- type PeerStore
- type Queue
- type SessionConfig
- type SessionStore
- type SubscriptionStore
- type Token
- type TopicStore
Constants ¶
View Source
const ( EVENT_MESSAGE_PUBLISHED = "message_published" EVENT_STATE_UPDATED = "state_updated" )
Variables ¶
View Source
var (
ErrAuthenticationFailed = errors.New("authentication failed")
)
Functions ¶
func EncodeSessionToken ¶ added in v0.0.64
Types ¶
type Broker ¶
type Broker struct { ID string Subscriptions SubscriptionStore Sessions SessionStore Topics TopicStore Peers PeerStore // contains filtered or unexported fields }
func (*Broker) Authenticate ¶
func (*Broker) CloseSession ¶ added in v0.0.32
func (*Broker) Disconnect ¶ added in v0.0.51
func (*Broker) DistributeMessage ¶ added in v0.0.37
func (b *Broker) DistributeMessage(message *pb.MessagePublished) error
func (*Broker) JoinServiceLayer ¶ added in v0.0.52
func (b *Broker) JoinServiceLayer(name string, logger *zap.Logger, config cluster.ServiceConfig, rpcConfig cluster.ServiceConfig, mesh cluster.DiscoveryLayer)
func (*Broker) ListRetainedMessages ¶
func (b *Broker) ListRetainedMessages() (topics.RetainedMessageSet, error)
func (*Broker) SigningKey ¶ added in v0.0.64
func (*Broker) Start ¶ added in v0.0.51
func (broker *Broker) Start(layer types.GossipServiceLayer)
type Config ¶
type Config struct { AuthHelper func(transport transport.Metadata, sessionID []byte, username string, password string) (tenant string, err error) Session SessionConfig }
func DefaultConfig ¶
func DefaultConfig() Config
type Queue ¶ added in v0.0.58
type Queue interface { Enqueue(p *publishQueue.Message) Consume(f func(*publishQueue.Message)) Close() error }
type SessionConfig ¶ added in v0.0.9
type SessionConfig struct {
MaxInflightSize int
}
type SessionStore ¶
type SessionStore interface { ByID(ctx context.Context, id string) (*sessions.Session, error) ByClientID(ctx context.Context, id string) ([]*sessions.Session, error) ByPeer(ctx context.Context, peer string) ([]*sessions.Session, error) All(ctx context.Context) ([]*sessions.Session, error) Create(ctx context.Context, sess sessions.SessionCreateInput) error RefreshKeepAlive(ctx context.Context, id string, timestamp int64) error Delete(ctx context.Context, id string) error }
type SubscriptionStore ¶
type SubscriptionStore interface { ByTopic(ctx context.Context, tenant string, pattern []byte) ([]*subscriptions.Metadata, error) ByID(ctx context.Context, id string) (*subscriptions.Metadata, error) All(ctx context.Context) ([]*subscriptions.Metadata, error) ByPeer(ctx context.Context, peer string) ([]*subscriptions.Metadata, error) BySession(ctx context.Context, id string) ([]*subscriptions.Metadata, error) Create(ctx context.Context, message subscriptions.SubscriptionCreateInput) error Delete(ctx context.Context, id string) error }
type Token ¶ added in v0.0.64
type TopicStore ¶
type TopicStore interface { Create(message topics.RetainedMessage) error ByTopicPattern(tenant string, pattern []byte) (topics.RetainedMessageSet, error) All() (topics.RetainedMessageSet, error) }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.