"os"
"reflect"
"strings"
+ "sync"
"time"
// prometheus imports
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
+ // aqi calculations
+ "github.com/mrflynn/go-aqi"
+
// logging interface
"github.com/go-logr/logr"
},
[]string{"ePDUOutletStatusIndex", "ePDUOutletStatusOutletName"},
)
+ pm25Gauge = promauto.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "airsensor_pm25",
+ Help: "Used to measure air PM2.5 concentration in µg/m³",
+ },
+ []string{"location"},
+ )
+
+ aqiGauge = promauto.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "airsensor_aqi",
+ Help: "Calculated from PM2.5 concentration, US EPA AQI",
+ },
+ []string{"location"},
+ )
+
+ doorOpenCounter = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "sensor_door_opened_count",
+ Help: "Counts each time the door is opened at a particular location",
+ },
+ []string{"location"},
+ )
+
+ doorTamperCounter = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "sensor_door_tamper_count",
+ Help: "Counts each time the sensor is removed from the bracket",
+ },
+ []string{"location"},
+ )
tempExpire = make(map[string]time.Time)
powerExpire = make(map[string]time.Time)
+ pm25Expire = make(map[string]time.Time)
+ aqiExpire = make(map[string]time.Time)
+
+ doorMap = make(map[string]map[string]interface{})
+
+ expireMutex sync.Mutex
staleTime = time.Duration(60 * time.Second)
logger logr.Logger
*zigdevice
}
+type doordevice struct {
+ *zigdevice
+}
+
type powertime struct {
timeLogged time.Time
powerValue float64
}
+func newdoordevice(friendlyName string) *doordevice {
+ doorOpenCounter.With(prometheus.Labels{"location": friendlyName})
+ doorTamperCounter.With(prometheus.Labels{"location": friendlyName})
+ return &doordevice{
+ zigdevice: newdevice(friendlyName),
+ }
+}
+
func newpowerdevice(friendlyName, powerName string, queryNeeded bool) *powerdevice {
return &powerdevice{
zigdevice: newdevice(friendlyName),
//tempExpire[[2]string{"celsius", location}] = now
// create hashed labels for later expiry
tempHash := labelValHash{labelValues: []string{"celsius", location}}
+ expireMutex.Lock()
+ defer expireMutex.Unlock()
tempExpire[tempHash.getHash()] = now
}
logger.V(2).Info("Sending reprocessed motion")
sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "motion", IncludeDate: true}
sendChannel <- sendThis
+ } else if senseType == "pm25" {
+ location := data["location"].(string)
+ pm25 := data["pm25"].(float64)
+ dataMap["location"] = location
+ dataMap["pm25"] = pm25
+ results, err := aqi.Calculate(aqi.PM25{Concentration: pm25})
+ if err != nil {
+ logger.Error(nil, "Unable to calculate AQI, ignoring this delivery")
+ return
+ }
+
+ aqi := results.AQI
+ dataMap["aqi"] = aqi
+
+ sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "aqi", IncludeDate: true}
+ sendChannel <- sendThis
+ // we don't want to have to loop back around on rabbit processing, so just set the gauge here
+ now := time.Now().UTC()
+ // do the label update here
+ pm25Gauge.With(prometheus.Labels{"location": location}).Set(pm25)
+ aqiGauge.With(prometheus.Labels{"location": location}).Set(aqi)
+
+ expireMutex.Lock()
+ defer expireMutex.Unlock()
+
+ aqiExpire[location] = now
+ pm25Expire[location] = now
+
} else {
logger.Info("Sense type not detected, ignoring")
}
powerGauge.With(prometheus.Labels{"ePDUOutletStatusIndex": dev.friendlyName, "ePDUOutletStatusOutletName": dev.powerName}).Set(dev.avgPower())
// here, we have unique objects, but we're forced to be more permissive for temperature so might as well do the same here
tempHash := labelValHash{labelValues: []string{dev.friendlyName, dev.powerName}}
+ expireMutex.Lock()
+ defer expireMutex.Unlock()
+
powerExpire[tempHash.getHash()] = now
}
}
+func (dev *doordevice) handleDoor(obj map[string]interface{}, sendChannel chan helper.RabbitSend) {
+ //now := time.Now().UTC()
+ logger.V(0).Info("Door data received", "obj", obj)
+ doorOpened := false
+ doorTampered := false
+
+ contact, ok := obj["contact"].(bool)
+ if !ok {
+ logger.Error(nil, "contact not found in data, ignoring", "obj", obj)
+ return
+ }
+
+ tamper, ok := obj["tamper"].(bool)
+ if !ok {
+ logger.Error(nil, "contact not found in data, ignoring", "obj", obj)
+ return
+ }
+
+ // we have both tamper and contact data, can now compare with previous data (or take as gospel if new)
+ oldData, ok := doorMap[dev.friendlyName]
+ // if the door is currently open
+ if !contact {
+ // either brand new location data (thus no state to pull from) or last data was door closed (aka contact == true)
+ // then we know door has been opened
+ if !ok || oldData["contact"].(bool) {
+ doorOpened = true
+ }
+ }
+
+ // currently tamper is set true, sensor is away from bracket (maybe battery or whatever)
+ if tamper {
+ if !ok || !oldData["tamper"].(bool) {
+ doorTampered = true
+ }
+ }
+
+ // increment counters where needed
+ if doorOpened {
+ doorOpenCounter.With(prometheus.Labels{"location": dev.friendlyName}).Inc()
+ }
+
+ if doorTampered {
+ doorTamperCounter.With(prometheus.Labels{"location": dev.friendlyName}).Inc()
+ }
+
+ // save data for next cycle
+ doorMap[dev.friendlyName] = obj
+
+}
+
func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.RabbitConfig) {
for _, device := range devices {
err := helper.Bind(device.getRoutingKey(), &rabbit)
logger.V(2).Info("Power device, sending to handle", "device", device)
// it's explicitly a powerdevice, so cast it before sending
device.(*powerdevice).handlePower(item, channel)
+ case *doordevice:
+ logger.V(2).Info("Door device, sending to handle", "device", device)
+ device.(*doordevice).handleDoor(item, channel)
default:
logger.Info("Found a device we can't classify", "device", device, "type", t)
}
}
func expireStaleMetric(expireMap map[string]time.Time, expireGauge *prometheus.GaugeVec) {
+ expireMutex.Lock()
+ defer expireMutex.Unlock()
+
now := time.Now().UTC()
expireMapValues := make([]string, 0)
for labels, lastUpdate := range expireMap {
curStale := now.Sub(lastUpdate)
if curStale > staleTime {
- logger.Info("stale temp metric update time, expiring labels", "curStale", curStale.String(), "labels", strings.Split(labels, ","), "lastUpdate", lastUpdate)
+ logger.Info("stale metric update time, expiring labels", "curStale", curStale.String(), "labels", strings.Split(labels, ","), "lastUpdate", lastUpdate)
expireMapValues = append(expireMapValues, labels)
}
}
func expireStaleMetrics() {
expireStaleMetric(tempExpire, tempGauge)
expireStaleMetric(powerExpire, powerGauge)
+ expireStaleMetric(pm25Expire, pm25Gauge)
+ expireStaleMetric(aqiExpire, aqiGauge)
+
}
func main() {
devices = append(devices, newdiydevice("office_pico"))
devices = append(devices, newdiydevice("stairway_pico"))
devices = append(devices, newdiydevice("downstairs_stairway_pico"))
+ devices = append(devices, newdiydevice("downstairs_pico"))
devices = append(devices, newdiydevice("upstairs_pico"))
devices = append(devices, newdiydevice("data_closet_pico"))
+ devices = append(devices, newdiydevice("aqi_pico"))
// power devices
devices = append(devices, newpowerdevice("0x00158d0002a2e370", "wiggleswhale", true))
devices = append(devices, newpowerdevice("0x00158d0002a9a1c7", "Hallway", true))
devices = append(devices, newpowerdevice("mobile_power", "Mobile Power", false))
+ // door devices
+ devices = append(devices, newdoordevice("front_door"))
+ devices = append(devices, newdoordevice("data_door"))
+
//currentTemp, err := fetchTemp(weatherStation)
channel := make(chan helper.RabbitSend)