Large logging and flags revamp
authorjweigele <jweigele@local>
Mon, 12 Dec 2022 01:25:50 +0000 (17:25 -0800)
committerjweigele <jweigele@local>
Mon, 12 Dec 2022 01:25:50 +0000 (17:25 -0800)
 * Everything is using klog (interfaced with logr) to do log output now
 * This provides different visibility levels for debug and a better
   format
 * Init flags and logger creation in main (emulate other files mostly)
 * As part of the revamp, provide some structured yaml marshalling for
   future config

Dockerfile.reprocess
Dockerfile.timecolorshift
Dockerfile.wunder
go.mod
helper/helper.go
reprocess/main.go
timecolorshift/main.go
wunder/main.go

index 4c7a49d21a459abe92eb33f09abcb16550f61f95..3d325552716ce8dffd6e38db26b099b2e03c8bce 100644 (file)
@@ -18,4 +18,4 @@ RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags="-w -s" -o
 
 FROM alpine:edge
 COPY --from=builder /go/bin/reprocess /bin/reprocess
-ENTRYPOINT ["/bin/reprocess"]
+ENTRYPOINT ["/bin/reprocess", "-config", "/conf/config-docker.json"]
index 91ac4065b1c88f7fe7f44ed04c396dc35538c7be..af0a5fb8dea01b67c66f1f045064a9c2d680b30a 100644 (file)
@@ -19,4 +19,4 @@ RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags="-w -s" -o
 FROM alpine:edge
 COPY --from=builder /go/bin/timecolorshift /bin/timecolorshift
 RUN apk add tzdata
-ENTRYPOINT ["/bin/timecolorshift"]
+ENTRYPOINT ["/bin/timecolorshift", "-config", "/conf/config-docker.json"]
index 70baabecea08317e68c9116b4f93f346302d76ce..51a4de7180f502af5f334edc499c63e566fd5926 100644 (file)
@@ -18,4 +18,4 @@ RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags="-w -s" -o
 
 FROM alpine:edge
 COPY --from=builder /go/bin/wunder /bin/wunder
-ENTRYPOINT ["/bin/wunder"]
+ENTRYPOINT ["/bin/wunder", "-config", "/conf/config-docker.json"]
diff --git a/go.mod b/go.mod
index 4510935f2bd90b3754df025459060366031b394d..e54c84e992b048e5b7bcd8ef7266ebe5997f43ce 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -4,9 +4,12 @@ go 1.18
 
 require (
        github.com/PuerkitoBio/goquery v1.8.0
+       github.com/go-logr/logr v1.2.3
        github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d
        github.com/prometheus/client_golang v1.14.0
        github.com/rabbitmq/amqp091-go v1.5.0
+       gopkg.in/yaml.v2 v2.4.0
+       k8s.io/klog/v2 v2.80.1
 )
 
 require (
index 9e966aa975692b85065a15d2d4e6e8ecb168bb6c..be6571837faea146206920d0590787ef50351111 100644 (file)
-package helper 
+// Package helper - various helper functions associated with managing rabbitmq connections easier
+package helper
 
 import (
-        "context"
-        "crypto/tls"
-        "crypto/x509"
-        "encoding/json"
-        "fmt"
-//        "io"
-        "io/ioutil"
-        "log"
-        "strings"
-//        "math"
-//        "net/http"
-//        "os"
-//        "strconv"
-        "time"
-
-//        "github.com/PuerkitoBio/goquery"
-        "github.com/leekchan/timeutil"
-        amqp "github.com/rabbitmq/amqp091-go"
+       "context"
+       "crypto/tls"
+       "crypto/x509"
+       "encoding/json"
+       "fmt"
+
+       "io/ioutil"
+       "strings"
+
+       "sync"
+       "time"
+
+       "github.com/go-logr/logr"
+       "github.com/leekchan/timeutil"
+       amqp "github.com/rabbitmq/amqp091-go"
+       "k8s.io/klog/v2"
+       "k8s.io/klog/v2/klogr"
+       //logging things
 )
 
+var logger logr.Logger
+var sendMutex sync.Mutex
+
+func init() {
+       // do this for everyone, once
+       klog.InitFlags(nil)
+       // our package logger
+       logger = klogr.New()
+}
+
+// NewLogger helps to keep the specific logging implementation details hidden from other files
+// allowing for a switch if needed (who knows if this really matters)
+func NewLogger() logr.Logger {
+       return klogr.New()
+}
 
 // RabbitSend just a bundle of a map with a routing key
 type RabbitSend struct {
-        Data map[string]interface{}
-        RoutingKey string
-        IncludeDate bool            // defaults to false, so we don't send the date
+       Data        map[string]interface{}
+       RoutingKey  string
+       IncludeDate bool // defaults to false, so we don't send the date
 }
 
 // RabbitConfig simple values to pass around
 type RabbitConfig struct {
-        Exchange            string
-        Channel             *amqp.Channel
-        Queue               *amqp.Queue
-        DefaultRoutingKey   string
-        Source              string
+       Exchange          string
+       Channel           *amqp.Channel
+       Queue             *amqp.Queue
+       DefaultRoutingKey string
+       Source            string
 }
 
-func failOnError(err error, msg string) {
-        if err != nil {
-                log.Panicf("%s: %s", msg, err)
-        }
-}
+// SetupRabbit actually connects to the rabbitmq server, and returns global config values for later use
+func SetupRabbit(configFilename string, defaultRoutingKey string, source string) (RabbitConfig, error) {
+       // grab the mutex, unlock when we're done
+       sendMutex.Lock()
+       defer sendMutex.Unlock()
+
+       configJSON, err := ioutil.ReadFile(configFilename)
+       if err != nil {
+               return RabbitConfig{}, err
+       }
+       var result map[string]interface{}
+       json.Unmarshal([]byte(configJSON), &result)
+
+       // no cert verification because we're self-signed
+       cfg := new(tls.Config)
+       cfg.RootCAs = x509.NewCertPool()
+       cfg.InsecureSkipVerify = true
 
+       conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg)
+       if err != nil {
+               return RabbitConfig{}, err
+       }
+       //defer conn.Close()
 
-func SetupRabbit(configFilename string, defaultRoutingKey string, source string) RabbitConfig {
-        configJSON, err := ioutil.ReadFile(configFilename)
-        if err != nil {
-                failOnError(err, "oh no read")
-        }
-        var result map[string]interface{}
-        json.Unmarshal([]byte(configJSON), &result)
-        fmt.Println(result)
-
-        // no cert verification because we're self-signed
-        cfg := new(tls.Config)
-        cfg.RootCAs = x509.NewCertPool()
-        cfg.InsecureSkipVerify = true
-
-        conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg)
-        failOnError(err, "Failed to connect to RabbitMQ")
-        //defer conn.Close()
-
-        rabbitChannel, err := conn.Channel()
-        failOnError(err, "Failed to open a rabbitChannel")
-        //defer rabbitChannel.Close()
-
-        err = rabbitChannel.ExchangeDeclare(
-                result["exchange"].(string), // name
-                amqp.ExchangeTopic,          // type
-                true,                        // durable
-                false,                       // autodelete
-                false,                       // internal (doesn't accept publish so...)
-                false,                       // block and wait for call to finish
-                nil,                         // optional args I don't know any
-        )
-        failOnError(err, "Failed exchange stuff")
-
-        err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages
-        failOnError(err, "Failed confirm")
-        var rabbit RabbitConfig
-        rabbit.Exchange = result["exchange"].(string)
-        rabbit.Channel = rabbitChannel
-        // constants
-        rabbit.DefaultRoutingKey = defaultRoutingKey 
-        rabbit.Source = source
-        return rabbit
+       rabbitChannel, err := conn.Channel()
+       if err != nil {
+               return RabbitConfig{}, err
+       }
+       //defer rabbitChannel.Close()
+
+       err = rabbitChannel.ExchangeDeclare(
+               result["exchange"].(string), // name
+               amqp.ExchangeTopic,          // type
+               true,                        // durable
+               false,                       // autodelete
+               false,                       // internal (doesn't accept publish so...)
+               false,                       // block and wait for call to finish
+               nil,                         // optional args I don't know any
+       )
+       if err != nil {
+               return RabbitConfig{}, err
+       }
+
+       err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages
+       if err != nil {
+               return RabbitConfig{}, err
+       }
+       var rabbit RabbitConfig
+       rabbit.Exchange = result["exchange"].(string)
+       rabbit.Channel = rabbitChannel
+       // constants
+       rabbit.DefaultRoutingKey = defaultRoutingKey
+       rabbit.Source = source
+       logger.Info("Connected to RabbitMQ",
+               "host", result["host"],
+               "user", result["user"],
+               "vhost", result["vhost"],
+               "defaultRoutingKey", defaultRoutingKey,
+               "source", source,
+       )
+       return rabbit, nil
 }
 
