feat: add per-topic alarm expiry using PullMessagesResponse TerminationTime
Cameras like Reolink send alarm=true but never send the corresponding false, causing alarms to stick indefinitely. Use the TerminationTime from PullMessagesResponse to auto-expire stale per-topic alarms. - Add AlarmEntry struct with value and termination_time fields - Extract TerminationTime from each PullMessagesResponse and attach it to alarm entries; refresh on re-trigger so active alarms persist - Sweep expired alarms after processing messages and on poll timeout - Add expire_alarms option (default: true) to disable via onvif_options - Fix TOCTOU race: remove unsynchronized alarms.empty() check before acquiring mutex in the timeout sweep path - Simplify SetNoteSet with C++17 structured bindings - Add Catch2 tests for alarm expiry logic (mirrored struct to avoid gSOAP header dependency) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>pull/4573/head
parent
8eee695e63
commit
2dc48e4335
|
|
@ -77,6 +77,7 @@ ONVIF::ONVIF(Monitor *parent_) :
|
|||
,use_absolute_time_for_renewal(false)
|
||||
,renewal_enabled(true)
|
||||
#endif
|
||||
,expire_alarms_enabled(true)
|
||||
,terminate_(false)
|
||||
{
|
||||
parse_onvif_options();
|
||||
|
|
@ -423,6 +424,12 @@ void ONVIF::WaitForMessage() {
|
|||
|
||||
// Don't clear alarms on timeout - they should remain active until explicitly cleared
|
||||
// Timeout is not an error, don't increment retry_count
|
||||
|
||||
// Still sweep for expired alarms on timeout - stuck alarms may have expired
|
||||
if (expire_alarms_enabled) {
|
||||
std::unique_lock<std::mutex> lck(alarms_mutex);
|
||||
expire_stale_alarms(std::chrono::system_clock::now());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Success - reset retry count
|
||||
|
|
@ -431,9 +438,30 @@ void ONVIF::WaitForMessage() {
|
|||
retry_count = 0;
|
||||
}
|
||||
Debug(1, "ONVIF polling : Got Good Response! %i, # of messages %zu", result, tev__PullMessagesResponse.wsnt__NotificationMessage.size());
|
||||
|
||||
// Extract TerminationTime from PullMessagesResponse for per-topic alarm expiry.
|
||||
// This is the camera's indication of how long the current subscription/response is valid.
|
||||
SystemTimePoint response_termination;
|
||||
bool have_response_termination = false;
|
||||
if (tev__PullMessagesResponse.TerminationTime != 0) {
|
||||
response_termination = std::chrono::system_clock::from_time_t(tev__PullMessagesResponse.TerminationTime);
|
||||
have_response_termination = true;
|
||||
Debug(2, "ONVIF: PullMessagesResponse TerminationTime=%ld (%s)",
|
||||
static_cast<long>(tev__PullMessagesResponse.TerminationTime),
|
||||
SystemTimePointToString(response_termination).c_str());
|
||||
}
|
||||
|
||||
{ // Scope for lock
|
||||
std::unique_lock<std::mutex> lck(alarms_mutex);
|
||||
|
||||
// Compute termination time for alarm entries: prefer response termination,
|
||||
// fall back to existing alarm's time, or epoch for new alarms.
|
||||
auto alarm_termination = [&](const std::string &topic) -> SystemTimePoint {
|
||||
if (have_response_termination) return response_termination;
|
||||
auto it = alarms.find(topic);
|
||||
return (it != alarms.end()) ? it->second.termination_time : SystemTimePoint{};
|
||||
};
|
||||
|
||||
// Note: We do NOT clear alarms on empty PullMessages response.
|
||||
// According to ONVIF spec, alarms should only be cleared based on explicit
|
||||
// PropertyOperation="Deleted" or PropertyOperation="Changed" with inactive value.
|
||||
|
|
@ -508,7 +536,7 @@ void ONVIF::WaitForMessage() {
|
|||
if (state_is_active && alarms.count(last_topic) == 0) {
|
||||
// Camera reports an existing alarm we didn't know about
|
||||
Debug(2, "ONVIF Syncing with camera: alarm is already active for topic: %s", last_topic.c_str());
|
||||
alarms[last_topic] = last_value;
|
||||
alarms[last_topic] = AlarmEntry{last_value, alarm_termination(last_topic)};
|
||||
if (!isAlarmed()) {
|
||||
setAlarmed(true);
|
||||
Info("ONVIF Alarm already active on subscription (Initialized): %s", last_topic.c_str());
|
||||
|
|
@ -548,14 +576,14 @@ void ONVIF::WaitForMessage() {
|
|||
// Alarm turned on
|
||||
Info("ONVIF Alarm On (Changed to active): topic=%s value=%s", last_topic.c_str(), last_value.c_str());
|
||||
if (alarms.count(last_topic) == 0) {
|
||||
alarms[last_topic] = last_value;
|
||||
alarms[last_topic] = AlarmEntry{last_value, alarm_termination(last_topic)};
|
||||
if (!isAlarmed()) {
|
||||
Info("ONVIF Triggered Start Event on topic: %s", last_topic.c_str());
|
||||
setAlarmed(true);
|
||||
}
|
||||
} else {
|
||||
// Update existing alarm value
|
||||
alarms[last_topic] = last_value;
|
||||
// Update existing alarm value and refresh termination time
|
||||
alarms[last_topic] = AlarmEntry{last_value, alarm_termination(last_topic)};
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
@ -572,17 +600,25 @@ void ONVIF::WaitForMessage() {
|
|||
}
|
||||
} else {
|
||||
if (alarms.count(last_topic) == 0) {
|
||||
alarms[last_topic] = last_value;
|
||||
alarms[last_topic] = AlarmEntry{last_value, alarm_termination(last_topic)};
|
||||
if (!isAlarmed()) {
|
||||
setAlarmed(true);
|
||||
}
|
||||
} else {
|
||||
alarms[last_topic] = last_value;
|
||||
alarms[last_topic] = AlarmEntry{last_value, alarm_termination(last_topic)};
|
||||
}
|
||||
}
|
||||
}
|
||||
Debug(1, "ONVIF Alarms count is %zu, alarmed is %s", alarms.size(), isAlarmed() ? "true" : "false");
|
||||
} // end foreach msg
|
||||
|
||||
// Sweep and expire alarms whose per-topic TerminationTime has passed.
|
||||
// This handles cameras (e.g., Reolink) that send alarm=true but never
|
||||
// send the corresponding false. Alarms that are still being re-triggered
|
||||
// will have had their TerminationTime refreshed above.
|
||||
if (expire_alarms_enabled && !alarms.empty()) {
|
||||
expire_stale_alarms(std::chrono::system_clock::now());
|
||||
}
|
||||
} // end scope for lock
|
||||
|
||||
if (IsRenewalNeeded()) Renew();
|
||||
|
|
@ -713,8 +749,10 @@ void ONVIF::cleanup_subscription() {
|
|||
// soap_log=/path/to/logfile - Enable SOAP message logging
|
||||
void ONVIF::parse_onvif_options() {
|
||||
if (parent->onvif_options.empty()) {
|
||||
Info("ONVIF: Using pull_timeout=%ds, subscription_timeout=%ds, renewal_enabled=%s",
|
||||
pull_timeout_seconds, subscription_timeout_seconds, renewal_enabled ? "true" : "false");
|
||||
Info("ONVIF: Using pull_timeout=%ds, subscription_timeout=%ds, renewal_enabled=%s, expire_alarms=%s",
|
||||
pull_timeout_seconds, subscription_timeout_seconds,
|
||||
renewal_enabled ? "true" : "false",
|
||||
expire_alarms_enabled ? "true" : "false");
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -779,11 +817,20 @@ void ONVIF::parse_onvif_options() {
|
|||
} else {
|
||||
renewal_enabled = true;
|
||||
}
|
||||
} else if (key == "expire_alarms") {
|
||||
if (value == "false" || value == "0" || value == "no") {
|
||||
expire_alarms_enabled = false;
|
||||
Info("ONVIF: Per-topic alarm expiry disabled via option");
|
||||
} else {
|
||||
expire_alarms_enabled = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Info("ONVIF: Using pull_timeout=%ds, subscription_timeout=%ds, renewal_enabled=%s",
|
||||
pull_timeout_seconds, subscription_timeout_seconds, renewal_enabled ? "true" : "false");
|
||||
Info("ONVIF: Using pull_timeout=%ds, subscription_timeout=%ds, renewal_enabled=%s, expire_alarms=%s",
|
||||
pull_timeout_seconds, subscription_timeout_seconds,
|
||||
renewal_enabled ? "true" : "false",
|
||||
expire_alarms_enabled ? "true" : "false");
|
||||
}
|
||||
|
||||
// Calculate exponential backoff delay for retries
|
||||
|
|
@ -1462,16 +1509,39 @@ void ONVIF::Run() {
|
|||
}
|
||||
#endif
|
||||
|
||||
// Sweep through active alarms and expire any whose per-topic TerminationTime has passed.
|
||||
// This handles cameras that send alarm=true but never send the corresponding false
|
||||
// (e.g., Reolink PeopleDetect/VehicleDetect topics).
|
||||
// Must be called with alarms_mutex held.
|
||||
void ONVIF::expire_stale_alarms(const SystemTimePoint &now) {
|
||||
auto it = alarms.begin();
|
||||
while (it != alarms.end()) {
|
||||
// Skip entries with no termination time set (epoch = uninitialized)
|
||||
if (it->second.termination_time.time_since_epoch().count() == 0) {
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
if (it->second.termination_time <= now) {
|
||||
Info("ONVIF: Auto-expiring stale alarm for topic=%s (TerminationTime %s has passed)",
|
||||
it->first.c_str(),
|
||||
SystemTimePointToString(it->second.termination_time).c_str());
|
||||
it = alarms.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
if (alarms.empty()) {
|
||||
setAlarmed(false);
|
||||
}
|
||||
}
|
||||
|
||||
void ONVIF::SetNoteSet(Event::StringSet ¬eSet) {
|
||||
#ifdef WITH_GSOAP
|
||||
std::unique_lock<std::mutex> lck(alarms_mutex);
|
||||
if (alarms.empty()) return;
|
||||
|
||||
std::string note = "";
|
||||
for (auto it = alarms.begin(); it != alarms.end(); ++it) {
|
||||
note = it->first + "/" + it->second;
|
||||
noteSet.insert(note);
|
||||
for (const auto &[topic, entry] : alarms) {
|
||||
noteSet.insert(topic + "/" + entry.value);
|
||||
}
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -93,7 +93,13 @@ class ONVIF {
|
|||
bool IsRenewalNeeded(); // Check if subscription renewal is needed now
|
||||
bool do_wsa_request(const char* address, const char* action); // Setup WS-Addressing headers for SOAP request
|
||||
#endif
|
||||
std::unordered_map<std::string, std::string> alarms;
|
||||
struct AlarmEntry {
|
||||
std::string value;
|
||||
SystemTimePoint termination_time;
|
||||
};
|
||||
std::unordered_map<std::string, AlarmEntry> alarms;
|
||||
bool expire_alarms_enabled; // Enable per-topic TerminationTime expiry (default: true)
|
||||
void expire_stale_alarms(const SystemTimePoint &now); // Sweep and expire alarms past TerminationTime
|
||||
std::mutex alarms_mutex;
|
||||
|
||||
// Thread management
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@
|
|||
#include "zm_catch2.h"
|
||||
#include "zm_time.h"
|
||||
#include <chrono>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
// Test the ONVIF subscription renewal timing logic
|
||||
TEST_CASE("ONVIF Subscription Renewal Timing") {
|
||||
|
|
@ -207,10 +209,10 @@ TEST_CASE("ONVIF Absolute Time Formatting") {
|
|||
SECTION("Verify ISO 8601 format components") {
|
||||
time_t test_time = 1705151696; // 2024-01-13 13:14:56 UTC
|
||||
std::string result = format_absolute_time_iso8601(test_time);
|
||||
|
||||
|
||||
// Check year
|
||||
REQUIRE(result.substr(0, 4) == "2024");
|
||||
|
||||
|
||||
// Check separators
|
||||
REQUIRE(result[4] == '-'); // After year
|
||||
REQUIRE(result[7] == '-'); // After month
|
||||
|
|
@ -221,3 +223,146 @@ TEST_CASE("ONVIF Absolute Time Formatting") {
|
|||
REQUIRE(result[23] == 'Z'); // UTC indicator
|
||||
}
|
||||
}
|
||||
|
||||
// Standalone AlarmEntry struct matching the one in zm_monitor_onvif.h.
|
||||
// We replicate it here so tests don't depend on gSOAP headers.
|
||||
namespace onvif_test {
|
||||
struct AlarmEntry {
|
||||
std::string value;
|
||||
SystemTimePoint termination_time;
|
||||
};
|
||||
|
||||
using AlarmMap = std::unordered_map<std::string, AlarmEntry>;
|
||||
|
||||
// Mirror of ONVIF::expire_stale_alarms logic for unit testing.
|
||||
// Returns true if the map became empty (caller should setAlarmed(false)).
|
||||
bool expire_stale_alarms(AlarmMap &alarms, const SystemTimePoint &now) {
|
||||
auto it = alarms.begin();
|
||||
while (it != alarms.end()) {
|
||||
// Skip entries with no termination time set (epoch = uninitialized)
|
||||
if (it->second.termination_time.time_since_epoch().count() == 0) {
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
if (it->second.termination_time <= now) {
|
||||
it = alarms.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
return alarms.empty();
|
||||
}
|
||||
} // namespace onvif_test
|
||||
|
||||
// Test per-topic TerminationTime alarm expiry logic
|
||||
TEST_CASE("ONVIF Per-Topic Alarm Expiry") {
|
||||
using namespace onvif_test;
|
||||
auto now = std::chrono::system_clock::now();
|
||||
|
||||
SECTION("Expired alarms are removed by sweep") {
|
||||
AlarmMap alarms;
|
||||
// Alarm with TerminationTime 10 seconds in the past
|
||||
alarms["PeopleDetect"] = AlarmEntry{"true", now - std::chrono::seconds(10)};
|
||||
|
||||
bool empty = expire_stale_alarms(alarms, now);
|
||||
REQUIRE(alarms.empty());
|
||||
REQUIRE(empty);
|
||||
}
|
||||
|
||||
SECTION("Future alarms are retained by sweep") {
|
||||
AlarmMap alarms;
|
||||
// Alarm with TerminationTime 60 seconds in the future
|
||||
alarms["MotionAlarm"] = AlarmEntry{"true", now + std::chrono::seconds(60)};
|
||||
|
||||
bool empty = expire_stale_alarms(alarms, now);
|
||||
REQUIRE(alarms.size() == 1);
|
||||
REQUIRE_FALSE(empty);
|
||||
}
|
||||
|
||||
SECTION("Mixed expired and future alarms") {
|
||||
AlarmMap alarms;
|
||||
alarms["PeopleDetect"] = AlarmEntry{"true", now - std::chrono::seconds(10)};
|
||||
alarms["MotionAlarm"] = AlarmEntry{"true", now + std::chrono::seconds(60)};
|
||||
|
||||
bool empty = expire_stale_alarms(alarms, now);
|
||||
REQUIRE(alarms.size() == 1);
|
||||
REQUIRE(alarms.count("MotionAlarm") == 1);
|
||||
REQUIRE(alarms.count("PeopleDetect") == 0);
|
||||
REQUIRE_FALSE(empty);
|
||||
}
|
||||
|
||||
SECTION("Re-triggering an alarm updates its TerminationTime") {
|
||||
AlarmMap alarms;
|
||||
// Initial alarm with TerminationTime 5 seconds from now
|
||||
SystemTimePoint initial_term = now + std::chrono::seconds(5);
|
||||
alarms["PeopleDetect"] = AlarmEntry{"true", initial_term};
|
||||
|
||||
// Simulate re-trigger with new TerminationTime 65 seconds from now
|
||||
SystemTimePoint new_term = now + std::chrono::seconds(65);
|
||||
alarms["PeopleDetect"] = AlarmEntry{"true", new_term};
|
||||
|
||||
// Sweep at now+10s - alarm should NOT be expired because it was refreshed
|
||||
SystemTimePoint sweep_time = now + std::chrono::seconds(10);
|
||||
bool empty = expire_stale_alarms(alarms, sweep_time);
|
||||
REQUIRE(alarms.size() == 1);
|
||||
REQUIRE_FALSE(empty);
|
||||
|
||||
// Verify the termination time was updated
|
||||
REQUIRE(alarms["PeopleDetect"].termination_time == new_term);
|
||||
}
|
||||
|
||||
SECTION("Alarms with epoch termination time (uninitialized) are not expired") {
|
||||
AlarmMap alarms;
|
||||
// Alarm with default-constructed (epoch) termination time
|
||||
alarms["SomeAlarm"] = AlarmEntry{"true", SystemTimePoint{}};
|
||||
|
||||
bool empty = expire_stale_alarms(alarms, now);
|
||||
REQUIRE(alarms.size() == 1);
|
||||
REQUIRE_FALSE(empty);
|
||||
}
|
||||
|
||||
SECTION("TerminationTime exactly equal to now is expired") {
|
||||
AlarmMap alarms;
|
||||
alarms["PeopleDetect"] = AlarmEntry{"true", now};
|
||||
|
||||
bool empty = expire_stale_alarms(alarms, now);
|
||||
REQUIRE(alarms.empty());
|
||||
REQUIRE(empty);
|
||||
}
|
||||
|
||||
SECTION("Multiple expired alarms are all removed") {
|
||||
AlarmMap alarms;
|
||||
alarms["PeopleDetect"] = AlarmEntry{"true", now - std::chrono::seconds(30)};
|
||||
alarms["VehicleDetect"] = AlarmEntry{"true", now - std::chrono::seconds(20)};
|
||||
alarms["DogCatDetect"] = AlarmEntry{"true", now - std::chrono::seconds(10)};
|
||||
|
||||
bool empty = expire_stale_alarms(alarms, now);
|
||||
REQUIRE(alarms.empty());
|
||||
REQUIRE(empty);
|
||||
}
|
||||
|
||||
SECTION("Empty alarms map is handled gracefully") {
|
||||
AlarmMap alarms;
|
||||
bool empty = expire_stale_alarms(alarms, now);
|
||||
REQUIRE(empty);
|
||||
}
|
||||
|
||||
SECTION("AlarmEntry stores value correctly") {
|
||||
AlarmEntry entry{"true", now + std::chrono::seconds(60)};
|
||||
REQUIRE(entry.value == "true");
|
||||
|
||||
AlarmEntry entry2{"false", now};
|
||||
REQUIRE(entry2.value == "false");
|
||||
}
|
||||
|
||||
SECTION("Alarm value accessible via map for SetNoteSet") {
|
||||
AlarmMap alarms;
|
||||
alarms["MyRuleDetector/PeopleDetect"] = AlarmEntry{"true", now + std::chrono::seconds(60)};
|
||||
|
||||
// Simulate SetNoteSet logic: iterate and access .value
|
||||
for (auto it = alarms.begin(); it != alarms.end(); ++it) {
|
||||
std::string note = it->first + "/" + it->second.value;
|
||||
REQUIRE(note == "MyRuleDetector/PeopleDetect/true");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue