require (
github.com/PuerkitoBio/goquery v1.8.0
github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d
+ github.com/prometheus/client_golang v1.14.0
github.com/rabbitmq/amqp091-go v1.5.0
)
require (
github.com/andybalholm/cascadia v1.3.1 // indirect
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/cespare/xxhash/v2 v2.1.2 // indirect
+ github.com/golang/protobuf v1.5.2 // indirect
+ github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
+ github.com/prometheus/client_model v0.3.0 // indirect
+ github.com/prometheus/common v0.37.0 // indirect
+ github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/net v0.3.0 // indirect
+ golang.org/x/sys v0.3.0 // indirect
+ google.golang.org/protobuf v1.28.1 // indirect
)
// "io"
"io/ioutil"
"log"
+ "strings"
// "math"
// "net/http"
// "os"
"time"
// "github.com/PuerkitoBio/goquery"
-// "github.com/leekchan/timeutil"
+ "github.com/leekchan/timeutil"
amqp "github.com/rabbitmq/amqp091-go"
)
+
+// RabbitSend just a bundle of a map with a routing key
+type RabbitSend struct {
+ Data map[string]interface{}
+ RoutingKey string
+ IncludeDate bool // defaults to false, so we don't send the date
+}
+
// RabbitConfig simple values to pass around
type RabbitConfig struct {
- Exchange string
- Channel *amqp.Channel
- RoutingKey string
+ Exchange string
+ Channel *amqp.Channel
+ Queue *amqp.Queue
+ DefaultRoutingKey string
+ Source string
}
func failOnError(err error, msg string) {
}
}
-func SetupRabbit(configFilename string, routingKey string) RabbitConfig {
+
+func SetupRabbit(configFilename string, defaultRoutingKey string, source string) RabbitConfig {
configJSON, err := ioutil.ReadFile(configFilename)
if err != nil {
failOnError(err, "oh no read")
//defer conn.Close()
rabbitChannel, err := conn.Channel()
- failOnError(err, "Failed to open a rabbitChannelannel")
+ failOnError(err, "Failed to open a rabbitChannel")
//defer rabbitChannel.Close()
err = rabbitChannel.ExchangeDeclare(
rabbit.Exchange = result["exchange"].(string)
rabbit.Channel = rabbitChannel
// constants
- rabbit.RoutingKey = routingKey
+ rabbit.DefaultRoutingKey = defaultRoutingKey
+ rabbit.Source = source
return rabbit
}
+// simple, bind to a (topic) with our default params
+// will declare a queue if we don't have one
+func Bind(routingKey string, rabbit RabbitConfig) (RabbitConfig){
+ // if queue hasn't yet been initialized (no other bound topics basically)
+ if rabbit.Queue == nil {
+ queue, err := rabbit.Channel.QueueDeclare(
+ "", // queue name (auto-gen)
+ false, // durable
+ true, // autoDelete (get rid of if consumer discos or server restarts)
+ true, // exclusive
+ false, // noWait (i.e. block until finished)
+ nil, // args
+ )
+ // pass by reference so we can fit in our struct
+ rabbit.Queue = &queue
+ failOnError(err, "failed queue declaration")
+ log.Printf("new queue declared %s", rabbit.Queue.Name)
+ }
+
+ // last two are block on binding and args
+ err := rabbit.Channel.QueueBind(rabbit.Queue.Name, routingKey, rabbit.Exchange, false, nil)
+ failOnError(err, "failed queue binding")
+ return rabbit
-func SendJSON(data []byte, rabbit RabbitConfig, verboseSend bool) {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- err := rabbit.Channel.PublishWithContext(ctx,
- rabbit.Exchange, // exchange
- rabbit.RoutingKey, // routing key
- false, // mandatory
- false, // immediate
- amqp.Publishing{
- ContentType: "application/json",
- Body: data,
- })
- failOnError(err, "Failed to publish a message")
- if verboseSend {
- log.Printf("Sent %s routing: %s\n", data, rabbit.RoutingKey)
+}
+
+// starts consuming with all our presets and returns channel
+func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery){
+ log.Printf("consuming on queue %+v", rabbit.Queue)
+ deliveries, err := rabbit.Channel.Consume(
+ rabbit.Queue.Name,
+ "", // consumer (auto-gen)
+ true, // autoAck
+ true, // exclusive
+ false, // noLocal - does nothing?
+ false, // wait and block please
+ nil, // args
+ )
+ failOnError(err, "failed channel consume")
+ return deliveries
+}
+
+func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}){
+ //var result map[string]string //interface{}
+ result := make(map[string]interface{})
+ // either explicitly json, or zigbee2mqtt (which we have set to only output json)
+ isZigbee := strings.HasPrefix(delivery.RoutingKey, "zigbee2mqtt")
+ if delivery.ContentType == "application/json" || isZigbee {
+ err := json.Unmarshal(delivery.Body, &result)
+ if isZigbee {
+ // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value
+ // try to further decode this in an effort to help the data consumer
+ _, actionPresent := result["action"]
+ if actionPresent {
+ var decode map[string]interface{}
+ json.Unmarshal([]byte(result["action"].(string)), &decode)
+ result["action"] = decode
+ }
}
+ failOnError(err, "failed to decode json")
+ } else {
+ result["data"] = string(delivery.Body[:])
+ }
+ return result
}
+
+func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool){
+ if rabbitData.IncludeDate {
+ // add the date as a convenience
+ utcNow := time.Now().UTC()
+ dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S")
+ rabbitData.Data["date"] = dateString
+ }
+ if rabbit.Source != "" {
+ rabbitData.Data["source"] = rabbit.Source
+ }
+
+ jsonBytes, err := json.Marshal(rabbitData.Data)
+ failOnError(err, "Improperly formatted JSON, just gonna exit here")
+ var routingKey string
+ if rabbitData.RoutingKey != "" {
+ routingKey = rabbitData.RoutingKey
+ } else if rabbit.DefaultRoutingKey != "" {
+ routingKey = rabbit.DefaultRoutingKey
+ } else {
+ failOnError(fmt.Errorf("routing key error"), "No valid routing key found to send on")
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ err = rabbit.Channel.PublishWithContext(ctx,
+ rabbit.Exchange, // exchange
+ routingKey, // routing key
+ false, // mandatory
+ false, // immediate
+ amqp.Publishing{
+ ContentType: "application/json",
+ Body: jsonBytes,
+ })
+ failOnError(err, "Failed to publish a message")
+ if verboseSend {
+ log.Printf("Sent %s routing: %s\n", jsonBytes, routingKey)
+ }
+
+}
--- /dev/null
+// basic stuff
+package main
+
+import (
+ "fmt"
+ "log"
+ "os"
+ "time"
+
+ // prometheus imports
+ "net/http"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+
+ "unpiege.net/rabbit_go.git/helper"
+)
+
+var (
+ tempGauge = promauto.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "temp_sensor",
+ Help: "Used to measure temperatures in the real world",
+ },
+ []string{"scale", "location"},
+ )
+ //tempExpire = map[prometheus.Labels]time.Time
+ //tempExpire = map[string]string
+
+ tempExpire = make(map[[2]string]time.Time)
+
+ staleTime = time.Duration(15 * time.Second)
+
+)
+
+
+type Device interface {
+ getRoutingKey() string
+}
+
+type ZigDevice struct {
+ routingKey string
+}
+
+type DIYDevice struct {
+ *ZigDevice
+}
+
+type PowerDevice struct {
+ *ZigDevice
+ powerName string
+ queryNeeded bool
+}
+
+
+func (dev *DIYDevice) getRoutingKey() (string){
+ return fmt.Sprintf("zigbee2mqtt.%s", dev.routingKey)
+}
+
+func NewDevice(routingKey string) (*ZigDevice){
+ retval := ZigDevice{routingKey: routingKey}
+ //retval.routingKey = routingKey
+ return &retval
+}
+
+func NewDIYDevice(routingKey string) (*DIYDevice){
+ return &DIYDevice{
+ ZigDevice: NewDevice(routingKey),
+ }
+
+}
+
+//tempExpire := make(map[[2]string]time.Time)
+
+func failOnError(err error, msg string) {
+ if err != nil {
+ log.Panicf("%s: %s", msg, err)
+ }
+}
+
+func handleTemp(obj map[string]interface{}){
+ log.Printf("temp! %v", obj)
+ celsius := obj["celsius"].(float64)
+ location := obj["location"].(string)
+ now := time.Now().UTC()
+ // TODO: last update portion
+
+ // do the label update here
+ //log.Printf("celsius: %f location %s", celsius, location)
+ tempGauge.With(prometheus.Labels{"scale": "celsius", "location": location}).Set(celsius)
+ tempExpire[[2]string{"celsius", location}] = now
+
+}
+
+func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend){
+ log.Printf("diy! %v", obj)
+ _, isAction := obj["action"]
+ if isAction {
+ // this is yet more JSON, so decode it
+ data := obj["action"].(map[string]interface{})
+/* err := json.Unmarshal([]byte(obj["action"].(string)), &data)
+ if err != nil {
+ log.Printf("Unable to decode json action from diy, ignoring")
+ return
+ }*/
+ var senseType string
+
+ _, isTemp := data["celsius"]
+ if isTemp {
+ senseType = "temperature"
+ } else {
+ _, hasType := data["type"]
+ if hasType {
+ senseType = data["type"].(string)
+ } else {
+ log.Printf("Did not find any sense type, just returning")
+ return
+ }
+ }
+ // should have the senseType setup correctly now, if we're still here
+ if senseType == "temperature" {
+ tempMap := make(map[string]interface{}, 0)
+ // these should all exist properly
+ tempMap["fahrenheit"] = data["celsius"].(float64) * 9/5 + 32
+ tempMap["celsius"] = data["celsius"].(float64)
+ tempMap["location"] = data["location"].(string)
+ log.Printf("would send %v", tempMap)
+ // hardcoded temp routingKey for this type of measurement
+ sendThis := helper.RabbitSend{Data: tempMap, RoutingKey: "lol", IncludeDate: true}
+ sendChannel <- sendThis
+ }
+
+ } else {
+ log.Printf("not action, ignoring")
+ }
+
+}
+
+func readLoop(channel chan helper.RabbitSend, devices []Device, rabbit helper.RabbitConfig) {
+ for _, device := range devices {
+ rabbit = helper.Bind(device.getRoutingKey(), rabbit)
+ log.Printf("bound to %s", device.getRoutingKey())
+ }
+ rabbit = helper.Bind("temp", rabbit)
+ log.Printf("bound to temp")
+ deliveries := helper.StartConsuming(rabbit)
+ for delivery := range deliveries {
+ //log.Printf("got a delivery %+v", delivery)
+ item := helper.DecodeDelivery(delivery)
+ //log.Printf("%+v", item)
+ if delivery.RoutingKey == "temp" {
+ handleTemp(item)
+ }
+ for _, device := range devices {
+ if device.getRoutingKey() == delivery.RoutingKey {
+ log.Printf("Found device for routing key %s, %v", delivery.RoutingKey, device)
+ switch t := device.(type) {
+ case *DIYDevice:
+ log.Printf("DIY device, sending to handle")
+ handleDIY(item, channel)
+ default:
+ log.Printf("other device, type was %s", t)
+ }
+ }
+ }
+ //time.Sleep(time.Duration(0.5*float64(time.Second)))
+ }
+}
+func timeLoop(channel chan helper.RabbitSend) {
+ for {
+ time.Sleep(time.Duration(0.5*float64(time.Second)))
+ }
+}
+
+func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig){
+ for sendItem := range channel {
+ log.Printf("would try to send here %v", sendItem)
+ helper.SendData(sendItem, rabbit, true)
+ //helper.SendData(sendData, rabbit, false)
+ }
+}
+
+func promSetup(){
+ // prometheus setup
+ http.Handle("/metrics", promhttp.Handler())
+ http.ListenAndServe(":9099", nil)
+}
+
+
+func expireStaleMetrics(){
+ now := time.Now().UTC()
+ // tempExpire
+ tempExpireValues := make([][2]string, 0)
+ for labels, lastUpdate := range tempExpire {
+ curStale := now.Sub(lastUpdate)
+ if curStale > staleTime {
+ log.Printf("stale time %v, expiring labels %v\tlastUpdate: %v", curStale, labels, lastUpdate)
+ tempExpireValues = append(tempExpireValues, labels)
+ }
+ }
+
+ // now we have the values to delete
+ for _, labels := range tempExpireValues {
+ complete := tempGauge.DeleteLabelValues(labels[:]...)
+ if complete {
+ delete(tempExpire, labels)
+ } else {
+ log.Panicf("oh no wasn't able to delete metrics")
+ }
+ }
+
+}
+
+func main() {
+ rabbit := helper.SetupRabbit(os.Args[1], "", "reprocess") // config file, default routing key, source
+ const sleepTime time.Duration = 5 * time.Second
+
+ devices := make([]Device, 0)
+ devices = append(devices, NewDIYDevice("office_pico"))
+
+ //currentTemp, err := fetchTemp(weatherStation)
+ channel := make(chan helper.RabbitSend)
+
+ go promSetup()
+ go readLoop(channel, devices, rabbit)
+ go timeLoop(channel)
+ go sendLoop(channel, rabbit)
+
+ for true {
+ time.Sleep(sleepTime)
+ go expireStaleMetrics()
+ }
+
+}
package main
import (
- "encoding/json"
+ //"encoding/json"
//"fmt"
//"io"
"log"
return sum
}
-func timeLoop(channel chan []byte, timeColors []TimeColor) {
+func timeLoop(channel chan helper.RabbitSend, timeColors []TimeColor) {
loc, _ := time.LoadLocation("America/Los_Angeles")
for {
//log.Printf("okay just looping here")
sendObj["red"] = totalRGB.Red
sendObj["green"] = totalRGB.Green
sendObj["blue"] = totalRGB.Blue
-
- data, _ := json.Marshal(sendObj)
+ data := helper.RabbitSend{Data: sendObj}
channel <- data
time.Sleep(time.Duration(0.5*float64(time.Second)))
}
}
-func sendLoop(channel chan []byte, rabbit helper.RabbitConfig){
+func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig){
for sendData := range channel {
- //log.Printf("would try to send here %s", sendData)
- helper.SendJSON(sendData, rabbit, false)
+ helper.SendData(sendData, rabbit, true)
}
}
func main() {
- rabbit := helper.SetupRabbit(os.Args[1], "timecolorshift.py") // config file, routing key
+ rabbit := helper.SetupRabbit(os.Args[1], "timecolorshift.py", "timecolorshift") // config file, routing key
const sleepTime time.Duration = 30 * time.Second
//currentTemp, err := fetchTemp(weatherStation)
- channel := make(chan []byte)
+ channel := make(chan helper.RabbitSend)
all_tcs := make([]TimeColor, 0)
all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 12}, 0, 1, 0.5, TimeFloat{Hour:12}))
all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 2}, 1, 0.1, 0, TimeFloat{Hour:8}))
package main
import (
- "encoding/json"
+ //"encoding/json"
"fmt"
"io"
"log"
"time"
"github.com/PuerkitoBio/goquery"
- "github.com/leekchan/timeutil"
+ //"github.com/leekchan/timeutil"
"unpiege.net/rabbit_go.git/helper"
)
} else {
sendTemp(currentTemp, celsius, rabbit)
}
- fmt.Printf("temp list is %s, avg is %f\n", tempList, tempAvg)
+ fmt.Printf("temp list is %+v, avg is %f\n", tempList, tempAvg)
}
}
}
sendObj["fahrenheit"] = tempValueF
// just a constant now
sendObj["location"] = "Outside (Wunderground)"
- utcNow := time.Now().UTC()
- dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S")
- sendObj["date"] = dateString
- data, err := json.Marshal(sendObj)
- failOnError(err, "oh no json marshal")
- helper.SendJSON(data, rabbit, true)
+ rabbitData := helper.RabbitSend{Data: sendObj, IncludeDate: true}
+ helper.SendData(rabbitData, rabbit, true)
}
func main() {
- rabbit := helper.SetupRabbit(os.Args[1], "temp") // config file, routing key
+ rabbit := helper.SetupRabbit(os.Args[1], "temp", "wunder") // config file, routing key
const weatherStation string = "KWASEATT2696"
const sleepTime time.Duration = 30 * time.Second