From: jweigele Date: Tue, 6 Dec 2022 04:03:59 +0000 (-0800) Subject: First working run! X-Git-Url: http://git.hexthepla.net/?a=commitdiff_plain;h=7aa5c664f505f8882a67b29a84ae170e8d010574;p=rabbit_go First working run! * Rename the module slightly for future compat (I think?) * Connects to the rabbitmq server and sends messages like we'd want * Some values hardcoded, and not really any retry or backoff logic - This is actually good! I swear! - Leave the kubelet doing the backoff and retry, and see how that takes * One restarted mq server seemed to work, but probably needs some testing --- diff --git a/go.mod b/go.mod index fa5885d..4acf7d1 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,14 @@ -module unpiege.net/rabbit_go/wunder/m/v2 +module unpiege.net/rabbit_go/m/v2 go 1.18 + +require ( + github.com/PuerkitoBio/goquery v1.8.0 + github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d + github.com/rabbitmq/amqp091-go v1.5.0 +) + +require ( + github.com/andybalholm/cascadia v1.3.1 // indirect + golang.org/x/net v0.0.0-20210916014120-12bc252f5db8 // indirect +) diff --git a/wunder/main.go b/wunder/main.go index a9dc347..3e13d2f 100644 --- a/wunder/main.go +++ b/wunder/main.go @@ -1,3 +1,4 @@ +// basic stuff package main import ( @@ -6,27 +7,137 @@ import ( "crypto/x509" "encoding/json" "fmt" + "io" "io/ioutil" "log" + "math" + "net/http" "os" + "strconv" "time" + "github.com/PuerkitoBio/goquery" + "github.com/leekchan/timeutil" amqp "github.com/rabbitmq/amqp091-go" ) +// RabbitConfig simple values to pass around +type RabbitConfig struct { + exchange string + channel *amqp.Channel + routingKey string + location string +} + func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } -func main() { - configJson, err := ioutil.ReadFile(os.Args[1]) +func sum(arr []float64) float64 { + sum := 0.0 + for _, value := range arr { + sum += value + } + return sum +} +func extractTemp(htmlReader io.Reader) (float64, error) { + foundTemp := false + var currentTemp float64 + + // pull in the doc parser data + doc, err := goquery.NewDocumentFromReader(htmlReader) + if err != nil { + return currentTemp, err + } + + // this is the span class where the temperature data lives (buried in it though) + doc.Find(".current-temp").Each(func(i int, s *goquery.Selection) { + class, _ := s.Attr("class") + if class == "current-temp" { + // gotta get down the span tags I guess + s.Children().Children().Children().Each(func(j int, t *goquery.Selection) { + childClass, _ := t.Attr("class") + // this class actually has the current temp + if childClass == "wu-value wu-value-to" { + currentTemp, err = strconv.ParseFloat(t.Text(), 10) + if err != nil { + log.Println(err) + } + foundTemp = true + } + }) + } + }) + if !foundTemp { + fmt.Println("oh no!") + return currentTemp, fmt.Errorf("An error occurred while parsing html data") + } + return currentTemp, nil + +} + +func fetchTemp(weatherStation string) (float64, error) { + const baseURL string = "https://www.wunderground.com/weather/us/wa/seattle" + client := &http.Client{ + Timeout: time.Second * 10, + } + fullURL := fmt.Sprintf("%s/%s", baseURL, weatherStation) + log.Printf("Fetching full url %s\n", fullURL) + req, err := http.NewRequest("GET", fullURL, nil) + if err != nil { + return 0, err + } + req.Header.Set("user-agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0") + res, err := client.Do(req) + if err != nil { + return 0, err + } + defer res.Body.Close() + // just return whatever we get back by passing through the html + return extractTemp(res.Body) + +} + +func tempLoop(channel chan int, weatherStation string, rabbit RabbitConfig) error { + tempList := make([]float64, 0) + for count := range channel { + const tempLength int = 60 + log.Printf("doing another fetch, count %d", count) + currentTemp, err := fetchTemp(weatherStation) + if err != nil { + fmt.Errorf("Wasn't able to get temp %s", err) + } else { + if currentTemp == 0 { + fmt.Printf("Got 0F for temp, ignoring") + } else { + celsius := (currentTemp - 32) * 5 / 9 + tempList = append(tempList, celsius) + if len(tempList) > tempLength { + tempList = tempList[1:] + } + tempAvg := sum(tempList) / float64(len(tempList)) + if math.Abs(tempAvg-celsius) > 3.0 { + log.Printf("temperature %f outside mean %f, not printing", celsius, tempAvg) + } else { + sendTemp(currentTemp, celsius, rabbit) + } + fmt.Printf("temp list is %s, avg is %f\n", tempList, tempAvg) + } + } + } + return nil + +} + +func setupRabbit(configFilename string) RabbitConfig { + configJSON, err := ioutil.ReadFile(configFilename) if err != nil { failOnError(err, "oh no read") } var result map[string]interface{} - json.Unmarshal([]byte(configJson), &result) + json.Unmarshal([]byte(configJSON), &result) fmt.Println(result) // no cert verification because we're self-signed @@ -36,13 +147,13 @@ func main() { conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg) failOnError(err, "Failed to connect to RabbitMQ") - defer conn.Close() + //defer conn.Close() - ch, err := conn.Channel() - failOnError(err, "Failed to open a channel") - defer ch.Close() + rabbitChannel, err := conn.Channel() + failOnError(err, "Failed to open a rabbitChannelannel") + //defer rabbitChannel.Close() - err = ch.ExchangeDeclare( + err = rabbitChannel.ExchangeDeclare( result["exchange"].(string), // name amqp.ExchangeTopic, // type true, // durable @@ -53,26 +164,57 @@ func main() { ) failOnError(err, "Failed exchange stuff") - err = ch.Confirm(false) // wait for this to finish too, confirm delivery of messages + err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages failOnError(err, "Failed confirm") - + var rabbit RabbitConfig + rabbit.exchange = result["exchange"].(string) + rabbit.channel = rabbitChannel + // constants + rabbit.routingKey = "temp" + rabbit.location = "Outside (Wunderground)" + return rabbit +} +func sendTemp(tempValueF float64, tempValueC float64, rabbit RabbitConfig) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - send_obj := make(map[string]interface{}) - send_obj["text"] = "Hello World!" + sendObj := make(map[string]interface{}) + sendObj["celsius"] = tempValueC + sendObj["fahrenheit"] = tempValueF + sendObj["location"] = rabbit.location + utcNow := time.Now().UTC() + dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S") + sendObj["date"] = dateString - data, err := json.Marshal(send_obj) + data, err := json.Marshal(sendObj) failOnError(err, "oh no json marshal") - err = ch.PublishWithContext(ctx, - result["exchange"].(string), // exchange - "lol", // routing key - false, // mandatory - false, // immediate + err = rabbit.channel.PublishWithContext(ctx, + rabbit.exchange, // exchange + rabbit.routingKey, // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: data, }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s\n", data) + +} +func main() { + rabbit := setupRabbit(os.Args[1]) + const weatherStation string = "KWASEATT2696" + const sleepTime time.Duration = 30 * time.Second + + //currentTemp, err := fetchTemp(weatherStation) + channel := make(chan int) + go tempLoop(channel, weatherStation, rabbit) + //time.Sleep(sleepTime) + var count int + for true { + channel <- count + count = count + 1 + time.Sleep(sleepTime) + } + }