Merge branch 'cron-as-worker-process' into 'dev'

Make cron.js a Worker Process

See merge request Shinobi-Systems/Shinobi!364
merge-requests/367/merge
Moe 2022-08-11 05:50:27 +00:00
commit f10ebdcfb5
6 changed files with 684 additions and 669 deletions

View File

@ -89,6 +89,8 @@ require('./libs/ffmpeg.js')(s,config,lang, async () => {
require('./libs/onvifDeviceManager.js')(s,config,lang,app,io)
//alternate logins
require('./libs/auth/logins.js')(s,config,lang,app)
//cron
require('./libs/cron.js')(s,config,lang)
//on-start actions, daemon(s) starter
await require('./libs/startup.js')(s,config,lang)
//p2p, commander

613
cron.js
View File

@ -1,606 +1,9 @@
process.on('uncaughtException', function (err) {
console.error('uncaughtException',err);
});
const fs = require('fs');
const path = require('path');
const moment = require('moment');
const exec = require('child_process').exec;
const spawn = require('child_process').spawn;
const config = require(process.cwd() + '/conf.json')
//set option defaults
s = {
mainDirectory: process.cwd(),
utcOffset: moment().utcOffset()
};
if(config.cron===undefined)config.cron={};
if(config.cron.deleteOld===undefined)config.cron.deleteOld=true;
if(config.cron.deleteOrphans===undefined)config.cron.deleteOrphans=false;
if(config.cron.deleteNoVideo===undefined)config.cron.deleteNoVideo=true;
if(config.cron.deleteNoVideoRecursion===undefined)config.cron.deleteNoVideoRecursion=false;
if(config.cron.deleteOverMax===undefined)config.cron.deleteOverMax=true;
if(config.cron.deleteLogs===undefined)config.cron.deleteLogs=true;
if(config.cron.deleteTimelpaseFrames===undefined)config.cron.deleteTimelpaseFrames=true;
if(config.cron.deleteEvents===undefined)config.cron.deleteEvents=true;
if(config.cron.deleteFileBins===undefined)config.cron.deleteFileBins=true;
if(config.cron.interval===undefined)config.cron.interval=1;
if(config.databaseType===undefined){config.databaseType='mysql'}
if(config.databaseLogs===undefined){config.databaseLogs=false}
if(config.useUTC===undefined){config.useUTC=false}
if(config.debugLog===undefined){config.debugLog=false}
if(!config.ip||config.ip===''||config.ip.indexOf('0.0.0.0')>-1)config.ip='localhost';
if(!config.videosDir)config.videosDir = s.mainDirectory + '/videos/';
if(!config.binDir){config.binDir = s.mainDirectory + '/fileBin/'}
const {
checkCorrectPathEnding,
generateRandomId,
formattedTime,
localToUtc,
} = require('./libs/basic/utils.js')(s.mainDirectory)
const {
sqlDate,
knexQuery,
knexQueryPromise,
initiateDatabaseEngine
} = require('./libs/sql/utils.js')(s,config)
var theCronInterval = null
const overlapLocks = {}
const alreadyDeletedRowsWithNoVideosOnStart = {}
const videoDirectory = checkCorrectPathEnding(config.videosDir)
const fileBinDirectory = checkCorrectPathEnding(config.binDir)
s.debugLog = function(arg1,arg2){
if(config.debugLog === true){
if(!arg2)arg2 = ''
console.log(arg1,arg2)
}
const { parentPort, isMainThread } = require('worker_threads');
if(isMainThread){
console.log(`Shinobi now runs cron.js as a worker process from camera.js.`)
console.error(`Shinobi now runs cron.js as a worker process from camera.js.`)
setInterval(() => {
// console.log(`Please turn off cron.js process.`)
},1000 * 60 * 60 * 24 * 7)
return;
}
const connectToMainProcess = () => {
const io = require('socket.io-client')('ws://'+config.ip+':'+config.port,{
transports:['websocket']
});
io.on('connect',function(d){
postMessage({
f: 'init',
time: moment()
})
})
io.on('f',function(d){
//command from main process
switch(d.f){
case'start':case'restart':
setIntervalForCron()
break;
case'stop':
clearCronInterval()
break;
}
})
return io
}
const postMessage = (x) => {
x.cronKey = config.cron.key;
return io.emit('cron',x)
}
const sendToWebSocket = (x,y) => {
//emulate master socket emitter
postMessage({f:'s.tx',data:x,to:y})
}
const deleteVideo = (x) => {
postMessage({f:'s.deleteVideo',file:x})
}
const deleteFileBinEntry = (x) => {
postMessage({f:'s.deleteFileBinEntry',file:x})
}
const setDiskUsedForGroup = (groupKey,size,target,videoRow) => {
postMessage({f:'s.setDiskUsedForGroup', ke: groupKey, size: size, target: target, videoRow: videoRow})
}
const getVideoDirectory = function(e){
if(e.mid&&!e.id){e.id=e.mid};
if(e.details&&(e.details instanceof Object)===false){
try{e.details=JSON.parse(e.details)}catch(err){}
}
if(e.details.dir&&e.details.dir!==''){
return checkCorrectPathEnding(e.details.dir)+e.ke+'/'+e.id+'/'
}else{
return videoDirectory + e.ke + '/' + e.id + '/'
}
}
const getTimelapseFrameDirectory = function(e){
if(e.mid&&!e.id){e.id=e.mid}
if(e.details&&(e.details instanceof Object)===false){
try{e.details=JSON.parse(e.details)}catch(err){}
}
if(e.details&&e.details.dir&&e.details.dir!==''){
return checkCorrectPathEnding(e.details.dir)+e.ke+'/'+e.id+'_timelapse/'
}else{
return videoDirectory + e.ke + '/' + e.id + '_timelapse/'
}
}
const getFileBinDirectory = function(e){
if(e.mid && !e.id){e.id = e.mid}
return fileBinDirectory + e.ke + '/' + e.id + '/'
}
//filters set by the user in their dashboard
//deleting old videos is part of the filter - config.cron.deleteOld
const checkFilterRules = function(v){
return new Promise((resolve,reject) => {
//filters
v.d.filters = v.d.filters ? v.d.filters : {}
s.debugLog('Checking Basic Filters...')
var keys = Object.keys(v.d.filters)
if(keys.length>0){
keys.forEach(function(m,current){
// b = filter
var b = v.d.filters[m];
s.debugLog(b)
if(b.enabled==="1"){
const whereQuery = [
['ke','=',v.ke],
['status','!=',"0"],
['details','NOT LIKE','%"archived":"1"%'],
]
b.where.forEach(function(condition){
if(condition.p1 === 'ke'){condition.p3 = v.ke}
whereQuery.push([condition.p1,condition.p2 || '=',condition.p3])
})
knexQuery({
action: "select",
columns: "*",
table: "Videos",
where: whereQuery,
orderBy: [b.sort_by,b.sort_by_direction.toLowerCase()],
limit: b.limit
},(err,r) => {
if(r && r[0]){
if(r.length > 0 || config.debugLog === true){
postMessage({f:'filterMatch',msg:r.length+' SQL rows match "'+m+'"',ke:v.ke,time:moment()})
}
b.cx={
f:'filters',
name:b.name,
videos:r,
time:moment(),
ke:v.ke,
id:b.id
};
if(b.archive==="1"){
postMessage({f:'filters',ff:'archive',videos:r,time:moment(),ke:v.ke,id:b.id});
}else if(b.delete==="1"){
postMessage({f:'filters',ff:'delete',videos:r,time:moment(),ke:v.ke,id:b.id});
}
if(b.email==="1"){
b.cx.ff='email';
b.cx.delete=b.delete;
b.cx.mail=v.mail;
b.cx.execute=b.execute;
b.cx.query=b.sql;
postMessage(b.cx);
}
if(b.execute&&b.execute!==""){
postMessage({f:'filters',ff:'execute',execute:b.execute,time:moment()});
}
}
})
}
if(current===keys.length-1){
//last filter
resolve()
}
})
}else{
//no filters
resolve()
}
})
}
const deleteVideosByDays = async (v,days,addedQueries) => {
const groupKey = v.ke;
const whereQuery = [
['ke','=',v.ke],
['time','<', sqlDate(days+' DAY')],
addedQueries
]
const selectResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Videos",
where: whereQuery
})
const videoRows = selectResponse.rows
let affectedRows = 0
if(videoRows.length > 0){
let clearSize = 0;
var i;
for (i = 0; i < videoRows.length; i++) {
const row = videoRows[i];
const dir = getVideoDirectory(row)
const filename = formattedTime(row.time) + '.' + row.ext
try{
await fs.promises.unlink(dir + filename)
const fileSizeMB = row.size / 1048576;
setDiskUsedForGroup(groupKey,-fileSizeMB,null,row)
sendToWebSocket({
f: 'video_delete',
filename: filename + '.' + row.ext,
mid: row.mid,
ke: row.ke,
time: row.time,
end: formattedTime(new Date,'YYYY-MM-DD HH:mm:ss')
},'GRP_' + row.ke)
}catch(err){
console.log('Video Delete Error',row)
console.log(err)
}
}
const deleteResponse = await knexQueryPromise({
action: "delete",
table: "Videos",
where: whereQuery
})
affectedRows = deleteResponse.rows || 0
}
return {
ok: true,
affectedRows: affectedRows,
}
}
const deleteOldVideos = async (v) => {
// v = group, admin user
if(config.cron.deleteOld === true){
const daysOldForDeletion = v.d.days && !isNaN(v.d.days) ? parseFloat(v.d.days) : 5
const monitorsIgnored = []
const monitorsResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Monitors",
where: [
['ke','=',v.ke],
]
})
const monitorRows = monitorsResponse.rows
var i;
for (i = 0; i < monitorRows.length; i++) {
const monitor = monitorRows[i]
const monitorId = monitor.mid
const details = JSON.parse(monitor.details);
const monitorsMaxDaysToKeep = !isNaN(details.max_keep_days) ? parseFloat(details.max_keep_days) : null
if(monitorsMaxDaysToKeep){
const { affectedRows } = await deleteVideosByDays(v,monitorsMaxDaysToKeep,['mid','=',monitorId])
const hasDeletedRows = affectedRows && affectedRows.length > 0;
if(hasDeletedRows || config.debugLog === true){
postMessage({
f: 'deleteOldVideosByMonitorId',
msg: `${affectedRows} SQL rows older than ${monitorsMaxDaysToKeep} days deleted`,
ke: v.ke,
mid: monitorId,
time: moment(),
})
}
monitorsIgnored.push(['mid','!=',monitorId])
}
}
const { affectedRows } = await deleteVideosByDays(v,daysOldForDeletion,monitorsIgnored)
const hasDeletedRows = affectedRows && affectedRows.length > 0;
if(hasDeletedRows || config.debugLog === true){
postMessage({
f: 'deleteOldVideos',
msg: `${affectedRows} SQL rows older than ${daysOldForDeletion} days deleted`,
ke: v.ke,
time: moment(),
})
}
}
}
//database rows with no videos in the filesystem
const deleteRowsWithNoVideo = function(v){
return new Promise((resolve,reject) => {
if(
config.cron.deleteNoVideo===true&&(
config.cron.deleteNoVideoRecursion===true||
(config.cron.deleteNoVideoRecursion===false&&!alreadyDeletedRowsWithNoVideosOnStart[v.ke])
)
){
alreadyDeletedRowsWithNoVideosOnStart[v.ke]=true;
knexQuery({
action: "select",
columns: "*",
table: "Videos",
where: [
['ke','=',v.ke],
['status','!=','0'],
['details','NOT LIKE','%"archived":"1"%'],
['time','<', sqlDate('10 MINUTE')],
]
},(err,evs) => {
if(evs && evs[0]){
const videosToDelete = [];
evs.forEach(function(ev){
var filename
var details
try{
details = JSON.parse(ev.details)
}catch(err){
if(details instanceof Object){
details = ev.details
}else{
details = {}
}
}
var dir = getVideoDirectory(ev)
filename = formattedTime(ev.time)+'.'+ev.ext
fileExists = fs.existsSync(dir+filename)
if(fileExists !== true){
deleteVideo(ev)
sendToWebSocket({f:'video_delete',filename:filename+'.'+ev.ext,mid:ev.mid,ke:ev.ke,time:ev.time,end: formattedTime(new Date,'YYYY-MM-DD HH:mm:ss')},'GRP_'+ev.ke);
}
});
if(videosToDelete.length > 0 || config.debugLog === true){
postMessage({f:'deleteNoVideo',msg:videosToDelete.length+' SQL rows with no file deleted',ke:v.ke,time:moment()})
}
}
setTimeout(function(){
resolve()
},3000)
})
}else{
resolve()
}
})
}
//info about what the application is doing
const deleteOldLogs = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.log_days && !isNaN(v.d.log_days) ? parseFloat(v.d.log_days) : 10
if(config.cron.deleteLogs === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Logs",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err)return console.error(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteLogs',msg: rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:moment()})
}
})
}else{
resolve()
}
})
}
//still images saved
const deleteOldTimelapseFrames = async function(v){
const daysOldForDeletion = v.d.timelapseFrames_days && !isNaN(v.d.timelapseFrames_days) ? parseFloat(v.d.timelapseFrames_days) : 60
if(config.cron.deleteTimelpaseFrames === true && daysOldForDeletion !== 0){
const groupKey = v.ke;
const whereQuery = [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion+' DAY')],
]
const selectResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Timelapse Frames",
where: whereQuery
})
const videoRows = selectResponse.rows
let affectedRows = 0
if(videoRows.length > 0){
const foldersDeletedFrom = [];
let clearSize = 0;
var i;
for (i = 0; i < videoRows.length; i++) {
const row = videoRows[i];
const dir = getTimelapseFrameDirectory(row)
const filename = row.filename
const theDate = filename.split('T')[0]
const enclosingFolder = `${dir}/${theDate}/`
try{
const fileSizeMB = row.size / 1048576;
setDiskUsedForGroup(groupKey,-fileSizeMB,null,row)
sendToWebSocket({
f: 'timelapse_frame_delete',
filename: filename,
mid: row.mid,
ke: groupKey,
time: row.time,
details: row.details,
},'GRP_' + groupKey)
await fs.promises.unlink(`${enclosingFolder}${filename}`)
if(foldersDeletedFrom.indexOf(enclosingFolder) === -1)foldersDeletedFrom.push(enclosingFolder);
}catch(err){
console.log('Timelapse Frame Delete Error',row)
console.log(err)
}
}
for (i = 0; i < foldersDeletedFrom.length; i++) {
const folderPath = foldersDeletedFrom[i];
const folderIsEmpty = (await fs.promises.readdir(folderPath)).filter(file => file.indexOf('.jpg') > -1).length === 0;
if(folderIsEmpty){
await fs.promises.rm(folderPath, { recursive: true, force: true })
}
}
const deleteResponse = await knexQueryPromise({
action: "delete",
table: "Timelapse Frames",
where: whereQuery
})
affectedRows = deleteResponse.rows || 0
}
return {
ok: true,
affectedRows: affectedRows,
}
}
return {
ok: false
}
}
//events - motion, object, etc. detections
const deleteOldEvents = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.event_days && !isNaN(v.d.event_days) ? parseFloat(v.d.event_days) : 10
if(config.cron.deleteEvents === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Events",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err)return console.error(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteEvents',msg:rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:moment()})
}
})
}else{
resolve()
}
})
}
//event counts
const deleteOldEventCounts = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.event_days && !isNaN(v.d.event_days) ? parseFloat(v.d.event_days) : 10
if(config.cron.deleteEvents === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Events Counts",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err && err.code !== 'ER_NO_SUCH_TABLE')return console.error(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteEvents',msg:rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:moment()})
}
})
}else{
resolve()
}
})
}
//check for temporary files (special archive)
const deleteOldFileBins = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.fileBin_days && !isNaN(v.d.fileBin_days) ? parseFloat(v.d.fileBin_days) : 10
if(config.cron.deleteFileBins === true && daysOldForDeletion !== 0){
var fileBinQuery = " FROM Files WHERE ke=? AND `time` < ?";
knexQuery({
action: "select",
columns: "*",
table: "Files",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,files) => {
if(files && files[0]){
//delete the files
files.forEach(function(file){
deleteFileBinEntry(file)
})
if(config.debugLog === true){
postMessage({
f: 'deleteFileBins',
msg: files.length + ' files older than ' + daysOldForDeletion + ' days deleted',
ke: v.ke,
time: moment()
})
}
}
resolve()
})
}else{
resolve()
}
})
}
//user processing function
const processUser = async (v) => {
if(!v){
//no user object given, end of group list
return
}
s.debugLog(`Group Key : ${v.ke}`)
s.debugLog(`Owner : ${v.mail}`)
if(!overlapLocks[v.ke]){
s.debugLog(`Checking...`)
overlapLocks[v.ke] = true
v.d = JSON.parse(v.details);
try{
await deleteOldVideos(v)
s.debugLog('--- deleteOldVideos Complete')
await deleteOldTimelapseFrames(v)
s.debugLog('--- deleteOldTimelapseFrames Complete')
await deleteOldLogs(v)
s.debugLog('--- deleteOldLogs Complete')
await deleteOldFileBins(v)
s.debugLog('--- deleteOldFileBins Complete')
await deleteOldEvents(v)
s.debugLog('--- deleteOldEvents Complete')
await deleteOldEventCounts(v)
s.debugLog('--- deleteOldEventCounts Complete')
await checkFilterRules(v)
s.debugLog('--- checkFilterRules Complete')
await deleteRowsWithNoVideo(v)
s.debugLog('--- deleteRowsWithNoVideo Complete')
}catch(err){
console.log(`Failed to Complete User : ${v.mail}`)
console.log(err)
}
//done user, unlock current, and do next
overlapLocks[v.ke] = false;
s.debugLog(`Complete Checking... ${v.mail}`)
}else{
s.debugLog(`Locked, Skipped...`)
}
}
//recursive function
const setIntervalForCron = function(){
clearCronInterval()
// theCronInterval = setInterval(doCronJobs,1000 * 10)
theCronInterval = setInterval(doCronJobs,parseFloat(config.cron.interval)*60000*60)
}
const clearCronInterval = function(){
clearInterval(theCronInterval)
}
const doCronJobs = function(){
postMessage({
f: 'start',
time: moment()
})
knexQuery({
action: "select",
columns: "ke,uid,details,mail",
table: "Users",
where: [
['details','NOT LIKE','%"sub"%'],
]
}, async (err,rows) => {
if(err){
console.error(err)
}
if(rows.length > 0){
var i;
for (i = 0; i < rows.length; i++) {
await processUser(rows[i])
}
}
})
}
initiateDatabaseEngine()
const io = connectToMainProcess()
setIntervalForCron()
doCronJobs()
console.log('Shinobi : cron.js started')