-// simple, bind to a (topic) with our default params
+// Bind - binds to a topic with our default params
 // will declare a queue if we don't have one
-func Bind(routingKey string, rabbit RabbitConfig) (RabbitConfig){
-    // if queue hasn't yet been initialized (no other bound topics basically)
-    if rabbit.Queue == nil {
-        queue, err := rabbit.Channel.QueueDeclare(
-                "",         // queue name (auto-gen)
-                false,      // durable
-                true,       // autoDelete (get rid of if consumer discos or server restarts)
-                true,       // exclusive
-                false,      // noWait (i.e. block until finished)
-                nil,        // args
-        )
-        // pass by reference so we can fit in our struct
-        rabbit.Queue = &queue
-        failOnError(err, "failed queue declaration") 
-        log.Printf("new queue declared %s", rabbit.Queue.Name)
-    }
-
-    // last two are block on binding and args
-    err := rabbit.Channel.QueueBind(rabbit.Queue.Name, routingKey, rabbit.Exchange, false, nil)
-    failOnError(err, "failed queue binding")
-    return rabbit
+func Bind(routingKey string, rabbit *RabbitConfig) error {
+       // grab the mutex, unlock when we're done
+       sendMutex.Lock()
+       defer sendMutex.Unlock()
+
+       // if queue hasn't yet been initialized (no other bound topics basically)
+       if rabbit.Queue == nil {
+               queue, err := rabbit.Channel.QueueDeclare(
+                       "",    // queue name (auto-gen)
+                       false, // durable
+                       true,  // autoDelete (get rid of if consumer discos or server restarts)
+                       true,  // exclusive
+                       false, // noWait (i.e. block until finished)
+                       nil,   // args
+               )
+               // pass by reference so we can fit in our struct
+               rabbit.Queue = &queue
+               if err != nil {
+                       return err
+               }
+               logger.Info("new queue declared", "queue", rabbit.Queue)
+       }
+
+       // last two are block on binding and args
+       err := rabbit.Channel.QueueBind(rabbit.Queue.Name, routingKey, rabbit.Exchange, false, nil)
+       if err != nil {
+               return err
+       }
+       logger.Info("Bound queue to exchange", "queue", rabbit.Queue, "routingKey", routingKey)
+       return nil
 
 }
 
-// starts consuming with all our presets and returns channel
-func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery){
-    log.Printf("consuming on queue %+v", rabbit.Queue)
-    deliveries, err := rabbit.Channel.Consume(
-            rabbit.Queue.Name,
-            "",                 // consumer (auto-gen)
-            true,               // autoAck
-            true,               // exclusive
-            false,              // noLocal - does nothing?
-            false,              // wait and block please
-            nil,                // args
-    )
-    failOnError(err, "failed channel consume")
-    return deliveries
+// StartConsuming - starts consuming with all our presets and returns channel of deliveries
+func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery, error) {
+       // grab the mutex, unlock when we're done
+       sendMutex.Lock()
+       defer sendMutex.Unlock()
+
+       deliveries, err := rabbit.Channel.Consume(
+               rabbit.Queue.Name,
+               "",    // consumer (auto-gen)
+               true,  // autoAck
+               true,  // exclusive
+               false, // noLocal - does nothing?
+               false, // immediate (wait and block please)
+               nil,   // args
+       )
+       logger.Info("consuming on queue", "queue", rabbit.Queue)
+       return deliveries, err
 }
 
