Make cron.js a worker process of camera.js

Old connection of cron.js to camera.js through socket.io has been disabled. Old cron.js file has been disabled. File will remain to keep old systems from having looping errors, the file will just post a notice.
cron-as-worker-process
Moe 2022-06-29 12:29:20 -07:00
parent 2680e0a9ea
commit e1d83e7c14
5 changed files with 681 additions and 665 deletions

View File

@ -89,6 +89,8 @@ require('./libs/ffmpeg.js')(s,config,lang, async () => {
require('./libs/onvifDeviceManager.js')(s,config,lang,app,io) require('./libs/onvifDeviceManager.js')(s,config,lang,app,io)
//alternate logins //alternate logins
require('./libs/auth/logins.js')(s,config,lang,app) require('./libs/auth/logins.js')(s,config,lang,app)
//cron
require('./libs/cron.js')(s,config,lang)
//on-start actions, daemon(s) starter //on-start actions, daemon(s) starter
await require('./libs/startup.js')(s,config,lang) await require('./libs/startup.js')(s,config,lang)
//p2p, commander //p2p, commander

613
cron.js
View File

@ -1,606 +1,9 @@
process.on('uncaughtException', function (err) { const { parentPort, isMainThread } = require('worker_threads');
console.error('uncaughtException',err); if(isMainThread){
}); console.log(`Shinobi now runs cron.js as a worker process from camera.js.`)
const fs = require('fs'); console.error(`Shinobi now runs cron.js as a worker process from camera.js.`)
const path = require('path'); setInterval(() => {
const moment = require('moment'); // console.log(`Please turn off cron.js process.`)
const exec = require('child_process').exec; },1000 * 60 * 60 * 24 * 7)
const spawn = require('child_process').spawn; return;
const config = require(process.cwd() + '/conf.json')
//set option defaults
s = {
mainDirectory: process.cwd(),
utcOffset: moment().utcOffset()
};
if(config.cron===undefined)config.cron={};
if(config.cron.deleteOld===undefined)config.cron.deleteOld=true;
if(config.cron.deleteOrphans===undefined)config.cron.deleteOrphans=false;
if(config.cron.deleteNoVideo===undefined)config.cron.deleteNoVideo=true;
if(config.cron.deleteNoVideoRecursion===undefined)config.cron.deleteNoVideoRecursion=false;
if(config.cron.deleteOverMax===undefined)config.cron.deleteOverMax=true;
if(config.cron.deleteLogs===undefined)config.cron.deleteLogs=true;
if(config.cron.deleteTimelpaseFrames===undefined)config.cron.deleteTimelpaseFrames=true;
if(config.cron.deleteEvents===undefined)config.cron.deleteEvents=true;
if(config.cron.deleteFileBins===undefined)config.cron.deleteFileBins=true;
if(config.cron.interval===undefined)config.cron.interval=1;
if(config.databaseType===undefined){config.databaseType='mysql'}
if(config.databaseLogs===undefined){config.databaseLogs=false}
if(config.useUTC===undefined){config.useUTC=false}
if(config.debugLog===undefined){config.debugLog=false}
if(!config.ip||config.ip===''||config.ip.indexOf('0.0.0.0')>-1)config.ip='localhost';
if(!config.videosDir)config.videosDir = s.mainDirectory + '/videos/';
if(!config.binDir){config.binDir = s.mainDirectory + '/fileBin/'}
const {
checkCorrectPathEnding,
generateRandomId,
formattedTime,
localToUtc,
} = require('./libs/basic/utils.js')(s.mainDirectory)
const {
sqlDate,
knexQuery,
knexQueryPromise,
initiateDatabaseEngine
} = require('./libs/sql/utils.js')(s,config)
var theCronInterval = null
const overlapLocks = {}
const alreadyDeletedRowsWithNoVideosOnStart = {}
const videoDirectory = checkCorrectPathEnding(config.videosDir)
const fileBinDirectory = checkCorrectPathEnding(config.binDir)
s.debugLog = function(arg1,arg2){
if(config.debugLog === true){
if(!arg2)arg2 = ''
console.log(arg1,arg2)
}
} }
const connectToMainProcess = () => {
const io = require('socket.io-client')('ws://'+config.ip+':'+config.port,{
transports:['websocket']
});
io.on('connect',function(d){
postMessage({
f: 'init',
time: moment()
})
})
io.on('f',function(d){
//command from main process
switch(d.f){
case'start':case'restart':
setIntervalForCron()
break;
case'stop':
clearCronInterval()
break;
}
})
return io
}
const postMessage = (x) => {
x.cronKey = config.cron.key;
return io.emit('cron',x)
}
const sendToWebSocket = (x,y) => {
//emulate master socket emitter
postMessage({f:'s.tx',data:x,to:y})
}
const deleteVideo = (x) => {
postMessage({f:'s.deleteVideo',file:x})
}
const deleteFileBinEntry = (x) => {
postMessage({f:'s.deleteFileBinEntry',file:x})
}
const setDiskUsedForGroup = (groupKey,size,target,videoRow) => {
postMessage({f:'s.setDiskUsedForGroup', ke: groupKey, size: size, target: target, videoRow: videoRow})
}
const getVideoDirectory = function(e){
if(e.mid&&!e.id){e.id=e.mid};
if(e.details&&(e.details instanceof Object)===false){
try{e.details=JSON.parse(e.details)}catch(err){}
}
if(e.details.dir&&e.details.dir!==''){
return checkCorrectPathEnding(e.details.dir)+e.ke+'/'+e.id+'/'
}else{
return videoDirectory + e.ke + '/' + e.id + '/'
}
}
const getTimelapseFrameDirectory = function(e){
if(e.mid&&!e.id){e.id=e.mid}
if(e.details&&(e.details instanceof Object)===false){
try{e.details=JSON.parse(e.details)}catch(err){}
}
if(e.details&&e.details.dir&&e.details.dir!==''){
return checkCorrectPathEnding(e.details.dir)+e.ke+'/'+e.id+'_timelapse/'
}else{
return videoDirectory + e.ke + '/' + e.id + '_timelapse/'
}
}
const getFileBinDirectory = function(e){
if(e.mid && !e.id){e.id = e.mid}
return fileBinDirectory + e.ke + '/' + e.id + '/'
}
//filters set by the user in their dashboard
//deleting old videos is part of the filter - config.cron.deleteOld
const checkFilterRules = function(v){
return new Promise((resolve,reject) => {
//filters
v.d.filters = v.d.filters ? v.d.filters : {}
s.debugLog('Checking Basic Filters...')
var keys = Object.keys(v.d.filters)
if(keys.length>0){
keys.forEach(function(m,current){
// b = filter
var b = v.d.filters[m];
s.debugLog(b)
if(b.enabled==="1"){
const whereQuery = [
['ke','=',v.ke],
['status','!=',"0"],
['details','NOT LIKE','%"archived":"1"%'],
]
b.where.forEach(function(condition){
if(condition.p1 === 'ke'){condition.p3 = v.ke}
whereQuery.push([condition.p1,condition.p2 || '=',condition.p3])
})
knexQuery({
action: "select",
columns: "*",
table: "Videos",
where: whereQuery,
orderBy: [b.sort_by,b.sort_by_direction.toLowerCase()],
limit: b.limit
},(err,r) => {
if(r && r[0]){
if(r.length > 0 || config.debugLog === true){
postMessage({f:'filterMatch',msg:r.length+' SQL rows match "'+m+'"',ke:v.ke,time:moment()})
}
b.cx={
f:'filters',
name:b.name,
videos:r,
time:moment(),
ke:v.ke,
id:b.id
};
if(b.archive==="1"){
postMessage({f:'filters',ff:'archive',videos:r,time:moment(),ke:v.ke,id:b.id});
}else if(b.delete==="1"){
postMessage({f:'filters',ff:'delete',videos:r,time:moment(),ke:v.ke,id:b.id});
}
if(b.email==="1"){
b.cx.ff='email';
b.cx.delete=b.delete;
b.cx.mail=v.mail;
b.cx.execute=b.execute;
b.cx.query=b.sql;
postMessage(b.cx);
}
if(b.execute&&b.execute!==""){
postMessage({f:'filters',ff:'execute',execute:b.execute,time:moment()});
}
}
})
}
if(current===keys.length-1){
//last filter
resolve()
}
})
}else{
//no filters
resolve()
}
})
}
const deleteVideosByDays = async (v,days,addedQueries) => {
const groupKey = v.ke;
const whereQuery = [
['ke','=',v.ke],
['time','<', sqlDate(days+' DAY')],
addedQueries
]
const selectResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Videos",
where: whereQuery
})
const videoRows = selectResponse.rows
let affectedRows = 0
if(videoRows.length > 0){
let clearSize = 0;
var i;
for (i = 0; i < videoRows.length; i++) {
const row = videoRows[i];
const dir = getVideoDirectory(row)
const filename = formattedTime(row.time) + '.' + row.ext
try{
await fs.promises.unlink(dir + filename)
const fileSizeMB = row.size / 1048576;
setDiskUsedForGroup(groupKey,-fileSizeMB,null,row)
sendToWebSocket({
f: 'video_delete',
filename: filename + '.' + row.ext,
mid: row.mid,
ke: row.ke,
time: row.time,
end: formattedTime(new Date,'YYYY-MM-DD HH:mm:ss')
},'GRP_' + row.ke)
}catch(err){
console.log('Video Delete Error',row)
console.log(err)
}
}
const deleteResponse = await knexQueryPromise({
action: "delete",
table: "Videos",
where: whereQuery
})
affectedRows = deleteResponse.rows || 0
}
return {
ok: true,
affectedRows: affectedRows,
}
}
const deleteOldVideos = async (v) => {
// v = group, admin user
if(config.cron.deleteOld === true){
const daysOldForDeletion = v.d.days && !isNaN(v.d.days) ? parseFloat(v.d.days) : 5
const monitorsIgnored = []
const monitorsResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Monitors",
where: [
['ke','=',v.ke],
]
})
const monitorRows = monitorsResponse.rows
var i;
for (i = 0; i < monitorRows.length; i++) {
const monitor = monitorRows[i]
const monitorId = monitor.mid
const details = JSON.parse(monitor.details);
const monitorsMaxDaysToKeep = !isNaN(details.max_keep_days) ? parseFloat(details.max_keep_days) : null
if(monitorsMaxDaysToKeep){
const { affectedRows } = await deleteVideosByDays(v,monitorsMaxDaysToKeep,['mid','=',monitorId])
const hasDeletedRows = affectedRows && affectedRows.length > 0;
if(hasDeletedRows || config.debugLog === true){
postMessage({
f: 'deleteOldVideosByMonitorId',
msg: `${affectedRows} SQL rows older than ${monitorsMaxDaysToKeep} days deleted`,
ke: v.ke,
mid: monitorId,
time: moment(),
})
}
monitorsIgnored.push(['mid','!=',monitorId])
}
}
const { affectedRows } = await deleteVideosByDays(v,daysOldForDeletion,monitorsIgnored)
const hasDeletedRows = affectedRows && affectedRows.length > 0;
if(hasDeletedRows || config.debugLog === true){
postMessage({
f: 'deleteOldVideos',
msg: `${affectedRows} SQL rows older than ${daysOldForDeletion} days deleted`,
ke: v.ke,
time: moment(),
})
}
}
}
//database rows with no videos in the filesystem
const deleteRowsWithNoVideo = function(v){
return new Promise((resolve,reject) => {
if(
config.cron.deleteNoVideo===true&&(
config.cron.deleteNoVideoRecursion===true||
(config.cron.deleteNoVideoRecursion===false&&!alreadyDeletedRowsWithNoVideosOnStart[v.ke])
)
){
alreadyDeletedRowsWithNoVideosOnStart[v.ke]=true;
knexQuery({
action: "select",
columns: "*",
table: "Videos",
where: [
['ke','=',v.ke],
['status','!=','0'],
['details','NOT LIKE','%"archived":"1"%'],
['time','<', sqlDate('10 MINUTE')],
]
},(err,evs) => {
if(evs && evs[0]){
const videosToDelete = [];
evs.forEach(function(ev){
var filename
var details
try{
details = JSON.parse(ev.details)
}catch(err){
if(details instanceof Object){
details = ev.details
}else{
details = {}
}
}
var dir = getVideoDirectory(ev)
filename = formattedTime(ev.time)+'.'+ev.ext
fileExists = fs.existsSync(dir+filename)
if(fileExists !== true){
deleteVideo(ev)
sendToWebSocket({f:'video_delete',filename:filename+'.'+ev.ext,mid:ev.mid,ke:ev.ke,time:ev.time,end: formattedTime(new Date,'YYYY-MM-DD HH:mm:ss')},'GRP_'+ev.ke);
}
});
if(videosToDelete.length > 0 || config.debugLog === true){
postMessage({f:'deleteNoVideo',msg:videosToDelete.length+' SQL rows with no file deleted',ke:v.ke,time:moment()})
}
}
setTimeout(function(){
resolve()
},3000)
})
}else{
resolve()
}
})
}
//info about what the application is doing
const deleteOldLogs = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.log_days && !isNaN(v.d.log_days) ? parseFloat(v.d.log_days) : 10
if(config.cron.deleteLogs === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Logs",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err)return console.error(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteLogs',msg: rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:moment()})
}
})
}else{
resolve()
}
})
}
//still images saved
const deleteOldTimelapseFrames = async function(v){
const daysOldForDeletion = v.d.timelapseFrames_days && !isNaN(v.d.timelapseFrames_days) ? parseFloat(v.d.timelapseFrames_days) : 60
if(config.cron.deleteTimelpaseFrames === true && daysOldForDeletion !== 0){
const groupKey = v.ke;
const whereQuery = [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion+' DAY')],
]
const selectResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Timelapse Frames",
where: whereQuery
})
const videoRows = selectResponse.rows
let affectedRows = 0
if(videoRows.length > 0){
const foldersDeletedFrom = [];
let clearSize = 0;
var i;
for (i = 0; i < videoRows.length; i++) {
const row = videoRows[i];
const dir = getTimelapseFrameDirectory(row)
const filename = row.filename
const theDate = filename.split('T')[0]
const enclosingFolder = `${dir}/${theDate}/`
try{
const fileSizeMB = row.size / 1048576;
setDiskUsedForGroup(groupKey,-fileSizeMB,null,row)
sendToWebSocket({
f: 'timelapse_frame_delete',
filename: filename,
mid: row.mid,
ke: groupKey,
time: row.time,
details: row.details,
},'GRP_' + groupKey)
await fs.promises.unlink(`${enclosingFolder}${filename}`)
if(foldersDeletedFrom.indexOf(enclosingFolder) === -1)foldersDeletedFrom.push(enclosingFolder);
}catch(err){
console.log('Timelapse Frame Delete Error',row)
console.log(err)
}
}
for (i = 0; i < foldersDeletedFrom.length; i++) {
const folderPath = foldersDeletedFrom[i];
const folderIsEmpty = (await fs.promises.readdir(folderPath)).filter(file => file.indexOf('.jpg') > -1).length === 0;
if(folderIsEmpty){
await fs.promises.rm(folderPath, { recursive: true, force: true })
}
}
const deleteResponse = await knexQueryPromise({
action: "delete",
table: "Timelapse Frames",
where: whereQuery
})
affectedRows = deleteResponse.rows || 0
}
return {
ok: true,
affectedRows: affectedRows,
}
}
return {
ok: false
}
}
//events - motion, object, etc. detections
const deleteOldEvents = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.event_days && !isNaN(v.d.event_days) ? parseFloat(v.d.event_days) : 10
if(config.cron.deleteEvents === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Events",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err)return console.error(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteEvents',msg:rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:moment()})
}
})
}else{
resolve()
}
})
}
//event counts
const deleteOldEventCounts = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.event_days && !isNaN(v.d.event_days) ? parseFloat(v.d.event_days) : 10
if(config.cron.deleteEvents === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Events Counts",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err && err.code !== 'ER_NO_SUCH_TABLE')return console.error(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteEvents',msg:rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:moment()})
}
})
}else{
resolve()
}
})
}
//check for temporary files (special archive)
const deleteOldFileBins = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.fileBin_days && !isNaN(v.d.fileBin_days) ? parseFloat(v.d.fileBin_days) : 10
if(config.cron.deleteFileBins === true && daysOldForDeletion !== 0){
var fileBinQuery = " FROM Files WHERE ke=? AND `time` < ?";
knexQuery({
action: "select",
columns: "*",
table: "Files",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,files) => {
if(files && files[0]){
//delete the files
files.forEach(function(file){
deleteFileBinEntry(file)
})
if(config.debugLog === true){
postMessage({
f: 'deleteFileBins',
msg: files.length + ' files older than ' + daysOldForDeletion + ' days deleted',
ke: v.ke,
time: moment()
})
}
}
resolve()
})
}else{
resolve()
}
})
}
//user processing function
const processUser = async (v) => {
if(!v){
//no user object given, end of group list
return
}
s.debugLog(`Group Key : ${v.ke}`)
s.debugLog(`Owner : ${v.mail}`)
if(!overlapLocks[v.ke]){
s.debugLog(`Checking...`)
overlapLocks[v.ke] = true
v.d = JSON.parse(v.details);
try{
await deleteOldVideos(v)
s.debugLog('--- deleteOldVideos Complete')
await deleteOldTimelapseFrames(v)
s.debugLog('--- deleteOldTimelapseFrames Complete')
await deleteOldLogs(v)
s.debugLog('--- deleteOldLogs Complete')
await deleteOldFileBins(v)
s.debugLog('--- deleteOldFileBins Complete')
await deleteOldEvents(v)
s.debugLog('--- deleteOldEvents Complete')
await deleteOldEventCounts(v)
s.debugLog('--- deleteOldEventCounts Complete')
await checkFilterRules(v)
s.debugLog('--- checkFilterRules Complete')
await deleteRowsWithNoVideo(v)
s.debugLog('--- deleteRowsWithNoVideo Complete')
}catch(err){
console.log(`Failed to Complete User : ${v.mail}`)
console.log(err)
}
//done user, unlock current, and do next
overlapLocks[v.ke] = false;
s.debugLog(`Complete Checking... ${v.mail}`)
}else{
s.debugLog(`Locked, Skipped...`)
}
}
//recursive function
const setIntervalForCron = function(){
clearCronInterval()
// theCronInterval = setInterval(doCronJobs,1000 * 10)
theCronInterval = setInterval(doCronJobs,parseFloat(config.cron.interval)*60000*60)
}
const clearCronInterval = function(){
clearInterval(theCronInterval)
}
const doCronJobs = function(){
postMessage({
f: 'start',
time: moment()
})
knexQuery({
action: "select",
columns: "ke,uid,details,mail",
table: "Users",
where: [
['details','NOT LIKE','%"sub"%'],
]
}, async (err,rows) => {
if(err){
console.error(err)
}
if(rows.length > 0){
var i;
for (i = 0; i < rows.length; i++) {
await processUser(rows[i])
}
}
})
}
initiateDatabaseEngine()
const io = connectToMainProcess()
setIntervalForCron()
doCronJobs()
console.log('Shinobi : cron.js started')

