openpdu/src/mqtt/mqtt.go

114 lines
3.1 KiB
Go

package mqtt
import (
"fmt"
"time"
"git.openpdu.org/OpenPDU/openpdu/config"
"git.openpdu.org/OpenPDU/openpdu/syslog"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
var MQTTclient MQTT.Client
func init() {
config.SetDefault("Mqtt.CliendID", "OpenPDU") // max 23 chars
config.SetDefault("Mqtt.Prefix", "openpdu") // max 23 chars
config.SetDefault("Mqtt.Schema", "tcp")
config.SetDefault("Mqtt.Host", "localhost")
config.SetDefault("Mqtt.Port", "1883")
config.SetDefault("Mqtt.Username", "")
config.SetDefault("Mqtt.Password", "")
config.SetDefault("Mqtt.LWTTopic", "LWT")
config.SetDefault("Mqtt.LWTMessageOnline", "Online")
config.SetDefault("Mqtt.LWTMessageOffline", "Offline")
config.SetDefault("Mqtt.HomeAssistant", false)
// MQTT.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
// MQTT.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
// MQTT.WARN = log.New(os.Stdout, "[WARN] ", 0)
// MQTT.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)
}
// https://girishjoshi.io/post/golang-paho-mqtt/
func Setup() {
uri := config.GetString("Mqtt.Schema") + "://" + config.GetString("Mqtt.Host") + ":" + config.GetString("Mqtt.Port")
opts := MQTT.NewClientOptions().AddBroker(uri)
opts.SetClientID(config.GetString("Mqtt.CliendID"))
opts.SetWill(config.GetString("Mqtt.Prefix")+"/"+config.GetString("Mqtt.LWTTopic"), config.GetString("Mqtt.LWTMessageOffline"), 0, false)
if username := config.GetString("Mqtt.Username"); username != "" {
opts.SetUsername(username)
}
if password := config.GetString("Mqtt.Password"); password != "" {
opts.SetPassword(password)
}
opts.SetAutoReconnect(true)
opts.SetConnectRetryInterval(5 * time.Second)
opts.SetConnectTimeout(5 * time.Second)
opts.SetCleanSession(true)
opts.SetConnectionLostHandler(func(c MQTT.Client, err error) {
syslog.Err("mqtt connection lost error: " + err.Error())
})
opts.SetReconnectingHandler(func(c MQTT.Client, options *MQTT.ClientOptions) {
syslog.Notice("mqtt reconnecting")
})
opts.SetOnConnectHandler(func(c MQTT.Client) {
syslog.Notice("mqtt connected")
PublishRoot(config.GetString("Mqtt.Prefix")+"/"+config.GetString("Mqtt.LWTTopic"), config.GetString("Mqtt.LWTMessageOnline"))
})
MQTTclient = MQTT.NewClient(opts)
for {
token := MQTTclient.Connect()
token.Wait()
if MQTTclient.IsConnected() {
break
}
time.Sleep(5 * time.Second)
}
}
func Subscribe(topic string, handler func(MQTT.Client, MQTT.Message)) {
// MQTTHandler(MQTT.Client, MQTT.Message)
MQTTclient.Subscribe(config.GetString("Mqtt.Prefix")+"/switch/"+topic, 1, handler)
syslog.Debug(fmt.Sprintf("MQTT subscribed to topic: %s", topic))
}
func Connected() bool {
if MQTTclient == nil {
return false
}
return MQTTclient.IsConnected()
}
func Disconnect() {
MQTTclient.Disconnect(250)
}
func Publish(topic string, value string) {
if MQTTclient == nil {
return
}
if MQTTclient.IsConnected() {
MQTTclient.Publish(config.GetString("Mqtt.Prefix")+"/switch/"+topic, 0, false, value)
}
}
func PublishRoot(topic string, value string) {
if MQTTclient == nil {
return
}
if MQTTclient.IsConnected() {
MQTTclient.Publish(topic, 0, false, value)
}
}