FROM alpine:edge
COPY --from=builder /go/bin/reprocess /bin/reprocess
-ENTRYPOINT ["/bin/reprocess"]
+ENTRYPOINT ["/bin/reprocess", "-config", "/conf/config-docker.json"]
FROM alpine:edge
COPY --from=builder /go/bin/timecolorshift /bin/timecolorshift
RUN apk add tzdata
-ENTRYPOINT ["/bin/timecolorshift"]
+ENTRYPOINT ["/bin/timecolorshift", "-config", "/conf/config-docker.json"]
FROM alpine:edge
COPY --from=builder /go/bin/wunder /bin/wunder
-ENTRYPOINT ["/bin/wunder"]
+ENTRYPOINT ["/bin/wunder", "-config", "/conf/config-docker.json"]
require (
github.com/PuerkitoBio/goquery v1.8.0
+ github.com/go-logr/logr v1.2.3
github.com/leekchan/timeutil v0.0.0-20150802142658-28917288c48d
github.com/prometheus/client_golang v1.14.0
github.com/rabbitmq/amqp091-go v1.5.0
+ gopkg.in/yaml.v2 v2.4.0
+ k8s.io/klog/v2 v2.80.1
)
require (
-package helper
+// Package helper - various helper functions associated with managing rabbitmq connections easier
+package helper
import (
- "context"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
- "fmt"
-// "io"
- "io/ioutil"
- "log"
- "strings"
-// "math"
-// "net/http"
-// "os"
-// "strconv"
- "time"
-
-// "github.com/PuerkitoBio/goquery"
- "github.com/leekchan/timeutil"
- amqp "github.com/rabbitmq/amqp091-go"
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "encoding/json"
+ "fmt"
+
+ "io/ioutil"
+ "strings"
+
+ "sync"
+ "time"
+
+ "github.com/go-logr/logr"
+ "github.com/leekchan/timeutil"
+ amqp "github.com/rabbitmq/amqp091-go"
+ "k8s.io/klog/v2"
+ "k8s.io/klog/v2/klogr"
+ //logging things
)
+var logger logr.Logger
+var sendMutex sync.Mutex
+
+func init() {
+ // do this for everyone, once
+ klog.InitFlags(nil)
+ // our package logger
+ logger = klogr.New()
+}
+
+// NewLogger helps to keep the specific logging implementation details hidden from other files
+// allowing for a switch if needed (who knows if this really matters)
+func NewLogger() logr.Logger {
+ return klogr.New()
+}
// RabbitSend just a bundle of a map with a routing key
type RabbitSend struct {
- Data map[string]interface{}
- RoutingKey string
- IncludeDate bool // defaults to false, so we don't send the date
+ Data map[string]interface{}
+ RoutingKey string
+ IncludeDate bool // defaults to false, so we don't send the date
}
// RabbitConfig simple values to pass around
type RabbitConfig struct {
- Exchange string
- Channel *amqp.Channel
- Queue *amqp.Queue
- DefaultRoutingKey string
- Source string
+ Exchange string
+ Channel *amqp.Channel
+ Queue *amqp.Queue
+ DefaultRoutingKey string
+ Source string
}
-func failOnError(err error, msg string) {
- if err != nil {
- log.Panicf("%s: %s", msg, err)
- }
-}
+// SetupRabbit actually connects to the rabbitmq server, and returns global config values for later use
+func SetupRabbit(configFilename string, defaultRoutingKey string, source string) (RabbitConfig, error) {
+ // grab the mutex, unlock when we're done
+ sendMutex.Lock()
+ defer sendMutex.Unlock()
+
+ configJSON, err := ioutil.ReadFile(configFilename)
+ if err != nil {
+ return RabbitConfig{}, err
+ }
+ var result map[string]interface{}
+ json.Unmarshal([]byte(configJSON), &result)
+
+ // no cert verification because we're self-signed
+ cfg := new(tls.Config)
+ cfg.RootCAs = x509.NewCertPool()
+ cfg.InsecureSkipVerify = true
+ conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg)
+ if err != nil {
+ return RabbitConfig{}, err
+ }
+ //defer conn.Close()
-func SetupRabbit(configFilename string, defaultRoutingKey string, source string) RabbitConfig {
- configJSON, err := ioutil.ReadFile(configFilename)
- if err != nil {
- failOnError(err, "oh no read")
- }
- var result map[string]interface{}
- json.Unmarshal([]byte(configJSON), &result)
- fmt.Println(result)
-
- // no cert verification because we're self-signed
- cfg := new(tls.Config)
- cfg.RootCAs = x509.NewCertPool()
- cfg.InsecureSkipVerify = true
-
- conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s:5671/%s", result["user"], result["password"], result["host"], result["vhost"]), cfg)
- failOnError(err, "Failed to connect to RabbitMQ")
- //defer conn.Close()
-
- rabbitChannel, err := conn.Channel()
- failOnError(err, "Failed to open a rabbitChannel")
- //defer rabbitChannel.Close()
-
- err = rabbitChannel.ExchangeDeclare(
- result["exchange"].(string), // name
- amqp.ExchangeTopic, // type
- true, // durable
- false, // autodelete
- false, // internal (doesn't accept publish so...)
- false, // block and wait for call to finish
- nil, // optional args I don't know any
- )
- failOnError(err, "Failed exchange stuff")
-
- err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages
- failOnError(err, "Failed confirm")
- var rabbit RabbitConfig
- rabbit.Exchange = result["exchange"].(string)
- rabbit.Channel = rabbitChannel
- // constants
- rabbit.DefaultRoutingKey = defaultRoutingKey
- rabbit.Source = source
- return rabbit
+ rabbitChannel, err := conn.Channel()
+ if err != nil {
+ return RabbitConfig{}, err
+ }
+ //defer rabbitChannel.Close()
+
+ err = rabbitChannel.ExchangeDeclare(
+ result["exchange"].(string), // name
+ amqp.ExchangeTopic, // type
+ true, // durable
+ false, // autodelete
+ false, // internal (doesn't accept publish so...)
+ false, // block and wait for call to finish
+ nil, // optional args I don't know any
+ )
+ if err != nil {
+ return RabbitConfig{}, err
+ }
+
+ err = rabbitChannel.Confirm(false) // wait for this to finish too, confirm delivery of messages
+ if err != nil {
+ return RabbitConfig{}, err
+ }
+ var rabbit RabbitConfig
+ rabbit.Exchange = result["exchange"].(string)
+ rabbit.Channel = rabbitChannel
+ // constants
+ rabbit.DefaultRoutingKey = defaultRoutingKey
+ rabbit.Source = source
+ logger.Info("Connected to RabbitMQ",
+ "host", result["host"],
+ "user", result["user"],
+ "vhost", result["vhost"],
+ "defaultRoutingKey", defaultRoutingKey,
+ "source", source,
+ )
+ return rabbit, nil
}
-// simple, bind to a (topic) with our default params
+// Bind - binds to a topic with our default params
// will declare a queue if we don't have one
-func Bind(routingKey string, rabbit RabbitConfig) (RabbitConfig){
- // if queue hasn't yet been initialized (no other bound topics basically)
- if rabbit.Queue == nil {
- queue, err := rabbit.Channel.QueueDeclare(
- "", // queue name (auto-gen)
- false, // durable
- true, // autoDelete (get rid of if consumer discos or server restarts)
- true, // exclusive
- false, // noWait (i.e. block until finished)
- nil, // args
- )
- // pass by reference so we can fit in our struct
- rabbit.Queue = &queue
- failOnError(err, "failed queue declaration")
- log.Printf("new queue declared %s", rabbit.Queue.Name)
- }
-
- // last two are block on binding and args
- err := rabbit.Channel.QueueBind(rabbit.Queue.Name, routingKey, rabbit.Exchange, false, nil)
- failOnError(err, "failed queue binding")
- return rabbit
+func Bind(routingKey string, rabbit *RabbitConfig) error {
+ // grab the mutex, unlock when we're done
+ sendMutex.Lock()
+ defer sendMutex.Unlock()
+
+ // if queue hasn't yet been initialized (no other bound topics basically)
+ if rabbit.Queue == nil {
+ queue, err := rabbit.Channel.QueueDeclare(
+ "", // queue name (auto-gen)
+ false, // durable
+ true, // autoDelete (get rid of if consumer discos or server restarts)
+ true, // exclusive
+ false, // noWait (i.e. block until finished)
+ nil, // args
+ )
+ // pass by reference so we can fit in our struct
+ rabbit.Queue = &queue
+ if err != nil {
+ return err
+ }
+ logger.Info("new queue declared", "queue", rabbit.Queue)
+ }
+
+ // last two are block on binding and args
+ err := rabbit.Channel.QueueBind(rabbit.Queue.Name, routingKey, rabbit.Exchange, false, nil)
+ if err != nil {
+ return err
+ }
+ logger.Info("Bound queue to exchange", "queue", rabbit.Queue, "routingKey", routingKey)
+ return nil
}
-// starts consuming with all our presets and returns channel
-func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery){
- log.Printf("consuming on queue %+v", rabbit.Queue)
- deliveries, err := rabbit.Channel.Consume(
- rabbit.Queue.Name,
- "", // consumer (auto-gen)
- true, // autoAck
- true, // exclusive
- false, // noLocal - does nothing?
- false, // wait and block please
- nil, // args
- )
- failOnError(err, "failed channel consume")
- return deliveries
+// StartConsuming - starts consuming with all our presets and returns channel of deliveries
+func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery, error) {
+ // grab the mutex, unlock when we're done
+ sendMutex.Lock()
+ defer sendMutex.Unlock()
+
+ deliveries, err := rabbit.Channel.Consume(
+ rabbit.Queue.Name,
+ "", // consumer (auto-gen)
+ true, // autoAck
+ true, // exclusive
+ false, // noLocal - does nothing?
+ false, // immediate (wait and block please)
+ nil, // args
+ )
+ logger.Info("consuming on queue", "queue", rabbit.Queue)
+ return deliveries, err
}
-func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}){
- //var result map[string]string //interface{}
- result := make(map[string]interface{})
- // either explicitly json, or zigbee2mqtt (which we have set to only output json)
- isZigbee := strings.HasPrefix(delivery.RoutingKey, "zigbee2mqtt")
- if delivery.ContentType == "application/json" || isZigbee {
- err := json.Unmarshal(delivery.Body, &result)
- if isZigbee {
- // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value
- // try to further decode this in an effort to help the data consumer
- _, actionPresent := result["action"]
- if actionPresent {
- var decode map[string]interface{}
- json.Unmarshal([]byte(result["action"].(string)), &decode)
- result["action"] = decode
- }
- }
- failOnError(err, "failed to decode json")
- } else {
- result["data"] = string(delivery.Body[:])
- }
- return result
+// DecodeDelivery - takes a presumably byte-formatted json stream and makes it a map for us
+// will return errors if things don't actually work out that well
+func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}, error) {
+ //var result map[string]string //interface{}
+ result := make(map[string]interface{})
+ // either explicitly json, or zigbee2mqtt (which we have set to only output json)
+ isZigbee := strings.HasPrefix(delivery.RoutingKey, "zigbee2mqtt")
+ if delivery.ContentType == "application/json" || isZigbee {
+ err := json.Unmarshal(delivery.Body, &result)
+ if err != nil {
+ return result, err
+ }
+ if isZigbee {
+ // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value
+ // try to further decode this in an effort to help the data consumer
+ _, actionPresent := result["action"]
+ if actionPresent {
+ var decode map[string]interface{}
+ err := json.Unmarshal([]byte(result["action"].(string)), &decode)
+ if err != nil {
+ return decode, err
+ }
+ result["action"] = decode
+ }
+ }
+ } else {
+ result["data"] = string(delivery.Body[:])
+ }
+ return result, nil
}
-
-func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool){
- if rabbitData.IncludeDate {
- // add the date as a convenience
- utcNow := time.Now().UTC()
- dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S")
- rabbitData.Data["date"] = dateString
- }
-
- if rabbit.Source != "" {
- rabbitData.Data["source"] = rabbit.Source
- }
-
- jsonBytes, err := json.Marshal(rabbitData.Data)
- failOnError(err, "Improperly formatted JSON, just gonna exit here")
- var routingKey string
- if rabbitData.RoutingKey != "" {
- routingKey = rabbitData.RoutingKey
- } else if rabbit.DefaultRoutingKey != "" {
- routingKey = rabbit.DefaultRoutingKey
- } else {
- failOnError(fmt.Errorf("routing key error"), "No valid routing key found to send on")
- }
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- err = rabbit.Channel.PublishWithContext(ctx,
- rabbit.Exchange, // exchange
- routingKey, // routing key
- false, // mandatory
- false, // immediate
- amqp.Publishing{
- ContentType: "application/json",
- Body: jsonBytes,
- })
- failOnError(err, "Failed to publish a message")
- if verboseSend {
- log.Printf("Sent %s routing: %s\n", jsonBytes, routingKey)
- }
+
+// SendData takes data using parameters from our own structs RabbitSend and RabbitConfig, and puts it out on the given routingkey
+func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool) error {
+ // grab the mutex, unlock when we're done
+ sendMutex.Lock()
+ defer sendMutex.Unlock()
+
+ if rabbitData.IncludeDate {
+ // add the date as a convenience
+ utcNow := time.Now().UTC()
+ dateString := timeutil.Strftime(&utcNow, "%Y-%m-%d %H:%M:%S")
+ rabbitData.Data["date"] = dateString
+ }
+
+ if rabbit.Source != "" {
+ rabbitData.Data["source"] = rabbit.Source
+ }
+
+ jsonBytes, err := json.Marshal(rabbitData.Data)
+ if err != nil {
+ return err
+ }
+ var routingKey string
+ if rabbitData.RoutingKey != "" {
+ routingKey = rabbitData.RoutingKey
+ } else if rabbit.DefaultRoutingKey != "" {
+ routingKey = rabbit.DefaultRoutingKey
+ } else {
+ if err != nil {
+ return fmt.Errorf("No valid routing key to send on")
+ }
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ err = rabbit.Channel.PublishWithContext(ctx,
+ rabbit.Exchange, // exchange
+ routingKey, // routing key
+ false, // mandatory
+ false, // immediate
+ amqp.Publishing{
+ ContentType: "application/json",
+ Body: jsonBytes,
+ })
+ if err != nil {
+ return err
+ }
+
+ if verboseSend {
+ logger.Info("Sent data to rabbitmq", "data", rabbitData.Data, "routingKey", routingKey)
+ }
+ return nil
}
package main
import (
+ "flag"
"fmt"
- "log"
"os"
+ "reflect"
"time"
// prometheus imports
+ "net/http"
+
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
- "net/http"
+
+ // logging interface
+ "github.com/go-logr/logr"
"unpiege.net/rabbit_go.git/helper"
)
},
[]string{"scale", "location"},
)
- //tempExpire = map[prometheus.Labels]time.Time
- //tempExpire = map[string]string
+
+ powerGauge = promauto.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "ePDUOutletStatusActivePower",
+ Help: "Used as an analog to power snmp fetching",
+ },
+ []string{"ePDUOutletStatusIndex", "ePDUOutletStatusOutletName"},
+ )
tempExpire = make(map[[2]string]time.Time)
- staleTime = time.Duration(15 * time.Second)
+ staleTime = time.Duration(15 * time.Second)
+ logger logr.Logger
+ configFilename string
)
-type Device interface {
+type device interface {
getRoutingKey() string
}
-type ZigDevice struct {
- routingKey string
+type zigdevice struct {
+ friendlyName string
}
-type DIYDevice struct {
- *ZigDevice
+type diydevice struct {
+ *zigdevice
}
-type PowerDevice struct {
- *ZigDevice
- powerName string
- queryNeeded bool
+type powertime struct {
+ timeLogged time.Time
+ powerValue float64
}
-func (dev *DIYDevice) getRoutingKey() string {
- return fmt.Sprintf("zigbee2mqtt.%s", dev.routingKey)
+type powerdevice struct {
+ *zigdevice
+ powerName string
+ queryNeeded bool
+ lastUpdate time.Time
+ lastUpdateRequested time.Time
+ powerList []powertime
}
-func NewDevice(routingKey string) *ZigDevice {
- retval := ZigDevice{routingKey: routingKey}
- //retval.routingKey = routingKey
- return &retval
+func (dev *powerdevice) filter() {
+ // if this gets excessive we can pass it down the chain
+ now := time.Now().UTC()
+ // default time to keep power values around
+ const filterTime = 60
+
+ // filter to ignore old values
+ var sliceStart int
+ for index, pt := range dev.powerList {
+ // break when we find our first valid value (since they should be sorted)
+ sliceStart = index
+ if pt.timeLogged.After(now.Add(-time.Second * filterTime)) {
+ break
+ }
+ }
+
+ // reslice, excluding all previous (cutoff) values
+ dev.powerList = dev.powerList[sliceStart:]
+
}
-func NewDIYDevice(routingKey string) *DIYDevice {
- return &DIYDevice{
- ZigDevice: NewDevice(routingKey),
+func (dev *powerdevice) avgPower() float64 {
+ totalPower := 0.0
+ // sum up the power list
+ for _, pt := range dev.powerList {
+ totalPower = totalPower + pt.powerValue
}
+ // average and return
+ return totalPower / float64(len(dev.powerList))
+}
+func (dev *powerdevice) add(newTime time.Time, newVal float64) {
+ dev.powerList = append(dev.powerList, powertime{timeLogged: newTime, powerValue: newVal})
+ dev.filter()
}
-//tempExpire := make(map[[2]string]time.Time)
+func (dev *zigdevice) getRoutingKey() string {
+ return fmt.Sprintf("zigbee2mqtt.%s", dev.friendlyName)
+}
-func failOnError(err error, msg string) {
- if err != nil {
- log.Panicf("%s: %s", msg, err)
+func (dev *powerdevice) getReqKey() string {
+ return fmt.Sprintf("zigbee2mqtt.%s.get", dev.friendlyName)
+}
+
+func newdevice(friendlyName string) *zigdevice {
+ retval := zigdevice{friendlyName: friendlyName}
+ return &retval
+}
+
+func newdiydevice(friendlyName string) *diydevice {
+ return &diydevice{
+ zigdevice: newdevice(friendlyName),
+ }
+
+}
+
+func newpowerdevice(friendlyName, powerName string, queryNeeded bool) *powerdevice {
+ return &powerdevice{
+ zigdevice: newdevice(friendlyName),
+ powerName: powerName,
+ queryNeeded: queryNeeded,
}
}
func handleTemp(obj map[string]interface{}) {
- log.Printf("temp! %v", obj)
+ logger.V(1).Info("Temperature received", "obj", obj)
celsius := obj["celsius"].(float64)
location := obj["location"].(string)
now := time.Now().UTC()
- // TODO: last update portion
-
// do the label update here
- //log.Printf("celsius: %f location %s", celsius, location)
tempGauge.With(prometheus.Labels{"scale": "celsius", "location": location}).Set(celsius)
+ // here, we might have any number of temp values some not covered by our own objects
tempExpire[[2]string{"celsius", location}] = now
}
func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) {
- log.Printf("diy! %v", obj)
+ logger.V(1).Info("DIY data received", "obj", obj)
_, isAction := obj["action"]
if isAction {
- // this is yet more JSON, so decode it
+ // this should also already be decoded for us
data := obj["action"].(map[string]interface{})
- /* err := json.Unmarshal([]byte(obj["action"].(string)), &data)
- if err != nil {
- log.Printf("Unable to decode json action from diy, ignoring")
- return
- }*/
var senseType string
_, isTemp := data["celsius"]
if hasType {
senseType = data["type"].(string)
} else {
- log.Printf("Did not find any sense type, just returning")
+ logger.Info("Did not find any sense type, just returning")
return
}
}
dataMap["fahrenheit"] = data["celsius"].(float64)*9/5 + 32
dataMap["celsius"] = data["celsius"].(float64)
dataMap["location"] = data["location"].(string)
- log.Printf("would send %v", dataMap)
+ logger.V(2).Info("Sending reprocessed temperature")
// hardcoded temp routingKey for this type of measurement
sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true}
sendChannel <- sendThis
dataMap["motion_detected"] = data["state"].(bool)
// copied from above, but it might change later so shrug
dataMap["location"] = data["location"].(string)
- log.Printf("sending reprocessed motion: %v", dataMap)
+ logger.V(2).Info("Sending reprocessed motion")
sendThis := helper.RabbitSend{Data: dataMap, RoutingKey: "lol", IncludeDate: true}
sendChannel <- sendThis
} else {
- log.Printf("error! sense type not detected, ignoring")
+ logger.Info("Sense type not detected, ignoring")
}
} else {
- log.Printf("not action, ignoring")
+ logger.V(2).Info("not action, ignoring")
}
}
-func readLoop(channel chan helper.RabbitSend, devices []Device, rabbit helper.RabbitConfig) {
+// are we getting new data on behalf of a request (if spammy), or is the reporting configured so it doesn't spam?
+// in other words, do we trust this data is evenly spaced?
+func (dev *powerdevice) shouldUpdate(now time.Time) bool {
+ const updateStalenessSeconds = 2
+ // well-behaved OR in the last 2 seconds since we requested
+ if now.Add(updateStalenessSeconds * -time.Second).Before(dev.lastUpdateRequested) {
+ return true
+ }
+ return false
+}
+
+func handlePower(obj map[string]interface{}, sendChannel chan helper.RabbitSend, triggerDevice *powerdevice) {
+ now := time.Now().UTC()
+ logger.V(1).Info("Power data received", "obj", obj)
+ //name := triggerDevice.powerName
+ // here, we're directly tracking power devices so we can bind updated directly to the object
+ triggerDevice.lastUpdate = now
+ if triggerDevice.shouldUpdate(now) {
+ triggerDevice.add(now, obj["power"].(float64))
+ powerGauge.With(prometheus.Labels{"ePDUOutletStatusIndex": triggerDevice.friendlyName, "ePDUOutletStatusOutletName": triggerDevice.powerName}).Set(triggerDevice.avgPower())
+ // don't worry about expiration, since we have all the values in our objects
+ }
+
+}
+
+func readLoop(channel chan helper.RabbitSend, devices []device, rabbit helper.RabbitConfig) {
for _, device := range devices {
- rabbit = helper.Bind(device.getRoutingKey(), rabbit)
- log.Printf("bound to %s", device.getRoutingKey())
+ err := helper.Bind(device.getRoutingKey(), &rabbit)
+ // TODO: find a better way to bubble this up
+ if err != nil {
+ logger.Error(err, "unable to bind successfully to exchange", "routingKey", device.getRoutingKey())
+ os.Exit(1)
+ }
+ }
+ /// this one is hardcoded, for the generic temperature output
+ err := helper.Bind("temp", &rabbit)
+ if err != nil {
+ logger.Error(err, "unable to bind successfully to exchange", "routingKey", "temp")
+ os.Exit(1)
}
- rabbit = helper.Bind("temp", rabbit)
- log.Printf("bound to temp")
- deliveries := helper.StartConsuming(rabbit)
+
+ deliveries, err := helper.StartConsuming(rabbit)
+ if err != nil {
+ logger.Error(err, "unable to start consuming data from rabbit")
+ os.Exit(1)
+ }
+
for delivery := range deliveries {
- //log.Printf("got a delivery %+v", delivery)
- item := helper.DecodeDelivery(delivery)
- //log.Printf("%+v", item)
+ item, err := helper.DecodeDelivery(delivery)
+ // it's just one delivery so we're going to yell and then continue along unabated
+ if err != nil {
+ logger.Error(err, "unable to decode delivery", "delivery", delivery)
+ continue
+ }
+
+ // only if you really want to see it all
+ logger.V(2).Info("Received a delivery", "delivery", delivery)
if delivery.RoutingKey == "temp" {
handleTemp(item)
}
for _, device := range devices {
if device.getRoutingKey() == delivery.RoutingKey {
- log.Printf("Found device for routing key %s, %v", delivery.RoutingKey, device)
+ logger.V(3).Info("Found device for routing key", "routingKey", delivery.RoutingKey, "device", reflect.TypeOf(device).String())
switch t := device.(type) {
- case *DIYDevice:
- log.Printf("DIY device, sending to handle")
+ case *diydevice:
+ logger.V(2).Info("DIY device, sending to handle", "device", device)
handleDIY(item, channel)
+ case *powerdevice:
+ logger.V(2).Info("Power device, sending to handle", "device", device)
+ // it's explicitly a powerdevice, so cast it before sending
+ handlePower(item, channel, device.(*powerdevice))
default:
- log.Printf("other device, type was %s", t)
+ logger.Info("Found a device we can't classify", "device", device, "type", t)
}
}
}
//time.Sleep(time.Duration(0.5*float64(time.Second)))
}
}
-func timeLoop(channel chan helper.RabbitSend) {
+func powerLoop(sendChannel chan helper.RabbitSend, devices []device) {
+ const powerRequestSeconds = 15
+ // first, build up a list of the power devices since those are the ones we care about
+ var pds []*powerdevice
+ for _, device := range devices {
+ switch device.(type) {
+ case *powerdevice:
+ pds = append(pds, device.(*powerdevice))
+ }
+ }
+
+ logger.V(3).Info("Built up PD list", "pds", pds)
+
+ var reqMap = make(map[string]interface{}, 0)
+ reqMap["power"] = ""
for {
- time.Sleep(time.Duration(0.5 * float64(time.Second)))
+ // send requests for power on all devices
+ now := time.Now().UTC()
+ for _, pd := range pds {
+ logger.V(1).Info("Requesting power for device", "device", pd)
+ sendThis := helper.RabbitSend{Data: reqMap, RoutingKey: pd.getReqKey()}
+ sendChannel <- sendThis
+
+ pd.lastUpdateRequested = now
+ }
+
+ // then sleep for the next cycle
+ time.Sleep(time.Duration(powerRequestSeconds * float64(time.Second)))
}
}
func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig) {
for sendItem := range channel {
- log.Printf("would try to send here %v", sendItem)
helper.SendData(sendItem, rabbit, true)
//helper.SendData(sendData, rabbit, false)
}
for labels, lastUpdate := range tempExpire {
curStale := now.Sub(lastUpdate)
if curStale > staleTime {
- log.Printf("stale time %v, expiring labels %v\tlastUpdate: %v", curStale, labels, lastUpdate)
+ logger.Info("stale metric update time, expiring labels", "curStale", curStale, "labels", labels, "lastUpdate", lastUpdate)
tempExpireValues = append(tempExpireValues, labels)
}
}
if complete {
delete(tempExpire, labels)
} else {
- log.Panicf("oh no wasn't able to delete metrics")
+ logger.Info("unable to delete metrics, exiting")
+ os.Exit(1)
}
}
}
-func setupLogging() {
- log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
-}
-
func main() {
- setupLogging()
+ // logging and flag initialization
+ flag.StringVar(&configFilename, "config", "", "the config filename")
+ logger = helper.NewLogger()
+ flag.Parse()
+
+ if configFilename == "" {
+ logger.Error(fmt.Errorf("need to define config in order to load!"), "invalid config")
+ os.Exit(1)
+ }
- rabbit := helper.SetupRabbit(os.Args[1], "", "reprocess") // config file, default routing key, source
+ rabbit, err := helper.SetupRabbit(configFilename, "", "reprocess") // config file, default routing key, source
+ if err != nil {
+ logger.Error(err, "unable to setup rabbit")
+ os.Exit(1)
+ }
+
+ defer rabbit.Channel.Close()
const sleepTime time.Duration = 5 * time.Second
- devices := make([]Device, 0)
- devices = append(devices, NewDIYDevice("office_pico"))
- devices = append(devices, NewDIYDevice("stairway_pico"))
- devices = append(devices, NewDIYDevice("stairway_pico"))
- devices = append(devices, NewDIYDevice("downstairs_stairway_pico"))
- devices = append(devices, NewDIYDevice("upstairs_pico"))
- devices = append(devices, NewDIYDevice("data_closet_pico"))
+ devices := make([]device, 0)
+
+ // DIY devices
+ devices = append(devices, newdiydevice("office_pico"))
+ devices = append(devices, newdiydevice("stairway_pico"))
+ devices = append(devices, newdiydevice("stairway_pico"))
+ devices = append(devices, newdiydevice("downstairs_stairway_pico"))
+ devices = append(devices, newdiydevice("upstairs_pico"))
+ devices = append(devices, newdiydevice("data_closet_pico"))
+
+ // power devices
+ devices = append(devices, newpowerdevice("0x00158d0002a2e370", "wiggleswhale", true))
//currentTemp, err := fetchTemp(weatherStation)
channel := make(chan helper.RabbitSend)
go promSetup()
go readLoop(channel, devices, rabbit)
- go timeLoop(channel)
+ go powerLoop(channel, devices)
go sendLoop(channel, rabbit)
for true {
package main
import (
- //"encoding/json"
- //"fmt"
- //"io"
- "log"
+ "flag"
+ "io/ioutil"
"math"
- //"net/http"
+
"os"
- //"strconv"
"time"
//"github.com/leekchan/timeutil"
+ "github.com/go-logr/logr"
+ "gopkg.in/yaml.v2"
"unpiege.net/rabbit_go.git/helper"
)
+var logger logr.Logger
+var configFilename, timeConfigFilename string
+
+type timeConfigYaml struct {
+ PeakTime TimeFloat `yaml:"PeakTime"`
+ TimeColors []TimeColor `yaml:"TimeColors"`
+}
+
// TimeFloat allows user to specify times in a more meaningful way
type TimeFloat struct {
- Hour int
- Minute int
- Second int
+ Hour int `yaml:"Hour"`
+ Minute int `yaml:"Minute"`
+ Second int `yaml:"Second"`
}
// TimeColor is one instance of RGB values changing over time
type TimeColor struct {
- PeakTime TimeFloat
- Red float64
- Green float64
- Blue float64
- Extent TimeFloat
+ PeakTime TimeFloat `yaml:"PeakTime"`
+ Red float64 `yaml:"Red"`
+ Green float64 `yaml:"Green"`
+ Blue float64 `yaml:"Blue"`
+ Extent TimeFloat `yaml:"Extent"`
}
type RGB struct {
- Red float64
- Green float64
- Blue float64
+ Red float64 `yaml:"Red"`
+ Green float64 `yaml:"Green"`
+ Blue float64 `yaml:"Blue"`
}
// make sure no value is greater than 1
minDistance = val
}
}
- //log.Printf("minDistance: %f", minDistance)
if minDistance > tc.Extent.Float()/2 {
- //log.Printf("turned off")
// multiplier is already initialized zero so do nothing
} else {
extentMultiplier := 1 / tc.Extent.Float()
multiplier = math.Cos(math.Pi * extentMultiplier * minDistance)
}
- //log.Printf("multiplier is %f", multiplier)
retval.Mult(multiplier)
return retval
}
-func failOnError(err error, msg string) {
- if err != nil {
- log.Panicf("%s: %s", msg, err)
- }
-}
-
func sum(arr []float64) float64 {
sum := 0.0
for _, value := range arr {
func timeLoop(channel chan helper.RabbitSend, timeColors []TimeColor) {
loc, _ := time.LoadLocation("America/Los_Angeles")
for {
- //log.Printf("okay just looping here")
curTime := time.Now().UTC().In(loc)
- //log.Printf("here's the current time %s", curTime)
totalRGB := RGB{}
for _, tc := range timeColors {
- //log.Printf("Evaluating RGB for %+v", tc)
rgb := tc.RGBValues(curTime)
- //log.Printf("%+v", rgb)
+ logger.V(3).Info("RGB values for tc", "tc", tc, "rgb", rgb)
totalRGB = totalRGB.Add(rgb)
}
totalRGB.Mult(0.877)
- //log.Printf("TotalRGB: %+v", totalRGB)
+ logger.V(3).Info("total RGB values", "rgb", totalRGB)
sendObj := make(map[string]interface{})
sendObj["red"] = totalRGB.Red
func sendLoop(channel chan helper.RabbitSend, rabbit helper.RabbitConfig) {
for sendData := range channel {
- helper.SendData(sendData, rabbit, false)
+ err := helper.SendData(sendData, rabbit, false)
+ // TODO: find a cleaner way to bubble this up
+ if err != nil {
+ logger.Error(err, "error sending data in send loop, exiting")
+ os.Exit(1)
+ }
}
}
+func (c *timeConfigYaml) Parse(data []byte) error {
+ return yaml.Unmarshal(data, c)
+}
+
func main() {
- rabbit := helper.SetupRabbit(os.Args[1], "timecolorshift.py", "timecolorshift") // config file, routing key
+ // logging and flag initialization
+ flag.StringVar(&configFilename, "config", "", "the config filename")
+ flag.StringVar(&timeConfigFilename, "timeConfig", "", "yaml config filename (for colors)")
+
+ logger = helper.NewLogger()
+ flag.Parse()
+
+ rabbit, err := helper.SetupRabbit(configFilename, "timecolorshift.py", "timecolorshift") // config file, default routing key, source
+ if err != nil {
+ logger.Error(err, "failed to connect to rabbitmq")
+ os.Exit(1)
+ }
const sleepTime time.Duration = 30 * time.Second
//currentTemp, err := fetchTemp(weatherStation)
channel := make(chan helper.RabbitSend)
- all_tcs := make([]TimeColor, 0)
- all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 12}, 0, 1, 0.5, TimeFloat{Hour: 12}))
- all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 2}, 1, 0.1, 0, TimeFloat{Hour: 8}))
- all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 20}, 0.25, 0.25, 0, TimeFloat{Hour: 12}))
- go timeLoop(channel, all_tcs)
+ var tcy timeConfigYaml
+
+ if timeConfigFilename != "" {
+ logger.Info("Parsing provided timeConfigFilename", "timeConfigFilename", timeConfigFilename)
+ data, err := ioutil.ReadFile(timeConfigFilename)
+ if err != nil {
+ logger.Error(err, "error reading time config yaml file")
+ os.Exit(1)
+ }
+ //err = yaml.Unmarshal(data, &tcy)
+ err = tcy.Parse(data)
+ if err != nil {
+ logger.Error(err, "error parsing time config yaml file")
+ os.Exit(1)
+ }
+ } else {
+ logger.Info("Adding default (compiled) time colors")
+ all_tcs := make([]TimeColor, 0)
+ all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 12}, 0, 1, 0.5, TimeFloat{Hour: 12}))
+ all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 2}, 1, 0.1, 0, TimeFloat{Hour: 8}))
+ all_tcs = append(all_tcs, NewTimeColor(TimeFloat{Hour: 20}, 0.25, 0.25, 0, TimeFloat{Hour: 12}))
+ tcy.TimeColors = all_tcs
+ }
+ // just display everything we configured before we boot further
+ for _, tc := range tcy.TimeColors {
+ logger.Info("displaying processed timeColors", "TimeColor", tc)
+ }
+
+ go timeLoop(channel, tcy.TimeColors)
go sendLoop(channel, rabbit)
for true {
time.Sleep(sleepTime)
import (
//"encoding/json"
+ "flag"
"fmt"
"io"
- "log"
"math"
"net/http"
"os"
"time"
"github.com/PuerkitoBio/goquery"
+ "github.com/go-logr/logr"
+
//"github.com/leekchan/timeutil"
"unpiege.net/rabbit_go.git/helper"
)
-func failOnError(err error, msg string) {
- if err != nil {
- log.Panicf("%s: %s", msg, err)
- }
-}
+var logger logr.Logger
+var configFilename string
func sum(arr []float64) float64 {
sum := 0.0
if childClass == "wu-value wu-value-to" {
currentTemp, err = strconv.ParseFloat(t.Text(), 10)
if err != nil {
- log.Println(err)
+ logger.Error(err, "error finding current temp")
}
foundTemp = true
}
}
})
if !foundTemp {
- fmt.Println("oh no!")
return currentTemp, fmt.Errorf("An error occurred while parsing html data")
}
return currentTemp, nil
Timeout: time.Second * 10,
}
fullURL := fmt.Sprintf("%s/%s", baseURL, weatherStation)
- log.Printf("Fetching full url %s\n", fullURL)
+ logger.Info("Fetching full url", "url", fullURL)
req, err := http.NewRequest("GET", fullURL, nil)
if err != nil {
return 0, err
func tempLoop(channel chan int, weatherStation string, rabbit helper.RabbitConfig) error {
tempList := make([]float64, 0)
- for count := range channel {
+ for _ = range channel {
const tempLength int = 60
- log.Printf("doing another fetch, count %d", count)
currentTemp, err := fetchTemp(weatherStation)
if err != nil {
- log.Printf("Wasn't able to get temp %s", err)
+ logger.Error(err, "msg", "Wasn't able to get temp, ignoring")
} else {
if currentTemp == 0 {
- fmt.Printf("Got 0F for temp, ignoring")
+ logger.Info("Got 0F for temp, ignoring")
} else {
celsius := (currentTemp - 32) * 5 / 9
tempList = append(tempList, celsius)
}
tempAvg := sum(tempList) / float64(len(tempList))
if math.Abs(tempAvg-celsius) > 3.0 {
- log.Printf("temperature %f outside mean %f, not printing", celsius, tempAvg)
+ logger.Info("msg", "temperature outside mean, not printing", "current", celsius, "average", tempAvg)
} else {
sendTemp(currentTemp, celsius, rabbit)
}
- fmt.Printf("temp list is %+v, avg is %f\n", tempList, tempAvg)
+ logger.V(2).Info("updated templist and average", "tempList", tempList, "tempAvg", tempAvg)
}
}
}
}
func main() {
- rabbit := helper.SetupRabbit(os.Args[1], "temp", "wunder") // config file, routing key
+ // logging and flag initialization
+ flag.StringVar(&configFilename, "config", "", "the config filename")
+ logger = helper.NewLogger()
+ flag.Parse()
+
+ rabbit, err := helper.SetupRabbit(configFilename, "temp", "wunder") // config file, routing key
+ if err != nil {
+ logger.Error(err, "failed in rabbitmq setup")
+ os.Exit(1)
+ }
const weatherStation string = "KWASEATT2696"
const sleepTime time.Duration = 30 * time.Second
channel := make(chan int)
go tempLoop(channel, weatherStation, rabbit)
//time.Sleep(sleepTime)
- var count int
for true {
- channel <- count
- count = count + 1
+ channel <- 0
time.Sleep(sleepTime)
}