63
libs/cron.js Normal file
View File

@ -0,0 +1,63 @@
const { Worker } = require('worker_threads');
const moment = require('moment');
module.exports = (s,config,lang) => {
const {
legacyFilterEvents
} = require('./events/utils.js')(s,config,lang)
if(config.doCronAsWorker===undefined)config.doCronAsWorker = true;
const startWorker = () => {
const pathToWorkerScript = __dirname + `/cron/worker.js`
const workerProcess = new Worker(pathToWorkerScript)
workerProcess.on('message',function(data){
if(data.time === 'moment()')data.time = moment();
switch(data.f){
case'debugLog':
s.debugLog(...data.data)
break;
case'systemLog':
s.systemLog(...data.data)
break;
case'filters':
legacyFilterEvents(data.ff,data)
break;
case's.tx':
s.tx(data.data,data.to)
break;
case's.deleteVideo':
s.deleteVideo(data.file)
break;
case's.deleteFileBinEntry':
s.deleteFileBinEntry(data.file)
break;
case's.setDiskUsedForGroup':
function doOnMain(){
s.setDiskUsedForGroup(data.ke,data.size,data.target || undefined)
}
if(data.videoRow){
let storageIndex = s.getVideoStorageIndex(data.videoRow);
if(storageIndex){
s.setDiskUsedForGroupAddStorage(data.ke,{
size: data.size,
storageIndex: storageIndex
})
}else{
doOnMain()
}
}else{
doOnMain()
}
break;
default:
s.systemLog('CRON.js MESSAGE : ',data)
break;
}
})
setTimeout(() => {
workerProcess.postMessage({
f: 'init',
})
},2000)
return workerProcess
}
if(config.doCronAsWorker === true)startWorker()
}

