// Shinobi/libs/childNode/utils.js
//
// 359 lines
// 15 KiB
// JavaScript

const fs = require('fs');
module.exports = function(s,config,lang,app,io){
// Scheduling settings taken from `config.childNodes`.
// `masterDoWorkToo` is passed through untouched; it is read by selectNodeForOperation.
const { masterDoWorkToo } = config.childNodes;
// Usage ceilings in percent; any falsy configured value (including 0) falls back to 75.
const maxCpuPercent = config.childNodes.maxCpuPercent || 75;
const maxRamPercent = config.childNodes.maxRamPercent || 75;
/**
 * Resolves the address of the peer that made `req`.
 *
 * Lookup order: `cf-connecting-ip` header, `x-forwarded-for` header (first
 * entry of the comma separated list), then the socket's remote address.
 * The IPv4-mapped IPv6 prefix `::ffff:` is stripped from the result.
 *
 * @param {object} req - Incoming HTTP/WebSocket upgrade request.
 * @returns {string} The resolved address, or an empty string when none is available.
 */
function getIpAddress(req){
    const headers = req.headers || {};
    // The previous key was "'x-forwarded-for" (stray quote), so this header was never honoured.
    const forwardedFor = headers['x-forwarded-for'];
    // The header may carry "client, proxy1, proxy2"; the first entry is the originating client.
    const forwardedClient = forwardedFor ? `${forwardedFor}`.split(',')[0].trim() : '';
    const socket = req.socket || req.connection || {};
    const rawAddress = headers['cf-connecting-ip'] ||
        headers['CF-Connecting-IP'] ||
        forwardedClient ||
        socket.remoteAddress ||
        // remoteAddress is undefined once the socket is destroyed; avoid throwing on `.replace`.
        '';
    return rawAddress.replace('::ffff:','');
}
/**
 * Registers an authenticated child node connection.
 *
 * Tags the socket with its `ip:port` address and a `sendJson` helper,
 * creates (or reuses) the matching entry in `s.childNodes`, resets its state,
 * merges the node's hardware accelerators into `config.availableHWAccels`
 * and replies with an `init_success` message.
 *
 * @param {object} client - WebSocket of the child node; must expose `id` and `send`.
 * @param {object} req - Request used to resolve the node's address.
 * @param {object} options - Values reported by the child node (port, platform, coreCount, totalmem, availableHWAccels).
 * @param {*} connectionId - Echoed back to the child node in the reply.
 * @returns {string} The `ip:port` key under which the node is stored in `s.childNodes`.
 */
function initiateDataConnection(client,req,options,connectionId){
    const webAddress = getIpAddress(req) + ':' + options.port;
    Object.assign(client,{
        ip: webAddress,
        shinobiChildAlreadyRegistered: true,
        sendJson: (data) => {
            client.send(JSON.stringify(data));
        },
    });
    // Reuse the existing record on reconnect, otherwise start a fresh one.
    const activeNode = s.childNodes[webAddress] || (s.childNodes[webAddress] = {});
    Object.assign(activeNode,{
        dead: false,
        cnid: client.id,
        cpu: 0,
        ip: webAddress,
        activeCameras: {},
        platform: options.platform,
        coreCount: options.coreCount,
        totalmem: options.totalmem,
    });
    // Add accelerators this node offers that are not listed yet.
    options.availableHWAccels.forEach((accel) => {
        const alreadyListed = config.availableHWAccels.indexOf(accel) !== -1;
        if(!alreadyListed){
            config.availableHWAccels.push(accel);
        }
    });
    client.sendJson({
        f : 'init_success',
        childNodes : s.childNodes,
        connectionId: connectionId,
    });
    s.debugLog('Authenticated Child Node!',new Date(),webAddress);
    return webAddress;
}
/**
 * Dispatches one decoded message received from a child node.
 *
 * Supported `data.f` values: `cpu`, `ram`, `sql`, `knex`,
 * `clearCameraFromActiveList`, `camera`, `s.tx` and `s.userLog`.
 * Any other value is ignored.
 *
 * @param {object} client - Child node socket; `client.ip` is the key into `s.childNodes`
 *                          and `client.sendJson` is used for query replies.
 * @param {object} data - Decoded message from the child node.
 * @returns {undefined}
 */
function onWebSocketDataFromChildNode(client,data){
    const webAddress = client.ip;
    // May be undefined when the socket never registered; every case below that
    // touches the record therefore checks it first instead of throwing.
    const childNode = s.childNodes[webAddress];
    // Both database proxies answer with the same message shape.
    const replyWithRows = (err,rows) => {
        client.sendJson({f:'sqlCallback',rows:rows,err:err,callbackId:data.callbackId});
    };
    switch(data.f){
        case 'cpu':
            if(childNode){
                childNode.cpuUsed = data.used;
                childNode.cpuPercent = data.percent;
            }
            break;
        case 'ram':
            if(childNode){
                childNode.ramUsed = data.used;
                childNode.ramPercent = data.percent;
            }
            break;
        case 'sql':
            // NOTE(review): runs the query text supplied by the child node as-is;
            // this relies on child nodes being trusted peers — confirm.
            s.sqlQuery(data.query,data.values,replyWithRows);
            break;
        case 'knex':
            s.knexQuery(data.options,replyWithRows);
            break;
        case 'clearCameraFromActiveList':
            if(childNode){
                delete childNode.activeCameras[data.ke + data.id];
            }
            break;
        case 'camera':
            s.camera(data.mode,data.data);
            break;
        case 's.tx':
            s.tx(data.data,data.to);
            break;
        case 's.userLog':
            if(!data.mon || !data.data){
                console.log('LOG DROPPED',data.mon,data.data);
                break;
            }
            s.userLog(data.mon,data.data);
            break;
    }
}
/**
 * Handles the loss of a child node's data connection.
 *
 * Every monitor that was bound to the node is restarted one after another:
 * after 2 seconds the monitor is stopped and its child node binding removed,
 * one second later it is started again in its previous mode, then the next
 * monitor is processed. The node record itself is kept but emptied and
 * flagged `dead`.
 *
 * @param {object} client - Socket that disconnected; `client.ip` keys `s.childNodes`.
 * @param {object} req - Unused.
 * @returns {undefined}
 */
function onDataConnectionDisconnect(client, req){
    const webAddress = client.ip;
    console.log('childNodeWebsocket.disconnect',webAddress);
    const lostNode = s.childNodes[webAddress];
    if(!lostNode){
        return;
    }
    // Snapshot taken before the camera list is cleared below.
    const orphanedMonitors = Object.values(lostNode.activeCameras);
    let position = 0;
    const restartNext = () => {
        const monitor = orphanedMonitors[position];
        if(!monitor){
            return;
        }
        setTimeout(() => {
            // Copy the mode as a string before the monitor object is cleaned.
            const mode = monitor.mode + '';
            const cleanMonitor = s.cleanMonitorObject(monitor);
            s.camera('stop',Object.assign(cleanMonitor,{}));
            const activeMonitor = s.group[monitor.ke].activeMonitors[monitor.mid];
            delete activeMonitor.childNode;
            delete activeMonitor.childNodeId;
            setTimeout(() => {
                s.camera(mode,cleanMonitor);
                ++position;
                restartNext();
            },1000);
        },2000);
    };
    restartNext();
    lostNode.activeCameras = {};
    lostNode.dead = true;
}
/**
 * Streams every message received on `client` into a file on the master node.
 *
 * The target is `data.writeDirectory + data.filename`. The stream is
 * registered on the monitor's `childNodeStreamWriters` map under the filename.
 * Two seconds after the socket closes the stream is ended and `onFinish` is
 * called; the registry entry is removed 100 ms after that.
 *
 * @param {object} client - Socket delivering the file contents as messages.
 * @param {object} data - Needs `ke`, `mid`, `filename` and `writeDirectory`.
 * @param {*} connectionId - Unused here; kept for signature compatibility.
 * @param {function(object)} onFinish - Receives `{ok}`; `ok` is false (and `err`
 *        holds the error) when the write stream reported an error.
 * @returns {undefined}
 */
function initiateFileWriteFromChildNode(client,data,connectionId,onFinish){
    const response = {ok: true}
    const groupKey = data.ke
    const monitorId = data.mid
    const filename = data.filename
    const activeMonitor = s.group[groupKey].activeMonitors[monitorId]
    const writeDirectory = data.writeDirectory
    // NOTE(review): `filename` comes from the child node and is concatenated
    // without normalisation; confirm child nodes are trusted to name files.
    const fileWritePath = writeDirectory + filename
    // Ensure the directory exists BEFORE the stream is created, otherwise the
    // stream's open can fail with ENOENT. mkdirSync takes no callback.
    if (!fs.existsSync(writeDirectory)) {
        fs.mkdirSync(writeDirectory, {recursive: true})
    }
    const writeStream = fs.createWriteStream(fileWritePath)
    // Without a listener a stream 'error' event is thrown as an uncaught exception.
    writeStream.on('error',(err) => {
        response.ok = false
        response.err = err
        s.debugLog(err)
    })
    activeMonitor.childNodeStreamWriters[filename] = writeStream
    client.on('message',(d) => {
        writeStream.write(d)
    })
    client.on('close',(d) => {
        // Grace period so buffered chunks are written before the stream is ended.
        setTimeout(() => {
            try{
                activeMonitor.childNodeStreamWriters[filename].end();
            }catch(err){
                // Best effort: the writer may already be gone; record it instead of hiding it.
                s.debugLog(err)
            }
            setTimeout(() => {
                delete(activeMonitor.childNodeStreamWriters[filename])
            },100)
            onFinish(response)
        },2000)
    })
}
/**
 * Receives a finished video file from a child node and registers it on the master.
 *
 * The file is written into the monitor's video directory. Once the transfer
 * has finished the child node is told to delete its copy, viewers of the group
 * are notified, the database row is inserted, registered
 * `insertCompletedVideoExtensions` run, disk usage is updated and the
 * monitor's recording/stream checker timers are cleared.
 *
 * @param {object} client - Socket delivering the file contents.
 * @param {object} data - Needs `ke`, `mid`, `filename`, `time`, `end`, `filesize`;
 *                        `writeDirectory` is set on it by this function.
 * @param {*} connectionId - Identifies the child node connection for the delete command.
 * @returns {Promise<object>} Resolves with the response object of the file write.
 */
function initiateVideoWriteFromChildNode(client,data,connectionId){
    return new Promise((resolve) => {
        const { ke: groupKey, mid: monitorId, filename } = data
        const theGroup = s.group[groupKey]
        const activeMonitor = theGroup.activeMonitors[monitorId]
        const monitorConfig = theGroup.rawMonitorConfigurations[monitorId]
        const videoDirectory = s.getVideoDirectory(monitorConfig)
        data.writeDirectory = videoDirectory
        const onVideoWritten = (response) => {
            // the child node no longer needs its local copy
            s.cx({
                f: 'delete',
                file: filename,
                ke: data.ke,
                mid: data.mid
            },connectionId)
            // tell permitted viewers of the group about the new video
            const buildNotice = {
                f:'video_build_success',
                hrefNoAuth:'/videos/'+data.ke+'/'+data.mid+'/'+filename,
                filename:filename,
                mid:data.mid,
                ke:data.ke,
                time:data.time,
                size:data.filesize,
                end:data.end
            }
            s.txWithSubPermissions(buildNotice,'GRP_'+data.ke,'video_view')
            // database row; size in MB is rounded to two decimals (1048576 bytes per MB)
            const insert = {
                startTime : data.time,
                filesize : data.filesize,
                endTime : data.end,
                dir : videoDirectory,
                file : filename,
                filename : filename,
                filesizeMB : parseFloat((data.filesize/1048576).toFixed(2))
            }
            s.insertDatabaseRow(monitorConfig,insert)
            for(const extender of s.insertCompletedVideoExtensions){
                extender(monitorConfig,insert)
            }
            // purge over max, then account for the new file
            s.purgeDiskForGroup(data.ke)
            s.setDiskUsedForGroup(data.ke,insert.filesizeMB)
            clearTimeout(activeMonitor.recordingChecker)
            clearTimeout(activeMonitor.streamChecker)
            resolve(response)
        }
        initiateFileWriteFromChildNode(client,data,connectionId,onVideoWritten)
    })
}
/**
 * Receives one timelapse frame from a child node and registers it on the master.
 *
 * The frame is written to `<timelapse frame directory><currentDate>/<filename>`.
 * After the transfer the child node is told to delete its copy and the
 * timelapse frame database row is inserted from `data.queryInfo`.
 *
 * The actual file handling is delegated to initiateFileWriteFromChildNode;
 * this function must not open the target file itself (a second stream on the
 * same path would leak a descriptor and could fail before the directory exists).
 *
 * @param {object} client - Socket delivering the frame contents.
 * @param {object} data - Needs `ke`, `mid`, `filename`, `currentDate`, `queryInfo`;
 *                        `writeDirectory` is set on it by this function.
 * @param {*} connectionId - Identifies the child node connection for the delete command.
 * @returns {Promise<object>} Resolves with the response object of the file write.
 */
function initiateTimelapseFrameWriteFromChildNode(client,data,connectionId){
    return new Promise((resolve,reject) => {
        const groupKey = data.ke
        const monitorId = data.mid
        const filename = data.filename
        const currentDate = data.currentDate
        const monitorConfig = s.group[groupKey].rawMonitorConfigurations[monitorId]
        const timelapseFrameDirectory = s.getTimelapseFrameDirectory(monitorConfig) + currentDate + `/`
        data.writeDirectory = timelapseFrameDirectory
        initiateFileWriteFromChildNode(client,data,connectionId,(response) => {
            // the child node no longer needs its local copy
            s.cx({
                f: 'deleteTimelapseFrame',
                file: filename,
                currentDate: currentDate,
                ke: groupKey,
                mid: monitorId
            },connectionId)
            s.insertTimelapseFrameDatabaseRow({
                ke: groupKey
            },data.queryInfo)
            resolve(response)
        })
    })
}
/**
 * Counts the monitors, across all groups in `s.group`, whose status code marks
 * them as starting (1), watching (2), recording (3) or started (9).
 * Monitors that are idle in memory (6) are not counted.
 *
 * @returns {number} Number of matching monitors.
 */
function getActiveCameraCount(){
    const countedStatusCodes = [
        // starting
        1,
        // watching
        2,
        // recording
        3,
        // started
        9,
    ]
    let theCount = 0
    Object.keys(s.group).forEach(function(groupKey){
        const theGroup = s.group[groupKey]
        // The inner callback parameter used to be called `groupKey` while the body
        // read `monitorId`, which threw a ReferenceError for any group with monitors.
        Object.keys(theGroup.activeMonitors).forEach(function(monitorId){
            const activeMonitor = theGroup.activeMonitors[monitorId]
            if(countedStatusCodes.indexOf(activeMonitor.statusCode) !== -1){
                ++theCount
            }
        })
    })
    return theCount
}
/**
 * Records that a monitor is now run by a given child node.
 *
 * A shallow copy of the monitor's raw configuration is stored in the node's
 * `activeCameras` under `ke + mid`, and the active monitor is tagged with the
 * node's address (`childNode`) and connection id (`childNodeId`).
 *
 * @param {object} options - `ke` (group key), `mid` (monitor id) and
 *                           `childNodeId` (key of the node in `s.childNodes`).
 * @returns {undefined}
 */
function bindMonitorToChildNode(options){
    const { ke: groupKey, mid: monitorId, childNodeId: childNodeSelected } = options
    const theChildNode = s.childNodes[childNodeSelected]
    const activeMonitor = s.group[groupKey].activeMonitors[monitorId]
    // shallow copy so the node's entry is a separate object from the stored configuration
    const configCopy = Object.assign({},s.group[groupKey].rawMonitorConfigurations[monitorId])
    theChildNode.activeCameras[groupKey + monitorId] = configCopy
    Object.assign(activeMonitor,{
        childNode: childNodeSelected,
        childNodeId: theChildNode.cnid,
    })
}
/**
 * Walks all child nodes and derives a CPU and a RAM ceiling.
 *
 * Both ceilings start at 0. Whenever a node's `cpuUsed` AND `ramUsed` are both
 * above the current ceilings, the ceilings become that node's values plus a
 * margin (0.2 for CPU, 50 for RAM). Nodes without usage values never match.
 *
 * Despite their names, the returned properties hold these ceilings.
 *
 * @returns {{nodeWithLowestCpuUse: number, nodeWithLowestRamUse: number}}
 */
function getNodeWithHighestCpuAndRamUse(){
    let cpuCeiling = 0
    let ramCeiling = 0
    for(const theChildNode of Object.values(s.childNodes)){
        const exceedsBoth =
            theChildNode.cpuUsed > cpuCeiling &&
            theChildNode.ramUsed > ramCeiling
        if(!exceedsBoth){
            continue
        }
        cpuCeiling = theChildNode.cpuUsed + 0.2
        ramCeiling = theChildNode.ramUsed + 50
    }
    return {
        nodeWithLowestCpuUse: cpuCeiling,
        nodeWithLowestRamUse: ramCeiling,
    }
}
/**
 * Picks the child node that should run a monitor.
 *
 * The monitor is first removed from every node's `activeCameras`. A node is a
 * candidate when it is not dead, its CPU and RAM percentages are below the
 * configured maximums, and all four usage values are below the best values
 * seen so far; the last candidate that qualifies wins. When `masterDoWorkToo`
 * is set and the master's own usage beats the selected node by the same
 * rules, the selection is released.
 *
 * @param {object} options - `ke` (group key) and `mid` (monitor id).
 * @returns {Promise<string|null|undefined>} Address of the chosen node, `null`
 *          when no node qualifies or the master should do the work, `undefined`
 *          when no child nodes are registered.
 */
async function selectNodeForOperation(options){
    const { ke: groupKey, mid: monitorId } = options
    const webAddresses = Object.keys(s.childNodes)
    if(webAddresses.length === 0){
        return
    }
    let childNodeSelected = null
    let bestCpuPercent = 100
    let bestRamPercent = 100
    // starting thresholds for the absolute usage values
    let {
        nodeWithLowestCpuUse: bestCpuUse,
        nodeWithLowestRamUse: bestRamUse,
    } = getNodeWithHighestCpuAndRamUse()
    // true when `usage` is under the configured limits and under the current best values
    const beatsCurrentBest = (usage) => (
        usage.cpuUsed < bestCpuUse &&
        usage.cpuPercent < maxCpuPercent &&
        usage.cpuPercent < bestCpuPercent &&
        usage.ramUsed < bestRamUse &&
        usage.ramPercent < maxRamPercent &&
        usage.ramPercent < bestRamPercent
    )
    for(const webAddress of webAddresses){
        const theChildNode = s.childNodes[webAddress]
        // the monitor is about to be (re)assigned, drop any previous binding
        delete theChildNode.activeCameras[groupKey + monitorId]
        if(theChildNode.dead || !beatsCurrentBest(theChildNode)){
            continue
        }
        childNodeSelected = `${webAddress}`
        bestCpuUse = theChildNode.cpuUsed
        bestCpuPercent = theChildNode.cpuPercent
        bestRamUse = theChildNode.ramUsed
        bestRamPercent = theChildNode.ramPercent
    }
    if(childNodeSelected && masterDoWorkToo){
        const masterNodeHw = await getHwUsage()
        if(beatsCurrentBest(masterNodeHw)){
            // release child node selection and use master node
            childNodeSelected = null
        }
    }
    return childNodeSelected
}
/**
 * Samples the master node's own CPU and RAM usage.
 *
 * `cpuUsed` is the CPU percentage scaled by `s.coreCount`
 * (`coreCount * percent / 100`). CPU is sampled before RAM.
 *
 * @returns {Promise<{ramUsed: *, ramPercent: *, cpuUsed: number, cpuPercent: *}>}
 */
async function getHwUsage(){
    const cpuPercent = await s.cpuUsage()
    const cpuUsed = s.coreCount * (cpuPercent / 100)
    const { used: ramUsed, percent: ramPercent } = await s.ramUsage()
    return {
        ramUsed,
        ramPercent,
        cpuUsed,
        cpuPercent,
    }
}
return {
getIpAddress,
initiateDataConnection,
onWebSocketDataFromChildNode,
onDataConnectionDisconnect,
initiateVideoWriteFromChildNode,
initiateTimelapseFrameWriteFromChildNode,
selectNodeForOperation,
bindMonitorToChildNode,
}
}