From: jweigele Date: Mon, 12 Dec 2022 01:25:50 +0000 (-0800) Subject: Large logging and flags revamp X-Git-Url: http://git.hexthepla.net/?a=commitdiff_plain;h=cb0b7cddb9dacac14760607fe2905b846e80ca77;p=rabbit_go Large logging and flags revamp * Everything is using klog (interfaced with logr) to do log output now * This provides different visibility levels for debug and a better format * Init flags and logger creation in main (emulate other files mostly) * As part of the revamp, provide some structured yaml marshalling for future config --- diff --git a/Dockerfile.reprocess b/Dockerfile.reprocess index 4c7a49d..3d32555 100644 --- a/Dockerfile.reprocess +++ b/Dockerfile.reprocess @@ -18,4 +18,4 @@ RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags="-w -s" -o FROM alpine:edge COPY --from=builder /go/bin/reprocess /bin/reprocess -ENTRYPOINT ["/bin/reprocess"] +ENTRYPOINT ["/bin/reprocess", "-config", "/conf/config-docker.json"] diff --git a/Dockerfile.timecolorshift b/Dockerfile.timecolorshift index 91ac406..af0a5fb 100644 --- a/Dockerfile.timecolorshift +++ b/Dockerfile.timecolorshift @@ -19,4 +19,4 @@ RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags="-w -s" -o FROM alpine:edge COPY --from=builder /go/bin/timecolorshift /bin/timecolorshift RUN apk add tzdata -ENTRYPOINT ["/bin/timecolorshift"] +ENTRYPOINT ["/bin/timecolorshift", "-config", "/conf/config-docker.json"] diff --git a/Dockerfile.wunder b/Dockerfile.wunder index 70baabe..51a4de7 100644 --- a/Dockerfile.wunder +++ b/Dockerfile.wunder @@ -18,4 +18,4 @@ RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags="-w -s" -o FROM alpine:edge COPY --from=builder /go/bin/wunder /bin/wunder -ENTRYPOINT ["/bin/wunder"] +ENTRYPOINT ["/bin/wunder", "-config", "/conf/config-docker.json"] diff --git a/go.mod b/go.mod index 4510935..e54c84e 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,12 @@ go 1.18 require ( github.com/PuerkitoBio/goquery v1.8.0 + github.com/go-logr/logr v1.2.3 github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d github.com/prometheus/client_golang v1.14.0 github.com/rabbitmq/amqp091-go v1.5.0 + gopkg.in/yaml.v2 v2.4.0 + k8s.io/klog/v2 v2.80.1 ) require ( diff --git a/helper/helper.go b/helper/helper.go index 9e966aa..be65718 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -1,196 +1,256 @@ -package helper +// Package helper - various helper functions associated with managing rabbitmq connections easier +package helper import ( - "context" - "crypto/tls" - "crypto/x509" - "encoding/json" - "fmt" -// "io" - "io/ioutil" - "log" - "strings" -// "math" -// "net/http" -// "os" -// "strconv" - "time" - -// "github.com/PuerkitoBio/goquery" - "github.com/leekchan/timeutil" - amqp "github.com/rabbitmq/amqp091-go" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + + "io/ioutil" + "strings" + + "sync" + "time" + + "github.com/go-logr/logr" + "github.com/leekchan/timeutil" + amqp "github.com/rabbitmq/amqp091-go" + "k8s.io/klog/v2" + "k8s.io/klog/v2/klogr" + //logging things ) +var logger logr.Logger +var sendMutex sync.Mutex + +func init() { + // do this for everyone, once + klog.InitFlags(nil) + // our package logger + logger = klogr.New() +} + +// NewLogger helps to keep the specific logging implementation details hidden from other files +// allowing for a switch if needed (who knows if this really matters) +func NewLogger() logr.Logger { + return klogr.New() +} // RabbitSend just a bundle of a map with a routing key type RabbitSend struct { - Data map[string]interface{} - RoutingKey string - IncludeDate bool // defaults to false, so we don't send the date + Data map[string]interface{} + RoutingKey string + IncludeDate bool // defaults to false, so we don't send the date } // RabbitConfig simple values to pass around type RabbitConfig struct { - Exchange string - Channel *amqp.Channel - Queue *amqp.Queue - DefaultRoutingKey string - Source string + Exchange string + Channel *amqp.Channel + Queue *amqp.Queue + DefaultRoutingKey string + Source string } -func failOnError(err error, msg string) { - if err != nil { - log.Panicf("%s: %s", msg, err) - } -} +// SetupRabbit actually connects to the rabbitmq server, and returns global config values for later use +func SetupRabbit(configFilename string, defaultRoutingKey string, source string) (RabbitConfig, error) { + // grab the mutex, unlock when we're done + sendMutex.Lock() + defer sendMutex.Unlock() + + configJSON, err := ioutil.ReadFile(configFilename) + if err != nil { + return RabbitConfig{}, err + } + var result map[string]interface{} + json.Unmarshal([]byte(configJSON), &result) + + // no cert verification because we're self-signed + cfg := new(tls.Config) + cfg.RootCAs = x509.NewCertPool() + cfg.InsecureSkipVerify = true + conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg) + if err != nil { + return RabbitConfig{}, err + } + //defer conn.Close() -func SetupRabbit(configFilename string, defaultRoutingKey string, source string) RabbitConfig { - configJSON, err := ioutil.ReadFile(configFilename) - if err != nil { - failOnError(err, "oh no read") - } - var result map[string]interface{} - json.Unmarshal([]byte(configJSON), &result) - fmt.Println(result) - - // no cert verification because we're self-signed - cfg := new(tls.Config) - cfg.RootCAs = x509.NewCertPool() - cfg.InsecureSkipVerify = true - - conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg) - failOnError(err, "Failed to connect to RabbitMQ") - //defer conn.Close() - - rabbitChannel, err := conn.Channel() - failOnError(err, "Failed to open a rabbitChannel") - //defer rabbitChannel.Close() - - err = rabbitChannel.ExchangeDeclare( - result["exchange"].(string), // name - amqp.ExchangeTopic, // type - true, // durable - false, // autodelete - false, // internal (doesn't accept publish so...) - false, // block and wait for call to finish - nil, // optional args I don't know any - ) - failOnError(err, "Failed exchange stuff") - - err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages - failOnError(err, "Failed confirm") - var rabbit RabbitConfig - rabbit.Exchange = result["exchange"].(string) - rabbit.Channel = rabbitChannel - // constants - rabbit.DefaultRoutingKey = defaultRoutingKey - rabbit.Source = source - return rabbit + rabbitChannel, err := conn.Channel() + if err != nil { + return RabbitConfig{}, err + } + //defer rabbitChannel.Close() + + err = rabbitChannel.ExchangeDeclare( + result["exchange"].(string), // name + amqp.ExchangeTopic, // type + true, // durable + false, // autodelete + false, // internal (doesn't accept publish so...) + false, // block and wait for call to finish + nil, // optional args I don't know any + ) + if err != nil { + return RabbitConfig{}, err + } + + err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages + if err != nil { + return RabbitConfig{}, err + } + var rabbit RabbitConfig + rabbit.Exchange = result["exchange"].(string) + rabbit.Channel = rabbitChannel + // constants + rabbit.DefaultRoutingKey = defaultRoutingKey + rabbit.Source = source + logger.Info("Connected to RabbitMQ", + "host", result["host"], + "user", result["user"], + "vhost", result["vhost"], + "defaultRoutingKey", defaultRoutingKey, + "source", source, + ) + return rabbit, nil } -// simple, bind to a (topic) with our default params +// Bind - binds to a topic with our default params // will declare a queue if we don't have one -func Bind(routingKey string, rabbit RabbitConfig) (RabbitConfig){ - // if queue hasn't yet been initialized (no other bound topics basically) - if rabbit.Queue == nil { - queue, err := rabbit.Channel.QueueDeclare( - "", // queue name (auto-gen) - false, // durable - true, // autoDelete (get rid of if consumer discos or server restarts) - true, // exclusive - false, // noWait (i.e. block until finished) - nil, // args - ) - // pass by reference so we can fit in our struct - rabbit.Queue = &queue - failOnError(err, "failed queue declaration") - log.Printf("new queue declared %s", rabbit.Queue.Name) - } - - // last two are block on binding and args - err := rabbit.Channel.QueueBind(rabbit.Queue.Name, routingKey, rabbit.Exchange, false, nil) - failOnError(err, "failed queue binding") - return rabbit +func Bind(routingKey string, rabbit *RabbitConfig) error { + // grab the mutex, unlock when we're done + sendMutex.Lock() + defer sendMutex.Unlock() + + // if queue hasn't yet been initialized (no other bound topics basically) + if rabbit.Queue == nil { + queue, err := rabbit.Channel.QueueDeclare( + "", // queue name (auto-gen) + false, // durable + true, // autoDelete (get rid of if consumer discos or server restarts) + true, // exclusive + false, // noWait (i.e. block until finished) + nil, // args + ) + // pass by reference so we can fit in our struct + rabbit.Queue = &queue + if err != nil { + return err + } + logger.Info("new queue declared", "queue", rabbit.Queue) + } + + // last two are block on binding and args + err := rabbit.Channel.QueueBind(rabbit.Queue.Name, routingKey, rabbit.Exchange, false, nil) + if err != nil { + return err + } + logger.Info("Bound queue to exchange", "queue", rabbit.Queue, "routingKey", routingKey) + return nil } -// starts consuming with all our presets and returns channel -func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery){ - log.Printf("consuming on queue %+v", rabbit.Queue) - deliveries, err := rabbit.Channel.Consume( - rabbit.Queue.Name, - "", // consumer (auto-gen) - true, // autoAck - true, // exclusive - false, // noLocal - does nothing? - false, // wait and block please - nil, // args - ) - failOnError(err, "failed channel consume") - return deliveries +// StartConsuming - starts consuming with all our presets and returns channel of deliveries +func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery, error) { + // grab the mutex, unlock when we're done + sendMutex.Lock() + defer sendMutex.Unlock() + + deliveries, err := rabbit.Channel.Consume( + rabbit.Queue.Name, + "", // consumer (auto-gen) + true, // autoAck + true, // exclusive + false, // noLocal - does nothing? + false, // immediate (wait and block please) + nil, // args + ) + logger.Info("consuming on queue", "queue", rabbit.Queue) + return deliveries, err } -func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}){ - //var result map[string]string //interface{} - result := make(map[string]interface{}) - // either explicitly json, or zigbee2mqtt (which we have set to only output json) - isZigbee := strings.HasPrefix(delivery.RoutingKey, "zigbee2mqtt") - if delivery.ContentType == "application/json" || isZigbee { - err := json.Unmarshal(delivery.Body, &result) - if isZigbee { - // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value - // try to further decode this in an effort to help the data consumer - _, actionPresent := result["action"] - if actionPresent { - var decode map[string]interface{} - json.Unmarshal([]byte(result["action"].(string)), &decode) - result["action"] = decode - } - } - failOnError(err, "failed to decode json") - } else { - result["data"] = string(delivery.Body[:]) - } - return result +// DecodeDelivery - takes a presumably byte-formatted json stream and makes it a map for us +// will return errors if things don't actually work out that well +func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}, error) { + //var result map[string]string //interface{} + result := make(map[string]interface{}) + // either explicitly json, or zigbee2mqtt (which we have set to only output json) + isZigbee := strings.HasPrefix(delivery.RoutingKey, "zigbee2mqtt") + if delivery.ContentType == "application/json" || isZigbee { + err := json.Unmarshal(delivery.Body, &result) + if err != nil { + return result, err + } + if isZigbee { + // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value + // try to further decode this in an effort to help the data consumer + _, actionPresent := result["action"] + if actionPresent { + var decode map[string]interface{} + err := json.Unmarshal([]byte(result["action"].(string)), &decode) + if err != nil { + return decode, err + } + result["action"] = decode + } + } + } else { + result["data"] = string(delivery.Body[:]) + } + return result, nil } - -func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool){ - if rabbitData.IncludeDate { - // add the date as a convenience - utcNow := time.Now().UTC() - dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S") - rabbitData.Data["date"] = dateString - } - - if rabbit.Source != "" { - rabbitData.Data["source"] = rabbit.Source - } - - jsonBytes, err := json.Marshal(rabbitData.Data) - failOnError(err, "Improperly formatted JSON, just gonna exit here") - var routingKey string - if rabbitData.RoutingKey != "" { - routingKey = rabbitData.RoutingKey - } else if rabbit.DefaultRoutingKey != "" { - routingKey = rabbit.DefaultRoutingKey - } else { - failOnError(fmt.Errorf("routing key error"), "No valid routing key found to send on") - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - err = rabbit.Channel.PublishWithContext(ctx, - rabbit.Exchange, // exchange - routingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: jsonBytes, - }) - failOnError(err, "Failed to publish a message") - if verboseSend { - log.Printf("Sent %s routing: %s\n", jsonBytes, routingKey) - } + +// SendData takes data using parameters from our own structs RabbitSend and RabbitConfig, and puts it out on the given routingkey +func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool) error { + // grab the mutex, unlock when we're done + sendMutex.Lock() + defer sendMutex.Unlock() + + if rabbitData.IncludeDate { + // add the date as a convenience + utcNow := time.Now().UTC() + dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S") + rabbitData.Data["date"] = dateString + } + + if rabbit.Source != "" { + rabbitData.Data["source"] = rabbit.Source + } + + jsonBytes, err := json.Marshal(rabbitData.Data) + if err != nil { + return err + } + var routingKey string + if rabbitData.RoutingKey != "" { + routingKey = rabbitData.RoutingKey + } else if rabbit.DefaultRoutingKey != "" { + routingKey = rabbit.DefaultRoutingKey + } else { + if err != nil { + return fmt.Errorf("No valid routing key to send on") + } + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = rabbit.Channel.PublishWithContext(ctx, + rabbit.Exchange, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: jsonBytes, + }) + if err != nil { + return err + } + + if verboseSend { + logger.Info("Sent data to rabbitmq", "data", rabbitData.Data, "routingKey", routingKey) + } + return nil } diff --git a/reprocess/main.go b/reprocess/main.go index 3a3dac4..12745d1 100644 --- a/reprocess/main.go +++ b/reprocess/main.go @@ -2,16 +2,21 @@ package main import ( + "flag" "fmt" - "log" "os" + "reflect" "time" // prometheus imports + "net/http" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" - "net/http" + + // logging interface + "github.com/go-logr/logr" "unpiege.net/rabbit_go.git/helper" ) @@ -24,82 +29,130 @@ var ( }, []string{"scale", "location"}, ) - //tempExpire = map[prometheus.Labels]time.Time - //tempExpire = map[string]string + + powerGauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "ePDUOutletStatusActivePower", + Help: "Used as an analog to power snmp fetching", + }, + []string{"ePDUOutletStatusIndex", "ePDUOutletStatusOutletName"}, + ) tempExpire = make(map[[2]string]time.Time) - staleTime = time.Duration(15 * time.Second) + staleTime = time.Duration(15 * time.Second) + logger logr.Logger + configFilename string ) -type Device interface { +type device interface { getRoutingKey() string } -type ZigDevice struct { - routingKey string +type zigdevice struct { + friendlyName string } -type DIYDevice struct { - *ZigDevice +type diydevice struct { + *zigdevice } -type PowerDevice struct { - *ZigDevice - powerName string - queryNeeded bool +type powertime struct { + timeLogged time.Time + powerValue float64 } -func (dev *DIYDevice) getRoutingKey() string { - return fmt.Sprintf("zigbee2mqtt.%s", dev.routingKey) +type powerdevice struct { + *zigdevice + powerName string + queryNeeded bool + lastUpdate time.Time + lastUpdateRequested time.Time + powerList []powertime } -func NewDevice(routingKey string) *ZigDevice { - retval := ZigDevice{routingKey: routingKey} - //retval.routingKey = routingKey - return &retval +func (dev *powerdevice) filter() { + // if this gets excessive we can pass it down the chain + now := time.Now().UTC() + // default time to keep power values around + const filterTime = 60 + + // filter to ignore old values + var sliceStart int + for index, pt := range dev.powerList { + // break when we find our first valid value (since they should be sorted) + sliceStart = index + if pt.timeLogged.After(now.Add(-time.Second * filterTime)) { + break + } + } + + // reslice, excluding all previous (cutoff) values + dev.powerList = dev.powerList[sliceStart:] + } -func NewDIYDevice(routingKey string) *DIYDevice { - return &DIYDevice{ - ZigDevice: NewDevice(routingKey), +func (dev *powerdevice) avgPower() float64 { + totalPower := 0.0 + // sum up the power list + for _, pt := range dev.powerList { + totalPower = totalPower + pt.powerValue } + // average and return + return totalPower / float64(len(dev.powerList)) +} +func (dev *powerdevice) add(newTime time.Time, newVal float64) { + dev.powerList = append(dev.powerList, powertime{timeLogged: newTime, powerValue: newVal}) + dev.filter() } -//tempExpire := make(map[[2]string]time.Time) +func (dev *zigdevice) getRoutingKey() string { + return fmt.Sprintf("zigbee2mqtt.%s", dev.friendlyName) +} -func failOnError(err error, msg string) { - if err != nil { - log.Panicf("%s: %s", msg, err) +func (dev *powerdevice) getReqKey() string { + return fmt.Sprintf("zigbee2mqtt.%s.get", dev.friendlyName) +} + +func newdevice(friendlyName string) *zigdevice { + retval := zigdevice{friendlyName: friendlyName} + return &retval +} + +func newdiydevice(friendlyName string) *diydevice { + return &diydevice{ + zigdevice: newdevice(friendlyName), + } + +} + +func newpowerdevice(friendlyName, powerName string, queryNeeded bool) *powerdevice { + return &powerdevice{ + zigdevice: newdevice(friendlyName), + powerName: powerName, + queryNeeded: queryNeeded, } } func handleTemp(obj map[string]interface{}) { - log.Printf("temp! %v", obj) + logger.V(1).Info("Temperature received", "obj", obj) celsius := obj["celsius"].(float64) location := obj["location"].(string) now := time.Now().UTC() - // TODO: last update portion - // do the label update here - //log.Printf("celsius: %f location %s", celsius, location) tempGauge.With(prometheus.Labels{"scale": "celsius", "location": location}).Set(celsius) + // here, we might have any number of temp values some not covered by our own objects tempExpire[[2]string{"celsius", location}] = now } func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) { - log.Printf("diy! %v", obj) + logger.V(1).Info("DIY data received", "obj", obj) _, isAction := obj["action"] if isAction { - // this is yet more JSON, so decode it + // this should also already be decoded for us data := obj["action"].(map[string]interface{}) - /* err := json.Unmarshal([]byte(obj["action"].(string)), &data) - if err != nil { - log.Printf("Unable to decode json action from diy, ignoring") - return - }*/ var senseType string _, isTemp := data["celsius"] @@ -110,7 +163,7 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) { if hasType { senseType = data["type"].(string) } else { - log.Printf("Did not find any sense type, just returning") + logger.Info("Did not find any sense type, just returning") return } } @@ -122,7 +175,7 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) { dataMap["fahrenheit"] = data["celsius"].(float64)*9/5 + 32 dataMap["celsius"] = data["celsius"].(float64) dataMap["location"] = data["location"].(string) - log.Printf("would send %v", dataMap) + logger.V(2).Info("Sending reprocessed temperature") // hardcoded temp routingKey for this type of measurement sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true} sendChannel <- sendThis @@ -130,58 +183,131 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) { dataMap["motion_detected"] = data["state"].(bool) // copied from above, but it might change later so shrug dataMap["location"] = data["location"].(string) - log.Printf("sending reprocessed motion: %v", dataMap) + logger.V(2).Info("Sending reprocessed motion") sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true} sendChannel <- sendThis } else { - log.Printf("error! sense type not detected, ignoring") + logger.Info("Sense type not detected, ignoring") } } else { - log.Printf("not action, ignoring") + logger.V(2).Info("not action, ignoring") } } -func readLoop(channel chan helper.RabbitSend, devices []Device, rabbit helper.RabbitConfig) { +// are we getting new data on behalf of a request (if spammy), or is the reporting configured so it doesn't spam? +// in other words, do we trust this data is evenly spaced? +func (dev *powerdevice) shouldUpdate(now time.Time) bool { + const updateStalenessSeconds = 2 + // well-behaved OR in the last 2 seconds since we requested + if now.Add(updateStalenessSeconds * -time.Second).Before(dev.lastUpdateRequested) { + return true + } + return false +} + +func handlePower(obj map[string]interface{}, sendChannel chan helper.RabbitSend, triggerDevice *powerdevice) { + now := time.Now().UTC() + logger.V(1).Info("Power data received", "obj", obj) + //name := triggerDevice.powerName + // here, we're directly tracking power devices so we can bind updated directly to the object + triggerDevice.lastUpdate = now + if triggerDevice.shouldUpdate(now) { + triggerDevice.add(now, obj["power"].(float64)) + powerGauge.With(prometheus.Labels{"ePDUOutletStatusIndex": triggerDevice.friendlyName, "ePDUOutletStatusOutletName": triggerDevice.powerName}).Set(triggerDevice.avgPower()) + // don't worry about expiration, since we have all the values in our objects + } + +} + +func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.RabbitConfig) { for _, device := range devices { - rabbit = helper.Bind(device.getRoutingKey(), rabbit) - log.Printf("bound to %s", device.getRoutingKey()) + err := helper.Bind(device.getRoutingKey(), &rabbit) + // TODO: find a better way to bubble this up + if err != nil { + logger.Error(err, "unable to bind successfully to exchange", "routingKey", device.getRoutingKey()) + os.Exit(1) + } + } + /// this one is hardcoded, for the generic temperature output + err := helper.Bind("temp", &rabbit) + if err != nil { + logger.Error(err, "unable to bind successfully to exchange", "routingKey", "temp") + os.Exit(1) } - rabbit = helper.Bind("temp", rabbit) - log.Printf("bound to temp") - deliveries := helper.StartConsuming(rabbit) + + deliveries, err := helper.StartConsuming(rabbit) + if err != nil { + logger.Error(err, "unable to start consuming data from rabbit") + os.Exit(1) + } + for delivery := range deliveries { - //log.Printf("got a delivery %+v", delivery) - item := helper.DecodeDelivery(delivery) - //log.Printf("%+v", item) + item, err := helper.DecodeDelivery(delivery) + // it's just one delivery so we're going to yell and then continue along unabated + if err != nil { + logger.Error(err, "unable to decode delivery", "delivery", delivery) + continue + } + + // only if you really want to see it all + logger.V(2).Info("Received a delivery", "delivery", delivery) if delivery.RoutingKey == "temp" { handleTemp(item) } for _, device := range devices { if device.getRoutingKey() == delivery.RoutingKey { - log.Printf("Found device for routing key %s, %v", delivery.RoutingKey, device) + logger.V(3).Info("Found device for routing key", "routingKey", delivery.RoutingKey, "device", reflect.TypeOf(device).String()) switch t := device.(type) { - case *DIYDevice: - log.Printf("DIY device, sending to handle") + case *diydevice: + logger.V(2).Info("DIY device, sending to handle", "device", device) handleDIY(item, channel) + case *powerdevice: + logger.V(2).Info("Power device, sending to handle", "device", device) + // it's explicitly a powerdevice, so cast it before sending + handlePower(item, channel, device.(*powerdevice)) default: - log.Printf("other device, type was %s", t) + logger.Info("Found a device we can't classify", "device", device, "type", t) } } } //time.Sleep(time.Duration(0.5*float64(time.Second))) } } -func timeLoop(channel chan helper.RabbitSend) { +func powerLoop(sendChannel chan helper.RabbitSend, devices []device) { + const powerRequestSeconds = 15 + // first, build up a list of the power devices since those are the ones we care about + var pds []*powerdevice + for _, device := range devices { + switch device.(type) { + case *powerdevice: + pds = append(pds, device.(*powerdevice)) + } + } + + logger.V(3).Info("Built up PD list", "pds", pds) + + var reqMap = make(map[string]interface{}, 0) + reqMap["power"] = "" for { - time.Sleep(time.Duration(0.5 * float64(time.Second))) + // send requests for power on all devices + now := time.Now().UTC() + for _, pd := range pds { + logger.V(1).Info("Requesting power for device", "device", pd) + sendThis := helper.RabbitSend{Data: reqMap, RoutingKey: pd.getReqKey()} + sendChannel <- sendThis + + pd.lastUpdateRequested = now + } + + // then sleep for the next cycle + time.Sleep(time.Duration(powerRequestSeconds * float64(time.Second))) } } func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig) { for sendItem := range channel { - log.Printf("would try to send here %v", sendItem) helper.SendData(sendItem, rabbit, true) //helper.SendData(sendData, rabbit, false) } @@ -200,7 +326,7 @@ func expireStaleMetrics() { for labels, lastUpdate := range tempExpire { curStale := now.Sub(lastUpdate) if curStale > staleTime { - log.Printf("stale time %v, expiring labels %v\tlastUpdate: %v", curStale, labels, lastUpdate) + logger.Info("stale metric update time, expiring labels", "curStale", curStale, "labels", labels, "lastUpdate", lastUpdate) tempExpireValues = append(tempExpireValues, labels) } } @@ -211,36 +337,52 @@ func expireStaleMetrics() { if complete { delete(tempExpire, labels) } else { - log.Panicf("oh no wasn't able to delete metrics") + logger.Info("unable to delete metrics, exiting") + os.Exit(1) } } } -func setupLogging() { - log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) -} - func main() { - setupLogging() + // logging and flag initialization + flag.StringVar(&configFilename, "config", "", "the config filename") + logger = helper.NewLogger() + flag.Parse() + + if configFilename == "" { + logger.Error(fmt.Errorf("need to define config in order to load!"), "invalid config") + os.Exit(1) + } - rabbit := helper.SetupRabbit(os.Args[1], "", "reprocess") // config file, default routing key, source + rabbit, err := helper.SetupRabbit(configFilename, "", "reprocess") // config file, default routing key, source + if err != nil { + logger.Error(err, "unable to setup rabbit") + os.Exit(1) + } + + defer rabbit.Channel.Close() const sleepTime time.Duration = 5 * time.Second - devices := make([]Device, 0) - devices = append(devices, NewDIYDevice("office_pico")) - devices = append(devices, NewDIYDevice("stairway_pico")) - devices = append(devices, NewDIYDevice("stairway_pico")) - devices = append(devices, NewDIYDevice("downstairs_stairway_pico")) - devices = append(devices, NewDIYDevice("upstairs_pico")) - devices = append(devices, NewDIYDevice("data_closet_pico")) + devices := make([]device, 0) + + // DIY devices + devices = append(devices, newdiydevice("office_pico")) + devices = append(devices, newdiydevice("stairway_pico")) + devices = append(devices, newdiydevice("stairway_pico")) + devices = append(devices, newdiydevice("downstairs_stairway_pico")) + devices = append(devices, newdiydevice("upstairs_pico")) + devices = append(devices, newdiydevice("data_closet_pico")) + + // power devices + devices = append(devices, newpowerdevice("0x00158d0002a2e370", "wiggleswhale", true)) //currentTemp, err := fetchTemp(weatherStation) channel := make(chan helper.RabbitSend) go promSetup() go readLoop(channel, devices, rabbit) - go timeLoop(channel) + go powerLoop(channel, devices) go sendLoop(channel, rabbit) for true { diff --git a/timecolorshift/main.go b/timecolorshift/main.go index 355a02b..af0f04d 100644 --- a/timecolorshift/main.go +++ b/timecolorshift/main.go @@ -2,40 +2,47 @@ package main import ( - //"encoding/json" - //"fmt" - //"io" - "log" + "flag" + "io/ioutil" "math" - //"net/http" + "os" - //"strconv" "time" //"github.com/leekchan/timeutil" + "github.com/go-logr/logr" + "gopkg.in/yaml.v2" "unpiege.net/rabbit_go.git/helper" ) +var logger logr.Logger +var configFilename, timeConfigFilename string + +type timeConfigYaml struct { + PeakTime TimeFloat `yaml:"PeakTime"` + TimeColors []TimeColor `yaml:"TimeColors"` +} + // TimeFloat allows user to specify times in a more meaningful way type TimeFloat struct { - Hour int - Minute int - Second int + Hour int `yaml:"Hour"` + Minute int `yaml:"Minute"` + Second int `yaml:"Second"` } // TimeColor is one instance of RGB values changing over time type TimeColor struct { - PeakTime TimeFloat - Red float64 - Green float64 - Blue float64 - Extent TimeFloat + PeakTime TimeFloat `yaml:"PeakTime"` + Red float64 `yaml:"Red"` + Green float64 `yaml:"Green"` + Blue float64 `yaml:"Blue"` + Extent TimeFloat `yaml:"Extent"` } type RGB struct { - Red float64 - Green float64 - Blue float64 + Red float64 `yaml:"Red"` + Green float64 `yaml:"Green"` + Blue float64 `yaml:"Blue"` } // make sure no value is greater than 1 @@ -99,27 +106,18 @@ func (tc *TimeColor) RGBValues(evaluationTime time.Time) RGB { minDistance = val } } - //log.Printf("minDistance: %f", minDistance) if minDistance > tc.Extent.Float()/2 { - //log.Printf("turned off") // multiplier is already initialized zero so do nothing } else { extentMultiplier := 1 / tc.Extent.Float() multiplier = math.Cos(math.Pi * extentMultiplier * minDistance) } - //log.Printf("multiplier is %f", multiplier) retval.Mult(multiplier) return retval } -func failOnError(err error, msg string) { - if err != nil { - log.Panicf("%s: %s", msg, err) - } -} - func sum(arr []float64) float64 { sum := 0.0 for _, value := range arr { @@ -131,18 +129,15 @@ func sum(arr []float64) float64 { func timeLoop(channel chan helper.RabbitSend, timeColors []TimeColor) { loc, _ := time.LoadLocation("America/Los_Angeles") for { - //log.Printf("okay just looping here") curTime := time.Now().UTC().In(loc) - //log.Printf("here's the current time %s", curTime) totalRGB := RGB{} for _, tc := range timeColors { - //log.Printf("Evaluating RGB for %+v", tc) rgb := tc.RGBValues(curTime) - //log.Printf("%+v", rgb) + logger.V(3).Info("RGB values for tc", "tc", tc, "rgb", rgb) totalRGB = totalRGB.Add(rgb) } totalRGB.Mult(0.877) - //log.Printf("TotalRGB: %+v", totalRGB) + logger.V(3).Info("total RGB values", "rgb", totalRGB) sendObj := make(map[string]interface{}) sendObj["red"] = totalRGB.Red @@ -158,22 +153,66 @@ func timeLoop(channel chan helper.RabbitSend, timeColors []TimeColor) { func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig) { for sendData := range channel { - helper.SendData(sendData, rabbit, false) + err := helper.SendData(sendData, rabbit, false) + // TODO: find a cleaner way to bubble this up + if err != nil { + logger.Error(err, "error sending data in send loop, exiting") + os.Exit(1) + } } } +func (c *timeConfigYaml) Parse(data []byte) error { + return yaml.Unmarshal(data, c) +} + func main() { - rabbit := helper.SetupRabbit(os.Args[1], "timecolorshift.py", "timecolorshift") // config file, routing key + // logging and flag initialization + flag.StringVar(&configFilename, "config", "", "the config filename") + flag.StringVar(&timeConfigFilename, "timeConfig", "", "yaml config filename (for colors)") + + logger = helper.NewLogger() + flag.Parse() + + rabbit, err := helper.SetupRabbit(configFilename, "timecolorshift.py", "timecolorshift") // config file, default routing key, source + if err != nil { + logger.Error(err, "failed to connect to rabbitmq") + os.Exit(1) + } const sleepTime time.Duration = 30 * time.Second //currentTemp, err := fetchTemp(weatherStation) channel := make(chan helper.RabbitSend) - all_tcs := make([]TimeColor, 0) - all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 12}, 0, 1, 0.5, TimeFloat{Hour: 12})) - all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 2}, 1, 0.1, 0, TimeFloat{Hour: 8})) - all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 20}, 0.25, 0.25, 0, TimeFloat{Hour: 12})) - go timeLoop(channel, all_tcs) + var tcy timeConfigYaml + + if timeConfigFilename != "" { + logger.Info("Parsing provided timeConfigFilename", "timeConfigFilename", timeConfigFilename) + data, err := ioutil.ReadFile(timeConfigFilename) + if err != nil { + logger.Error(err, "error reading time config yaml file") + os.Exit(1) + } + //err = yaml.Unmarshal(data, &tcy) + err = tcy.Parse(data) + if err != nil { + logger.Error(err, "error parsing time config yaml file") + os.Exit(1) + } + } else { + logger.Info("Adding default (compiled) time colors") + all_tcs := make([]TimeColor, 0) + all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 12}, 0, 1, 0.5, TimeFloat{Hour: 12})) + all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 2}, 1, 0.1, 0, TimeFloat{Hour: 8})) + all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 20}, 0.25, 0.25, 0, TimeFloat{Hour: 12})) + tcy.TimeColors = all_tcs + } + // just display everything we configured before we boot further + for _, tc := range tcy.TimeColors { + logger.Info("displaying processed timeColors", "TimeColor", tc) + } + + go timeLoop(channel, tcy.TimeColors) go sendLoop(channel, rabbit) for true { time.Sleep(sleepTime) diff --git a/wunder/main.go b/wunder/main.go index a302c0d..53150c4 100644 --- a/wunder/main.go +++ b/wunder/main.go @@ -3,9 +3,9 @@ package main import ( //"encoding/json" + "flag" "fmt" "io" - "log" "math" "net/http" "os" @@ -13,15 +13,14 @@ import ( "time" "github.com/PuerkitoBio/goquery" + "github.com/go-logr/logr" + //"github.com/leekchan/timeutil" "unpiege.net/rabbit_go.git/helper" ) -func failOnError(err error, msg string) { - if err != nil { - log.Panicf("%s: %s", msg, err) - } -} +var logger logr.Logger +var configFilename string func sum(arr []float64) float64 { sum := 0.0 @@ -51,7 +50,7 @@ func extractTemp(htmlReader io.Reader) (float64, error) { if childClass == "wu-value wu-value-to" { currentTemp, err = strconv.ParseFloat(t.Text(), 10) if err != nil { - log.Println(err) + logger.Error(err, "error finding current temp") } foundTemp = true } @@ -59,7 +58,6 @@ func extractTemp(htmlReader io.Reader) (float64, error) { } }) if !foundTemp { - fmt.Println("oh no!") return currentTemp, fmt.Errorf("An error occurred while parsing html data") } return currentTemp, nil @@ -72,7 +70,7 @@ func fetchTemp(weatherStation string) (float64, error) { Timeout: time.Second * 10, } fullURL := fmt.Sprintf("%s/%s", baseURL, weatherStation) - log.Printf("Fetching full url %s\n", fullURL) + logger.Info("Fetching full url", "url", fullURL) req, err := http.NewRequest("GET", fullURL, nil) if err != nil { return 0, err @@ -90,15 +88,14 @@ func fetchTemp(weatherStation string) (float64, error) { func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfig) error { tempList := make([]float64, 0) - for count := range channel { + for _ = range channel { const tempLength int = 60 - log.Printf("doing another fetch, count %d", count) currentTemp, err := fetchTemp(weatherStation) if err != nil { - log.Printf("Wasn't able to get temp %s", err) + logger.Error(err, "msg", "Wasn't able to get temp, ignoring") } else { if currentTemp == 0 { - fmt.Printf("Got 0F for temp, ignoring") + logger.Info("Got 0F for temp, ignoring") } else { celsius := (currentTemp - 32) * 5 / 9 tempList = append(tempList, celsius) @@ -107,11 +104,11 @@ func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfi } tempAvg := sum(tempList) / float64(len(tempList)) if math.Abs(tempAvg-celsius) > 3.0 { - log.Printf("temperature %f outside mean %f, not printing", celsius, tempAvg) + logger.Info("msg", "temperature outside mean, not printing", "current", celsius, "average", tempAvg) } else { sendTemp(currentTemp, celsius, rabbit) } - fmt.Printf("temp list is %+v, avg is %f\n", tempList, tempAvg) + logger.V(2).Info("updated templist and average", "tempList", tempList, "tempAvg", tempAvg) } } } @@ -130,7 +127,16 @@ func sendTemp(tempValueF float64, tempValueC float64, rabbit helper.RabbitConfig } func main() { - rabbit := helper.SetupRabbit(os.Args[1], "temp", "wunder") // config file, routing key + // logging and flag initialization + flag.StringVar(&configFilename, "config", "", "the config filename") + logger = helper.NewLogger() + flag.Parse() + + rabbit, err := helper.SetupRabbit(configFilename, "temp", "wunder") // config file, routing key + if err != nil { + logger.Error(err, "failed in rabbitmq setup") + os.Exit(1) + } const weatherStation string = "KWASEATT2696" const sleepTime time.Duration = 30 * time.Second @@ -138,10 +144,8 @@ func main() { channel := make(chan int) go tempLoop(channel, weatherStation, rabbit) //time.Sleep(sleepTime) - var count int for true { - channel <- count - count = count + 1 + channel <- 0 time.Sleep(sleepTime) }