View File

@ -216,12 +216,5 @@ module.exports = function(s){
}
]
}
if(config.cron.key === 'change_this_to_something_very_random__just_anything_other_than_this'){
console.error('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
console.error('!! Change your cron key in your conf.json. !!')
console.error(`!! If you're running Shinobi remotely you should do this now. !!`)
console.error('!! You can do this in the Super User panel or from terminal. !!')
console.error('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
}
return config
}

63
libs/cron.js Normal file
View File

@ -0,0 +1,63 @@
const { Worker } = require('worker_threads');
const moment = require('moment');
module.exports = (s,config,lang) => {
const {
legacyFilterEvents
} = require('./events/utils.js')(s,config,lang)
if(config.doCronAsWorker===undefined)config.doCronAsWorker = true;
const startWorker = () => {
const pathToWorkerScript = __dirname + `/cron/worker.js`
const workerProcess = new Worker(pathToWorkerScript)
workerProcess.on('message',function(data){
if(data.time === 'moment()')data.time = moment();
switch(data.f){
case'debugLog':
s.debugLog(...data.data)
break;
case'systemLog':
s.systemLog(...data.data)
break;
case'filters':
legacyFilterEvents(data.ff,data)
break;
case's.tx':
s.tx(data.data,data.to)
break;
case's.deleteVideo':
s.deleteVideo(data.file)
break;
case's.deleteFileBinEntry':
s.deleteFileBinEntry(data.file)
break;
case's.setDiskUsedForGroup':
function doOnMain(){
s.setDiskUsedForGroup(data.ke,data.size,data.target || undefined)
}
if(data.videoRow){
let storageIndex = s.getVideoStorageIndex(data.videoRow);
if(storageIndex){
s.setDiskUsedForGroupAddStorage(data.ke,{
size: data.size,
storageIndex: storageIndex
})
}else{
doOnMain()
}
}else{
doOnMain()
}
break;
default:
s.systemLog('CRON.js MESSAGE : ',data)
break;
}
})
setTimeout(() => {
workerProcess.postMessage({
f: 'init',
})
},2000)
return workerProcess
}
if(config.doCronAsWorker === true)startWorker()
}

608
libs/cron/worker.js Normal file
View File

@ -0,0 +1,608 @@
const fs = require('fs');
const path = require('path');
const moment = require('moment');
const exec = require('child_process').exec;
const spawn = require('child_process').spawn;
const { parentPort, isMainThread } = require('worker_threads');
const config = require(process.cwd() + '/conf.json')
process.on('uncaughtException', function (err) {
errorLog('uncaughtException',err);
});
if(isMainThread){
console.log(`Shinobi now runs cron.js as child process.`)
console.error(`Shinobi now runs cron.js as child process.`)
setInterval(() => {
// console.log(`Please turn off cron.js process.`)
},1000 * 60 * 60 * 24 * 7)
return;
}
function setDefaultConfigOptions(){
if(config.cron===undefined)config.cron={};
if(config.cron.deleteOld===undefined)config.cron.deleteOld=true;
if(config.cron.deleteOrphans===undefined)config.cron.deleteOrphans=false;
if(config.cron.deleteNoVideo===undefined)config.cron.deleteNoVideo=true;
if(config.cron.deleteNoVideoRecursion===undefined)config.cron.deleteNoVideoRecursion=false;
if(config.cron.deleteOverMax===undefined)config.cron.deleteOverMax=true;
if(config.cron.deleteLogs===undefined)config.cron.deleteLogs=true;
if(config.cron.deleteTimelpaseFrames===undefined)config.cron.deleteTimelpaseFrames=true;
if(config.cron.deleteEvents===undefined)config.cron.deleteEvents=true;
if(config.cron.deleteFileBins===undefined)config.cron.deleteFileBins=true;
if(config.cron.interval===undefined)config.cron.interval=1;
if(config.databaseType===undefined){config.databaseType='mysql'}
if(config.databaseLogs===undefined){config.databaseLogs=false}
if(config.debugLog===undefined){config.debugLog=false}
if(!config.ip||config.ip===''||config.ip.indexOf('0.0.0.0')>-1)config.ip='localhost';
if(!config.videosDir)config.videosDir = process.cwd() + '/videos/';
if(!config.binDir){config.binDir = process.cwd() + '/fileBin/'}
}
parentPort.on('message',(data) => {
switch(data.f){
case'init':
setDefaultConfigOptions()
beginProcessing()
break;
case'start':case'restart':
setIntervalForCron()
break;
case'stop':
clearCronInterval()
break;
}
})
function debugLog(...args){
if(config.debugLog === true){
console.log(...([`CRON.js DEBUG LOG ${new Date()}`].concat(args)))
}
}
function normalLog(...args){
console.log(...([`CRON.js LOG ${new Date()}`].concat(args)))
}
function errorLog(...args){
console.error(...([`CRON.js ERROR LOG ${new Date()}`].concat(args)))
}
const s = {
debugLog,
}
function beginProcessing(){
normalLog(`Worker Processing!`)
const {
checkCorrectPathEnding,
generateRandomId,
formattedTime,
localToUtc,
} = require('../basic/utils.js')(process.cwd())
const {
sqlDate,
knexQuery,
knexQueryPromise,
initiateDatabaseEngine
} = require('../sql/utils.js')(s,config)
var theCronInterval = null
const overlapLocks = {}
const alreadyDeletedRowsWithNoVideosOnStart = {}
const videoDirectory = checkCorrectPathEnding(config.videosDir)
const fileBinDirectory = checkCorrectPathEnding(config.binDir)
const postMessage = (data) => {
parentPort.postMessage(data)
}
const sendToWebSocket = (x,y) => {
//emulate master socket emitter
postMessage({f:'s.tx',data:x,to:y})
}
const deleteVideo = (x) => {
postMessage({f:'s.deleteVideo',file:x})
}
const deleteFileBinEntry = (x) => {
postMessage({f:'s.deleteFileBinEntry',file:x})
}
const setDiskUsedForGroup = (groupKey,size,target,videoRow) => {
postMessage({f:'s.setDiskUsedForGroup', ke: groupKey, size: size, target: target, videoRow: videoRow})
}
const getVideoDirectory = function(e){
if(e.mid&&!e.id){e.id=e.mid};
if(e.details&&(e.details instanceof Object)===false){
try{e.details=JSON.parse(e.details)}catch(err){}
}
if(e.details.dir&&e.details.dir!==''){
return checkCorrectPathEnding(e.details.dir)+e.ke+'/'+e.id+'/'
}else{
return videoDirectory + e.ke + '/' + e.id + '/'
}
}
const getTimelapseFrameDirectory = function(e){
if(e.mid&&!e.id){e.id=e.mid}
if(e.details&&(e.details instanceof Object)===false){
try{e.details=JSON.parse(e.details)}catch(err){}
}
if(e.details&&e.details.dir&&e.details.dir!==''){
return checkCorrectPathEnding(e.details.dir)+e.ke+'/'+e.id+'_timelapse/'
}else{
return videoDirectory + e.ke + '/' + e.id + '_timelapse/'
}
}
const getFileBinDirectory = function(e){
if(e.mid && !e.id){e.id = e.mid}
return fileBinDirectory + e.ke + '/' + e.id + '/'
}
//filters set by the user in their dashboard
//deleting old videos is part of the filter - config.cron.deleteOld
const checkFilterRules = function(v){
return new Promise((resolve,reject) => {
//filters
v.d.filters = v.d.filters ? v.d.filters : {}
debugLog('Checking Basic Filters...')
var keys = Object.keys(v.d.filters)
if(keys.length>0){
keys.forEach(function(m,current){
// b = filter
var b = v.d.filters[m];
debugLog(b)
if(b.enabled==="1"){
const whereQuery = [
['ke','=',v.ke],
['status','!=',"0"],
['details','NOT LIKE','%"archived":"1"%'],
]
b.where.forEach(function(condition){
if(condition.p1 === 'ke'){condition.p3 = v.ke}
whereQuery.push([condition.p1,condition.p2 || '=',condition.p3])
})
knexQuery({
action: "select",
columns: "*",
table: "Videos",
where: whereQuery,
orderBy: [b.sort_by,b.sort_by_direction.toLowerCase()],
limit: b.limit
},(err,r) => {
if(r && r[0]){
if(r.length > 0 || config.debugLog === true){
postMessage({f:'filterMatch',msg:r.length+' SQL rows match "'+m+'"',ke:v.ke,time:'moment()'})
}
b.cx={
f:'filters',
name:b.name,
videos:r,
time:'moment()',
ke:v.ke,
id:b.id
};
if(b.archive==="1"){
postMessage({f:'filters',ff:'archive',videos:r,time:'moment()',ke:v.ke,id:b.id});
}else if(b.delete==="1"){
postMessage({f:'filters',ff:'delete',videos:r,time:'moment()',ke:v.ke,id:b.id});
}
if(b.email==="1"){
b.cx.ff='email';
b.cx.delete=b.delete;
b.cx.mail=v.mail;
b.cx.execute=b.execute;
b.cx.query=b.sql;
postMessage(b.cx);
}
if(b.execute&&b.execute!==""){
postMessage({f:'filters',ff:'execute',execute:b.execute,time:'moment()'});
}
}
})
}
if(current===keys.length-1){
//last filter
resolve()
}
})
}else{
//no filters
resolve()
}
})
}
const deleteVideosByDays = async (v,days,addedQueries) => {
const groupKey = v.ke;
const whereQuery = [
['ke','=',v.ke],
['time','<', sqlDate(days+' DAY')],
addedQueries
]
const selectResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Videos",
where: whereQuery
})
const videoRows = selectResponse.rows
let affectedRows = 0
if(videoRows.length > 0){
let clearSize = 0;
var i;
for (i = 0; i < videoRows.length; i++) {
const row = videoRows[i];
const dir = getVideoDirectory(row)
const filename = formattedTime(row.time) + '.' + row.ext
try{
await fs.promises.unlink(dir + filename)
const fileSizeMB = row.size / 1048576;
setDiskUsedForGroup(groupKey,-fileSizeMB,null,row)
sendToWebSocket({
f: 'video_delete',
filename: filename + '.' + row.ext,
mid: row.mid,
ke: row.ke,
time: row.time,
end: formattedTime(new Date,'YYYY-MM-DD HH:mm:ss')
},'GRP_' + row.ke)
}catch(err){
normalLog('Video Delete Error',row)
normalLog(err)
}
}
const deleteResponse = await knexQueryPromise({
action: "delete",
table: "Videos",
where: whereQuery
})
affectedRows = deleteResponse.rows || 0
}
return {
ok: true,
affectedRows: affectedRows,
}
}
const deleteOldVideos = async (v) => {
// v = group, admin user
if(config.cron.deleteOld === true){
const daysOldForDeletion = v.d.days && !isNaN(v.d.days) ? parseFloat(v.d.days) : 5
const monitorsIgnored = []
const monitorsResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Monitors",
where: [
['ke','=',v.ke],
]
})
const monitorRows = monitorsResponse.rows
var i;
for (i = 0; i < monitorRows.length; i++) {
const monitor = monitorRows[i]
const monitorId = monitor.mid
const details = JSON.parse(monitor.details);
const monitorsMaxDaysToKeep = !isNaN(details.max_keep_days) ? parseFloat(details.max_keep_days) : null
if(monitorsMaxDaysToKeep){
const { affectedRows } = await deleteVideosByDays(v,monitorsMaxDaysToKeep,['mid','=',monitorId])
const hasDeletedRows = affectedRows && affectedRows.length > 0;
if(hasDeletedRows || config.debugLog === true){
postMessage({
f: 'deleteOldVideosByMonitorId',
msg: `${affectedRows} SQL rows older than ${monitorsMaxDaysToKeep} days deleted`,
ke: v.ke,
mid: monitorId,
time: 'moment()',
})
}
monitorsIgnored.push(['mid','!=',monitorId])
}
}
const { affectedRows } = await deleteVideosByDays(v,daysOldForDeletion,monitorsIgnored)
const hasDeletedRows = affectedRows && affectedRows.length > 0;
if(hasDeletedRows || config.debugLog === true){
postMessage({
f: 'deleteOldVideos',
msg: `${affectedRows} SQL rows older than ${daysOldForDeletion} days deleted`,
ke: v.ke,
time: 'moment()',
})
}
}
}
//database rows with no videos in the filesystem
const deleteRowsWithNoVideo = function(v){
return new Promise((resolve,reject) => {
if(
config.cron.deleteNoVideo===true&&(
config.cron.deleteNoVideoRecursion===true||
(config.cron.deleteNoVideoRecursion===false&&!alreadyDeletedRowsWithNoVideosOnStart[v.ke])
)
){
alreadyDeletedRowsWithNoVideosOnStart[v.ke]=true;
knexQuery({
action: "select",
columns: "*",
table: "Videos",
where: [
['ke','=',v.ke],
['status','!=','0'],
['details','NOT LIKE','%"archived":"1"%'],
['time','<', sqlDate('10 MINUTE')],
]
},(err,evs) => {
if(evs && evs[0]){
const videosToDelete = [];
evs.forEach(function(ev){
var filename
var details
try{
details = JSON.parse(ev.details)
}catch(err){
if(details instanceof Object){
details = ev.details
}else{
details = {}
}
}
var dir = getVideoDirectory(ev)
filename = formattedTime(ev.time)+'.'+ev.ext
fileExists = fs.existsSync(dir+filename)
if(fileExists !== true){
deleteVideo(ev)
sendToWebSocket({f:'video_delete',filename:filename+'.'+ev.ext,mid:ev.mid,ke:ev.ke,time:ev.time,end: formattedTime(new Date,'YYYY-MM-DD HH:mm:ss')},'GRP_'+ev.ke);
}
});
if(videosToDelete.length > 0 || config.debugLog === true){
postMessage({f:'deleteNoVideo',msg:videosToDelete.length+' SQL rows with no file deleted',ke:v.ke,time:'moment()'})
}
}
setTimeout(function(){
resolve()
},3000)
})
}else{
resolve()
}
})
}
//info about what the application is doing
const deleteOldLogs = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.log_days && !isNaN(v.d.log_days) ? parseFloat(v.d.log_days) : 10
if(config.cron.deleteLogs === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Logs",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err)return errorLog(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteLogs',msg: rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:'moment()'})
}
})
}else{
resolve()
}
})
}
//still images saved
const deleteOldTimelapseFrames = async function(v){
const daysOldForDeletion = v.d.timelapseFrames_days && !isNaN(v.d.timelapseFrames_days) ? parseFloat(v.d.timelapseFrames_days) : 60
if(config.cron.deleteTimelpaseFrames === true && daysOldForDeletion !== 0){
const groupKey = v.ke;
const whereQuery = [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion+' DAY')],
]
const selectResponse = await knexQueryPromise({
action: "select",
columns: "*",
table: "Timelapse Frames",
where: whereQuery
})
const videoRows = selectResponse.rows
let affectedRows = 0
if(videoRows.length > 0){
const foldersDeletedFrom = [];
let clearSize = 0;
var i;
for (i = 0; i < videoRows.length; i++) {
const row = videoRows[i];
const dir = getTimelapseFrameDirectory(row)
const filename = row.filename
const theDate = filename.split('T')[0]
const enclosingFolder = `${dir}/${theDate}/`
try{
const fileSizeMB = row.size / 1048576;
setDiskUsedForGroup(groupKey,-fileSizeMB,null,row)
sendToWebSocket({
f: 'timelapse_frame_delete',
filename: filename,
mid: row.mid,
ke: groupKey,
time: row.time,
details: row.details,
},'GRP_' + groupKey)
await fs.promises.unlink(`${enclosingFolder}${filename}`)
if(foldersDeletedFrom.indexOf(enclosingFolder) === -1)foldersDeletedFrom.push(enclosingFolder);
}catch(err){
normalLog('Timelapse Frame Delete Error',row)
normalLog(err)
}
}
for (i = 0; i < foldersDeletedFrom.length; i++) {
const folderPath = foldersDeletedFrom[i];
const folderIsEmpty = (await fs.promises.readdir(folderPath)).filter(file => file.indexOf('.jpg') > -1).length === 0;
if(folderIsEmpty){
await fs.promises.rm(folderPath, { recursive: true, force: true })
}
}
const deleteResponse = await knexQueryPromise({
action: "delete",
table: "Timelapse Frames",
where: whereQuery
})
affectedRows = deleteResponse.rows || 0
}
return {
ok: true,
affectedRows: affectedRows,
}
}
return {
ok: false
}
}
//events - motion, object, etc. detections
const deleteOldEvents = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.event_days && !isNaN(v.d.event_days) ? parseFloat(v.d.event_days) : 10
if(config.cron.deleteEvents === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Events",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err)return errorLog(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteEvents',msg:rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:'moment()'})
}
})
}else{
resolve()
}
})
}
//event counts
const deleteOldEventCounts = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.event_days && !isNaN(v.d.event_days) ? parseFloat(v.d.event_days) : 10
if(config.cron.deleteEvents === true && daysOldForDeletion !== 0){
knexQuery({
action: "delete",
table: "Events Counts",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,rrr) => {
resolve()
if(err && err.code !== 'ER_NO_SUCH_TABLE')return errorLog(err);
if(rrr && rrr > 0 || config.debugLog === true){
postMessage({f:'deleteEvents',msg:rrr + ' SQL rows older than ' + daysOldForDeletion + ' days deleted',ke:v.ke,time:'moment()'})
}
})
}else{
resolve()
}
})
}
//check for temporary files (special archive)
const deleteOldFileBins = function(v){
return new Promise((resolve,reject) => {
const daysOldForDeletion = v.d.fileBin_days && !isNaN(v.d.fileBin_days) ? parseFloat(v.d.fileBin_days) : 10
if(config.cron.deleteFileBins === true && daysOldForDeletion !== 0){
var fileBinQuery = " FROM Files WHERE ke=? AND `time` < ?";
knexQuery({
action: "select",
columns: "*",
table: "Files",
where: [
['ke','=',v.ke],
['time','<', sqlDate(daysOldForDeletion + ' DAY')],
]
},(err,files) => {
if(files && files[0]){
//delete the files
files.forEach(function(file){
deleteFileBinEntry(file)
})
if(config.debugLog === true){
postMessage({
f: 'deleteFileBins',
msg: files.length + ' files older than ' + daysOldForDeletion + ' days deleted',
ke: v.ke,
time: 'moment()'
})
}
}
resolve()
})
}else{
resolve()
}
})
}
//user processing function
const processUser = async (v) => {
if(!v){
//no user object given, end of group list
return
}
debugLog(`Group Key : ${v.ke}`)
debugLog(`Owner : ${v.mail}`)
if(!overlapLocks[v.ke]){
debugLog(`Checking...`)
overlapLocks[v.ke] = true
v.d = JSON.parse(v.details);
try{
await deleteOldVideos(v)
debugLog('--- deleteOldVideos Complete')
await deleteOldTimelapseFrames(v)
debugLog('--- deleteOldTimelapseFrames Complete')
await deleteOldLogs(v)
debugLog('--- deleteOldLogs Complete')
await deleteOldFileBins(v)
debugLog('--- deleteOldFileBins Complete')
await deleteOldEvents(v)
debugLog('--- deleteOldEvents Complete')
await deleteOldEventCounts(v)
debugLog('--- deleteOldEventCounts Complete')
await checkFilterRules(v)
debugLog('--- checkFilterRules Complete')
await deleteRowsWithNoVideo(v)
debugLog('--- deleteRowsWithNoVideo Complete')
}catch(err){
normalLog(`Failed to Complete User : ${v.mail}`)
normalLog(err)
}
//done user, unlock current, and do next
overlapLocks[v.ke] = false;
debugLog(`Complete Checking... ${v.mail}`)
}else{
debugLog(`Locked, Skipped...`)
}
}
//recursive function
const setIntervalForCron = function(){
clearCronInterval()
// theCronInterval = setInterval(doCronJobs,1000 * 10)
theCronInterval = setInterval(doCronJobs,parseFloat(config.cron.interval)*60000*60)
}
const clearCronInterval = function(){
clearInterval(theCronInterval)
}
const doCronJobs = function(){
postMessage({
f: 'start',
time: 'moment()'
})
knexQuery({
action: "select",
columns: "ke,uid,details,mail",
table: "Users",
where: [
['details','NOT LIKE','%"sub"%'],
]
}, async (err,rows) => {
if(err){
errorLog(err)
}
if(rows.length > 0){
var i;
for (i = 0; i < rows.length; i++) {
await processUser(rows[i])
}
}
})
}
initiateDatabaseEngine()
setIntervalForCron()
doCronJobs()
}

