Clean up the expiration of stale metrics and prepare for production
authorjweigele <jweigele@local>
Mon, 12 Dec 2022 02:51:44 +0000 (18:51 -0800)
committerjweigele <jweigele@local>
Mon, 12 Dec 2022 02:51:44 +0000 (18:51 -0800)
 * Add power metric expiration
 * Make the map expiry keys a joined string, which will help prevent
    weird array lengths being required all over the place, makes expiry
    more generic in implementation
 * Switch routing keys to their production variants
 * _Should_ work in kubernetes as the others do, just need to deploy

go.mod
reprocess/main.go

diff --git a/go.mod b/go.mod
index e54c84e992b048e5b7bcd8ef7266ebe5997f43ce..c5bf4e122fc232879db9b59527daef84cc8d158c 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
        github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d
        github.com/prometheus/client_golang v1.14.0
        github.com/rabbitmq/amqp091-go v1.5.0
+       golang.org/x/exp v0.0.0-20221211140036-ad323defaf05
        gopkg.in/yaml.v2 v2.4.0
        k8s.io/klog/v2 v2.80.1
 )
index 12745d1b24919596e3534d65072092e0d66925ca..166dee096fd2b95436887d8784368f9456c0925a 100644 (file)
@@ -6,6 +6,7 @@ import (
        "fmt"
        "os"
        "reflect"
+       "strings"
        "time"
 
        // prometheus imports
@@ -38,13 +39,23 @@ var (
                []string{"ePDUOutletStatusIndex", "ePDUOutletStatusOutletName"},
        )
 
-       tempExpire = make(map[[2]string]time.Time)
+       tempExpire  = make(map[string]time.Time)
+       powerExpire = make(map[string]time.Time)
 
-       staleTime      = time.Duration(15 * time.Second)
+       staleTime      = time.Duration(60 * time.Second)
        logger         logr.Logger
        configFilename string
 )
 
+type labelValHash struct {
+       labelValues []string
+}
+
+// used to get a constant hash for indexing a map
+func (l labelValHash) getHash() string {
+       return strings.Join(l.labelValues, ",")
+}
+
 type device interface {
        getRoutingKey() string
 }
@@ -66,7 +77,6 @@ type powerdevice struct {
        *zigdevice
        powerName           string
        queryNeeded         bool
-       lastUpdate          time.Time
        lastUpdateRequested time.Time
        powerList           []powertime
 }
@@ -143,7 +153,10 @@ func handleTemp(obj map[string]interface{}) {
        // do the label update here
        tempGauge.With(prometheus.Labels{"scale": "celsius", "location": location}).Set(celsius)
        // here, we might have any number of temp values some not covered by our own objects
-       tempExpire[[2]string{"celsius", location}] = now
+       //tempExpire[[2]string{"celsius", location}] = now
+       // create hashed labels for later expiry
+       tempHash := labelValHash{labelValues: []string{"celsius", location}}
+       tempExpire[tempHash.getHash()] = now
 
 }
 
