diff --git a/libs/childNode.js b/libs/childNode.js index 470ef017..310e4d88 100644 --- a/libs/childNode.js +++ b/libs/childNode.js @@ -13,20 +13,26 @@ module.exports = function(s,config,lang,app,io){ const { initiateDataConnection, initiateVideoTransferConnection, - onWebSocketData, + onWebSocketDataFromChildNode, onDataConnectionDisconnect, + initiateVideoWriteFromChildNode, } = require('./childNode/utils.js')(s,config,lang,app,io) s.childNodes = {}; const childNodesConnectionIndex = {}; const childNodeHTTP = express(); const childNodeServer = http.createServer(app); const childNodeWebsocket = createWebSocketServer(); + const childNodeFileRelay = createWebSocketServer(); childNodeServer.on('upgrade', function upgrade(request, socket, head) { const pathname = url.parse(request.url).pathname; if (pathname === '/childNode') { childNodeWebsocket.handleUpgrade(request, socket, head, function done(ws) { childNodeWebsocket.emit('connection', ws, request) }) + } else if (pathname === '/childNodeFileRelay') { + childNodeFileRelay.handleUpgrade(request, socket, head, function done(ws) { + childNodeFileRelay.emit('connection', ws, request) + }) } else if (pathname.indexOf('/socket.io') > -1) { return; } else { @@ -47,26 +53,44 @@ module.exports = function(s,config,lang,app,io){ console.log('Connection to ws 8288') const connectionId = s.gid(10); client.id = connectionId; - function onConnection(d){ + function onAuthenticate(d){ const data = JSON.parse(d); const childNodeKeyAccepted = config.childNodes.key.indexOf(data.socketKey) > -1; if(!client.shinobiChildAlreadyRegistered && data.f === 'init' && childNodeKeyAccepted){ - const connectionAddress = initiateDataConnection(client,req,data); + const connectionAddress = initiateDataConnection(client,req,data,connectionId); childNodesConnectionIndex[connectionId] = client; - client.removeListener('message',onConnection) + client.removeListener('message',onAuthenticate) client.on('message',(d) => { const data = JSON.parse(d); - onWebSocketData(client,data) + onWebSocketDataFromChildNode(client,data) }) }else{ client.destroy() } } - client.on('message',onConnection) + client.on('message',onAuthenticate) client.on('disconnect',() => { onDataConnectionDisconnect(client, req) }) }) + childNodeFileRelay.on('connection', function (client, req) { + function onAuthenticate(d){ + const data = JSON.parse(d); + const childNodeKeyAccepted = config.childNodes.key.indexOf(data.socketKey) > -1; + if(!client.alreadyInitiated && data.fileType && childNodeKeyAccepted){ + client.alreadyInitiated = true; + client.removeListener('message',onAuthenticate) + switch(data.fileType){ + case'video': + initiateVideoWriteFromChildNode(client,data.options,data.connectionId) + break; + } + }else{ + client.destroy() + } + } + client.on('message',onAuthenticate) + }) }else //setup Child for childNodes if( diff --git a/libs/childNode/childUtils.js b/libs/childNode/childUtils.js index c62a363a..43e3e9a9 100644 --- a/libs/childNode/childUtils.js +++ b/libs/childNode/childUtils.js @@ -1,3 +1,5 @@ +const fs = require('fs'); +const { createWebSocketClient } = require('../basic/websocketTools.js') module.exports = function(s,config,lang,app,io){ const { cameraDestroy } = require('../monitor/utils.js')(s,config,lang) var checkCpuInterval = null; @@ -12,7 +14,8 @@ module.exports = function(s,config,lang,app,io){ break; case'init_success': s.connectedToMasterNode = true; - s.other_helpers=d.child_helpers; + s.other_helpers = d.child_helpers; + s.childNodeIdOnMasterNode = d.connectionId break; case'kill': s.initiateMonitorObject(d.d); @@ -28,7 +31,7 @@ module.exports = function(s,config,lang,app,io){ s.file('delete',s.dir.videos+d.ke+'/'+d.mid+'/'+d.file) break; case'deleteTimelapseFrame'://delete video - var filePath = s.getTimelapseFrameDirectory(d.d) + `${d.currentDate}/` + d.file + var filePath = s.getTimelapseFrameDirectory(d.d) + `${d.currentDate}/` + d.file s.file('delete',filePath) break; case'insertCompleted'://close video @@ -76,11 +79,44 @@ module.exports = function(s,config,lang,app,io){ cpu: parseFloat(cpu) }) } + function sendVideoToMasterNode(filePath,options){ + const response = {ok: true} + return new Promise((resolve,reject) => { + const fileTransferConnection = createWebSocketClient('ws://'+config.childNodes.host + '/childNodeFileRelay',{ + onMessage: () => {} + }) + fileTransferConnection.on('open', function(){ + fileTransferConnection.send(JSON.stringify({ + fileType: 'video', + options: options, + socketKey: config.childNodes.key, + connectionId: s.childNodeIdOnMasterNode, + })) + setTimeout(() => { + fs.createReadStream(filePath,{ highWaterMark: 500 }) + .on('data',function(data){ + console.log('Send Video Chunk',data.length) + fileTransferConnection.send(data) + }) + .on('close',function(){ + // clearTimeout(s.group[e.ke].activeMonitors[e.id].recordingChecker) + // clearTimeout(s.group[e.ke].activeMonitors[e.id].streamChecker) + // s.cx(Object.assign({},options,{ + // f:'created_file', + // })) + fileTransferConnection.close() + resolve(response) + }) + },2000) + }) + }) + } return { onDataFromMasterNode, initiateConnectionToMasterNode, onDisconnectFromMasterNode, destroyAllMonitorProcesses, sendCurrentCpuUsage, + sendVideoToMasterNode, } } diff --git a/libs/childNode/utils.js b/libs/childNode/utils.js index 68aeba4a..f060d541 100644 --- a/libs/childNode/utils.js +++ b/libs/childNode/utils.js @@ -6,7 +6,7 @@ module.exports = function(s,config,lang,app,io){ req.headers["'x-forwarded-for"] || req.connection.remoteAddress).replace('::ffff:',''); } - function initiateDataConnection(client,req,options){ + function initiateDataConnection(client,req,options,connectionId){ const ipAddress = getIpAddress(req) + ':' + options.port client.ip = ipAddress; client.shinobiChildAlreadyRegistered = true; @@ -27,16 +27,14 @@ module.exports = function(s,config,lang,app,io){ }) client.sendJson({ f : 'init_success', - childNodes : s.childNodes + childNodes : s.childNodes, + connectionId: connectionId, }) activeNode.coreCount = options.coreCount console.log('Initiated Child Node : ', ipAddress) return ipAddress } - function initiateVideoTransferConnection(){ - - } - function onWebSocketData(client,data){ + function onWebSocketDataFromChildNode(client,data){ const activeMonitor = data.ke && data.mid && s.group[data.ke] ? s.group[data.ke].activeMonitors[data.mid] : null; const ipAddress = client.ip; switch(data.f){ @@ -96,58 +94,6 @@ module.exports = function(s,config,lang,app,io){ ke: data.ke },data.queryInfo) break; - case'created_file_chunk': - if(!activeMonitor.childNodeStreamWriters[data.filename]){ - data.dir = s.getVideoDirectory(s.group[data.ke].rawMonitorConfigurations[data.mid]) - if (!fs.existsSync(data.dir)) { - fs.mkdirSync(data.dir, {recursive: true}, (err) => {s.debugLog(err)}) - } - activeMonitor.childNodeStreamWriters[data.filename] = fs.createWriteStream(data.dir+data.filename) - } - activeMonitor.childNodeStreamWriters[data.filename].write(data.chunk) - break; - case'created_file': - if(!activeMonitor.childNodeStreamWriters[data.filename]){ - return console.log('FILE NOT EXIST') - } - activeMonitor.childNodeStreamWriters[data.filename].end(); - client.sendJson({ - f:'delete', - file:data.filename, - ke:data.ke, - mid:data.mid - }) - s.txWithSubPermissions({ - f:'video_build_success', - hrefNoAuth:'/videos/'+data.ke+'/'+data.mid+'/'+data.filename, - filename:data.filename, - mid:data.mid, - ke:data.ke, - time:data.time, - size:data.filesize, - end:data.end - },'GRP_'+data.ke,'video_view') - //save database row - var insert = { - startTime : data.time, - filesize : data.filesize, - endTime : data.end, - dir : s.getVideoDirectory(data.d), - file : data.filename, - filename : data.filename, - filesizeMB : parseFloat((data.filesize/1048576).toFixed(2)) - } - s.insertDatabaseRow(data.d,insert) - s.insertCompletedVideoExtensions.forEach(function(extender){ - extender(data.d,insert) - }) - //purge over max - s.purgeDiskForGroup(data.ke) - //send new diskUsage values - s.setDiskUsedForGroup(data.ke,insert.filesizeMB) - clearTimeout(activeMonitor.recordingChecker) - clearTimeout(activeMonitor.streamChecker) - break; } } function onDataConnectionDisconnect(client, req){ @@ -179,10 +125,77 @@ module.exports = function(s,config,lang,app,io){ s.childNodes[ipAddress].dead = true } } + function initiateVideoWriteFromChildNode(client,data,connectionId){ + const response = {ok: true} + return new Promise((resolve,reject) => { + const groupKey = data.ke + const monitorId = data.mid + const filename = data.filename + const activeMonitor = s.group[groupKey].activeMonitors[monitorId] + const monitorConfig = s.group[groupKey].rawMonitorConfigurations[monitorId] + const fileWritePath = s.getVideoDirectory(monitorConfig) + filename + const writeStream = fs.createWriteStream(fileWritePath) + const videoDirectory = s.getVideoDirectory(monitorConfig) + if (!fs.existsSync(videoDirectory)) { + fs.mkdirSync(videoDirectory, {recursive: true}, (err) => {s.debugLog(err)}) + } + activeMonitor.childNodeStreamWriters[filename] = writeStream + client.on('message',(d) => { + writeStream.write(d) + }) + client.on('close',(d) => { + setTimeout(() => { + if(!activeMonitor.childNodeStreamWriters[filename]){ + return console.log('FILE NOT EXIST') + } + activeMonitor.childNodeStreamWriters[filename].end(); + //delete video file from child node + s.cx({ + f: 'delete', + file: filename, + ke: data.ke, + mid: data.mid + },connectionId) + // + s.txWithSubPermissions({ + f:'video_build_success', + hrefNoAuth:'/videos/'+data.ke+'/'+data.mid+'/'+filename, + filename:filename, + mid:data.mid, + ke:data.ke, + time:data.time, + size:data.filesize, + end:data.end + },'GRP_'+data.ke,'video_view') + //save database row + var insert = { + startTime : data.time, + filesize : data.filesize, + endTime : data.end, + dir : s.getVideoDirectory(data.d), + file : filename, + filename : filename, + filesizeMB : parseFloat((data.filesize/1048576).toFixed(2)) + } + s.insertDatabaseRow(data.d,insert) + s.insertCompletedVideoExtensions.forEach(function(extender){ + extender(data.d,insert) + }) + //purge over max + s.purgeDiskForGroup(data.ke) + //send new diskUsage values + s.setDiskUsedForGroup(data.ke,insert.filesizeMB) + clearTimeout(activeMonitor.recordingChecker) + clearTimeout(activeMonitor.streamChecker) + resolve(response) + },2000) + }) + }) + } return { initiateDataConnection, - initiateVideoTransferConnection, - onWebSocketData, + onWebSocketDataFromChildNode, onDataConnectionDisconnect, + initiateVideoWriteFromChildNode, } } diff --git a/libs/videos.js b/libs/videos.js index 6fd64e28..9001bb90 100644 --- a/libs/videos.js +++ b/libs/videos.js @@ -2,6 +2,9 @@ var fs = require('fs'); var exec = require('child_process').exec; var spawn = require('child_process').spawn; module.exports = function(s,config,lang){ + const { + sendVideoToMasterNode, + } = require('./childNode/childUtils.js')(s,config,lang) /** * Gets the video directory of the supplied video or monitor database row. * @constructor @@ -145,20 +148,8 @@ module.exports = function(s,config,lang){ end: s.timeObject(k.endTime).format('YYYY-MM-DD HH:mm:ss') } if(config.childNodes.enabled === true && config.childNodes.mode === 'child' && config.childNodes.host){ - fs.createReadStream(k.dir+k.filename,{ highWaterMark: 500 }) - .on('data',function(data){ - s.cx(Object.assign(response,{ - f:'created_file_chunk', - chunk: data, - })) - }) - .on('close',function(){ - clearTimeout(s.group[e.ke].activeMonitors[e.id].recordingChecker) - clearTimeout(s.group[e.ke].activeMonitors[e.id].streamChecker) - s.cx(Object.assign(response,{ - f:'created_file', - })) - }) + var filePath = k.dir + k.filename; + sendVideoToMasterNode(filePath,response) }else{ var href = '/videos/'+e.ke+'/'+e.mid+'/'+k.filename if(config.useUTC === true)href += '?isUTC=true';