Tidy up helper somewhat, and start on implementing reprocess
authorjweigele <jweigele@local>
Fri, 9 Dec 2022 22:50:25 +0000 (14:50 -0800)
committerjweigele <jweigele@local>
Fri, 9 Dec 2022 22:50:25 +0000 (14:50 -0800)
For helper:
 * Helper does most of the json I/O so we don't need to import in other files
 * Add some structs to manage the complexity of multiple routing keys
 * Various convenience options (IncludeDate on send, Source in the RabbitConfig) for easier scope

For reprocess:
 * Reads in temp pretty simply, and sets prometheus gauges as we've done before
 * Kinda handles DIY temperature (resends and prometheus gauge), and does not care about motion (so won't send it back out)
 * Weird chain of structs to implement different devices
 * Have not started on power or other complicated logic yet

go.mod
helper/helper.go
reprocess/main.go [new file with mode: 0644]
timecolorshift/main.go
timecolorshift/timecolorshift [deleted file]
wunder/main.go

diff --git a/go.mod b/go.mod
index 71e3d9ae1c400d30b68de6b0948bc64bd4a92531..4510935f2bd90b3754df025459060366031b394d 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -5,10 +5,20 @@ go 1.18
 require (
        github.com/PuerkitoBio/goquery v1.8.0
        github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d
+       github.com/prometheus/client_golang v1.14.0
        github.com/rabbitmq/amqp091-go v1.5.0
 )
 
 require (
        github.com/andybalholm/cascadia v1.3.1 // indirect
+       github.com/beorn7/perks v1.0.1 // indirect
+       github.com/cespare/xxhash/v2 v2.1.2 // indirect
+       github.com/golang/protobuf v1.5.2 // indirect
+       github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
+       github.com/prometheus/client_model v0.3.0 // indirect
+       github.com/prometheus/common v0.37.0 // indirect
+       github.com/prometheus/procfs v0.8.0 // indirect
        golang.org/x/net v0.3.0 // indirect
+       golang.org/x/sys v0.3.0 // indirect
+       google.golang.org/protobuf v1.28.1 // indirect
 )
index ced2c2873a8b5a77b84d9ca40dbabe310fb9892a..9e966aa975692b85065a15d2d4e6e8ecb168bb6c 100644 (file)
@@ -9,6 +9,7 @@ import (
 //        "io"
         "io/ioutil"
         "log"
+        "strings"
 //        "math"
 //        "net/http"
 //        "os"
@@ -16,15 +17,25 @@ import (
         "time"
 
 //        "github.com/PuerkitoBio/goquery"
-//        "github.com/leekchan/timeutil"
+        "github.com/leekchan/timeutil"
         amqp "github.com/rabbitmq/amqp091-go"
 )
 
+
+// RabbitSend just a bundle of a map with a routing key
+type RabbitSend struct {
+        Data map[string]interface{}
+        RoutingKey string
+        IncludeDate bool            // defaults to false, so we don't send the date
+}
+
 // RabbitConfig simple values to pass around
 type RabbitConfig struct {
-        Exchange   string
-        Channel    *amqp.Channel
-        RoutingKey string
+        Exchange            string
+        Channel             *amqp.Channel
+        Queue               *amqp.Queue
+        DefaultRoutingKey   string
+        Source              string
 }
 
 func failOnError(err error, msg string) {
@@ -33,7 +44,8 @@ func failOnError(err error, msg string) {
         }
 }
 
-func SetupRabbit(configFilename string, routingKey string) RabbitConfig {
+
+func SetupRabbit(configFilename string, defaultRoutingKey string, source string) RabbitConfig {
         configJSON, err := ioutil.ReadFile(configFilename)
         if err != nil {
                 failOnError(err, "oh no read")
@@ -52,7 +64,7 @@ func SetupRabbit(configFilename string, routingKey string) RabbitConfig {
         //defer conn.Close()
 
         rabbitChannel, err := conn.Channel()
-        failOnError(err, "Failed to open a rabbitChannelannel")
+        failOnError(err, "Failed to open a rabbitChannel")
         //defer rabbitChannel.Close()
 
         err = rabbitChannel.ExchangeDeclare(
@@ -72,26 +84,113 @@ func SetupRabbit(configFilename string, routingKey string) RabbitConfig {
         rabbit.Exchange = result["exchange"].(string)
         rabbit.Channel = rabbitChannel
         // constants
-        rabbit.RoutingKey = routingKey 
+        rabbit.DefaultRoutingKey = defaultRoutingKey 
+        rabbit.Source = source
         return rabbit
 }
 
+// simple, bind to a (topic) with our default params
+// will declare a queue if we don't have one
+func Bind(routingKey string, rabbit RabbitConfig) (RabbitConfig){
+    // if queue hasn't yet been initialized (no other bound topics basically)
+    if rabbit.Queue == nil {
+        queue, err := rabbit.Channel.QueueDeclare(
+                "",         // queue name (auto-gen)
+                false,      // durable
+                true,       // autoDelete (get rid of if consumer discos or server restarts)
+                true,       // exclusive
+                false,      // noWait (i.e. block until finished)
+                nil,        // args
+        )
+        // pass by reference so we can fit in our struct
+        rabbit.Queue = &queue
+        failOnError(err, "failed queue declaration") 
+        log.Printf("new queue declared %s", rabbit.Queue.Name)
+    }
+
+    // last two are block on binding and args
+    err := rabbit.Channel.QueueBind(rabbit.Queue.Name, routingKey, rabbit.Exchange, false, nil)
+    failOnError(err, "failed queue binding")
+    return rabbit
 
-func SendJSON(data []byte, rabbit RabbitConfig, verboseSend bool) {
-        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-        defer cancel()
-        err := rabbit.Channel.PublishWithContext(ctx,
-                rabbit.Exchange,   // exchange
-                rabbit.RoutingKey, // routing key
-                false,             // mandatory
-                false,             // immediate
-                amqp.Publishing{
-                        ContentType: "application/json",
-                        Body:        data,
-                })
-        failOnError(err, "Failed to publish a message")
-        if verboseSend {
-            log.Printf("Sent %s routing: %s\n", data, rabbit.RoutingKey)
+}
+
+// starts consuming with all our presets and returns channel
+func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery){
+    log.Printf("consuming on queue %+v", rabbit.Queue)
+    deliveries, err := rabbit.Channel.Consume(
+            rabbit.Queue.Name,
+            "",                 // consumer (auto-gen)
+            true,               // autoAck
+            true,               // exclusive
+            false,              // noLocal - does nothing?
+            false,              // wait and block please
+            nil,                // args
+    )
+    failOnError(err, "failed channel consume")
+    return deliveries
+}
+
+func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}){
+    //var result map[string]string //interface{} 
+    result := make(map[string]interface{})
+    // either explicitly json, or zigbee2mqtt (which we have set to only output json)
+    isZigbee := strings.HasPrefix(delivery.RoutingKey, "zigbee2mqtt")
+    if delivery.ContentType == "application/json" || isZigbee {
+        err := json.Unmarshal(delivery.Body, &result)
+        if isZigbee {
+            // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value
+            // try to further decode this in an effort to help the data consumer
+            _, actionPresent := result["action"]
+            if actionPresent {
+                var decode map[string]interface{}
+                json.Unmarshal([]byte(result["action"].(string)), &decode)
+                result["action"] = decode
+            }
         }
+        failOnError(err, "failed to decode json")
+    } else {
+        result["data"] = string(delivery.Body[:])
+    }
+    return result
 }
+        
+func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool){
+    if rabbitData.IncludeDate {
+        // add the date as a convenience
+        utcNow := time.Now().UTC()
+        dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S")
+        rabbitData.Data["date"] = dateString
+    }
 
+    if rabbit.Source != "" {
+        rabbitData.Data["source"] = rabbit.Source
+    }
+
+    jsonBytes, err := json.Marshal(rabbitData.Data)
+    failOnError(err, "Improperly formatted JSON, just gonna exit here")
+    var routingKey string
+    if rabbitData.RoutingKey != "" {
+        routingKey = rabbitData.RoutingKey
+    } else if rabbit.DefaultRoutingKey != "" {
+        routingKey = rabbit.DefaultRoutingKey
+    } else {
+        failOnError(fmt.Errorf("routing key error"), "No valid routing key found to send on")
+    }
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+    err = rabbit.Channel.PublishWithContext(ctx,
+            rabbit.Exchange,        // exchange
+            routingKey,  // routing key
+            false,                  // mandatory
+            false,                  // immediate
+            amqp.Publishing{
+                    ContentType: "application/json",
+                    Body:        jsonBytes,
+            })
+    failOnError(err, "Failed to publish a message")
+    if verboseSend {
+        log.Printf("Sent %s routing: %s\n", jsonBytes, routingKey)
+    }
+
+}
diff --git a/reprocess/main.go b/reprocess/main.go
new file mode 100644 (file)
index 0000000..99a29c5
--- /dev/null
@@ -0,0 +1,234 @@
+// basic stuff
+package main
+
+import (
+       "fmt"
+       "log"
+       "os"
+       "time"
+
+        // prometheus imports
+        "net/http"
+        "github.com/prometheus/client_golang/prometheus"
+        "github.com/prometheus/client_golang/prometheus/promauto"
+        "github.com/prometheus/client_golang/prometheus/promhttp"
+
+        "unpiege.net/rabbit_go.git/helper"
+)
+
+var (
+        tempGauge = promauto.NewGaugeVec(
+            prometheus.GaugeOpts{
+                Name: "temp_sensor",
+                Help: "Used to measure temperatures in the real world",
+            },
+            []string{"scale", "location"},
+        )
+        //tempExpire = map[prometheus.Labels]time.Time
+        //tempExpire = map[string]string
+
+        tempExpire = make(map[[2]string]time.Time)
+
+        staleTime = time.Duration(15 * time.Second)
+
+)
+
+
+type Device interface {
+    getRoutingKey() string
+}
+
+type ZigDevice struct {
+    routingKey string
+}
+
+type DIYDevice struct {
+    *ZigDevice
+}
+
+type PowerDevice struct {
+    *ZigDevice
+    powerName string
+    queryNeeded bool
+}
+
+
+func (dev *DIYDevice) getRoutingKey() (string){
+    return fmt.Sprintf("zigbee2mqtt.%s", dev.routingKey)
+}
+
+func NewDevice(routingKey string) (*ZigDevice){
+    retval := ZigDevice{routingKey: routingKey}
+    //retval.routingKey = routingKey
+    return &retval
+}
+
+func NewDIYDevice(routingKey string) (*DIYDevice){
+    return &DIYDevice{
+        ZigDevice: NewDevice(routingKey),
+    }
+    
+}
+
+//tempExpire := make(map[[2]string]time.Time)
+
+func failOnError(err error, msg string) {
+       if err != nil {
+               log.Panicf("%s: %s", msg, err)
+       }
+}
+
+func handleTemp(obj map[string]interface{}){
+    log.Printf("temp! %v", obj)
+    celsius := obj["celsius"].(float64)
+    location := obj["location"].(string)
+    now := time.Now().UTC()
+    // TODO: last update portion
+
+    // do the label update here
+    //log.Printf("celsius: %f location %s", celsius, location)
+    tempGauge.With(prometheus.Labels{"scale": "celsius", "location": location}).Set(celsius)
+    tempExpire[[2]string{"celsius", location}] = now
+    
+}
+
+func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend){
+    log.Printf("diy! %v", obj)
+    _, isAction := obj["action"]
+    if isAction {
+        // this is yet more JSON, so decode it
+        data := obj["action"].(map[string]interface{})
+/*        err := json.Unmarshal([]byte(obj["action"].(string)), &data)
+        if err != nil {
+            log.Printf("Unable to decode json action from diy, ignoring")
+            return
+        }*/
+        var senseType string
+
+        _, isTemp := data["celsius"]
+        if isTemp {
+            senseType = "temperature"
+        } else {
+            _, hasType := data["type"]
+            if hasType {
+                senseType = data["type"].(string)
+            } else {
+                log.Printf("Did not find any sense type, just returning")
+                return
+            }
+        }
+        // should have the senseType setup correctly now, if we're still here
+        if senseType == "temperature" {
+            tempMap := make(map[string]interface{}, 0)    
+            // these should all exist properly
+            tempMap["fahrenheit"] = data["celsius"].(float64) * 9/5 + 32
+            tempMap["celsius"] = data["celsius"].(float64)
+            tempMap["location"] = data["location"].(string)
+            log.Printf("would send %v", tempMap)
+            // hardcoded temp routingKey for this type of measurement
+            sendThis := helper.RabbitSend{Data: tempMap, RoutingKey: "lol", IncludeDate: true}
+            sendChannel <- sendThis
+        }
+
+    } else {
+        log.Printf("not action, ignoring")
+    }
+
+}
+
+func readLoop(channel chan helper.RabbitSend, devices []Device, rabbit helper.RabbitConfig) {
+        for _, device := range devices {
+            rabbit = helper.Bind(device.getRoutingKey(), rabbit)
+            log.Printf("bound to %s", device.getRoutingKey())
+        }
+        rabbit = helper.Bind("temp", rabbit)
+        log.Printf("bound to temp")
+        deliveries := helper.StartConsuming(rabbit)
+        for delivery := range deliveries {
+            //log.Printf("got a delivery %+v", delivery)
+            item := helper.DecodeDelivery(delivery)
+            //log.Printf("%+v", item)
+            if delivery.RoutingKey == "temp" {
+                handleTemp(item)
+            }
+            for _, device := range devices {
+                if device.getRoutingKey() == delivery.RoutingKey {
+                    log.Printf("Found device for routing key %s, %v", delivery.RoutingKey, device)
+                    switch t := device.(type) {
+                        case *DIYDevice:
+                            log.Printf("DIY device, sending to handle")
+                            handleDIY(item, channel)
+                        default:
+                            log.Printf("other device, type was %s", t)
+                    }
+                }
+            }
+            //time.Sleep(time.Duration(0.5*float64(time.Second)))
+        }
+}
+func timeLoop(channel chan helper.RabbitSend) {
+       for {
+            time.Sleep(time.Duration(0.5*float64(time.Second)))
+       }
+}
+
+func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig){
+        for sendItem := range channel {
+            log.Printf("would try to send here %v", sendItem)
+            helper.SendData(sendItem, rabbit, true)
+            //helper.SendData(sendData, rabbit, false)
+        }
+}
+
+func promSetup(){
+        // prometheus setup
+        http.Handle("/metrics", promhttp.Handler())
+        http.ListenAndServe(":9099", nil)
+}
+
+
+func expireStaleMetrics(){
+    now := time.Now().UTC()
+    // tempExpire
+    tempExpireValues := make([][2]string, 0)
+    for labels, lastUpdate := range tempExpire {
+        curStale := now.Sub(lastUpdate)
+        if curStale > staleTime {
+            log.Printf("stale time %v, expiring labels %v\tlastUpdate: %v", curStale, labels, lastUpdate)
+            tempExpireValues = append(tempExpireValues, labels)
+        }
+    }
+
+    // now we have the values to delete
+    for _, labels := range tempExpireValues {
+        complete := tempGauge.DeleteLabelValues(labels[:]...)
+        if complete {
+            delete(tempExpire, labels)
+        } else {
+            log.Panicf("oh no wasn't able to delete metrics")
+        }
+    }
+
+}
+
+func main() {
+       rabbit := helper.SetupRabbit(os.Args[1], "", "reprocess") // config file, default routing key, source
+       const sleepTime time.Duration = 5 * time.Second
+
+        devices := make([]Device, 0)
+        devices = append(devices, NewDIYDevice("office_pico"))
+
+       //currentTemp, err := fetchTemp(weatherStation)
+       channel := make(chan helper.RabbitSend)
+
+        go promSetup()
+        go readLoop(channel, devices, rabbit)
+       go timeLoop(channel)
+        go sendLoop(channel, rabbit)
+
+       for true {
+               time.Sleep(sleepTime)
+                go expireStaleMetrics()
+       }
+
+}
index 72c2e3e1b50411e0e9ee6b310345b6416278b0b6..decabb94f5092304797426c41a6bcb1563e57899 100644 (file)
@@ -2,7 +2,7 @@
 package main
 
 import (
-       "encoding/json"
+       //"encoding/json"
        //"fmt"
        //"io"
        "log"
@@ -131,7 +131,7 @@ func sum(arr []float64) float64 {
        return sum
 }
 
-func timeLoop(channel chan []byte, timeColors []TimeColor) {
+func timeLoop(channel chan helper.RabbitSend, timeColors []TimeColor) {
         loc, _ := time.LoadLocation("America/Los_Angeles")
        for {
             //log.Printf("okay just looping here")
@@ -151,8 +151,7 @@ func timeLoop(channel chan []byte, timeColors []TimeColor) {
             sendObj["red"] = totalRGB.Red
             sendObj["green"] = totalRGB.Green
             sendObj["blue"] = totalRGB.Blue
-
-            data, _ := json.Marshal(sendObj)
+            data := helper.RabbitSend{Data: sendObj}
 
             channel <- data
             time.Sleep(time.Duration(0.5*float64(time.Second)))
@@ -160,19 +159,18 @@ func timeLoop(channel chan []byte, timeColors []TimeColor) {
        }
 }
 
-func sendLoop(channel chan []byte, rabbit helper.RabbitConfig){
+func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig){
         for sendData := range channel {
-            //log.Printf("would try to send here %s", sendData)
-            helper.SendJSON(sendData, rabbit, false)
+            helper.SendData(sendData, rabbit, true)
         }
 }
 
 func main() {
-       rabbit := helper.SetupRabbit(os.Args[1], "timecolorshift.py") // config file, routing key
+       rabbit := helper.SetupRabbit(os.Args[1], "timecolorshift.py", "timecolorshift") // config file, routing key
        const sleepTime time.Duration = 30 * time.Second
 
        //currentTemp, err := fetchTemp(weatherStation)
-       channel := make(chan []byte)
+       channel := make(chan helper.RabbitSend)
         all_tcs := make([]TimeColor, 0)
         all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 12}, 0, 1, 0.5, TimeFloat{Hour:12}))
         all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 2}, 1, 0.1, 0, TimeFloat{Hour:8}))
diff --git a/timecolorshift/timecolorshift b/timecolorshift/timecolorshift
deleted file mode 100755 (executable)
index ff2414c..0000000
Binary files a/timecolorshift/timecolorshift and /dev/null differ
index 293ab4db041af0842d735cd2964556d04e767737..c820386b402d8f41cfe1e4e275e5c6c425f23b5b 100644 (file)
@@ -2,7 +2,7 @@
 package main
 
 import (
-       "encoding/json"
+       //"encoding/json"
        "fmt"
        "io"
        "log"
@@ -13,7 +13,7 @@ import (
        "time"
 
        "github.com/PuerkitoBio/goquery"
-       "github.com/leekchan/timeutil"
+       //"github.com/leekchan/timeutil"
         "unpiege.net/rabbit_go.git/helper"
 )
 
@@ -111,7 +111,7 @@ func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfi
                                } else {
                                        sendTemp(currentTemp, celsius, rabbit)
                                }
-                               fmt.Printf("temp list is %s, avg is %f\n", tempList, tempAvg)
+                               fmt.Printf("temp list is %+v, avg is %f\n", tempList, tempAvg)
                        }
                }
        }
@@ -126,16 +126,12 @@ func sendTemp(tempValueF float64, tempValueC float64, rabbit helper.RabbitConfig
         sendObj["fahrenheit"] = tempValueF
         // just a constant now
         sendObj["location"] = "Outside (Wunderground)"
-        utcNow := time.Now().UTC()
-        dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S")
-        sendObj["date"] = dateString
-        data, err := json.Marshal(sendObj)
-        failOnError(err, "oh no json marshal")
-        helper.SendJSON(data, rabbit, true)
+        rabbitData := helper.RabbitSend{Data: sendObj, IncludeDate: true}
+        helper.SendData(rabbitData, rabbit, true)
 }
 
 func main() {
-       rabbit := helper.SetupRabbit(os.Args[1], "temp") // config file, routing key
+       rabbit := helper.SetupRabbit(os.Args[1], "temp", "wunder") // config file, routing key
        const weatherStation string = "KWASEATT2696"
        const sleepTime time.Duration = 30 * time.Second