Merge branch 'master' into add_janus_rtsp_user

pull/3587/head
Isaac Connor 2022-09-13 16:17:15 -04:00
commit deb35738db
4 changed files with 370 additions and 303 deletions

View File

@ -122,13 +122,13 @@ bool EventStream::loadEventData(uint64_t event_id) {
exit(-1);
}
if ( !mysql_num_rows(result) ) {
if (!mysql_num_rows(result)) {
Fatal("Unable to load event %" PRIu64 ", not found in DB", event_id);
}
MYSQL_ROW dbrow = mysql_fetch_row(result);
if ( mysql_errno(&dbconn) ) {
if (mysql_errno(&dbconn)) {
Error("Can't fetch row: %s", mysql_error(&dbconn));
exit(mysql_errno(&dbconn));
}
@ -136,7 +136,6 @@ bool EventStream::loadEventData(uint64_t event_id) {
delete event_data;
event_data = new EventData;
event_data->event_id = event_id;
event_data->monitor_id = atoi(dbrow[0]);
event_data->storage_id = dbrow[1] ? atoi(dbrow[1]) : 0;
event_data->frame_count = dbrow[2] == nullptr ? 0 : atoi(dbrow[2]);
@ -158,24 +157,24 @@ bool EventStream::loadEventData(uint64_t event_id) {
event_data->Orientation = (Monitor::Orientation)(dbrow[9] == nullptr ? 0 : atoi(dbrow[9]));
mysql_free_result(result);
if ( !monitor ) {
if (!monitor) {
monitor = Monitor::Load(event_data->monitor_id, false, Monitor::QUERY);
} else if ( monitor->Id() != event_data->monitor_id ) {
} else if (monitor->Id() != event_data->monitor_id) {
monitor = Monitor::Load(event_data->monitor_id, false, Monitor::QUERY);
}
if ( !monitor ) {
if (!monitor) {
Fatal("Unable to load monitor id %d for streaming", event_data->monitor_id);
}
if ( !storage ) {
if (!storage) {
storage = new Storage(event_data->storage_id);
} else if ( storage->Id() != event_data->storage_id ) {
} else if (storage->Id() != event_data->storage_id) {
delete storage;
storage = new Storage(event_data->storage_id);
}
const char *storage_path = storage->Path();
if ( event_data->scheme == Storage::DEEP ) {
if (event_data->scheme == Storage::DEEP) {
tm event_time = {};
time_t start_time_t = std::chrono::system_clock::to_time_t(event_data->start_time);
localtime_r(&start_time_t, &event_time);
@ -231,56 +230,87 @@ bool EventStream::loadEventData(uint64_t event_id) {
}
event_data->n_frames = mysql_num_rows(result);
event_data->frames = new FrameData[event_data->frame_count];
if (event_data->frame_count < event_data->n_frames) {
event_data->frame_count = event_data->n_frames;
Warning("Event %" PRId64 " has more frames in the Frames table (%d) than in the Event record (%d)",
event_data->event_id, event_data->n_frames, event_data->frame_count);
}
event_data->frames.clear();
event_data->frames.reserve(event_data->frame_count);
int last_id = 0;
SystemTimePoint last_timestamp = event_data->start_time;
Microseconds last_delta = Seconds(0);
Microseconds last_offset = Seconds(0);
const FrameData *last_frame;
// Here are the issues: if showing jpegs, need FrameId.
// Delta is the time since last frame, not since beginning of Event
while ((dbrow = mysql_fetch_row(result))) {
int id = atoi(dbrow[0]);
//timestamp = atof(dbrow[1]);
Microseconds delta = std::chrono::duration_cast<Microseconds>(FPSeconds(atof(dbrow[2])));
//timestamp = atof(dbrow[1]); // timestamp is useless because it's just seconds.
// What is in the Delta column is distance from StartTime. We will call that offset.
Microseconds offset = std::chrono::duration_cast<Microseconds>(FPSeconds(atof(dbrow[2])));
SystemTimePoint timestamp = event_data->start_time + offset;
int id_diff = id - last_id;
Microseconds frame_delta =
std::chrono::duration_cast<Microseconds>(id_diff ? (delta - last_delta) / id_diff : (delta - last_delta));
Microseconds delta =
std::chrono::duration_cast<Microseconds>(id_diff ? (offset - last_offset) / id_diff : (offset - last_offset));
Debug(1, "New delta %f from id_diff %d = id %d - last_id %d offset %f - last)_offset %f",
FPSeconds(delta).count(), id_diff, id, last_id, FPSeconds(offset).count(), FPSeconds(last_offset).count());
// Fill in data between bulk frames
if (id_diff > 1) {
for (int i = last_id + 1; i < id; i++) {
// Delta is the time since last frame, no since beginning of Event
event_data->frames[i - 1].delta = frame_delta;
event_data->frames[i - 1].timestamp = last_timestamp + ((i - last_id) * frame_delta);
event_data->frames[i - 1].offset =
std::chrono::duration_cast<Microseconds>(event_data->frames[i - 1].timestamp - event_data->start_time);
event_data->frames[i - 1].in_db = false;
Debug(3, "Frame %d timestamp (%f s), offset (%f s) delta (%f s), in_db (%d)",
i,
FPSeconds(event_data->frames[i - 1].timestamp.time_since_epoch()).count(),
FPSeconds(event_data->frames[i - 1].offset).count(),
FPSeconds(event_data->frames[i - 1].delta).count(),
event_data->frames[i - 1].in_db);
auto frame = event_data->frames.emplace_back(
i,
last_timestamp + ((i - last_id) * delta),
std::chrono::duration_cast<Microseconds>((last_frame->timestamp - event_data->start_time) + delta),
delta,
false
);
last_frame = &frame;
Debug(3, "Frame %d %d timestamp (%f s), offset (%f s) delta (%f s), in_db (%d)",
i, frame.id,
FPSeconds(frame.timestamp.time_since_epoch()).count(),
FPSeconds(frame.offset).count(),
FPSeconds(frame.delta).count(),
frame.in_db);
}
}
event_data->frames[id - 1].timestamp = event_data->start_time + delta;
event_data->frames[id - 1].offset = delta;
event_data->frames[id - 1].delta = frame_delta;
event_data->frames[id - 1].in_db = true;
auto frame = event_data->frames.emplace_back(id, timestamp, offset, delta, true);
last_frame = &frame;
last_id = id;
last_delta = delta;
last_timestamp = event_data->frames[id-1].timestamp;
last_offset = offset;
last_timestamp = timestamp;
Debug(3, "Frame %d timestamp (%f s), offset (%f s), delta(%f s), in_db(%d)",
id,
FPSeconds(event_data->frames[id - 1].timestamp.time_since_epoch()).count(),
FPSeconds(event_data->frames[id - 1].offset).count(),
FPSeconds(event_data->frames[id - 1].delta).count(),
event_data->frames[id - 1].in_db);
FPSeconds(frame.timestamp.time_since_epoch()).count(),
FPSeconds(frame.offset).count(),
FPSeconds(frame.delta).count(),
frame.in_db);
}
if (event_data->end_time.time_since_epoch() != Seconds(0)) {
while (event_data->end_time > last_timestamp) {
last_timestamp += last_frame->delta;
last_id ++;
auto frame = event_data->frames.emplace_back(
last_id,
last_timestamp,
last_frame->offset + last_frame->delta,
last_frame->delta,
false
);
last_frame = &frame;
Debug(3, "Trailing Frame %d timestamp (%f s), offset (%f s), delta(%f s), in_db(%d)",
last_id,
FPSeconds(frame.timestamp.time_since_epoch()).count(),
FPSeconds(frame.offset).count(),
FPSeconds(frame.delta).count(),
frame.in_db);
} // end while
} // end if have endtime
// Incomplete events might not have any frame data
event_data->last_frame_id = last_id;
@ -333,93 +363,108 @@ void EventStream::processCommand(const CmdMsg *msg) {
// Set paused flag
paused = true;
replay_rate = ZM_RATE_BASE;
last_frame_sent = now;
//replay_rate = ZM_RATE_BASE;
//last_frame_sent = now;
break;
case CMD_PLAY :
Debug(1, "Got PLAY command");
if ( paused ) {
paused = false;
}
{
std::scoped_lock lck{mutex};
Debug(1, "Got PLAY command");
if ( paused ) {
paused = false;
}
// If we are in single event mode and at the last frame, replay the current event
if (
(mode == MODE_SINGLE || mode == MODE_NONE)
&&
(curr_frame_id == event_data->last_frame_id)
) {
Debug(1, "Was in single or no replay mode, and at last frame, so jumping to 1st frame");
curr_frame_id = 1;
} else {
Debug(1, "mode is %s, current frame is %d, frame count is %d, last frame id is %d",
// If we are in single event mode and at the last frame, replay the current event
if (
(mode == MODE_SINGLE || mode == MODE_NONE)
&&
(curr_frame_id == event_data->last_frame_id)
) {
Debug(1, "Was in single or no replay mode, and at last frame, so jumping to 1st frame");
curr_frame_id = 1;
} else {
Debug(1, "mode is %s, current frame is %d, frame count is %d, last frame id is %d",
StreamMode_Strings[(int) mode].c_str(),
curr_frame_id,
event_data->frame_count,
event_data->last_frame_id);
}
}
replay_rate = ZM_RATE_BASE;
break;
replay_rate = ZM_RATE_BASE;
break;
}
case CMD_VARPLAY :
Debug(1, "Got VARPLAY command");
if ( paused ) {
paused = false;
{
std::scoped_lock lck{mutex};
Debug(1, "Got VARPLAY command");
if ( paused ) {
paused = false;
}
replay_rate = ntohs(((unsigned char)msg->msg_data[2]<<8)|(unsigned char)msg->msg_data[1])-32768;
if ( replay_rate > 50 * ZM_RATE_BASE ) {
Warning("requested replay rate (%d) is too high. We only support up to 50x", replay_rate);
replay_rate = 50 * ZM_RATE_BASE;
} else if ( replay_rate < -50*ZM_RATE_BASE ) {
Warning("requested replay rate (%d) is too low. We only support up to -50x", replay_rate);
replay_rate = -50 * ZM_RATE_BASE;
}
break;
}
replay_rate = ntohs(((unsigned char)msg->msg_data[2]<<8)|(unsigned char)msg->msg_data[1])-32768;
if ( replay_rate > 50 * ZM_RATE_BASE ) {
Warning("requested replay rate (%d) is too high. We only support up to 50x", replay_rate);
replay_rate = 50 * ZM_RATE_BASE;
} else if ( replay_rate < -50*ZM_RATE_BASE ) {
Warning("requested replay rate (%d) is too low. We only support up to -50x", replay_rate);
replay_rate = -50 * ZM_RATE_BASE;
}
break;
case CMD_STOP :
Debug(1, "Got STOP command");
paused = false;
break;
case CMD_FASTFWD :
Debug(1, "Got FAST FWD command");
if ( paused ) {
paused = false;
{
Debug(1, "Got FAST FWD command");
std::scoped_lock lck{mutex};
if ( paused ) {
paused = false;
}
// Set play rate
switch ( replay_rate ) {
case 2 * ZM_RATE_BASE :
replay_rate = 5 * ZM_RATE_BASE;
break;
case 5 * ZM_RATE_BASE :
replay_rate = 10 * ZM_RATE_BASE;
break;
case 10 * ZM_RATE_BASE :
replay_rate = 25 * ZM_RATE_BASE;
break;
case 25 * ZM_RATE_BASE :
case 50 * ZM_RATE_BASE :
replay_rate = 50 * ZM_RATE_BASE;
break;
default :
Debug(1,"Defaulting replay_rate to 2*ZM_RATE_BASE because it is %d", replay_rate);
replay_rate = 2 * ZM_RATE_BASE;
break;
}
break;
}
// Set play rate
switch ( replay_rate ) {
case 2 * ZM_RATE_BASE :
replay_rate = 5 * ZM_RATE_BASE;
break;
case 5 * ZM_RATE_BASE :
replay_rate = 10 * ZM_RATE_BASE;
break;
case 10 * ZM_RATE_BASE :
replay_rate = 25 * ZM_RATE_BASE;
break;
case 25 * ZM_RATE_BASE :
case 50 * ZM_RATE_BASE :
replay_rate = 50 * ZM_RATE_BASE;
break;
default :
Debug(1,"Defaulting replay_rate to 2*ZM_RATE_BASE because it is %d", replay_rate);
replay_rate = 2 * ZM_RATE_BASE;
break;
}
break;
case CMD_SLOWFWD :
paused = true;
replay_rate = ZM_RATE_BASE;
step = 1;
if (curr_frame_id < event_data->last_frame_id)
curr_frame_id += 1;
Debug(1, "Got SLOWFWD command new frame id %d", curr_frame_id);
break;
{
std::scoped_lock lck{mutex};
paused = true;
replay_rate = ZM_RATE_BASE;
step = 1;
if (curr_frame_id < event_data->last_frame_id)
curr_frame_id += 1;
Debug(1, "Got SLOWFWD command new frame id %d", curr_frame_id);
break;
}
case CMD_SLOWREV :
paused = true;
replay_rate = ZM_RATE_BASE;
step = -1;
curr_frame_id -= 1;
if ( curr_frame_id < 1 ) curr_frame_id = 1;
Debug(1, "Got SLOWREV command new frame id %d", curr_frame_id);
break;
{
std::scoped_lock lck{mutex};
paused = true;
replay_rate = ZM_RATE_BASE;
step = -1;
curr_frame_id -= 1;
if ( curr_frame_id < 1 ) curr_frame_id = 1;
Debug(1, "Got SLOWREV command new frame id %d", curr_frame_id);
break;
}
case CMD_FASTREV :
Debug(1, "Got FAST REV command");
paused = false;
@ -452,17 +497,29 @@ void EventStream::processCommand(const CmdMsg *msg) {
Debug(1, "Got ZOOM IN command, to %d,%d", x, y);
zoom += 10;
send_frame = true;
if (paused) {
step = 1;
send_twice = true;
}
break;
case CMD_ZOOMOUT :
Debug(1, "Got ZOOM OUT command");
zoom -= 10;
if (zoom < 100) zoom = 100;
send_frame = true;
if (paused) {
step = 1;
send_twice = true;
}
break;
case CMD_ZOOMSTOP :
Debug(1, "Got ZOOM STOP command");
zoom = 100;
send_frame = true;
if (paused) {
step = 1;
send_twice = true;
}
break;
case CMD_PAN :
x = ((unsigned char)msg->msg_data[1]<<8)|(unsigned char)msg->msg_data[2];
@ -493,6 +550,7 @@ void EventStream::processCommand(const CmdMsg *msg) {
break;
case CMD_SEEK :
{
std::scoped_lock lck{mutex};
double int_part = ((unsigned char) msg->msg_data[1] << 24) | ((unsigned char) msg->msg_data[2] << 16)
| ((unsigned char) msg->msg_data[3] << 8) | (unsigned char) msg->msg_data[4];
double dec_part = ((unsigned char) msg->msg_data[5] << 24) | ((unsigned char) msg->msg_data[6] << 16)
@ -506,13 +564,37 @@ void EventStream::processCommand(const CmdMsg *msg) {
// This should get us close, but not all frames will have the same duration
curr_frame_id = (int) (event_data->frame_count * offset / event_data->duration) + 1;
if (curr_frame_id < 1) {
Debug(1, "curr_frame_id = %d, so setting to 1", curr_frame_id);
curr_frame_id = 1;
}
// TODO Replace this with a binary search
if (event_data->frames[curr_frame_id - 1].offset > offset) {
while ((curr_frame_id--) && (event_data->frames[curr_frame_id - 1].offset > offset)) {}
Debug(1, "Searching for frame at %.2f, offset of frame %d is %.2f",
FPSeconds(offset).count(),
curr_frame_id,
FPSeconds(event_data->frames[curr_frame_id - 1].offset).count()
);
while ((curr_frame_id--) && (event_data->frames[curr_frame_id - 1].offset > offset)) {
Debug(1, "Searching for frame at %.2f, offset of frame %d is %.2f",
FPSeconds(offset).count(),
curr_frame_id,
FPSeconds(event_data->frames[curr_frame_id - 1].offset).count()
);
}
} else if (event_data->frames[curr_frame_id - 1].offset < offset) {
while ((curr_frame_id++) && (event_data->frames[curr_frame_id - 1].offset > offset)) {}
while ((curr_frame_id++ < event_data->last_frame_id) && (event_data->frames[curr_frame_id - 1].offset < offset)) {
Debug(1, "Searching for frame at %.2f, offset of frame %d is %.2f",
FPSeconds(offset).count(),
curr_frame_id,
FPSeconds(event_data->frames[curr_frame_id - 1].offset).count()
);
}
curr_frame_id--;
}
if ( curr_frame_id < 1 ) {
if (curr_frame_id < 1) {
Debug(1, "curr_frame_id = %d, so setting to 1", curr_frame_id);
curr_frame_id = 1;
} else if (curr_frame_id > event_data->last_frame_id) {
curr_frame_id = event_data->last_frame_id;
@ -523,6 +605,10 @@ void EventStream::processCommand(const CmdMsg *msg) {
FPSeconds(offset).count(),
curr_frame_id,
FPSeconds(event_data->frames[curr_frame_id - 1].offset).count());
if (paused) {
step = 1; // if we are paused, we won't send a frame except a keepalive.
send_twice = true;
}
send_frame = true;
break;
}
@ -567,8 +653,7 @@ void EventStream::processCommand(const CmdMsg *msg) {
DataMsg status_msg;
status_msg.msg_type = MSG_DATA_EVENT;
memcpy(&status_msg.msg_data, &status_data, sizeof(status_data));
Debug(1, "Size of msg %zu", sizeof(status_data));
if ( sendto(sd, &status_msg, sizeof(status_msg), MSG_DONTWAIT, (sockaddr *)&rem_addr, sizeof(rem_addr)) < 0 ) {
if (sendto(sd, &status_msg, sizeof(status_msg), MSG_DONTWAIT, (sockaddr *)&rem_addr, sizeof(rem_addr)) < 0) {
//if ( errno != EAGAIN )
{
Error("Can't sendto on sd %d: %s", sd, strerror(errno));
@ -730,7 +815,7 @@ bool EventStream::sendFrame(Microseconds delta_us) {
image = new Image(filepath.c_str());
} else if ( ffmpeg_input ) {
// Get the frame from the mp4 input
FrameData *frame_data = &event_data->frames[curr_frame_id-1];
const FrameData *frame_data = &event_data->frames[curr_frame_id-1];
AVFrame *frame =
ffmpeg_input->get_frame(ffmpeg_input->get_video_stream_id(), FPSeconds(frame_data->offset).count());
if (frame) {
@ -831,9 +916,6 @@ void EventStream::runStream() {
}
updateFrameRate(fps);
start = std::chrono::steady_clock::now();
SystemTimePoint::duration last_frame_offset = Seconds(0);
SystemTimePoint::duration time_to_event = Seconds(0);
std::thread command_processor;
@ -841,222 +923,207 @@ void EventStream::runStream() {
command_processor = std::thread(&EventStream::checkCommandQueue, this);
}
while ( !zm_terminate ) {
now = std::chrono::steady_clock::now();
// Has to go here, at the moment, for sendFrame(delta).
Microseconds delta = Microseconds(0);
Microseconds delta = Microseconds(0);
send_frame = false;
while (!zm_terminate) {
start = std::chrono::steady_clock::now();
// Get current frame data
FrameData *frame_data = &event_data->frames[curr_frame_id-1];
{
std::scoped_lock lck{mutex};
if ( !paused ) {
// Figure out if we should send this frame
Debug(3, "not paused at cur_frame_id (%d-1) mod frame_mod(%d)", curr_frame_id, frame_mod);
// If we are streaming and this frame is due to be sent
// frame mod defaults to 1 and if we are going faster than max_fps will get multiplied by 2
// so if it is 2, then we send every other frame, if is it 4 then every fourth frame, etc.
send_frame = false;
if ( (frame_mod == 1) || (((curr_frame_id-1)%frame_mod) == 0) ) {
if (!paused) {
// Figure out if we should send this frame
Debug(3, "not paused at cur_frame_id (%d-1) mod frame_mod(%d)", curr_frame_id, frame_mod);
// If we are streaming and this frame is due to be sent
// frame mod defaults to 1 and if we are going faster than max_fps will get multiplied by 2
// so if it is 2, then we send every other frame, if is it 4 then every fourth frame, etc.
//if ( (frame_mod == 1) || (((curr_frame_id-1)%frame_mod) == 0) ) {
send_frame = true;
//}
} else if (step != 0) {
Debug(2, "Paused with step %d", step);
// We are paused and are just stepping forward or backward one frame
step = 0;
send_frame = true;
}
} else if ( step != 0 ) {
Debug(2, "Paused with step %d", step);
// We are paused and are just stepping forward or backward one frame
step = 0;
send_frame = true;
} else if ( !send_frame ) {
// We are paused, not stepping and doing nothing, meaning that comms didn't set send_frame to true
if (now - last_frame_sent > MAX_STREAM_DELAY) {
// Send keepalive
Debug(2, "Sending keepalive frame");
send_frame = true;
}
} // end if streaming stepping or doing nothing
} else if (!send_frame) {
// We are paused, not stepping and doing nothing, meaning that comms didn't set send_frame to true
if (now - last_frame_sent > MAX_STREAM_DELAY) {
// Send keepalive
Debug(2, "Sending keepalive frame");
send_frame = true;
}
} // end if streaming stepping or doing nothing
// time_to_event > 0 means that we are not in the event
if (time_to_event > Seconds(0) and mode == MODE_ALL) {
TimePoint::duration time_since_last_send = now - last_frame_sent;
Debug(1, "Time since last send = %.2f s", FPSeconds(time_since_last_send).count());
if (time_since_last_send > Seconds(1)) {
char frame_text[64];
// time_to_event > 0 means that we are not in the event
if (time_to_event > Seconds(0) and mode == MODE_ALL) {
TimePoint::duration time_since_last_send = now - last_frame_sent;
Debug(1, "Time since last send = %.2f s", FPSeconds(time_since_last_send).count());
if (time_since_last_send > Seconds(1)) {
char frame_text[64];
snprintf(frame_text, sizeof(frame_text), "Time to %s event = %f s",
(replay_rate > 0 ? "next" : "previous"),
FPSeconds(time_to_event).count());
snprintf(frame_text, sizeof(frame_text), "Time to %s event = %f s",
(replay_rate > 0 ? "next" : "previous"),
FPSeconds(time_to_event).count());
if (!sendTextFrame(frame_text)) {
zm_terminate = true;
if (!sendTextFrame(frame_text)) {
zm_terminate = true;
}
send_frame = false; // In case keepalive was set
}
send_frame = false; // In case keepalive was set
}
// FIXME ICON But we are not paused. We are somehow still in the event?
Milliseconds sleep_time = std::chrono::duration_cast<Milliseconds>(
(replay_rate > 0 ? 1 : -1) * ((1.0L * replay_rate * STREAM_PAUSE_WAIT) / ZM_RATE_BASE));
if (sleep_time == Seconds(0)) {
sleep_time += STREAM_PAUSE_WAIT;
}
// FIXME ICON But we are not paused. We are somehow still in the event?
Milliseconds sleep_time = std::chrono::duration_cast<Milliseconds>(
(replay_rate > 0 ? 1 : -1) * ((1.0L * replay_rate * STREAM_PAUSE_WAIT) / ZM_RATE_BASE));
//double sleep_time = (replay_rate * STREAM_PAUSE_WAIT)/(ZM_RATE_BASE * 1000000);
//// ZM_RATE_BASE == 100, and 1x replay_rate is 100
//double sleep_time = ((replay_rate/ZM_RATE_BASE) * STREAM_PAUSE_WAIT)/1000000;
if (sleep_time == Seconds(0)) {
sleep_time += STREAM_PAUSE_WAIT;
}
curr_stream_time += sleep_time;
time_to_event -= sleep_time;
Debug(2, "Sleeping (%" PRIi64 " ms) because we are not at the next event yet, adding %" PRIi64 " ms",
curr_stream_time += sleep_time;
time_to_event -= sleep_time;
Debug(2, "Sleeping (%" PRIi64 " ms) because we are not at the next event yet, adding %" PRIi64 " ms",
static_cast<int64>(Milliseconds(STREAM_PAUSE_WAIT).count()),
static_cast<int64>(Milliseconds(sleep_time).count()));
std::this_thread::sleep_for(STREAM_PAUSE_WAIT);
std::this_thread::sleep_for(STREAM_PAUSE_WAIT);
//curr_stream_time += (1.0L * replay_rate * STREAM_PAUSE_WAIT)/(ZM_RATE_BASE * 1000000);
//}
continue;
} // end if !in_event
continue;
} // end if !in_event
} // end scope for mutex lock
if (send_frame) {
if (!sendFrame(delta)) {
zm_terminate = true;
break;
}
if (send_twice and !sendFrame(delta)) {
zm_terminate = true;
break;
}
}
curr_stream_time = frame_data->timestamp;
{
std::scoped_lock lck{mutex};
if (!paused) {
// delta is since the last frame
delta = std::chrono::duration_cast<Microseconds>(frame_data->delta);
Debug(3, "frame delta %" PRIi64 "us ",
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()));
if (!paused) {
// Get current frame data, curr_frame_id may have changed
FrameData *last_frame_data = &event_data->frames[curr_frame_id-1];
curr_stream_time = last_frame_data->timestamp;
curr_frame_id += (replay_rate > 0 ? frame_mod : -1*frame_mod);
// if effective > base we should speed up frame delivery
if (base_fps < effective_fps) {
delta = std::chrono::duration_cast<Microseconds>((delta * base_fps) / effective_fps);
Debug(3, "delta %" PRIi64 " us = base_fps (%f) / effective_fps (%f)",
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()),
base_fps,
effective_fps);
// but must not exceed maxfps
delta = std::max(delta, Microseconds(lround(Microseconds::period::den / maxfps)));
Debug(3, "delta %" PRIi64 " us = base_fps (%f) / effective_fps (%f) from 30fps",
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()),
base_fps,
effective_fps);
}
// +/- 1? What if we are skipping frames?
curr_frame_id += (replay_rate>0) ? frame_mod : -1*frame_mod;
// sending the frame may have taken some time, so reload now
now = std::chrono::steady_clock::now();
// we incremented by replay_rate, so might have jumped past frame_count
if ( (mode == MODE_SINGLE) && (
(curr_frame_id < 1 )
||
(curr_frame_id >= event_data->frame_count)
)
) {
Debug(2, "Have mode==MODE_SINGLE and at end of event, looping back to start");
curr_frame_id = 1;
// Have to reset start to now when replaying
start = now;
}
if (curr_frame_id <= event_data->frame_count) {
frame_data = &event_data->frames[curr_frame_id-1];
// frame_data->delta is the time since last frame as a float in seconds
// but what if we are skipping frames? We need the distance from the last frame sent
// Also, what about reverse? needs to be absolute value
// There are two ways to go about this, not sure which is correct.
// you can calculate the relationship between now and the start
// or calc the relationship from the last frame. I think from the start is better as it self-corrects
//
if (last_frame_offset != Seconds(0)) {
// We assume that we are going forward and the next frame is in the future.
delta = std::chrono::duration_cast<Microseconds>(frame_data->offset - (now - start));
Debug(2, "New delta: now - start = %" PRIu64 " us offset %" PRIi64 " us- elapsed = %" PRIu64 " us",
static_cast<int64>(std::chrono::duration_cast<Microseconds>(now - start).count()),
static_cast<int64>(std::chrono::duration_cast<Microseconds>(frame_data->offset).count()),
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()));
} else {
Debug(2, "No last frame_offset, no sleep");
delta = Seconds(0);
// we incremented by replay_rate, so might have jumped past frame_count
if ( (mode == MODE_SINGLE) && (
(curr_frame_id < 1 )
||
(curr_frame_id >= event_data->frame_count)
)
) {
Debug(2, "Have mode==MODE_SINGLE and at end of event, looping back to start");
curr_frame_id = 1;
}
last_frame_offset = frame_data->offset;
if (send_frame && type != STREAM_MPEG) {
if (delta != Seconds(0)) {
if (delta > MAX_SLEEP) {
Debug(1, "Limiting sleep to %" PRIi64 " ms because calculated sleep is too long: %" PRIi64" us",
static_cast<int64>(std::chrono::duration_cast<Milliseconds>(MAX_SLEEP).count()),
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()));
delta = MAX_SLEEP;
}
if (curr_frame_id <= event_data->frame_count) {
const FrameData *next_frame_data = &event_data->frames[curr_frame_id-1];
Debug(3, "Have Frame %d %d timestamp (%f s), offset (%f s) delta (%f s), in_db (%d)",
curr_frame_id, next_frame_data->id,
FPSeconds(next_frame_data->timestamp.time_since_epoch()).count(),
FPSeconds(next_frame_data->offset).count(),
FPSeconds(next_frame_data->delta).count(),
next_frame_data->in_db);
std::this_thread::sleep_for(delta);
Debug(3, "Done sleeping: %" PRIi64 " us",
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()));
// frame_data->delta is the time since last frame as a float in seconds
// but what if we are skipping frames? We need the distance from the last frame sent
// Also, what about reverse? needs to be absolute value
delta = abs(next_frame_data->offset - last_frame_data->offset);
Debug(2, "New delta: %fs from last frame offset %fs - next_frame_offset %fs",
FPSeconds(delta).count(),
FPSeconds(last_frame_data->offset).count(),
FPSeconds(next_frame_data->offset).count());
// if effective > base we should speed up frame delivery
if (base_fps < effective_fps) {
delta = std::chrono::duration_cast<Microseconds>((delta * base_fps) / effective_fps);
Debug(3, "delta %" PRIi64 " us = base_fps (%f) / effective_fps (%f)",
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()),
base_fps,
effective_fps);
// but must not exceed maxfps
delta = std::max(delta, Microseconds(lround(Microseconds::period::den / maxfps)));
Debug(3, "delta %" PRIi64 " us = base_fps (%f) / effective_fps (%f) from 30fps",
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()),
base_fps,
effective_fps);
}
} // end if need to sleep
now = std::chrono::steady_clock::now();
TimePoint::duration elapsed = now - start;
delta -= std::chrono::duration_cast<Milliseconds>(elapsed); // sending frames takes time, so remove it from the sleep time
Debug(2, "New delta: %fs from last frame offset %fs - next_frame_offset %fs - elapsed %fs",
FPSeconds(delta).count(),
FPSeconds(last_frame_data->offset).count(),
FPSeconds(next_frame_data->offset).count(),
FPSeconds(elapsed).count()
);
} else {
Debug(1, "invalid curr_frame_id %d !< %d", curr_frame_id, event_data->frame_count);
} // end if not at end of event
} else {
Debug(1, "invalid curr_frame_id %d !< %d", curr_frame_id, event_data->frame_count);
} // end if not at end of event
} else {
// Paused
delta = std::chrono::duration_cast<Microseconds>(FPSeconds(
ZM_RATE_BASE / ((base_fps ? base_fps : 1) * (replay_rate ? abs(replay_rate * 2) : 2))));
// Paused
delta = MAX_SLEEP;
Debug(2, "Sleeping %" PRIi64 " us because ZM_RATE_BASE (%d) / ( base_fps (%f) * replay_rate (%d)",
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()),
ZM_RATE_BASE,
(base_fps ? base_fps : 1),
(replay_rate ? abs(replay_rate * 2) : 0));
// We are paused, so might be stepping
//if ( step != 0 )// Adding 0 is cheaper than an if 0
// curr_frame_id starts at 1 though, so we might skip the first frame?
curr_frame_id += step;
} // end if !paused
} // end scope for mutex lock
if (delta != Seconds(0)) {
if (send_frame && type != STREAM_MPEG) {
if (delta > Seconds(0)) {
if (delta > MAX_SLEEP) {
Debug(1, "Limiting sleep to %" PRIi64 " ms because calculated sleep is too long %" PRIi64,
static_cast<int64>(std::chrono::duration_cast<Milliseconds>(MAX_SLEEP).count()),
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()));
static_cast<int64>(std::chrono::duration_cast<Milliseconds>(MAX_SLEEP).count()),
static_cast<int64>(std::chrono::duration_cast<Microseconds>(delta).count()));
delta = MAX_SLEEP;
}
std::this_thread::sleep_for(delta);
}
// We are paused, so might be stepping
//if ( step != 0 )// Adding 0 is cheaper than an if 0
// curr_frame_id starts at 1 though, so we might skip the first frame?
curr_frame_id += step;
} // end if !paused
} // end if need to sleep
}
// Detects when we hit end of event and will load the next event or previous event
if ( checkEventLoaded() ) {
// Have change of event
{
std::scoped_lock lck{mutex};
// Detects when we hit end of event and will load the next event or previous event
if (checkEventLoaded()) {
// Have change of event
// This next bit is to determine if we are in the current event time wise
// and whether to show an image saying how long until the next event.
if ( replay_rate > 0 ) {
// This doesn't make sense unless we have hit the end of the event.
time_to_event = event_data->frames[0].timestamp - curr_stream_time;
Debug(1, "replay rate (%d) time_to_event (%f s) = frame timestamp (%f s) - curr_stream_time (%f s)",
// This next bit is to determine if we are in the current event time wise
// and whether to show an image saying how long until the next event.
if (replay_rate > 0) {
// This doesn't make sense unless we have hit the end of the event.
time_to_event = event_data->frames[0].timestamp - curr_stream_time;
Debug(1, "replay rate (%d) time_to_event (%f s) = frame timestamp (%f s) - curr_stream_time (%f s)",
replay_rate,
FPSeconds(time_to_event).count(),
FPSeconds(event_data->frames[0].timestamp.time_since_epoch()).count(),
FPSeconds(curr_stream_time.time_since_epoch()).count());
} else if ( replay_rate < 0 ) {
time_to_event = curr_stream_time - event_data->frames[event_data->frame_count-1].timestamp;
Debug(1, "replay rate (%d), time_to_event(%f s) = curr_stream_time (%f s) - frame timestamp (%f s)",
} else if (replay_rate < 0) {
time_to_event = curr_stream_time - event_data->frames[event_data->frame_count-1].timestamp;
Debug(1, "replay rate (%d), time_to_event(%f s) = curr_stream_time (%f s) - frame timestamp (%f s)",
replay_rate,
FPSeconds(time_to_event).count(),
FPSeconds(curr_stream_time.time_since_epoch()).count(),
FPSeconds(event_data->frames[event_data->frame_count - 1].timestamp.time_since_epoch()).count());
} // end if forward or reverse
} // end if checkEventLoaded
} // end if forward or reverse
} // end if checkEventLoaded
} // end scope for lock
} // end while ! zm_terminate
if (type == STREAM_MPEG) {
delete vid_stream;
}

View File

@ -32,6 +32,8 @@ extern "C" {
#include <libavcodec/avcodec.h>
}
#include <mutex>
class EventStream : public StreamBase {
public:
typedef enum { MODE_NONE, MODE_SINGLE, MODE_ALL, MODE_ALL_GAPLESS } StreamMode;
@ -39,11 +41,20 @@ class EventStream : public StreamBase {
protected:
struct FrameData {
//unsigned long id;
unsigned int id;
SystemTimePoint timestamp;
Microseconds offset;
Microseconds delta;
Microseconds offset; // distance from event->starttime
Microseconds delta; // distance from last frame
bool in_db;
public:
FrameData(unsigned int p_id, SystemTimePoint p_timestamp, Microseconds p_offset, Microseconds p_delta, bool p_in_db) :
id(p_id),
timestamp(p_timestamp),
offset(p_offset),
delta(p_delta),
in_db(p_in_db)
{
}
};
struct EventData {
@ -58,7 +69,7 @@ class EventStream : public StreamBase {
Microseconds frames_duration;
std::string path;
int n_frames; // # of frame rows returned from database
FrameData *frames;
std::vector<FrameData> frames;
std::string video_file;
Storage::Schemes scheme;
int SaveJPEGs;
@ -73,6 +84,7 @@ class EventStream : public StreamBase {
StreamMode mode;
bool forceEventChange;
std::mutex mutex;
int curr_frame_id;
SystemTimePoint curr_stream_time;
bool send_frame;
@ -101,22 +113,9 @@ class EventStream : public StreamBase {
{}
~EventStream() {
if ( event_data ) {
if ( event_data->frames ) {
delete[] event_data->frames;
event_data->frames = nullptr;
}
delete event_data;
event_data = nullptr;
}
if ( storage ) {
delete storage;
storage = nullptr;
}
if ( ffmpeg_input ) {
delete ffmpeg_input;
ffmpeg_input = nullptr;
}
delete event_data;
delete storage;
delete ffmpeg_input;
}
void setStreamStart(uint64_t init_event_id, int init_frame_id);
void setStreamStart(int monitor_id, time_t event_time);

View File

@ -136,7 +136,7 @@ int main(int argc, char *argv[]) {
HwCapsDetect();
std::string where = "`Function` != 'None' AND `RTSPServer` != false";
std::string where = "`Capturing` != 'None' AND `RTSPServer` != false";
if (staticConfig.SERVER_ID)
where += stringtf(" AND `ServerId`=%d", staticConfig.SERVER_ID);
if (monitor_id > 0)

View File

@ -125,6 +125,7 @@ protected:
int lock_fd;
bool paused;
int step;
bool send_twice; // flag to send the same frame twice
TimePoint now;
TimePoint last_comm_update;