608
libs/cron/worker.js Normal file
View File

@ -0,0 +1,608 @@
const fs = require('fs');
const path = require('path');
const moment = require('moment');
const exec = require('child_process').exec;
const spawn = require('child_process').spawn;
const { parentPort, isMainThread } = require('worker_threads');
const config = require(process.cwd() + '/conf.json')
process.on('uncaughtException', function (err) {
errorLog('uncaughtException',err);
});
if(isMainThread){
console.log(`Shinobi now runs cron.js as child process.`)
console.error(`Shinobi now runs cron.js as child process.`)
setInterval(() => {
// console.log(`Please turn off cron.js process.`)
},1000 * 60 * 60 * 24 * 7)
return;
}
function setDefaultConfigOptions(){
if(config.cron===undefined)config.cron={};
if(config.cron.deleteOld===undefined)config.cron.deleteOld=true;
if(config.cron.deleteOrphans===undefined)config.cron.deleteOrphans=false;
if(config.cron.deleteNoVideo===undefined)config.cron.deleteNoVideo=true;
if(config.cron.deleteNoVideoRecursion===undefined)config.cron.deleteNoVideoRecursion=false;
if(config.cron.deleteOverMax===undefined)config.cron.deleteOverMax=true;
if(config.cron.deleteLogs===undefined)config.cron.deleteLogs=true;
if(config.cron.deleteTimelpaseFrames===undefined)config.cron.deleteTimelpaseFrames=true;
if(config.cron.deleteEvents===undefined)config.cron.deleteEvents=true;
if(config.cron.deleteFileBins===undefined)config.cron.deleteFileBins=true;
if(config.cron.interval===undefined)config.cron.interval=1;
if(config.databaseType===undefined){config.databaseType='mysql'}
if(config.databaseLogs===undefined){config.databaseLogs=false}
if(config.debugLog===undefined){config.debugLog=false}
if(!config.ip||config.ip===''||config.ip.indexOf('0.0.0.0')>-1)config.ip='localhost';
if(!config.videosDir)config.videosDir = process.cwd() + '/videos/';
if(!config.binDir){config.binDir = process.cwd() + '/fileBin/'}
}
parentPort.on('message',(data) => {
switch(data.f){
case'init':
setDefaultConfigOptions()
beginProcessing()
break;
case'start':case'restart':
setIntervalForCron()
break;
case'stop':
clearCronInterval()
break;
}
})
function debugLog(...args){
if(config.debugLog === true){
console.log(...([`CRON.js DEBUG LOG ${new Date()}`].concat(args)))
}
}
function normalLog(...args){
console.log(...([`CRON.js LOG ${new Date()}`].concat(args)))
}
function errorLog(...args){
console.error(...([`CRON.js ERROR LOG ${new Date()}`].concat(args)))
}
const s = {
debugLog,
}
function beginProcessing(){
normalLog(`Worker Processing!`)
const {
checkCorrectPathEnding,
generateRandomId,
formattedTime,
localToUtc,
} = require('../basic/utils.js')(process.cwd())
const {
sqlDate,
knexQuery,
knexQueryPromise,
initiateDatabaseEngine
} = require('../sql/utils.js')(s,config)
var theCronInterval = null
const overlapLocks = {}
const alreadyDeletedRowsWithNoVideosOnStart = {}
const videoDirectory = checkCorrectPathEnding(config.videosDir)
const fileBinDirectory = checkCorrectPathEnding(config.binDir)
const postMessage = (data) => {
parentPort.postMessage(data)
}
const sendToWebSocket = (x,y) => {
//emulate master socket emitter
postMessage({f:'s.tx',data:x,to:y})
}
const deleteVideo = (x) => {
postMessage({f:'s.deleteVideo',file:x})
}
const deleteFileBinEntry = (x) => {
postMessage({f:'s.deleteFileBinEntry',file:x})
}
const setDiskUsedForGroup = (groupKey,size,target,videoRow) => {
postMessage({f:'s.setDiskUsedForGroup', ke: groupKey, size: size, target: target, videoRow: videoRow})
}
const getVideoDirectory = function(e){
if(e.mid&&!e.id){e.id=e.mid};
if(e.details&&(e.details instanceof Object)===false){
try{e.details=JSON.parse(e.details)}catch(err){}
}
if(e.details.dir&&e.details.dir!==''){
return checkCorrectPathEnding(e.details.dir)+e.ke+'/'+e.id+'/'
}else{
return videoDirectory + e.ke + '/' + e.id + '/'
}
}
const getTimelapseFrameDirectory = function(e){
if(e.mid&&!e.id){e.id=e.mid}
if(e.details&&(e.details instanceof Object)===false){
try{e.details=JSON.parse(e.details)}catch(err){}
}
if(e.details&&e.details.dir&&e.details.dir!==''){
return checkCorrectPathEnding(e.details.dir)+e.ke+'/'+e.id+'_timelapse/'
}else{
return videoDirectory + e.ke + '/' + e.id + '_timelapse/'
}
}
const getFileBinDirectory = function(e){
if(e.mid && !e.id){e.id = e.mid}
return fileBinDirectory + e.ke + '/' + e.id + '/'
}
//filters set by the user in their dashboard
//deleting old videos is part of the filter - config.cron.deleteOld
const checkFilterRules = function(v){
return new Promise((resolve,reject) => {
//filters
v.d.filters = v.d.filters ? v.d.filters : {}
debugLog('Checking Basic Filters...')
var keys = Object.keys(v.d.filters)
if(keys.length>0){
keys.forEach(function(m,current){
// b = filter
var b = v.d.filters[m];
debugLog(b)
if(b.enabled==="1"){
const whereQuery = [
['ke','=',v.ke],
['status','!=',"0"],
['details','NOT LIKE','%"archived":"1"%'],
]
b.where.forEach(function(condition){
if(condition.p1 === 'ke'){condition.p3 = v.ke}
whereQuery.push([condition.p1,condition.p2 || '=',condition.p3])
})
knexQuery({
action: "select",
columns: "*",
table: "Videos",
where: whereQuery,
orderBy: [b.sort_by,b.sort_by_direction.toLowerCase()],
limit: b.limit
},(err,r) => {
if(r && r[0]){
if(r.length > 0 || config.debugLog === true){
postMessage({f:'filterMatch',msg:r.length+' SQL rows match "'+m+'"',ke:v.ke,time:'moment()'})
}
b.cx={
f:'filters',
name:b.name,
videos:r,
time:'moment()',
ke:v.ke,
id:b.id
};
if(b.archive==="1"){
postMessage({f:'filters',ff:'archive',videos:r,time:'moment()',ke:v.ke,id:b.id});
}else if(b.delete==="1"){
postMessage({f:'filters',ff:'delete',videos:r,time:'moment()',ke:v.ke,id:b.id});
}
if(b.email==="1"){
b.cx.ff='email';
b.cx.delete=b.delete;
b.cx.mail=v.mail;
b.cx.execute=b.execute;
b.cx.query=b.sql;
postMessage(b.cx);
}
if(b.execute&&b.execute!==""){
postMessage({f:'filters',ff:'execute',execute:b.execute,time:'moment()'});
}
}
})
}
if(current===keys.length-1){
//last filter
resolve()
}
})
}else{
//no filters
resolve()
}
})
}
const deleteVideosByDays = async (v,days,addedQueries) => {
const groupKey = v.ke;
const whereQuery = [
['ke','=',v.ke],
['time','<', sqlDate(days+' DAY')],
addedQueries
]
const selectResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Videos",
where: whereQuery
})
const videoRows = selectResponse.rows
let affectedRows = 0
if(videoRows.length > 0){
let clearSize = 0;
var i;
for (i = 0; i < videoRows.length; i++) {
const row = videoRows[i];
const dir = getVideoDirectory(row)
const filename = formattedTime(row.time) + '.' + row.ext
try{
await fs.promises.unlink(dir + filename)
const fileSizeMB = row.size / 1048576;
setDiskUsedForGroup(groupKey,-fileSizeMB,null,row)
sendToWebSocket({
f: 'video_delete',
filename: filename + '.' + row.ext,
mid: row.mid,
ke: row.ke,
time: row.time,
end: formattedTime(new Date,'YYYY-MM-DD HH:mm:ss')
},'GRP_' + row.ke)
}catch(err){
normalLog('Video Delete Error',row)
normalLog(err)
}
}
const deleteResponse = await knexQueryPromise({
action: "delete",
table: "Videos",
where: whereQuery
})
affectedRows = deleteResponse.rows || 0
}
return {
ok: true,
affectedRows: affectedRows,
}
}
const deleteOldVideos = async (v) => {
// v = group, admin user
if(config.cron.deleteOld === true){
const daysOldForDeletion = v.d.days && !isNaN(v.d.days) ? parseFloat(v.d.days) : 5
const monitorsIgnored = []
const monitorsResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Monitors",
where: [
['ke','=',v.ke],
]
})
const monitorRows = monitorsResponse.rows
var i;
for (i = 0; i < monitorRows.length; i++) {
const monitor = monitorRows[i]
const monitorId = monitor.mid
const details = JSON.parse(monitor.details);
const monitorsMaxDaysToKeep = !isNaN(details.max_keep_days) ? parseFloat(details.max_keep_days) : null
if(monitorsMaxDaysToKeep){
const { affectedRows } = await deleteVideosByDays(v,monitorsMaxDaysToKeep,['mid','=',monitorId])
const hasDeletedRows = affectedRows && affectedRows.length > 0;
if(hasDeletedRows || config.debugLog === true){
postMessage({
f: 'deleteOldVideosByMonitorId',
msg: `${affectedRows} SQL rows older than ${monitorsMaxDaysToKeep} days deleted`,
ke: v.ke,
mid: monitorId,
time: 'moment()',
})
}
monitorsIgnored.push(['mid','!=',monitorId])
}
}
const { affectedRows } = await deleteVideosByDays(v,daysOldForDeletion,monitorsIgnored)
const hasDeletedRows = affectedRows && affectedRows.length > 0;
if(hasDeletedRows || config.debugLog === true){
postMessage({
f: 'deleteOldVideos',
msg: `${affectedRows} SQL rows older than ${daysOldForDeletion} days deleted`,
ke: v.ke,
time: 'moment()',
})
}
}
}
//database rows with no videos in the filesystem
const deleteRowsWithNoVideo = function(v){
return new Promise((resolve,reject) => {
if(
config.cron.deleteNoVideo===true&&(
config.cron.deleteNoVideoRecursion===true||
(config.cron.deleteNoVideoRecursion===false&&!alreadyDeletedRowsWithNoVideosOnStart[v.ke])
)
){
alreadyDeletedRowsWithNoVideosOnStart[v.ke]=true;
knexQuery({
action: "select",
columns: "*",
table: "Videos",
where: [
['ke','=',v.ke],
['status','!=','0'],
['details','NOT LIKE','%"archived":"1"%'],
['time','<', sqlDate('10 MINUTE')],
]
},(err,evs) => {
if(evs && evs[0]){
const videosToDelete = [];
evs.forEach(function(ev){
var filename
var details
try{
details = JSON.parse(ev.details)
}catch(err){
if(details instanceof Object){
details = ev.details
}else{
details = {}
}
}
var dir = getVideoDirectory(ev)
filename = formattedTime(ev.time)+'.'+ev.ext
fileExists = fs.existsSync(dir+filename)
if(fileExists !== true){
deleteVideo(ev)
sendToWebSocket({f:'video_delete',filename:filename+'.'+ev.ext,mid:ev.mid,ke:ev.ke,time:ev.time,end: formattedTime(new Date,'YYYY-MM-DD HH:mm:ss')},'GRP_'+ev.ke);
}
});
if(videosToDelete.length > 0 || config.debugLog === true){
postMessage({f:'deleteNoVideo',msg:videosToDelete.length+' SQL rows with no file deleted',ke:v.ke,time:'moment()'})
}
}
setTimeout(function(){
resolve()
},3000)
})
}else{
resolve()
}
})
}
//info about what the application is doing
const deleteOldLogs = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.log_days && !isNaN(v.d.log_days) ? parseFloat(v.d.log_days) : 10
if(config.cron.deleteLogs === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Logs",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err)return errorLog(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteLogs',msg: rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:'moment()'})
}
})
}else{
resolve()
}
})
}
//still images saved
const deleteOldTimelapseFrames = async function(v){
const daysOldForDeletion = v.d.timelapseFrames_days && !isNaN(v.d.timelapseFrames_days) ? parseFloat(v.d.timelapseFrames_days) : 60
if(config.cron.deleteTimelpaseFrames === true && daysOldForDeletion !== 0){
const groupKey = v.ke;
const whereQuery = [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion+' DAY')],
]
const selectResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Timelapse Frames",
where: whereQuery
})
const videoRows = selectResponse.rows
let affectedRows = 0
if(videoRows.length > 0){
const foldersDeletedFrom = [];
let clearSize = 0;
var i;
for (i = 0; i < videoRows.length; i++) {
const row = videoRows[i];
const dir = getTimelapseFrameDirectory(row)
const filename = row.filename
const theDate = filename.split('T')[0]
const enclosingFolder = `${dir}/${theDate}/`
try{
const fileSizeMB = row.size / 1048576;
setDiskUsedForGroup(groupKey,-fileSizeMB,null,row)
sendToWebSocket({
f: 'timelapse_frame_delete',
filename: filename,
mid: row.mid,
ke: groupKey,
time: row.time,
details: row.details,
},'GRP_' + groupKey)
await fs.promises.unlink(`${enclosingFolder}${filename}`)
if(foldersDeletedFrom.indexOf(enclosingFolder) === -1)foldersDeletedFrom.push(enclosingFolder);
}catch(err){
normalLog('Timelapse Frame Delete Error',row)
normalLog(err)
}
}
for (i = 0; i < foldersDeletedFrom.length; i++) {
const folderPath = foldersDeletedFrom[i];
const folderIsEmpty = (await fs.promises.readdir(folderPath)).filter(file => file.indexOf('.jpg') > -1).length === 0;
if(folderIsEmpty){
await fs.promises.rm(folderPath, { recursive: true, force: true })
}
}
const deleteResponse = await knexQueryPromise({
action: "delete",
table: "Timelapse Frames",
where: whereQuery
})
affectedRows = deleteResponse.rows || 0
}
return {
ok: true,
affectedRows: affectedRows,
}
}
return {
ok: false
}
}
//events - motion, object, etc. detections
const deleteOldEvents = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.event_days && !isNaN(v.d.event_days) ? parseFloat(v.d.event_days) : 10
if(config.cron.deleteEvents === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Events",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err)return errorLog(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteEvents',msg:rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:'moment()'})
}
})
}else{
resolve()
}
})
}
//event counts
const deleteOldEventCounts = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.event_days && !isNaN(v.d.event_days) ? parseFloat(v.d.event_days) : 10
if(config.cron.deleteEvents === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Events Counts",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err && err.code !== 'ER_NO_SUCH_TABLE')return errorLog(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteEvents',msg:rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:'moment()'})
}
})
}else{
resolve()
}
})
}
//check for temporary files (special archive)
const deleteOldFileBins = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.fileBin_days && !isNaN(v.d.fileBin_days) ? parseFloat(v.d.fileBin_days) : 10
if(config.cron.deleteFileBins === true && daysOldForDeletion !== 0){
var fileBinQuery = " FROM Files WHERE ke=? AND `time` < ?";
knexQuery({
action: "select",
columns: "*",
table: "Files",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,files) => {
if(files && files[0]){
//delete the files
files.forEach(function(file){
deleteFileBinEntry(file)
})
if(config.debugLog === true){
postMessage({
f: 'deleteFileBins',
msg: files.length + ' files older than ' + daysOldForDeletion + ' days deleted',
ke: v.ke,
time: 'moment()'
})
}
}
resolve()
})
}else{
resolve()
}
})
}
//user processing function
const processUser = async (v) => {
if(!v){
//no user object given, end of group list
return
}
debugLog(`Group Key : ${v.ke}`)
debugLog(`Owner : ${v.mail}`)
if(!overlapLocks[v.ke]){
debugLog(`Checking...`)
overlapLocks[v.ke] = true
v.d = JSON.parse(v.details);
try{
await deleteOldVideos(v)
debugLog('--- deleteOldVideos Complete')
await deleteOldTimelapseFrames(v)
debugLog('--- deleteOldTimelapseFrames Complete')
await deleteOldLogs(v)
debugLog('--- deleteOldLogs Complete')
await deleteOldFileBins(v)
debugLog('--- deleteOldFileBins Complete')
await deleteOldEvents(v)
debugLog('--- deleteOldEvents Complete')
await deleteOldEventCounts(v)
debugLog('--- deleteOldEventCounts Complete')
await checkFilterRules(v)
debugLog('--- checkFilterRules Complete')
await deleteRowsWithNoVideo(v)
debugLog('--- deleteRowsWithNoVideo Complete')
}catch(err){
normalLog(`Failed to Complete User : ${v.mail}`)
normalLog(err)
}
//done user, unlock current, and do next
overlapLocks[v.ke] = false;
debugLog(`Complete Checking... ${v.mail}`)
}else{
debugLog(`Locked, Skipped...`)
}
}
//recursive function
const setIntervalForCron = function(){
clearCronInterval()
// theCronInterval = setInterval(doCronJobs,1000 * 10)
theCronInterval = setInterval(doCronJobs,parseFloat(config.cron.interval)*60000*60)
}
const clearCronInterval = function(){
clearInterval(theCronInterval)
}
const doCronJobs = function(){
postMessage({
f: 'start',
time: 'moment()'
})
knexQuery({
action: "select",
columns: "ke,uid,details,mail",
table: "Users",
where: [
['details','NOT LIKE','%"sub"%'],
]
}, async (err,rows) => {
if(err){
errorLog(err)
}
if(rows.length > 0){
var i;
for (i = 0; i < rows.length; i++) {
await processUser(rows[i])
}
}
})
}
initiateDatabaseEngine()
setIntervalForCron()
doCronJobs()
}

