Child Node use plain buffer data for timelapse frame transfer to Master Node

montage-api
Moe 2021-11-26 17:47:20 -08:00
parent a717084e7d
commit ffbfd57f27
5 changed files with 92 additions and 91 deletions

View File

@ -16,6 +16,7 @@ module.exports = function(s,config,lang,app,io){
onWebSocketDataFromChildNode,
onDataConnectionDisconnect,
initiateVideoWriteFromChildNode,
initiateTimelapseFrameWriteFromChildNode,
} = require('./childNode/utils.js')(s,config,lang,app,io)
s.childNodes = {};
const childNodesConnectionIndex = {};
@ -84,6 +85,9 @@ module.exports = function(s,config,lang,app,io){
case'video':
initiateVideoWriteFromChildNode(client,data.options,data.connectionId)
break;
case'timelapseFrame':
initiateTimelapseFrameWriteFromChildNode(client,data.options,data.connectionId)
break;
}
}else{
client.destroy()

View File

@ -79,7 +79,7 @@ module.exports = function(s,config,lang,app,io){
cpu: parseFloat(cpu)
})
}
function sendVideoToMasterNode(filePath,options){
function createFileTransferToMasterNode(filePath,transferInfo,fileType){
const response = {ok: true}
return new Promise((resolve,reject) => {
const fileTransferConnection = createWebSocketClient('ws://'+config.childNodes.host + '/childNodeFileRelay',{
@ -87,23 +87,17 @@ module.exports = function(s,config,lang,app,io){
})
fileTransferConnection.on('open', function(){
fileTransferConnection.send(JSON.stringify({
fileType: 'video',
options: options,
fileType: fileType || 'video',
options: transferInfo,
socketKey: config.childNodes.key,
connectionId: s.childNodeIdOnMasterNode,
}))
setTimeout(() => {
fs.createReadStream(filePath,{ highWaterMark: 500 })
.on('data',function(data){
console.log('Send Video Chunk',data.length)
fileTransferConnection.send(data)
})
.on('close',function(){
// clearTimeout(s.group[e.ke].activeMonitors[e.id].recordingChecker)
// clearTimeout(s.group[e.ke].activeMonitors[e.id].streamChecker)
// s.cx(Object.assign({},options,{
// f:'created_file',
// }))
fileTransferConnection.close()
resolve(response)
})
@ -111,6 +105,19 @@ module.exports = function(s,config,lang,app,io){
})
})
}
async function sendVideoToMasterNode(filePath,options){
const groupKey = options.ke
const monitorId = options.mid
const activeMonitor = s.group[groupKey].activeMonitors[monitorId]
const response = await createFileTransferToMasterNode(filePath,options,'video');
clearTimeout(activeMonitor.recordingChecker);
clearTimeout(activeMonitor.streamChecker);
return response;
}
async function sendTimelapseFrameToMasterNode(filePath,options){
const response = await createFileTransferToMasterNode(filePath,options,'timelapseFrame');
return response;
}
return {
onDataFromMasterNode,
initiateConnectionToMasterNode,
@ -118,5 +125,6 @@ module.exports = function(s,config,lang,app,io){
destroyAllMonitorProcesses,
sendCurrentCpuUsage,
sendVideoToMasterNode,
sendTimelapseFrameToMasterNode,
}
}

View File

@ -64,36 +64,6 @@ module.exports = function(s,config,lang,app,io){
if(!data.mon || !data.data)return console.log('LOG DROPPED',data.mon,data.data);
s.userLog(data.mon,data.data)
break;
case'open_timelapse_file_transfer':
var location = s.getTimelapseFrameDirectory(data.d) + `${data.currentDate}/`
if(!fs.existsSync(location)){
fs.mkdirSync(location)
}
break;
case'created_timelapse_file_chunk':
if(!activeMonitor.childNodeStreamWriters[data.filename]){
var dir = s.getTimelapseFrameDirectory(data.d) + `${data.currentDate}/`
activeMonitor.childNodeStreamWriters[data.filename] = fs.createWriteStream(dir+data.filename)
}
activeMonitor.childNodeStreamWriters[data.filename].write(data.chunk)
break;
case'created_timelapse_file':
if(!activeMonitor.childNodeStreamWriters[data.filename]){
return console.log('FILE NOT EXIST')
}
activeMonitor.childNodeStreamWriters[data.filename].end()
client.sendJson({
f: 'deleteTimelapseFrame',
file: data.filename,
currentDate: data.currentDate,
d: data.d, //monitor config
ke: data.ke,
mid: data.mid
})
s.insertTimelapseFrameDatabaseRow({
ke: data.ke
},data.queryInfo)
break;
}
}
function onDataConnectionDisconnect(client, req){
@ -125,30 +95,46 @@ module.exports = function(s,config,lang,app,io){
s.childNodes[ipAddress].dead = true
}
}
function initiateVideoWriteFromChildNode(client,data,connectionId){
function initiateFileWriteFromChildNode(client,data,connectionId,onFinish){
const response = {ok: true}
const groupKey = data.ke
const monitorId = data.mid
const filename = data.filename
const activeMonitor = s.group[groupKey].activeMonitors[monitorId]
const writeDirectory = data.writeDirectory
const fileWritePath = writeDirectory + filename
const writeStream = fs.createWriteStream(fileWritePath)
if (!fs.existsSync(writeDirectory)) {
fs.mkdirSync(writeDirectory, {recursive: true}, (err) => {s.debugLog(err)})
}
activeMonitor.childNodeStreamWriters[filename] = writeStream
client.on('message',(d) => {
writeStream.write(d)
})
client.on('close',(d) => {
setTimeout(() => {
// response.fileWritePath = fileWritePath
// response.writeData = data
// response.childNodeId = connectionId
activeMonitor.childNodeStreamWriters[filename].end();
setTimeout(() => {
delete(activeMonitor.childNodeStreamWriters[filename])
},100)
onFinish(response)
},2000)
})
}
function initiateVideoWriteFromChildNode(client,data,connectionId){
return new Promise((resolve,reject) => {
const groupKey = data.ke
const monitorId = data.mid
const filename = data.filename
const activeMonitor = s.group[groupKey].activeMonitors[monitorId]
const monitorConfig = s.group[groupKey].rawMonitorConfigurations[monitorId]
const fileWritePath = s.getVideoDirectory(monitorConfig) + filename
const writeStream = fs.createWriteStream(fileWritePath)
const videoDirectory = s.getVideoDirectory(monitorConfig)
if (!fs.existsSync(videoDirectory)) {
fs.mkdirSync(videoDirectory, {recursive: true}, (err) => {s.debugLog(err)})
}
activeMonitor.childNodeStreamWriters[filename] = writeStream
client.on('message',(d) => {
writeStream.write(d)
})
client.on('close',(d) => {
data.writeDirectory = videoDirectory
initiateFileWriteFromChildNode(client,data,connectionId,(response) => {
setTimeout(() => {
if(!activeMonitor.childNodeStreamWriters[filename]){
return console.log('FILE NOT EXIST')
}
activeMonitor.childNodeStreamWriters[filename].end();
//delete video file from child node
s.cx({
f: 'delete',
@ -192,10 +178,40 @@ module.exports = function(s,config,lang,app,io){
})
})
}
function initiateTimelapseFrameWriteFromChildNode(client,data,connectionId){
return new Promise((resolve,reject) => {
const groupKey = data.ke
const monitorId = data.mid
const filename = data.filename
const currentDate = data.currentDate
const activeMonitor = s.group[groupKey].activeMonitors[monitorId]
const monitorConfig = s.group[groupKey].rawMonitorConfigurations[monitorId]
const timelapseFrameDirectory = s.getTimelapseFrameDirectory(monitorConfig) + currentDate + `/`
const fileWritePath = timelapseFrameDirectory + filename
const writeStream = fs.createWriteStream(fileWritePath)
data.writeDirectory = timelapseFrameDirectory
initiateFileWriteFromChildNode(client,data,connectionId,(response) => {
setTimeout(() => {
s.cx({
f: 'deleteTimelapseFrame',
file: filename,
currentDate: currentDate,
ke: groupKey,
mid: monitorId
},connectionId)
s.insertTimelapseFrameDatabaseRow({
ke: groupKey
},data.queryInfo)
resolve(response)
},2000)
})
})
}
return {
initiateDataConnection,
onWebSocketDataFromChildNode,
onDataConnectionDisconnect,
initiateVideoWriteFromChildNode,
initiateTimelapseFrameWriteFromChildNode,
}
}

View File

@ -2,6 +2,9 @@ var fs = require('fs')
var moment = require('moment')
var express = require('express')
module.exports = function(s,config,lang,app,io){
const {
sendTimelapseFrameToMasterNode,
} = require('./childNode/childUtils.js')(s,config,lang)
const timelapseFramesCache = {}
const timelapseFramesCacheTimeouts = {}
s.getTimelapseFrameDirectory = function(e){
@ -22,8 +25,8 @@ module.exports = function(s,config,lang,app,io){
if(e.details && e.details.dir && e.details.dir !== ''){
details.dir = e.details.dir
}
var timeNow = eventTime || new Date()
var queryInfo = {
const timeNow = eventTime || new Date()
const queryInfo = {
ke: e.ke,
mid: e.id,
details: s.s(details),
@ -32,45 +35,16 @@ module.exports = function(s,config,lang,app,io){
time: timeNow
}
if(config.childNodes.enabled === true && config.childNodes.mode === 'child' && config.childNodes.host){
var currentDate = s.formattedTime(queryInfo.time,'YYYY-MM-DD')
s.cx({
f: 'open_timelapse_file_transfer',
var currentDate = s.formattedTime(timeNow,'YYYY-MM-DD')
const childNodeData = {
ke: e.ke,
mid: e.id,
d: s.group[e.ke].rawMonitorConfigurations[e.id],
time: formattedTime,
filename: filename,
currentDate: currentDate,
queryInfo: queryInfo
})
var formattedTime = s.timeObject(timeNow).format()
fs.createReadStream(filePath,{ highWaterMark: 500 })
.on('data',function(data){
s.cx({
f: 'created_timelapse_file_chunk',
ke: e.ke,
mid: e.id,
time: formattedTime,
filesize: e.filesize,
chunk: data,
d: s.group[e.ke].rawMonitorConfigurations[e.id],
filename: filename,
currentDate: currentDate,
queryInfo: queryInfo
})
})
.on('close',function(){
s.cx({
f: 'created_timelapse_file',
ke: e.ke,
mid: e.id,
time: formattedTime,
filesize: e.filesize,
d: s.group[e.ke].rawMonitorConfigurations[e.id],
filename: filename,
currentDate: currentDate,
queryInfo: queryInfo
})
})
}
sendTimelapseFrameToMasterNode(filePath,childNodeData)
}else{
s.insertTimelapseFrameDatabaseRow(e,queryInfo,filePath)
}

View File

@ -142,7 +142,6 @@ module.exports = function(s,config,lang){
mid: e.mid,
ke: e.ke,
filename: k.filename,
d: s.cleanMonitorObject(s.group[e.ke].rawMonitorConfigurations[e.id]),
filesize: k.filesize,
time: s.timeObject(k.startTime).format('YYYY-MM-DD HH:mm:ss'),
end: s.timeObject(k.endTime).format('YYYY-MM-DD HH:mm:ss')