From adea74939bb6826ae8cd246ec0cb6b6b15b7f1dc Mon Sep 17 00:00:00 2001 From: jweigele Date: Tue, 13 Dec 2022 10:42:14 -0800 Subject: [PATCH] Bugfixes for zigbee2mqtt in reprocess (mostly) Basically, whenever z2m gets a json encoded dict, it tries to deref the keys and use as converters. That normally works fine, but when you have extra "source" values then it looks up converters that don't exist - error! * Allow for EmptySource, a param to disable sending it even if it exists Second, we were querying power devices with "power" gets, which mobile_power doesn't support * Properly adhere to "queryNeeded", allowing us to either get the requests for updates, or just let it send along in its own time --- helper/helper.go | 21 +++++++++++++-------- reprocess/main.go | 14 ++++++++------ 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/helper/helper.go b/helper/helper.go index be65718..aa625c0 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -43,6 +43,9 @@ type RabbitSend struct { Data map[string]interface{} RoutingKey string IncludeDate bool // defaults to false, so we don't send the date + EmptySource bool // defaults to false, used when we don't want zigbee2mqtt to get mad at us + SendRaw bool + RawData string } // RabbitConfig simple values to pass around @@ -155,8 +158,8 @@ func Bind(routingKey string, rabbit *RabbitConfig) error { // StartConsuming - starts consuming with all our presets and returns channel of deliveries func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery, error) { // grab the mutex, unlock when we're done - sendMutex.Lock() - defer sendMutex.Unlock() + //sendMutex.Lock() + //defer sendMutex.Unlock() deliveries, err := rabbit.Channel.Consume( rabbit.Queue.Name, @@ -184,16 +187,16 @@ func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}, error) { return result, err } if isZigbee { - // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value + // if we have an action present, it may either be json encoded from a pico or... generated by the system and not // try to further decode this in an effort to help the data consumer _, actionPresent := result["action"] if actionPresent { var decode map[string]interface{} err := json.Unmarshal([]byte(result["action"].(string)), &decode) - if err != nil { - return decode, err + if err == nil { + result["action"] = decode } - result["action"] = decode + // otherwise, just leave it alone and return as-is } } } else { @@ -215,11 +218,13 @@ func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool) erro rabbitData.Data["date"] = dateString } - if rabbit.Source != "" { + if rabbit.Source != "" && !rabbitData.EmptySource { rabbitData.Data["source"] = rabbit.Source } - jsonBytes, err := json.Marshal(rabbitData.Data) + var err error + var jsonBytes []byte + jsonBytes, err = json.Marshal(rabbitData.Data) if err != nil { return err } diff --git a/reprocess/main.go b/reprocess/main.go index 166dee0..8a36cc0 100644 --- a/reprocess/main.go +++ b/reprocess/main.go @@ -214,7 +214,7 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) { func (dev *powerdevice) shouldUpdate(now time.Time) bool { const updateStalenessSeconds = 2 // well-behaved OR in the last 2 seconds since we requested - if now.Add(updateStalenessSeconds * -time.Second).Before(dev.lastUpdateRequested) { + if !dev.queryNeeded || now.Add(updateStalenessSeconds*-time.Second).Before(dev.lastUpdateRequested) { return true } return false @@ -304,14 +304,16 @@ func powerLoop(sendChannel chan helper.RabbitSend, devices []device) { var reqMap = make(map[string]interface{}, 0) reqMap["power"] = "" for { - // send requests for power on all devices + // send requests for power on all devices that support it now := time.Now().UTC() for _, pd := range pds { - logger.V(1).Info("Requesting power for device", "device", pd) - sendThis := helper.RabbitSend{Data: reqMap, RoutingKey: pd.getReqKey()} - sendChannel <- sendThis + if pd.queryNeeded { + logger.V(1).Info("Requesting power for device", "device", pd) + sendThis := helper.RabbitSend{Data: reqMap, RoutingKey: pd.getReqKey(), EmptySource: true} + sendChannel <- sendThis - pd.lastUpdateRequested = now + pd.lastUpdateRequested = now + } } // then sleep for the next cycle -- 2.30.2