Merge pull request #3188 from Carbenium/drop-zm-thread

Drop our custom threading code
pull/3189/head
Isaac Connor 2021-03-04 13:02:51 -05:00 committed by GitHub
commit 0a8b34843c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 241 additions and 1015 deletions

View File

@ -67,9 +67,7 @@ set(ZM_BIN_SRC_FILES
zm_signal.cpp
zm_stream.cpp
zm_swscale.cpp
zm_thread.cpp
zm_time.cpp
zm_timer.cpp
zm_user.cpp
zm_utils.cpp
zm_video.cpp

View File

@ -19,6 +19,8 @@
#include "zm_buffer.h"
#include <unistd.h>
unsigned int Buffer::assign(const unsigned char *pStorage, unsigned int pSize) {
if ( mAllocation < pSize ) {
delete[] mStorage;

View File

@ -27,6 +27,7 @@
#include <set>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <vector>
#if defined(BSD)

View File

@ -20,7 +20,6 @@
#ifndef ZM_DB_H
#define ZM_DB_H
#include "zm_thread.h"
#include <condition_variable>
#include <mutex>
#include <mysql/mysql.h>

View File

@ -28,6 +28,7 @@
#include "zm_videostore.h"
#include <cstring>
#include <sys/stat.h>
#include <unistd.h>
//#define USE_PREPARED_SQL 1

View File

@ -20,6 +20,7 @@
#ifndef ZM_EVENT_H
#define ZM_EVENT_H
#include "zm_config.h"
#include "zm_define.h"
#include "zm_storage.h"
#include <map>

View File

@ -27,6 +27,8 @@
#include "zm_storage.h"
#include <arpa/inet.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef __FreeBSD__
#include <netinet/in.h>
#endif

View File

@ -24,6 +24,7 @@
#include <fcntl.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <unistd.h>
#define RAW_BUFFER 512

View File

@ -23,6 +23,7 @@
#include "zm_signal.h"
#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>
#define RAW_BUFFER 512
static bool zm_fifodbg_inited = false;

View File

@ -24,6 +24,7 @@
#include <fcntl.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <unistd.h>
#define RAW_BUFFER 512
bool FifoStream::sendRAWFrames() {

View File

@ -21,6 +21,7 @@
#include "zm_packet.h"
#include <sys/stat.h>
#include <unistd.h>
FileCamera::FileCamera(
const Monitor *monitor,

View File

@ -25,6 +25,7 @@
#include <algorithm>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
static unsigned char y_table_global[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 15, 16, 17, 18, 19, 20, 22, 23, 24, 25, 26, 27, 29, 30, 31, 32, 33, 34, 36, 37, 38, 39, 40, 41, 43, 44, 45, 46, 47, 48, 50, 51, 52, 53, 54, 55, 57, 58, 59, 60, 61, 62, 64, 65, 66, 67, 68, 69, 71, 72, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 85, 86, 87, 88, 89, 90, 91, 93, 94, 95, 96, 97, 98, 100, 101, 102, 103, 104, 105, 107, 108, 109, 110, 111, 112, 114, 115, 116, 117, 118, 119, 121, 122, 123, 124, 125, 126, 128, 129, 130, 131, 132, 133, 135, 136, 137, 138, 139, 140, 142, 143, 144, 145, 146, 147, 149, 150, 151, 152, 153, 154, 156, 157, 158, 159, 160, 161, 163, 164, 165, 166, 167, 168, 170, 171, 172, 173, 174, 175, 176, 178, 179, 180, 181, 182, 183, 185, 186, 187, 188, 189, 190, 192, 193, 194, 195, 196, 197, 199, 200, 201, 202, 203, 204, 206, 207, 208, 209, 210, 211, 213, 214, 215, 216, 217, 218, 220, 221, 222, 223, 224, 225, 227, 228, 229, 230, 231, 232, 234, 235, 236, 237, 238, 239, 241, 242, 243, 244, 245, 246, 248, 249, 250, 251, 252, 253, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255};

View File

@ -94,7 +94,11 @@ void LibvlcUnlockBuffer(void* opaque, void* picture, void *const *planes) {
// Return frames slightly faster than 1fps (if time() supports greater than one second resolution)
if ( newFrame || difftime(now, data->prevTime) >= 0.8 ) {
data->prevTime = now;
data->newImage.updateValueSignal(true);
{
std::lock_guard<std::mutex> lck(data->newImageMutex);
data->newImage = true;
}
data->newImageCv.notify_all();
}
}
@ -165,6 +169,9 @@ LibvlcCamera::~LibvlcCamera() {
if ( capture ) {
Terminate();
}
mLibvlcData.newImageCv.notify_all(); // to unblock on termination (zm_terminate)
if ( mLibvlcMediaPlayer != nullptr ) {
(*libvlc_media_player_release_f)(mLibvlcMediaPlayer);
mLibvlcMediaPlayer = nullptr;
@ -255,8 +262,8 @@ int LibvlcCamera::PrimeCapture() {
// Libvlc wants 32 byte alignment for images (should in theory do this for all image lines)
mLibvlcData.buffer = (uint8_t*)zm_mallocaligned(64, mLibvlcData.bufferSize);
mLibvlcData.prevBuffer = (uint8_t*)zm_mallocaligned(64, mLibvlcData.bufferSize);
mLibvlcData.newImage.setValueImmediate(false);
mLibvlcData.newImage = false;
(*libvlc_media_player_play_f)(mLibvlcMediaPlayer);
@ -271,17 +278,19 @@ int LibvlcCamera::PreCapture() {
// Should not return -1 as cancels capture. Always wait for image if available.
int LibvlcCamera::Capture( ZMPacket &zm_packet ) {
// newImage is a mutex/condition based flag to tell us when there is an image available
while( !mLibvlcData.newImage.getValueImmediate() ) {
if (zm_terminate)
return 0;
mLibvlcData.newImage.getUpdatedValue(1);
{
std::unique_lock<std::mutex> lck(mLibvlcData.newImageMutex);
mLibvlcData.newImageCv.wait(lck, [&]{ return mLibvlcData.newImage || zm_terminate; });
mLibvlcData.newImage = false;
}
if (zm_terminate)
return 0;
mLibvlcData.mutex.lock();
zm_packet.image->Assign(width, height, colours, subpixelorder, mLibvlcData.buffer, width * height * mBpp);
zm_packet.packet.stream_index = mVideoStreamId;
zm_packet.stream = mVideoStream;
mLibvlcData.newImage.setValueImmediate(false);
mLibvlcData.mutex.unlock();
return 1;

View File

@ -21,7 +21,8 @@
#define ZM_LIBVLC_CAMERA_H
#include "zm_camera.h"
#include "zm_thread.h"
#include <condition_variable>
#include <mutex>
#if HAVE_LIBVLC
@ -35,8 +36,11 @@ struct LibvlcPrivateData {
uint8_t* prevBuffer;
time_t prevTime;
uint32_t bufferSize;
Mutex mutex;
ThreadData<bool> newImage;
std::mutex mutex;
bool newImage;
std::mutex newImageMutex;
std::condition_variable newImageCv;
};
class LibvlcCamera : public Camera {

View File

@ -21,10 +21,10 @@
#include "zm_db.h"
#include "zm_utils.h"
#include <libgen.h>
#include <syslog.h>
#include <sys/time.h>
#include <unistd.h>
#ifdef __FreeBSD__
#include <sys/thr.h>

View File

@ -23,9 +23,9 @@
#include "zm_db.h"
#include "zm_config.h"
#include "zm_define.h"
#include "zm_thread.h"
#include <string>
#include <map>
#include <mutex>
#include <string>
#ifdef HAVE_SYS_SYSCALL_H
#include <sys/syscall.h>
@ -89,7 +89,7 @@ private:
static bool smInitialised;
static Logger *smInstance;
RecursiveMutex log_mutex;
std::recursive_mutex log_mutex;
static StringMap smCodes;
static IntMap smSyslogPriorities;

View File

@ -1033,7 +1033,7 @@ bool Monitor::connect() {
trigger_data->trigger_cause[0] = 0;
trigger_data->trigger_text[0] = 0;
trigger_data->trigger_showtext[0] = 0;
video_store_data->recording = (struct timeval){0,0};
video_store_data->recording = {};
// Uh, why nothing? Why not nullptr?
snprintf(video_store_data->event_file, sizeof(video_store_data->event_file), "nothing");
video_store_data->size = sizeof(VideoStoreData);
@ -2560,7 +2560,7 @@ int Monitor::Capture() {
packet->packet.stream_index = video_stream_id; // Convert to packetQueue's index
if (video_fifo) {
if ( packet->keyframe ) {
// avcodec strips out important nals that describe the stream and
// avcodec strips out important nals that describe the stream and
// stick them in extradata. Need to send them along with keyframes
#if LIBAVCODEC_VERSION_CHECK(57, 64, 0, 64, 0)
AVStream *stream = camera->getVideoStream();

View File

@ -27,6 +27,7 @@
#include "zm_image.h"
#include "zm_packet.h"
#include "zm_packetqueue.h"
#include "zm_utils.h"
#include "zm_video.h"
#include <memory>
#include <sys/time.h>

View File

@ -26,6 +26,8 @@
#include <glob.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef __FreeBSD__
#include <netinet/in.h>
#endif

View File

@ -22,6 +22,7 @@
#include "zm_logger.h"
#include "zm_rgb.h"
#include <cstring>
#include <unistd.h>
#if HAVE_LIBAVCODEC
extern "C" {

View File

@ -27,6 +27,7 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#ifdef SOLARIS
#include <sys/filio.h> // FIONREAD and friends

View File

@ -24,6 +24,7 @@
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#ifdef SOLARIS
#include <sys/filio.h> // FIONREAD and friends

View File

@ -48,7 +48,6 @@ RemoteCameraRtsp::RemoteCameraRtsp(
p_brightness, p_contrast, p_hue, p_colour,
p_capture, p_record_audio),
rtsp_describe(p_rtsp_describe),
rtspThread(0),
frameCount(0)
{
if ( p_method == "rtpUni" )
@ -114,19 +113,15 @@ void RemoteCameraRtsp::Terminate() {
}
int RemoteCameraRtsp::Connect() {
rtspThread = new RtspThread(monitor->Id(), method, protocol, host, port, path, auth, rtsp_describe);
rtspThread->start();
rtspThread = ZM::make_unique<RtspThread>(monitor->Id(), method, protocol, host, port, path, auth, rtsp_describe);
return 0;
}
int RemoteCameraRtsp::Disconnect() {
if ( rtspThread ) {
rtspThread->stop();
rtspThread->join();
delete rtspThread;
rtspThread = nullptr;
if (rtspThread) {
rtspThread->Stop();
rtspThread.reset();
}
return 0;
}
@ -214,7 +209,7 @@ int RemoteCameraRtsp::PrimeCapture() {
} // end PrimeCapture
int RemoteCameraRtsp::PreCapture() {
if ( !rtspThread->isRunning() )
if (!rtspThread || rtspThread->IsStopped())
return -1;
if ( !rtspThread->hasSources() ) {
Error("Cannot precapture, no RTP sources");
@ -234,7 +229,7 @@ int RemoteCameraRtsp::Capture(ZMPacket &zm_packet) {
while ( !frameComplete ) {
buffer.clear();
if ( !rtspThread->isRunning() )
if (!rtspThread || rtspThread->IsStopped())
return -1;
if ( rtspThread->getFrame(buffer) ) {

View File

@ -44,7 +44,7 @@ protected:
RtspThread::RtspMethod method;
RtspThread *rtspThread;
std::unique_ptr<RtspThread> rtspThread;
int frameCount;

View File

@ -25,9 +25,16 @@
#if HAVE_LIBAVFORMAT
RtpCtrlThread::RtpCtrlThread( RtspThread &rtspThread, RtpSource &rtpSource )
: mRtspThread( rtspThread ), mRtpSource( rtpSource ), mStop( false )
RtpCtrlThread::RtpCtrlThread(RtspThread &rtspThread, RtpSource &rtpSource)
: mRtspThread(rtspThread), mRtpSource(rtpSource), mTerminate(false)
{
mThread = std::thread(&RtpCtrlThread::Run, this);
}
RtpCtrlThread::~RtpCtrlThread() {
Stop();
if (mThread.joinable())
mThread.join();
}
int RtpCtrlThread::recvPacket( const unsigned char *packet, ssize_t packetLen ) {
@ -121,7 +128,7 @@ int RtpCtrlThread::recvPacket( const unsigned char *packet, ssize_t packetLen )
}
case RTCP_BYE :
Debug(5, "RTCP Got BYE");
mStop = true;
Stop();
break;
case RTCP_APP :
// Ignoring as per RFC 3550
@ -241,7 +248,7 @@ int RtpCtrlThread::recvPackets( unsigned char *buffer, ssize_t nBytes ) {
return nBytes;
}
int RtpCtrlThread::run() {
void RtpCtrlThread::Run() {
Debug( 2, "Starting control thread %x on port %d", mRtpSource.getSsrc(), mRtpSource.getLocalCtrlPort() );
ZM::SockAddrInet localAddr, remoteAddr;
@ -272,8 +279,7 @@ int RtpCtrlThread::run() {
time_t last_receive = time(nullptr);
bool timeout = false; // used as a flag that we had a timeout, and then sent an RR to see if we wake back up. Real timeout will happen when this is true.
while ( !mStop && select.wait() >= 0 ) {
while (!mTerminate && select.wait() >= 0) {
time_t now = time(nullptr);
ZM::Select::CommsList readable = select.getReadable();
if ( readable.size() == 0 ) {
@ -318,7 +324,7 @@ int RtpCtrlThread::run() {
}
} else {
// Here is another case of not receiving some data causing us to terminate... why? Sometimes there are pauses in the interwebs.
mStop = true;
Stop();
break;
}
} else {
@ -327,8 +333,7 @@ int RtpCtrlThread::run() {
} // end foeach comms iterator
}
rtpCtrlServer.close();
mRtspThread.stop();
return 0;
mRtspThread.Stop();
}
#endif // HAVE_LIBAVFORMAT

View File

@ -20,7 +20,8 @@
#ifndef ZM_RTP_CTRL_H
#define ZM_RTP_CTRL_H
#include "zm_thread.h"
#include <atomic>
#include <thread>
// Defined in ffmpeg rtp.h
//#define RTP_MAX_SDES 255 // maximum text length for SDES
@ -32,7 +33,7 @@
class RtspThread;
class RtpSource;
class RtpCtrlThread : public Thread {
class RtpCtrlThread {
friend class RtspThread;
private:
@ -121,7 +122,9 @@ private:
RtspThread &mRtspThread;
RtpSource &mRtpSource;
int mPort;
bool mStop;
std::atomic<bool> mTerminate;
std::thread mThread;
private:
int recvPacket( const unsigned char *packet, ssize_t packetLen );
@ -129,14 +132,13 @@ private:
int generateSdes( const unsigned char *packet, ssize_t packetLen );
int generateBye( const unsigned char *packet, ssize_t packetLen );
int recvPackets( unsigned char *buffer, ssize_t nBytes );
int run();
void Run();
public:
RtpCtrlThread( RtspThread &rtspThread, RtpSource &rtpSource );
~RtpCtrlThread();
void stop() {
mStop = true;
}
void Stop() { mTerminate = true; }
};
#endif // ZM_RTP_CTRL_H

View File

@ -26,8 +26,15 @@
#if HAVE_LIBAVFORMAT
RtpDataThread::RtpDataThread(RtspThread &rtspThread, RtpSource &rtpSource) :
mRtspThread(rtspThread), mRtpSource(rtpSource), mStop(false)
mRtspThread(rtspThread), mRtpSource(rtpSource), mTerminate(false)
{
mThread = std::thread(&RtpDataThread::Run, this);
}
RtpDataThread::~RtpDataThread() {
Stop();
if (mThread.joinable())
mThread.join();
}
bool RtpDataThread::recvPacket(const unsigned char *packet, size_t packetLen) {
@ -54,7 +61,7 @@ bool RtpDataThread::recvPacket(const unsigned char *packet, size_t packetLen) {
return mRtpSource.handlePacket(packet, packetLen);
}
int RtpDataThread::run() {
void RtpDataThread::Run() {
Debug(2, "Starting data thread %d on port %d",
mRtpSource.getSsrc(), mRtpSource.getLocalDataPort());
@ -75,11 +82,11 @@ int RtpDataThread::run() {
select.addReader(&rtpDataSocket);
unsigned char buffer[ZM_NETWORK_BUFSIZ];
while ( !zm_terminate && !mStop && (select.wait() >= 0) ) {
while ( !zm_terminate && !mTerminate && (select.wait() >= 0) ) {
ZM::Select::CommsList readable = select.getReadable();
if ( readable.size() == 0 ) {
Error("RTP timed out");
mStop = true;
Stop();
break;
}
for ( ZM::Select::CommsList::iterator iter = readable.begin(); iter != readable.end(); ++iter ) {
@ -89,7 +96,7 @@ int RtpDataThread::run() {
if ( nBytes ) {
recvPacket(buffer, nBytes);
} else {
mStop = true;
Stop();
break;
}
} else {
@ -98,8 +105,7 @@ int RtpDataThread::run() {
} // end foreach commsList
}
rtpDataSocket.close();
mRtspThread.stop();
return 0;
mRtspThread.Stop();
}
#endif // HAVE_LIBAVFORMAT

View File

@ -21,7 +21,8 @@
#define ZM_RTP_DATA_H
#include "zm_define.h"
#include "zm_thread.h"
#include <atomic>
#include <thread>
class RtspThread;
class RtpSource;
@ -40,26 +41,26 @@ struct RtpDataHeader
uint32_t csrc[]; // optional CSRC list
};
class RtpDataThread : public Thread
class RtpDataThread
{
friend class RtspThread;
private:
RtspThread &mRtspThread;
RtpSource &mRtpSource;
bool mStop;
std::atomic<bool> mTerminate;
std::thread mThread;
private:
bool recvPacket( const unsigned char *packet, size_t packetLen );
int run();
void Run();
public:
RtpDataThread( RtspThread &rtspThread, RtpSource &rtpSource );
~RtpDataThread();
void stop()
{
mStop = true;
}
void Stop() { mTerminate = true; }
};
#endif // ZM_RTP_DATA_H

View File

@ -23,6 +23,7 @@
#include "zm_rtp_data.h"
#include "zm_utils.h"
#include <arpa/inet.h>
#include <unistd.h>
#if HAVE_LIBAVCODEC
@ -79,6 +80,12 @@ RtpSource::RtpSource(
Warning("The device is using a codec (%d) that may not be supported. Do not be surprised if things don't work.", mCodecId);
}
RtpSource::~RtpSource() {
mTerminate = true;
mFrameReadyCv.notify_all();
mFrameProcessedCv.notify_all();
}
void RtpSource::init(uint16_t seq) {
Debug(3, "Initialising sequence");
mBaseSeq = seq;
@ -292,7 +299,7 @@ bool RtpSource::handlePacket(const unsigned char *packet, size_t packetLen) {
extraHeader = 2;
break;
default:
default:
Debug(3, "Unhandled nalType %d", nalType);
}
@ -301,7 +308,7 @@ bool RtpSource::handlePacket(const unsigned char *packet, size_t packetLen) {
mFrame.append("\x0\x0\x1", 3);
} // end if H264
mFrame.append(packet+rtpHeaderSize+extraHeader,
packetLen-rtpHeaderSize-extraHeader);
packetLen-rtpHeaderSize-extraHeader);
} else {
Debug(3, "NOT H264 frame: type is %d", mCodecId);
}
@ -312,16 +319,21 @@ bool RtpSource::handlePacket(const unsigned char *packet, size_t packetLen) {
if ( mFrameGood ) {
Debug(3, "Got new frame %d, %d bytes", mFrameCount, mFrame.size());
mFrameProcessed.setValueImmediate(false);
mFrameReady.updateValueSignal(true);
if ( !mFrameProcessed.getValueImmediate() ) {
// What is the point of this for loop? Is it just me, or will it call getUpdatedValue once or twice? Could it not be better written as
// if ( ! mFrameProcessed.getUpdatedValue( 1 ) && mFrameProcessed.getUpdatedValue( 1 ) ) return false;
for ( int count = 0; !mFrameProcessed.getUpdatedValue(1); count++ )
if ( count > 1 )
return false;
{
std::lock_guard<std::mutex> lck(mFrameReadyMutex);
mFrameReady = true;
}
mFrameReadyCv.notify_all();
{
std::unique_lock<std::mutex> lck(mFrameProcessedMutex);
mFrameProcessedCv.wait(lck, [&]{ return mFrameProcessed || mTerminate; });
mFrameProcessed = false;
}
if (mTerminate)
return false;
mFrameCount++;
} else {
Warning("Discarding incomplete frame %d, %d bytes", mFrameCount, mFrame.size());
@ -349,17 +361,21 @@ bool RtpSource::handlePacket(const unsigned char *packet, size_t packetLen) {
}
bool RtpSource::getFrame(Buffer &buffer) {
if ( !mFrameReady.getValueImmediate() ) {
Debug(3, "Getting frame but not ready");
// Allow for a couple of spurious returns
for ( int count = 0; !mFrameReady.getUpdatedValue(1); count++ ) {
if ( count > 1 )
return false;
}
{
std::unique_lock<std::mutex> lck(mFrameReadyMutex);
mFrameReadyCv.wait(lck, [&]{ return mFrameReady || mTerminate; });
mFrameReady = false;
}
if (mTerminate)
return false;
buffer = mFrame;
mFrameReady.setValueImmediate(false);
mFrameProcessed.updateValueSignal(true);
{
std::lock_guard<std::mutex> lck(mFrameProcessedMutex);
mFrameProcessed = true;
}
mFrameProcessedCv.notify_all();
Debug(4, "Copied %d bytes", buffer.size());
return true;
}

View File

@ -24,7 +24,8 @@
#include "zm_config.h"
#include "zm_define.h"
#include "zm_ffmpeg.h"
#include "zm_thread.h"
#include <condition_variable>
#include <mutex>
#include <string>
#include <sys/time.h>
@ -90,14 +91,23 @@ private:
int mFrameCount;
bool mFrameGood;
bool prevM;
ThreadData<bool> mFrameReady;
ThreadData<bool> mFrameProcessed;
bool mTerminate;
bool mFrameReady;
std::condition_variable mFrameReadyCv;
std::mutex mFrameReadyMutex;
bool mFrameProcessed;
std::condition_variable mFrameProcessedCv;
std::mutex mFrameProcessedMutex;
private:
void init(uint16_t seq);
public:
RtpSource( int id, const std::string &localHost, int localPortBase, const std::string &remoteHost, int remotePortBase, uint32_t ssrc, uint16_t seq, uint32_t rtpClock, uint32_t rtpTime, _AVCODECID codecId );
~RtpSource();
bool updateSeq( uint16_t seq );
void updateJitter( const RtpDataHeader *header );

View File

@ -91,16 +91,9 @@ int RtspThread::requestPorts() {
char sql[ZM_SQL_SML_BUFSIZ];
//FIXME Why not load specifically by Id? This will get ineffeicient with a lot of monitors
strncpy(sql, "SELECT `Id` FROM `Monitors` WHERE `Function` != 'None' AND `Type` = 'Remote' AND `Protocol` = 'rtsp' AND `Method` = 'rtpUni' ORDER BY `Id` ASC", sizeof(sql));
if ( mysql_query(&dbconn, sql) ) {
Error("Can't run query: %s", mysql_error(&dbconn));
exit(mysql_errno(&dbconn));
}
MYSQL_RES *result = mysql_store_result(&dbconn);
if ( !result ) {
Error("Can't use query result: %s", mysql_error(&dbconn));
exit(mysql_errno(&dbconn));
}
MYSQL_RES *result = zmDbFetch(sql);
int nMonitors = mysql_num_rows(result);
int position = 0;
if ( nMonitors ) {
@ -161,7 +154,7 @@ RtspThread::RtspThread(
mSsrc(0),
mDist(UNDEFINED),
mRtpTime(0),
mStop(false)
mTerminate(false)
{
mUrl = mProtocol+"://"+mHost+":"+mPort;
if ( !mPath.empty() ) {
@ -185,9 +178,15 @@ RtspThread::RtspThread(
mAuthenticator = new zm::Authenticator(parts[0], parts[1]);
else
mAuthenticator = new zm::Authenticator(parts[0], "");
mThread = std::thread(&RtspThread::Run, this);
}
RtspThread::~RtspThread() {
Stop();
if (mThread.joinable())
mThread.join();
if ( mFormatContext ) {
#if LIBAVFORMAT_VERSION_CHECK(52, 96, 0, 96, 0)
avformat_free_context(mFormatContext);
@ -204,7 +203,7 @@ RtspThread::~RtspThread() {
mAuthenticator = nullptr;
}
int RtspThread::run() {
void RtspThread::Run() {
std::string message;
std::string response;
@ -246,11 +245,11 @@ int RtspThread::run() {
Debug(2, "Sending HTTP message: %s", message.c_str());
if ( mRtspSocket.send(message.c_str(), message.size()) != (int)message.length() ) {
Error("Unable to send message '%s': %s", message.c_str(), strerror(errno));
return -1;
return;
}
if ( mRtspSocket.recv(response) < 0 ) {
Error("Recv failed; %s", strerror(errno));
return -1;
return;
}
Debug(2, "Received HTTP response: %s (%zd bytes)", response.c_str(), response.size());
@ -264,7 +263,7 @@ int RtspThread::run() {
if ( response.size() )
Hexdump(Logger::ERROR, response.data(), std::min(int(response.size()), 16));
}
return -1;
return;
}
// If Server requests authentication, check WWW-Authenticate header and fill required fields
// for requested authentication method
@ -282,7 +281,7 @@ int RtspThread::run() {
if ( respCode != 200 ) {
Error("Unexpected response code %d, text is '%s'", respCode, respText);
return -1;
return;
}
message = "POST "+mPath+" HTTP/1.0\r\n";
@ -295,7 +294,7 @@ int RtspThread::run() {
Debug(2, "Sending HTTP message: %s", message.c_str());
if ( mRtspSocket2.send(message.c_str(), message.size()) != (int)message.length() ) {
Error("Unable to send message '%s': %s", message.c_str(), strerror(errno));
return -1;
return;
}
} // end if ( mMethod == RTP_RTSP_HTTP )
@ -305,18 +304,18 @@ int RtspThread::run() {
// Request supported RTSP commands by the server
message = "OPTIONS "+mUrl+" RTSP/1.0\r\n";
if ( !sendCommand(message) )
return -1;
return;
// A negative return here may indicate auth failure, but we will have setup the auth mechanisms so we need to retry.
if ( !recvResponse(response) ) {
if ( mNeedAuth ) {
Debug(2, "Resending OPTIONS due to possible auth requirement");
if ( !sendCommand(message) )
return -1;
return;
if ( !recvResponse(response) )
return -1;
return;
} else {
return -1;
return;
}
} // end if failed response maybe due to auth
@ -347,7 +346,7 @@ int RtspThread::run() {
const std::string endOfHeaders = "\r\n\r\n";
size_t sdpStart = response.find(endOfHeaders);
if ( sdpStart == std::string::npos )
return -1;
return;
if ( mRtspDescribe ) {
std::string DescHeader = response.substr(0, sdpStart);
@ -374,7 +373,7 @@ int RtspThread::run() {
mFormatContext = mSessDesc->generateFormatContext();
} catch ( const Exception &e ) {
Error(e.getMessage().c_str());
return -1;
return;
}
#if 0
@ -450,9 +449,9 @@ int RtspThread::run() {
}
if ( !sendCommand(message) )
return -1;
return;
if ( !recvResponse(response) )
return -1;
return;
lines = split(response, "\r\n");
std::string session;
@ -525,9 +524,9 @@ int RtspThread::run() {
message = "PLAY "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\nRange: npt=0.000-\r\n";
if ( !sendCommand(message) )
return -1;
return;
if ( !recvResponse(response) )
return -1;
return;
lines = split(response, "\r\n");
std::string rtpInfo;
@ -587,17 +586,14 @@ int RtspThread::run() {
RtpDataThread rtpDataThread( *this, *source );
RtpCtrlThread rtpCtrlThread( *this, *source );
rtpDataThread.start();
rtpCtrlThread.start();
while( !mStop ) {
while (!mTerminate) {
now = time(nullptr);
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration
Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d",
sendKeepalive, timeout, now, lastKeepalive, (now-lastKeepalive) );
if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) ) {
if ( !sendCommand( message ) )
return( -1 );
return;
lastKeepalive = now;
}
usleep( 100000 );
@ -612,19 +608,16 @@ int RtspThread::run() {
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand( message ) )
return( -1 );
return;
if ( !recvResponse( response ) )
return( -1 );
return;
rtpDataThread.stop();
rtpCtrlThread.stop();
rtpDataThread.Stop();
rtpCtrlThread.Stop();
//rtpDataThread.kill( SIGTERM );
//rtpCtrlThread.kill( SIGTERM );
rtpDataThread.join();
rtpCtrlThread.join();
delete mSources[ssrc];
mSources.clear();
@ -647,7 +640,7 @@ int RtspThread::run() {
Buffer buffer( ZM_NETWORK_BUFSIZ );
std::string keepaliveMessage = "OPTIONS "+mUrl+" RTSP/1.0\r\n";
std::string keepaliveResponse = "RTSP/1.0 200 OK\r\n";
while ( !mStop && select.wait() >= 0 ) {
while (!mTerminate && select.wait() >= 0) {
ZM::Select::CommsList readable = select.getReadable();
if ( readable.size() == 0 ) {
Error( "RTSP timed out" );
@ -720,7 +713,7 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) )
{
if ( !sendCommand( message ) )
return( -1 );
return;
lastKeepalive = now;
}
buffer.tidy( 1 );
@ -735,7 +728,7 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
// Send a teardown message but don't expect a response as this may not be implemented on the server when using TCP
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand( message ) )
return( -1 );
return;
delete mSources[ssrc];
mSources.clear();
@ -749,14 +742,12 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
RtpDataThread rtpDataThread( *this, *source );
RtpCtrlThread rtpCtrlThread( *this, *source );
rtpDataThread.start();
rtpCtrlThread.start();
while ( !mStop ) {
while (!mTerminate) {
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration
if ( sendKeepalive && (timeout > 0) && ((time(nullptr)-lastKeepalive) > (timeout-5)) ) {
if ( !sendCommand( message ) )
return -1;
return;
lastKeepalive = time(nullptr);
}
usleep(100000);
@ -770,15 +761,12 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
#endif
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand(message) )
return -1;
return;
if ( !recvResponse(response) )
return -1;
return;
rtpDataThread.stop();
rtpCtrlThread.stop();
rtpDataThread.join();
rtpCtrlThread.join();
rtpDataThread.Stop();
rtpCtrlThread.Stop();
delete mSources[ssrc];
mSources.clear();
@ -791,7 +779,7 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
break;
}
return 0;
return;
}
#endif // HAVE_LIBAVFORMAT

View File

@ -24,10 +24,12 @@
#include "zm_rtp_source.h"
#include "zm_rtsp_auth.h"
#include "zm_sdp.h"
#include <atomic>
#include <map>
#include <set>
#include <thread>
class RtspThread : public Thread {
class RtspThread {
public:
typedef enum { RTP_UNICAST, RTP_MULTICAST, RTP_RTSP, RTP_RTSP_HTTP } RtspMethod;
typedef enum { UNDEFINED, UNICAST, MULTICAST } RtspDist;
@ -84,12 +86,14 @@ private:
unsigned long mRtpTime;
bool mStop;
std::thread mThread;
std::atomic<bool> mTerminate;
private:
bool sendCommand( std::string message );
bool recvResponse( std::string &response );
void checkAuthResponse(std::string &response);
void checkAuthResponse(std::string &response);
void Run();
public:
RtspThread( int id, RtspMethod method, const std::string &protocol, const std::string &host, const std::string &port, const std::string &path, const std::string &auth, bool rtsp_describe );
@ -124,15 +128,10 @@ public:
return( false );
return( iter->second->getFrame( frame ) );
}
int run();
void stop()
{
mStop = true;
}
bool stopped() const
{
return( mStop );
}
void Stop() { mTerminate = true; }
bool IsStopped() const { return mTerminate; }
int getAddressFamily ()
{
return mRtspSocket.getDomain();

View File

@ -157,9 +157,9 @@ int main(int argc, char *argv[]) {
sigaddset(&block_set, SIGUSR1);
sigaddset(&block_set, SIGUSR2);
RTSPServerThread * rtsp_server_thread = nullptr;
std::unique_ptr<RTSPServerThread> rtsp_server_thread;
if (config.min_rtsp_port) {
rtsp_server_thread = new RTSPServerThread(config.min_rtsp_port);
rtsp_server_thread = ZM::make_unique<RTSPServerThread>(config.min_rtsp_port);
Debug(1, "Starting RTSP server because min_rtsp_port is set");
} else {
Debug(1, "Not starting RTSP server because min_rtsp_port not set");
@ -168,8 +168,6 @@ int main(int argc, char *argv[]) {
ServerMediaSession **sessions = new ServerMediaSession *[monitors.size()];
for (size_t i = 0; i < monitors.size(); i++) sessions[i] = nullptr;
rtsp_server_thread->start();
while (!zm_terminate) {
for (size_t i = 0; i < monitors.size(); i++) {
@ -226,11 +224,6 @@ int main(int argc, char *argv[]) {
} // end if zm_reload
} // end while ! zm_terminate
rtsp_server_thread->stop();
rtsp_server_thread->join();
delete rtsp_server_thread;
rtsp_server_thread = nullptr;
delete[] sessions;
sessions = nullptr;

View File

@ -12,7 +12,7 @@
#include <StreamReplicator.hh>
RTSPServerThread::RTSPServerThread(int port) :
terminate(0)
terminate_(false), scheduler_watch_var_(0)
{
//unsigned short rtsp_over_http_port = 0;
//const char *realm = "ZoneMinder";
@ -34,9 +34,15 @@ RTSPServerThread::RTSPServerThread(int port) :
}
const char *prefix = rtspServer->rtspURLPrefix();
delete[] prefix;
} // end RTSPServerThread::RTSPServerThread
thread_ = std::thread(&RTSPServerThread::Run, this);
}
RTSPServerThread::~RTSPServerThread() {
Stop();
if (thread_.joinable())
thread_.join();
if (rtspServer) {
Medium::close(rtspServer);
} // end if rtsp_server
@ -49,25 +55,26 @@ RTSPServerThread::~RTSPServerThread() {
delete scheduler;
}
int RTSPServerThread::run() {
Debug(1, "RTSPServerThread::run()");
if ( rtspServer )
env->taskScheduler().doEventLoop(&terminate); // does not return
void RTSPServerThread::Run() {
Debug(1, "RTSPServerThread::Run()");
if (rtspServer)
env->taskScheduler().doEventLoop(&scheduler_watch_var_); // does not return
Debug(1, "RTSPServerThread::done()");
return 0;
} // end in RTSPServerThread::run()
}
void RTSPServerThread::stop() {
void RTSPServerThread::Stop() {
Debug(1, "RTSPServerThread::stop()");
terminate = 1;
terminate_ = true;
{
std::lock_guard<std::mutex> lck(scheduler_watch_var_mutex_);
scheduler_watch_var_ = 1;
}
for ( std::list<FramedSource *>::iterator it = sources.begin(); it != sources.end(); ++it ) {
(*it)->stopGettingFrames();
}
} // end RTSPServerThread::stop()
bool RTSPServerThread::stopped() const {
return terminate ? true : false;
} // end RTSPServerThread::stopped()
}
ServerMediaSession *RTSPServerThread::addSession(std::string &streamname) {
ServerMediaSession *sms = ServerMediaSession::createNew(*env, streamname.c_str());

View File

@ -3,9 +3,9 @@
#include "zm_config.h"
#include "zm_ffmpeg.h"
#include "zm_thread.h"
#include "zm_rtsp_server_server_media_subsession.h"
#include "zm_rtsp_server_fifo_source.h"
#include <atomic>
#include <list>
#include <memory>
@ -15,10 +15,14 @@
class Monitor;
class RTSPServerThread : public Thread {
class RTSPServerThread {
private:
std::shared_ptr<Monitor> monitor_;
char terminate;
std::thread thread_;
std::atomic<bool> terminate_;
std::mutex scheduler_watch_var_mutex_;
char scheduler_watch_var_;
TaskScheduler* scheduler;
UsageEnvironment* env;
@ -34,9 +38,9 @@ class RTSPServerThread : public Thread {
void removeSession(ServerMediaSession *sms);
void addStream(std::string &streamname, AVStream *, AVStream *);
FramedSource *addFifo(ServerMediaSession *sms, std::string fifo);
int run();
void stop();
bool stopped() const;
void Run();
void Stop();
bool IsStopped() const { return terminate_; };
private:
const std::string getRtpFormat(AVCodecID codec, bool muxTS);
int addSession(

View File

@ -20,6 +20,7 @@
#include "zm_sdp.h"
#include "zm_config.h"
#include "zm_exception.h"
#include "zm_logger.h"
#if HAVE_LIBAVFORMAT

View File

@ -21,10 +21,11 @@
#include "zm_box.h"
#include "zm_monitor.h"
#include <cmath>
#include <sys/file.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <cmath>
#include <unistd.h>
StreamBase::~StreamBase() {
#if HAVE_LIBAVCODEC

View File

@ -1,317 +0,0 @@
//
// ZoneMinder Thread Class Implementation, $Date$, $Revision$
// Copyright (C) 2001-2008 Philip Coombes
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
//
#include "zm_thread.h"
#include "zm_logger.h"
#include "zm_utils.h"
#include <cerrno>
#include <cstring>
#include <csignal>
#include <sys/time.h>
struct timespec getTimeout( int secs ) {
struct timespec timeout;
struct timeval temp_timeout;
gettimeofday(&temp_timeout, nullptr);
timeout.tv_sec = temp_timeout.tv_sec + secs;
timeout.tv_nsec = temp_timeout.tv_usec*1000;
return timeout;
}
struct timespec getTimeout( double secs ) {
struct timespec timeout;
struct timeval temp_timeout;
gettimeofday( &temp_timeout, nullptr );
timeout.tv_sec = temp_timeout.tv_sec + int(secs);
timeout.tv_nsec = temp_timeout.tv_usec += (long int)(1000000000.0*(secs-int(secs)));
if ( timeout.tv_nsec > 1000000000 ) {
timeout.tv_sec += 1;
timeout.tv_nsec -= 1000000000;
}
return timeout;
}
Mutex::Mutex() {
if ( pthread_mutex_init(&mMutex, nullptr) < 0 )
Error("Unable to create pthread mutex: %s", strerror(errno));
}
Mutex::~Mutex() {
if ( locked() )
Warning("Destroying mutex when locked");
if ( pthread_mutex_destroy(&mMutex) < 0 )
Error("Unable to destroy pthread mutex: %s", strerror(errno));
}
int Mutex::try_lock() {
return pthread_mutex_trylock(&mMutex);
}
void Mutex::lock() {
if ( pthread_mutex_lock(&mMutex) < 0 )
throw ThreadException(stringtf("Unable to lock pthread mutex: %s", strerror(errno)));
//Debug(3, "Lock");
}
bool Mutex::try_lock_for(int secs) {
struct timespec timeout = getTimeout(secs);
return pthread_mutex_timedlock(&mMutex, &timeout) == 0;
}
bool Mutex::try_lock_for(double secs) {
struct timespec timeout = getTimeout(secs);
return pthread_mutex_timedlock(&mMutex, &timeout) == 0;
}
void Mutex::unlock() {
if ( pthread_mutex_unlock(&mMutex) < 0 )
throw ThreadException(stringtf("Unable to unlock pthread mutex: %s", strerror(errno)));
//Debug(3, "unLock");
}
bool Mutex::locked() {
int state = pthread_mutex_trylock(&mMutex);
if ( (state != 0) && (state != EBUSY) )
throw ThreadException(stringtf("Unable to trylock pthread mutex: %s", strerror(errno)));
if ( state != EBUSY )
unlock();
return (state == EBUSY);
}
RecursiveMutex::RecursiveMutex() {
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
if ( pthread_mutex_init(&mMutex, &attr) < 0 )
Error("Unable to create pthread mutex: %s", strerror(errno));
}
Condition::Condition( Mutex &mutex ) : mMutex( mutex ) {
if ( pthread_cond_init(&mCondition, nullptr) < 0 )
throw ThreadException(stringtf("Unable to create pthread condition: %s", strerror(errno)));
}
Condition::~Condition() {
if ( pthread_cond_destroy( &mCondition ) < 0 )
Error("Unable to destroy pthread condition: %s", strerror(errno));
}
void Condition::wait() {
// Locking done outside of this function
if ( pthread_cond_wait(&mCondition, mMutex.getMutex()) < 0 )
throw ThreadException(stringtf("Unable to wait pthread condition: %s", strerror(errno)));
}
bool Condition::wait(int secs) {
// Locking done outside of this function
Debug(8, "Waiting for %d seconds", secs);
struct timespec timeout = getTimeout(secs);
if (
( pthread_cond_timedwait(&mCondition, mMutex.getMutex(), &timeout) < 0 )
&&
( errno != ETIMEDOUT )
)
throw ThreadException(stringtf("Unable to timedwait pthread condition: %s", strerror(errno)));
return errno != ETIMEDOUT;
}
bool Condition::wait( double secs ) {
// Locking done outside of this function
struct timespec timeout = getTimeout( secs );
if (
(pthread_cond_timedwait( &mCondition, mMutex.getMutex(), &timeout ) < 0)
&&
(errno != ETIMEDOUT) )
throw ThreadException( stringtf( "Unable to timedwait pthread condition: %s", strerror(errno) ) );
return errno != ETIMEDOUT;
}
void Condition::signal() {
if ( pthread_cond_signal( &mCondition ) < 0 )
throw ThreadException( stringtf( "Unable to signal pthread condition: %s", strerror(errno) ) );
}
void Condition::broadcast() {
if ( pthread_cond_broadcast( &mCondition ) < 0 )
throw ThreadException( stringtf( "Unable to broadcast pthread condition: %s", strerror(errno) ) );
}
template <class T> const T ThreadData<T>::getValue() const {
mMutex.lock();
const T valueCopy = mValue;
mMutex.unlock();
return valueCopy;
}
template <class T> T ThreadData<T>::setValue(const T value) {
mMutex.lock();
const T valueCopy = mValue = value;
mMutex.unlock();
return valueCopy;
}
template <class T> const T ThreadData<T>::getUpdatedValue() const {
Debug(8, "Waiting for value update, %p", this);
mMutex.lock();
mChanged = false;
mCondition.wait();
const T valueCopy = mValue;
mMutex.unlock();
Debug(9, "Got value update, %p", this);
return valueCopy;
}
template <class T> const T ThreadData<T>::getUpdatedValue(double secs) const {
Debug(8, "Waiting for value update, %.2f secs, %p", secs, this);
mMutex.lock();
mChanged = false;
//do {
mCondition.wait(secs);
//} while ( !mChanged );
const T valueCopy = mValue;
mMutex.unlock();
Debug(9, "Got value update, %p", this);
return valueCopy;
}
template <class T> const T ThreadData<T>::getUpdatedValue(int secs) const {
Debug(8, "Waiting for value update, %d secs, %p", secs, this);
mMutex.lock();
mChanged = false;
//do {
mCondition.wait(secs);
//} while ( !mChanged );
const T valueCopy = mValue;
mMutex.unlock();
Debug(9, "Got value update, %p", this);
return valueCopy;
}
template <class T> void ThreadData<T>::updateValueSignal(const T value) {
Debug(8, "Updating value with signal, %p", this);
mMutex.lock();
mValue = value;
mChanged = true;
mCondition.signal();
mMutex.unlock();
Debug(9, "Updated value, %p", this);
}
template <class T> void ThreadData<T>::updateValueBroadcast( const T value ) {
Debug(8, "Updating value with broadcast, %p", this);
mMutex.lock();
mValue = value;
mChanged = true;
mCondition.broadcast();
mMutex.unlock();
Debug(9, "Updated value, %p", this);
}
Thread::Thread() :
mThreadCondition( mThreadMutex ),
mPid( -1 ),
mStarted( false ),
mRunning( false )
{
Debug( 1, "Creating thread" );
}
Thread::~Thread() {
Debug( 1, "Destroying thread %d", mPid );
if ( mStarted ) {
Warning("You should really join the thread before destroying it");
join();
}
}
void *Thread::mThreadFunc( void *arg ) {
Debug( 2, "Invoking thread" );
Thread *thisPtr = (Thread *)arg;
thisPtr->status = 0;
try {
thisPtr->mThreadMutex.lock();
thisPtr->mPid = thisPtr->id();
thisPtr->mThreadCondition.signal();
thisPtr->mThreadMutex.unlock();
thisPtr->mRunning = true;
Debug(2,"Runnning");
thisPtr->status = thisPtr->run();
thisPtr->mRunning = false;
Debug( 2, "Exiting thread, status %p", (void *)&(thisPtr->status) );
return (void *)&(thisPtr->status);
} catch ( const ThreadException &e ) {
Error( "%s", e.getMessage().c_str() );
thisPtr->mRunning = false;
Debug( 2, "Exiting thread after exception, status %p", (void *)-1 );
return (void *)-1;
}
}
void Thread::start() {
Debug(4, "Starting thread" );
if ( isThread() )
throw ThreadException("Can't self start thread");
mThreadMutex.lock();
if ( !mStarted ) {
pthread_attr_t threadAttrs;
pthread_attr_init( &threadAttrs );
pthread_attr_setscope( &threadAttrs, PTHREAD_SCOPE_SYSTEM );
mStarted = true;
if ( pthread_create( &mThread, &threadAttrs, mThreadFunc, this ) < 0 )
throw ThreadException( stringtf( "Can't create thread: %s", strerror(errno) ) );
pthread_attr_destroy( &threadAttrs );
} else {
Error( "Attempt to start already running thread %d", mPid );
}
mThreadCondition.wait();
mThreadMutex.unlock();
Debug(4, "Started thread %d", mPid);
}
void Thread::join() {
Debug(1, "Joining thread %d", mPid);
if ( isThread() )
throw ThreadException( "Can't self join thread" );
mThreadMutex.lock();
if ( mPid >= 0 ) {
if ( mStarted ) {
void *threadStatus = 0;
if ( pthread_join( mThread, &threadStatus ) < 0 )
throw ThreadException( stringtf( "Can't join sender thread: %s", strerror(errno) ) );
mStarted = false;
Debug( 1, "Thread %d exited, status %p", mPid, threadStatus );
} else {
Warning( "Attempt to join already finished thread %d", mPid );
}
} else {
Warning( "Attempt to join non-started thread %d", mPid );
}
mThreadMutex.unlock();
Debug( 1, "Joined thread %d", mPid );
}
void Thread::kill( int signal ) {
pthread_kill( mThread, signal );
}
// Some explicit template instantiations
#include "zm_threaddata.cpp"

View File

@ -1,266 +0,0 @@
//
// ZoneMinder Thread Class Interface, $Date$, $Revision$
// Copyright (C) 2001-2008 Philip Coombes
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
//
#ifndef ZM_THREAD_H
#define ZM_THREAD_H
#include "zm_config.h"
#include "zm_exception.h"
#include "zm_utils.h"
#include <pthread.h>
#include <unistd.h>
#ifdef HAVE_SYS_SYSCALL_H
#include <sys/syscall.h>
#endif // HAVE_SYS_SYSCALL_H
#ifdef __FreeBSD__
#include <sys/thr.h>
#endif
class ThreadException : public Exception {
private:
#ifndef SOLARIS
pid_t pid() {
pid_t tid;
#ifdef __FreeBSD__
long lwpid;
thr_self(&lwpid);
tid = lwpid;
#else
#ifdef __FreeBSD_kernel__
if ( (syscall(SYS_thr_self, &tid)) < 0 ) // Thread/Process id
# else
tid=syscall(SYS_gettid);
#endif
#endif
return tid;
}
#else
pthread_t pid() { return( pthread_self() ); }
#endif
public:
explicit ThreadException(const std::string &message) :
Exception(stringtf("(%d) ", (long int)pid())+message)
{
}
};
class Mutex {
friend class Condition;
protected:
pthread_mutex_t mMutex;
public:
Mutex();
~Mutex();
private:
pthread_mutex_t *getMutex() {
return &mMutex;
}
public:
int try_lock();
void lock();
bool try_lock_for(int secs);
bool try_lock_for(double secs);
void unlock();
bool locked();
};
class RecursiveMutex : public Mutex {
public:
RecursiveMutex();
};
class ScopedMutex {
private:
Mutex &mMutex;
public:
explicit ScopedMutex( Mutex &mutex ) : mMutex( mutex ) {
mMutex.lock();
}
~ScopedMutex() {
mMutex.unlock();
}
private:
ScopedMutex( const ScopedMutex & );
};
class Condition {
private:
Mutex &mMutex;
pthread_cond_t mCondition;
public:
explicit Condition(Mutex &mutex);
~Condition();
void wait();
bool wait(int secs);
bool wait(double secs);
void signal();
void broadcast();
};
class Semaphore : public Condition {
private:
Mutex mMutex;
public:
Semaphore() : Condition(mMutex) {
}
void wait() {
mMutex.lock();
Condition::wait();
mMutex.unlock();
}
bool wait(int secs) {
mMutex.lock();
bool result = Condition::wait(secs);
mMutex.unlock();
return result;
}
bool wait(double secs) {
mMutex.lock();
bool result = Condition::wait(secs);
mMutex.unlock();
return result;
}
void signal() {
mMutex.lock();
Condition::signal();
mMutex.unlock();
}
void broadcast() {
mMutex.lock();
Condition::broadcast();
mMutex.unlock();
}
};
template <class T> class ThreadData {
private:
T mValue;
mutable bool mChanged;
mutable Mutex mMutex;
mutable Condition mCondition;
public:
__attribute__((used)) ThreadData() :
mValue(0), mCondition(mMutex)
{
mChanged = false;
}
explicit __attribute__((used)) ThreadData(T value) :
mValue(value), mCondition(mMutex) {
mChanged = false;
}
__attribute__((used)) operator T() const {
return getValue();
}
__attribute__((used)) const T operator=( const T value ) {
return setValue(value);
}
__attribute__((used)) const T getValueImmediate() const {
return mValue;
}
__attribute__((used)) T setValueImmediate( const T value ) {
return mValue = value;
}
__attribute__((used)) const T getValue() const;
__attribute__((used)) T setValue( const T value );
__attribute__((used)) const T getUpdatedValue() const;
__attribute__((used)) const T getUpdatedValue(double secs) const;
__attribute__((used)) const T getUpdatedValue(int secs) const;
__attribute__((used)) void updateValueSignal(const T value);
__attribute__((used)) void updateValueBroadcast(const T value);
};
class Thread {
public:
typedef void *(*ThreadFunc)( void * );
protected:
pthread_t mThread;
Mutex mThreadMutex;
Condition mThreadCondition;
#ifndef SOLARIS
pid_t mPid;
#else
pthread_t mPid;
#endif
bool mStarted;
bool mRunning;
int status; // Used in various functions to get around return a local variable
protected:
Thread();
virtual ~Thread();
#ifndef SOLARIS
pid_t id() const {
pid_t tid;
#ifdef __FreeBSD__
long lwpid;
thr_self(&lwpid);
tid = lwpid;
#else
#ifdef __FreeBSD_kernel__
if ( (syscall(SYS_thr_self, &tid)) < 0 ) // Thread/Process id
#else
tid=syscall(SYS_gettid);
#endif
#endif
return tid;
}
#else
pthread_t id() const {
return pthread_self();
}
#endif
void exit( int p_status = 0 ) {
//INFO( "Exiting" );
pthread_exit( (void *)&p_status );
}
static void *mThreadFunc( void *arg );
public:
virtual int run() = 0;
void start();
void join();
void kill( int signal );
bool isThread() {
return( mPid > -1 && pthread_equal( pthread_self(), mThread ) );
}
bool isStarted() const { return mStarted; }
bool isRunning() const { return mRunning; }
};
#endif // ZM_THREAD_H

View File

@ -1,21 +0,0 @@
//
// ZoneMinder Explicit Thread Template Class Instantiations, $Date$, $Revision$
// Copyright (C) 2001-2008 Philip Coombes
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
//
template class ThreadData<bool>;
template class ThreadData<int>;

View File

@ -1,119 +0,0 @@
//
// ZoneMinder Timer Class Implementation, $Date$, $Revision$
// Copyright (C) 2001-2008 Philip Coombes
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
//
#include "zm_timer.h"
#include "zm_logger.h"
int Timer::TimerThread::mNextTimerId = 0;
Timer::TimerThread::TimerThread( Timer &timer, int duration, bool repeat ) :
mTimerId( 0 ),
mTimer( timer ),
mDuration( duration ),
mRepeat( repeat ),
mReset( false ),
mExpiryFlag( true )
{
mAccessMutex.lock();
mTimerId = mNextTimerId++;
Debug( 5, "Creating timer %d for %d seconds%s", mTimerId, mDuration, mRepeat?", repeating":"" );
mAccessMutex.unlock();
}
Timer::TimerThread::~TimerThread()
{
cancel();
}
void Timer::TimerThread::cancel()
{
mAccessMutex.lock();
if ( mRunning )
{
Debug( 4, "Cancelling timer %d", mTimerId );
mRepeat = false;
mReset = false;
mExpiryFlag.updateValueSignal( false );
}
mAccessMutex.unlock();
}
void Timer::TimerThread::reset()
{
mAccessMutex.lock();
if ( mRunning )
{
Debug( 4, "Resetting timer" );
mReset = true;
mExpiryFlag.updateValueSignal( false );
}
else
{
Error( "Attempting to reset expired timer %d", mTimerId );
}
mAccessMutex.unlock();
}
int Timer::TimerThread::run()
{
Debug( 4, "Starting timer %d for %d seconds", mTimerId, mDuration );
bool timerExpired = false;
do
{
mAccessMutex.lock();
mReset = false;
mExpiryFlag.setValue( true );
mAccessMutex.unlock();
timerExpired = mExpiryFlag.getUpdatedValue( mDuration );
mAccessMutex.lock();
if ( timerExpired )
{
Debug( 4, "Timer %d expired", mTimerId );
mTimer.expire();
}
else
{
Debug( 4, "Timer %d %s", mTimerId, mReset?"reset":"cancelled" );
}
mAccessMutex.unlock();
} while ( mRepeat || (mReset && !timerExpired) );
return( timerExpired );
}
Timer::Timer( int timeout, bool repeat ) : mTimerThread( *this, timeout, repeat )
{
mTimerThread.start();
}
Timer::~Timer()
{
//cancel();
}
void Timer::Timer::cancel()
{
mTimerThread.cancel();
}
void Timer::Timer::reset()
{
mTimerThread.reset();
}

View File

@ -1,110 +0,0 @@
//
// ZoneMinder Timer Class Interface, $Date$, $Revision$
// Copyright (C) 2001-2008 Philip Coombes
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
//
#ifndef ZM_TIMER_H
#define ZM_TIMER_H
#include "zm_exception.h"
#include "zm_thread.h"
#include "zm_utils.h"
#ifdef HAVE_SYS_SYSCALL_H
#include <sys/syscall.h>
#endif // HAVE_SYS_SYSCALL_H
class Timer
{
private:
class TimerException : public Exception
{
private:
#ifndef SOLARIS
pid_t pid() {
pid_t tid;
#ifdef __FreeBSD__
long lwpid;
thr_self(&lwpid);
tid = lwpid;
#else
#ifdef __FreeBSD_kernel__
if ( (syscall(SYS_thr_self, &tid)) < 0 ) // Thread/Process id
#else
tid=syscall(SYS_gettid);
#endif
#endif
return tid;
}
#else
pthread_t pid() { return( pthread_self() ); }
#endif
public:
explicit TimerException( const std::string &message ) : Exception( stringtf("(%d) ", (long int)pid())+message ) {
}
};
class TimerThread : public Thread
{
private:
typedef ThreadData<bool> ExpiryFlag;
private:
static int mNextTimerId;
private:
int mTimerId;
Timer &mTimer;
int mDuration;
int mRepeat;
int mReset;
ExpiryFlag mExpiryFlag;
Mutex mAccessMutex;
private:
void quit()
{
cancel();
}
public:
TimerThread( Timer &timer, int timeout, bool repeat );
~TimerThread();
void cancel();
void reset();
int run();
};
protected:
TimerThread mTimerThread;
protected:
Timer( int timeout, bool repeat=false );
public:
virtual ~Timer();
protected:
virtual void expire()=0;
public:
void cancel();
void reset();
};
#endif // ZM_TIMER_H

View File

@ -66,6 +66,7 @@ possible, this should run at more or less constant speed.
#include "zm_utils.h"
#include <getopt.h>
#include <iostream>
#include <unistd.h>
void Usage() {
fprintf(stderr, "zmc -d <device_path> or -r <proto> -H <host> -P <port> -p <path> or -f <file_path> or -m <monitor_id>\n");

View File

@ -25,6 +25,7 @@
#include "zm_eventstream.h"
#include "zm_fifo_stream.h"
#include <string>
#include <unistd.h>
bool ValidateAccess(User *user, int mon_id) {
bool allowed = true;

View File

@ -93,6 +93,7 @@ Options for use with monitors:
#include "zm_monitor.h"
#include "zm_local_camera.h"
#include <getopt.h>
#include <unistd.h>
void Usage(int status=-1) {
fputs(