Rework locking in ZMPacket by using a new class called ZMLockedPacket.

pull/3195/head
Isaac Connor 2021-03-15 17:05:30 -04:00
parent 6a11b23aaf
commit 9903e909af
6 changed files with 105 additions and 98 deletions

View File

@ -1813,12 +1813,13 @@ bool Monitor::Analyse() {
// if have event, send frames until we find a video packet, at which point do analysis. Adaptive skip should only affect which frames we do analysis on.
// get_analysis_packet will lock the packet and may wait if analysis_it is at the end
ZMPacket *snap = packetqueue.get_packet(analysis_it);
if (!snap) return false;
ZMLockedPacket *packet_lock = packetqueue.get_packet(analysis_it);
if (!packet_lock) return false;
ZMPacket *snap = packet_lock->packet_;
// Is it possible for snap->score to be ! -1 ? Not if everything is working correctly
if (snap->score != -1) {
snap->unlock();
delete packet_lock;
packetqueue.increment_it(analysis_it);
Error("skipping because score was %d", snap->score);
return false;
@ -1844,7 +1845,7 @@ bool Monitor::Analyse() {
// Ready means that we have captured the warmup # of frames
if (!Ready()) {
Debug(3, "Not ready?");
snap->unlock();
delete packet_lock;
return false;
}
@ -1896,10 +1897,10 @@ bool Monitor::Analyse() {
while (!snap->image and !snap->decoded) {
// Need to wait for the decoder thread.
Debug(1, "Waiting for decode");
snap->wait();
packet_lock->wait();
if (!snap->image and snap->decoded) {
Debug(1, "No image but was decoded, giving up");
snap->unlock();
delete packet_lock;
return false;
}
} // end while ! decoded
@ -2010,22 +2011,32 @@ bool Monitor::Analyse() {
);
// This gets a lock on the starting packet
ZMPacket *starting_packet = ( *start_it == snap_it ) ? snap : packetqueue.get_packet(start_it);
ZMLockedPacket *starting_packet_lock = nullptr;
ZMPacket *starting_packet = nullptr;
if ( *start_it != snap_it ) {
starting_packet_lock = packetqueue.get_packet(start_it);
if (!starting_packet_lock) return false;
starting_packet = starting_packet_lock->packet_;
} else {
starting_packet = snap;
}
event = new Event(this, *(starting_packet->timestamp), "Continuous", noteSetMap);
// Write out starting packets, do not modify packetqueue it will garbage collect itself
while ( starting_packet and (*start_it) != snap_it ) {
while ( starting_packet and ((*start_it) != snap_it) ) {
event->AddPacket(starting_packet);
// Have added the packet, don't want to unlock it until we have locked the next
packetqueue.increment_it(start_it);
if ( (*start_it) == snap_it ) {
starting_packet->unlock();
if (starting_packet_lock) delete starting_packet_lock;
break;
}
ZMPacket *p = packetqueue.get_packet(start_it);
starting_packet->unlock();
starting_packet = p;
ZMLockedPacket *lp = packetqueue.get_packet(start_it);
delete starting_packet_lock;
starting_packet_lock = lp;
starting_packet = lp->packet_;
}
packetqueue.free_it(start_it);
delete start_it;
@ -2094,7 +2105,16 @@ bool Monitor::Analyse() {
snap_it,
(pre_event_count > alarm_frame_count ? pre_event_count : alarm_frame_count)
);
ZMPacket *starting_packet = ( *start_it == snap_it ) ? snap : packetqueue.get_packet(start_it);
ZMLockedPacket *starting_packet_lock = nullptr;
ZMPacket *starting_packet = nullptr;
if ( *start_it != snap_it ) {
starting_packet_lock = packetqueue.get_packet(start_it);
if (!starting_packet_lock) return false;
starting_packet = starting_packet_lock->packet_;
} else {
starting_packet = snap;
}
event = new Event(this, *(starting_packet->timestamp), cause, noteSetMap);
shared_data->last_event_id = event->Id();
@ -2108,12 +2128,13 @@ bool Monitor::Analyse() {
packetqueue.increment_it(start_it);
if ( (*start_it) == snap_it ) {
starting_packet->unlock();
if (starting_packet_lock) delete starting_packet_lock;
break;
}
ZMPacket *p = packetqueue.get_packet(start_it);
starting_packet->unlock();
starting_packet = p;
ZMLockedPacket *lp = packetqueue.get_packet(start_it);
delete starting_packet_lock;
starting_packet_lock = lp;
starting_packet = lp->packet_;
}
packetqueue.free_it(start_it);
delete start_it;
@ -2266,7 +2287,7 @@ bool Monitor::Analyse() {
if (event) event->AddPacket(snap);
// popPacket will have placed a second lock on snap, so release it here.
snap->unlock();
delete packet_lock;
if ( snap->image_index > 0 ) {
// Only do these if it's a video packet.
@ -2705,11 +2726,12 @@ int Monitor::Capture() {
bool Monitor::Decode() {
if (!decoder_it) decoder_it = packetqueue.get_video_it(true);
ZMPacket *packet = packetqueue.get_packet(decoder_it);
if (!packet) return false;
ZMLockedPacket *packet_lock = packetqueue.get_packet(decoder_it);
if (!packet_lock) return false;
ZMPacket *packet = packet_lock->packet_;
packetqueue.increment_it(decoder_it);
if (packet->image or (packet->codec_type != AVMEDIA_TYPE_VIDEO)) {
packet->unlock();
delete packet_lock;
return true; // Don't need decode
}
@ -2788,7 +2810,7 @@ bool Monitor::Decode() {
} // end if have image
} // end if did decoding
packet->decoded = true;
packet->unlock(); // unlock will also signal
delete packet_lock;
shared_data->signal = ( capture_image and signal_check_points ) ? CheckSignal(capture_image) : true;
shared_data->last_write_index = index;
@ -3112,7 +3134,7 @@ int Monitor::PrimeCapture() {
audio_fifo = new Fifo(shared_data->audio_fifo_path, true);
}
} // end if rtsp_server
decoder = new DecoderThread(this);
if (decoding_enabled) decoder = new DecoderThread(this);
} else {
Debug(2, "Failed to prime %d", ret);
}
@ -3140,25 +3162,25 @@ Monitor::Orientation Monitor::getOrientation() const { return orientation; }
// So this should be done as the first task in the analysis thread startup.
// This function is deprecated.
void Monitor::get_ref_image() {
ZMPacket *snap = nullptr;
ZMLockedPacket *snap_lock = nullptr;
if ( !analysis_it )
analysis_it = packetqueue.get_video_it(true);
while (
(
!( snap = packetqueue.get_packet(analysis_it))
!( snap_lock = packetqueue.get_packet(analysis_it))
or
( snap->codec_type != AVMEDIA_TYPE_VIDEO )
( snap_lock->packet_->codec_type != AVMEDIA_TYPE_VIDEO )
or
! snap->image
! snap_lock->packet_->image
)
and !zm_terminate) {
Debug(1, "Waiting for capture daemon lastwriteindex(%d) lastwritetime(%d)",
shared_data->last_write_index, shared_data->last_write_time);
if ( snap and ! snap->image ) {
snap->unlock();
if ( snap_lock and ! snap_lock->packet_->image ) {
delete snap_lock;
// can't analyse it anyways, incremement
packetqueue.increment_it(analysis_it);
}
@ -3167,6 +3189,7 @@ void Monitor::get_ref_image() {
if ( zm_terminate )
return;
ZMPacket *snap = snap_lock->packet_;
Debug(1, "get_ref_image: packet.stream %d ?= video_stream %d, packet image id %d packet image %p",
snap->packet.stream_index, video_stream_id, snap->image_index, snap->image );
// Might not have been decoded yet FIXME
@ -3177,7 +3200,7 @@ void Monitor::get_ref_image() {
} else {
Debug(2, "Have no ref image about to unlock");
}
snap->unlock();
delete snap_lock;
}
std::vector<Group *> Monitor::Groups() {

View File

@ -27,7 +27,6 @@ using namespace std;
AVPixelFormat target_format = AV_PIX_FMT_NONE;
ZMPacket::ZMPacket() :
lck(mutex,std::defer_lock),
keyframe(0),
in_frame(nullptr),
out_frame(nullptr),

View File

@ -37,6 +37,8 @@ class Image;
class ZMPacket {
public:
std::mutex mutex_;
std::condition_variable condition_;
int keyframe;
AVStream *stream; // Input stream
@ -68,43 +70,44 @@ class ZMPacket {
ZMPacket();
~ZMPacket();
std::unique_lock<std::mutex> * lock() {
std::unique_lock<std::mutex> *lck = new std::unique_lock<std::mutex>(mutex);
Debug(4, "packet %d locked", this->image_index);
return lck;
};
std::unique_lock<std::mutex> * trylock() {
std::unique_lock<std::mutex> *lck = new std::unique_lock<std::mutex>(mutex, std::defer_lock);
Debug(4, "TryLocking packet %d", this->image_index);
if ( lck.try_lock() )
return lck;
delete lck;
return nullptr;
};
void unlock(std::unique_lock<std::mutex> *lck) {
Debug(4, "packet %d unlocked", this->image_index);
lck->unlock();
condition.notify_all();
};
void wait(std::unique_lock<std::mutex> *lck) {
Debug(4, "packet %d waiting", this->image_index);
// We already have a lock, but it's a recursive mutex.. so this may be ok
condition.wait(*lck);
}
AVFrame *get_out_frame(const AVCodecContext *ctx);
int get_codec_imgsize() { return codec_imgsize; };
};
class ZMLockedPacket : public ZMPacket {
class ZMLockedPacket {
public:
std::mutex mutex_;
std::condition_variable condition_;
std::unique_lock<std::mutex> lck_;
ZMPacket *packet_;
std::unique_lock<std::mutex> lck_;
ZMLockedPacket(ZMPacket *p) : packet_(packet), lck_(mutex_) {
ZMLockedPacket(ZMPacket *p) :
packet_(p),
lck_(packet_->mutex_, std::defer_lock) {
}
~ZMLockedPacket() {
unlock();
}
}
void lock() {
Debug(4, "locking packet %d", packet_->image_index);
lck_.lock();
Debug(4, "packet %d locked", packet_->image_index);
};
bool trylock() {
Debug(4, "TryLocking packet %d", packet_->image_index);
return lck_.try_lock();
};
void unlock() {
Debug(4, "packet %d unlocked", packet_->image_index);
lck_.unlock();
packet_->condition_.notify_all();
};
void wait() {
Debug(4, "packet %d waiting", packet_->image_index);
// We already have a lock, but it's a recursive mutex.. so this may be ok
packet_->condition_.wait(lck_);
}
};
#endif /* ZM_PACKET_H */

View File

@ -144,17 +144,19 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
// First packet is special because we know it is a video keyframe and only need to check for lock
ZMPacket *zm_packet = *it;
if ( zm_packet->trylock() ) {
ZMLockedPacket *lp = new ZMLockedPacket(zm_packet);
if ( lp->trylock() ) {
++it;
zm_packet->unlock();
delete lp;
// Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that
while ( *it != add_packet ) {
zm_packet = *it;
if ( !zm_packet->trylock() ) {
lp = new ZMLockedPacket(zm_packet);
if ( !lp->trylock() ) {
break;
}
zm_packet->unlock();
delete lp;
if ( is_there_an_iterator_pointing_to_packet(zm_packet) ) {
Warning("Found iterator at beginning of queue. Some thread isn't keeping up");
@ -200,7 +202,7 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
return;
} // end voidPacketQueue::clearPackets(ZMPacket* zm_packet)
ZMPacket* PacketQueue::popPacket( ) {
ZMLockedPacket* PacketQueue::popPacket( ) {
Debug(4, "pktQueue size %d", pktQueue.size());
if ( pktQueue.empty() ) {
return nullptr;
@ -222,14 +224,15 @@ ZMPacket* PacketQueue::popPacket( ) {
}
} // end foreach iterator
zm_packet->lock();
ZMLockedPacket *lp = new ZMLockedPacket (zm_packet);
lp->lock();
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
mutex.unlock();
return zm_packet;
return lp;
} // popPacket
@ -325,9 +328,9 @@ void PacketQueue::clear() {
while (!pktQueue.empty()) {
ZMPacket *packet = pktQueue.front();
// Someone might have this packet, but not for very long and since we have locked the queue they won't be able to get another one
packet->lock();
ZMLockedPacket lp(packet);
lp.lock();
pktQueue.pop_front();
packet->unlock();
delete packet;
}
@ -452,7 +455,7 @@ int PacketQueue::packet_count(int stream_id) {
// Returns a packet. Packet will be locked
ZMPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
if ( deleting or zm_terminate )
return nullptr;
@ -477,16 +480,17 @@ ZMPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
Error("Null p?!");
return nullptr;
}
ZMLockedPacket *lp = new ZMLockedPacket(p);
Debug(3, "get_packet %p image_index: %d, about to lock packet", p, p->image_index);
while (!(zm_terminate or deleting) and !p->trylock()) {
while (!(zm_terminate or deleting) and !lp->trylock()) {
Debug(3, "waiting on index %d. Queue size %d it == end? %d",
p->image_index, pktQueue.size(), ( *it == pktQueue.end() ) );
ZM_DUMP_PACKET(p->packet, "");
condition.wait(lck);
}
Debug(2, "Locked packet, unlocking packetqueue mutex");
return p;
} // end ZMPacket *PacketQueue::get_packet(it)
return lp;
} // end ZMLockedPacket *PacketQueue::get_packet(it)
bool PacketQueue::increment_it(packetqueue_iterator *it) {
Debug(2, "Incrementing %p, queue size %d, end? %d", it, pktQueue.size(), ((*it) == pktQueue.end()));

View File

@ -24,6 +24,7 @@
#include <mutex>
class ZMPacket;
class ZMLockedPacket;
typedef std::list<ZMPacket *>::iterator packetqueue_iterator;
@ -52,7 +53,7 @@ class PacketQueue {
void setMaxVideoPackets(int p);
bool queuePacket(ZMPacket* packet);
ZMPacket * popPacket();
ZMLockedPacket * popPacket();
bool popVideoPacket(ZMPacket* packet);
bool popAudioPacket(ZMPacket* packet);
unsigned int clear(unsigned int video_frames_to_keep, int stream_id);
@ -68,7 +69,7 @@ class PacketQueue {
bool increment_it(packetqueue_iterator *it);
bool increment_it(packetqueue_iterator *it, int stream_id);
ZMPacket *get_packet(packetqueue_iterator *);
ZMLockedPacket *get_packet(packetqueue_iterator *);
packetqueue_iterator *get_video_it(bool wait);
packetqueue_iterator *get_stream_it(int stream_id);
void free_it(packetqueue_iterator *);

View File

@ -1211,29 +1211,6 @@ int VideoStore::writeAudioFramePacket(ZMPacket *zm_packet) {
return 0;
} // end int VideoStore::writeAudioFramePacket(AVPacket *ipkt)
int VideoStore::write_packets(PacketQueue &queue) {
// Need to write out all the frames from the last keyframe?
// No... need to write out all frames from when the event began. Due to PreEventFrames, this could be more than since the last keyframe.
unsigned int packet_count = 0;
ZMPacket *queued_packet;
while ( ( queued_packet = queue.popPacket() ) ) {
AVPacket *avp = queued_packet->av_packet();
packet_count += 1;
//Write the packet to our video store
Debug(2, "Writing queued packet stream: %d KEY %d, remaining (%d)",
avp->stream_index, avp->flags & AV_PKT_FLAG_KEY, queue.size() );
int ret = this->writePacket( queued_packet );
if ( ret < 0 ) {
//Less than zero and we skipped a frame
}
delete queued_packet;
} // end while packets in the packetqueue
Debug(2, "Wrote %d queued packets", packet_count );
return packet_count;
} // end int VideoStore::write_packets( PacketQueue &queue ) {
int VideoStore::write_packet(AVPacket *pkt, AVStream *stream) {
pkt->pos = -1;
pkt->stream_index = stream->index;