@@ -177,14 +190,14 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) {
                        dataMap["location"] = data["location"].(string)
                        logger.V(2).Info("Sending reprocessed temperature")
                        // hardcoded temp routingKey for this type of measurement
-                       sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true}
+                       sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "temp", IncludeDate: true}
                        sendChannel <- sendThis
                } else if senseType == "motion" {
                        dataMap["motion_detected"] = data["state"].(bool)
                        // copied from above, but it might change later so shrug
                        dataMap["location"] = data["location"].(string)
                        logger.V(2).Info("Sending reprocessed motion")
-                       sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true}
+                       sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "motion", IncludeDate: true}
                        sendChannel <- sendThis
                } else {
                        logger.Info("Sense type not detected, ignoring")
@@ -210,13 +223,13 @@ func (dev *powerdevice) shouldUpdate(now time.Time) bool {
 func handlePower(obj map[string]interface{}, sendChannel chan helper.RabbitSend, triggerDevice *powerdevice) {
        now := time.Now().UTC()
        logger.V(1).Info("Power data received", "obj", obj)
-       //name := triggerDevice.powerName
-       // here, we're directly tracking power devices so we can bind updated directly to the object
-       triggerDevice.lastUpdate = now
        if triggerDevice.shouldUpdate(now) {
                triggerDevice.add(now, obj["power"].(float64))
                powerGauge.With(prometheus.Labels{"ePDUOutletStatusIndex": triggerDevice.friendlyName, "ePDUOutletStatusOutletName": triggerDevice.powerName}).Set(triggerDevice.avgPower())
-               // don't worry about expiration, since we have all the values in our objects
+               // here, we have unique objects, but we're forced to be more permissive for temperature so might as well do the same here
+               tempHash := labelValHash{labelValues: []string{triggerDevice.friendlyName, triggerDevice.powerName}}
+               powerExpire[tempHash.getHash()] = now
+
        }
 
 }
@@ -308,8 +321,8 @@ func powerLoop(sendChannel chan helper.RabbitSend, devices []device) {
 
 func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig) {
        for sendItem := range channel {
-               helper.SendData(sendItem, rabbit, true)
-               //helper.SendData(sendData, rabbit, false)
+               //helper.SendData(sendItem, rabbit, true)
+               helper.SendData(sendItem, rabbit, false)
        }
 }
 
@@ -319,23 +332,23 @@ func promSetup() {
        http.ListenAndServe(":9099", nil)
 }
 
-func expireStaleMetrics() {
+func expireStaleMetric(expireMap map[string]time.Time, expireGauge *prometheus.GaugeVec) {
        now := time.Now().UTC()
-       // tempExpire
-       tempExpireValues := make([][2]string, 0)
-       for labels, lastUpdate := range tempExpire {
+       expireMapValues := make([]string, 0)
+       for labels, lastUpdate := range expireMap {
                curStale := now.Sub(lastUpdate)
                if curStale > staleTime {
-                       logger.Info("stale metric update time, expiring labels", "curStale", curStale, "labels", labels, "lastUpdate", lastUpdate)
-                       tempExpireValues = append(tempExpireValues, labels)
+                       logger.Info("stale temp metric update time, expiring labels", "curStale", curStale.String(), "labels", strings.Split(labels, ","), "lastUpdate", lastUpdate)
+                       expireMapValues = append(expireMapValues, labels)
                }
        }
-
-       // now we have the values to delete
-       for _, labels := range tempExpireValues {
-               complete := tempGauge.DeleteLabelValues(labels[:]...)
+       // now actually go through and delete labels and then clean up our entry
+       for _, labels := range expireMapValues {
+               // need to make this back into a slice for deletion
+               deleteSlice := strings.Split(labels, ",")
+               complete := expireGauge.DeleteLabelValues(deleteSlice...)
                if complete {
-                       delete(tempExpire, labels)
+                       delete(expireMap, labels)
                } else {
                        logger.Info("unable to delete metrics, exiting")
                        os.Exit(1)
@@ -344,6 +357,11 @@ func expireStaleMetrics() {
 
 }
 
+func expireStaleMetrics() {
+       expireStaleMetric(tempExpire, tempGauge)
+       expireStaleMetric(powerExpire, powerGauge)
+}
+
 func main() {
        // logging and flag initialization
        flag.StringVar(&configFilename, "config", "", "the config filename")
@@ -376,6 +394,9 @@ func main() {
 
        // power devices
        devices = append(devices, newpowerdevice("0x00158d0002a2e370", "wiggleswhale", true))
+       devices = append(devices, newpowerdevice("0x00158d0002a9ca3a", "Office", true))
+       devices = append(devices, newpowerdevice("0x00158d0002a9a1c7", "Hallway", true))
+       devices = append(devices, newpowerdevice("mobile_power", "Mobile Power", false))
 
        //currentTemp, err := fetchTemp(weatherStation)
        channel := make(chan helper.RabbitSend)