Update wunder to pull AQI data for a location
authorjweigele <jweigele@local>
Mon, 21 Aug 2023 01:03:10 +0000 (18:03 -0700)
committerjweigele <jweigele@local>
Mon, 21 Aug 2023 01:03:10 +0000 (18:03 -0700)
Secondary update in reprocess to do the gauge set from the rabbitmq
topic, and just process PM2.5 and resend in the main function (since we
don't have PM2.5 in Wunderground data and I don't want to convert back)

reprocess/main.go
wunder/main.go

index c5b06377d68cdbb476559bdcdc299add4717d94b..1701f0afdd9299878028080f25d917ca70db0b38 100644 (file)
@@ -251,6 +251,20 @@ func handleTemp(obj map[string]interface{}) {
 
 }
 
+func handleAQI(obj map[string]interface{}) {
+       now := time.Now().UTC()
+       // do the label update here
+       //pm25Gauge.With(prometheus.Labels{"location": location}).Set(pm25)
+       aqiGauge.With(prometheus.Labels{"location": obj["location"].(string)}).Set(obj["aqi"].(float64))
+
+       expireMutex.Lock()
+       defer expireMutex.Unlock()
+
+       aqiExpire[obj["location"].(string)] = now
+       //pm25Expire[location] = now
+
+}
+
 func processPM25(location string, pm25 float64, sendChannel chan helper.RabbitSend) {
        // should have the senseType setup correctly now, if we're still here
        dataMap := make(map[string]interface{}, 0)
@@ -267,17 +281,6 @@ func processPM25(location string, pm25 float64, sendChannel chan helper.RabbitSe
 
        sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "aqi", IncludeDate: true}
        sendChannel <- sendThis
-       // we don't want to have to loop back around on rabbit processing, so just set the gauge here
-       now := time.Now().UTC()
-       // do the label update here
-       pm25Gauge.With(prometheus.Labels{"location": location}).Set(pm25)
-       aqiGauge.With(prometheus.Labels{"location": location}).Set(aqi)
-
-       expireMutex.Lock()
-       defer expireMutex.Unlock()
-
-       aqiExpire[location] = now
-       pm25Expire[location] = now
 
 }
 
@@ -492,6 +495,13 @@ func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.Ra
                os.Exit(1)
        }
 
+       /// this one is hardcoded, for the generic AQI output
+       err = helper.Bind("aqi", &rabbit)
+       if err != nil {
+               logger.Error(err, "unable to bind successfully to exchange", "routingKey", "aqi")
+               os.Exit(1)
+       }
+
        deliveries, err := helper.StartConsuming(rabbit)
        if err != nil {
                logger.Error(err, "unable to start consuming data from rabbit")
@@ -515,6 +525,8 @@ func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.Ra
                logger.V(2).Info("Received a delivery", "delivery", delivery)
                if delivery.RoutingKey == "temp" {
                        handleTemp(item)
+               } else if delivery.RoutingKey == "aqi" {
+                       handleAQI(item)
                }
                for _, device := range devices {
                        if device.getRoutingKey() == delivery.RoutingKey {
index 9e4a139093865b9980a6fad657de6169a97c7cfd..9f9588ff624a40a2db4e715e1b986b7c9ee25802 100644 (file)
@@ -58,12 +58,40 @@ func extractTemp(htmlReader io.Reader) (float64, error) {
                }
        })
        if !foundTemp {
-               return currentTemp, fmt.Errorf("An error occurred while parsing html data")
+               return currentTemp, fmt.Errorf("An error occurred while parsing temp html data")
        }
        return currentTemp, nil
 
 }
 
+func extractAQI(htmlReader io.Reader) (float64, error) {
+       foundAQI := false
+       var currentAQI float64
+       doc, err := goquery.NewDocumentFromReader(htmlReader)
+       if err != nil {
+               return currentAQI, err
+       }
+
+       // this is the span class where the AQI data lives (buried in it though)
+       doc.Find(".aqi-value").Each(func(i int, s *goquery.Selection) {
+               class, _ := s.Attr("class")
+
+               if class == "aqi-value" {
+                       parsedAQI, err := strconv.ParseFloat(s.Text(), 64)
+                       if err == nil {
+                               currentAQI = parsedAQI
+                               logger.Info("Found some aqi value", "aqi-value", s.Text())
+                               foundAQI = true
+                       }
+               }
+       })
+       if !foundAQI {
+               return currentAQI, fmt.Errorf("An error occured while parsing AQI html data")
+       }
+       return currentAQI, nil
+
+}
+
 func fetchTemp(weatherStation string) (float64, error) {
        const baseURL string = "https://www.wunderground.com/weather/us/wa/seattle"
        client := &http.Client{
@@ -86,6 +114,28 @@ func fetchTemp(weatherStation string) (float64, error) {
 
 }
 
+func fetchAQI(weatherStation string) (float64, error) {
+       const baseURL string = "https://www.wunderground.com/health/us/wa/seattle"
+       const suffixURL string = "?cm_ven=localwx_modaq"
+       client := &http.Client{
+               Timeout: time.Second * 10,
+       }
+       fullURL := fmt.Sprintf("%s/%s/%s", baseURL, weatherStation, suffixURL)
+       logger.Info("Fetching full url", "url", fullURL)
+       req, err := http.NewRequest("GET", fullURL, nil)
+       if err != nil {
+               return 0, err
+       }
+       req.Header.Set("user-agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0")
+       res, err := client.Do(req)
+       if err != nil {
+               return 0, err
+       }
+       defer res.Body.Close()
+       // just return whatever we get back by passing through the html
+       return extractAQI(res.Body)
+}
+
 func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfig) error {
        tempList := make([]float64, 0)
        for _ = range channel {
@@ -111,6 +161,17 @@ func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfi
                                logger.V(2).Info("updated templist and average", "tempList", tempList, "tempAvg", tempAvg)
                        }
                }
+               currentAQI, err := fetchAQI(weatherStation)
+               if err != nil {
+                       logger.Error(err, "Wasn't able to get AQI, ignoring")
+               } else {
+                       logger.Info("got AQI", "currentAQI", currentAQI)
+                       if currentAQI > 500 || currentAQI < 0 {
+                               logger.Error(nil, "AQI way out of wack, not reporting", "currentAQI", currentAQI)
+                       } else {
+                               sendAQI(currentAQI, rabbit)
+                       }
+               }
        }
        return nil
 
@@ -130,6 +191,19 @@ func sendTemp(tempValueF float64, tempValueC float64, rabbit helper.RabbitConfig
        }
 }
 
+func sendAQI(currentAQI float64, rabbit helper.RabbitConfig) {
+       sendObj := make(map[string]interface{})
+       sendObj["aqi"] = currentAQI
+       // just a constant now
+       sendObj["location"] = "Outside (Wunderground)"
+       rabbitData := helper.RabbitSend{Data: sendObj, IncludeDate: true, RoutingKey: "aqi"}
+       err := helper.SendData(rabbitData, rabbit, true)
+       if err != nil {
+               logger.Error(err, "Unable to send data, exiting!")
+               os.Exit(1)
+       }
+}
+
 func main() {
        // logging and flag initialization
        flag.StringVar(&configFilename, "config", "", "the config filename")