Modifications required to support new restrictions in Flume's API (#18696)

Signed-off-by: Jeff James <jeff@james-online.com>
pull/18301/head
jsjames 2025-06-12 12:53:48 -07:00 committed by GitHub
parent b3893bf8f8
commit 4a9af7110c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 201 additions and 167 deletions

View File

@ -49,6 +49,6 @@ public class FlumeBindingConstants {
// Properties // Properties
public static final String PROPERTY_ID = "id"; public static final String PROPERTY_ID = "id";
public static final int DEFAULT_POLLING_INTERVAL_INSTANTANEOUS = 1; public static final int DEFAULT_POLLING_INTERVAL_INSTANTANEOUS_MIN = 1;
public static final int DEFAULT_POLLING_INTERVAL_CUMULATIVE = 5; public static final int DEFAULT_POLLING_INTERVAL_CUMULATIVE_MIN = 5;
} }

View File

@ -28,6 +28,6 @@ public class FlumeBridgeConfig {
public String username = ""; public String username = "";
public String password = ""; public String password = "";
public int refreshIntervalInstantaneous = DEFAULT_POLLING_INTERVAL_INSTANTANEOUS; public int refreshIntervalInstantaneous = DEFAULT_POLLING_INTERVAL_INSTANTANEOUS_MIN;
public int refreshIntervalCumulative = DEFAULT_POLLING_INTERVAL_CUMULATIVE; public int refreshIntervalCumulative = DEFAULT_POLLING_INTERVAL_CUMULATIVE_MIN;
} }

View File

@ -26,6 +26,7 @@ import org.openhab.core.i18n.LocaleProvider;
import org.openhab.core.i18n.TranslationProvider; import org.openhab.core.i18n.TranslationProvider;
import org.openhab.core.i18n.UnitProvider; import org.openhab.core.i18n.UnitProvider;
import org.openhab.core.io.net.http.HttpClientFactory; import org.openhab.core.io.net.http.HttpClientFactory;
import org.openhab.core.storage.StorageService;
import org.openhab.core.thing.Bridge; import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.Thing; import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingTypeUID; import org.openhab.core.thing.ThingTypeUID;
@ -50,15 +51,18 @@ public class FlumeHandlerFactory extends BaseThingHandlerFactory {
private final HttpClientFactory httpClientFactory; private final HttpClientFactory httpClientFactory;
private final TranslationProvider i18nProvider; private final TranslationProvider i18nProvider;
private final LocaleProvider localeProvider; private final LocaleProvider localeProvider;
private final StorageService storageService;
public final SystemOfUnits systemOfUnits; public final SystemOfUnits systemOfUnits;
@Activate @Activate
public FlumeHandlerFactory(@Reference UnitProvider unitProvider, @Reference HttpClientFactory httpClientFactory, public FlumeHandlerFactory(@Reference UnitProvider unitProvider, @Reference HttpClientFactory httpClientFactory,
final @Reference TranslationProvider i18nProvider, final @Reference LocaleProvider localeProvider) { final @Reference TranslationProvider i18nProvider, final @Reference LocaleProvider localeProvider,
final @Reference StorageService storageService) {
this.systemOfUnits = unitProvider.getMeasurementSystem(); this.systemOfUnits = unitProvider.getMeasurementSystem();
this.httpClientFactory = httpClientFactory; this.httpClientFactory = httpClientFactory;
this.i18nProvider = i18nProvider; this.i18nProvider = i18nProvider;
this.localeProvider = localeProvider; this.localeProvider = localeProvider;
this.storageService = storageService;
} }
@Override @Override
@ -74,7 +78,7 @@ public class FlumeHandlerFactory extends BaseThingHandlerFactory {
return new FlumeBridgeHandler((Bridge) thing, systemOfUnits, this.httpClientFactory.getCommonHttpClient(), return new FlumeBridgeHandler((Bridge) thing, systemOfUnits, this.httpClientFactory.getCommonHttpClient(),
i18nProvider, localeProvider); i18nProvider, localeProvider);
} else if (THING_TYPE_METER.equals(thingTypeUID)) { } else if (THING_TYPE_METER.equals(thingTypeUID)) {
return new FlumeDeviceHandler(thing); return new FlumeDeviceHandler(thing, systemOfUnits, storageService);
} }
return null; return null;

