diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c6a9c04c7..37b08ad58 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,6 +16,7 @@ set(ZM_BIN_SRC_FILES zm_crypt.cpp zm.cpp zm_db.cpp + zm_decoder_thread.cpp zm_logger.cpp zm_event.cpp zm_eventstream.cpp diff --git a/src/zm_decoder_thread.cpp b/src/zm_decoder_thread.cpp new file mode 100644 index 000000000..e8023e905 --- /dev/null +++ b/src/zm_decoder_thread.cpp @@ -0,0 +1,54 @@ +#include "zm_decoder_thread.h" + +#include "zm_monitor.h" +#include "zm_signal.h" +#include "zm_utils.h" + +//DecoderThread::DecoderThread(std::shared_ptr monitor) : +DecoderThread::DecoderThread(Monitor * monitor) : + monitor_(monitor), terminate_(false) { + //monitor_(std::move(monitor)), terminate_(false) { + thread_ = std::thread(&DecoderThread::Run, this); +} + +DecoderThread::~DecoderThread() { + Stop(); + if (thread_.joinable()) + thread_.join(); +} + +void DecoderThread::Run() { + Debug(2, "DecoderThread::Run() for %d", monitor_->Id()); + + //Microseconds decoder_rate = Microseconds(monitor_->GetDecoderRate()); + //Seconds decoder_update_delay = Seconds(monitor_->GetDecoderUpdateDelay()); + //Debug(2, "DecoderThread::Run() have update delay %d", decoder_update_delay); + + //TimePoint last_decoder_update_time = std::chrono::steady_clock::now(); + //TimePoint cur_time; + + while (!(terminate_ or zm_terminate)) { + // Some periodic updates are required for variable capturing framerate + //if (decoder_update_delay != Seconds::zero()) { + //cur_time = std::chrono::steady_clock::now(); + //Debug(2, "Updating adaptive skip"); + //if ((cur_time - last_decoder_update_time) > decoder_update_delay) { + //decoder_rate = Microseconds(monitor_->GetDecoderRate()); + //last_decoder_update_time = cur_time; + //} + //} + + if (!monitor_->Decode()) { + //if ( !(terminate_ or zm_terminate) ) { + //Microseconds sleep_for = monitor_->Active() ? Microseconds(ZM_SAMPLE_RATE) : Microseconds(ZM_SUSPENDED_RATE); + //Debug(2, "Sleeping for %" PRId64 "us", int64(sleep_for.count())); + //std::this_thread::sleep_for(sleep_for); + //} + //} else if (decoder_rate != Microseconds::zero()) { + //Debug(2, "Sleeping for %" PRId64 " us", int64(decoder_rate.count())); + //std::this_thread::sleep_for(decoder_rate); + //} else { + //Debug(2, "Not sleeping"); + } + } +} diff --git a/src/zm_decoder_thread.h b/src/zm_decoder_thread.h new file mode 100644 index 000000000..703a7e1e1 --- /dev/null +++ b/src/zm_decoder_thread.h @@ -0,0 +1,29 @@ +#ifndef ZM_DECODER_THREAD_H +#define ZM_DECODER_THREAD_H + +#include +#include +#include + +class Monitor; + +class DecoderThread { + public: + explicit DecoderThread(Monitor* monitor); + //explicit DecoderThread(std::shared_ptr monitor); + ~DecoderThread(); + DecoderThread(DecoderThread &rhs) = delete; + DecoderThread(DecoderThread &&rhs) = delete; + + void Stop() { terminate_ = true; } + + private: + void Run(); + + Monitor* monitor_; + //std::shared_ptr monitor_; + std::atomic terminate_; + std::thread thread_; +}; + +#endif diff --git a/src/zm_monitor.cpp b/src/zm_monitor.cpp index 88028ca32..0ea6b5fb6 100644 --- a/src/zm_monitor.cpp +++ b/src/zm_monitor.cpp @@ -395,6 +395,8 @@ Monitor::Monitor() storage(nullptr), videoStore(nullptr), analysis_it(nullptr), + decoder_it(nullptr), + decoder(nullptr), n_zones(0), zones(nullptr), privacy_bitmask(nullptr), @@ -1802,22 +1804,22 @@ void Monitor::UpdateAnalysisFPS() { // If there isn't then we keep pre-event + alarm frames. = pre_event_count bool Monitor::Analyse() { - if ( !Enabled() ) { + if (!Enabled()) { Warning("Shouldn't be doing Analyse when not Enabled"); return false; } - if ( !analysis_it ) - analysis_it = packetqueue.get_video_it(true); + if (!analysis_it) analysis_it = packetqueue.get_video_it(true); // if have event, send frames until we find a video packet, at which point do analysis. Adaptive skip should only affect which frames we do analysis on. // get_analysis_packet will lock the packet and may wait if analysis_it is at the end - ZMPacket *snap = packetqueue.get_packet(analysis_it); - if ( !snap ) return false; + ZMLockedPacket *packet_lock = packetqueue.get_packet(analysis_it); + if (!packet_lock) return false; + ZMPacket *snap = packet_lock->packet_; // Is it possible for snap->score to be ! -1 ? Not if everything is working correctly - if ( snap->score != -1 ) { - snap->unlock(); + if (snap->score != -1) { + delete packet_lock; packetqueue.increment_it(analysis_it); Error("skipping because score was %d", snap->score); return false; @@ -1837,25 +1839,24 @@ bool Monitor::Analyse() { std::lock_guard lck(event_mutex); // if we have been told to be OFF, then we are off and don't do any processing. - if ( trigger_data->trigger_state != TriggerState::TRIGGER_OFF ) { + if (trigger_data->trigger_state != TriggerState::TRIGGER_OFF) { Debug(4, "Trigger not OFF state is (%d)", int(trigger_data->trigger_state)); int score = 0; // Ready means that we have captured the warmup # of frames - if ( !Ready() ) { + if (!Ready()) { Debug(3, "Not ready?"); - snap->unlock(); + delete packet_lock; return false; } - Debug(4, "Ready"); std::string cause; Event::StringSetMap noteSetMap; // Specifically told to be on. Setting the score here will trigger the alarm. - if ( trigger_data->trigger_state == TriggerState::TRIGGER_ON ) { + if (trigger_data->trigger_state == TriggerState::TRIGGER_ON) { score += trigger_data->trigger_score; Debug(1, "Triggered on score += %d => %d", trigger_data->trigger_score, score); - if ( !event ) { + if (!event) { cause += trigger_data->trigger_cause; } Event::StringSet noteSet; @@ -1863,12 +1864,13 @@ bool Monitor::Analyse() { noteSetMap[trigger_data->trigger_cause] = noteSet; } // end if trigger_on - if ( signal_change ) { + // FIXME this snap might not be the one that caused the signal change. Need to store that in the packet. + if (signal_change) { Debug(2, "Signal change, new signal is %d", signal); const char *signalText = "Unknown"; - if ( !signal ) { + if (!signal) { signalText = "Lost"; - if ( event ) { + if (event) { Info("%s: %03d - Closing event %" PRIu64 ", signal loss", name, analysis_image_count, event->Id()); closeEvent(); last_section_mod = 0; @@ -1877,9 +1879,8 @@ bool Monitor::Analyse() { signalText = "Reacquired"; score += 100; } - if ( !event ) { - if ( cause.length() ) - cause += ", "; + if (!event) { + if (cause.length()) cause += ", "; cause += SIGNAL_CAUSE; } Event::StringSet noteSet; @@ -1887,15 +1888,26 @@ bool Monitor::Analyse() { noteSetMap[SIGNAL_CAUSE] = noteSet; shared_data->state = state = IDLE; shared_data->active = signal; - if ( (function == MODECT or function == MOCORD) and snap->image ) + if ((function == MODECT or function == MOCORD) and snap->image) ref_image.Assign(*(snap->image)); }// else if (signal) { - if (snap->image or (snap->codec_type == AVMEDIA_TYPE_VIDEO)) { + if (snap->codec_type == AVMEDIA_TYPE_VIDEO) { + while (!snap->image and !snap->decoded) { + // Need to wait for the decoder thread. + Debug(1, "Waiting for decode"); + packet_lock->wait(); + if (!snap->image and snap->decoded) { + Debug(1, "No image but was decoded, giving up"); + delete packet_lock; + return false; + } + } // end while ! decoded + struct timeval *timestamp = snap->timestamp; - if ( Active() and (function == MODECT or function == MOCORD) and snap->image ) { + if (Active() and (function == MODECT or function == MOCORD)) { Debug(3, "signal and active and modect"); Event::StringSet zoneSet; @@ -1999,22 +2011,32 @@ bool Monitor::Analyse() { ); // This gets a lock on the starting packet - ZMPacket *starting_packet = packetqueue.get_packet(start_it); + + ZMLockedPacket *starting_packet_lock = nullptr; + ZMPacket *starting_packet = nullptr; + if ( *start_it != snap_it ) { + starting_packet_lock = packetqueue.get_packet(start_it); + if (!starting_packet_lock) return false; + starting_packet = starting_packet_lock->packet_; + } else { + starting_packet = snap; + } event = new Event(this, *(starting_packet->timestamp), "Continuous", noteSetMap); // Write out starting packets, do not modify packetqueue it will garbage collect itself - while ( starting_packet and (*start_it) != snap_it ) { + while ( starting_packet and ((*start_it) != snap_it) ) { event->AddPacket(starting_packet); // Have added the packet, don't want to unlock it until we have locked the next packetqueue.increment_it(start_it); if ( (*start_it) == snap_it ) { - starting_packet->unlock(); + if (starting_packet_lock) delete starting_packet_lock; break; } - ZMPacket *p = packetqueue.get_packet(start_it); - starting_packet->unlock(); - starting_packet = p; + ZMLockedPacket *lp = packetqueue.get_packet(start_it); + delete starting_packet_lock; + starting_packet_lock = lp; + starting_packet = lp->packet_; } packetqueue.free_it(start_it); delete start_it; @@ -2030,9 +2052,7 @@ bool Monitor::Analyse() { for ( int i=0; i < n_zones; i++ ) { if ( zones[i]->Alarmed() ) { alarm_cause += std::string(zones[i]->Label()); - if ( i < n_zones-1 ) { - alarm_cause += ","; - } + if (i < n_zones-1) alarm_cause += ","; } } alarm_cause = cause+" "+alarm_cause; @@ -2085,7 +2105,16 @@ bool Monitor::Analyse() { snap_it, (pre_event_count > alarm_frame_count ? pre_event_count : alarm_frame_count) ); - ZMPacket *starting_packet = *(*start_it); + + ZMLockedPacket *starting_packet_lock = nullptr; + ZMPacket *starting_packet = nullptr; + if ( *start_it != snap_it ) { + starting_packet_lock = packetqueue.get_packet(start_it); + if (!starting_packet_lock) return false; + starting_packet = starting_packet_lock->packet_; + } else { + starting_packet = snap; + } event = new Event(this, *(starting_packet->timestamp), cause, noteSetMap); shared_data->last_event_id = event->Id(); @@ -2099,12 +2128,13 @@ bool Monitor::Analyse() { packetqueue.increment_it(start_it); if ( (*start_it) == snap_it ) { - starting_packet->unlock(); + if (starting_packet_lock) delete starting_packet_lock; break; } - ZMPacket *p = packetqueue.get_packet(start_it); - starting_packet->unlock(); - starting_packet = p; + ZMLockedPacket *lp = packetqueue.get_packet(start_it); + delete starting_packet_lock; + starting_packet_lock = lp; + starting_packet = lp->packet_; } packetqueue.free_it(start_it); delete start_it; @@ -2255,27 +2285,9 @@ bool Monitor::Analyse() { } // end if ( trigger_data->trigger_state != TRIGGER_OFF ) if (event) event->AddPacket(snap); -#if 0 - if (snap->packet.stream_index == video_stream_id) { - if (video_fifo) { - if ( snap->keyframe ) { - // avcodec strips out important nals that describe the stream and - // stick them in extradata. Need to send them along with keyframes - AVStream *stream = camera->getVideoStream(); - video_fifo->write( - static_cast(stream->codecpar->extradata), - stream->codecpar->extradata_size); - } - video_fifo->writePacket(*snap); - } - } else if (snap->packet.stream_index == audio_stream_id) { - if (audio_fifo) - audio_fifo->writePacket(*snap); - } -#endif // popPacket will have placed a second lock on snap, so release it here. - snap->unlock(); + delete packet_lock; if ( snap->image_index > 0 ) { // Only do these if it's a video packet. @@ -2507,7 +2519,6 @@ int Monitor::Capture() { gettimeofday(packet->timestamp, nullptr); shared_data->zmc_heartbeat_time = packet->timestamp->tv_sec; - Image* capture_image = image_buffer[index].image; int captureResult = 0; if ( deinterlacing_value == 4 ) { @@ -2536,7 +2547,7 @@ int Monitor::Capture() { Rgb signalcolor; /* HTML colour code is actually BGR in memory, we want RGB */ signalcolor = rgb_convert(signal_check_colour, ZM_SUBPIX_ORDER_BGR); - capture_image = new Image(width, height, camera->Colours(), camera->SubpixelOrder()); + Image *capture_image = new Image(width, height, camera->Colours(), camera->SubpixelOrder()); capture_image->Fill(signalcolor); shared_data->signal = false; shared_data->last_write_index = index; @@ -2592,6 +2603,7 @@ int Monitor::Capture() { return 1; } // end if audio +#if 0 if ( !packet->image ) { if ( packet->packet.size and !packet->in_frame ) { if ( !decoding_enabled ) { @@ -2678,6 +2690,7 @@ int Monitor::Capture() { shared_data->signal = ( capture_image and signal_check_points ) ? CheckSignal(capture_image) : true; shared_data->last_write_index = index; shared_data->last_write_time = packet->timestamp->tv_sec; +#endif image_count++; // Will only be queued if there are iterators allocated in the queue. @@ -2710,6 +2723,101 @@ int Monitor::Capture() { return captureResult; } // end Monitor::Capture +bool Monitor::Decode() { + if (!decoder_it) decoder_it = packetqueue.get_video_it(true); + + ZMLockedPacket *packet_lock = packetqueue.get_packet(decoder_it); + if (!packet_lock) return false; + ZMPacket *packet = packet_lock->packet_; + packetqueue.increment_it(decoder_it); + if (packet->image or (packet->codec_type != AVMEDIA_TYPE_VIDEO)) { + delete packet_lock; + return true; // Don't need decode + } + + int ret = 0; + if (packet->packet.size and !packet->in_frame) { + // Allocate the image first so that it can be used by hwaccel + // We don't actually care about camera colours, pixel order etc. We care about the desired settings + // + //capture_image = packet->image = new Image(width, height, camera->Colours(), camera->SubpixelOrder()); + ret = packet->decode(camera->getVideoCodecContext()); + } else { + Debug(1, "No packet.size(%d) or packet->in_frame(%p). Not decoding", packet->packet.size, packet->in_frame); + } + Image* capture_image = nullptr; + unsigned int index = image_count % image_buffer_count; + if (ret > 0) { + if (packet->in_frame and !packet->image) { + packet->image = new Image(camera_width, camera_height, camera->Colours(), camera->SubpixelOrder()); + packet->get_image(); + } + + if (packet->image) { + capture_image = packet->image; + + /* Deinterlacing */ + if ( deinterlacing_value ) { + if ( deinterlacing_value == 1 ) { + capture_image->Deinterlace_Discard(); + } else if ( deinterlacing_value == 2 ) { + capture_image->Deinterlace_Linear(); + } else if ( deinterlacing_value == 3 ) { + capture_image->Deinterlace_Blend(); + } else if ( deinterlacing_value == 4 ) { + capture_image->Deinterlace_4Field(next_buffer.image, (deinterlacing>>8)&0xff); + } else if ( deinterlacing_value == 5 ) { + capture_image->Deinterlace_Blend_CustomRatio((deinterlacing>>8)&0xff); + } + } + + if ( orientation != ROTATE_0 ) { + Debug(2, "Doing rotation"); + switch ( orientation ) { + case ROTATE_0 : + // No action required + break; + case ROTATE_90 : + case ROTATE_180 : + case ROTATE_270 : + capture_image->Rotate((orientation-1)*90); + break; + case FLIP_HORI : + case FLIP_VERT : + capture_image->Flip(orientation==FLIP_HORI); + break; + } + } // end if have rotation + + if (privacy_bitmask) { + Debug(1, "Applying privacy"); + capture_image->MaskPrivacy(privacy_bitmask); + } + + if (config.timestamp_on_capture) { + Debug(1, "Timestampprivacy"); + TimestampImage(packet->image, packet->timestamp); + } + + if (!ref_image.Buffer()) { + // First image, so assign it to ref image + Debug(1, "Assigning ref image %dx%d size: %d", width, height, camera->ImageSize()); + ref_image.Assign(width, height, camera->Colours(), camera->SubpixelOrder(), + packet->image->Buffer(), camera->ImageSize()); + } + image_buffer[index].image->Assign(*(packet->image)); + *(image_buffer[index].timestamp) = *(packet->timestamp); + } // end if have image + } // end if did decoding + packet->decoded = true; + delete packet_lock; + + shared_data->signal = ( capture_image and signal_check_points ) ? CheckSignal(capture_image) : true; + shared_data->last_write_index = index; + shared_data->last_write_time = packet->timestamp->tv_sec; + return true; +} // end bool Monitor::Decode() + void Monitor::TimestampImage(Image *ts_image, const struct timeval *ts_time) const { if ( !label_format[0] ) return; @@ -3026,6 +3134,7 @@ int Monitor::PrimeCapture() { audio_fifo = new Fifo(shared_data->audio_fifo_path, true); } } // end if rtsp_server + if (decoding_enabled) decoder = new DecoderThread(this); } else { Debug(2, "Failed to prime %d", ret); } @@ -3035,6 +3144,8 @@ int Monitor::PrimeCapture() { int Monitor::PreCapture() const { return camera->PreCapture(); } int Monitor::PostCapture() const { return camera->PostCapture(); } int Monitor::Close() { + decoder->Stop(); + delete decoder; std::lock_guard lck(event_mutex); if (event) { Info("%s: image_count:%d - Closing event %" PRIu64 ", shutting down", name, image_count, event->Id()); @@ -3051,25 +3162,25 @@ Monitor::Orientation Monitor::getOrientation() const { return orientation; } // So this should be done as the first task in the analysis thread startup. // This function is deprecated. void Monitor::get_ref_image() { - ZMPacket *snap = nullptr; + ZMLockedPacket *snap_lock = nullptr; if ( !analysis_it ) analysis_it = packetqueue.get_video_it(true); while ( ( - !( snap = packetqueue.get_packet(analysis_it)) + !( snap_lock = packetqueue.get_packet(analysis_it)) or - ( snap->codec_type != AVMEDIA_TYPE_VIDEO ) + ( snap_lock->packet_->codec_type != AVMEDIA_TYPE_VIDEO ) or - ! snap->image + ! snap_lock->packet_->image ) and !zm_terminate) { Debug(1, "Waiting for capture daemon lastwriteindex(%d) lastwritetime(%d)", shared_data->last_write_index, shared_data->last_write_time); - if ( snap and ! snap->image ) { - snap->unlock(); + if ( snap_lock and ! snap_lock->packet_->image ) { + delete snap_lock; // can't analyse it anyways, incremement packetqueue.increment_it(analysis_it); } @@ -3078,6 +3189,7 @@ void Monitor::get_ref_image() { if ( zm_terminate ) return; + ZMPacket *snap = snap_lock->packet_; Debug(1, "get_ref_image: packet.stream %d ?= video_stream %d, packet image id %d packet image %p", snap->packet.stream_index, video_stream_id, snap->image_index, snap->image ); // Might not have been decoded yet FIXME @@ -3088,7 +3200,7 @@ void Monitor::get_ref_image() { } else { Debug(2, "Have no ref image about to unlock"); } - snap->unlock(); + delete snap_lock; } std::vector Monitor::Groups() { diff --git a/src/zm_monitor.h b/src/zm_monitor.h index 20a59d79c..91d12dd24 100644 --- a/src/zm_monitor.h +++ b/src/zm_monitor.h @@ -22,6 +22,7 @@ #include "zm_define.h" #include "zm_camera.h" +#include "zm_decoder_thread.h" #include "zm_event.h" #include "zm_fifo.h" #include "zm_image.h" @@ -375,6 +376,8 @@ protected: VideoStore *videoStore; PacketQueue packetqueue; packetqueue_iterator *analysis_it; + packetqueue_iterator *decoder_it; + DecoderThread *decoder; int n_zones; @@ -549,6 +552,7 @@ public: //unsigned int DetectBlack( const Image &comp_image, Event::StringSet &zoneSet ); bool CheckSignal( const Image *image ); bool Analyse(); + bool Decode(); void DumpImage( Image *dump_image ) const; void TimestampImage( Image *ts_image, const struct timeval *ts_time ) const; bool closeEvent(); diff --git a/src/zm_packet.cpp b/src/zm_packet.cpp index 58c2bda8c..cb37558b3 100644 --- a/src/zm_packet.cpp +++ b/src/zm_packet.cpp @@ -37,7 +37,8 @@ ZMPacket::ZMPacket() : score(-1), codec_type(AVMEDIA_TYPE_UNKNOWN), image_index(-1), - codec_imgsize(0) + codec_imgsize(0), + decoded(0) { av_init_packet(&packet); packet.size = 0; // So we can detect whether it has been filled. diff --git a/src/zm_packet.h b/src/zm_packet.h index 118239860..507547f99 100644 --- a/src/zm_packet.h +++ b/src/zm_packet.h @@ -21,6 +21,7 @@ #define ZM_PACKET_H #include "zm_logger.h" +#include #include extern "C" { @@ -36,7 +37,9 @@ class Image; class ZMPacket { public: - std::recursive_mutex mutex; + std::mutex mutex_; + std::condition_variable condition_; + int keyframe; AVStream *stream; // Input stream AVPacket packet; // Input packet, undecoded @@ -51,6 +54,7 @@ class ZMPacket { int image_index; int codec_imgsize; int64_t pts; // pts in the packet can be in another time base. This MUST be in AV_TIME_BASE_Q + bool decoded; public: AVPacket *av_packet() { return &packet; } @@ -65,21 +69,45 @@ class ZMPacket { explicit ZMPacket(ZMPacket &packet); ZMPacket(); ~ZMPacket(); - void lock() { - Debug(4,"Locking packet %d", this->image_index); - mutex.lock(); - Debug(4,"packet %d locked", this->image_index); - }; - bool trylock() { - Debug(4,"TryLocking packet %d", this->image_index); - return mutex.try_lock(); - }; - void unlock() { - Debug(4,"packet %d unlocked", this->image_index); - mutex.unlock(); - }; - AVFrame *get_out_frame( const AVCodecContext *ctx ); + + AVFrame *get_out_frame(const AVCodecContext *ctx); int get_codec_imgsize() { return codec_imgsize; }; }; +class ZMLockedPacket { + public: + ZMPacket *packet_; + std::unique_lock lck_; + + ZMLockedPacket(ZMPacket *p) : + packet_(p), + lck_(packet_->mutex_, std::defer_lock) { + } + ~ZMLockedPacket() { + unlock(); + } + + void lock() { + Debug(4, "locking packet %d", packet_->image_index); + lck_.lock(); + Debug(4, "packet %d locked", packet_->image_index); + }; + bool trylock() { + Debug(4, "TryLocking packet %d", packet_->image_index); + return lck_.try_lock(); + }; + void unlock() { + Debug(4, "packet %d unlocked", packet_->image_index); + lck_.unlock(); + packet_->condition_.notify_all(); + }; + + void wait() { + Debug(4, "packet %d waiting", packet_->image_index); + // We already have a lock, but it's a recursive mutex.. so this may be ok + packet_->condition_.wait(lck_); + } + +}; + #endif /* ZM_PACKET_H */ diff --git a/src/zm_packetqueue.cpp b/src/zm_packetqueue.cpp index 2ba04e71b..fe1c6f5b2 100644 --- a/src/zm_packetqueue.cpp +++ b/src/zm_packetqueue.cpp @@ -144,17 +144,19 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) { // First packet is special because we know it is a video keyframe and only need to check for lock ZMPacket *zm_packet = *it; - if ( zm_packet->trylock() ) { + ZMLockedPacket *lp = new ZMLockedPacket(zm_packet); + if ( lp->trylock() ) { ++it; - zm_packet->unlock(); + delete lp; // Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that while ( *it != add_packet ) { zm_packet = *it; - if ( !zm_packet->trylock() ) { + lp = new ZMLockedPacket(zm_packet); + if ( !lp->trylock() ) { break; } - zm_packet->unlock(); + delete lp; if ( is_there_an_iterator_pointing_to_packet(zm_packet) ) { Warning("Found iterator at beginning of queue. Some thread isn't keeping up"); @@ -200,7 +202,7 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) { return; } // end voidPacketQueue::clearPackets(ZMPacket* zm_packet) -ZMPacket* PacketQueue::popPacket( ) { +ZMLockedPacket* PacketQueue::popPacket( ) { Debug(4, "pktQueue size %d", pktQueue.size()); if ( pktQueue.empty() ) { return nullptr; @@ -222,14 +224,15 @@ ZMPacket* PacketQueue::popPacket( ) { } } // end foreach iterator - zm_packet->lock(); + ZMLockedPacket *lp = new ZMLockedPacket (zm_packet); + lp->lock(); pktQueue.pop_front(); packet_counts[zm_packet->packet.stream_index] -= 1; mutex.unlock(); - return zm_packet; + return lp; } // popPacket @@ -325,9 +328,9 @@ void PacketQueue::clear() { while (!pktQueue.empty()) { ZMPacket *packet = pktQueue.front(); // Someone might have this packet, but not for very long and since we have locked the queue they won't be able to get another one - packet->lock(); + ZMLockedPacket lp(packet); + lp.lock(); pktQueue.pop_front(); - packet->unlock(); delete packet; } @@ -452,7 +455,7 @@ int PacketQueue::packet_count(int stream_id) { // Returns a packet. Packet will be locked -ZMPacket *PacketQueue::get_packet(packetqueue_iterator *it) { +ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) { if ( deleting or zm_terminate ) return nullptr; @@ -477,14 +480,17 @@ ZMPacket *PacketQueue::get_packet(packetqueue_iterator *it) { Error("Null p?!"); return nullptr; } + ZMLockedPacket *lp = new ZMLockedPacket(p); Debug(3, "get_packet %p image_index: %d, about to lock packet", p, p->image_index); - while ( !(zm_terminate or deleting) and !p->trylock() ) { - Debug(3, "waiting. Queue size %d it == end? %d", pktQueue.size(), ( *it == pktQueue.end() ) ); + while (!(zm_terminate or deleting) and !lp->trylock()) { + Debug(3, "waiting on index %d. Queue size %d it == end? %d", + p->image_index, pktQueue.size(), ( *it == pktQueue.end() ) ); + ZM_DUMP_PACKET(p->packet, ""); condition.wait(lck); } Debug(2, "Locked packet, unlocking packetqueue mutex"); - return p; -} // end ZMPacket *PacketQueue::get_packet(it) + return lp; +} // end ZMLockedPacket *PacketQueue::get_packet(it) bool PacketQueue::increment_it(packetqueue_iterator *it) { Debug(2, "Incrementing %p, queue size %d, end? %d", it, pktQueue.size(), ((*it) == pktQueue.end())); @@ -612,14 +618,14 @@ packetqueue_iterator * PacketQueue::get_video_it(bool wait) { while ( *it != pktQueue.end() ) { ZMPacket *zm_packet = *(*it); - if ( !zm_packet ) { + if (!zm_packet) { Error("Null zmpacket in queue!?"); free_it(it); return nullptr; } Debug(1, "Packet keyframe %d for stream %d, so returning the it to it", zm_packet->keyframe, zm_packet->packet.stream_index); - if ( zm_packet->keyframe and ( zm_packet->packet.stream_index == video_stream_id ) ) { + if (zm_packet->keyframe and ( zm_packet->packet.stream_index == video_stream_id )) { Debug(1, "Found a keyframe for stream %d, so returning the it to it", video_stream_id); return it; } @@ -627,7 +633,7 @@ packetqueue_iterator * PacketQueue::get_video_it(bool wait) { } Debug(1, "DIdn't Found a keyframe for stream %d, so returning the it to it", video_stream_id); return it; -} +} // get video_it void PacketQueue::free_it(packetqueue_iterator *it) { for ( diff --git a/src/zm_packetqueue.h b/src/zm_packetqueue.h index c0ba045b8..e759dc777 100644 --- a/src/zm_packetqueue.h +++ b/src/zm_packetqueue.h @@ -24,6 +24,7 @@ #include class ZMPacket; +class ZMLockedPacket; typedef std::list::iterator packetqueue_iterator; @@ -52,7 +53,7 @@ class PacketQueue { void setMaxVideoPackets(int p); bool queuePacket(ZMPacket* packet); - ZMPacket * popPacket(); + ZMLockedPacket * popPacket(); bool popVideoPacket(ZMPacket* packet); bool popAudioPacket(ZMPacket* packet); unsigned int clear(unsigned int video_frames_to_keep, int stream_id); @@ -68,7 +69,7 @@ class PacketQueue { bool increment_it(packetqueue_iterator *it); bool increment_it(packetqueue_iterator *it, int stream_id); - ZMPacket *get_packet(packetqueue_iterator *); + ZMLockedPacket *get_packet(packetqueue_iterator *); packetqueue_iterator *get_video_it(bool wait); packetqueue_iterator *get_stream_it(int stream_id); void free_it(packetqueue_iterator *); diff --git a/src/zm_videostore.cpp b/src/zm_videostore.cpp index 1ffc69e90..ba19f6f57 100644 --- a/src/zm_videostore.cpp +++ b/src/zm_videostore.cpp @@ -1211,29 +1211,6 @@ int VideoStore::writeAudioFramePacket(ZMPacket *zm_packet) { return 0; } // end int VideoStore::writeAudioFramePacket(AVPacket *ipkt) -int VideoStore::write_packets(PacketQueue &queue) { - // Need to write out all the frames from the last keyframe? - // No... need to write out all frames from when the event began. Due to PreEventFrames, this could be more than since the last keyframe. - unsigned int packet_count = 0; - ZMPacket *queued_packet; - - while ( ( queued_packet = queue.popPacket() ) ) { - AVPacket *avp = queued_packet->av_packet(); - - packet_count += 1; - //Write the packet to our video store - Debug(2, "Writing queued packet stream: %d KEY %d, remaining (%d)", - avp->stream_index, avp->flags & AV_PKT_FLAG_KEY, queue.size() ); - int ret = this->writePacket( queued_packet ); - if ( ret < 0 ) { - //Less than zero and we skipped a frame - } - delete queued_packet; - } // end while packets in the packetqueue - Debug(2, "Wrote %d queued packets", packet_count ); - return packet_count; -} // end int VideoStore::write_packets( PacketQueue &queue ) { - int VideoStore::write_packet(AVPacket *pkt, AVStream *stream) { pkt->pos = -1; pkt->stream_index = stream->index;