Documentation
¶
Overview ¶
Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
Index ¶
- Constants
- Variables
- func GenUniqueId() string
- func InitHTTPMoniter(b *Broker)
- func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error)
- func ProcessMessage(msg *Message)
- func TlsTimeout(conn *tls.Conn)
- type Broker
- func (b *Broker) CheckConnectAuth(clientID, username, password string) bool
- func (b *Broker) CheckTopicAuth(action, username, topic string) bool
- func (b *Broker) DeleteConnect(clientID string)
- func (b *Broker) GetConnectionNumber() (int64, error)
- func (b *Broker) Handshake(conn net.Conn) bool
- func (b *Broker) InitKafka()
- func (b *Broker) InitKafkaConsumer()
- func (b *Broker) InitKafkaProducer()
- func (b *Broker) InitRedisClient()
- func (b *Broker) PublishMessage(packet *packets.PublishPacket)
- func (b *Broker) PublishToKafka(e *Elements)
- func (b *Broker) RecvConnect()
- func (b *Broker) SaveConnect(clientID string)
- func (b *Broker) SendConnect(clientID string)
- func (b *Broker) Start()
- func (b *Broker) StartClientListening(Tls bool)
- func (b *Broker) StartWebsocketListening()
- func (b *Broker) SubmitWork(clientId string, msg *Message)
- type Config
- type Elements
- type KafkaConfig
- type Message
- type TLSInfo
Constants ¶
View Source
const ( SUB = "1" PUB = "2" )
View Source
const ( MessagePoolNum = 1024 MessagePoolMessageNum = 1024 )
View Source
const ( Connected = 1 Disconnected = 2 )
View Source
const ( // ACCEPT_MIN_SLEEP is the minimum acceptable sleep times on temporary errors. ACCEPT_MIN_SLEEP = 100 * time.Millisecond // ACCEPT_MAX_SLEEP is the maximum acceptable sleep times on temporary errors ACCEPT_MAX_SLEEP = 10 * time.Second // DEFAULT_ROUTE_CONNECT Route solicitation intervals. DEFAULT_ROUTE_CONNECT = 5 * time.Second // DEFAULT_TLS_TIMEOUT DEFAULT_TLS_TIMEOUT = 5 * time.Second )
View Source
const ( CONNECT = uint8(iota + 1) CONNACK PUBLISH PUBACK PUBREC PUBREL PUBCOMP SUBSCRIBE SUBACK UNSUBSCRIBE UNSUBACK PINGREQ PINGRESP DISCONNECT )
View Source
const ( QosAtMostOnce byte = iota QosAtLeastOnce QosExactlyOnce QosFailure = 0x80 )
View Source
const ( //Connect mqtt connect Connect = "connect" //Publish mqtt publish Publish = "publish" //Subscribe mqtt sub Subscribe = "subscribe" //Unsubscribe mqtt sub Unsubscribe = "unsubscribe" //Disconnect mqtt disconenct Disconnect = "disconnect" )
View Source
const (
MQTTConnectionKey = "mqtt-connection"
)
Variables ¶
View Source
var (
DisconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
)
Functions ¶
func GenUniqueId ¶
func GenUniqueId() string
func InitHTTPMoniter ¶
func InitHTTPMoniter(b *Broker)
func ProcessMessage ¶
func ProcessMessage(msg *Message)
func TlsTimeout ¶
Types ¶
type Broker ¶
type Broker struct { RedisClient *redis.Client Auth auth.Auth // contains filtered or unexported fields }
func (*Broker) CheckConnectAuth ¶
func (*Broker) CheckTopicAuth ¶
func (*Broker) DeleteConnect ¶
func (*Broker) GetConnectionNumber ¶
func (*Broker) InitRedisClient ¶
func (b *Broker) InitRedisClient()
func (*Broker) PublishMessage ¶
func (b *Broker) PublishMessage(packet *packets.PublishPacket)
func (*Broker) PublishToKafka ¶
func (*Broker) RecvConnect ¶
func (b *Broker) RecvConnect()
func (*Broker) SaveConnect ¶
func (*Broker) SendConnect ¶
func (*Broker) StartClientListening ¶
func (*Broker) StartWebsocketListening ¶
func (b *Broker) StartWebsocketListening()
func (*Broker) SubmitWork ¶
type Config ¶
type Config struct { Worker int `json:"workerNum"` Host string `json:"host"` Port string `json:"port"` TlsHost string `json:"tlsHost"` TlsPort string `json:"tlsPort"` WsPath string `json:"wsPath"` WsPort string `json:"wsPort"` WsTLS bool `json:"wsTLS"` TlsInfo TLSInfo `json:"tlsInfo"` Acl bool `json:"acl"` AclConf string `json:"aclConf"` Debug bool `json:"debug"` KafkaConfig KafkaConfig `json:"kafkaConfig"` RedisAddr string `json:"redisAddr"` Plugins []string `json:"plugins"` }
func ConfigureConfig ¶
func LoadConfig ¶
type Elements ¶
type Elements struct { ClientID string `json:"clientid"` Username string `json:"username"` Topic string `json:"topic"` Payload string `json:"payload"` Timestamp int64 `json:"ts"` Size int32 `json:"size"` Action string `json:"action"` }
Elements kafka publish elements
type KafkaConfig ¶
type KafkaConfig struct { Addr []string `json:"addr"` Partition int32 Producer struct { ConnectTopic string `json:"onConnect"` SubscribeTopic string `json:"onSubscribe"` PublishTopic string `json:"onPublish"` UnsubscribeTopic string `json:"onUnsubscribe"` DisconnectTopic string `json:"onDisconnect"` RegexpMap map[string]string `json:"regexpMap"` } `json:"producer"` Consumer struct { Topic []string } `json:"consumer"` }
KfakConfig device kafka config
Source Files
¶
Click to show internal directories.
Click to hide internal directories.