From d61787dc865ee4abe1e53930e43a1b9454935cd3 Mon Sep 17 00:00:00 2001 From: jweigele Date: Sun, 11 Dec 2022 18:51:44 -0800 Subject: [PATCH] Clean up the expiration of stale metrics and prepare for production * Add power metric expiration * Make the map expiry keys a joined string, which will help prevent weird array lengths being required all over the place, makes expiry more generic in implementation * Switch routing keys to their production variants * _Should_ work in kubernetes as the others do, just need to deploy --- go.mod | 1 + reprocess/main.go | 67 +++++++++++++++++++++++++++++++---------------- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index e54c84e..c5bf4e1 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d github.com/prometheus/client_golang v1.14.0 github.com/rabbitmq/amqp091-go v1.5.0 + golang.org/x/exp v0.0.0-20221211140036-ad323defaf05 gopkg.in/yaml.v2 v2.4.0 k8s.io/klog/v2 v2.80.1 ) diff --git a/reprocess/main.go b/reprocess/main.go index 12745d1..166dee0 100644 --- a/reprocess/main.go +++ b/reprocess/main.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "reflect" + "strings" "time" // prometheus imports @@ -38,13 +39,23 @@ var ( []string{"ePDUOutletStatusIndex", "ePDUOutletStatusOutletName"}, ) - tempExpire = make(map[[2]string]time.Time) + tempExpire = make(map[string]time.Time) + powerExpire = make(map[string]time.Time) - staleTime = time.Duration(15 * time.Second) + staleTime = time.Duration(60 * time.Second) logger logr.Logger configFilename string ) +type labelValHash struct { + labelValues []string +} + +// used to get a constant hash for indexing a map +func (l labelValHash) getHash() string { + return strings.Join(l.labelValues, ",") +} + type device interface { getRoutingKey() string } @@ -66,7 +77,6 @@ type powerdevice struct { *zigdevice powerName string queryNeeded bool - lastUpdate time.Time lastUpdateRequested time.Time powerList []powertime } @@ -143,7 +153,10 @@ func handleTemp(obj map[string]interface{}) { // do the label update here tempGauge.With(prometheus.Labels{"scale": "celsius", "location": location}).Set(celsius) // here, we might have any number of temp values some not covered by our own objects - tempExpire[[2]string{"celsius", location}] = now + //tempExpire[[2]string{"celsius", location}] = now + // create hashed labels for later expiry + tempHash := labelValHash{labelValues: []string{"celsius", location}} + tempExpire[tempHash.getHash()] = now } @@ -177,14 +190,14 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) { dataMap["location"] = data["location"].(string) logger.V(2).Info("Sending reprocessed temperature") // hardcoded temp routingKey for this type of measurement - sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true} + sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "temp", IncludeDate: true} sendChannel <- sendThis } else if senseType == "motion" { dataMap["motion_detected"] = data["state"].(bool) // copied from above, but it might change later so shrug dataMap["location"] = data["location"].(string) logger.V(2).Info("Sending reprocessed motion") - sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true} + sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "motion", IncludeDate: true} sendChannel <- sendThis } else { logger.Info("Sense type not detected, ignoring") @@ -210,13 +223,13 @@ func (dev *powerdevice) shouldUpdate(now time.Time) bool { func handlePower(obj map[string]interface{}, sendChannel chan helper.RabbitSend, triggerDevice *powerdevice) { now := time.Now().UTC() logger.V(1).Info("Power data received", "obj", obj) - //name := triggerDevice.powerName - // here, we're directly tracking power devices so we can bind updated directly to the object - triggerDevice.lastUpdate = now if triggerDevice.shouldUpdate(now) { triggerDevice.add(now, obj["power"].(float64)) powerGauge.With(prometheus.Labels{"ePDUOutletStatusIndex": triggerDevice.friendlyName, "ePDUOutletStatusOutletName": triggerDevice.powerName}).Set(triggerDevice.avgPower()) - // don't worry about expiration, since we have all the values in our objects + // here, we have unique objects, but we're forced to be more permissive for temperature so might as well do the same here + tempHash := labelValHash{labelValues: []string{triggerDevice.friendlyName, triggerDevice.powerName}} + powerExpire[tempHash.getHash()] = now + } } @@ -308,8 +321,8 @@ func powerLoop(sendChannel chan helper.RabbitSend, devices []device) { func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig) { for sendItem := range channel { - helper.SendData(sendItem, rabbit, true) - //helper.SendData(sendData, rabbit, false) + //helper.SendData(sendItem, rabbit, true) + helper.SendData(sendItem, rabbit, false) } } @@ -319,23 +332,23 @@ func promSetup() { http.ListenAndServe(":9099", nil) } -func expireStaleMetrics() { +func expireStaleMetric(expireMap map[string]time.Time, expireGauge *prometheus.GaugeVec) { now := time.Now().UTC() - // tempExpire - tempExpireValues := make([][2]string, 0) - for labels, lastUpdate := range tempExpire { + expireMapValues := make([]string, 0) + for labels, lastUpdate := range expireMap { curStale := now.Sub(lastUpdate) if curStale > staleTime { - logger.Info("stale metric update time, expiring labels", "curStale", curStale, "labels", labels, "lastUpdate", lastUpdate) - tempExpireValues = append(tempExpireValues, labels) + logger.Info("stale temp metric update time, expiring labels", "curStale", curStale.String(), "labels", strings.Split(labels, ","), "lastUpdate", lastUpdate) + expireMapValues = append(expireMapValues, labels) } } - - // now we have the values to delete - for _, labels := range tempExpireValues { - complete := tempGauge.DeleteLabelValues(labels[:]...) + // now actually go through and delete labels and then clean up our entry + for _, labels := range expireMapValues { + // need to make this back into a slice for deletion + deleteSlice := strings.Split(labels, ",") + complete := expireGauge.DeleteLabelValues(deleteSlice...) if complete { - delete(tempExpire, labels) + delete(expireMap, labels) } else { logger.Info("unable to delete metrics, exiting") os.Exit(1) @@ -344,6 +357,11 @@ func expireStaleMetrics() { } +func expireStaleMetrics() { + expireStaleMetric(tempExpire, tempGauge) + expireStaleMetric(powerExpire, powerGauge) +} + func main() { // logging and flag initialization flag.StringVar(&configFilename, "config", "", "the config filename") @@ -376,6 +394,9 @@ func main() { // power devices devices = append(devices, newpowerdevice("0x00158d0002a2e370", "wiggleswhale", true)) + devices = append(devices, newpowerdevice("0x00158d0002a9ca3a", "Office", true)) + devices = append(devices, newpowerdevice("0x00158d0002a9a1c7", "Hallway", true)) + devices = append(devices, newpowerdevice("mobile_power", "Mobile Power", false)) //currentTemp, err := fetchTemp(weatherStation) channel := make(chan helper.RabbitSend) -- 2.30.2