}
+func handleAQI(obj map[string]interface{}) {
+ now := time.Now().UTC()
+ // do the label update here
+ //pm25Gauge.With(prometheus.Labels{"location": location}).Set(pm25)
+ aqiGauge.With(prometheus.Labels{"location": obj["location"].(string)}).Set(obj["aqi"].(float64))
+
+ expireMutex.Lock()
+ defer expireMutex.Unlock()
+
+ aqiExpire[obj["location"].(string)] = now
+ //pm25Expire[location] = now
+
+}
+
func processPM25(location string, pm25 float64, sendChannel chan helper.RabbitSend) {
// should have the senseType setup correctly now, if we're still here
dataMap := make(map[string]interface{}, 0)
sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "aqi", IncludeDate: true}
sendChannel <- sendThis
- // we don't want to have to loop back around on rabbit processing, so just set the gauge here
- now := time.Now().UTC()
- // do the label update here
- pm25Gauge.With(prometheus.Labels{"location": location}).Set(pm25)
- aqiGauge.With(prometheus.Labels{"location": location}).Set(aqi)
-
- expireMutex.Lock()
- defer expireMutex.Unlock()
-
- aqiExpire[location] = now
- pm25Expire[location] = now
}
os.Exit(1)
}
+ /// this one is hardcoded, for the generic AQI output
+ err = helper.Bind("aqi", &rabbit)
+ if err != nil {
+ logger.Error(err, "unable to bind successfully to exchange", "routingKey", "aqi")
+ os.Exit(1)
+ }
+
deliveries, err := helper.StartConsuming(rabbit)
if err != nil {
logger.Error(err, "unable to start consuming data from rabbit")
logger.V(2).Info("Received a delivery", "delivery", delivery)
if delivery.RoutingKey == "temp" {
handleTemp(item)
+ } else if delivery.RoutingKey == "aqi" {
+ handleAQI(item)
}
for _, device := range devices {
if device.getRoutingKey() == delivery.RoutingKey {
}
})
if !foundTemp {
- return currentTemp, fmt.Errorf("An error occurred while parsing html data")
+ return currentTemp, fmt.Errorf("An error occurred while parsing temp html data")
}
return currentTemp, nil
}
+func extractAQI(htmlReader io.Reader) (float64, error) {
+ foundAQI := false
+ var currentAQI float64
+ doc, err := goquery.NewDocumentFromReader(htmlReader)
+ if err != nil {
+ return currentAQI, err
+ }
+
+ // this is the span class where the AQI data lives (buried in it though)
+ doc.Find(".aqi-value").Each(func(i int, s *goquery.Selection) {
+ class, _ := s.Attr("class")
+
+ if class == "aqi-value" {
+ parsedAQI, err := strconv.ParseFloat(s.Text(), 64)
+ if err == nil {
+ currentAQI = parsedAQI
+ logger.Info("Found some aqi value", "aqi-value", s.Text())
+ foundAQI = true
+ }
+ }
+ })
+ if !foundAQI {
+ return currentAQI, fmt.Errorf("An error occured while parsing AQI html data")
+ }
+ return currentAQI, nil
+
+}
+
func fetchTemp(weatherStation string) (float64, error) {
const baseURL string = "https://www.wunderground.com/weather/us/wa/seattle"
client := &http.Client{
}
+func fetchAQI(weatherStation string) (float64, error) {
+ const baseURL string = "https://www.wunderground.com/health/us/wa/seattle"
+ const suffixURL string = "?cm_ven=localwx_modaq"
+ client := &http.Client{
+ Timeout: time.Second * 10,
+ }
+ fullURL := fmt.Sprintf("%s/%s/%s", baseURL, weatherStation, suffixURL)
+ logger.Info("Fetching full url", "url", fullURL)
+ req, err := http.NewRequest("GET", fullURL, nil)
+ if err != nil {
+ return 0, err
+ }
+ req.Header.Set("user-agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0")
+ res, err := client.Do(req)
+ if err != nil {
+ return 0, err
+ }
+ defer res.Body.Close()
+ // just return whatever we get back by passing through the html
+ return extractAQI(res.Body)
+}
+
func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfig) error {
tempList := make([]float64, 0)
for _ = range channel {
logger.V(2).Info("updated templist and average", "tempList", tempList, "tempAvg", tempAvg)
}
}
+ currentAQI, err := fetchAQI(weatherStation)
+ if err != nil {
+ logger.Error(err, "Wasn't able to get AQI, ignoring")
+ } else {
+ logger.Info("got AQI", "currentAQI", currentAQI)
+ if currentAQI > 500 || currentAQI < 0 {
+ logger.Error(nil, "AQI way out of wack, not reporting", "currentAQI", currentAQI)
+ } else {
+ sendAQI(currentAQI, rabbit)
+ }
+ }
}
return nil
}
}
+func sendAQI(currentAQI float64, rabbit helper.RabbitConfig) {
+ sendObj := make(map[string]interface{})
+ sendObj["aqi"] = currentAQI
+ // just a constant now
+ sendObj["location"] = "Outside (Wunderground)"
+ rabbitData := helper.RabbitSend{Data: sendObj, IncludeDate: true, RoutingKey: "aqi"}
+ err := helper.SendData(rabbitData, rabbit, true)
+ if err != nil {
+ logger.Error(err, "Unable to send data, exiting!")
+ os.Exit(1)
+ }
+}
+
func main() {
// logging and flag initialization
flag.StringVar(&configFilename, "config", "", "the config filename")