Documentation ¶
Overview ¶
Package service provides the MQTT Server and Client services in a library form. See Server and Client examples below for more detailed usage.
Index ¶
- Constants
- Variables
- func GetOnlineStatus(key string) (online string, lasttime time.Time, conn *io.Closer)
- func SetOnlineStatus(key string, online bool, lasttime time.Time, conn *io.Closer)
- type BadgeMessage
- type BroadCastMessage
- type Client
- func (this *Client) Connect(uri string, msg *message.ConnectMessage) (err error)
- func (this *Client) Disconnect()
- func (this *Client) Ping(onComplete OnCompleteFunc) error
- func (this *Client) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) error
- func (this *Client) Subscribe(msg *message.SubscribeMessage, onComplete OnCompleteFunc, ...) error
- func (this *Client) Unsubscribe(msg *message.UnsubscribeMessage, onComplete OnCompleteFunc) error
- type ClientHash
- type MtBroadCastMessage
- func (mj *MtBroadCastMessage) MarshalJSON() ([]byte, error)
- func (mj *MtBroadCastMessage) MarshalJSONBuf(buf fflib.EncodingBuffer) error
- func (uj *MtBroadCastMessage) UnmarshalJSON(input []byte) error
- func (uj *MtBroadCastMessage) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
- type OfflineTopicQueue
- type OnCompleteFunc
- type OnPublishFunc
- type PendingStatus
- type Server
- type Status
Constants ¶
const (
DefaultKeepAlive = 300
DefaultConnectTimeout = 2
DefaultAckTimeout = 20
DefaultTimeoutRetries = 3
DefaultSessionsProvider = "mem"
DefaultAuthenticator = "mockSuccess"
DefaultTopicsProvider = "mx"
)
Variables ¶
var (
DefaultBufferSize int64
DeviceInBufferSize int64
DeviceOutBufferSize int64
MasterInBufferSize int64
MasterOutBufferSize int64
)
var (
MsgPendingTime time.Duration
OfflineTopicRWmux sync.RWMutex
BroadCastChannel string
SendChannel string
ApnPushChannel string
MessagePool *sync.Pool
OnGroupPublish func(msg *message.PublishMessage, this *service) (err error)
OnlineStatusChannel = "/fdf406fadef0ba24f3bfe8bc00b7bb350901417f"
ApnInvalidTokensChannel = "/apn/invalid_token"
ApnInvalidTokens = []string{}
)
var (
Cert tls.Certificate
APNsTopic string
ApnClient *apns.Client
)
var (
PendingQueue = make([]*PendingStatus, 65536, 65536)
OfflineTopicMap = make(map[string]*OfflineTopicQueue)
OfflineTopicQueueProcessor = make(chan *message.PublishMessage, 8192)
OfflineTopicCleanProcessor = make(chan string, 128)
ClientMap = make(map[string]*net.Conn)
ClientMapProcessor = make(chan ClientHash, 8192)
PktId = uint32(1)
OldMessagesQueue = make(chan *message.PublishMessage, 8192)
Max_message_queue int
MessageQueueStore string
TempBytes *sync.Pool
LevelDB *leveldb.DB
)
var (
Log log.Logger
LogLevel log.Level
ErrInvalidConnectionType error = errors.New("service: Invalid connection type")
ErrInvalidSubscriber error = errors.New("service: Invalid subscriber")
ErrBufferNotReady error = errors.New("service: buffer is not ready")
ErrBufferInsufficientData error = errors.New("service: buffer has insufficient data.")
)
var (
CompressLevel int
)
var (
IsOnline func(topic string) (online bool)
)
Functions ¶
func GetOnlineStatus ¶
Types ¶
type BadgeMessage ¶
func (*BadgeMessage) MarshalJSON ¶
func (mj *BadgeMessage) MarshalJSON() ([]byte, error)
func (*BadgeMessage) MarshalJSONBuf ¶
func (mj *BadgeMessage) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*BadgeMessage) UnmarshalJSON ¶
func (uj *BadgeMessage) UnmarshalJSON(input []byte) error
func (*BadgeMessage) UnmarshalJSONFFLexer ¶
func (uj *BadgeMessage) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
type BroadCastMessage ¶
func (*BroadCastMessage) MarshalJSON ¶
func (mj *BroadCastMessage) MarshalJSON() ([]byte, error)
func (*BroadCastMessage) MarshalJSONBuf ¶
func (mj *BroadCastMessage) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*BroadCastMessage) UnmarshalJSON ¶
func (uj *BroadCastMessage) UnmarshalJSON(input []byte) error
func (*BroadCastMessage) UnmarshalJSONFFLexer ¶
func (uj *BroadCastMessage) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
type Client ¶
type Client struct {
// The number of seconds to keep the connection live if there's no data.
// If not set, defaults to 5 minutes.
KeepAlive int

// The number of seconds to wait for the CONNACK message before disconnecting.
// If not set, defaults to 2 seconds.
ConnectTimeout int

// The number of seconds to wait for any ACK messages before failing.
// If not set, defaults to 20 seconds.
AckTimeout int

// The number of times to retry sending a packet if ACK is not received.
// If not set, defaults to 3 retries.
TimeoutRetries int

// contains filtered or unexported fields
}
Client is a library implementation of the MQTT client that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.
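The "if not set" rule in the field comments can be sketched as follows. This is a minimal illustration rather than the package's own code: `clientOptions` and `withDefaults` are hypothetical names, and the assumption is that a zero value means "not set". The default values are the ones listed under Constants.

```go
package main

import "fmt"

// Default values as listed in the Constants section.
const (
	DefaultKeepAlive      = 300
	DefaultConnectTimeout = 2
	DefaultAckTimeout     = 20
	DefaultTimeoutRetries = 3
)

// clientOptions is a hypothetical stand-in for the exported fields of Client.
type clientOptions struct {
	KeepAlive      int
	ConnectTimeout int
	AckTimeout     int
	TimeoutRetries int
}

// withDefaults returns a copy of o in which every zero-valued field is
// replaced by the corresponding documented default (assumption: zero
// means "not set").
func withDefaults(o clientOptions) clientOptions {
	if o.KeepAlive == 0 {
		o.KeepAlive = DefaultKeepAlive
	}
	if o.ConnectTimeout == 0 {
		o.ConnectTimeout = DefaultConnectTimeout
	}
	if o.AckTimeout == 0 {
		o.AckTimeout = DefaultAckTimeout
	}
	if o.TimeoutRetries == 0 {
		o.TimeoutRetries = DefaultTimeoutRetries
	}
	return o
}

func main() {
	// Only AckTimeout is set explicitly; the rest fall back to defaults.
	fmt.Printf("%+v\n", withDefaults(clientOptions{AckTimeout: 5}))
}
```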
func (*Client) Connect ¶
func (this *Client) Connect(uri string, msg *message.ConnectMessage) (err error)
Connect is for MQTT clients to open a connection to a remote server. It needs to know the URI, e.g., "tcp://127.0.0.1:1883", so it knows where to connect to. It also needs to be supplied with the MQTT CONNECT message.
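To illustrate the URI form mentioned above, the sketch below splits such a URI into a network and an address using only the standard library. `splitURI` is a hypothetical helper, not part of this package, and whether Connect parses its argument this way internally is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
)

// splitURI breaks a URI such as "tcp://127.0.0.1:1883" into the network
// and address arguments that net.Dial expects.
func splitURI(uri string) (network, address string, err error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", "", err
	}
	if u.Scheme == "" || u.Host == "" {
		return "", "", fmt.Errorf("uri %q must have the form protocol://host:port", uri)
	}
	return u.Scheme, u.Host, nil
}

func main() {
	network, address, err := splitURI("tcp://127.0.0.1:1883")
	if err != nil {
		fmt.Println("invalid uri:", err)
		return
	}
	fmt.Println(network, address)
}
```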
func (*Client) Disconnect ¶
func (this *Client) Disconnect()
Disconnect sends a single DISCONNECT message to the server. The client terminates immediately after sending the DISCONNECT message.
func (*Client) Ping ¶
func (this *Client) Ping(onComplete OnCompleteFunc) error
Ping sends a single PINGREQ message to the server. PINGREQ/PINGRESP messages are mainly used by the client to keep a heartbeat to the server so the connection won't be dropped.
func (*Client) Publish ¶
func (this *Client) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) error
Publish sends a single MQTT PUBLISH message to the server. On completion, the supplied OnCompleteFunc is called. For QOS 0 messages, onComplete is called immediately after the message is sent to the outgoing buffer. For QOS 1 messages, onComplete is called when PUBACK is received. For QOS 2 messages, onComplete is called after the PUBCOMP message is received.
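The QoS rules above reduce to a small lookup. The sketch below only restates the documented behaviour; `completionTrigger` is a hypothetical name and is not part of this package.

```go
package main

import "fmt"

// completionTrigger reports which event causes onComplete to be called
// for a PUBLISH at the given QoS level, as described in the text above.
func completionTrigger(qos byte) (string, error) {
	switch qos {
	case 0:
		return "message written to outgoing buffer", nil
	case 1:
		return "PUBACK", nil
	case 2:
		return "PUBCOMP", nil
	default:
		// MQTT 3.1 and 3.1.1 define only QoS levels 0, 1 and 2.
		return "", fmt.Errorf("invalid QoS level %d", qos)
	}
}

func main() {
	for qos := byte(0); qos <= 2; qos++ {
		trigger, _ := completionTrigger(qos)
		fmt.Printf("QoS %d: onComplete fires on %s\n", qos, trigger)
	}
}
```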
func (*Client) Subscribe ¶
func (this *Client) Subscribe(msg *message.SubscribeMessage, onComplete OnCompleteFunc, onPublish OnPublishFunc) error
Subscribe sends a single SUBSCRIBE message to the server. The SUBSCRIBE message can contain multiple topics that the client wants to subscribe to. On completion, which is when the client receives a SUBACK message back from the server, the supplied onComplete function is called.
When the server sends the client messages that match the topics the client subscribed to, the onPublish function is called to handle those messages. In effect, the client can supply different onPublish functions for different topics.
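The idea of a separate handler per topic can be sketched with a plain map. This is an illustration under stated assumptions, not the package's implementation: `publishMessage`, `onPublishFunc` and `dispatcher` are hypothetical stand-ins, and the lookup matches exact topic names only, ignoring MQTT wildcards.

```go
package main

import "fmt"

// publishMessage is a hypothetical stand-in for message.PublishMessage.
type publishMessage struct {
	Topic   string
	Payload []byte
}

// onPublishFunc mirrors the shape of OnPublishFunc for the stand-in type.
type onPublishFunc func(msg *publishMessage) error

// dispatcher maps an exact topic name to the handler registered for it.
type dispatcher map[string]onPublishFunc

// dispatch calls the handler registered for the message's topic, or
// returns an error if no handler is registered.
func (d dispatcher) dispatch(msg *publishMessage) error {
	handler, ok := d[msg.Topic]
	if !ok {
		return fmt.Errorf("no handler for topic %q", msg.Topic)
	}
	return handler(msg)
}

func main() {
	d := dispatcher{
		"sensors/temp": func(m *publishMessage) error {
			fmt.Printf("temperature: %s\n", m.Payload)
			return nil
		},
		"alerts": func(m *publishMessage) error {
			fmt.Printf("alert: %s\n", m.Payload)
			return nil
		},
	}
	if err := d.dispatch(&publishMessage{Topic: "alerts", Payload: []byte("door open")}); err != nil {
		fmt.Println("dispatch failed:", err)
	}
}
```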
func (*Client) Unsubscribe ¶
func (this *Client) Unsubscribe(msg *message.UnsubscribeMessage, onComplete OnCompleteFunc) error
Unsubscribe sends a single UNSUBSCRIBE message to the server. The UNSUBSCRIBE message can contain multiple topics that the client wants to unsubscribe from. On completion, which is when the client receives an UNSUBACK message from the server, the supplied onComplete function is called. The client will no longer handle messages from the server for those unsubscribed topics.
type ClientHash ¶
type MtBroadCastMessage ¶
func (*MtBroadCastMessage) MarshalJSON ¶
func (mj *MtBroadCastMessage) MarshalJSON() ([]byte, error)
func (*MtBroadCastMessage) MarshalJSONBuf ¶
func (mj *MtBroadCastMessage) MarshalJSONBuf(buf fflib.EncodingBuffer) error
func (*MtBroadCastMessage) UnmarshalJSON ¶
func (uj *MtBroadCastMessage) UnmarshalJSON(input []byte) error
func (*MtBroadCastMessage) UnmarshalJSONFFLexer ¶
func (uj *MtBroadCastMessage) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
type OfflineTopicQueue ¶
type OfflineTopicQueue struct {
Topic string
Q [][]byte
Pos int
Cleaned bool
// contains filtered or unexported fields
}
OfflineTopicQueue defines an offline message queue that holds a two-dimensional byte array and a position.
func NewOfflineTopicQueue ¶
func NewOfflineTopicQueue(topic string) (mq *OfflineTopicQueue)
func (*OfflineTopicQueue) DBKey ¶
func (this *OfflineTopicQueue) DBKey(pos int) (key string)
func (*OfflineTopicQueue) GetAll ¶
func (this *OfflineTopicQueue) GetAll() (msg_bytes [][]byte)
type OnPublishFunc ¶
type OnPublishFunc func(msg *message.PublishMessage) error
type PendingStatus ¶
type PendingStatus struct {
Done chan (bool)
Topic string
Msg *message.PublishMessage
}
func NewPendingStatus ¶
func NewPendingStatus(topic string, msg *message.PublishMessage) *PendingStatus
type Server ¶
type Server struct {
// The number of seconds to keep the connection live if there's no data.
// If not set, defaults to 5 minutes.
KeepAlive int

// The number of seconds to wait for the CONNECT message before disconnecting.
// If not set, defaults to 2 seconds.
ConnectTimeout int

// The number of seconds to wait for any ACK messages before failing.
// If not set, defaults to 20 seconds.
AckTimeout int

// The number of times to retry sending a packet if ACK is not received.
// If not set, defaults to 3 retries.
TimeoutRetries int

// Authenticator is the authenticator used to check the username and password
// sent in the CONNECT message. If not set, defaults to "mockSuccess".
Authenticator string

// SessionsProvider is the session store that keeps all the Session objects.
// This is the store to check if CleanSession is set to 0 in the CONNECT message.
// If not set, defaults to "mem".
SessionsProvider string

// TopicsProvider is the topic store that keeps all the subscription topics.
// If not set, defaults to "mx".
TopicsProvider string

// contains filtered or unexported fields
}
Server is a library implementation of the MQTT server that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.
func (*Server) Close ¶
Close terminates the server by shutting down all the client connections and closing the listener. It will, as best it can, clean up after itself.
func (*Server) CreateAndGetBytes ¶
func (*Server) DestoryBytes ¶
func (*Server) ListenAndServe ¶
ListenAndServe listens for connections on the requested URI and handles any incoming MQTT client sessions. It should not return until Close() is called or a critical error stops the server from running. The URI supplied should be of the form "protocol://host:port" and must be parseable by url.Parse(). For example, a URI could be "tcp://0.0.0.0:1883".
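A minimal sketch of turning a "protocol://host:port" URI into a listener with the standard library is shown below. `listen` is a hypothetical helper, and it is an assumption that ListenAndServe opens its listener in a comparable way. The example binds to the loopback address with port 0 so that the operating system picks a free port.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// listen parses a URI of the form "protocol://host:port" and opens a
// listener on the resulting network and address.
func listen(uri string) (net.Listener, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return nil, err
	}
	return net.Listen(u.Scheme, u.Host)
}

func main() {
	// Port 0 asks the operating system for any free port.
	ln, err := listen("tcp://127.0.0.1:0")
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer ln.Close()
	fmt.Println("listening on", ln.Addr())
}
```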
func (*Server) Publish ¶
func (this *Server) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) (err error)
Publish sends a single MQTT PUBLISH message to the server. On completion, the supplied OnCompleteFunc is called. For QOS 0 messages, onComplete is called immediately after the message is sent to the outgoing buffer. For QOS 1 messages, onComplete is called when PUBACK is received. For QOS 2 messages, onComplete is called after the PUBCOMP message is received.
type Status ¶
type Status struct {
// contains filtered or unexported fields
}