View File

@ -7,6 +7,9 @@ const {
stringToSqlTime,
} = require('./common.js')
module.exports = function(s,config,lang,io){
const {
ptzControl
} = require('./control/ptz.js')(s,config,lang)
const {
legacyFilterEvents
} = require('./events/utils.js')(s,config,lang)
@ -941,63 +944,6 @@ module.exports = function(s,config,lang,io){
}
})
//functions for retrieving cron announcements
cn.on('cron',function(d){
if(d.f==='init'){
if(config.cron.key){
if(config.cron.key===d.cronKey){
s.cron={started:moment(),last_run:moment(),id:cn.id};
}else{
cn.disconnect()
}
}else{
s.cron={started:moment(),last_run:moment(),id:cn.id};
}
}else{
if(s.cron&&cn.id===s.cron.id){
delete(d.cronKey)
switch(d.f){
case'filters':
legacyFilterEvents(d.ff,d)
break;
case's.tx':
s.tx(d.data,d.to)
break;
case's.deleteVideo':
s.deleteVideo(d.file)
break;
case's.deleteFileBinEntry':
s.deleteFileBinEntry(d.file)
break;
case's.setDiskUsedForGroup':
function doOnMain(){
s.setDiskUsedForGroup(d.ke,d.size,d.target || undefined)
}
if(d.videoRow){
let storageIndex = s.getVideoStorageIndex(d.videoRow);
if(storageIndex){
s.setDiskUsedForGroupAddStorage(d.ke,{
size: d.size,
storageIndex: storageIndex
})
}else{
doOnMain()
}
}else{
doOnMain()
}
break;
case'start':case'end':
d.mid='_cron';s.userLog(d,{type:'cron',msg:d.msg})
break;
default:
s.systemLog('CRON : ',d)
break;
}
}else{
cn.disconnect()
}
}
})
cn.on('disconnect', function () {
if(cn.socketVideoStream){
cn.closeSocketVideoStream()