diff --git a/poc/mqtt.go b/poc/mqtt.go new file mode 100644 index 0000000..f2f3187 --- /dev/null +++ b/poc/mqtt.go @@ -0,0 +1,37 @@ +package main + +import ( + "fmt" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +func main() { + + opts := MQTT.NewClientOptions() + opts.AddBroker("tcp://127.0.0.1:1883") + opts.SetClientID("ortobio") + opts.SetUsername("DVES_USER") + opts.SetPassword("DVES_PASS") + opts.SetCleanSession(false) + + client := MQTT.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + fmt.Println("Sample Publisher Started") + + qos := 0 + + for i := 0; i < 3; i++ { + fmt.Println("---- doing publish ----", i) + token := client.Publish("ciaouno", byte(qos), false, fmt.Sprintf("%d", i)) + token.Wait() + time.Sleep(300 * time.Millisecond) + } + + client.Disconnect(250) + fmt.Println("Sample Publisher Disconnected") + +} diff --git a/poc/mqtt2.go b/poc/mqtt2.go new file mode 100644 index 0000000..a249839 --- /dev/null +++ b/poc/mqtt2.go @@ -0,0 +1,116 @@ + +package main + +import ( + "flag" + "fmt" + "os" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +/* +Options: + [-help] Display help + [-a pub|sub] Action pub (publish) or sub (subscribe) + [-m ] Payload to send + [-n ] Number of messages to send or receive + [-q 0|1|2] Quality of Service + [-clean] CleanSession (true if -clean is present) + [-id ] CliendID + [-user ] User + [-password ] Password + [-broker ] Broker URI + [-topic ] Topic + [-store ] Store Directory +*/ + +func main() { + topic := flag.String("topic", "", "The topic name to/from which to publish/subscribe") + broker := flag.String("broker", "tcp://iot.eclipse.org:1883", "The broker URI. ex: tcp://10.10.1.1:1883") + password := flag.String("password", "", "The password (optional)") + user := flag.String("user", "", "The User (optional)") + id := flag.String("id", "testgoid", "The ClientID (optional)") + cleansess := flag.Bool("clean", false, "Set Clean Session (default false)") + qos := flag.Int("qos", 0, "The Quality of Service 0,1,2 (default 0)") + num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)") + payload := flag.String("message", "", "The message text to publish (default empty)") + action := flag.String("action", "", "Action publish or subscribe (required)") + store := flag.String("store", ":memory:", "The Store Directory (default use memory store)") + flag.Parse() + + if *action != "pub" && *action != "sub" { + fmt.Println("Invalid setting for -action, must be pub or sub") + return + } + + if *topic == "" { + fmt.Println("Invalid setting for -topic, must not be empty") + return + } + + fmt.Printf("Sample Info:\n") + fmt.Printf("\taction: %s\n", *action) + fmt.Printf("\tbroker: %s\n", *broker) + fmt.Printf("\tclientid: %s\n", *id) + fmt.Printf("\tuser: %s\n", *user) + fmt.Printf("\tpassword: %s\n", *password) + fmt.Printf("\ttopic: %s\n", *topic) + fmt.Printf("\tmessage: %s\n", *payload) + fmt.Printf("\tqos: %d\n", *qos) + fmt.Printf("\tcleansess: %v\n", *cleansess) + fmt.Printf("\tnum: %d\n", *num) + fmt.Printf("\tstore: %s\n", *store) + + opts := MQTT.NewClientOptions() + opts.AddBroker(*broker) + opts.SetClientID(*id) + opts.SetUsername(*user) + opts.SetPassword(*password) + opts.SetCleanSession(*cleansess) + if *store != ":memory:" { + opts.SetStore(MQTT.NewFileStore(*store)) + } + + if *action == "pub" { + client := MQTT.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + fmt.Println("Sample Publisher Started") + for i := 0; i < *num; i++ { + fmt.Println("---- doing publish ----") + token := client.Publish(*topic, byte(*qos), false, *payload) + token.Wait() + } + + client.Disconnect(250) + fmt.Println("Sample Publisher Disconnected") + } else { + receiveCount := 0 + choke := make(chan [2]string) + + opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) { + choke <- [2]string{msg.Topic(), string(msg.Payload())} + }) + + client := MQTT.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + + if token := client.Subscribe(*topic, byte(*qos), nil); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + os.Exit(1) + } + + for receiveCount < *num { + incoming := <-choke + fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1]) + receiveCount++ + } + + client.Disconnect(250) + fmt.Println("Sample Subscriber Disconnected") + } +}