Documentation ¶
Index ¶
- Variables
- func AddFields(ctx context.Context, fields ...zap.Field) context.Context
- func L(ctx context.Context) *zap.Logger
- func ProcessPublish(ctx context.Context, id uint64, transport Membership, fsm FSM, state ReadState, ...) error
- func RunSession(ctx context.Context, peer uint64, fsm FSM, state ReadState, ...) error
- func StoreLogger(ctx context.Context, l *zap.Logger) context.Context
- func StorePublish(messageLog MessageLog, p []*messages.StoredMessage) error
- type AuthenticationHandler
- type FSM
- type Membership
- type MessageLog
- type MqttServer
- func (s *MqttServer) CreateSubscription(ctx context.Context, r *api.CreateSubscriptionRequest) (*api.CreateSubscriptionResponse, error)
- func (s *MqttServer) DeleteSubscription(ctx context.Context, r *api.DeleteSubscriptionRequest) (*api.DeleteSubscriptionResponse, error)
- func (s *MqttServer) DistributeMessage(ctx context.Context, r *api.DistributeMessageRequest) (*api.DistributeMessageResponse, error)
- func (s *MqttServer) ListSessionMetadatas(ctx context.Context, r *api.ListSessionMetadatasRequest) (*api.ListSessionMetadatasResponse, error)
- func (s *MqttServer) ListSubscriptions(ctx context.Context, r *api.ListSubscriptionsRequest) (*api.ListSubscriptionsResponse, error)
- func (s *MqttServer) Serve(grpcServer *grpc.Server)
- type ReadState
- type State
- type StateDump
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	Clock                    = time.Now
	ErrConnectNotDone        = errors.New("CONNECT not done")
	ErrSessionLost           = errors.New("Session lost")
	ErrSessionDisconnected   = errors.New("Session disconnected")
	ErrUnknownPacketReceived = errors.New("Received unknown packet type")
	ErrAuthenticationFailed  = errors.New("Authentication failed")
)
Functions ¶
func ProcessPublish ¶
func RunSession ¶
func RunSession(ctx context.Context, peer uint64, fsm FSM, state ReadState, c transport.TimeoutReadWriteCloser, ch chan *messages.StoredMessage, authHandler AuthenticationHandler) error
func StorePublish ¶
func StorePublish(messageLog MessageLog, p []*messages.StoredMessage) error
Types ¶
type AuthenticationHandler ¶
type FSM ¶
type FSM interface {
	RetainedMessage(ctx context.Context, publish *packet.Publish) error
	DeleteRetainedMessage(ctx context.Context, topic []byte) error
	Subscribe(ctx context.Context, id string, pattern []byte, qos int32) error
	SubscribeFrom(ctx context.Context, id string, peer uint64, pattern []byte, qos int32) error
	Unsubscribe(ctx context.Context, id string, pattern []byte) error
	DeleteSessionMetadata(ctx context.Context, id, mountpoint string) error
	CreateSessionMetadata(ctx context.Context, id, clientID string, lwt *packet.Publish, mountpoint string) error
}
type Membership ¶
type Membership interface {
Call(id uint64, f func(*grpc.ClientConn) error) error
}
type MessageLog ¶
type MessageLog interface {
	io.Closer
	Append(b []*messages.StoredMessage) error
}
type MqttServer ¶ added in v1.2.0
type MqttServer struct {
// contains filtered or unexported fields
}
func NewMQTTServer ¶ added in v1.2.0
func NewMQTTServer(state State, fsm FSM, localPublishCh, remotePublishCh chan *messages.StoredMessage) *MqttServer
func (*MqttServer) CreateSubscription ¶ added in v1.2.0
func (s *MqttServer) CreateSubscription(ctx context.Context, r *api.CreateSubscriptionRequest) (*api.CreateSubscriptionResponse, error)
func (*MqttServer) DeleteSubscription ¶ added in v1.2.0
func (s *MqttServer) DeleteSubscription(ctx context.Context, r *api.DeleteSubscriptionRequest) (*api.DeleteSubscriptionResponse, error)
func (*MqttServer) DistributeMessage ¶ added in v1.2.0
func (s *MqttServer) DistributeMessage(ctx context.Context, r *api.DistributeMessageRequest) (*api.DistributeMessageResponse, error)
func (*MqttServer) ListSessionMetadatas ¶ added in v1.2.0
func (s *MqttServer) ListSessionMetadatas(ctx context.Context, r *api.ListSessionMetadatasRequest) (*api.ListSessionMetadatasResponse, error)
func (*MqttServer) ListSubscriptions ¶ added in v1.2.0
func (s *MqttServer) ListSubscriptions(ctx context.Context, r *api.ListSubscriptionsRequest) (*api.ListSubscriptionsResponse, error)
func (*MqttServer) Serve ¶ added in v1.2.0
func (s *MqttServer) Serve(grpcServer *grpc.Server)
type ReadState ¶
type ReadState interface {
	Recipients(topic []byte) ([]uint64, []string, []int32, error)
	ListSubscriptions() ([][]byte, []uint64, []string, []int32, error)
	GetSession(id string) *sessions.Session
	SaveSession(id string, session *sessions.Session)
	CloseSession(id string)
	RetainedMessages(topic []byte) ([]*packet.Publish, error)
	ListSessionMetadatas() []*api.SessionMetadatas
	GetSessionMetadatas(id string) *api.SessionMetadatas
	GetSessionMetadatasByClientID(id string) *api.SessionMetadatas
}
type State ¶
type State interface {
	ReadState
	Subscribe(peer uint64, id string, pattern []byte, qos int32) error
	Unsubscribe(id string, pattern []byte) error
	RemoveSubscriptionsForPeer(peer uint64)
	RemoveSubscriptionsForSession(id string)
	RetainMessage(msg *packet.Publish) error
	DeleteRetainedMessage(topic []byte) error
	Load([]byte) error
	MarshalBinary() ([]byte, error)
	CreateSessionMetadata(id string, peer uint64, clientID string, connectedAt int64, lwt *packet.Publish, mountpoint string) error
	DeleteSessionMetadata(id string, peer uint64) error
	DeleteSessionMetadatasByPeer(peer uint64)
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.