View File

@ -7,9 +7,6 @@ const {
stringToSqlTime, stringToSqlTime,
} = require('./common.js') } = require('./common.js')
module.exports = function(s,config,lang,io){ module.exports = function(s,config,lang,io){
const {
legacyFilterEvents
} = require('./events/utils.js')(s,config,lang)
const { const {
ptzControl ptzControl
} = require('./control/ptz.js')(s,config,lang) } = require('./control/ptz.js')(s,config,lang)
@ -937,63 +934,6 @@ module.exports = function(s,config,lang,io){
} }
}) })
//functions for retrieving cron announcements //functions for retrieving cron announcements
cn.on('cron',function(d){
if(d.f==='init'){
if(config.cron.key){
if(config.cron.key===d.cronKey){
s.cron={started:moment(),last_run:moment(),id:cn.id};
}else{
cn.disconnect()
}
}else{
s.cron={started:moment(),last_run:moment(),id:cn.id};
}
}else{
if(s.cron&&cn.id===s.cron.id){
delete(d.cronKey)
switch(d.f){
case'filters':
legacyFilterEvents(d.ff,d)
break;
case's.tx':
s.tx(d.data,d.to)
break;
case's.deleteVideo':
s.deleteVideo(d.file)
break;
case's.deleteFileBinEntry':
s.deleteFileBinEntry(d.file)
break;
case's.setDiskUsedForGroup':
function doOnMain(){
s.setDiskUsedForGroup(d.ke,d.size,d.target || undefined)
}
if(d.videoRow){
let storageIndex = s.getVideoStorageIndex(d.videoRow);
if(storageIndex){
s.setDiskUsedForGroupAddStorage(d.ke,{
size: d.size,
storageIndex: storageIndex
})
}else{
doOnMain()
}
}else{
doOnMain()
}
break;
case'start':case'end':
d.mid='_cron';s.userLog(d,{type:'cron',msg:d.msg})
break;
default:
s.systemLog('CRON : ',d)
break;
}
}else{
cn.disconnect()
}
}
})
cn.on('disconnect', function () { cn.on('disconnect', function () {
if(cn.socketVideoStream){ if(cn.socketVideoStream){
cn.closeSocketVideoStream() cn.closeSocketVideoStream()