From: jweigele Date: Tue, 6 Dec 2022 16:10:56 +0000 (-0800) Subject: Break out helper functions to a helper package for further work X-Git-Url: http://git.hexthepla.net/?a=commitdiff_plain;h=abc128ddee1a0413f91b21edab90d02099ac9001;p=rabbit_go Break out helper functions to a helper package for further work --- diff --git a/go.mod b/go.mod index 71e3d9a..3c911bc 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,7 @@ module unpiege.net/rabbit_go.git go 1.18 + require ( github.com/PuerkitoBio/goquery v1.8.0 github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d diff --git a/helper/helper.go b/helper/helper.go new file mode 100644 index 0000000..858abc3 --- /dev/null +++ b/helper/helper.go @@ -0,0 +1,98 @@ +package helper + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" +// "io" + "io/ioutil" + "log" +// "math" +// "net/http" +// "os" +// "strconv" + "time" + +// "github.com/PuerkitoBio/goquery" +// "github.com/leekchan/timeutil" + amqp "github.com/rabbitmq/amqp091-go" +) + +// RabbitConfig simple values to pass around +type RabbitConfig struct { + Exchange string + Channel *amqp.Channel + RoutingKey string + Location string +} + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func SetupRabbit(configFilename string) RabbitConfig { + configJSON, err := ioutil.ReadFile(configFilename) + if err != nil { + failOnError(err, "oh no read") + } + var result map[string]interface{} + json.Unmarshal([]byte(configJSON), &result) + fmt.Println(result) + + // no cert verification because we're self-signed + cfg := new(tls.Config) + cfg.RootCAs = x509.NewCertPool() + cfg.InsecureSkipVerify = true + + conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg) + failOnError(err, "Failed to connect to RabbitMQ") + //defer conn.Close() + + rabbitChannel, err := conn.Channel() + failOnError(err, "Failed to open a rabbitChannelannel") + //defer rabbitChannel.Close() + + err = rabbitChannel.ExchangeDeclare( + result["exchange"].(string), // name + amqp.ExchangeTopic, // type + true, // durable + false, // autodelete + false, // internal (doesn't accept publish so...) + false, // block and wait for call to finish + nil, // optional args I don't know any + ) + failOnError(err, "Failed exchange stuff") + + err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages + failOnError(err, "Failed confirm") + var rabbit RabbitConfig + rabbit.Exchange = result["exchange"].(string) + rabbit.Channel = rabbitChannel + // constants + rabbit.RoutingKey = "temp" + rabbit.Location = "Outside (Wunderground)" + return rabbit +} + + +func SendJSON(data []byte, rabbit RabbitConfig) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := rabbit.Channel.PublishWithContext(ctx, + rabbit.Exchange, // exchange + rabbit.RoutingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: data, + }) + failOnError(err, "Failed to publish a message") + log.Printf(" [x] Sent %s\n", data) + +} + diff --git a/wunder/main.go b/wunder/main.go index 3e13d2f..e634851 100644 --- a/wunder/main.go +++ b/wunder/main.go @@ -2,13 +2,9 @@ package main import ( - "context" - "crypto/tls" - "crypto/x509" "encoding/json" "fmt" "io" - "io/ioutil" "log" "math" "net/http" @@ -18,17 +14,9 @@ import ( "github.com/PuerkitoBio/goquery" "github.com/leekchan/timeutil" - amqp "github.com/rabbitmq/amqp091-go" + "unpiege.net/rabbit_go.git/helper" ) -// RabbitConfig simple values to pass around -type RabbitConfig struct { - exchange string - channel *amqp.Channel - routingKey string - location string -} - func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) @@ -100,7 +88,7 @@ func fetchTemp(weatherStation string) (float64, error) { } -func tempLoop(channel chan int, weatherStation string, rabbit RabbitConfig) error { +func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfig) error { tempList := make([]float64, 0) for count := range channel { const tempLength int = 60 @@ -131,78 +119,22 @@ func tempLoop(channel chan int, weatherStation string, rabbit RabbitConfig) erro } -func setupRabbit(configFilename string) RabbitConfig { - configJSON, err := ioutil.ReadFile(configFilename) - if err != nil { - failOnError(err, "oh no read") - } - var result map[string]interface{} - json.Unmarshal([]byte(configJSON), &result) - fmt.Println(result) - - // no cert verification because we're self-signed - cfg := new(tls.Config) - cfg.RootCAs = x509.NewCertPool() - cfg.InsecureSkipVerify = true - - conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg) - failOnError(err, "Failed to connect to RabbitMQ") - //defer conn.Close() - - rabbitChannel, err := conn.Channel() - failOnError(err, "Failed to open a rabbitChannelannel") - //defer rabbitChannel.Close() - - err = rabbitChannel.ExchangeDeclare( - result["exchange"].(string), // name - amqp.ExchangeTopic, // type - true, // durable - false, // autodelete - false, // internal (doesn't accept publish so...) - false, // block and wait for call to finish - nil, // optional args I don't know any - ) - failOnError(err, "Failed exchange stuff") - - err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages - failOnError(err, "Failed confirm") - var rabbit RabbitConfig - rabbit.exchange = result["exchange"].(string) - rabbit.channel = rabbitChannel - // constants - rabbit.routingKey = "temp" - rabbit.location = "Outside (Wunderground)" - return rabbit -} -func sendTemp(tempValueF float64, tempValueC float64, rabbit RabbitConfig) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - sendObj := make(map[string]interface{}) - sendObj["celsius"] = tempValueC - sendObj["fahrenheit"] = tempValueF - sendObj["location"] = rabbit.location - utcNow := time.Now().UTC() - dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S") - sendObj["date"] = dateString - - data, err := json.Marshal(sendObj) - failOnError(err, "oh no json marshal") - - err = rabbit.channel.PublishWithContext(ctx, - rabbit.exchange, // exchange - rabbit.routingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: data, - }) - failOnError(err, "Failed to publish a message") - log.Printf(" [x] Sent %s\n", data) +func sendTemp(tempValueF float64, tempValueC float64, rabbit helper.RabbitConfig) { + sendObj := make(map[string]interface{}) + sendObj["celsius"] = tempValueC + sendObj["fahrenheit"] = tempValueF + sendObj["location"] = rabbit.Location + utcNow := time.Now().UTC() + dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S") + sendObj["date"] = dateString + data, err := json.Marshal(sendObj) + failOnError(err, "oh no json marshal") + helper.SendJSON(data, rabbit) } + func main() { - rabbit := setupRabbit(os.Args[1]) + rabbit := helper.SetupRabbit(os.Args[1]) const weatherStation string = "KWASEATT2696" const sleepTime time.Duration = 30 * time.Second