View File

@ -76,8 +76,6 @@ public class FlumeDeviceActions implements ThingActions {
@ActionInput(name = "operation", label = "Operation", required = true, description = "The aggregate/accumulate operation to perform (SUM, AVG, MIN, MAX, CNT).") @Nullable String operation) { @ActionInput(name = "operation", label = "Operation", required = true, description = "The aggregate/accumulate operation to perform (SUM, AVG, MIN, MAX, CNT).") @Nullable String operation) {
logger.info("queryWaterUsage called"); logger.info("queryWaterUsage called");
FlumeApiQueryWaterUsage query = new FlumeApiQueryWaterUsage();
FlumeDeviceHandler localDeviceHandler = deviceHandler; FlumeDeviceHandler localDeviceHandler = deviceHandler;
if (localDeviceHandler == null) { if (localDeviceHandler == null) {
logger.debug("querying device usage, but device is undefined."); logger.debug("querying device usage, but device is undefined.");
@ -90,31 +88,26 @@ public class FlumeDeviceActions implements ThingActions {
logger.warn("queryWaterUsage called with null inputs"); logger.warn("queryWaterUsage called with null inputs");
return null; return null;
} }
if (!untilDateTime.isAfter(sinceDateTime)) {
if (!FlumeApi.OperationType.contains(operation)) {
logger.warn("Invalid aggregation operation in call to queryWaterUsage");
return null;
} else {
query.operation = FlumeApi.OperationType.valueOf(operation);
}
if (!FlumeApi.BucketType.contains(bucket)) {
logger.warn("Invalid bucket type in call to queryWaterUsage");
return null;
} else {
query.bucket = FlumeApi.BucketType.valueOf(bucket);
}
if (untilDateTime.isBefore(sinceDateTime)) {
logger.warn("sinceDateTime must be earlier than untilDateTime"); logger.warn("sinceDateTime must be earlier than untilDateTime");
return null; return null;
} }
if (!FlumeApi.OperationType.contains(operation)) {
logger.warn("Invalid aggregation operation in call to queryWaterUsage");
return null;
}
if (!FlumeApi.BucketType.contains(bucket)) {
logger.warn("Invalid bucket type in call to queryWaterUsage");
return null;
}
query.requestId = QUERYID; FlumeApiQueryWaterUsage query = new FlumeApiQueryWaterUsage(QUERYID, //
query.sinceDateTime = sinceDateTime; sinceDateTime, //
query.untilDateTime = untilDateTime; untilDateTime, //
query.bucket = FlumeApi.BucketType.valueOf(bucket); FlumeApi.BucketType.valueOf(bucket), //
query.units = imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS; 100, //
FlumeApi.OperationType.valueOf(operation), //
imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, FlumeApi.SortDirectionType.ASC);
Float usage; Float usage;
try { try {

View File

@ -283,8 +283,7 @@ public class FlumeApi {
} }
Map<String, List<FlumeApiQueryBucket>> queryBuckets = queryData.get(0); Map<String, List<FlumeApiQueryBucket>> queryBuckets = queryData.get(0);
List<FlumeApiQueryBucket> queryBucket = queryBuckets.get(query.requestId());
List<FlumeApiQueryBucket> queryBucket = queryBuckets.get(query.requestId);
return (queryBucket == null || queryBucket.isEmpty()) ? null : queryBucket.get(0).value; return (queryBucket == null || queryBucket.isEmpty()) ? null : queryBucket.get(0).value;
} }
@ -404,8 +403,8 @@ public class FlumeApi {
case 200: case 200:
break; break;
case 400: case 400:
// Flume API sense response code 400 (vs. normal 401) on invalid user credentials logger.warn("@text/api-request: {}", response);
throw new FlumeApiException("@text/api.invalid-user-credentials [\"" + response.getReason() + "\"]", throw new FlumeApiException("@text/api.bad-request [\"" + response.getReason() + "\"]",
response.getStatus(), true); response.getStatus(), true);
case 401: case 401:
throw new FlumeApiException("@text/api.invalid-user-credentials [\"" + response.getReason() + "\"]", throw new FlumeApiException("@text/api.invalid-user-credentials [\"" + response.getReason() + "\"]",

View File

@ -23,22 +23,13 @@ import com.google.gson.annotations.SerializedName;
* *
* @author Jeff James - Initial contribution * @author Jeff James - Initial contribution
*/ */
public class FlumeApiQueryWaterUsage { public record FlumeApiQueryWaterUsage( //
@SerializedName("request_id") @SerializedName("request_id") String requestId, //
public String requestId; @SerializedName("since_datetime") LocalDateTime sinceDateTime, //
@SerializedName("since_datetime") @SerializedName("until_datetime") LocalDateTime untilDateTime, //
public LocalDateTime sinceDateTime; FlumeApi.BucketType bucket, //
@SerializedName("until_datetime") @SerializedName("group_multiplier") Integer groupMultiplier, //
public LocalDateTime untilDateTime; FlumeApi.OperationType operation, //
@SerializedName("tz") FlumeApi.UnitType units, //
public String timeZone; @SerializedName("sort_direction") FlumeApi.SortDirectionType sortDirection) {
public FlumeApi.BucketType bucket;
@SerializedName("device_id")
public String[] deviceId;
@SerializedName("group_multiplier")
public Integer groupMultiplier;
public FlumeApi.OperationType operation;
public FlumeApi.UnitType units;
@SerializedName("sort_direction")
public FlumeApi.SortDirectionType sortDirection;
} }

View File

@ -138,7 +138,7 @@ public class FlumeBridgeHandler extends BaseBridgeHandler {
scheduler.execute(this::goOnline); scheduler.execute(this::goOnline);
} }
public void goOnline() { public synchronized void goOnline() {
try { try {
api.initialize(config.clientId, config.clientSecret, config.username, config.password, api.initialize(config.clientId, config.clientSecret, config.username, config.password,
this.getThing().getUID()); this.getThing().getUID());
@ -218,15 +218,13 @@ public class FlumeBridgeHandler extends BaseBridgeHandler {
*/ */
@Nullable @Nullable
public FlumeDeviceHandler getFlumeDeviceHandler(String id) { public FlumeDeviceHandler getFlumeDeviceHandler(String id) {
//@formatter:off return getThing().getThings().stream() //
return getThing().getThings().stream() .filter(t -> t.getThingTypeUID().equals(THING_TYPE_METER)) //
.filter(t -> t.getThingTypeUID().equals(THING_TYPE_METER)) .map(t -> (FlumeDeviceHandler) t.getHandler()) //
.map(t -> (FlumeDeviceHandler)t.getHandler()) .filter(Objects::nonNull) //
.filter(Objects::nonNull) .filter(h -> h.getId().equals(id)) //
.filter(h -> h.getId().equals(id)) .findFirst() //
.findFirst() .orElse(null);
.orElse(null);
//@formatter:on
} }
public void handleApiException(Exception e) { public void handleApiException(Exception e) {
@ -277,10 +275,11 @@ public class FlumeBridgeHandler extends BaseBridgeHandler {
refreshDevices(true); refreshDevices(true);
} }
//@formatter:off getThing().getThings().stream() //
getThing().getThings().stream() .map(t -> t.getHandler()) //
.forEach(t -> { if(t.getHandler() instanceof FlumeDeviceHandler handler) { handler.queryUsage(); } }); .filter(FlumeDeviceHandler.class::isInstance) //
//@formatter:on .map(FlumeDeviceHandler.class::cast) //
.forEach(FlumeDeviceHandler::queryUsage);
} }
public @Nullable String getLocaleString(String key) { public @Nullable String getLocaleString(String key) {

View File

@ -30,6 +30,8 @@ import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import javax.measure.spi.SystemOfUnits;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.flume.internal.FlumeBridgeConfig; import org.openhab.binding.flume.internal.FlumeBridgeConfig;
@ -48,6 +50,8 @@ import org.openhab.core.library.types.OnOffType;
import org.openhab.core.library.types.QuantityType; import org.openhab.core.library.types.QuantityType;
import org.openhab.core.library.unit.ImperialUnits; import org.openhab.core.library.unit.ImperialUnits;
import org.openhab.core.library.unit.Units; import org.openhab.core.library.unit.Units;
import org.openhab.core.storage.Storage;
import org.openhab.core.storage.StorageService;
import org.openhab.core.thing.Bridge; import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.ChannelUID; import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing; import org.openhab.core.thing.Thing;
@ -71,50 +75,62 @@ import org.slf4j.LoggerFactory;
public class FlumeDeviceHandler extends BaseThingHandler { public class FlumeDeviceHandler extends BaseThingHandler {
private final Logger logger = LoggerFactory.getLogger(FlumeDeviceHandler.class); private final Logger logger = LoggerFactory.getLogger(FlumeDeviceHandler.class);
// private final static String beginDateUsage = "2016-01-01 00:00:00"; private static final int HISTORICAL_YEAR_START = 2018;
private static final LocalDateTime BEGIN_DATE_USAGE = LocalDateTime.of(2016, 1, 1, 0, 0, 0); private static final String QUERY_ID_HISTORICAL_BY_YEAR = "CUMULATIVE_START_OF_YEAR";
private static final String QUERY_ID_CUMULATIVE_START_OF_YEAR = "cumulativeStartOfYear"; private static final String QUERY_ID_USAGE_FROM_BASELINE = "USAGE_FROM_BASELINE";
private static final String QUERY_ID_YEAR_TO_DATE = "usageYTD";
private ExpiringCache<FlumeApiDevice> apiDeviceCache = new ExpiringCache<FlumeApiDevice>( private ExpiringCache<FlumeApiDevice> apiDeviceCache = new ExpiringCache<FlumeApiDevice>(
Duration.ofMinutes(5).toMillis(), this::getDeviceInfo); Duration.ofMinutes(5).toMillis(), this::getDeviceInfo);
public record CumulativeStore(LocalDateTime lastUpdate, float value, FlumeApi.UnitType unit) {
}
private CumulativeStore cumulativeStore;
private FlumeDeviceConfig config = new FlumeDeviceConfig(); private FlumeDeviceConfig config = new FlumeDeviceConfig();
private float cumulativeStartOfYear = 0;
private float cumulativeUsage = 0;
private long expiryCumulativeUsage = 0; private long expiryCumulativeUsage = 0;
private Duration refreshIntervalCumulative = Duration.ofMinutes(DEFAULT_POLLING_INTERVAL_CUMULATIVE); private Duration refreshIntervalCumulative = Duration.ofMinutes(DEFAULT_POLLING_INTERVAL_CUMULATIVE_MIN);
private float instantUsage = 0; private float instantUsage = 0;
private long expiryInstantUsage = 0; private long expiryInstantUsage = 0;
private Duration refreshIntervalInstant = Duration.ofMinutes(DEFAULT_POLLING_INTERVAL_INSTANTANEOUS); private Duration refreshIntervalInstant = Duration.ofMinutes(DEFAULT_POLLING_INTERVAL_INSTANTANEOUS_MIN);
private LocalDateTime startOfYear = LocalDateTime.MIN;
private Instant lastUsageAlert = Instant.MIN; private Instant lastUsageAlert = Instant.MIN;
private static final Duration USAGE_QUERY_FETCH_INTERVAL = Duration.ofMinutes(5);
private long expiryUsageAlertFetch = 0; private long expiryUsageAlertFetch = 0;
private static final Duration USAGE_ALERT_FETCH_INTERVAL = Duration.ofMinutes(5);
public FlumeDeviceHandler(Thing thing) { private final Storage<CumulativeStore> storage;
private static final String STORAGE_KEY_CUMULATIVE_USAGE = "CumulativeUsage";
private final boolean imperialUnits;
public FlumeDeviceHandler(Thing thing, final SystemOfUnits systemOfUnits, final StorageService storageService) {
super(thing); super(thing);
this.imperialUnits = systemOfUnits instanceof ImperialUnits;
this.storage = storageService.getStorage(thing.getUID().toString(), CumulativeStore.class.getClassLoader());
CumulativeStore lCumulativeStore = storage.get(STORAGE_KEY_CUMULATIVE_USAGE);
if (lCumulativeStore == null || !lCumulativeStore.unit()
.equals(imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS)) {
lCumulativeStore = new CumulativeStore(LocalDateTime.MIN, 0,
imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS);
}
this.cumulativeStore = lCumulativeStore;
} }
@Override @Override
public void initialize() { public void initialize() {
config = getConfigAs(FlumeDeviceConfig.class); config = getConfigAs(FlumeDeviceConfig.class);
updateStatus(ThingStatus.UNKNOWN); updateStatus(ThingStatus.UNKNOWN);
scheduler.execute(this::goOnline); scheduler.execute(this::goOnline);
} }
public void goOnline() { public synchronized void goOnline() {
if (this.getThing().getStatus() == ThingStatus.ONLINE) { if (this.getThing().getStatus() == ThingStatus.ONLINE) {
return; return;
} }
FlumeBridgeHandler bh = getBridgeHandler(); FlumeBridgeHandler bh = getBridgeHandler();
if (bh == null) { if (bh == null) {
@ -133,15 +149,13 @@ public class FlumeDeviceHandler extends BaseThingHandler {
refreshIntervalCumulative = Duration.ofMinutes(bridgeConfig.refreshIntervalCumulative); refreshIntervalCumulative = Duration.ofMinutes(bridgeConfig.refreshIntervalCumulative);
refreshIntervalInstant = Duration.ofMinutes(bridgeConfig.refreshIntervalInstantaneous); refreshIntervalInstant = Duration.ofMinutes(bridgeConfig.refreshIntervalInstantaneous);
// always update the startOfYear number;
startOfYear = LocalDateTime.MIN;
FlumeApiDevice apiDevice = apiDeviceCache.getValue(); FlumeApiDevice apiDevice = apiDeviceCache.getValue();
if (apiDevice != null) { if (apiDevice != null) {
updateDeviceInfo(apiDevice); updateDeviceInfo(apiDevice);
} }
try { try {
tryFetchUsageAlerts(true);
tryQueryUsage(true); tryQueryUsage(true);
tryGetCurrentFlowRate(true); tryGetCurrentFlowRate(true);
} catch (FlumeApiException | IOException | InterruptedException | TimeoutException | ExecutionException e) { } catch (FlumeApiException | IOException | InterruptedException | TimeoutException | ExecutionException e) {
@ -173,6 +187,10 @@ public class FlumeDeviceHandler extends BaseThingHandler {
return config.id; return config.id;
} }
public boolean isImperial() {
return imperialUnits;
}
public void updateDeviceChannel(@Nullable FlumeApiDevice apiDevice, String channelUID) { public void updateDeviceChannel(@Nullable FlumeApiDevice apiDevice, String channelUID) {
final Map<String, Integer> mapBatteryLevel = Map.of("low", 25, "medium", 50, "high", 100); final Map<String, Integer> mapBatteryLevel = Map.of("low", 25, "medium", 50, "high", 100);
if (apiDevice == null) { if (apiDevice == null) {
@ -224,66 +242,57 @@ public class FlumeDeviceHandler extends BaseThingHandler {
} }
/** /**
* Pools together several usage queries based on whether the value is expired and whether a channel is linked. Also, * {@link tryUpdateCumulativeStore} will query annual usage from HISTORICAL_YEAR_START to the end of last year.
* if necessary will update the usage from beginning to start of year so subsequent cumulative queries only need to * Flume
* ytd. Will update the values in the ExpiringCache as necessary. * limits queries to a duration of 1 year, so these must be broken up to individual queries. The result is stored
* * so that subsequent startups don't have to requery.
* @throws FlumeApiException
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
* @throws ExecutionException
*/ */
protected void tryQueryUsage(boolean forceUpdate) protected void tryUpdateCumulativeStore()
throws FlumeApiException, IOException, InterruptedException, TimeoutException, ExecutionException { throws FlumeApiException, IOException, InterruptedException, TimeoutException, ExecutionException {
@Nullable @Nullable
List<HashMap<String, List<FlumeApiQueryBucket>>> result; List<HashMap<String, List<FlumeApiQueryBucket>>> result;
List<FlumeApiQueryWaterUsage> listQuery = new ArrayList<FlumeApiQueryWaterUsage>();
boolean imperialUnits = isImperial();
LocalDateTime now = LocalDateTime.now(); LocalDateTime now = LocalDateTime.now();
List<FlumeApiQueryWaterUsage> listQuery = new ArrayList<FlumeApiQueryWaterUsage>(); // Get sum of all historical full year readings only when binding starts or its the start of a new year
// This is to reduce the time it takes on the periodic queries.
// Get sum of all historical readings only when binding starts or its the start of a new year boolean updateBaselineYears = cumulativeStore.lastUpdate() == LocalDateTime.MIN
// This is to reduce the time it takes on the periodic queries || Duration.between(now, cumulativeStore.lastUpdate()).toDays() > 365;
if (startOfYear.equals(LocalDateTime.MIN) || (now.getYear() != startOfYear.getYear())) { if (updateBaselineYears) {
FlumeApiQueryWaterUsage query = new FlumeApiQueryWaterUsage(); for (int year = HISTORICAL_YEAR_START; year < now.getYear(); year++) {
listQuery.add(new FlumeApiQueryWaterUsage( //
query.bucket = FlumeApi.BucketType.YR; QUERY_ID_HISTORICAL_BY_YEAR + year, //
query.sinceDateTime = BEGIN_DATE_USAGE; LocalDateTime.of(year, 1, 1, 0, 0, 0), //
query.untilDateTime = now.minusYears(1); LocalDateTime.of(year, 12, 31, 23, 59, 59), //
query.groupMultiplier = 100; FlumeApi.BucketType.YR, //
query.operation = FlumeApi.OperationType.SUM; 1, //
query.requestId = QUERY_ID_CUMULATIVE_START_OF_YEAR; FlumeApi.OperationType.SUM, //
query.units = imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS; imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, //
FlumeApi.SortDirectionType.ASC //
listQuery.add(query); ));
}
} }
if (System.nanoTime() > this.expiryUsageAlertFetch) { // Get the total usage for complete months since lastUpdate OR beginning of the year
fetchUsageAlerts(); // (note, flume returns the full month usage regardless of the time, hence the need to query a full complete
this.expiryUsageAlertFetch = System.nanoTime() + USAGE_QUERY_FETCH_INTERVAL.toNanos(); // month
} LocalDateTime fromDateTime = (updateBaselineYears) ? LocalDateTime.of(now.getYear(), 1, 1, 0, 0, 0)
: cumulativeStore.lastUpdate();
LocalDateTime toDateTime = now.withDayOfMonth(1).minusDays(1);
if (this.isLinked(CHANNEL_DEVICE_CUMULATIVEUSAGE) // Add query for current year up to end of last month. Note, the max multiplier is 100, so we have to use Months
&& ((System.nanoTime() > this.expiryCumulativeUsage) || forceUpdate)) { // vs. Days as the
FlumeApiQueryWaterUsage query = new FlumeApiQueryWaterUsage(); // minimum granularity since if Days was used the query could be for a total of 365 days
listQuery.add(new FlumeApiQueryWaterUsage( //
query.bucket = FlumeApi.BucketType.DAY; QUERY_ID_HISTORICAL_BY_YEAR + "MONTH", //
query.untilDateTime = now; fromDateTime, //
query.sinceDateTime = now.withDayOfYear(1); toDateTime, //
query.groupMultiplier = 400; FlumeApi.BucketType.MON, //
query.operation = FlumeApi.OperationType.SUM; 99, // max of 100
query.requestId = QUERY_ID_YEAR_TO_DATE; FlumeApi.OperationType.SUM, //
query.units = imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS; imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, //
FlumeApi.SortDirectionType.ASC //
listQuery.add(query); ));
}
if (listQuery.isEmpty()) {
return;
}
result = getApi().queryUsage(config.id, listQuery); result = getApi().queryUsage(config.id, listQuery);
@ -294,21 +303,61 @@ public class FlumeDeviceHandler extends BaseThingHandler {
} }
Map<String, List<FlumeApiQueryBucket>> queryData = result.get(0); Map<String, List<FlumeApiQueryBucket>> queryData = result.get(0);
List<FlumeApiQueryBucket> queryBuckets; float cumulativeUsage = (float) queryData.values().stream().map((bucket) -> bucket.get(0).value)
.mapToDouble(Float::doubleValue).sum();
queryBuckets = queryData.get(QUERY_ID_CUMULATIVE_START_OF_YEAR); this.cumulativeStore = new CumulativeStore(LocalDateTime.of(now.getYear(), now.getMonth(), 1, 0, 0, 0),
if (queryBuckets != null) { cumulativeUsage, imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS);
cumulativeStartOfYear = queryBuckets.get(0).value; storage.put(STORAGE_KEY_CUMULATIVE_USAGE, cumulativeStore);
startOfYear = now.withDayOfYear(1); }
/**
* Queries usage from baseline to now if expired and channel is linked. Will update the falues in ExpiringCache as
* necessary
*
* @throws FlumeApiException
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
* @throws ExecutionException
*/
protected void tryQueryUsage(boolean forceUpdate)
throws FlumeApiException, IOException, InterruptedException, TimeoutException, ExecutionException {
if (System.nanoTime() <= this.expiryCumulativeUsage && !forceUpdate) {
return;
} }
queryBuckets = queryData.get(QUERY_ID_YEAR_TO_DATE); LocalDateTime now = LocalDateTime.now();
if (queryBuckets != null) { // update cumulativeStore if no storage exists (first time), or we are at the start of a new month
cumulativeUsage = queryBuckets.get(0).value + cumulativeStartOfYear; if (cumulativeStore.lastUpdate() == LocalDateTime.MIN
updateState(CHANNEL_DEVICE_CUMULATIVEUSAGE, || LocalDateTime.of(now.getYear(), now.getMonth(), 1, 0, 0, 0).isAfter(cumulativeStore.lastUpdate())) {
new QuantityType<>(cumulativeUsage, imperialUnits ? ImperialUnits.GALLON_LIQUID_US : Units.LITRE)); tryUpdateCumulativeStore();
this.expiryCumulativeUsage = System.nanoTime() + this.refreshIntervalCumulative.toNanos();
} }
if (!this.isLinked(CHANNEL_DEVICE_CUMULATIVEUSAGE)) {
return;
}
Float result = getApi().queryUsage(config.id, //
new FlumeApiQueryWaterUsage(QUERY_ID_USAGE_FROM_BASELINE, //
cumulativeStore.lastUpdate(), //
now, //
FlumeApi.BucketType.MON, //
1, //
FlumeApi.OperationType.SUM, //
imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, //
FlumeApi.SortDirectionType.ASC //
));
if (result == null) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
"@text/offline.cloud-connection-issue");
return;
}
updateState(CHANNEL_DEVICE_CUMULATIVEUSAGE, new QuantityType<>(cumulativeStore.value() + result,
imperialUnits ? ImperialUnits.GALLON_LIQUID_US : Units.LITRE));
this.expiryCumulativeUsage = System.nanoTime() + this.refreshIntervalCumulative.toNanos();
} }
protected void tryGetCurrentFlowRate(boolean forceUpdate) protected void tryGetCurrentFlowRate(boolean forceUpdate)
@ -354,33 +403,27 @@ public class FlumeDeviceHandler extends BaseThingHandler {
} }
try { try {
tryFetchUsageAlerts(false);
tryQueryUsage(false); tryQueryUsage(false);
tryGetCurrentFlowRate(false); tryGetCurrentFlowRate(false);
} catch (FlumeApiException | IOException | InterruptedException | TimeoutException | ExecutionException e) { } catch (FlumeApiException | IOException | InterruptedException | TimeoutException | ExecutionException e) {
this.handleApiException(e); this.handleApiException(e);
return; return;
} }
if (System.nanoTime() > this.expiryUsageAlertFetch) {
fetchUsageAlerts();
this.expiryUsageAlertFetch = System.nanoTime() + USAGE_QUERY_FETCH_INTERVAL.toNanos();
}
} }
public void fetchUsageAlerts() { public void tryFetchUsageAlerts(boolean forceUpdate)
throws FlumeApiException, IOException, InterruptedException, TimeoutException, ExecutionException {
List<FlumeApiUsageAlert> resultList; List<FlumeApiUsageAlert> resultList;
FlumeApiUsageAlert alert; FlumeApiUsageAlert alert;
FlumeApiQueryWaterUsage query; FlumeApiQueryWaterUsage query;
boolean imperialUnits = isImperial(); if (System.nanoTime() <= this.expiryUsageAlertFetch || !forceUpdate) {
try {
resultList = getApi().fetchUsageAlerts(config.id, 1);
} catch (FlumeApiException | IOException | InterruptedException | TimeoutException | ExecutionException e) {
this.handleApiException(e);
return; return;
} }
resultList = getApi().fetchUsageAlerts(config.id, 1);
if (resultList.isEmpty()) { if (resultList.isEmpty()) {
return; return;
} }
@ -400,10 +443,17 @@ public class FlumeDeviceHandler extends BaseThingHandler {
return; return;
} }
query = alert.query; query = new FlumeApiQueryWaterUsage( //
query.bucket = FlumeApi.BucketType.MIN; alert.query.requestId(), //
query.operation = FlumeApi.OperationType.AVG; alert.query.sinceDateTime(), //
query.units = imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS; alert.query.untilDateTime(), //
alert.query.bucket(), //
alert.query.groupMultiplier(), //
FlumeApi.OperationType.AVG, //
imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, //
alert.query.sortDirection() //
);
logger.debug("Alert query: {}", query);
Float avgUsage; Float avgUsage;
try { try {
@ -412,14 +462,16 @@ public class FlumeDeviceHandler extends BaseThingHandler {
this.handleApiException(e); this.handleApiException(e);
return; return;
} }
long minutes = Duration.between(query.sinceDateTime, query.untilDateTime).toMinutes(); long minutes = Duration.between(query.sinceDateTime(), query.untilDateTime()).toMinutes();
LocalDateTime localWhenTriggered = LocalDateTime.ofInstant(alert.triggeredDateTime, ZoneId.systemDefault()); LocalDateTime localWhenTriggered = LocalDateTime.ofInstant(alert.triggeredDateTime, ZoneId.systemDefault());
String stringAlert = String.format(stringAlertFormat, alert.eventRuleName, localWhenTriggered.toString(), String stringAlert = String.format(stringAlertFormat, alert.eventRuleName, localWhenTriggered.toString(),
minutes, avgUsage, imperialUnits ? "gallons" : "liters"); minutes, avgUsage, imperialUnits ? "gallons" : "liters");
logger.debug("Alert: {}", stringAlert);
triggerChannel(CHANNEL_DEVICE_USAGEALERT, stringAlert); triggerChannel(CHANNEL_DEVICE_USAGEALERT, stringAlert);
this.expiryUsageAlertFetch = System.nanoTime() + USAGE_ALERT_FETCH_INTERVAL.toNanos();
} }
@Override @Override
@ -466,10 +518,6 @@ public class FlumeDeviceHandler extends BaseThingHandler {
updateDeviceChannel(apiDevice, CHANNEL_DEVICE_LASTSEEN); updateDeviceChannel(apiDevice, CHANNEL_DEVICE_LASTSEEN);
} }
public boolean isImperial() {
return Objects.requireNonNull(getBridgeHandler()).systemOfUnits instanceof ImperialUnits;
}
public @Nullable FlumeBridgeHandler getBridgeHandler() { public @Nullable FlumeBridgeHandler getBridgeHandler() {
Bridge bridge = this.getBridge(); Bridge bridge = this.getBridge();
if (bridge == null) { if (bridge == null) {