From 623e916d24cec8698486ed32d8123f8a57a7219a Mon Sep 17 00:00:00 2001 From: Moe Date: Wed, 17 Oct 2018 21:42:32 -0700 Subject: [PATCH] Bug fixes for childNodes --- camera.js | 2 + libs/childNode.js | 362 ++++++++++++++++++++++++---------------------- libs/events.js | 15 +- libs/monitor.js | 23 ++- libs/socketio.js | 4 +- libs/startup.js | 3 +- libs/videos.js | 10 +- 7 files changed, 230 insertions(+), 189 deletions(-) diff --git a/camera.js b/camera.js index e42fb169..e2227c8e 100644 --- a/camera.js +++ b/camera.js @@ -12,6 +12,8 @@ var os = require('os'); var io = new (require('socket.io'))() // s = Shinobi s = { + //Total Memory + coreCount : os.cpus().length, //Total Memory totalmem : os.totalmem(), //Check Platform diff --git a/libs/childNode.js b/libs/childNode.js index 6a80802c..19a55472 100644 --- a/libs/childNode.js +++ b/libs/childNode.js @@ -1,184 +1,200 @@ +var fs = require('fs'); var http = require('http'); var https = require('https'); var express = require('express'); module.exports = function(s,config,lang,app,io){ - //setup Master for childNodes - if(config.childNodes.enabled === true && config.childNodes.mode === 'master'){ - s.childNodes = {}; - var childNodeHTTP = express(); - var childNodeServer = http.createServer(app); - var childNodeWebsocket = new (require('socket.io'))() - childNodeServer.listen(config.childNodes.port,config.bindip,function(){ - console.log(lang.Shinobi+' - CHILD NODE PORT : '+config.childNodes.port); - }); - childNodeWebsocket.attach(childNodeServer); - //send data to child node function (experimental) - s.cx = function(z,y,x){if(!z.mid && !z.d){ - var err = new Error(); - console.log(err.stack); - };if(x){return x.broadcast.to(y).emit('c',z)};childNodeWebsocket.to(y).emit('c',z);} - //child Node Websocket - childNodeWebsocket.on('connection', function (cn) { - //functions for dispersing work to child servers; - cn.on('c',function(d){ - if(config.childNodes.key.indexOf(d.socketKey) > -1){ - if(!cn.shinobi_child&&d.f=='init'){ - cn.ip = cn.request.connection.remoteAddress.replace('::ffff:','')+':'+d.port - cn.shinobi_child = 1 - tx = function(z){ - cn.emit('c',z) + try{ + //setup Master for childNodes + if(config.childNodes.enabled === true && config.childNodes.mode === 'master'){ + s.childNodes = {}; + var childNodeHTTP = express(); + var childNodeServer = http.createServer(app); + var childNodeWebsocket = new (require('socket.io'))() + childNodeServer.listen(config.childNodes.port,config.bindip,function(){ + console.log(lang.Shinobi+' - CHILD NODE PORT : '+config.childNodes.port); + }); + console.log('childNodeWebsocket.attach(childNodeServer)') + childNodeWebsocket.attach(childNodeServer); + //send data to child node function (experimental) + s.cx = function(z,y,x){if(!z.mid && !z.d){ + var err = new Error(); + console.log(err.stack); + }; + if(x){return x.broadcast.to(y).emit('c',z)};childNodeWebsocket.to(y).emit('c',z);} + //child Node Websocket + childNodeWebsocket.on('connection', function (cn) { + //functions for dispersing work to child servers; + cn.on('c',function(d){ + if(config.childNodes.key.indexOf(d.socketKey) > -1){ + if(!cn.shinobi_child&&d.f=='init'){ + cn.ip = cn.request.connection.remoteAddress.replace('::ffff:','')+':'+d.port + cn.shinobi_child = 1 + tx = function(z){ + cn.emit('c',z) + } + if(!s.childNodes[cn.ip]){ + s.childNodes[cn.ip] = {} + }; + s.childNodes[cn.ip].cnid = cn.id + s.childNodes[cn.ip].cpu = 0 + s.childNodes[cn.ip].activeCameras = {} + d.availableHWAccels.forEach(function(accel){ + if(config.availableHWAccels.indexOf(accel) === -1)config.availableHWAccels.push(accel) + }) + + tx({ + f : 'init_success', + childNodes : s.childNodes + }) + s.childNodes[cn.ip].coreCount = d.coreCount + }else{ + switch(d.f){ + case'cpu': + s.childNodes[cn.ip].cpu = d.cpu; + break; + case'sql': + s.sqlQuery(d.query,d.values,function(err,rows){ + cn.emit('c',{f:'sqlCallback',rows:rows,err:err,callbackId:d.callbackId}); + }); + break; + case'camera': + s.camera(d.mode,d.data) + break; + case's.tx': + s.tx(d.data,d.to) + break; + case's.userLog': + if(!d.mon || !d.data)return console.log('LOG DROPPED',d.mon,d.data); + s.userLog(d.mon,d.data) + break; + case'created_file_chunk': + if(!s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename]){ + d.dir = s.getVideoDirectory(s.group[d.ke].mon_conf[d.mid]) + s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename] = fs.createWriteStream(d.dir+d.filename) + } + s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename].write(d.chunk) + break; + case'created_file': + if(!s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename]){ + return console.log('FILE NOT EXIST') + } + s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename].end(); + tx({ + f:'delete', + file:d.filename, + ke:d.ke, + mid:d.mid + }); + s.txWithSubPermissions({ + f:'video_build_success', + hrefNoAuth:'/videos/'+d.ke+'/'+d.mid+'/'+d.filename, + filename:d.filename, + mid:d.mid, + ke:d.ke, + time:d.startTime, + size:d.filesize, + end:d.endTime + },'GRP_'+d.ke,'video_view'); + clearTimeout(s.group[d.ke].mon[d.mid].recordingChecker) + clearTimeout(s.group[d.ke].mon[d.mid].streamChecker) + break; + } } - if(!s.childNodes[cn.ip]){ - s.childNodes[cn.ip] = {} - }; - s.childNodes[cn.ip].cnid = cn.id - s.childNodes[cn.ip].cpu = 0 - s.childNodes[cn.ip].activeCameras = {} - tx({ - f : 'init_success', - childNodes : s.childNodes + } + }) + cn.on('disconnect',function(){ + console.log('childNodeWebsocket.disconnect') + + if(s.childNodes[cn.ip]){ + var activeCameraKeys = Object.keys(s.childNodes[cn.ip].activeCameras) + activeCameraKeys.forEach(function(key){ + var monitor = s.childNodes[cn.ip].activeCameras[key] + s.camera('stop',s.cleanMonitorObject(monitor)) + delete(s.group[monitor.ke].mon[monitor.mid].childNode) + delete(s.group[monitor.ke].mon[monitor.mid].childNodeId) + setTimeout(function(){ + s.camera(monitor.mode,s.cleanMonitorObject(monitor)) + },1300) }) - }else{ - switch(d.f){ - case'cpu': - s.childNodes[cn.ip].cpu = d.cpu; - break; - case'sql': - s.sqlQuery(d.query,d.values,function(err,rows){ - cn.emit('c',{f:'sqlCallback',rows:rows,err:err,callbackId:d.callbackId}); - }); - break; - case'camera': - s.camera(d.mode,d.data) - break; - case's.tx': - s.tx(d.data,d.to) - break; - case's.userLog': - if(!d.mon || !d.data)return console.log('LOG DROPPED',d.mon,d.data); - s.userLog(d.mon,d.data) - break; - case'created_file_chunk': - if(!s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename]){ - d.dir = s.getVideoDirectory(s.group[d.ke].mon_conf[d.mid]) - s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename] = fs.createWriteStream(d.dir+d.filename) - } - s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename].write(d.chunk) - break; - case'created_file': - if(!s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename]){ - return console.log('FILE NOT EXIST') - } - s.group[d.ke].mon[d.mid].childNodeStreamWriters[d.filename].end(); - tx({ - f:'delete', - file:d.filename, - ke:d.ke, - mid:d.mid - }); - s.txWithSubPermissions({ - f:'video_build_success', - hrefNoAuth:'/videos/'+d.ke+'/'+d.mid+'/'+d.filename, - filename:d.filename, - mid:d.mid, - ke:d.ke, - time:d.startTime, - size:d.filesize, - end:d.endTime - },'GRP_'+d.ke,'video_view'); - clearTimeout(s.group[d.ke].mon[d.mid].recordingChecker) - clearTimeout(s.group[d.ke].mon[d.mid].streamChecker) - break; - } + delete(s.childNodes[cn.ip]); } - } + }) }) - cn.on('disconnect',function(){ - if(s.childNodes[cn.ip]){ - var activeCameraKeys = Object.keys(s.childNodes[cn.ip].activeCameras) - activeCameraKeys.forEach(function(key){ - var monitor = s.childNodes[cn.ip].activeCameras[key] - s.camera('stop',s.cleanMonitorObject(monitor)) - delete(s.group[monitor.ke].mon[monitor.mid].childNode) - delete(s.group[monitor.ke].mon[monitor.mid].childNodeId) - setTimeout(function(){ - s.camera(monitor.mode,s.cleanMonitorObject(monitor)) - },1300) - }) - delete(s.childNodes[cn.ip]); - } - }) - }) - }else - //setup Child for childNodes - if(config.childNodes.enabled === true && config.childNodes.mode === 'child' && config.childNodes.host){ - s.connected = false; - childIO = require('socket.io-client')('ws://'+config.childNodes.host); - s.cx = function(x){x.socketKey = config.childNodes.key;childIO.emit('c',x)} - s.tx = function(x,y){s.cx({f:'s.tx',data:x,to:y})} - s.userLog = function(x,y){s.cx({f:'s.userLog',mon:x,data:y})} - s.queuedSqlCallbacks = {} - s.sqlQuery = function(query,values,onMoveOn){ - var callbackId = s.gid() - if(!values){values=[]} - if(typeof values === 'function'){ - var onMoveOn = values; - var values = []; - } - if(typeof onMoveOn !== 'function'){onMoveOn=function(){}} - s.queuedSqlCallbacks[callbackId] = onMoveOn - s.cx({f:'sql',query:query,values:values,callbackId:callbackId}); - } - setInterval(function(){ - s.cpuUsage(function(cpu){ - io.emit('c',{f:'cpu',cpu:parseFloat(cpu)}); - }) - },2000); - childIO.on('connect', function(d){ - console.log('CHILD CONNECTION SUCCESS') - s.cx({ - f : 'init', - port : config.port - }) - }) - childIO.on('c', function (d) { - switch(d.f){ - case'sqlCallback': - if(s.queuedSqlCallbacks[d.callbackId]){ - s.queuedSqlCallbacks[d.callbackId](d.err,d.rows) - delete(s.queuedSqlCallbacks[d.callbackId]) - } - break; - case'init_success': - s.connected=true; - s.other_helpers=d.child_helpers; - break; - case'kill': - s.initiateMonitorObject(d.d); - s.cameraDestroy(s.group[d.d.ke].mon[d.d.id].spawn,d.d) - break; - case'sync': - s.initiateMonitorObject(d.sync); - Object.keys(d.sync).forEach(function(v){ - s.group[d.sync.ke].mon[d.sync.mid][v]=d.sync[v]; - }); - break; - case'delete'://delete video - s.file('delete',s.dir.videos+d.ke+'/'+d.mid+'/'+d.file) - break; - case'insertCompleted'://close video - s.insertCompletedVideo(d.d,d.k) - break; - case'cameraStop'://start camera - s.camera('stop',d.d) - break; - case'cameraStart'://start or record camera - s.camera(d.mode,d.d) - break; - } - }) - childIO.on('disconnect',function(d){ + }else + //setup Child for childNodes + if(config.childNodes.enabled === true && config.childNodes.mode === 'child' && config.childNodes.host){ s.connected = false; - }) + childIO = require('socket.io-client')('ws://'+config.childNodes.host); + s.cx = function(x){x.socketKey = config.childNodes.key;childIO.emit('c',x)} + s.tx = function(x,y){s.cx({f:'s.tx',data:x,to:y})} + s.userLog = function(x,y){s.cx({f:'s.userLog',mon:x,data:y})} + s.queuedSqlCallbacks = {} + s.sqlQuery = function(query,values,onMoveOn){ + var callbackId = s.gid() + if(!values){values=[]} + if(typeof values === 'function'){ + var onMoveOn = values; + var values = []; + } + if(typeof onMoveOn !== 'function'){onMoveOn=function(){}} + s.queuedSqlCallbacks[callbackId] = onMoveOn + s.cx({f:'sql',query:query,values:values,callbackId:callbackId}); + } + setInterval(function(){ + s.cpuUsage(function(cpu){ + io.emit('c',{f:'cpu',cpu:parseFloat(cpu)}); + }) + },2000); + childIO.on('connect', function(d){ + console.log('CHILD CONNECTION SUCCESS') + s.cx({ + f : 'init', + port : config.port, + coreCount : s.coreCount, + availableHWAccels : config.availableHWAccels + }) + }) + childIO.on('c', function (d) { + switch(d.f){ + case'sqlCallback': + if(s.queuedSqlCallbacks[d.callbackId]){ + s.queuedSqlCallbacks[d.callbackId](d.err,d.rows) + delete(s.queuedSqlCallbacks[d.callbackId]) + } + break; + case'init_success': + s.connected=true; + s.other_helpers=d.child_helpers; + break; + case'kill': + s.initiateMonitorObject(d.d); + s.cameraDestroy(s.group[d.d.ke].mon[d.d.id].spawn,d.d) + break; + case'sync': + s.initiateMonitorObject(d.sync); + Object.keys(d.sync).forEach(function(v){ + s.group[d.sync.ke].mon[d.sync.mid][v]=d.sync[v]; + }); + break; + case'delete'://delete video + s.file('delete',s.dir.videos+d.ke+'/'+d.mid+'/'+d.file) + break; + case'insertCompleted'://close video + s.insertCompletedVideo(d.d,d.k) + break; + case'cameraStop'://start camera + s.camera('stop',d.d) + break; + case'cameraStart'://start or record camera + s.camera(d.mode,d.d) + break; + } + }) + childIO.on('disconnect',function(d){ + s.connected = false; + }) + } + }catch(err){ + console.log(err) } } diff --git a/libs/events.js b/libs/events.js index e471bc85..e22819e9 100644 --- a/libs/events.js +++ b/libs/events.js @@ -332,12 +332,23 @@ module.exports = function(s,config,lang){ s.group[d.ke].mon[d.id].eventBasedRecording.allowEnd=true; },detector_timeout * 950 * 60) if(!s.group[d.ke].mon[d.id].eventBasedRecording.process){ + if(!d.auth){ + d.auth = s.gid(60) + } + if(!s.api[d.auth]){ + s.api[d.auth] = { + system: 1, + ip: '0.0.0.0', + details: {}, + lang: lang + } + } s.group[d.ke].mon[d.id].eventBasedRecording.allowEnd = false; var runRecord = function(){ var filename = s.formattedTime()+'.mp4' s.userLog(d,{type:"Traditional Recording",msg:"Started"}) //-t 00:'+s.timeObject(new Date(detector_timeout * 1000 * 60)).format('mm:ss')+' - s.group[d.ke].mon[d.id].eventBasedRecording.process = spawn(config.ffmpegDir,s.splitForFFPMEG(('-loglevel warning -analyzeduration 1000000 -probesize 1000000 -re -i "'+s.dir.streams+d.ke+'/'+d.id+'/detectorStream.m3u8" -t 00:'+s.timeObject(new Date(detector_timeout * 1000 * 60)).format('mm:ss')+' -c:v copy -strftime 1 "'+s.getVideoDirectory(d.mon) + filename + '"').replace(/\s+/g,' ').trim())) + s.group[d.ke].mon[d.id].eventBasedRecording.process = spawn(config.ffmpegDir,s.splitForFFPMEG(('-loglevel warning -analyzeduration 1000000 -probesize 1000000 -re -i http://'+config.ip+':'+config.port+'/'+d.auth+'/hls/'+d.ke+'/'+d.id+'/detectorStream.m3u8 -t 00:'+s.timeObject(new Date(detector_timeout * 1000 * 60)).format('mm:ss')+' -c:v copy -strftime 1 "'+s.getVideoDirectory(d.mon) + filename + '"').replace(/\s+/g,' ').trim())) var ffmpegError=''; var error s.group[d.ke].mon[d.id].eventBasedRecording.process.stderr.on('data',function(data){ @@ -353,7 +364,7 @@ module.exports = function(s,config,lang){ file : filename }) s.userLog(d,{type:"Traditional Recording",msg:"Detector Recording Complete"}) - delete(s.group[d.ke].users[d.auth]) + delete(s.api[d.auth]) s.userLog(d,{type:"Traditional Recording",msg:'Clear Recorder Process'}) delete(s.group[d.ke].mon[d.id].eventBasedRecording.process) delete(s.group[d.ke].mon[d.id].eventBasedRecording.timeout) diff --git a/libs/monitor.js b/libs/monitor.js index 4cfa59d8..e4864970 100644 --- a/libs/monitor.js +++ b/libs/monitor.js @@ -232,6 +232,9 @@ module.exports = function(s,config,lang){ delete(s.group[e.ke].mon[e.id].lastJpegDetectorFrame); clearTimeout(s.group[e.ke].mon[e.id].recordingSnapper); clearInterval(s.group[e.ke].mon[e.id].getMonitorCpuUsage); + if(s.group[e.ke].mon[e.id].onChildNodeExit){ + s.group[e.ke].mon[e.id].onChildNodeExit() + } if(s.group[e.ke].mon[e.id].mp4frag){ var mp4FragChannels = Object.keys(s.group[e.ke].mon[e.id].mp4frag) mp4FragChannels.forEach(function(channel){ @@ -1108,17 +1111,28 @@ module.exports = function(s,config,lang){ } try{ if(config.childNodes.enabled === true && config.childNodes.mode === 'master'){ + var copiedMonitorObject = s.cleanMonitorObject(s.group[e.ke].mon_conf[e.id]) var childNodeList = Object.keys(s.childNodes) if(childNodeList.length > 0){ e.childNodeFound = false + var selectNode = function(ip){ + e.childNodeFound = true + e.childNodeSelected = ip + // s.childNodes[ip].coreCount + s.group[e.ke].mon[e.id].onChildNodeExit = function(){ + if(s.childNodes[ip])delete(s.childNodes[ip].activeCameras[e.ke+e.id]) + } + } + var nodeWithLowestActiveCamerasCount = 65535 + var nodeWithLowestActiveCameras = null childNodeList.forEach(function(ip){ - if(e.childNodeFound === false && s.childNodes[ip].cpu < 80){ - e.childNodeFound = true - e.childNodeSelected = ip + if(Object.keys(s.childNodes[ip].activeCameras).length < nodeWithLowestActiveCamerasCount){ + nodeWithLowestActiveCameras = ip } }) + if(nodeWithLowestActiveCameras)selectNode(nodeWithLowestActiveCameras) if(e.childNodeFound === true){ - s.childNodes[e.childNodeSelected].activeCameras[e.ke+e.id] = s.cleanMonitorObject(s.group[e.ke].mon_conf[e.id]); + s.childNodes[e.childNodeSelected].activeCameras[e.ke+e.id] = copiedMonitorObject s.group[e.ke].mon[e.id].childNode = e.childNodeSelected s.group[e.ke].mon[e.id].childNodeId = s.childNodes[e.childNodeSelected].cnid; s.cx({f:'sync',sync:s.group[e.ke].mon_conf[e.id],ke:e.ke,mid:e.id},s.group[e.ke].mon[e.id].childNodeId); @@ -1138,7 +1152,6 @@ module.exports = function(s,config,lang){ } } s.fatalCameraError = function(e,errorMessage){ - s.debugLog(errorMessage) clearTimeout(s.group[e.ke].mon[e.id].err_fatal_timeout); ++e.errorFatalCount; if(s.group[e.ke].mon[e.id].isStarted === true){ diff --git a/libs/socketio.js b/libs/socketio.js index 25e2de05..de8e735a 100644 --- a/libs/socketio.js +++ b/libs/socketio.js @@ -261,7 +261,7 @@ module.exports = function(s,config,lang,io){ tx({time:toUTC(),buffer:buffer}) }) } - if(s.group[d.ke]&&s.group[d.ke].users&&s.group[d.ke].users[d.auth]){ + if(s.group[d.ke] && s.group[d.ke].users && s.group[d.ke].users[d.auth]){ d.success(s.group[d.ke].users[d.auth]); }else{ s.sqlQuery('SELECT ke,uid,auth,mail,details FROM Users WHERE ke=? AND auth=? AND uid=?',[d.ke,d.auth,d.uid],function(err,r) { @@ -452,7 +452,7 @@ module.exports = function(s,config,lang,io){ apis:rrr, os:{ platform:s.platform, - cpuCount:os.cpus().length, + cpuCount:s.coreCount, totalmem:s.totalmem } }) diff --git a/libs/startup.js b/libs/startup.js index 83f7a5b2..453c5ec8 100644 --- a/libs/startup.js +++ b/libs/startup.js @@ -24,11 +24,10 @@ module.exports = function(s,config,lang,io){ if(!orphanedVideosForMonitors[monitor.ke])orphanedVideosForMonitors[monitor.ke] = {} if(!orphanedVideosForMonitors[monitor.ke][monitor.mid])orphanedVideosForMonitors[monitor.ke][monitor.mid] = 0 s.initiateMonitorObject(monitor) - s.orphanedVideoCheck(monitor,4,function(orphanedFilesCount){ + s.orphanedVideoCheck(monitor,2,function(orphanedFilesCount){ if(orphanedFilesCount){ orphanedVideosForMonitors[monitor.ke][monitor.mid] += orphanedFilesCount } - monitor.details = monitor.details s.group[monitor.ke].mon_conf[monitor.mid] = monitor var monObj = Object.assign(monitor,{id : monitor.mid}) s.camera(monitor.mode,monObj) diff --git a/libs/videos.js b/libs/videos.js index 392fa587..41cd397c 100644 --- a/libs/videos.js +++ b/libs/videos.js @@ -146,6 +146,10 @@ module.exports = function(s,config,lang){ size:k.filesize, end:k.endTime },'GRP_'+e.ke,'video_view') + //purge over max + s.purgeDiskForGroup(e) + //send new diskUsage values + s.setDiskUsedForGroup(e,k.filesizeMB) } s.insertCompletedVideoExtensions.forEach(function(extender){ extender(e,k) @@ -171,10 +175,6 @@ module.exports = function(s,config,lang){ }) }) - //purge over max - s.purgeDiskForGroup(e) - //send new diskUsage values - s.setDiskUsedForGroup(e,k.filesizeMB) } } } @@ -276,7 +276,7 @@ module.exports = function(s,config,lang){ var videosDirectory = s.getVideoDirectory(monitor)// + s.formattedTime(video.time) + '.' + video.ext fs.readdir(videosDirectory,function(err,files){ if(files && files.length > 0){ - var fiveRecentFiles = files.sort().slice(0,config.orphanedVideoCheckMax) + var fiveRecentFiles = files.slice(files.length - checkMax,files.length) var completedFile = 0 var orphanedFilesCount = 0 var fileComplete = function(){