[rrd4j] Improve event handling (#15223)

* [rrd4j] Improve event handling

Signed-off-by: Jan N. Klug <github@klug.nrw>
pull/15234/head
J-N-K 2023-07-13 22:11:14 +02:00 committed by GitHub
parent 1942dfeddc
commit 571cd6334f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 164 additions and 155 deletions

View File

@ -20,13 +20,13 @@ import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
@ -71,6 +71,7 @@ import org.openhab.core.types.State;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.rrd4j.ConsolFun;
@ -108,11 +109,13 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
private static final Set<String> SUPPORTED_TYPES = Set.of(CoreItemFactory.SWITCH, CoreItemFactory.CONTACT,
CoreItemFactory.DIMMER, CoreItemFactory.NUMBER, CoreItemFactory.ROLLERSHUTTER, CoreItemFactory.COLOR);
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3,
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("RRD4j"));
private final Map<String, RrdDefConfig> rrdDefs = new ConcurrentHashMap<>();
private final ConcurrentSkipListMap<Long, Map<String, Double>> storageMap = new ConcurrentSkipListMap<>();
private static final String DATASOURCE_STATE = "state";
private static final Path DB_FOLDER = Path.of(OpenHAB.getUserDataFolder(), "persistence", "rrd4j").toAbsolutePath();
@ -120,10 +123,8 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
private static final RrdDbPool DATABASE_POOL = new RrdDbPool();
private final Logger logger = LoggerFactory.getLogger(RRD4jPersistenceService.class);
private final Map<String, ScheduledFuture<?>> scheduledJobs = new HashMap<>();
private final ItemRegistry itemRegistry;
private boolean active = false;
public static Path getDatabasePath(String name) {
return DB_FOLDER.resolve(name + ".rrd");
@ -133,9 +134,132 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
return DATABASE_POOL;
}
private final ScheduledFuture<?> storeJob;
@Activate
public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry) {
public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry, Map<String, Object> config) {
this.itemRegistry = itemRegistry;
storeJob = scheduler.scheduleWithFixedDelay(() -> doStore(false), 1, 1, TimeUnit.SECONDS);
modified(config);
active = true;
}
@Modified
protected void modified(final Map<String, Object> config) {
// clean existing definitions
rrdDefs.clear();
// add default configurations
RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC);
// use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
defaultNumeric.setDef("GAUGE,600,U,U,10");
// define 5 different boxes:
// 1. granularity of 10s for the last hour
// 2. granularity of 1m for the last week
// 3. granularity of 15m for the last year
// 4. granularity of 1h for the last 5 years
// 5. granularity of 1d for the last 10 years
defaultNumeric
.addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650");
rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric);
RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE);
// use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
defaultQuantifiable.setDef("GAUGE,600,U,U,10");
// define 5 different boxes:
// 1. granularity of 10s for the last hour
// 2. granularity of 1m for the last week
// 3. granularity of 15m for the last year
// 4. granularity of 1h for the last 5 years
// 5. granularity of 1d for the last 10 years
defaultQuantifiable.addArchives(
"AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650");
rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable);
RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER);
// use 5 seconds as a step size for discrete values and allow a 1h silence between updates
defaultOther.setDef("GAUGE,3600,U,U,5");
// define 4 different boxes:
// 1. granularity of 5s for the last hour
// 2. granularity of 1m for the last week
// 3. granularity of 15m for the last year
// 4. granularity of 4h for the last 10 years
defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900");
rrdDefs.put(DEFAULT_OTHER, defaultOther);
if (config.isEmpty()) {
logger.debug("using default configuration only");
return;
}
Iterator<String> keys = config.keySet().iterator();
while (keys.hasNext()) {
String key = keys.next();
if ("service.pid".equals(key) || "component.name".equals(key)) {
// ignore service.pid and name
continue;
}
String[] subkeys = key.split("\\.");
if (subkeys.length != 2) {
logger.debug("config '{}' should have the format 'name.configkey'", key);
continue;
}
Object v = config.get(key);
if (v instanceof String) {
String value = (String) v;
String name = subkeys[0].toLowerCase();
String property = subkeys[1].toLowerCase();
if (value.isBlank()) {
logger.trace("Config is empty: {}", property);
continue;
} else {
logger.trace("Processing config: {} = {}", property, value);
}
RrdDefConfig rrdDef = rrdDefs.get(name);
if (rrdDef == null) {
rrdDef = new RrdDefConfig(name);
rrdDefs.put(name, rrdDef);
}
try {
if ("def".equals(property)) {
rrdDef.setDef(value);
} else if ("archives".equals(property)) {
rrdDef.addArchives(value);
} else if ("items".equals(property)) {
rrdDef.addItems(value);
} else {
logger.debug("Unknown property {} : {}", property, value);
}
} catch (IllegalArgumentException e) {
logger.warn("Ignoring illegal configuration: {}", e.getMessage());
}
}
}
for (RrdDefConfig rrdDef : rrdDefs.values()) {
if (rrdDef.isValid()) {
logger.debug("Created {}", rrdDef);
} else {
logger.info("Removing invalid definition {}", rrdDef);
rrdDefs.remove(rrdDef.name);
}
}
}
@Deactivate
protected void deactivate() {
active = false;
storeJob.cancel(false);
// make sure we really store everything
doStore(true);
}
@Override
@ -150,6 +274,11 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
@Override
public void store(final Item item, @Nullable final String alias) {
if (!active) {
logger.warn("Tried to store {} but service is not yet ready (or shutting down).", item);
return;
}
if (!isSupportedItemType(item)) {
logger.trace("Ignoring item '{}' since its type {} is not supported", item.getName(), item.getType());
return;
@ -158,9 +287,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
Double value;
if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
NumberItem nItem = (NumberItem) item;
QuantityType<?> qState = (QuantityType<?>) item.getState();
if (item instanceof NumberItem nItem && item.getState() instanceof QuantityType<?> qState) {
Unit<? extends Quantity<?>> unit = nItem.getUnit();
if (unit != null) {
QuantityType<?> convertedState = qState.toUnit(unit);
@ -190,11 +317,30 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
long now = System.currentTimeMillis() / 1000;
scheduler.schedule(() -> internalStore(name, value, now, true), 0, TimeUnit.SECONDS);
Double oldValue = storageMap.computeIfAbsent(now, t -> new ConcurrentHashMap<>()).put(name, value);
if (oldValue != null && !oldValue.equals(value)) {
logger.debug(
"Discarding value {} for item {} with timestamp {} because a new value ({}) arrived with the same timestamp.",
oldValue, name, now, value);
}
}
private synchronized void internalStore(String name, double value, long now, boolean retry) {
private void doStore(boolean force) {
while (!storageMap.isEmpty()) {
long timestamp = storageMap.firstKey();
long now = System.currentTimeMillis() / 1000;
if (now > timestamp || force) {
// no new elements can be added for this timestamp because we are already past that time or the service
// requires forced storing
Map<String, Double> values = storageMap.pollFirstEntry().getValue();
values.forEach((name, value) -> writePointToDatabase(name, value, timestamp));
} else {
return;
}
}
}
private synchronized void writePointToDatabase(String name, double value, long timestamp) {
RrdDb db = null;
try {
db = getDB(name, true);
@ -205,41 +351,31 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
return;
}
try {
if (now < db.getLastUpdateTime()) {
logger.warn("RRD4J does not support adding past value this={}, last update={}. Discarding {} - {}", now,
db.getLastUpdateTime(), name, value);
return;
}
} catch (IOException ignored) {
// we can ignore that here, we'll fail again later.
}
ConsolFun function = getConsolidationFunction(db);
if (function != ConsolFun.AVERAGE) {
try {
// we store the last value again, so that the value change
// in the database is not interpolated, but
// happens right at this spot
if (now - 1 > db.getLastUpdateTime()) {
if (timestamp - 1 > db.getLastUpdateTime()) {
// only do it if there is not already a value
double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
if (!Double.isNaN(lastValue)) {
Sample sample = db.createSample();
sample.setTime(now - 1);
sample.setTime(timestamp - 1);
sample.setValue(DATASOURCE_STATE, lastValue);
sample.update();
logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database (again)", name,
lastValue, now - 1);
lastValue, timestamp - 1);
}
}
} catch (IOException e) {
logger.debug("Error storing last value (again): {}", e.getMessage());
logger.debug("Error storing last value (again) for {}: {}", e.getMessage(), name);
}
}
try {
Sample sample = db.createSample();
sample.setTime(now);
sample.setTime(timestamp);
double storeValue = value;
if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) {
// counter values must be adjusted by stepsize
@ -247,20 +383,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
sample.setValue(DATASOURCE_STATE, storeValue);
sample.update();
logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, now);
} catch (IllegalArgumentException e) {
String message = e.getMessage();
if (message != null && message.contains("at least one second step is required") && retry) {
// we try to store the value one second later
ScheduledFuture<?> job = scheduledJobs.get(name);
if (job != null) {
job.cancel(true);
scheduledJobs.remove(name);
}
internalStore(name, value, now + 1, false);
} else {
logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
}
logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, timestamp);
} catch (Exception e) {
logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
}
@ -524,120 +647,6 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
}
@Activate
protected void activate(final Map<String, Object> config) {
modified(config);
}
@Modified
protected void modified(final Map<String, Object> config) {
// clean existing definitions
rrdDefs.clear();
// add default configurations
RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC);
// use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
defaultNumeric.setDef("GAUGE,600,U,U,10");
// define 5 different boxes:
// 1. granularity of 10s for the last hour
// 2. granularity of 1m for the last week
// 3. granularity of 15m for the last year
// 4. granularity of 1h for the last 5 years
// 5. granularity of 1d for the last 10 years
defaultNumeric
.addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650");
rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric);
RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE);
// use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates
defaultQuantifiable.setDef("GAUGE,600,U,U,10");
// define 5 different boxes:
// 1. granularity of 10s for the last hour
// 2. granularity of 1m for the last week
// 3. granularity of 15m for the last year
// 4. granularity of 1h for the last 5 years
// 5. granularity of 1d for the last 10 years
defaultQuantifiable.addArchives(
"AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650");
rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable);
RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER);
// use 5 seconds as a step size for discrete values and allow a 1h silence between updates
defaultOther.setDef("GAUGE,3600,U,U,5");
// define 4 different boxes:
// 1. granularity of 5s for the last hour
// 2. granularity of 1m for the last week
// 3. granularity of 15m for the last year
// 4. granularity of 4h for the last 10 years
defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900");
rrdDefs.put(DEFAULT_OTHER, defaultOther);
if (config.isEmpty()) {
logger.debug("using default configuration only");
return;
}
Iterator<String> keys = config.keySet().iterator();
while (keys.hasNext()) {
String key = keys.next();
if ("service.pid".equals(key) || "component.name".equals(key)) {
// ignore service.pid and name
continue;
}
String[] subkeys = key.split("\\.");
if (subkeys.length != 2) {
logger.debug("config '{}' should have the format 'name.configkey'", key);
continue;
}
Object v = config.get(key);
if (v instanceof String) {
String value = (String) v;
String name = subkeys[0].toLowerCase();
String property = subkeys[1].toLowerCase();
if (value.isBlank()) {
logger.trace("Config is empty: {}", property);
continue;
} else {
logger.trace("Processing config: {} = {}", property, value);
}
RrdDefConfig rrdDef = rrdDefs.get(name);
if (rrdDef == null) {
rrdDef = new RrdDefConfig(name);
rrdDefs.put(name, rrdDef);
}
try {
if ("def".equals(property)) {
rrdDef.setDef(value);
} else if ("archives".equals(property)) {
rrdDef.addArchives(value);
} else if ("items".equals(property)) {
rrdDef.addItems(value);
} else {
logger.debug("Unknown property {} : {}", property, value);
}
} catch (IllegalArgumentException e) {
logger.warn("Ignoring illegal configuration: {}", e.getMessage());
}
}
}
for (RrdDefConfig rrdDef : rrdDefs.values()) {
if (rrdDef.isValid()) {
logger.debug("Created {}", rrdDef);
} else {
logger.info("Removing invalid definition {}", rrdDef);
rrdDefs.remove(rrdDef.name);
}
}
}
private static class RrdArchiveDef {
public @Nullable ConsolFun fcn;
public double xff;