Start modifying Child Node connectivity to use Data Port methodology

montage-api
Moe 2021-11-24 23:59:13 -08:00
parent 9784a853ed
commit d908ff22d2
5 changed files with 321 additions and 237 deletions

View File

@ -1,5 +1,5 @@
const WebSocket = require('cws');
function createWebSocketServer(options){
const WebSocket = require('cws');
const theWebSocket = new WebSocket.Server(options ? options : {
noServer: true
});
@ -14,6 +14,19 @@ function createWebSocketServer(options){
};
return theWebSocket
}
function createWebSocketClient(connectionHost,options){
const clientConnection = WebSocket(connectionHost, options.engineOptions);
if(options.onMessage){
const onMessage = options.onMessage;
clientConnection.on('message', message => {
const data = JSON.parse(message);
onMessage(received);
});
}
return clientConnection
}
module.exports = {
createWebSocketServer,
createWebSocketClient,
}

View File

@ -1,213 +1,89 @@
var fs = require('fs');
var http = require('http');
var https = require('https');
var express = require('express');
const fs = require('fs');
const http = require('http');
const https = require('https');
const express = require('express');
const { createWebSocketServer, createWebSocketClient } = require('./basic/websocketTools.js')
module.exports = function(s,config,lang,app,io){
const { cameraDestroy } = require('./monitor/utils.js')(s,config,lang)
//setup Master for childNodes
if(config.childNodes.enabled === true && config.childNodes.mode === 'master'){
const {
initiateDataConnection,
initiateVideoTransferConnection,
onWebSocketData,
onDataConnectionDisconnect,
} = require('./childNode/utils.js')(s,config,lang,app,io)
const {
onDataFromMasterNode,
} = require('./childNode/childUtils.js')(s,config,lang,app,io)
s.childNodes = {};
var childNodeHTTP = express();
var childNodeServer = http.createServer(app);
var childNodeWebsocket = new (require('socket.io'))()
childNodeServer.listen(config.childNodes.port,config.bindip,function(){
console.log(lang.Shinobi+' - CHILD NODE PORT : '+config.childNodes.port);
const childNodesConnectionIndex = {};
const childNodeHTTP = express();
const childNodeServer = http.createServer(app);
const childNodeWebsocket = createWebSocketServer();
childNodeServer.on('upgrade', function upgrade(request, socket, head) {
const pathname = url.parse(request.url).pathname;
if (pathname === '/childNode') {
childNodeWebsocket.handleUpgrade(request, socket, head, function done(ws) {
childNodeWebsocket.emit('connection', ws, request)
})
} else if (pathname.indexOf('/socket.io') > -1) {
return;
} else {
socket.destroy();
}
});
const childNodeBindIP = config.childNodes.ip || config.bindip;
childNodeServer.listen(config.childNodes.port,childNodeBindIP,function(){
console.log(lang.Shinobi+' - CHILD NODE SERVER : '+childNodeBindIP + ':' + config.childNodes.port);
});
s.debugLog('childNodeWebsocket.attach(childNodeServer)')
childNodeWebsocket.attach(childNodeServer,{
path:'/socket.io',
transports: ['websocket']
});
//send data to child node function (experimental)
s.cx = function(z,y,x){
s.cx = function(data,connectionId,x){
if(!z.mid && !z.d){
console.error('Missing ID')
}else if(x){
x.broadcast.to(y).emit('c',z)
childNodesConnectionIndex[y].sendJson(data)
}else{
childNodeWebsocket.to(y).emit('c',z)
}
}
//child Node Websocket
childNodeWebsocket.on('connection', function (cn) {
childNodeWebsocket.on('connection', function (client, req) {
//functions for dispersing work to child servers;
var ipAddress
cn.on('c',function(d){
if(config.childNodes.key.indexOf(d.socketKey) > -1){
if(!cn.shinobi_child&&d.f=='init'){
ipAddress = cn.request.connection.remoteAddress.replace('::ffff:','')+':'+d.port
cn.ip = ipAddress
cn.shinobi_child = 1
cn.tx = function(z){
cn.emit('c',z)
}
if(!s.childNodes[cn.ip]){
s.childNodes[cn.ip] = {}
};
s.childNodes[cn.ip].dead = false
s.childNodes[cn.ip].cnid = cn.id
s.childNodes[cn.ip].cpu = 0
s.childNodes[cn.ip].ip = ipAddress
s.childNodes[cn.ip].activeCameras = {}
d.availableHWAccels.forEach(function(accel){
if(config.availableHWAccels.indexOf(accel) === -1)config.availableHWAccels.push(accel)
})
cn.tx({
f : 'init_success',
childNodes : s.childNodes
})
s.childNodes[cn.ip].coreCount = d.coreCount
}else{
switch(d.f){
case'cpu':
s.childNodes[ipAddress].cpu = d.cpu;
break;
case'sql':
s.sqlQuery(d.query,d.values,function(err,rows){
cn.emit('c',{f:'sqlCallback',rows:rows,err:err,callbackId:d.callbackId});
});
break;
case'knex':
s.knexQuery(d.options,function(err,rows){
cn.emit('c',{f:'sqlCallback',rows:rows,err:err,callbackId:d.callbackId});
});
break;
case'clearCameraFromActiveList':
if(s.childNodes[ipAddress])delete(s.childNodes[ipAddress].activeCameras[d.ke + d.id])
break;
case'camera':
s.camera(d.mode,d.data)
break;
case's.tx':
s.tx(d.data,d.to)
break;
case's.userLog':
if(!d.mon || !d.data)return console.log('LOG DROPPED',d.mon,d.data);
s.userLog(d.mon,d.data)
break;
case'open_timelapse_file_transfer':
var location = s.getTimelapseFrameDirectory(d.d) + `${d.currentDate}/`
if(!fs.existsSync(location)){
fs.mkdirSync(location)
}
break;
case'created_timelapse_file_chunk':
if(!s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename]){
var dir = s.getTimelapseFrameDirectory(d.d) + `${d.currentDate}/`
s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename] = fs.createWriteStream(dir+d.filename)
}
s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename].write(d.chunk)
break;
case'created_timelapse_file':
if(!s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename]){
return console.log('FILE NOT EXIST')
}
s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename].end()
cn.tx({
f: 'deleteTimelapseFrame',
file: d.filename,
currentDate: d.currentDate,
d: d.d, //monitor config
ke: d.ke,
mid: d.mid
})
s.insertTimelapseFrameDatabaseRow({
ke: d.ke
},d.queryInfo)
break;
case'created_file_chunk':
if(!s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename]){
d.dir = s.getVideoDirectory(s.group[d.ke].rawMonitorConfigurations[d.mid])
if (!fs.existsSync(d.dir)) {
fs.mkdirSync(d.dir, {recursive: true}, (err) => {s.debugLog(err)})
}
s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename] = fs.createWriteStream(d.dir+d.filename)
}
s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename].write(d.chunk)
break;
case'created_file':
if(!s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename]){
return console.log('FILE NOT EXIST')
}
s.group[d.ke].activeMonitors[d.mid].childNodeStreamWriters[d.filename].end();
cn.tx({
f:'delete',
file:d.filename,
ke:d.ke,
mid:d.mid
})
s.txWithSubPermissions({
f:'video_build_success',
hrefNoAuth:'/videos/'+d.ke+'/'+d.mid+'/'+d.filename,
filename:d.filename,
mid:d.mid,
ke:d.ke,
time:d.time,
size:d.filesize,
end:d.end
},'GRP_'+d.ke,'video_view')
//save database row
var insert = {
startTime : d.time,
filesize : d.filesize,
endTime : d.end,
dir : s.getVideoDirectory(d.d),
file : d.filename,
filename : d.filename,
filesizeMB : parseFloat((d.filesize/1048576).toFixed(2))
}
s.insertDatabaseRow(d.d,insert)
s.insertCompletedVideoExtensions.forEach(function(extender){
extender(d.d,insert)
})
//purge over max
s.purgeDiskForGroup(d.ke)
//send new diskUsage values
s.setDiskUsedForGroup(d.ke,insert.filesizeMB)
clearTimeout(s.group[d.ke].activeMonitors[d.mid].recordingChecker)
clearTimeout(s.group[d.ke].activeMonitors[d.mid].streamChecker)
break;
}
}
}
})
cn.on('disconnect',function(){
console.log('childNodeWebsocket.disconnect',ipAddress)
if(s.childNodes[ipAddress]){
var monitors = Object.values(s.childNodes[ipAddress].activeCameras)
if(monitors && monitors[0]){
var loadCompleted = 0
var loadMonitor = function(monitor){
setTimeout(function(){
var mode = monitor.mode + ''
var cleanMonitor = s.cleanMonitorObject(monitor)
s.camera('stop',Object.assign(cleanMonitor,{}))
delete(s.group[monitor.ke].activeMonitors[monitor.mid].childNode)
delete(s.group[monitor.ke].activeMonitors[monitor.mid].childNodeId)
setTimeout(function(){
s.camera(mode,cleanMonitor)
++loadCompleted
if(monitors[loadCompleted]){
loadMonitor(monitors[loadCompleted])
}
},1000)
},2000)
}
loadMonitor(monitors[loadCompleted])
}
s.childNodes[ipAddress].activeCameras = {}
s.childNodes[ipAddress].dead = true
const connectionId = s.gid(10);
client.id = connectionId;
function onConnection(d){
const data = JSON.parse(d);
const childNodeKeyAccepted = config.childNodes.key.indexOf(data.socketKey) > -1;
if(!client.shinobiChildAlreadyRegistered && data.f === 'init' && childNodeKeyAccepted){
const connectionAddress = initiateDataConnection(client,req,data);
childNodesConnectionIndex[connectionId] = client;
client.removeListener('message',onConnection)
client.on('message',(d) => {
const data = JSON.parse(d);
onWebSocketData(client,data)
})
}else{
client.destroy()
}
}
client.on('message',onConnection)
client.on('disconnect',() => {
onDataConnectionDisconnect(client, req)
})
})
}else
//setup Child for childNodes
if(config.childNodes.enabled === true && config.childNodes.mode === 'child' && config.childNodes.host){
s.connected = false;
childIO = require('socket.io-client')('ws://'+config.childNodes.host,{
transports: ['websocket']
});
s.cx = function(x){x.socketKey = config.childNodes.key;childIO.emit('c',x)}
s.connectedToMasterNode = false;
const childIO = createWebSocketClient('ws://'+config.childNodes.host,{
onMessage: onDataFromMasterNode
})
s.cx = function(data){
x.socketKey = config.childNodes.key;
childIO.send(JSON.stringify(data));
}
s.tx = function(x,y){s.cx({f:'s.tx',data:x,to:y})}
s.userLog = function(x,y){s.cx({f:'s.userLog',mon:x,data:y})}
s.queuedSqlCallbacks = {}
@ -218,14 +94,12 @@ module.exports = function(s,config,lang,app,io){
var onMoveOn = values;
var values = [];
}
if(typeof onMoveOn !== 'function'){onMoveOn=function(){}}
s.queuedSqlCallbacks[callbackId] = onMoveOn
if(typeof onMoveOn === 'function')s.queuedSqlCallbacks[callbackId] = onMoveOn;
s.cx({f:'sql',query:query,values:values,callbackId:callbackId});
}
s.knexQuery = function(options,onMoveOn){
var callbackId = s.gid()
if(typeof onMoveOn !== 'function'){onMoveOn=function(){}}
s.queuedSqlCallbacks[callbackId] = onMoveOn
if(typeof onMoveOn === 'function')s.queuedSqlCallbacks[callbackId] = onMoveOn;
s.cx({f:'knex',options:options,callbackId:callbackId});
}
setInterval(async () => {
@ -244,49 +118,8 @@ module.exports = function(s,config,lang,app,io){
availableHWAccels : config.availableHWAccels
})
})
childIO.on('c', function (d) {
switch(d.f){
case'sqlCallback':
if(s.queuedSqlCallbacks[d.callbackId]){
s.queuedSqlCallbacks[d.callbackId](d.err,d.rows)
delete(s.queuedSqlCallbacks[d.callbackId])
}
break;
case'init_success':
s.connected=true;
s.other_helpers=d.child_helpers;
break;
case'kill':
s.initiateMonitorObject(d.d);
cameraDestroy(d.d)
var childNodeIp = s.group[d.d.ke].activeMonitors[d.d.id]
break;
case'sync':
s.initiateMonitorObject(d.sync);
Object.keys(d.sync).forEach(function(v){
s.group[d.sync.ke].activeMonitors[d.sync.mid][v]=d.sync[v];
});
break;
case'delete'://delete video
s.file('delete',s.dir.videos+d.ke+'/'+d.mid+'/'+d.file)
break;
case'deleteTimelapseFrame'://delete video
var filePath = s.getTimelapseFrameDirectory(d.d) + `${d.currentDate}/` + d.file
s.file('delete',filePath)
break;
case'insertCompleted'://close video
s.insertCompletedVideo(d.d,d.k)
break;
case'cameraStop'://start camera
s.camera('stop',d.d)
break;
case'cameraStart'://start or record camera
s.camera(d.mode,d.d)
break;
}
})
childIO.on('disconnect',function(d){
s.connected = false;
s.connectedToMasterNode = false;
var groupKeys = Object.keys(s.group)
groupKeys.forEach(function(groupKey){
var activeMonitorKeys = Object.keys(s.group[groupKey].activeMonitors)

View File

@ -0,0 +1,48 @@
module.exports = function(s,config,lang,app,io){
const queuedSqlCallbacks = s.queuedSqlCallbacks;
function onDataFromMasterNode(d) {
switch(d.f){
case'sqlCallback':
const callbackId = d.callbackId;
if(queuedSqlCallbacks[callbackId]){
queuedSqlCallbacks[callbackId](d.err,d.rows)
delete(queuedSqlCallbacks[callbackId])
}
break;
case'init_success':
s.connected=true;
s.other_helpers=d.child_helpers;
break;
case'kill':
s.initiateMonitorObject(d.d);
cameraDestroy(d.d)
var childNodeIp = s.group[d.d.ke].activeMonitors[d.d.id]
break;
case'sync':
s.initiateMonitorObject(d.sync);
Object.keys(d.sync).forEach(function(v){
s.group[d.sync.ke].activeMonitors[d.sync.mid][v]=d.sync[v];
});
break;
case'delete'://delete video
s.file('delete',s.dir.videos+d.ke+'/'+d.mid+'/'+d.file)
break;
case'deleteTimelapseFrame'://delete video
var filePath = s.getTimelapseFrameDirectory(d.d) + `${d.currentDate}/` + d.file
s.file('delete',filePath)
break;
case'insertCompleted'://close video
s.insertCompletedVideo(d.d,d.k)
break;
case'cameraStop'://start camera
s.camera('stop',d.d)
break;
case'cameraStart'://start or record camera
s.camera(d.mode,d.d)
break;
}
}
return {
onDataFromMasterNode,
}
}

192
libs/childNode/utils.js Normal file
View File

@ -0,0 +1,192 @@
module.exports = function(s,config,lang,app,io){
function getIpAddress(req){
return (req.headers['cf-connecting-ip'] ||
req.headers["CF-Connecting-IP"] ||
req.headers["'x-forwarded-for"] ||
req.connection.remoteAddress).replace('::ffff:','');
}
function initiateDataConnection(client,req,options){
const ipAddress = getIpAddress(req) + ':' + options.port
client.ip = ipAddress;
client.shinobiChildAlreadyRegistered = true;
client.sendJson = (data) => {
const dataString = JSON.stringify(data);
client.send(dataString)
}
if(!s.childNodes[ipAddress]){
s.childNodes[ipAddress] = {}
};
const activeNode = s.childNodes[ipAddress];
activeNode.dead = false
activeNode.cnid = client.id
activeNode.cpu = 0
activeNode.ip = ipAddress
activeNode.activeCameras = {}
options.availableHWAccels.forEach(function(accel){
if(config.availableHWAccels.indexOf(accel) === -1)config.availableHWAccels.push(accel)
})
client.sendJson({
f : 'init_success',
childNodes : s.childNodes
})
activeNode.coreCount = options.coreCount
return {
ipAddress,
}
}
function initiateVideoTransferConnection(){
}
function onWebSocketData(client,data){
const ipAddress = client.ip;
switch(data.f){
case'cpu':
s.childNodes[ipAddress].cpu = data.cpu;
break;
case'sql':
s.sqlQuery(data.query,data.values,function(err,rows){
client.sendJson({f:'sqlCallback',rows:rows,err:err,callbackId:data.callbackId});
});
break;
case'knex':
s.knexQuery(data.options,function(err,rows){
client.sendJson({f:'sqlCallback',rows:rows,err:err,callbackId:data.callbackId});
});
break;
case'clearCameraFromActiveList':
if(s.childNodes[ipAddress])delete(s.childNodes[ipAddress].activeCameras[data.ke + data.id])
break;
case'camera':
s.camera(data.mode,data.data)
break;
case's.tx':
s.tx(data.data,data.to)
break;
case's.userLog':
if(!data.mon || !data.data)return console.log('LOG DROPPED',data.mon,data.data);
s.userLog(data.mon,data.data)
break;
case'open_timelapse_file_transfer':
var location = s.getTimelapseFrameDirectory(data.d) + `${data.currentDate}/`
if(!fs.existsSync(location)){
fs.mkdirSync(location)
}
break;
case'created_timelapse_file_chunk':
const activeMonitor = s.group[data.ke].activeMonitors[data.mid];
if(!activeMonitor.childNodeStreamWriters[data.filename]){
var dir = s.getTimelapseFrameDirectory(data.d) + `${data.currentDate}/`
activeMonitor.childNodeStreamWriters[data.filename] = fs.createWriteStream(dir+data.filename)
}
activeMonitor.childNodeStreamWriters[data.filename].write(data.chunk)
break;
case'created_timelapse_file':
const activeMonitor = s.group[data.ke].activeMonitors[data.mid];
if(!activeMonitor.childNodeStreamWriters[data.filename]){
return console.log('FILE NOT EXIST')
}
activeMonitor.childNodeStreamWriters[data.filename].end()
client.sendJson({
f: 'deleteTimelapseFrame',
file: data.filename,
currentDate: data.currentDate,
d: data.d, //monitor config
ke: data.ke,
mid: data.mid
})
s.insertTimelapseFrameDatabaseRow({
ke: data.ke
},data.queryInfo)
break;
case'created_file_chunk':
const activeMonitor = s.group[data.ke].activeMonitors[data.mid];
if(!activeMonitor.childNodeStreamWriters[data.filename]){
data.dir = s.getVideoDirectory(s.group[data.ke].rawMonitorConfigurations[data.mid])
if (!fs.existsSync(data.dir)) {
fs.mkdirSync(data.dir, {recursive: true}, (err) => {s.debugLog(err)})
}
activeMonitor.childNodeStreamWriters[data.filename] = fs.createWriteStream(data.dir+data.filename)
}
activeMonitor.childNodeStreamWriters[data.filename].write(data.chunk)
break;
case'created_file':
const activeMonitor = s.group[data.ke].activeMonitors[data.mid];
if(!activeMonitor.childNodeStreamWriters[data.filename]){
return console.log('FILE NOT EXIST')
}
activeMonitor.childNodeStreamWriters[data.filename].end();
client.sendJson({
f:'delete',
file:data.filename,
ke:data.ke,
mid:data.mid
})
s.txWithSubPermissions({
f:'video_build_success',
hrefNoAuth:'/videos/'+data.ke+'/'+data.mid+'/'+data.filename,
filename:data.filename,
mid:data.mid,
ke:data.ke,
time:data.time,
size:data.filesize,
end:data.end
},'GRP_'+data.ke,'video_view')
//save database row
var insert = {
startTime : data.time,
filesize : data.filesize,
endTime : data.end,
dir : s.getVideoDirectory(data.d),
file : data.filename,
filename : data.filename,
filesizeMB : parseFloat((data.filesize/1048576).toFixed(2))
}
s.insertDatabaseRow(data.d,insert)
s.insertCompletedVideoExtensions.forEach(function(extender){
extender(data.d,insert)
})
//purge over max
s.purgeDiskForGroup(data.ke)
//send new diskUsage values
s.setDiskUsedForGroup(data.ke,insert.filesizeMB)
clearTimeout(activeMonitor.recordingChecker)
clearTimeout(activeMonitor.streamChecker)
break;
}
}
function onDataConnectionDisconnect(client, req){
const ipAddress = client.ip;
console.log('childNodeWebsocket.disconnect',ipAddress)
if(s.childNodes[ipAddress]){
var monitors = Object.values(s.childNodes[ipAddress].activeCameras)
if(monitors && monitors[0]){
var loadCompleted = 0
var loadMonitor = function(monitor){
setTimeout(function(){
var mode = monitor.mode + ''
var cleanMonitor = s.cleanMonitorObject(monitor)
s.camera('stop',Object.assign(cleanMonitor,{}))
delete(s.group[monitor.ke].activeMonitors[monitor.mid].childNode)
delete(s.group[monitor.ke].activeMonitors[monitor.mid].childNodeId)
setTimeout(function(){
s.camera(mode,cleanMonitor)
++loadCompleted
if(monitors[loadCompleted]){
loadMonitor(monitors[loadCompleted])
}
},1000)
},2000)
}
loadMonitor(monitors[loadCompleted])
}
s.childNodes[ipAddress].activeCameras = {}
s.childNodes[ipAddress].dead = true
}
}
return {
initiateDataConnection,
initiateVideoTransferConnection,
onWebSocketData,
onDataConnectionDisconnect,
}
}

View File

@ -1,6 +1,4 @@
const {
createWebSocketServer,
} = require('./basic/websocketServer.js')
const { createWebSocketServer } = require('./basic/websocketTools.js')
module.exports = function(s,config,lang,app,io){
const {
triggerEvent,