-func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}){
-    //var result map[string]string //interface{} 
-    result := make(map[string]interface{})
-    // either explicitly json, or zigbee2mqtt (which we have set to only output json)
-    isZigbee := strings.HasPrefix(delivery.RoutingKey, "zigbee2mqtt")
-    if delivery.ContentType == "application/json" || isZigbee {
-        err := json.Unmarshal(delivery.Body, &result)
-        if isZigbee {
-            // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value
-            // try to further decode this in an effort to help the data consumer
-            _, actionPresent := result["action"]
-            if actionPresent {
-                var decode map[string]interface{}
-                json.Unmarshal([]byte(result["action"].(string)), &decode)
-                result["action"] = decode
-            }
-        }
-        failOnError(err, "failed to decode json")
-    } else {
-        result["data"] = string(delivery.Body[:])
-    }
-    return result
+// DecodeDelivery - takes a presumably byte-formatted json stream and makes it a map for us
+//  will return errors if things don't actually work out that well
+func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}, error) {
+       //var result map[string]string //interface{}
+       result := make(map[string]interface{})
+       // either explicitly json, or zigbee2mqtt (which we have set to only output json)
+       isZigbee := strings.HasPrefix(delivery.RoutingKey, "zigbee2mqtt")
+       if delivery.ContentType == "application/json" || isZigbee {
+               err := json.Unmarshal(delivery.Body, &result)
+               if err != nil {
+                       return result, err
+               }
+               if isZigbee {
+                       // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value
+                       // try to further decode this in an effort to help the data consumer
+                       _, actionPresent := result["action"]
+                       if actionPresent {
+                               var decode map[string]interface{}
+                               err := json.Unmarshal([]byte(result["action"].(string)), &decode)
+                               if err != nil {
+                                       return decode, err
+                               }
+                               result["action"] = decode
+                       }
+               }
+       } else {
+               result["data"] = string(delivery.Body[:])
+       }
+       return result, nil
 }
-        
-func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool){
-    if rabbitData.IncludeDate {
-        // add the date as a convenience
-        utcNow := time.Now().UTC()
-        dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S")
-        rabbitData.Data["date"] = dateString
-    }
-
-    if rabbit.Source != "" {
-        rabbitData.Data["source"] = rabbit.Source
-    }
-
-    jsonBytes, err := json.Marshal(rabbitData.Data)
-    failOnError(err, "Improperly formatted JSON, just gonna exit here")
-    var routingKey string
-    if rabbitData.RoutingKey != "" {
-        routingKey = rabbitData.RoutingKey
-    } else if rabbit.DefaultRoutingKey != "" {
-        routingKey = rabbit.DefaultRoutingKey
-    } else {
-        failOnError(fmt.Errorf("routing key error"), "No valid routing key found to send on")
-    }
-    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-    defer cancel()
-    err = rabbit.Channel.PublishWithContext(ctx,
-            rabbit.Exchange,        // exchange
-            routingKey,  // routing key
-            false,                  // mandatory
-            false,                  // immediate
-            amqp.Publishing{
-                    ContentType: "application/json",
-                    Body:        jsonBytes,
-            })
-    failOnError(err, "Failed to publish a message")
-    if verboseSend {
-        log.Printf("Sent %s routing: %s\n", jsonBytes, routingKey)
-    }
+
+// SendData takes data using parameters from our own structs RabbitSend and RabbitConfig, and puts it out on the given routingkey
+func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool) error {
+       // grab the mutex, unlock when we're done
+       sendMutex.Lock()
+       defer sendMutex.Unlock()
+
+       if rabbitData.IncludeDate {
+               // add the date as a convenience
+               utcNow := time.Now().UTC()
+               dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S")
+               rabbitData.Data["date"] = dateString
+       }
+
+       if rabbit.Source != "" {
+               rabbitData.Data["source"] = rabbit.Source
+       }
+
+       jsonBytes, err := json.Marshal(rabbitData.Data)
+       if err != nil {
+               return err
+       }
+       var routingKey string
+       if rabbitData.RoutingKey != "" {
+               routingKey = rabbitData.RoutingKey
+       } else if rabbit.DefaultRoutingKey != "" {
+               routingKey = rabbit.DefaultRoutingKey
+       } else {
+               if err != nil {
+                       return fmt.Errorf("No valid routing key to send on")
+               }
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       err = rabbit.Channel.PublishWithContext(ctx,
+               rabbit.Exchange, // exchange
+               routingKey,      // routing key
+               false,           // mandatory
+               false,           // immediate
+               amqp.Publishing{
+                       ContentType: "application/json",
+                       Body:        jsonBytes,
+               })
+       if err != nil {
+               return err
+       }
+
+       if verboseSend {
+               logger.Info("Sent data to rabbitmq", "data", rabbitData.Data, "routingKey", routingKey)
+       }
+       return nil
 
 }
index 3a3dac405e3a7c63337b84530e42b699a683f942..12745d1b24919596e3534d65072092e0d66925ca 100644 (file)
@@ -2,16 +2,21 @@
 package main
 
 import (
+       "flag"
        "fmt"
-       "log"
        "os"
+       "reflect"
        "time"
 
        // prometheus imports
+       "net/http"
+
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
        "github.com/prometheus/client_golang/prometheus/promhttp"
-       "net/http"
+
+       // logging interface
+       "github.com/go-logr/logr"
 
        "unpiege.net/rabbit_go.git/helper"
 )
@@ -24,82 +29,130 @@ var (
                },
                []string{"scale", "location"},
        )
