From: jweigele Date: Fri, 9 Dec 2022 22:50:25 +0000 (-0800) Subject: Tidy up helper somewhat, and start on implementing reprocess X-Git-Url: http://git.hexthepla.net/?a=commitdiff_plain;h=4e0d630d7fe90c5d83eac8f78b3b60d133e73861;p=rabbit_go Tidy up helper somewhat, and start on implementing reprocess For helper: * Helper does most of the json I/O so we don't need to import in other files * Add some structs to manage the complexity of multiple routing keys * Various convenience options (IncludeDate on send, Source in the RabbitConfig) for easier scope For reprocess: * Reads in temp pretty simply, and sets prometheus gauges as we've done before * Kinda handles DIY temperature (resends and prometheus gauge), and does not care about motion (so won't send it back out) * Weird chain of structs to implement different devices * Have not started on power or other complicated logic yet --- diff --git a/go.mod b/go.mod index 71e3d9a..4510935 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,20 @@ go 1.18 require ( github.com/PuerkitoBio/goquery v1.8.0 github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d + github.com/prometheus/client_golang v1.14.0 github.com/rabbitmq/amqp091-go v1.5.0 ) require ( github.com/andybalholm/cascadia v1.3.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.37.0 // indirect + github.com/prometheus/procfs v0.8.0 // indirect golang.org/x/net v0.3.0 // indirect + golang.org/x/sys v0.3.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect ) diff --git a/helper/helper.go b/helper/helper.go index ced2c28..9e966aa 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -9,6 +9,7 @@ import ( // "io" "io/ioutil" "log" + "strings" // "math" // "net/http" // "os" @@ -16,15 +17,25 @@ import ( "time" // "github.com/PuerkitoBio/goquery" -// "github.com/leekchan/timeutil" + "github.com/leekchan/timeutil" amqp "github.com/rabbitmq/amqp091-go" ) + +// RabbitSend just a bundle of a map with a routing key +type RabbitSend struct { + Data map[string]interface{} + RoutingKey string + IncludeDate bool // defaults to false, so we don't send the date +} + // RabbitConfig simple values to pass around type RabbitConfig struct { - Exchange string - Channel *amqp.Channel - RoutingKey string + Exchange string + Channel *amqp.Channel + Queue *amqp.Queue + DefaultRoutingKey string + Source string } func failOnError(err error, msg string) { @@ -33,7 +44,8 @@ func failOnError(err error, msg string) { } } -func SetupRabbit(configFilename string, routingKey string) RabbitConfig { + +func SetupRabbit(configFilename string, defaultRoutingKey string, source string) RabbitConfig { configJSON, err := ioutil.ReadFile(configFilename) if err != nil { failOnError(err, "oh no read") @@ -52,7 +64,7 @@ func SetupRabbit(configFilename string, routingKey string) RabbitConfig { //defer conn.Close() rabbitChannel, err := conn.Channel() - failOnError(err, "Failed to open a rabbitChannelannel") + failOnError(err, "Failed to open a rabbitChannel") //defer rabbitChannel.Close() err = rabbitChannel.ExchangeDeclare( @@ -72,26 +84,113 @@ func SetupRabbit(configFilename string, routingKey string) RabbitConfig { rabbit.Exchange = result["exchange"].(string) rabbit.Channel = rabbitChannel // constants - rabbit.RoutingKey = routingKey + rabbit.DefaultRoutingKey = defaultRoutingKey + rabbit.Source = source return rabbit } +// simple, bind to a (topic) with our default params +// will declare a queue if we don't have one +func Bind(routingKey string, rabbit RabbitConfig) (RabbitConfig){ + // if queue hasn't yet been initialized (no other bound topics basically) + if rabbit.Queue == nil { + queue, err := rabbit.Channel.QueueDeclare( + "", // queue name (auto-gen) + false, // durable + true, // autoDelete (get rid of if consumer discos or server restarts) + true, // exclusive + false, // noWait (i.e. block until finished) + nil, // args + ) + // pass by reference so we can fit in our struct + rabbit.Queue = &queue + failOnError(err, "failed queue declaration") + log.Printf("new queue declared %s", rabbit.Queue.Name) + } + + // last two are block on binding and args + err := rabbit.Channel.QueueBind(rabbit.Queue.Name, routingKey, rabbit.Exchange, false, nil) + failOnError(err, "failed queue binding") + return rabbit -func SendJSON(data []byte, rabbit RabbitConfig, verboseSend bool) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - err := rabbit.Channel.PublishWithContext(ctx, - rabbit.Exchange, // exchange - rabbit.RoutingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: data, - }) - failOnError(err, "Failed to publish a message") - if verboseSend { - log.Printf("Sent %s routing: %s\n", data, rabbit.RoutingKey) +} + +// starts consuming with all our presets and returns channel +func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery){ + log.Printf("consuming on queue %+v", rabbit.Queue) + deliveries, err := rabbit.Channel.Consume( + rabbit.Queue.Name, + "", // consumer (auto-gen) + true, // autoAck + true, // exclusive + false, // noLocal - does nothing? + false, // wait and block please + nil, // args + ) + failOnError(err, "failed channel consume") + return deliveries +} + +func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}){ + //var result map[string]string //interface{} + result := make(map[string]interface{}) + // either explicitly json, or zigbee2mqtt (which we have set to only output json) + isZigbee := strings.HasPrefix(delivery.RoutingKey, "zigbee2mqtt") + if delivery.ContentType == "application/json" || isZigbee { + err := json.Unmarshal(delivery.Body, &result) + if isZigbee { + // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value + // try to further decode this in an effort to help the data consumer + _, actionPresent := result["action"] + if actionPresent { + var decode map[string]interface{} + json.Unmarshal([]byte(result["action"].(string)), &decode) + result["action"] = decode + } } + failOnError(err, "failed to decode json") + } else { + result["data"] = string(delivery.Body[:]) + } + return result } + +func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool){ + if rabbitData.IncludeDate { + // add the date as a convenience + utcNow := time.Now().UTC() + dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S") + rabbitData.Data["date"] = dateString + } + if rabbit.Source != "" { + rabbitData.Data["source"] = rabbit.Source + } + + jsonBytes, err := json.Marshal(rabbitData.Data) + failOnError(err, "Improperly formatted JSON, just gonna exit here") + var routingKey string + if rabbitData.RoutingKey != "" { + routingKey = rabbitData.RoutingKey + } else if rabbit.DefaultRoutingKey != "" { + routingKey = rabbit.DefaultRoutingKey + } else { + failOnError(fmt.Errorf("routing key error"), "No valid routing key found to send on") + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = rabbit.Channel.PublishWithContext(ctx, + rabbit.Exchange, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: jsonBytes, + }) + failOnError(err, "Failed to publish a message") + if verboseSend { + log.Printf("Sent %s routing: %s\n", jsonBytes, routingKey) + } + +} diff --git a/reprocess/main.go b/reprocess/main.go new file mode 100644 index 0000000..99a29c5 --- /dev/null +++ b/reprocess/main.go @@ -0,0 +1,234 @@ +// basic stuff +package main + +import ( + "fmt" + "log" + "os" + "time" + + // prometheus imports + "net/http" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "unpiege.net/rabbit_go.git/helper" +) + +var ( + tempGauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "temp_sensor", + Help: "Used to measure temperatures in the real world", + }, + []string{"scale", "location"}, + ) + //tempExpire = map[prometheus.Labels]time.Time + //tempExpire = map[string]string + + tempExpire = make(map[[2]string]time.Time) + + staleTime = time.Duration(15 * time.Second) + +) + + +type Device interface { + getRoutingKey() string +} + +type ZigDevice struct { + routingKey string +} + +type DIYDevice struct { + *ZigDevice +} + +type PowerDevice struct { + *ZigDevice + powerName string + queryNeeded bool +} + + +func (dev *DIYDevice) getRoutingKey() (string){ + return fmt.Sprintf("zigbee2mqtt.%s", dev.routingKey) +} + +func NewDevice(routingKey string) (*ZigDevice){ + retval := ZigDevice{routingKey: routingKey} + //retval.routingKey = routingKey + return &retval +} + +func NewDIYDevice(routingKey string) (*DIYDevice){ + return &DIYDevice{ + ZigDevice: NewDevice(routingKey), + } + +} + +//tempExpire := make(map[[2]string]time.Time) + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func handleTemp(obj map[string]interface{}){ + log.Printf("temp! %v", obj) + celsius := obj["celsius"].(float64) + location := obj["location"].(string) + now := time.Now().UTC() + // TODO: last update portion + + // do the label update here + //log.Printf("celsius: %f location %s", celsius, location) + tempGauge.With(prometheus.Labels{"scale": "celsius", "location": location}).Set(celsius) + tempExpire[[2]string{"celsius", location}] = now + +} + +func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend){ + log.Printf("diy! %v", obj) + _, isAction := obj["action"] + if isAction { + // this is yet more JSON, so decode it + data := obj["action"].(map[string]interface{}) +/* err := json.Unmarshal([]byte(obj["action"].(string)), &data) + if err != nil { + log.Printf("Unable to decode json action from diy, ignoring") + return + }*/ + var senseType string + + _, isTemp := data["celsius"] + if isTemp { + senseType = "temperature" + } else { + _, hasType := data["type"] + if hasType { + senseType = data["type"].(string) + } else { + log.Printf("Did not find any sense type, just returning") + return + } + } + // should have the senseType setup correctly now, if we're still here + if senseType == "temperature" { + tempMap := make(map[string]interface{}, 0) + // these should all exist properly + tempMap["fahrenheit"] = data["celsius"].(float64) * 9/5 + 32 + tempMap["celsius"] = data["celsius"].(float64) + tempMap["location"] = data["location"].(string) + log.Printf("would send %v", tempMap) + // hardcoded temp routingKey for this type of measurement + sendThis := helper.RabbitSend{Data: tempMap, RoutingKey: "lol", IncludeDate: true} + sendChannel <- sendThis + } + + } else { + log.Printf("not action, ignoring") + } + +} + +func readLoop(channel chan helper.RabbitSend, devices []Device, rabbit helper.RabbitConfig) { + for _, device := range devices { + rabbit = helper.Bind(device.getRoutingKey(), rabbit) + log.Printf("bound to %s", device.getRoutingKey()) + } + rabbit = helper.Bind("temp", rabbit) + log.Printf("bound to temp") + deliveries := helper.StartConsuming(rabbit) + for delivery := range deliveries { + //log.Printf("got a delivery %+v", delivery) + item := helper.DecodeDelivery(delivery) + //log.Printf("%+v", item) + if delivery.RoutingKey == "temp" { + handleTemp(item) + } + for _, device := range devices { + if device.getRoutingKey() == delivery.RoutingKey { + log.Printf("Found device for routing key %s, %v", delivery.RoutingKey, device) + switch t := device.(type) { + case *DIYDevice: + log.Printf("DIY device, sending to handle") + handleDIY(item, channel) + default: + log.Printf("other device, type was %s", t) + } + } + } + //time.Sleep(time.Duration(0.5*float64(time.Second))) + } +} +func timeLoop(channel chan helper.RabbitSend) { + for { + time.Sleep(time.Duration(0.5*float64(time.Second))) + } +} + +func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig){ + for sendItem := range channel { + log.Printf("would try to send here %v", sendItem) + helper.SendData(sendItem, rabbit, true) + //helper.SendData(sendData, rabbit, false) + } +} + +func promSetup(){ + // prometheus setup + http.Handle("/metrics", promhttp.Handler()) + http.ListenAndServe(":9099", nil) +} + + +func expireStaleMetrics(){ + now := time.Now().UTC() + // tempExpire + tempExpireValues := make([][2]string, 0) + for labels, lastUpdate := range tempExpire { + curStale := now.Sub(lastUpdate) + if curStale > staleTime { + log.Printf("stale time %v, expiring labels %v\tlastUpdate: %v", curStale, labels, lastUpdate) + tempExpireValues = append(tempExpireValues, labels) + } + } + + // now we have the values to delete + for _, labels := range tempExpireValues { + complete := tempGauge.DeleteLabelValues(labels[:]...) + if complete { + delete(tempExpire, labels) + } else { + log.Panicf("oh no wasn't able to delete metrics") + } + } + +} + +func main() { + rabbit := helper.SetupRabbit(os.Args[1], "", "reprocess") // config file, default routing key, source + const sleepTime time.Duration = 5 * time.Second + + devices := make([]Device, 0) + devices = append(devices, NewDIYDevice("office_pico")) + + //currentTemp, err := fetchTemp(weatherStation) + channel := make(chan helper.RabbitSend) + + go promSetup() + go readLoop(channel, devices, rabbit) + go timeLoop(channel) + go sendLoop(channel, rabbit) + + for true { + time.Sleep(sleepTime) + go expireStaleMetrics() + } + +} diff --git a/timecolorshift/main.go b/timecolorshift/main.go index 72c2e3e..decabb9 100644 --- a/timecolorshift/main.go +++ b/timecolorshift/main.go @@ -2,7 +2,7 @@ package main import ( - "encoding/json" + //"encoding/json" //"fmt" //"io" "log" @@ -131,7 +131,7 @@ func sum(arr []float64) float64 { return sum } -func timeLoop(channel chan []byte, timeColors []TimeColor) { +func timeLoop(channel chan helper.RabbitSend, timeColors []TimeColor) { loc, _ := time.LoadLocation("America/Los_Angeles") for { //log.Printf("okay just looping here") @@ -151,8 +151,7 @@ func timeLoop(channel chan []byte, timeColors []TimeColor) { sendObj["red"] = totalRGB.Red sendObj["green"] = totalRGB.Green sendObj["blue"] = totalRGB.Blue - - data, _ := json.Marshal(sendObj) + data := helper.RabbitSend{Data: sendObj} channel <- data time.Sleep(time.Duration(0.5*float64(time.Second))) @@ -160,19 +159,18 @@ func timeLoop(channel chan []byte, timeColors []TimeColor) { } } -func sendLoop(channel chan []byte, rabbit helper.RabbitConfig){ +func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig){ for sendData := range channel { - //log.Printf("would try to send here %s", sendData) - helper.SendJSON(sendData, rabbit, false) + helper.SendData(sendData, rabbit, true) } } func main() { - rabbit := helper.SetupRabbit(os.Args[1], "timecolorshift.py") // config file, routing key + rabbit := helper.SetupRabbit(os.Args[1], "timecolorshift.py", "timecolorshift") // config file, routing key const sleepTime time.Duration = 30 * time.Second //currentTemp, err := fetchTemp(weatherStation) - channel := make(chan []byte) + channel := make(chan helper.RabbitSend) all_tcs := make([]TimeColor, 0) all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 12}, 0, 1, 0.5, TimeFloat{Hour:12})) all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 2}, 1, 0.1, 0, TimeFloat{Hour:8})) diff --git a/timecolorshift/timecolorshift b/timecolorshift/timecolorshift deleted file mode 100755 index ff2414c..0000000 Binary files a/timecolorshift/timecolorshift and /dev/null differ diff --git a/wunder/main.go b/wunder/main.go index 293ab4d..c820386 100644 --- a/wunder/main.go +++ b/wunder/main.go @@ -2,7 +2,7 @@ package main import ( - "encoding/json" + //"encoding/json" "fmt" "io" "log" @@ -13,7 +13,7 @@ import ( "time" "github.com/PuerkitoBio/goquery" - "github.com/leekchan/timeutil" + //"github.com/leekchan/timeutil" "unpiege.net/rabbit_go.git/helper" ) @@ -111,7 +111,7 @@ func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfi } else { sendTemp(currentTemp, celsius, rabbit) } - fmt.Printf("temp list is %s, avg is %f\n", tempList, tempAvg) + fmt.Printf("temp list is %+v, avg is %f\n", tempList, tempAvg) } } } @@ -126,16 +126,12 @@ func sendTemp(tempValueF float64, tempValueC float64, rabbit helper.RabbitConfig sendObj["fahrenheit"] = tempValueF // just a constant now sendObj["location"] = "Outside (Wunderground)" - utcNow := time.Now().UTC() - dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S") - sendObj["date"] = dateString - data, err := json.Marshal(sendObj) - failOnError(err, "oh no json marshal") - helper.SendJSON(data, rabbit, true) + rabbitData := helper.RabbitSend{Data: sendObj, IncludeDate: true} + helper.SendData(rabbitData, rabbit, true) } func main() { - rabbit := helper.SetupRabbit(os.Args[1], "temp") // config file, routing key + rabbit := helper.SetupRabbit(os.Args[1], "temp", "wunder") // config file, routing key const weatherStation string = "KWASEATT2696" const sleepTime time.Duration = 30 * time.Second