Data map[string]interface{}
RoutingKey string
IncludeDate bool // defaults to false, so we don't send the date
+ EmptySource bool // defaults to false, used when we don't want zigbee2mqtt to get mad at us
+ SendRaw bool
+ RawData string
}
// RabbitConfig simple values to pass around
// StartConsuming - starts consuming with all our presets and returns channel of deliveries
func StartConsuming(rabbit RabbitConfig) (<-chan amqp.Delivery, error) {
// grab the mutex, unlock when we're done
- sendMutex.Lock()
- defer sendMutex.Unlock()
+ //sendMutex.Lock()
+ //defer sendMutex.Unlock()
deliveries, err := rabbit.Channel.Consume(
rabbit.Queue.Name,
return result, err
}
if isZigbee {
- // if we have an action present, we kinda know that it was my own jank encapsulated json fields as the value
+ // if we have an action present, it may either be json encoded from a pico or... generated by the system and not
// try to further decode this in an effort to help the data consumer
_, actionPresent := result["action"]
if actionPresent {
var decode map[string]interface{}
err := json.Unmarshal([]byte(result["action"].(string)), &decode)
- if err != nil {
- return decode, err
+ if err == nil {
+ result["action"] = decode
}
- result["action"] = decode
+ // otherwise, just leave it alone and return as-is
}
}
} else {
rabbitData.Data["date"] = dateString
}
- if rabbit.Source != "" {
+ if rabbit.Source != "" && !rabbitData.EmptySource {
rabbitData.Data["source"] = rabbit.Source
}
- jsonBytes, err := json.Marshal(rabbitData.Data)
+ var err error
+ var jsonBytes []byte
+ jsonBytes, err = json.Marshal(rabbitData.Data)
if err != nil {
return err
}
func (dev *powerdevice) shouldUpdate(now time.Time) bool {
const updateStalenessSeconds = 2
// well-behaved OR in the last 2 seconds since we requested
- if now.Add(updateStalenessSeconds * -time.Second).Before(dev.lastUpdateRequested) {
+ if !dev.queryNeeded || now.Add(updateStalenessSeconds*-time.Second).Before(dev.lastUpdateRequested) {
return true
}
return false
var reqMap = make(map[string]interface{}, 0)
reqMap["power"] = ""
for {
- // send requests for power on all devices
+ // send requests for power on all devices that support it
now := time.Now().UTC()
for _, pd := range pds {
- logger.V(1).Info("Requesting power for device", "device", pd)
- sendThis := helper.RabbitSend{Data: reqMap, RoutingKey: pd.getReqKey()}
- sendChannel <- sendThis
+ if pd.queryNeeded {
+ logger.V(1).Info("Requesting power for device", "device", pd)
+ sendThis := helper.RabbitSend{Data: reqMap, RoutingKey: pd.getReqKey(), EmptySource: true}
+ sendChannel <- sendThis
- pd.lastUpdateRequested = now
+ pd.lastUpdateRequested = now
+ }
}
// then sleep for the next cycle