-       //tempExpire = map[prometheus.Labels]time.Time
-       //tempExpire = map[string]string
+
+       powerGauge = promauto.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "ePDUOutletStatusActivePower",
+                       Help: "Used as an analog to power snmp fetching",
+               },
+               []string{"ePDUOutletStatusIndex", "ePDUOutletStatusOutletName"},
+       )
 
        tempExpire = make(map[[2]string]time.Time)
 
-       staleTime = time.Duration(15 * time.Second)
+       staleTime      = time.Duration(15 * time.Second)
+       logger         logr.Logger
+       configFilename string
 )
 
-type Device interface {
+type device interface {
        getRoutingKey() string
 }
 
-type ZigDevice struct {
-       routingKey string
+type zigdevice struct {
+       friendlyName string
 }
 
-type DIYDevice struct {
-       *ZigDevice
+type diydevice struct {
+       *zigdevice
 }
 
-type PowerDevice struct {
-       *ZigDevice
-       powerName   string
-       queryNeeded bool
+type powertime struct {
+       timeLogged time.Time
+       powerValue float64
 }
 
-func (dev *DIYDevice) getRoutingKey() string {
-       return fmt.Sprintf("zigbee2mqtt.%s", dev.routingKey)
+type powerdevice struct {
+       *zigdevice
+       powerName           string
+       queryNeeded         bool
+       lastUpdate          time.Time
+       lastUpdateRequested time.Time
+       powerList           []powertime
 }
 
-func NewDevice(routingKey string) *ZigDevice {
-       retval := ZigDevice{routingKey: routingKey}
-       //retval.routingKey = routingKey
-       return &retval
+func (dev *powerdevice) filter() {
+       // if this gets excessive we can pass it down the chain
+       now := time.Now().UTC()
+       // default time to keep power values around
+       const filterTime = 60
+
+       // filter to ignore old values
+       var sliceStart int
+       for index, pt := range dev.powerList {
+               // break when we find our first valid value (since they should be sorted)
+               sliceStart = index
+               if pt.timeLogged.After(now.Add(-time.Second * filterTime)) {
+                       break
+               }
+       }
+
+       // reslice, excluding all previous (cutoff) values
+       dev.powerList = dev.powerList[sliceStart:]
+
 }
 
-func NewDIYDevice(routingKey string) *DIYDevice {
-       return &DIYDevice{
-               ZigDevice: NewDevice(routingKey),
+func (dev *powerdevice) avgPower() float64 {
+       totalPower := 0.0
+       // sum up the power list
+       for _, pt := range dev.powerList {
+               totalPower = totalPower + pt.powerValue
        }
+       // average and return
+       return totalPower / float64(len(dev.powerList))
+}
 
+func (dev *powerdevice) add(newTime time.Time, newVal float64) {
+       dev.powerList = append(dev.powerList, powertime{timeLogged: newTime, powerValue: newVal})
+       dev.filter()
 }
 
-//tempExpire := make(map[[2]string]time.Time)
+func (dev *zigdevice) getRoutingKey() string {
+       return fmt.Sprintf("zigbee2mqtt.%s", dev.friendlyName)
+}
 
-func failOnError(err error, msg string) {
-       if err != nil {
-               log.Panicf("%s: %s", msg, err)
+func (dev *powerdevice) getReqKey() string {
+       return fmt.Sprintf("zigbee2mqtt.%s.get", dev.friendlyName)
+}
+
+func newdevice(friendlyName string) *zigdevice {
+       retval := zigdevice{friendlyName: friendlyName}
+       return &retval
+}
+
+func newdiydevice(friendlyName string) *diydevice {
+       return &diydevice{
+               zigdevice: newdevice(friendlyName),
+       }
+
+}
+
+func newpowerdevice(friendlyName, powerName string, queryNeeded bool) *powerdevice {
+       return &powerdevice{
+               zigdevice:   newdevice(friendlyName),
+               powerName:   powerName,
+               queryNeeded: queryNeeded,
        }
 }
 
 func handleTemp(obj map[string]interface{}) {
-       log.Printf("temp! %v", obj)
+       logger.V(1).Info("Temperature received", "obj", obj)
        celsius := obj["celsius"].(float64)
        location := obj["location"].(string)
        now := time.Now().UTC()
-       // TODO: last update portion
-
        // do the label update here
-       //log.Printf("celsius: %f location %s", celsius, location)
        tempGauge.With(prometheus.Labels{"scale": "celsius", "location": location}).Set(celsius)
+       // here, we might have any number of temp values some not covered by our own objects
        tempExpire[[2]string{"celsius", location}] = now
 
 }
 
 func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) {
-       log.Printf("diy! %v", obj)
+       logger.V(1).Info("DIY data received", "obj", obj)
        _, isAction := obj["action"]
        if isAction {
-               // this is yet more JSON, so decode it
+               // this should also already be decoded for us
                data := obj["action"].(map[string]interface{})
-               /*        err := json.Unmarshal([]byte(obj["action"].(string)), &data)
-                         if err != nil {
-                             log.Printf("Unable to decode json action from diy, ignoring")
-                             return
-                         }*/
                var senseType string
 
                _, isTemp := data["celsius"]
@@ -110,7 +163,7 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) {
                        if hasType {
                                senseType = data["type"].(string)
                        } else {
-                               log.Printf("Did not find any sense type, just returning")
+                               logger.Info("Did not find any sense type, just returning")
                                return
                        }
                }
@@ -122,7 +175,7 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) {
                        dataMap["fahrenheit"] = data["celsius"].(float64)*9/5 + 32
                        dataMap["celsius"] = data["celsius"].(float64)
                        dataMap["location"] = data["location"].(string)
-                       log.Printf("would send %v", dataMap)
+                       logger.V(2).Info("Sending reprocessed temperature")
                        // hardcoded temp routingKey for this type of measurement
                        sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true}
                        sendChannel <- sendThis
@@ -130,58 +183,131 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) {
                        dataMap["motion_detected"] = data["state"].(bool)
                        // copied from above, but it might change later so shrug
                        dataMap["location"] = data["location"].(string)
-                       log.Printf("sending reprocessed motion: %v", dataMap)
+                       logger.V(2).Info("Sending reprocessed motion")
                        sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true}
                        sendChannel <- sendThis
                } else {
-                       log.Printf("error! sense type not detected, ignoring")
+                       logger.Info("Sense type not detected, ignoring")
                }
 
        } else {
-               log.Printf("not action, ignoring")
+               logger.V(2).Info("not action, ignoring")
        }
 
 }
 
-func readLoop(channel chan helper.RabbitSend, devices []Device, rabbit helper.RabbitConfig) {
+// are we getting new data on behalf of a request (if spammy), or is the reporting configured so it doesn't spam?
+// in other words, do we trust this data is evenly spaced?
+func (dev *powerdevice) shouldUpdate(now time.Time) bool {
+       const updateStalenessSeconds = 2
+       // well-behaved OR in the last 2 seconds since we requested
+       if now.Add(updateStalenessSeconds * -time.Second).Before(dev.lastUpdateRequested) {
+               return true
+       }
+       return false
+}
+
+func handlePower(obj map[string]interface{}, sendChannel chan helper.RabbitSend, triggerDevice *powerdevice) {
+       now := time.Now().UTC()
+       logger.V(1).Info("Power data received", "obj", obj)
+       //name := triggerDevice.powerName
+       // here, we're directly tracking power devices so we can bind updated directly to the object
+       triggerDevice.lastUpdate = now
+       if triggerDevice.shouldUpdate(now) {
+               triggerDevice.add(now, obj["power"].(float64))
+               powerGauge.With(prometheus.Labels{"ePDUOutletStatusIndex": triggerDevice.friendlyName, "ePDUOutletStatusOutletName": triggerDevice.powerName}).Set(triggerDevice.avgPower())
+               // don't worry about expiration, since we have all the values in our objects
+       }
+
+}
+
+func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.RabbitConfig) {
        for _, device := range devices {
-               rabbit = helper.Bind(device.getRoutingKey(), rabbit)
-               log.Printf("bound to %s", device.getRoutingKey())
+               err := helper.Bind(device.getRoutingKey(), &rabbit)
+               // TODO: find a better way to bubble this up
+               if err != nil {
+                       logger.Error(err, "unable to bind successfully to exchange", "routingKey", device.getRoutingKey())
+                       os.Exit(1)
+               }
+       }
+       /// this one is hardcoded, for the generic temperature output
+       err := helper.Bind("temp", &rabbit)
+       if err != nil {
+               logger.Error(err, "unable to bind successfully to exchange", "routingKey", "temp")
+               os.Exit(1)
        }
-       rabbit = helper.Bind("temp", rabbit)
-       log.Printf("bound to temp")
-       deliveries := helper.StartConsuming(rabbit)
+
+       deliveries, err := helper.StartConsuming(rabbit)
+       if err != nil {
+               logger.Error(err, "unable to start consuming data from rabbit")
+               os.Exit(1)
+       }
+
        for delivery := range deliveries {
-               //log.Printf("got a delivery %+v", delivery)
-               item := helper.DecodeDelivery(delivery)
-               //log.Printf("%+v", item)
+               item, err := helper.DecodeDelivery(delivery)
+               // it's just one delivery so we're going to yell and then continue along unabated
+               if err != nil {
+                       logger.Error(err, "unable to decode delivery", "delivery", delivery)
+                       continue
+               }
+
+               // only if you really want to see it all
+               logger.V(2).Info("Received a delivery", "delivery", delivery)
                if delivery.RoutingKey == "temp" {
                        handleTemp(item)
                }
                for _, device := range devices {
                        if device.getRoutingKey() == delivery.RoutingKey {
-                               log.Printf("Found device for routing key %s, %v", delivery.RoutingKey, device)
+                               logger.V(3).Info("Found device for routing key", "routingKey", delivery.RoutingKey, "device", reflect.TypeOf(device).String())
                                switch t := device.(type) {
-                               case *DIYDevice:
-                                       log.Printf("DIY device, sending to handle")
+                               case *diydevice:
+                                       logger.V(2).Info("DIY device, sending to handle", "device", device)
                                        handleDIY(item, channel)
+                               case *powerdevice:
+                                       logger.V(2).Info("Power device, sending to handle", "device", device)
+                                       // it's explicitly a powerdevice, so cast it before sending
+                                       handlePower(item, channel, device.(*powerdevice))
                                default:
-                                       log.Printf("other device, type was %s", t)
+                                       logger.Info("Found a device we can't classify", "device", device, "type", t)
                                }
                        }
                }
                //time.Sleep(time.Duration(0.5*float64(time.Second)))
        }
 }
-func timeLoop(channel chan helper.RabbitSend) {
+func powerLoop(sendChannel chan helper.RabbitSend, devices []device) {
+       const powerRequestSeconds = 15
+       // first, build up a list of the power devices since those are the ones we care about
+       var pds []*powerdevice
+       for _, device := range devices {
+               switch device.(type) {
+               case *powerdevice:
+                       pds = append(pds, device.(*powerdevice))
+               }
+       }
+
+       logger.V(3).Info("Built up PD list", "pds", pds)
+
+       var reqMap = make(map[string]interface{}, 0)
+       reqMap["power"] = ""
        for {
-               time.Sleep(time.Duration(0.5 * float64(time.Second)))
+               // send requests for power on all devices
+               now := time.Now().UTC()
+               for _, pd := range pds {
+                       logger.V(1).Info("Requesting power for device", "device", pd)
+                       sendThis := helper.RabbitSend{Data: reqMap, RoutingKey: pd.getReqKey()}
+                       sendChannel <- sendThis
+
+                       pd.lastUpdateRequested = now
+               }
+
+               // then sleep for the next cycle
+               time.Sleep(time.Duration(powerRequestSeconds * float64(time.Second)))
        }
 }
 
 func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig) {
        for sendItem := range channel {
-               log.Printf("would try to send here %v", sendItem)
                helper.SendData(sendItem, rabbit, true)
                //helper.SendData(sendData, rabbit, false)
        }
@@ -200,7 +326,7 @@ func expireStaleMetrics() {
        for labels, lastUpdate := range tempExpire {
                curStale := now.Sub(lastUpdate)
                if curStale > staleTime {
-                       log.Printf("stale time %v, expiring labels %v\tlastUpdate: %v", curStale, labels, lastUpdate)
+                       logger.Info("stale metric update time, expiring labels", "curStale", curStale, "labels", labels, "lastUpdate", lastUpdate)
                        tempExpireValues = append(tempExpireValues, labels)
                }
        }
@@ -211,36 +337,52 @@ func expireStaleMetrics() {
                if complete {
                        delete(tempExpire, labels)
                } else {
-                       log.Panicf("oh no wasn't able to delete metrics")
+                       logger.Info("unable to delete metrics, exiting")
+                       os.Exit(1)
                }
        }
 
 }
 
-func setupLogging() {
-       log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
-}
-
 func main() {
-       setupLogging()
+       // logging and flag initialization
+       flag.StringVar(&configFilename, "config", "", "the config filename")
+       logger = helper.NewLogger()
+       flag.Parse()
+
+       if configFilename == "" {
+               logger.Error(fmt.Errorf("need to define config in order to load!"), "invalid config")
+               os.Exit(1)
+       }
 
-       rabbit := helper.SetupRabbit(os.Args[1], "", "reprocess") // config file, default routing key, source
+       rabbit, err := helper.SetupRabbit(configFilename, "", "reprocess") // config file, default routing key, source
+       if err != nil {
+               logger.Error(err, "unable to setup rabbit")
+               os.Exit(1)
+       }
+
+       defer rabbit.Channel.Close()
        const sleepTime time.Duration = 5 * time.Second
 
-       devices := make([]Device, 0)
-       devices = append(devices, NewDIYDevice("office_pico"))
-       devices = append(devices, NewDIYDevice("stairway_pico"))
-       devices = append(devices, NewDIYDevice("stairway_pico"))
-       devices = append(devices, NewDIYDevice("downstairs_stairway_pico"))
-       devices = append(devices, NewDIYDevice("upstairs_pico"))
-       devices = append(devices, NewDIYDevice("data_closet_pico"))
+       devices := make([]device, 0)
+
+       // DIY devices
+       devices = append(devices, newdiydevice("office_pico"))
+       devices = append(devices, newdiydevice("stairway_pico"))
+       devices = append(devices, newdiydevice("stairway_pico"))
+       devices = append(devices, newdiydevice("downstairs_stairway_pico"))
+       devices = append(devices, newdiydevice("upstairs_pico"))
+       devices = append(devices, newdiydevice("data_closet_pico"))
+
+       // power devices
+       devices = append(devices, newpowerdevice("0x00158d0002a2e370", "wiggleswhale", true))
 
        //currentTemp, err := fetchTemp(weatherStation)
        channel := make(chan helper.RabbitSend)
 
        go promSetup()
        go readLoop(channel, devices, rabbit)
-       go timeLoop(channel)
+       go powerLoop(channel, devices)
        go sendLoop(channel, rabbit)
 
        for true {
index 355a02b9386b56b4ffe7c4e932f7c66dba2784c0..af0f04d8c8796a41a46e968e7e795b8ba76c6313 100644 (file)
@@ -2,40 +2,47 @@
 package main
 
 import (
-       //"encoding/json"
-       //"fmt"
-       //"io"
-       "log"
+       "flag"
+       "io/ioutil"
        "math"
-       //"net/http"
+
        "os"
-       //"strconv"
        "time"
 
        //"github.com/leekchan/timeutil"
+       "github.com/go-logr/logr"
+       "gopkg.in/yaml.v2"
        "unpiege.net/rabbit_go.git/helper"
 )
 
+var logger logr.Logger
+var configFilename, timeConfigFilename string
+
+type timeConfigYaml struct {
+       PeakTime   TimeFloat   `yaml:"PeakTime"`
+       TimeColors []TimeColor `yaml:"TimeColors"`
+}
+
 // TimeFloat allows user to specify times in a more meaningful way
 type TimeFloat struct {
-       Hour   int
-       Minute int
-       Second int
+       Hour   int `yaml:"Hour"`
+       Minute int `yaml:"Minute"`
+       Second int `yaml:"Second"`
 }
 
 // TimeColor is one instance of RGB values changing over time
 type TimeColor struct {
-       PeakTime TimeFloat
-       Red      float64
-       Green    float64
-       Blue     float64
-       Extent   TimeFloat
+       PeakTime TimeFloat `yaml:"PeakTime"`
+       Red      float64   `yaml:"Red"`
+       Green    float64   `yaml:"Green"`
+       Blue     float64   `yaml:"Blue"`
+       Extent   TimeFloat `yaml:"Extent"`
 }
 
 type RGB struct {
-       Red   float64
-       Green float64
-       Blue  float64
+       Red   float64 `yaml:"Red"`
+       Green float64 `yaml:"Green"`
+       Blue  float64 `yaml:"Blue"`
 }
 
 // make sure no value is greater than 1
@@ -99,27 +106,18 @@ func (tc *TimeColor) RGBValues(evaluationTime time.Time) RGB {
                        minDistance = val
                }
        }
-       //log.Printf("minDistance: %f", minDistance)
 
        if minDistance > tc.Extent.Float()/2 {
-               //log.Printf("turned off")
                // multiplier is already initialized zero so do nothing
        } else {
                extentMultiplier := 1 / tc.Extent.Float()
                multiplier = math.Cos(math.Pi * extentMultiplier * minDistance)
        }
-       //log.Printf("multiplier is %f", multiplier)
        retval.Mult(multiplier)
 
        return retval
 }
 
-func failOnError(err error, msg string) {
-       if err != nil {
-               log.Panicf("%s: %s", msg, err)
-       }
-}
-
 func sum(arr []float64) float64 {
        sum := 0.0
        for _, value := range arr {
@@ -131,18 +129,15 @@ func sum(arr []float64) float64 {
 func timeLoop(channel chan helper.RabbitSend, timeColors []TimeColor) {
        loc, _ := time.LoadLocation("America/Los_Angeles")
        for {
-               //log.Printf("okay just looping here")
                curTime := time.Now().UTC().In(loc)
-               //log.Printf("here's the current time %s", curTime)
                totalRGB := RGB{}
                for _, tc := range timeColors {
-                       //log.Printf("Evaluating RGB for %+v", tc)
                        rgb := tc.RGBValues(curTime)
-                       //log.Printf("%+v", rgb)
+                       logger.V(3).Info("RGB values for tc", "tc", tc, "rgb", rgb)
                        totalRGB = totalRGB.Add(rgb)
                }
                totalRGB.Mult(0.877)
-               //log.Printf("TotalRGB: %+v", totalRGB)
+               logger.V(3).Info("total RGB values", "rgb", totalRGB)
 
                sendObj := make(map[string]interface{})
                sendObj["red"] = totalRGB.Red
@@ -158,22 +153,66 @@ func timeLoop(channel chan helper.RabbitSend, timeColors []TimeColor) {
 
 func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig) {
        for sendData := range channel {
-               helper.SendData(sendData, rabbit, false)
+               err := helper.SendData(sendData, rabbit, false)
+               // TODO: find a cleaner way to bubble this up
+               if err != nil {
+                       logger.Error(err, "error sending data in send loop, exiting")
+                       os.Exit(1)
+               }
        }
 }
 
+func (c *timeConfigYaml) Parse(data []byte) error {
+       return yaml.Unmarshal(data, c)
+}
+
 func main() {
-       rabbit := helper.SetupRabbit(os.Args[1], "timecolorshift.py", "timecolorshift") // config file, routing key
+       // logging and flag initialization
+       flag.StringVar(&configFilename, "config", "", "the config filename")
+       flag.StringVar(&timeConfigFilename, "timeConfig", "", "yaml config filename (for colors)")
+
+       logger = helper.NewLogger()
+       flag.Parse()
+
+       rabbit, err := helper.SetupRabbit(configFilename, "timecolorshift.py", "timecolorshift") // config file, default routing key, source
+       if err != nil {
+               logger.Error(err, "failed to connect to rabbitmq")
+               os.Exit(1)
+       }
        const sleepTime time.Duration = 30 * time.Second
 
        //currentTemp, err := fetchTemp(weatherStation)
        channel := make(chan helper.RabbitSend)
-       all_tcs := make([]TimeColor, 0)
-       all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 12}, 0, 1, 0.5, TimeFloat{Hour: 12}))
-       all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 2}, 1, 0.1, 0, TimeFloat{Hour: 8}))
-       all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 20}, 0.25, 0.25, 0, TimeFloat{Hour: 12}))
 
-       go timeLoop(channel, all_tcs)
+       var tcy timeConfigYaml
+
+       if timeConfigFilename != "" {
+               logger.Info("Parsing provided timeConfigFilename", "timeConfigFilename", timeConfigFilename)
+               data, err := ioutil.ReadFile(timeConfigFilename)
+               if err != nil {
+                       logger.Error(err, "error reading time config yaml file")
+                       os.Exit(1)
+               }
+               //err = yaml.Unmarshal(data, &tcy)
+               err = tcy.Parse(data)
+               if err != nil {
+                       logger.Error(err, "error parsing time config yaml file")
+                       os.Exit(1)
+               }
+       } else {
+               logger.Info("Adding default (compiled) time colors")
+               all_tcs := make([]TimeColor, 0)
+               all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 12}, 0, 1, 0.5, TimeFloat{Hour: 12}))
+               all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 2}, 1, 0.1, 0, TimeFloat{Hour: 8}))
+               all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 20}, 0.25, 0.25, 0, TimeFloat{Hour: 12}))
+               tcy.TimeColors = all_tcs
+       }
+       // just display everything we configured before we boot further
+       for _, tc := range tcy.TimeColors {
+               logger.Info("displaying processed timeColors", "TimeColor", tc)
+       }
+
+       go timeLoop(channel, tcy.TimeColors)
        go sendLoop(channel, rabbit)
        for true {
                time.Sleep(sleepTime)
index a302c0d0b4cf7566b1237d6dc10c025362fb3581..53150c41ed8a9bada1f637e0be40c3a126624965 100644 (file)
@@ -3,9 +3,9 @@ package main
 
 import (
        //"encoding/json"
+       "flag"
        "fmt"
        "io"
-       "log"
        "math"
        "net/http"
        "os"
@@ -13,15 +13,14 @@ import (
        "time"
 
        "github.com/PuerkitoBio/goquery"
+       "github.com/go-logr/logr"
+
        //"github.com/leekchan/timeutil"
        "unpiege.net/rabbit_go.git/helper"
 )
 
-func failOnError(err error, msg string) {
-       if err != nil {
-               log.Panicf("%s: %s", msg, err)
-       }
-}
+var logger logr.Logger
+var configFilename string
 
 func sum(arr []float64) float64 {
        sum := 0.0
@@ -51,7 +50,7 @@ func extractTemp(htmlReader io.Reader) (float64, error) {
                                if childClass == "wu-value wu-value-to" {
                                        currentTemp, err = strconv.ParseFloat(t.Text(), 10)
                                        if err != nil {
-                                               log.Println(err)
+                                               logger.Error(err, "error finding current temp")
                                        }
                                        foundTemp = true
                                }
@@ -59,7 +58,6 @@ func extractTemp(htmlReader io.Reader) (float64, error) {
                }
        })
        if !foundTemp {
-               fmt.Println("oh no!")
                return currentTemp, fmt.Errorf("An error occurred while parsing html data")
        }
        return currentTemp, nil
@@ -72,7 +70,7 @@ func fetchTemp(weatherStation string) (float64, error) {
                Timeout: time.Second * 10,
        }
        fullURL := fmt.Sprintf("%s/%s", baseURL, weatherStation)
-       log.Printf("Fetching full url %s\n", fullURL)
+       logger.Info("Fetching full url", "url", fullURL)
        req, err := http.NewRequest("GET", fullURL, nil)
        if err != nil {
                return 0, err
@@ -90,15 +88,14 @@ func fetchTemp(weatherStation string) (float64, error) {
 
 func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfig) error {
        tempList := make([]float64, 0)
-       for count := range channel {
+       for = range channel {
                const tempLength int = 60
-               log.Printf("doing another fetch, count %d", count)
                currentTemp, err := fetchTemp(weatherStation)
                if err != nil {
-                       log.Printf("Wasn't able to get temp %s", err)
+                       logger.Error(err, "msg", "Wasn't able to get temp, ignoring")
                } else {
                        if currentTemp == 0 {
-                               fmt.Printf("Got 0F for temp, ignoring")
+                               logger.Info("Got 0F for temp, ignoring")
                        } else {
                                celsius := (currentTemp - 32) * 5 / 9
                                tempList = append(tempList, celsius)
@@ -107,11 +104,11 @@ func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfi
                                }
                                tempAvg := sum(tempList) / float64(len(tempList))
                                if math.Abs(tempAvg-celsius) > 3.0 {
-                                       log.Printf("temperature %f outside mean %f, not printing", celsius, tempAvg)
+                                       logger.Info("msg", "temperature outside mean, not printing", "current", celsius, "average", tempAvg)
                                } else {
                                        sendTemp(currentTemp, celsius, rabbit)
                                }
-                               fmt.Printf("temp list is %+v, avg is %f\n", tempList, tempAvg)
+                               logger.V(2).Info("updated templist and average", "tempList", tempList, "tempAvg", tempAvg)
                        }
                }
        }
@@ -130,7 +127,16 @@ func sendTemp(tempValueF float64, tempValueC float64, rabbit helper.RabbitConfig
 }
 
 func main() {
-       rabbit := helper.SetupRabbit(os.Args[1], "temp", "wunder") // config file, routing key
+       // logging and flag initialization
+       flag.StringVar(&configFilename, "config", "", "the config filename")
+       logger = helper.NewLogger()
+       flag.Parse()
+
+       rabbit, err := helper.SetupRabbit(configFilename, "temp", "wunder") // config file, routing key
+       if err != nil {
+               logger.Error(err, "failed in rabbitmq setup")
+               os.Exit(1)
+       }
        const weatherStation string = "KWASEATT2696"
        const sleepTime time.Duration = 30 * time.Second
 
@@ -138,10 +144,8 @@ func main() {
        channel := make(chan int)
        go tempLoop(channel, weatherStation, rabbit)
        //time.Sleep(sleepTime)
-       var count int
        for true {
-               channel <- count
-               count = count + 1
+               channel <- 0
                time.Sleep(sleepTime)
        }