From: jweigele Date: Fri, 30 Jun 2023 07:41:08 +0000 (-0700) Subject: A few smaller changes X-Git-Url: http://git.hexthepla.net/?a=commitdiff_plain;h=74350c23f712c7a537acd7dbf8eb9b4eaf360276;p=rabbit_go A few smaller changes * Bugfix in fancontrol, keep it from sending a source which weirds out zigbee2mqtt * Bugfix in reprocess, don't modify the expire map at the same time as you're updating it from received messages (mutex) * Feature addition in lights - dummy mode, to just process updates but not take action on them (i.e. controlling lights) * Feature addition in reprocess, allowing it to read pm25 sensor data (DIY devices) and door sensor updates (off the shelf) --- diff --git a/fancontrol/main.go b/fancontrol/main.go index 32c73fb..c5b2ab5 100644 --- a/fancontrol/main.go +++ b/fancontrol/main.go @@ -48,7 +48,7 @@ func main() { sendObj["state"] = "off" } for _, fan := range fanFlags { - data := helper.RabbitSend{Data: sendObj, RoutingKey: fmt.Sprintf("zigbee2mqtt.%s/set", fan)} + data := helper.RabbitSend{Data: sendObj, RoutingKey: fmt.Sprintf("zigbee2mqtt.%s/set", fan), EmptySource: true} err = helper.SendData(data, rabbit, false) // TODO: find a cleaner way to bubble this up if err != nil { diff --git a/go.mod b/go.mod index 1bbd00c..d4aa3dc 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-logr/logr v1.2.3 github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d github.com/lucasb-eyer/go-colorful v1.2.0 + github.com/mrflynn/go-aqi v0.0.9 github.com/prometheus/client_golang v1.14.0 github.com/rabbitmq/amqp091-go v1.5.0 golang.org/x/exp v0.0.0-20221212164502-fae10dda9338 @@ -23,6 +24,7 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect + github.com/shopspring/decimal v1.2.0 // indirect golang.org/x/net v0.3.0 // indirect golang.org/x/sys v0.3.0 // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/lights/main.go b/lights/main.go index a6fd6ec..6b27b0f 100644 --- a/lights/main.go +++ b/lights/main.go @@ -194,6 +194,7 @@ type RGBRelay struct { LastUpdate time.Time LastMotion time.Time LastState bool + Dummy bool `yaml:"Dummy"` logger logr.Logger sendChannel chan helper.RabbitSend switches []*Switch `yaml:"Switches"` @@ -332,7 +333,7 @@ func (relay *RGBRelay) setSwitchesExpired(source *Switch) { } func (relay *RGBRelay) setPWM(red, green, blue float64) { - if relay.shouldUpdate() { + if relay.shouldUpdate() && !relay.Dummy { relay.sendUpdate(red, green, blue) } } diff --git a/reprocess/main.go b/reprocess/main.go index 4558da6..bd97a61 100644 --- a/reprocess/main.go +++ b/reprocess/main.go @@ -7,6 +7,7 @@ import ( "os" "reflect" "strings" + "sync" "time" // prometheus imports @@ -16,6 +17,9 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" + // aqi calculations + "github.com/mrflynn/go-aqi" + // logging interface "github.com/go-logr/logr" @@ -38,9 +42,46 @@ var ( }, []string{"ePDUOutletStatusIndex", "ePDUOutletStatusOutletName"}, ) + pm25Gauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "airsensor_pm25", + Help: "Used to measure air PM2.5 concentration in µg/m³", + }, + []string{"location"}, + ) + + aqiGauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "airsensor_aqi", + Help: "Calculated from PM2.5 concentration, US EPA AQI", + }, + []string{"location"}, + ) + + doorOpenCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "sensor_door_opened_count", + Help: "Counts each time the door is opened at a particular location", + }, + []string{"location"}, + ) + + doorTamperCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "sensor_door_tamper_count", + Help: "Counts each time the sensor is removed from the bracket", + }, + []string{"location"}, + ) tempExpire = make(map[string]time.Time) powerExpire = make(map[string]time.Time) + pm25Expire = make(map[string]time.Time) + aqiExpire = make(map[string]time.Time) + + doorMap = make(map[string]map[string]interface{}) + + expireMutex sync.Mutex staleTime = time.Duration(60 * time.Second) logger logr.Logger @@ -68,6 +109,10 @@ type diydevice struct { *zigdevice } +type doordevice struct { + *zigdevice +} + type powertime struct { timeLogged time.Time powerValue float64 @@ -137,6 +182,14 @@ func newdiydevice(friendlyName string) *diydevice { } +func newdoordevice(friendlyName string) *doordevice { + doorOpenCounter.With(prometheus.Labels{"location": friendlyName}) + doorTamperCounter.With(prometheus.Labels{"location": friendlyName}) + return &doordevice{ + zigdevice: newdevice(friendlyName), + } +} + func newpowerdevice(friendlyName, powerName string, queryNeeded bool) *powerdevice { return &powerdevice{ zigdevice: newdevice(friendlyName), @@ -157,6 +210,8 @@ func handleTemp(obj map[string]interface{}) { //tempExpire[[2]string{"celsius", location}] = now // create hashed labels for later expiry tempHash := labelValHash{labelValues: []string{"celsius", location}} + expireMutex.Lock() + defer expireMutex.Unlock() tempExpire[tempHash.getHash()] = now } @@ -201,6 +256,34 @@ func (dev *diydevice) handleDIY(obj map[string]interface{}, sendChannel chan hel logger.V(2).Info("Sending reprocessed motion") sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "motion", IncludeDate: true} sendChannel <- sendThis + } else if senseType == "pm25" { + location := data["location"].(string) + pm25 := data["pm25"].(float64) + dataMap["location"] = location + dataMap["pm25"] = pm25 + results, err := aqi.Calculate(aqi.PM25{Concentration: pm25}) + if err != nil { + logger.Error(nil, "Unable to calculate AQI, ignoring this delivery") + return + } + + aqi := results.AQI + dataMap["aqi"] = aqi + + sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "aqi", IncludeDate: true} + sendChannel <- sendThis + // we don't want to have to loop back around on rabbit processing, so just set the gauge here + now := time.Now().UTC() + // do the label update here + pm25Gauge.With(prometheus.Labels{"location": location}).Set(pm25) + aqiGauge.With(prometheus.Labels{"location": location}).Set(aqi) + + expireMutex.Lock() + defer expireMutex.Unlock() + + aqiExpire[location] = now + pm25Expire[location] = now + } else { logger.Info("Sense type not detected, ignoring") } @@ -233,12 +316,65 @@ func (dev *powerdevice) handlePower(obj map[string]interface{}, sendChannel chan powerGauge.With(prometheus.Labels{"ePDUOutletStatusIndex": dev.friendlyName, "ePDUOutletStatusOutletName": dev.powerName}).Set(dev.avgPower()) // here, we have unique objects, but we're forced to be more permissive for temperature so might as well do the same here tempHash := labelValHash{labelValues: []string{dev.friendlyName, dev.powerName}} + expireMutex.Lock() + defer expireMutex.Unlock() + powerExpire[tempHash.getHash()] = now } } +func (dev *doordevice) handleDoor(obj map[string]interface{}, sendChannel chan helper.RabbitSend) { + //now := time.Now().UTC() + logger.V(0).Info("Door data received", "obj", obj) + doorOpened := false + doorTampered := false + + contact, ok := obj["contact"].(bool) + if !ok { + logger.Error(nil, "contact not found in data, ignoring", "obj", obj) + return + } + + tamper, ok := obj["tamper"].(bool) + if !ok { + logger.Error(nil, "contact not found in data, ignoring", "obj", obj) + return + } + + // we have both tamper and contact data, can now compare with previous data (or take as gospel if new) + oldData, ok := doorMap[dev.friendlyName] + // if the door is currently open + if !contact { + // either brand new location data (thus no state to pull from) or last data was door closed (aka contact == true) + // then we know door has been opened + if !ok || oldData["contact"].(bool) { + doorOpened = true + } + } + + // currently tamper is set true, sensor is away from bracket (maybe battery or whatever) + if tamper { + if !ok || !oldData["tamper"].(bool) { + doorTampered = true + } + } + + // increment counters where needed + if doorOpened { + doorOpenCounter.With(prometheus.Labels{"location": dev.friendlyName}).Inc() + } + + if doorTampered { + doorTamperCounter.With(prometheus.Labels{"location": dev.friendlyName}).Inc() + } + + // save data for next cycle + doorMap[dev.friendlyName] = obj + +} + func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.RabbitConfig) { for _, device := range devices { err := helper.Bind(device.getRoutingKey(), &rabbit) @@ -290,6 +426,9 @@ func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.Ra logger.V(2).Info("Power device, sending to handle", "device", device) // it's explicitly a powerdevice, so cast it before sending device.(*powerdevice).handlePower(item, channel) + case *doordevice: + logger.V(2).Info("Door device, sending to handle", "device", device) + device.(*doordevice).handleDoor(item, channel) default: logger.Info("Found a device we can't classify", "device", device, "type", t) } @@ -349,12 +488,15 @@ func promSetup() { } func expireStaleMetric(expireMap map[string]time.Time, expireGauge *prometheus.GaugeVec) { + expireMutex.Lock() + defer expireMutex.Unlock() + now := time.Now().UTC() expireMapValues := make([]string, 0) for labels, lastUpdate := range expireMap { curStale := now.Sub(lastUpdate) if curStale > staleTime { - logger.Info("stale temp metric update time, expiring labels", "curStale", curStale.String(), "labels", strings.Split(labels, ","), "lastUpdate", lastUpdate) + logger.Info("stale metric update time, expiring labels", "curStale", curStale.String(), "labels", strings.Split(labels, ","), "lastUpdate", lastUpdate) expireMapValues = append(expireMapValues, labels) } } @@ -376,6 +518,9 @@ func expireStaleMetric(expireMap map[string]time.Time, expireGauge *prometheus.G func expireStaleMetrics() { expireStaleMetric(tempExpire, tempGauge) expireStaleMetric(powerExpire, powerGauge) + expireStaleMetric(pm25Expire, pm25Gauge) + expireStaleMetric(aqiExpire, aqiGauge) + } func main() { @@ -404,8 +549,10 @@ func main() { devices = append(devices, newdiydevice("office_pico")) devices = append(devices, newdiydevice("stairway_pico")) devices = append(devices, newdiydevice("downstairs_stairway_pico")) + devices = append(devices, newdiydevice("downstairs_pico")) devices = append(devices, newdiydevice("upstairs_pico")) devices = append(devices, newdiydevice("data_closet_pico")) + devices = append(devices, newdiydevice("aqi_pico")) // power devices devices = append(devices, newpowerdevice("0x00158d0002a2e370", "wiggleswhale", true)) @@ -413,6 +560,10 @@ func main() { devices = append(devices, newpowerdevice("0x00158d0002a9a1c7", "Hallway", true)) devices = append(devices, newpowerdevice("mobile_power", "Mobile Power", false)) + // door devices + devices = append(devices, newdoordevice("front_door")) + devices = append(devices, newdoordevice("data_door")) + //currentTemp, err := fetchTemp(weatherStation) channel := make(chan helper.RabbitSend)