Child Node use plain buffer data for video transfer to Master Node

montage-api
Moe 2021-11-26 14:54:04 -08:00
parent 6ec1fcb9fe
commit a717084e7d
4 changed files with 146 additions and 82 deletions

View File

@ -13,20 +13,26 @@ module.exports = function(s,config,lang,app,io){
const {
initiateDataConnection,
initiateVideoTransferConnection,
onWebSocketData,
onWebSocketDataFromChildNode,
onDataConnectionDisconnect,
initiateVideoWriteFromChildNode,
} = require('./childNode/utils.js')(s,config,lang,app,io)
s.childNodes = {};
const childNodesConnectionIndex = {};
const childNodeHTTP = express();
const childNodeServer = http.createServer(app);
const childNodeWebsocket = createWebSocketServer();
const childNodeFileRelay = createWebSocketServer();
childNodeServer.on('upgrade', function upgrade(request, socket, head) {
const pathname = url.parse(request.url).pathname;
if (pathname === '/childNode') {
childNodeWebsocket.handleUpgrade(request, socket, head, function done(ws) {
childNodeWebsocket.emit('connection', ws, request)
})
} else if (pathname === '/childNodeFileRelay') {
childNodeFileRelay.handleUpgrade(request, socket, head, function done(ws) {
childNodeFileRelay.emit('connection', ws, request)
})
} else if (pathname.indexOf('/socket.io') > -1) {
return;
} else {
@ -47,26 +53,44 @@ module.exports = function(s,config,lang,app,io){
console.log('Connection to ws 8288')
const connectionId = s.gid(10);
client.id = connectionId;
function onConnection(d){
function onAuthenticate(d){
const data = JSON.parse(d);
const childNodeKeyAccepted = config.childNodes.key.indexOf(data.socketKey) > -1;
if(!client.shinobiChildAlreadyRegistered && data.f === 'init' && childNodeKeyAccepted){
const connectionAddress = initiateDataConnection(client,req,data);
const connectionAddress = initiateDataConnection(client,req,data,connectionId);
childNodesConnectionIndex[connectionId] = client;
client.removeListener('message',onConnection)
client.removeListener('message',onAuthenticate)
client.on('message',(d) => {
const data = JSON.parse(d);
onWebSocketData(client,data)
onWebSocketDataFromChildNode(client,data)
})
}else{
client.destroy()
}
}
client.on('message',onConnection)
client.on('message',onAuthenticate)
client.on('disconnect',() => {
onDataConnectionDisconnect(client, req)
})
})
childNodeFileRelay.on('connection', function (client, req) {
function onAuthenticate(d){
const data = JSON.parse(d);
const childNodeKeyAccepted = config.childNodes.key.indexOf(data.socketKey) > -1;
if(!client.alreadyInitiated && data.fileType && childNodeKeyAccepted){
client.alreadyInitiated = true;
client.removeListener('message',onAuthenticate)
switch(data.fileType){
case'video':
initiateVideoWriteFromChildNode(client,data.options,data.connectionId)
break;
}
}else{
client.destroy()
}
}
client.on('message',onAuthenticate)
})
}else
//setup Child for childNodes
if(

View File

@ -1,3 +1,5 @@
const fs = require('fs');
const { createWebSocketClient } = require('../basic/websocketTools.js')
module.exports = function(s,config,lang,app,io){
const { cameraDestroy } = require('../monitor/utils.js')(s,config,lang)
var checkCpuInterval = null;
@ -12,7 +14,8 @@ module.exports = function(s,config,lang,app,io){
break;
case'init_success':
s.connectedToMasterNode = true;
s.other_helpers=d.child_helpers;
s.other_helpers = d.child_helpers;
s.childNodeIdOnMasterNode = d.connectionId
break;
case'kill':
s.initiateMonitorObject(d.d);
@ -28,7 +31,7 @@ module.exports = function(s,config,lang,app,io){
s.file('delete',s.dir.videos+d.ke+'/'+d.mid+'/'+d.file)
break;
case'deleteTimelapseFrame'://delete video
var filePath = s.getTimelapseFrameDirectory(d.d) + `${d.currentDate}/` + d.file
var filePath = s.getTimelapseFrameDirectory(d.d) + `${d.currentDate}/` + d.file
s.file('delete',filePath)
break;
case'insertCompleted'://close video
@ -76,11 +79,44 @@ module.exports = function(s,config,lang,app,io){
cpu: parseFloat(cpu)
})
}
function sendVideoToMasterNode(filePath,options){
const response = {ok: true}
return new Promise((resolve,reject) => {
const fileTransferConnection = createWebSocketClient('ws://'+config.childNodes.host + '/childNodeFileRelay',{
onMessage: () => {}
})
fileTransferConnection.on('open', function(){
fileTransferConnection.send(JSON.stringify({
fileType: 'video',
options: options,
socketKey: config.childNodes.key,
connectionId: s.childNodeIdOnMasterNode,
}))
setTimeout(() => {
fs.createReadStream(filePath,{ highWaterMark: 500 })
.on('data',function(data){
console.log('Send Video Chunk',data.length)
fileTransferConnection.send(data)
})
.on('close',function(){
// clearTimeout(s.group[e.ke].activeMonitors[e.id].recordingChecker)
// clearTimeout(s.group[e.ke].activeMonitors[e.id].streamChecker)
// s.cx(Object.assign({},options,{
// f:'created_file',
// }))
fileTransferConnection.close()
resolve(response)
})
},2000)
})
})
}
return {
onDataFromMasterNode,
initiateConnectionToMasterNode,
onDisconnectFromMasterNode,
destroyAllMonitorProcesses,
sendCurrentCpuUsage,
sendVideoToMasterNode,
}
}

View File

@ -6,7 +6,7 @@ module.exports = function(s,config,lang,app,io){
req.headers["'x-forwarded-for"] ||
req.connection.remoteAddress).replace('::ffff:','');
}
function initiateDataConnection(client,req,options){
function initiateDataConnection(client,req,options,connectionId){
const ipAddress = getIpAddress(req) + ':' + options.port
client.ip = ipAddress;
client.shinobiChildAlreadyRegistered = true;
@ -27,16 +27,14 @@ module.exports = function(s,config,lang,app,io){
})
client.sendJson({
f : 'init_success',
childNodes : s.childNodes
childNodes : s.childNodes,
connectionId: connectionId,
})
activeNode.coreCount = options.coreCount
console.log('Initiated Child Node : ', ipAddress)
return ipAddress
}
function initiateVideoTransferConnection(){
}
function onWebSocketData(client,data){
function onWebSocketDataFromChildNode(client,data){
const activeMonitor = data.ke && data.mid && s.group[data.ke] ? s.group[data.ke].activeMonitors[data.mid] : null;
const ipAddress = client.ip;
switch(data.f){
@ -96,58 +94,6 @@ module.exports = function(s,config,lang,app,io){
ke: data.ke
},data.queryInfo)
break;
case'created_file_chunk':
if(!activeMonitor.childNodeStreamWriters[data.filename]){
data.dir = s.getVideoDirectory(s.group[data.ke].rawMonitorConfigurations[data.mid])
if (!fs.existsSync(data.dir)) {
fs.mkdirSync(data.dir, {recursive: true}, (err) => {s.debugLog(err)})
}
activeMonitor.childNodeStreamWriters[data.filename] = fs.createWriteStream(data.dir+data.filename)
}
activeMonitor.childNodeStreamWriters[data.filename].write(data.chunk)
break;
case'created_file':
if(!activeMonitor.childNodeStreamWriters[data.filename]){
return console.log('FILE NOT EXIST')
}
activeMonitor.childNodeStreamWriters[data.filename].end();
client.sendJson({
f:'delete',
file:data.filename,
ke:data.ke,
mid:data.mid
})
s.txWithSubPermissions({
f:'video_build_success',
hrefNoAuth:'/videos/'+data.ke+'/'+data.mid+'/'+data.filename,
filename:data.filename,
mid:data.mid,
ke:data.ke,
time:data.time,
size:data.filesize,
end:data.end
},'GRP_'+data.ke,'video_view')
//save database row
var insert = {
startTime : data.time,
filesize : data.filesize,
endTime : data.end,
dir : s.getVideoDirectory(data.d),
file : data.filename,
filename : data.filename,
filesizeMB : parseFloat((data.filesize/1048576).toFixed(2))
}
s.insertDatabaseRow(data.d,insert)
s.insertCompletedVideoExtensions.forEach(function(extender){
extender(data.d,insert)
})
//purge over max
s.purgeDiskForGroup(data.ke)
//send new diskUsage values
s.setDiskUsedForGroup(data.ke,insert.filesizeMB)
clearTimeout(activeMonitor.recordingChecker)
clearTimeout(activeMonitor.streamChecker)
break;
}
}
function onDataConnectionDisconnect(client, req){
@ -179,10 +125,77 @@ module.exports = function(s,config,lang,app,io){
s.childNodes[ipAddress].dead = true
}
}
function initiateVideoWriteFromChildNode(client,data,connectionId){
const response = {ok: true}
return new Promise((resolve,reject) => {
const groupKey = data.ke
const monitorId = data.mid
const filename = data.filename
const activeMonitor = s.group[groupKey].activeMonitors[monitorId]
const monitorConfig = s.group[groupKey].rawMonitorConfigurations[monitorId]
const fileWritePath = s.getVideoDirectory(monitorConfig) + filename
const writeStream = fs.createWriteStream(fileWritePath)
const videoDirectory = s.getVideoDirectory(monitorConfig)
if (!fs.existsSync(videoDirectory)) {
fs.mkdirSync(videoDirectory, {recursive: true}, (err) => {s.debugLog(err)})
}
activeMonitor.childNodeStreamWriters[filename] = writeStream
client.on('message',(d) => {
writeStream.write(d)
})
client.on('close',(d) => {
setTimeout(() => {
if(!activeMonitor.childNodeStreamWriters[filename]){
return console.log('FILE NOT EXIST')
}
activeMonitor.childNodeStreamWriters[filename].end();
//delete video file from child node
s.cx({
f: 'delete',
file: filename,
ke: data.ke,
mid: data.mid
},connectionId)
//
s.txWithSubPermissions({
f:'video_build_success',
hrefNoAuth:'/videos/'+data.ke+'/'+data.mid+'/'+filename,
filename:filename,
mid:data.mid,
ke:data.ke,
time:data.time,
size:data.filesize,
end:data.end
},'GRP_'+data.ke,'video_view')
//save database row
var insert = {
startTime : data.time,
filesize : data.filesize,
endTime : data.end,
dir : s.getVideoDirectory(data.d),
file : filename,
filename : filename,
filesizeMB : parseFloat((data.filesize/1048576).toFixed(2))
}
s.insertDatabaseRow(data.d,insert)
s.insertCompletedVideoExtensions.forEach(function(extender){
extender(data.d,insert)
})
//purge over max
s.purgeDiskForGroup(data.ke)
//send new diskUsage values
s.setDiskUsedForGroup(data.ke,insert.filesizeMB)
clearTimeout(activeMonitor.recordingChecker)
clearTimeout(activeMonitor.streamChecker)
resolve(response)
},2000)
})
})
}
return {
initiateDataConnection,
initiateVideoTransferConnection,
onWebSocketData,
onWebSocketDataFromChildNode,
onDataConnectionDisconnect,
initiateVideoWriteFromChildNode,
}
}

View File

@ -2,6 +2,9 @@ var fs = require('fs');
var exec = require('child_process').exec;
var spawn = require('child_process').spawn;
module.exports = function(s,config,lang){
const {
sendVideoToMasterNode,
} = require('./childNode/childUtils.js')(s,config,lang)
/**
* Gets the video directory of the supplied video or monitor database row.
* @constructor
@ -145,20 +148,8 @@ module.exports = function(s,config,lang){
end: s.timeObject(k.endTime).format('YYYY-MM-DD HH:mm:ss')
}
if(config.childNodes.enabled === true && config.childNodes.mode === 'child' && config.childNodes.host){
fs.createReadStream(k.dir+k.filename,{ highWaterMark: 500 })
.on('data',function(data){
s.cx(Object.assign(response,{
f:'created_file_chunk',
chunk: data,
}))
})
.on('close',function(){
clearTimeout(s.group[e.ke].activeMonitors[e.id].recordingChecker)
clearTimeout(s.group[e.ke].activeMonitors[e.id].streamChecker)
s.cx(Object.assign(response,{
f:'created_file',
}))
})
var filePath = k.dir + k.filename;
sendVideoToMasterNode(filePath,response)
}else{
var href = '/videos/'+e.ke+'/'+e.mid+'/'+k.filename
if(config.useUTC === true)href += '?isUTC=true';