First working run!
authorjweigele <jweigele@local>
Tue, 6 Dec 2022 04:03:59 +0000 (20:03 -0800)
committerjweigele <jweigele@local>
Tue, 6 Dec 2022 04:03:59 +0000 (20:03 -0800)
 * Rename the module slightly for future compat (I think?)
 * Connects to the rabbitmq server and sends messages like we'd want
 * Some values hardcoded, and not really any retry or backoff logic
    - This is actually good! I swear!
    - Leave the kubelet doing the backoff and retry, and see how that takes
 * One restarted mq server seemed to work, but probably needs some testing

go.mod
wunder/main.go

diff --git a/go.mod b/go.mod
index fa5885df36ef36ffefcaeebb0d1f86ce3ff2fde4..4acf7d14a93f0a0cac8be4f4c159cf2f82d890dd 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -1,3 +1,14 @@
-module unpiege.net/rabbit_go/wunder/m/v2
+module unpiege.net/rabbit_go/m/v2
 
 go 1.18
+
+require (
+       github.com/PuerkitoBio/goquery v1.8.0
+       github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d
+       github.com/rabbitmq/amqp091-go v1.5.0
+)
+
+require (
+       github.com/andybalholm/cascadia v1.3.1 // indirect
+       golang.org/x/net v0.0.0-20210916014120-12bc252f5db8 // indirect
+)
index a9dc3479b8edeec8fe98285137d6367c67a90210..3e13d2f26f9db0004278df3fa89df79ef8e6f0f2 100644 (file)
@@ -1,3 +1,4 @@
+// basic stuff
 package main
 
 import (
@@ -6,27 +7,137 @@ import (
        "crypto/x509"
        "encoding/json"
        "fmt"
+       "io"
        "io/ioutil"
        "log"
+       "math"
+       "net/http"
        "os"
+       "strconv"
        "time"
 
+       "github.com/PuerkitoBio/goquery"
+       "github.com/leekchan/timeutil"
        amqp "github.com/rabbitmq/amqp091-go"
 )
 
+// RabbitConfig simple values to pass around
+type RabbitConfig struct {
+       exchange   string
+       channel    *amqp.Channel
+       routingKey string
+       location   string
+}
+
 func failOnError(err error, msg string) {
        if err != nil {
                log.Panicf("%s: %s", msg, err)
        }
 }
 
-func main() {
-       configJson, err := ioutil.ReadFile(os.Args[1])
+func sum(arr []float64) float64 {
+       sum := 0.0
+       for _, value := range arr {
+               sum += value
+       }
+       return sum
+}
+func extractTemp(htmlReader io.Reader) (float64, error) {
+       foundTemp := false
+       var currentTemp float64
+
+       // pull in the doc parser data
+       doc, err := goquery.NewDocumentFromReader(htmlReader)
+       if err != nil {
+               return currentTemp, err
+       }
+
+       // this is the span class where the temperature data lives (buried in it though)
+       doc.Find(".current-temp").Each(func(i int, s *goquery.Selection) {
+               class, _ := s.Attr("class")
+               if class == "current-temp" {
+                       // gotta get down the span tags I guess
+                       s.Children().Children().Children().Each(func(j int, t *goquery.Selection) {
+                               childClass, _ := t.Attr("class")
+                               // this class actually has the current temp
+                               if childClass == "wu-value wu-value-to" {
+                                       currentTemp, err = strconv.ParseFloat(t.Text(), 10)
+                                       if err != nil {
+                                               log.Println(err)
+                                       }
+                                       foundTemp = true
+                               }
+                       })
+               }
+       })
+       if !foundTemp {
+               fmt.Println("oh no!")
+               return currentTemp, fmt.Errorf("An error occurred while parsing html data")
+       }
+       return currentTemp, nil
+
+}
+
+func fetchTemp(weatherStation string) (float64, error) {
+       const baseURL string = "https://www.wunderground.com/weather/us/wa/seattle"
+       client := &http.Client{
+               Timeout: time.Second * 10,
+       }
+       fullURL := fmt.Sprintf("%s/%s", baseURL, weatherStation)
+       log.Printf("Fetching full url %s\n", fullURL)
+       req, err := http.NewRequest("GET", fullURL, nil)
+       if err != nil {
+               return 0, err
+       }
+       req.Header.Set("user-agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0")
+       res, err := client.Do(req)
+       if err != nil {
+               return 0, err
+       }
+       defer res.Body.Close()
+       // just return whatever we get back by passing through the html
+       return extractTemp(res.Body)
+
+}
+
+func tempLoop(channel chan int, weatherStation string, rabbit RabbitConfig) error {
+       tempList := make([]float64, 0)
+       for count := range channel {
+               const tempLength int = 60
+               log.Printf("doing another fetch, count %d", count)
+               currentTemp, err := fetchTemp(weatherStation)
+               if err != nil {
+                       fmt.Errorf("Wasn't able to get temp %s", err)
+               } else {
+                       if currentTemp == 0 {
+                               fmt.Printf("Got 0F for temp, ignoring")
+                       } else {
+                               celsius := (currentTemp - 32) * 5 / 9
+                               tempList = append(tempList, celsius)
+                               if len(tempList) > tempLength {
+                                       tempList = tempList[1:]
+                               }
+                               tempAvg := sum(tempList) / float64(len(tempList))
+                               if math.Abs(tempAvg-celsius) > 3.0 {
+                                       log.Printf("temperature %f outside mean %f, not printing", celsius, tempAvg)
+                               } else {
+                                       sendTemp(currentTemp, celsius, rabbit)
+                               }
+                               fmt.Printf("temp list is %s, avg is %f\n", tempList, tempAvg)
+                       }
+               }
+       }
+       return nil
+
+}
+
+func setupRabbit(configFilename string) RabbitConfig {
+       configJSON, err := ioutil.ReadFile(configFilename)
        if err != nil {
                failOnError(err, "oh no read")
        }
        var result map[string]interface{}
-       json.Unmarshal([]byte(configJson), &result)
+       json.Unmarshal([]byte(configJSON), &result)
        fmt.Println(result)
 
        // no cert verification because we're self-signed
@@ -36,13 +147,13 @@ func main() {
 
        conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg)
        failOnError(err, "Failed to connect to RabbitMQ")
-       defer conn.Close()
+       //defer conn.Close()
 
-       ch, err := conn.Channel()
-       failOnError(err, "Failed to open a channel")
-       defer ch.Close()
+       rabbitChannel, err := conn.Channel()
+       failOnError(err, "Failed to open a rabbitChannelannel")
+       //defer rabbitChannel.Close()
 
-       err = ch.ExchangeDeclare(
+       err = rabbitChannel.ExchangeDeclare(
                result["exchange"].(string), // name
                amqp.ExchangeTopic,          // type
                true,                        // durable
@@ -53,26 +164,57 @@ func main() {
        )
        failOnError(err, "Failed exchange stuff")
 
-       err = ch.Confirm(false) // wait for this to finish too, confirm delivery of messages
+       err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages
        failOnError(err, "Failed confirm")
-
+       var rabbit RabbitConfig
+       rabbit.exchange = result["exchange"].(string)
+       rabbit.channel = rabbitChannel
+       // constants
+       rabbit.routingKey = "temp"
+       rabbit.location = "Outside (Wunderground)"
+       return rabbit
+}
+func sendTemp(tempValueF float64, tempValueC float64, rabbit RabbitConfig) {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
-       send_obj := make(map[string]interface{})
-       send_obj["text"] = "Hello World!"
+       sendObj := make(map[string]interface{})
+       sendObj["celsius"] = tempValueC
+       sendObj["fahrenheit"] = tempValueF
+       sendObj["location"] = rabbit.location
+       utcNow := time.Now().UTC()
+       dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S")
+       sendObj["date"] = dateString
 
-       data, err := json.Marshal(send_obj)
+       data, err := json.Marshal(sendObj)
        failOnError(err, "oh no json marshal")
 
-       err = ch.PublishWithContext(ctx,
-               result["exchange"].(string), // exchange
-               "lol",                       // routing key
-               false,                       // mandatory
-               false,                       // immediate
+       err = rabbit.channel.PublishWithContext(ctx,
+               rabbit.exchange,   // exchange
+               rabbit.routingKey, // routing key
+               false,             // mandatory
+               false,             // immediate
                amqp.Publishing{
                        ContentType: "application/json",
                        Body:        data,
                })
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s\n", data)
+
+}
+func main() {
+       rabbit := setupRabbit(os.Args[1])
+       const weatherStation string = "KWASEATT2696"
+       const sleepTime time.Duration = 30 * time.Second
+
+       //currentTemp, err := fetchTemp(weatherStation)
+       channel := make(chan int)
+       go tempLoop(channel, weatherStation, rabbit)
+       //time.Sleep(sleepTime)
+       var count int
+       for true {
+               channel <- count
+               count = count + 1
+               time.Sleep(sleepTime)
+       }
+
 }