diff --git a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeBindingConstants.java b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeBindingConstants.java index 998a1875bcf..9f323cb639a 100644 --- a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeBindingConstants.java +++ b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeBindingConstants.java @@ -49,6 +49,6 @@ public class FlumeBindingConstants { // Properties public static final String PROPERTY_ID = "id"; - public static final int DEFAULT_POLLING_INTERVAL_INSTANTANEOUS = 1; - public static final int DEFAULT_POLLING_INTERVAL_CUMULATIVE = 5; + public static final int DEFAULT_POLLING_INTERVAL_INSTANTANEOUS_MIN = 1; + public static final int DEFAULT_POLLING_INTERVAL_CUMULATIVE_MIN = 5; } diff --git a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeBridgeConfig.java b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeBridgeConfig.java index 80f1d0f2129..c3301b4df60 100644 --- a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeBridgeConfig.java +++ b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeBridgeConfig.java @@ -28,6 +28,6 @@ public class FlumeBridgeConfig { public String username = ""; public String password = ""; - public int refreshIntervalInstantaneous = DEFAULT_POLLING_INTERVAL_INSTANTANEOUS; - public int refreshIntervalCumulative = DEFAULT_POLLING_INTERVAL_CUMULATIVE; + public int refreshIntervalInstantaneous = DEFAULT_POLLING_INTERVAL_INSTANTANEOUS_MIN; + public int refreshIntervalCumulative = DEFAULT_POLLING_INTERVAL_CUMULATIVE_MIN; } diff --git a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeHandlerFactory.java b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeHandlerFactory.java index 9694317af16..efde505eb6f 100644 --- a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeHandlerFactory.java +++ b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/FlumeHandlerFactory.java @@ -26,6 +26,7 @@ import org.openhab.core.i18n.LocaleProvider; import org.openhab.core.i18n.TranslationProvider; import org.openhab.core.i18n.UnitProvider; import org.openhab.core.io.net.http.HttpClientFactory; +import org.openhab.core.storage.StorageService; import org.openhab.core.thing.Bridge; import org.openhab.core.thing.Thing; import org.openhab.core.thing.ThingTypeUID; @@ -50,15 +51,18 @@ public class FlumeHandlerFactory extends BaseThingHandlerFactory { private final HttpClientFactory httpClientFactory; private final TranslationProvider i18nProvider; private final LocaleProvider localeProvider; + private final StorageService storageService; public final SystemOfUnits systemOfUnits; @Activate public FlumeHandlerFactory(@Reference UnitProvider unitProvider, @Reference HttpClientFactory httpClientFactory, - final @Reference TranslationProvider i18nProvider, final @Reference LocaleProvider localeProvider) { + final @Reference TranslationProvider i18nProvider, final @Reference LocaleProvider localeProvider, + final @Reference StorageService storageService) { this.systemOfUnits = unitProvider.getMeasurementSystem(); this.httpClientFactory = httpClientFactory; this.i18nProvider = i18nProvider; this.localeProvider = localeProvider; + this.storageService = storageService; } @Override @@ -74,7 +78,7 @@ public class FlumeHandlerFactory extends BaseThingHandlerFactory { return new FlumeBridgeHandler((Bridge) thing, systemOfUnits, this.httpClientFactory.getCommonHttpClient(), i18nProvider, localeProvider); } else if (THING_TYPE_METER.equals(thingTypeUID)) { - return new FlumeDeviceHandler(thing); + return new FlumeDeviceHandler(thing, systemOfUnits, storageService); } return null; diff --git a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/actions/FlumeDeviceActions.java b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/actions/FlumeDeviceActions.java index 6dcd7b11719..dce4e1d443b 100644 --- a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/actions/FlumeDeviceActions.java +++ b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/actions/FlumeDeviceActions.java @@ -76,8 +76,6 @@ public class FlumeDeviceActions implements ThingActions { @ActionInput(name = "operation", label = "Operation", required = true, description = "The aggregate/accumulate operation to perform (SUM, AVG, MIN, MAX, CNT).") @Nullable String operation) { logger.info("queryWaterUsage called"); - FlumeApiQueryWaterUsage query = new FlumeApiQueryWaterUsage(); - FlumeDeviceHandler localDeviceHandler = deviceHandler; if (localDeviceHandler == null) { logger.debug("querying device usage, but device is undefined."); @@ -90,31 +88,26 @@ public class FlumeDeviceActions implements ThingActions { logger.warn("queryWaterUsage called with null inputs"); return null; } - - if (!FlumeApi.OperationType.contains(operation)) { - logger.warn("Invalid aggregation operation in call to queryWaterUsage"); - return null; - } else { - query.operation = FlumeApi.OperationType.valueOf(operation); - } - - if (!FlumeApi.BucketType.contains(bucket)) { - logger.warn("Invalid bucket type in call to queryWaterUsage"); - return null; - } else { - query.bucket = FlumeApi.BucketType.valueOf(bucket); - } - - if (untilDateTime.isBefore(sinceDateTime)) { + if (!untilDateTime.isAfter(sinceDateTime)) { logger.warn("sinceDateTime must be earlier than untilDateTime"); return null; } + if (!FlumeApi.OperationType.contains(operation)) { + logger.warn("Invalid aggregation operation in call to queryWaterUsage"); + return null; + } + if (!FlumeApi.BucketType.contains(bucket)) { + logger.warn("Invalid bucket type in call to queryWaterUsage"); + return null; + } - query.requestId = QUERYID; - query.sinceDateTime = sinceDateTime; - query.untilDateTime = untilDateTime; - query.bucket = FlumeApi.BucketType.valueOf(bucket); - query.units = imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS; + FlumeApiQueryWaterUsage query = new FlumeApiQueryWaterUsage(QUERYID, // + sinceDateTime, // + untilDateTime, // + FlumeApi.BucketType.valueOf(bucket), // + 100, // + FlumeApi.OperationType.valueOf(operation), // + imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, FlumeApi.SortDirectionType.ASC); Float usage; try { diff --git a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/api/FlumeApi.java b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/api/FlumeApi.java index cd8d1355fb1..eb1b6ff9cc1 100644 --- a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/api/FlumeApi.java +++ b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/api/FlumeApi.java @@ -283,8 +283,7 @@ public class FlumeApi { } Map> queryBuckets = queryData.get(0); - - List queryBucket = queryBuckets.get(query.requestId); + List queryBucket = queryBuckets.get(query.requestId()); return (queryBucket == null || queryBucket.isEmpty()) ? null : queryBucket.get(0).value; } @@ -404,8 +403,8 @@ public class FlumeApi { case 200: break; case 400: - // Flume API sense response code 400 (vs. normal 401) on invalid user credentials - throw new FlumeApiException("@text/api.invalid-user-credentials [\"" + response.getReason() + "\"]", + logger.warn("@text/api-request: {}", response); + throw new FlumeApiException("@text/api.bad-request [\"" + response.getReason() + "\"]", response.getStatus(), true); case 401: throw new FlumeApiException("@text/api.invalid-user-credentials [\"" + response.getReason() + "\"]", diff --git a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/api/dto/FlumeApiQueryWaterUsage.java b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/api/dto/FlumeApiQueryWaterUsage.java index fe456ba3bb7..0451cd2a1e9 100644 --- a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/api/dto/FlumeApiQueryWaterUsage.java +++ b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/api/dto/FlumeApiQueryWaterUsage.java @@ -23,22 +23,13 @@ import com.google.gson.annotations.SerializedName; * * @author Jeff James - Initial contribution */ -public class FlumeApiQueryWaterUsage { - @SerializedName("request_id") - public String requestId; - @SerializedName("since_datetime") - public LocalDateTime sinceDateTime; - @SerializedName("until_datetime") - public LocalDateTime untilDateTime; - @SerializedName("tz") - public String timeZone; - public FlumeApi.BucketType bucket; - @SerializedName("device_id") - public String[] deviceId; - @SerializedName("group_multiplier") - public Integer groupMultiplier; - public FlumeApi.OperationType operation; - public FlumeApi.UnitType units; - @SerializedName("sort_direction") - public FlumeApi.SortDirectionType sortDirection; +public record FlumeApiQueryWaterUsage( // + @SerializedName("request_id") String requestId, // + @SerializedName("since_datetime") LocalDateTime sinceDateTime, // + @SerializedName("until_datetime") LocalDateTime untilDateTime, // + FlumeApi.BucketType bucket, // + @SerializedName("group_multiplier") Integer groupMultiplier, // + FlumeApi.OperationType operation, // + FlumeApi.UnitType units, // + @SerializedName("sort_direction") FlumeApi.SortDirectionType sortDirection) { } diff --git a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/handler/FlumeBridgeHandler.java b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/handler/FlumeBridgeHandler.java index cc7a972975d..17bfb4b83e9 100644 --- a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/handler/FlumeBridgeHandler.java +++ b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/handler/FlumeBridgeHandler.java @@ -138,7 +138,7 @@ public class FlumeBridgeHandler extends BaseBridgeHandler { scheduler.execute(this::goOnline); } - public void goOnline() { + public synchronized void goOnline() { try { api.initialize(config.clientId, config.clientSecret, config.username, config.password, this.getThing().getUID()); @@ -218,15 +218,13 @@ public class FlumeBridgeHandler extends BaseBridgeHandler { */ @Nullable public FlumeDeviceHandler getFlumeDeviceHandler(String id) { - //@formatter:off - return getThing().getThings().stream() - .filter(t -> t.getThingTypeUID().equals(THING_TYPE_METER)) - .map(t -> (FlumeDeviceHandler)t.getHandler()) - .filter(Objects::nonNull) - .filter(h -> h.getId().equals(id)) - .findFirst() - .orElse(null); - //@formatter:on + return getThing().getThings().stream() // + .filter(t -> t.getThingTypeUID().equals(THING_TYPE_METER)) // + .map(t -> (FlumeDeviceHandler) t.getHandler()) // + .filter(Objects::nonNull) // + .filter(h -> h.getId().equals(id)) // + .findFirst() // + .orElse(null); } public void handleApiException(Exception e) { @@ -277,10 +275,11 @@ public class FlumeBridgeHandler extends BaseBridgeHandler { refreshDevices(true); } - //@formatter:off - getThing().getThings().stream() - .forEach(t -> { if(t.getHandler() instanceof FlumeDeviceHandler handler) { handler.queryUsage(); } }); - //@formatter:on + getThing().getThings().stream() // + .map(t -> t.getHandler()) // + .filter(FlumeDeviceHandler.class::isInstance) // + .map(FlumeDeviceHandler.class::cast) // + .forEach(FlumeDeviceHandler::queryUsage); } public @Nullable String getLocaleString(String key) { diff --git a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/handler/FlumeDeviceHandler.java b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/handler/FlumeDeviceHandler.java index 38d1e1fbbf1..b8fd218acce 100644 --- a/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/handler/FlumeDeviceHandler.java +++ b/bundles/org.openhab.binding.flume/src/main/java/org/openhab/binding/flume/internal/handler/FlumeDeviceHandler.java @@ -30,6 +30,8 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import javax.measure.spi.SystemOfUnits; + import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.binding.flume.internal.FlumeBridgeConfig; @@ -48,6 +50,8 @@ import org.openhab.core.library.types.OnOffType; import org.openhab.core.library.types.QuantityType; import org.openhab.core.library.unit.ImperialUnits; import org.openhab.core.library.unit.Units; +import org.openhab.core.storage.Storage; +import org.openhab.core.storage.StorageService; import org.openhab.core.thing.Bridge; import org.openhab.core.thing.ChannelUID; import org.openhab.core.thing.Thing; @@ -71,50 +75,62 @@ import org.slf4j.LoggerFactory; public class FlumeDeviceHandler extends BaseThingHandler { private final Logger logger = LoggerFactory.getLogger(FlumeDeviceHandler.class); - // private final static String beginDateUsage = "2016-01-01 00:00:00"; - private static final LocalDateTime BEGIN_DATE_USAGE = LocalDateTime.of(2016, 1, 1, 0, 0, 0); - private static final String QUERY_ID_CUMULATIVE_START_OF_YEAR = "cumulativeStartOfYear"; - private static final String QUERY_ID_YEAR_TO_DATE = "usageYTD"; + private static final int HISTORICAL_YEAR_START = 2018; + private static final String QUERY_ID_HISTORICAL_BY_YEAR = "CUMULATIVE_START_OF_YEAR"; + private static final String QUERY_ID_USAGE_FROM_BASELINE = "USAGE_FROM_BASELINE"; private ExpiringCache apiDeviceCache = new ExpiringCache( Duration.ofMinutes(5).toMillis(), this::getDeviceInfo); + public record CumulativeStore(LocalDateTime lastUpdate, float value, FlumeApi.UnitType unit) { + } + + private CumulativeStore cumulativeStore; + private FlumeDeviceConfig config = new FlumeDeviceConfig(); - private float cumulativeStartOfYear = 0; - - private float cumulativeUsage = 0; private long expiryCumulativeUsage = 0; - private Duration refreshIntervalCumulative = Duration.ofMinutes(DEFAULT_POLLING_INTERVAL_CUMULATIVE); + private Duration refreshIntervalCumulative = Duration.ofMinutes(DEFAULT_POLLING_INTERVAL_CUMULATIVE_MIN); private float instantUsage = 0; private long expiryInstantUsage = 0; - private Duration refreshIntervalInstant = Duration.ofMinutes(DEFAULT_POLLING_INTERVAL_INSTANTANEOUS); - - private LocalDateTime startOfYear = LocalDateTime.MIN; + private Duration refreshIntervalInstant = Duration.ofMinutes(DEFAULT_POLLING_INTERVAL_INSTANTANEOUS_MIN); private Instant lastUsageAlert = Instant.MIN; - private static final Duration USAGE_QUERY_FETCH_INTERVAL = Duration.ofMinutes(5); private long expiryUsageAlertFetch = 0; + private static final Duration USAGE_ALERT_FETCH_INTERVAL = Duration.ofMinutes(5); - public FlumeDeviceHandler(Thing thing) { + private final Storage storage; + private static final String STORAGE_KEY_CUMULATIVE_USAGE = "CumulativeUsage"; + private final boolean imperialUnits; + + public FlumeDeviceHandler(Thing thing, final SystemOfUnits systemOfUnits, final StorageService storageService) { super(thing); + this.imperialUnits = systemOfUnits instanceof ImperialUnits; + this.storage = storageService.getStorage(thing.getUID().toString(), CumulativeStore.class.getClassLoader()); + + CumulativeStore lCumulativeStore = storage.get(STORAGE_KEY_CUMULATIVE_USAGE); + + if (lCumulativeStore == null || !lCumulativeStore.unit() + .equals(imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS)) { + lCumulativeStore = new CumulativeStore(LocalDateTime.MIN, 0, + imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS); + } + this.cumulativeStore = lCumulativeStore; } @Override public void initialize() { config = getConfigAs(FlumeDeviceConfig.class); - updateStatus(ThingStatus.UNKNOWN); scheduler.execute(this::goOnline); } - public void goOnline() { + public synchronized void goOnline() { if (this.getThing().getStatus() == ThingStatus.ONLINE) { return; } - FlumeBridgeHandler bh = getBridgeHandler(); if (bh == null) { @@ -133,15 +149,13 @@ public class FlumeDeviceHandler extends BaseThingHandler { refreshIntervalCumulative = Duration.ofMinutes(bridgeConfig.refreshIntervalCumulative); refreshIntervalInstant = Duration.ofMinutes(bridgeConfig.refreshIntervalInstantaneous); - // always update the startOfYear number; - startOfYear = LocalDateTime.MIN; - FlumeApiDevice apiDevice = apiDeviceCache.getValue(); if (apiDevice != null) { updateDeviceInfo(apiDevice); } try { + tryFetchUsageAlerts(true); tryQueryUsage(true); tryGetCurrentFlowRate(true); } catch (FlumeApiException | IOException | InterruptedException | TimeoutException | ExecutionException e) { @@ -173,6 +187,10 @@ public class FlumeDeviceHandler extends BaseThingHandler { return config.id; } + public boolean isImperial() { + return imperialUnits; + } + public void updateDeviceChannel(@Nullable FlumeApiDevice apiDevice, String channelUID) { final Map mapBatteryLevel = Map.of("low", 25, "medium", 50, "high", 100); if (apiDevice == null) { @@ -224,66 +242,57 @@ public class FlumeDeviceHandler extends BaseThingHandler { } /** - * Pools together several usage queries based on whether the value is expired and whether a channel is linked. Also, - * if necessary will update the usage from beginning to start of year so subsequent cumulative queries only need to - * ytd. Will update the values in the ExpiringCache as necessary. - * - * @throws FlumeApiException - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - * @throws ExecutionException + * {@link tryUpdateCumulativeStore} will query annual usage from HISTORICAL_YEAR_START to the end of last year. + * Flume + * limits queries to a duration of 1 year, so these must be broken up to individual queries. The result is stored + * so that subsequent startups don't have to requery. */ - protected void tryQueryUsage(boolean forceUpdate) + protected void tryUpdateCumulativeStore() throws FlumeApiException, IOException, InterruptedException, TimeoutException, ExecutionException { @Nullable List>> result; - - boolean imperialUnits = isImperial(); - + List listQuery = new ArrayList(); LocalDateTime now = LocalDateTime.now(); - List listQuery = new ArrayList(); - - // Get sum of all historical readings only when binding starts or its the start of a new year - // This is to reduce the time it takes on the periodic queries - if (startOfYear.equals(LocalDateTime.MIN) || (now.getYear() != startOfYear.getYear())) { - FlumeApiQueryWaterUsage query = new FlumeApiQueryWaterUsage(); - - query.bucket = FlumeApi.BucketType.YR; - query.sinceDateTime = BEGIN_DATE_USAGE; - query.untilDateTime = now.minusYears(1); - query.groupMultiplier = 100; - query.operation = FlumeApi.OperationType.SUM; - query.requestId = QUERY_ID_CUMULATIVE_START_OF_YEAR; - query.units = imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS; - - listQuery.add(query); + // Get sum of all historical full year readings only when binding starts or its the start of a new year + // This is to reduce the time it takes on the periodic queries. + boolean updateBaselineYears = cumulativeStore.lastUpdate() == LocalDateTime.MIN + || Duration.between(now, cumulativeStore.lastUpdate()).toDays() > 365; + if (updateBaselineYears) { + for (int year = HISTORICAL_YEAR_START; year < now.getYear(); year++) { + listQuery.add(new FlumeApiQueryWaterUsage( // + QUERY_ID_HISTORICAL_BY_YEAR + year, // + LocalDateTime.of(year, 1, 1, 0, 0, 0), // + LocalDateTime.of(year, 12, 31, 23, 59, 59), // + FlumeApi.BucketType.YR, // + 1, // + FlumeApi.OperationType.SUM, // + imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, // + FlumeApi.SortDirectionType.ASC // + )); + } } - if (System.nanoTime() > this.expiryUsageAlertFetch) { - fetchUsageAlerts(); - this.expiryUsageAlertFetch = System.nanoTime() + USAGE_QUERY_FETCH_INTERVAL.toNanos(); - } + // Get the total usage for complete months since lastUpdate OR beginning of the year + // (note, flume returns the full month usage regardless of the time, hence the need to query a full complete + // month + LocalDateTime fromDateTime = (updateBaselineYears) ? LocalDateTime.of(now.getYear(), 1, 1, 0, 0, 0) + : cumulativeStore.lastUpdate(); + LocalDateTime toDateTime = now.withDayOfMonth(1).minusDays(1); - if (this.isLinked(CHANNEL_DEVICE_CUMULATIVEUSAGE) - && ((System.nanoTime() > this.expiryCumulativeUsage) || forceUpdate)) { - FlumeApiQueryWaterUsage query = new FlumeApiQueryWaterUsage(); - - query.bucket = FlumeApi.BucketType.DAY; - query.untilDateTime = now; - query.sinceDateTime = now.withDayOfYear(1); - query.groupMultiplier = 400; - query.operation = FlumeApi.OperationType.SUM; - query.requestId = QUERY_ID_YEAR_TO_DATE; - query.units = imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS; - - listQuery.add(query); - } - - if (listQuery.isEmpty()) { - return; - } + // Add query for current year up to end of last month. Note, the max multiplier is 100, so we have to use Months + // vs. Days as the + // minimum granularity since if Days was used the query could be for a total of 365 days + listQuery.add(new FlumeApiQueryWaterUsage( // + QUERY_ID_HISTORICAL_BY_YEAR + "MONTH", // + fromDateTime, // + toDateTime, // + FlumeApi.BucketType.MON, // + 99, // max of 100 + FlumeApi.OperationType.SUM, // + imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, // + FlumeApi.SortDirectionType.ASC // + )); result = getApi().queryUsage(config.id, listQuery); @@ -294,21 +303,61 @@ public class FlumeDeviceHandler extends BaseThingHandler { } Map> queryData = result.get(0); - List queryBuckets; + float cumulativeUsage = (float) queryData.values().stream().map((bucket) -> bucket.get(0).value) + .mapToDouble(Float::doubleValue).sum(); - queryBuckets = queryData.get(QUERY_ID_CUMULATIVE_START_OF_YEAR); - if (queryBuckets != null) { - cumulativeStartOfYear = queryBuckets.get(0).value; - startOfYear = now.withDayOfYear(1); + this.cumulativeStore = new CumulativeStore(LocalDateTime.of(now.getYear(), now.getMonth(), 1, 0, 0, 0), + cumulativeUsage, imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS); + storage.put(STORAGE_KEY_CUMULATIVE_USAGE, cumulativeStore); + } + + /** + * Queries usage from baseline to now if expired and channel is linked. Will update the falues in ExpiringCache as + * necessary + * + * @throws FlumeApiException + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + * @throws ExecutionException + */ + protected void tryQueryUsage(boolean forceUpdate) + throws FlumeApiException, IOException, InterruptedException, TimeoutException, ExecutionException { + if (System.nanoTime() <= this.expiryCumulativeUsage && !forceUpdate) { + return; } - queryBuckets = queryData.get(QUERY_ID_YEAR_TO_DATE); - if (queryBuckets != null) { - cumulativeUsage = queryBuckets.get(0).value + cumulativeStartOfYear; - updateState(CHANNEL_DEVICE_CUMULATIVEUSAGE, - new QuantityType<>(cumulativeUsage, imperialUnits ? ImperialUnits.GALLON_LIQUID_US : Units.LITRE)); - this.expiryCumulativeUsage = System.nanoTime() + this.refreshIntervalCumulative.toNanos(); + LocalDateTime now = LocalDateTime.now(); + // update cumulativeStore if no storage exists (first time), or we are at the start of a new month + if (cumulativeStore.lastUpdate() == LocalDateTime.MIN + || LocalDateTime.of(now.getYear(), now.getMonth(), 1, 0, 0, 0).isAfter(cumulativeStore.lastUpdate())) { + tryUpdateCumulativeStore(); } + + if (!this.isLinked(CHANNEL_DEVICE_CUMULATIVEUSAGE)) { + return; + } + + Float result = getApi().queryUsage(config.id, // + new FlumeApiQueryWaterUsage(QUERY_ID_USAGE_FROM_BASELINE, // + cumulativeStore.lastUpdate(), // + now, // + FlumeApi.BucketType.MON, // + 1, // + FlumeApi.OperationType.SUM, // + imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, // + FlumeApi.SortDirectionType.ASC // + )); + + if (result == null) { + updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, + "@text/offline.cloud-connection-issue"); + return; + } + + updateState(CHANNEL_DEVICE_CUMULATIVEUSAGE, new QuantityType<>(cumulativeStore.value() + result, + imperialUnits ? ImperialUnits.GALLON_LIQUID_US : Units.LITRE)); + this.expiryCumulativeUsage = System.nanoTime() + this.refreshIntervalCumulative.toNanos(); } protected void tryGetCurrentFlowRate(boolean forceUpdate) @@ -354,33 +403,27 @@ public class FlumeDeviceHandler extends BaseThingHandler { } try { + tryFetchUsageAlerts(false); tryQueryUsage(false); tryGetCurrentFlowRate(false); } catch (FlumeApiException | IOException | InterruptedException | TimeoutException | ExecutionException e) { this.handleApiException(e); return; } - - if (System.nanoTime() > this.expiryUsageAlertFetch) { - fetchUsageAlerts(); - this.expiryUsageAlertFetch = System.nanoTime() + USAGE_QUERY_FETCH_INTERVAL.toNanos(); - } } - public void fetchUsageAlerts() { + public void tryFetchUsageAlerts(boolean forceUpdate) + throws FlumeApiException, IOException, InterruptedException, TimeoutException, ExecutionException { List resultList; FlumeApiUsageAlert alert; FlumeApiQueryWaterUsage query; - boolean imperialUnits = isImperial(); - - try { - resultList = getApi().fetchUsageAlerts(config.id, 1); - } catch (FlumeApiException | IOException | InterruptedException | TimeoutException | ExecutionException e) { - this.handleApiException(e); + if (System.nanoTime() <= this.expiryUsageAlertFetch || !forceUpdate) { return; } + resultList = getApi().fetchUsageAlerts(config.id, 1); + if (resultList.isEmpty()) { return; } @@ -400,10 +443,17 @@ public class FlumeDeviceHandler extends BaseThingHandler { return; } - query = alert.query; - query.bucket = FlumeApi.BucketType.MIN; - query.operation = FlumeApi.OperationType.AVG; - query.units = imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS; + query = new FlumeApiQueryWaterUsage( // + alert.query.requestId(), // + alert.query.sinceDateTime(), // + alert.query.untilDateTime(), // + alert.query.bucket(), // + alert.query.groupMultiplier(), // + FlumeApi.OperationType.AVG, // + imperialUnits ? FlumeApi.UnitType.GALLONS : FlumeApi.UnitType.LITERS, // + alert.query.sortDirection() // + ); + logger.debug("Alert query: {}", query); Float avgUsage; try { @@ -412,14 +462,16 @@ public class FlumeDeviceHandler extends BaseThingHandler { this.handleApiException(e); return; } - long minutes = Duration.between(query.sinceDateTime, query.untilDateTime).toMinutes(); + long minutes = Duration.between(query.sinceDateTime(), query.untilDateTime()).toMinutes(); LocalDateTime localWhenTriggered = LocalDateTime.ofInstant(alert.triggeredDateTime, ZoneId.systemDefault()); String stringAlert = String.format(stringAlertFormat, alert.eventRuleName, localWhenTriggered.toString(), minutes, avgUsage, imperialUnits ? "gallons" : "liters"); + logger.debug("Alert: {}", stringAlert); triggerChannel(CHANNEL_DEVICE_USAGEALERT, stringAlert); + this.expiryUsageAlertFetch = System.nanoTime() + USAGE_ALERT_FETCH_INTERVAL.toNanos(); } @Override @@ -466,10 +518,6 @@ public class FlumeDeviceHandler extends BaseThingHandler { updateDeviceChannel(apiDevice, CHANNEL_DEVICE_LASTSEEN); } - public boolean isImperial() { - return Objects.requireNonNull(getBridgeHandler()).systemOfUnits instanceof ImperialUnits; - } - public @Nullable FlumeBridgeHandler getBridgeHandler() { Bridge bridge = this.getBridge(); if (bridge == null) {