Bugfixes for zigbee2mqtt in reprocess (mostly)
authorjweigele <jweigele@local>
Tue, 13 Dec 2022 18:42:14 +0000 (10:42 -0800)
committerjweigele <jweigele@local>
Tue, 13 Dec 2022 18:42:14 +0000 (10:42 -0800)
Basically, whenever z2m gets a json encoded dict, it tries to deref the
keys and use as converters.  That normally works fine, but when you have
extra "source" values then it looks up converters that don't exist -
error!
 * Allow for EmptySource, a param to disable sending it even if it
   exists

Second, we were querying power devices with "power" gets, which
mobile_power doesn't support
 * Properly adhere to "queryNeeded", allowing us to either get the
   requests for updates, or just let it send along in its own time

helper/helper.go
reprocess/main.go

index be6571837faea146206920d0590787ef50351111..aa625c08a42c95412f045b234bc9e862f5620d2e 100644 (file)
@@ -43,6 +43,9 @@ type RabbitSend struct {
        Data        map[string]interface{}
        RoutingKey  string
        IncludeDate bool // defaults to false, so we don't send the date
+       EmptySource bool // defaults to false, used when we don't want zigbee2mqtt to get mad at us
+       SendRaw     bool
+       RawData     string
 }
 
 // RabbitConfig simple values to pass around
@@ -155,8 +158,8 @@ func Bind(routingKey string, rabbit *RabbitConfig) error {
 // StartConsuming - starts consuming with all our presets and returns channel of deliveries
 func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery, error) {
        // grab the mutex, unlock when we're done
-       sendMutex.Lock()
-       defer sendMutex.Unlock()
+       //sendMutex.Lock()
+       //defer sendMutex.Unlock()
 
        deliveries, err := rabbit.Channel.Consume(
                rabbit.Queue.Name,
@@ -184,16 +187,16 @@ func DecodeDelivery(delivery amqp.Delivery) (map[string]interface{}, error) {
                        return result, err
                }
                if isZigbee {
-                       // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value
+                       // if we have an action present, it may either be json encoded from a pico or... generated by the system and not
                        // try to further decode this in an effort to help the data consumer
                        _, actionPresent := result["action"]
                        if actionPresent {
                                var decode map[string]interface{}
                                err := json.Unmarshal([]byte(result["action"].(string)), &decode)
-                               if err != nil {
-                                       return decode, err
+                               if err == nil {
+                                       result["action"] = decode
                                }
-                               result["action"] = decode
+                               // otherwise, just leave it alone and return as-is
                        }
                }
        } else {
@@ -215,11 +218,13 @@ func SendData(rabbitData RabbitSend, rabbit RabbitConfig, verboseSend bool) erro
                rabbitData.Data["date"] = dateString
        }
 
-       if rabbit.Source != "" {
+       if rabbit.Source != "" && !rabbitData.EmptySource {
                rabbitData.Data["source"] = rabbit.Source
        }
 
-       jsonBytes, err := json.Marshal(rabbitData.Data)
+       var err error
+       var jsonBytes []byte
+       jsonBytes, err = json.Marshal(rabbitData.Data)
        if err != nil {
                return err
        }
index 166dee096fd2b95436887d8784368f9456c0925a..8a36cc0181334f68fd248ea139b29d782eda71be 100644 (file)
@@ -214,7 +214,7 @@ func handleDIY(obj map[string]interface{}, sendChannel chan helper.RabbitSend) {
 func (dev *powerdevice) shouldUpdate(now time.Time) bool {
        const updateStalenessSeconds = 2
        // well-behaved OR in the last 2 seconds since we requested
-       if now.Add(updateStalenessSeconds * -time.Second).Before(dev.lastUpdateRequested) {
+       if !dev.queryNeeded || now.Add(updateStalenessSeconds*-time.Second).Before(dev.lastUpdateRequested) {
                return true
        }
        return false
@@ -304,14 +304,16 @@ func powerLoop(sendChannel chan helper.RabbitSend, devices []device) {
        var reqMap = make(map[string]interface{}, 0)
        reqMap["power"] = ""
        for {
-               // send requests for power on all devices
+               // send requests for power on all devices that support it
                now := time.Now().UTC()
                for _, pd := range pds {
-                       logger.V(1).Info("Requesting power for device", "device", pd)
-                       sendThis := helper.RabbitSend{Data: reqMap, RoutingKey: pd.getReqKey()}
-                       sendChannel <- sendThis
+                       if pd.queryNeeded {
+                               logger.V(1).Info("Requesting power for device", "device", pd)
+                               sendThis := helper.RabbitSend{Data: reqMap, RoutingKey: pd.getReqKey(), EmptySource: true}
+                               sendChannel <- sendThis
 
-                       pd.lastUpdateRequested = now
+                               pd.lastUpdateRequested = now
+                       }
                }
 
                // then sleep for the next cycle