A few smaller changes
authorjweigele <jweigele@local>
Fri, 30 Jun 2023 07:41:08 +0000 (00:41 -0700)
committerjweigele <jweigele@local>
Fri, 30 Jun 2023 07:41:08 +0000 (00:41 -0700)
 * Bugfix in fancontrol, keep it from sending a source which weirds out
   zigbee2mqtt
 * Bugfix in reprocess, don't modify the expire map at the same time as
   you're updating it from received messages (mutex)
 * Feature addition in lights - dummy mode, to just process updates but
   not take action on them (i.e. controlling lights)
 * Feature addition in reprocess, allowing it to read pm25 sensor data
   (DIY devices) and door sensor updates (off the shelf)

fancontrol/main.go
go.mod
lights/main.go
reprocess/main.go

index 32c73fbe1b11d6ae7b05947cc191854536e7af2a..c5b2ab5796ecaf871ad68bd666d301220375fd11 100644 (file)
@@ -48,7 +48,7 @@ func main() {
                sendObj["state"] = "off"
        }
        for _, fan := range fanFlags {
-               data := helper.RabbitSend{Data: sendObj, RoutingKey: fmt.Sprintf("zigbee2mqtt.%s/set", fan)}
+               data := helper.RabbitSend{Data: sendObj, RoutingKey: fmt.Sprintf("zigbee2mqtt.%s/set", fan), EmptySource: true}
                err = helper.SendData(data, rabbit, false)
                // TODO: find a cleaner way to bubble this up
                if err != nil {
diff --git a/go.mod b/go.mod
index 1bbd00c5151d7ced4df73ed3c366b0cc4c207e9c..d4aa3dc4ba67db40ca57847f18bbbefced62beaf 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
        github.com/go-logr/logr v1.2.3
        github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d
        github.com/lucasb-eyer/go-colorful v1.2.0
+       github.com/mrflynn/go-aqi v0.0.9
        github.com/prometheus/client_golang v1.14.0
        github.com/rabbitmq/amqp091-go v1.5.0
        golang.org/x/exp v0.0.0-20221212164502-fae10dda9338
@@ -23,6 +24,7 @@ require (
        github.com/prometheus/client_model v0.3.0 // indirect
        github.com/prometheus/common v0.37.0 // indirect
        github.com/prometheus/procfs v0.8.0 // indirect
+       github.com/shopspring/decimal v1.2.0 // indirect
        golang.org/x/net v0.3.0 // indirect
        golang.org/x/sys v0.3.0 // indirect
        google.golang.org/protobuf v1.28.1 // indirect
index a6fd6ecc5bb152c4d0713796463abfa19ff1a8bf..6b27b0fb4f288f5580bbd156498bacb11f4d276f 100644 (file)
@@ -194,6 +194,7 @@ type RGBRelay struct {
        LastUpdate        time.Time
        LastMotion        time.Time
        LastState         bool
+       Dummy             bool `yaml:"Dummy"`
        logger            logr.Logger
        sendChannel       chan helper.RabbitSend
        switches          []*Switch `yaml:"Switches"`
@@ -332,7 +333,7 @@ func (relay *RGBRelay) setSwitchesExpired(source *Switch) {
 }
 
 func (relay *RGBRelay) setPWM(red, green, blue float64) {
-       if relay.shouldUpdate() {
+       if relay.shouldUpdate() && !relay.Dummy {
                relay.sendUpdate(red, green, blue)
        }
 }
index 4558da6322f6b276d0fd790c627ac1dbce0bfd01..bd97a61c9b2659a978335c0eaa953816132c87fa 100644 (file)
@@ -7,6 +7,7 @@ import (
        "os"
        "reflect"
        "strings"
+       "sync"
        "time"
 
        // prometheus imports
@@ -16,6 +17,9 @@ import (
        "github.com/prometheus/client_golang/prometheus/promauto"
        "github.com/prometheus/client_golang/prometheus/promhttp"
 
+       // aqi calculations
+       "github.com/mrflynn/go-aqi"
+
        // logging interface
        "github.com/go-logr/logr"
 
@@ -38,9 +42,46 @@ var (
                },
                []string{"ePDUOutletStatusIndex", "ePDUOutletStatusOutletName"},
        )
+       pm25Gauge = promauto.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "airsensor_pm25",
+                       Help: "Used to measure air PM2.5 concentration in µg/m³",
+               },
+               []string{"location"},
+       )
+
+       aqiGauge = promauto.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "airsensor_aqi",
+                       Help: "Calculated from PM2.5 concentration, US EPA AQI",
+               },
+               []string{"location"},
+       )
+
+       doorOpenCounter = promauto.NewCounterVec(
+               prometheus.CounterOpts{
+                       Name: "sensor_door_opened_count",
+                       Help: "Counts each time the door is opened at a particular location",
+               },
+               []string{"location"},
+       )
+
+       doorTamperCounter = promauto.NewCounterVec(
+               prometheus.CounterOpts{
+                       Name: "sensor_door_tamper_count",
+                       Help: "Counts each time the sensor is removed from the bracket",
+               },
+               []string{"location"},
+       )
 
        tempExpire  = make(map[string]time.Time)
        powerExpire = make(map[string]time.Time)
+       pm25Expire  = make(map[string]time.Time)
+       aqiExpire   = make(map[string]time.Time)
+
+       doorMap = make(map[string]map[string]interface{})
+
+       expireMutex sync.Mutex
 
        staleTime      = time.Duration(60 * time.Second)
        logger         logr.Logger
@@ -68,6 +109,10 @@ type diydevice struct {
        *zigdevice
 }
 
+type doordevice struct {
+       *zigdevice
+}
+
 type powertime struct {
        timeLogged time.Time
        powerValue float64
@@ -137,6 +182,14 @@ func newdiydevice(friendlyName string) *diydevice {
 
 }
 
+func newdoordevice(friendlyName string) *doordevice {
+       doorOpenCounter.With(prometheus.Labels{"location": friendlyName})
+       doorTamperCounter.With(prometheus.Labels{"location": friendlyName})
+       return &doordevice{
+               zigdevice: newdevice(friendlyName),
+       }
+}
+
 func newpowerdevice(friendlyName, powerName string, queryNeeded bool) *powerdevice {
        return &powerdevice{
                zigdevice:   newdevice(friendlyName),
@@ -157,6 +210,8 @@ func handleTemp(obj map[string]interface{}) {
        //tempExpire[[2]string{"celsius", location}] = now
        // create hashed labels for later expiry
        tempHash := labelValHash{labelValues: []string{"celsius", location}}
+       expireMutex.Lock()
+       defer expireMutex.Unlock()
        tempExpire[tempHash.getHash()] = now
 
 }
@@ -201,6 +256,34 @@ func (dev *diydevice) handleDIY(obj map[string]interface{}, sendChannel chan hel
                                logger.V(2).Info("Sending reprocessed motion")
                                sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "motion", IncludeDate: true}
                                sendChannel <- sendThis
+                       } else if senseType == "pm25" {
+                               location := data["location"].(string)
+                               pm25 := data["pm25"].(float64)
+                               dataMap["location"] = location
+                               dataMap["pm25"] = pm25
+                               results, err := aqi.Calculate(aqi.PM25{Concentration: pm25})
+                               if err != nil {
+                                       logger.Error(nil, "Unable to calculate AQI, ignoring this delivery")
+                                       return
+                               }
+
+                               aqi := results.AQI
+                               dataMap["aqi"] = aqi
+
+                               sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "aqi", IncludeDate: true}
+                               sendChannel <- sendThis
+                               // we don't want to have to loop back around on rabbit processing, so just set the gauge here
+                               now := time.Now().UTC()
+                               // do the label update here
+                               pm25Gauge.With(prometheus.Labels{"location": location}).Set(pm25)
+                               aqiGauge.With(prometheus.Labels{"location": location}).Set(aqi)
+
+                               expireMutex.Lock()
+                               defer expireMutex.Unlock()
+
+                               aqiExpire[location] = now
+                               pm25Expire[location] = now
+
                        } else {
                                logger.Info("Sense type not detected, ignoring")
                        }
@@ -233,12 +316,65 @@ func (dev *powerdevice) handlePower(obj map[string]interface{}, sendChannel chan
                powerGauge.With(prometheus.Labels{"ePDUOutletStatusIndex": dev.friendlyName, "ePDUOutletStatusOutletName": dev.powerName}).Set(dev.avgPower())
                // here, we have unique objects, but we're forced to be more permissive for temperature so might as well do the same here
                tempHash := labelValHash{labelValues: []string{dev.friendlyName, dev.powerName}}
+               expireMutex.Lock()
+               defer expireMutex.Unlock()
+
                powerExpire[tempHash.getHash()] = now
 
        }
 
 }
 
+func (dev *doordevice) handleDoor(obj map[string]interface{}, sendChannel chan helper.RabbitSend) {
+       //now := time.Now().UTC()
+       logger.V(0).Info("Door data received", "obj", obj)
+       doorOpened := false
+       doorTampered := false
+
+       contact, ok := obj["contact"].(bool)
+       if !ok {
+               logger.Error(nil, "contact not found in data, ignoring", "obj", obj)
+               return
+       }
+
+       tamper, ok := obj["tamper"].(bool)
+       if !ok {
+               logger.Error(nil, "contact not found in data, ignoring", "obj", obj)
+               return
+       }
+
+       // we have both tamper and contact data, can now compare with previous data (or take as gospel if new)
+       oldData, ok := doorMap[dev.friendlyName]
+       // if the door is currently open
+       if !contact {
+               // either brand new location data (thus no state to pull from) or last data was door closed (aka contact == true)
+               // then we know door has been opened
+               if !ok || oldData["contact"].(bool) {
+                       doorOpened = true
+               }
+       }
+
+       // currently tamper is set true, sensor is away from bracket (maybe battery or whatever)
+       if tamper {
+               if !ok || !oldData["tamper"].(bool) {
+                       doorTampered = true
+               }
+       }
+
+       // increment counters where needed
+       if doorOpened {
+               doorOpenCounter.With(prometheus.Labels{"location": dev.friendlyName}).Inc()
+       }
+
+       if doorTampered {
+               doorTamperCounter.With(prometheus.Labels{"location": dev.friendlyName}).Inc()
+       }
+
+       // save data for next cycle
+       doorMap[dev.friendlyName] = obj
+
+}
+
 func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.RabbitConfig) {
        for _, device := range devices {
                err := helper.Bind(device.getRoutingKey(), &rabbit)
@@ -290,6 +426,9 @@ func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.Ra
                                        logger.V(2).Info("Power device, sending to handle", "device", device)
                                        // it's explicitly a powerdevice, so cast it before sending
                                        device.(*powerdevice).handlePower(item, channel)
+                               case *doordevice:
+                                       logger.V(2).Info("Door device, sending to handle", "device", device)
+                                       device.(*doordevice).handleDoor(item, channel)
                                default:
                                        logger.Info("Found a device we can't classify", "device", device, "type", t)
                                }
@@ -349,12 +488,15 @@ func promSetup() {
 }
 
 func expireStaleMetric(expireMap map[string]time.Time, expireGauge *prometheus.GaugeVec) {
+       expireMutex.Lock()
+       defer expireMutex.Unlock()
+
        now := time.Now().UTC()
        expireMapValues := make([]string, 0)
        for labels, lastUpdate := range expireMap {
                curStale := now.Sub(lastUpdate)
                if curStale > staleTime {
-                       logger.Info("stale temp metric update time, expiring labels", "curStale", curStale.String(), "labels", strings.Split(labels, ","), "lastUpdate", lastUpdate)
+                       logger.Info("stale metric update time, expiring labels", "curStale", curStale.String(), "labels", strings.Split(labels, ","), "lastUpdate", lastUpdate)
                        expireMapValues = append(expireMapValues, labels)
                }
        }
@@ -376,6 +518,9 @@ func expireStaleMetric(expireMap map[string]time.Time, expireGauge *prometheus.G
 func expireStaleMetrics() {
        expireStaleMetric(tempExpire, tempGauge)
        expireStaleMetric(powerExpire, powerGauge)
+       expireStaleMetric(pm25Expire, pm25Gauge)
+       expireStaleMetric(aqiExpire, aqiGauge)
+
 }
 
 func main() {
@@ -404,8 +549,10 @@ func main() {
        devices = append(devices, newdiydevice("office_pico"))
        devices = append(devices, newdiydevice("stairway_pico"))
        devices = append(devices, newdiydevice("downstairs_stairway_pico"))
+       devices = append(devices, newdiydevice("downstairs_pico"))
        devices = append(devices, newdiydevice("upstairs_pico"))
        devices = append(devices, newdiydevice("data_closet_pico"))
+       devices = append(devices, newdiydevice("aqi_pico"))
 
        // power devices
        devices = append(devices, newpowerdevice("0x00158d0002a2e370", "wiggleswhale", true))
@@ -413,6 +560,10 @@ func main() {
        devices = append(devices, newpowerdevice("0x00158d0002a9a1c7", "Hallway", true))
        devices = append(devices, newpowerdevice("mobile_power", "Mobile Power", false))
 
+       // door devices
+       devices = append(devices, newdoordevice("front_door"))
+       devices = append(devices, newdoordevice("data_door"))
+
        //currentTemp, err := fetchTemp(weatherStation)
        channel := make(chan helper.RabbitSend)