From bcfbbaba804c30953662301dd89ece68f7f8329e Mon Sep 17 00:00:00 2001 From: Paolo Asperti Date: Tue, 5 Jan 2021 16:23:55 +0100 Subject: [PATCH] Update board.go and board_mqtt.go --- src/board.go | 4 +++ src/board_mqtt.go | 76 ++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/src/board.go b/src/board.go index 1464e11..d5bc62f 100644 --- a/src/board.go +++ b/src/board.go @@ -94,6 +94,10 @@ func parseBoardsConfig() { outlets[num] = &o } + for b := range boards { + go boards[b].Init() + } + // dumpa tutto for b := range boards { boards[b].Dump() diff --git a/src/board_mqtt.go b/src/board_mqtt.go index 18605c7..0ea37f3 100644 --- a/src/board_mqtt.go +++ b/src/board_mqtt.go @@ -4,7 +4,9 @@ import ( "fmt" "log" "strings" + "time" + MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/spf13/viper" ) @@ -24,10 +26,12 @@ type MQTTBoard struct { Name string ChannelCount uint Channels []*MQTTChannel + MQTTRemoteSchema string MQTTRemoteHost string MQTTRemotePort string MQTTRemoteUsername string MQTTRemotePassword string + MQTTClient MQTT.Client } func newMQTTChannel(v *viper.Viper, channelID string) MQTTChannel { @@ -35,6 +39,7 @@ func newMQTTChannel(v *viper.Viper, channelID string) MQTTChannel { v.SetDefault("lastValue", false) v.SetDefault("onboot", "off") v.SetDefault("mqtttopic", v.GetString("name")) + v.SetDefault("mqttremote.topic", v.GetString("name")) value := false switch v.GetString("onboot") { @@ -46,12 +51,13 @@ func newMQTTChannel(v *viper.Viper, channelID string) MQTTChannel { // newUUID := UUID.New().String() // v.SetDefault("id", newUUID) return MQTTChannel{ - ID: channelID, - Num: v.GetUint("num"), - name: v.GetString("name"), - MQTTTopic: v.GetString("mqtttopic"), - Value: value, - onboot: v.GetString("onboot"), + ID: channelID, + Num: v.GetUint("num"), + name: v.GetString("name"), + MQTTTopic: v.GetString("mqtttopic"), + Value: value, + onboot: v.GetString("onboot"), + MQTTRemoteTopic: v.GetString("mqttremote.topic"), } } @@ -62,6 +68,11 @@ func newMQTTBoard(v *viper.Viper, id string) *MQTTBoard { v.SetDefault("type", "mqtt") v.SetDefault("channelCount", 0) v.SetDefault("channels", "") + v.SetDefault("MQTTRemote.Schema", "tcp") + v.SetDefault("MQTTRemote.Host", "") + v.SetDefault("MQTTRemote.Port", "1883") + v.SetDefault("MQTTRemote.Username", "") + v.SetDefault("MQTTRemote.Password", "") if v.GetInt("channelCount") > 0 { for i := 0; i < v.GetInt("channelCount"); i++ { @@ -70,9 +81,14 @@ func newMQTTBoard(v *viper.Viper, id string) *MQTTBoard { } b = MQTTBoard{ - ID: id, - Name: v.GetString("name"), - ChannelCount: v.GetUint("channelCount"), + ID: id, + Name: v.GetString("name"), + ChannelCount: v.GetUint("channelCount"), + MQTTRemoteSchema: v.GetString("MQTTRemote.Schema"), + MQTTRemoteHost: v.GetString("MQTTRemote.Host"), + MQTTRemotePort: v.GetString("MQTTRemote.Port"), + MQTTRemoteUsername: v.GetString("MQTTRemote.Username"), + MQTTRemotePassword: v.GetString("MQTTRemote.Password"), } channels := make([]*MQTTChannel, v.GetInt("channelCount")) @@ -124,6 +140,9 @@ func (c *MQTTChannel) ToString() string { func (c *MQTTChannel) UpdateMQTT() { MQTTpublish(c.MQTTTopic, c.ToString()) + if c.parent.MQTTClient.IsConnected() { + c.parent.MQTTClient.Publish(c.MQTTRemoteTopic, 0, false, c.ToString()) + } } func (c *MQTTChannel) OnChange() { @@ -155,7 +174,44 @@ func (b *MQTTBoard) Dump() { } func (b *MQTTBoard) Init() { - return + uri := b.MQTTRemoteSchema + "://" + b.MQTTRemoteHost + ":" + b.MQTTRemotePort + opts := MQTT.NewClientOptions().AddBroker(uri) + + opts.SetClientID(b.ID) + + if b.MQTTRemoteUsername != "" { + opts.SetUsername(b.MQTTRemoteUsername) + } + + if b.MQTTRemotePassword != "" { + opts.SetPassword(b.MQTTRemotePassword) + } + + opts.SetAutoReconnect(true) + opts.SetConnectRetryInterval(5 * time.Second) + opts.SetConnectTimeout(5 * time.Second) + + opts.SetConnectionLostHandler(func(c MQTT.Client, err error) { + logErr("mqtt connection lost error: " + err.Error()) + }) + opts.SetReconnectingHandler(func(c MQTT.Client, options *MQTT.ClientOptions) { + logNotice("mqtt reconnecting") + }) + opts.SetOnConnectHandler(func(c MQTT.Client) { + logNotice("mqtt connected") + // MQTTclient.Publish("openpdu/status", 0, false, "connected") + }) + + b.MQTTClient = MQTT.NewClient(opts) + + for { + token := b.MQTTClient.Connect() + token.Wait() + if b.MQTTClient.IsConnected() { + break + } + time.Sleep(5 * time.Second) + } } func (b *MQTTBoard) Channel(num uint64) Channel {