From: jweigele Date: Mon, 21 Aug 2023 01:03:10 +0000 (-0700) Subject: Update wunder to pull AQI data for a location X-Git-Url: http://git.hexthepla.net/?a=commitdiff_plain;h=7fbf0acd7429df4120a4b5fe121e98b7d224f990;p=rabbit_go Update wunder to pull AQI data for a location Secondary update in reprocess to do the gauge set from the rabbitmq topic, and just process PM2.5 and resend in the main function (since we don't have PM2.5 in Wunderground data and I don't want to convert back) --- diff --git a/reprocess/main.go b/reprocess/main.go index c5b0637..1701f0a 100644 --- a/reprocess/main.go +++ b/reprocess/main.go @@ -251,6 +251,20 @@ func handleTemp(obj map[string]interface{}) { } +func handleAQI(obj map[string]interface{}) { + now := time.Now().UTC() + // do the label update here + //pm25Gauge.With(prometheus.Labels{"location": location}).Set(pm25) + aqiGauge.With(prometheus.Labels{"location": obj["location"].(string)}).Set(obj["aqi"].(float64)) + + expireMutex.Lock() + defer expireMutex.Unlock() + + aqiExpire[obj["location"].(string)] = now + //pm25Expire[location] = now + +} + func processPM25(location string, pm25 float64, sendChannel chan helper.RabbitSend) { // should have the senseType setup correctly now, if we're still here dataMap := make(map[string]interface{}, 0) @@ -267,17 +281,6 @@ func processPM25(location string, pm25 float64, sendChannel chan helper.RabbitSe sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "aqi", IncludeDate: true} sendChannel <- sendThis - // we don't want to have to loop back around on rabbit processing, so just set the gauge here - now := time.Now().UTC() - // do the label update here - pm25Gauge.With(prometheus.Labels{"location": location}).Set(pm25) - aqiGauge.With(prometheus.Labels{"location": location}).Set(aqi) - - expireMutex.Lock() - defer expireMutex.Unlock() - - aqiExpire[location] = now - pm25Expire[location] = now } @@ -492,6 +495,13 @@ func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.Ra os.Exit(1) } + /// this one is hardcoded, for the generic AQI output + err = helper.Bind("aqi", &rabbit) + if err != nil { + logger.Error(err, "unable to bind successfully to exchange", "routingKey", "aqi") + os.Exit(1) + } + deliveries, err := helper.StartConsuming(rabbit) if err != nil { logger.Error(err, "unable to start consuming data from rabbit") @@ -515,6 +525,8 @@ func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.Ra logger.V(2).Info("Received a delivery", "delivery", delivery) if delivery.RoutingKey == "temp" { handleTemp(item) + } else if delivery.RoutingKey == "aqi" { + handleAQI(item) } for _, device := range devices { if device.getRoutingKey() == delivery.RoutingKey { diff --git a/wunder/main.go b/wunder/main.go index 9e4a139..9f9588f 100644 --- a/wunder/main.go +++ b/wunder/main.go @@ -58,12 +58,40 @@ func extractTemp(htmlReader io.Reader) (float64, error) { } }) if !foundTemp { - return currentTemp, fmt.Errorf("An error occurred while parsing html data") + return currentTemp, fmt.Errorf("An error occurred while parsing temp html data") } return currentTemp, nil } +func extractAQI(htmlReader io.Reader) (float64, error) { + foundAQI := false + var currentAQI float64 + doc, err := goquery.NewDocumentFromReader(htmlReader) + if err != nil { + return currentAQI, err + } + + // this is the span class where the AQI data lives (buried in it though) + doc.Find(".aqi-value").Each(func(i int, s *goquery.Selection) { + class, _ := s.Attr("class") + + if class == "aqi-value" { + parsedAQI, err := strconv.ParseFloat(s.Text(), 64) + if err == nil { + currentAQI = parsedAQI + logger.Info("Found some aqi value", "aqi-value", s.Text()) + foundAQI = true + } + } + }) + if !foundAQI { + return currentAQI, fmt.Errorf("An error occured while parsing AQI html data") + } + return currentAQI, nil + +} + func fetchTemp(weatherStation string) (float64, error) { const baseURL string = "https://www.wunderground.com/weather/us/wa/seattle" client := &http.Client{ @@ -86,6 +114,28 @@ func fetchTemp(weatherStation string) (float64, error) { } +func fetchAQI(weatherStation string) (float64, error) { + const baseURL string = "https://www.wunderground.com/health/us/wa/seattle" + const suffixURL string = "?cm_ven=localwx_modaq" + client := &http.Client{ + Timeout: time.Second * 10, + } + fullURL := fmt.Sprintf("%s/%s/%s", baseURL, weatherStation, suffixURL) + logger.Info("Fetching full url", "url", fullURL) + req, err := http.NewRequest("GET", fullURL, nil) + if err != nil { + return 0, err + } + req.Header.Set("user-agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0") + res, err := client.Do(req) + if err != nil { + return 0, err + } + defer res.Body.Close() + // just return whatever we get back by passing through the html + return extractAQI(res.Body) +} + func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfig) error { tempList := make([]float64, 0) for _ = range channel { @@ -111,6 +161,17 @@ func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfi logger.V(2).Info("updated templist and average", "tempList", tempList, "tempAvg", tempAvg) } } + currentAQI, err := fetchAQI(weatherStation) + if err != nil { + logger.Error(err, "Wasn't able to get AQI, ignoring") + } else { + logger.Info("got AQI", "currentAQI", currentAQI) + if currentAQI > 500 || currentAQI < 0 { + logger.Error(nil, "AQI way out of wack, not reporting", "currentAQI", currentAQI) + } else { + sendAQI(currentAQI, rabbit) + } + } } return nil @@ -130,6 +191,19 @@ func sendTemp(tempValueF float64, tempValueC float64, rabbit helper.RabbitConfig } } +func sendAQI(currentAQI float64, rabbit helper.RabbitConfig) { + sendObj := make(map[string]interface{}) + sendObj["aqi"] = currentAQI + // just a constant now + sendObj["location"] = "Outside (Wunderground)" + rabbitData := helper.RabbitSend{Data: sendObj, IncludeDate: true, RoutingKey: "aqi"} + err := helper.SendData(rabbitData, rabbit, true) + if err != nil { + logger.Error(err, "Unable to send data, exiting!") + os.Exit(1) + } +} + func main() { // logging and flag initialization flag.StringVar(&configFilename, "config", "", "the config filename")