Shinobi/libs/childNode.js

163 lines
7.0 KiB
JavaScript
Raw Normal View History

const fs = require('fs');
const url = require('url');
const http = require('http');
const https = require('https');
const express = require('express');
const { createWebSocketServer, createWebSocketClient } = require('./basic/websocketTools.js')
2018-10-16 02:40:12 +00:00
module.exports = function(s,config,lang,app,io){
2018-10-18 04:45:15 +00:00
//setup Master for childNodes
if(
config.childNodes.enabled === true &&
config.childNodes.mode === 'master'
){
const {
2021-11-27 01:56:29 +00:00
getIpAddress,
initiateDataConnection,
initiateVideoTransferConnection,
onWebSocketDataFromChildNode,
onDataConnectionDisconnect,
initiateVideoWriteFromChildNode,
initiateTimelapseFrameWriteFromChildNode,
} = require('./childNode/utils.js')(s,config,lang,app,io)
2018-10-18 04:45:15 +00:00
s.childNodes = {};
const childNodesConnectionIndex = {};
const childNodeHTTP = express();
const childNodeServer = http.createServer(app);
const childNodeWebsocket = createWebSocketServer();
const childNodeFileRelay = createWebSocketServer();
childNodeServer.on('upgrade', function upgrade(request, socket, head) {
const pathname = url.parse(request.url).pathname;
if (pathname === '/childNode') {
childNodeWebsocket.handleUpgrade(request, socket, head, function done(ws) {
childNodeWebsocket.emit('connection', ws, request)
})
} else if (pathname === '/childNodeFileRelay') {
childNodeFileRelay.handleUpgrade(request, socket, head, function done(ws) {
childNodeFileRelay.emit('connection', ws, request)
})
} else {
socket.destroy();
}
2018-10-18 04:45:15 +00:00
});
const childNodeBindIP = config.childNodes.ip || config.bindip;
childNodeServer.listen(config.childNodes.port,childNodeBindIP,function(){
console.log(lang.Shinobi+' - CHILD NODE SERVER : ' + config.childNodes.port);
});
2021-11-25 17:20:50 +00:00
//send data to child node function
s.cx = function(data,connectionId){
childNodesConnectionIndex[connectionId].sendJson(data)
}
2018-10-18 04:45:15 +00:00
//child Node Websocket
childNodeWebsocket.on('connection', function (client, req) {
2018-10-18 04:45:15 +00:00
//functions for dispersing work to child servers;
2021-11-27 01:56:29 +00:00
const ipAddress = getIpAddress(req)
const connectionId = s.gid(10);
2021-11-27 01:56:29 +00:00
s.debugLog('Child Node Connection!',new Date(),ipAddress)
client.id = connectionId;
function onAuthenticate(d){
const data = JSON.parse(d);
const childNodeKeyAccepted = config.childNodes.key.indexOf(data.socketKey) > -1;
if(!client.shinobiChildAlreadyRegistered && data.f === 'init' && childNodeKeyAccepted){
2021-11-27 01:56:29 +00:00
initiateDataConnection(client,req,data,connectionId);
childNodesConnectionIndex[connectionId] = client;
client.removeListener('message',onAuthenticate)
client.on('message',(d) => {
const data = JSON.parse(d);
onWebSocketDataFromChildNode(client,data)
})
}else{
2021-11-27 01:56:29 +00:00
s.debugLog('Child Node Force Disconnected!',new Date(),ipAddress)
client.destroy()
2018-10-18 04:42:32 +00:00
}
}
client.on('message',onAuthenticate)
client.on('close',() => {
onDataConnectionDisconnect(client, req)
2018-09-28 05:37:08 +00:00
})
2018-10-18 04:45:15 +00:00
})
childNodeFileRelay.on('connection', function (client, req) {
function onAuthenticate(d){
const data = JSON.parse(d);
const childNodeKeyAccepted = config.childNodes.key.indexOf(data.socketKey) > -1;
if(!client.alreadyInitiated && data.fileType && childNodeKeyAccepted){
client.alreadyInitiated = true;
client.removeListener('message',onAuthenticate)
switch(data.fileType){
case'video':
initiateVideoWriteFromChildNode(client,data.options,data.connectionId)
break;
case'timelapseFrame':
initiateTimelapseFrameWriteFromChildNode(client,data.options,data.connectionId)
break;
}
}else{
client.destroy()
}
}
client.on('message',onAuthenticate)
})
2018-10-18 04:45:15 +00:00
}else
//setup Child for childNodes
if(
config.childNodes.enabled === true &&
config.childNodes.mode === 'child' &&
config.childNodes.host
){
2021-11-25 17:20:50 +00:00
const {
initiateConnectionToMasterNode,
onDisconnectFromMasterNode,
onDataFromMasterNode,
} = require('./childNode/childUtils.js')(s,config,lang,app,io)
s.connectedToMasterNode = false;
let childIO;
function createChildNodeConnection(){
childIO = createWebSocketClient('ws://'+config.childNodes.host + '/childNode',{
onMessage: onDataFromMasterNode
})
childIO.on('open', function(){
console.error(new Date(),'Child Nodes : Connected to Master Node! Authenticating...');
initiateConnectionToMasterNode()
})
childIO.on('close',function(){
onDisconnectFromMasterNode()
setTimeout(() => {
console.error(new Date(),'Child Nodes : Connection to Master Node Closed. Attempting Reconnect...');
createChildNodeConnection()
},3000)
})
childIO.on('error',function(err){
console.error(new Date(),'Child Nodes ERROR : ', err.message);
childIO.close()
})
}
createChildNodeConnection()
2021-11-25 17:20:50 +00:00
function sendDataToMasterNode(data){
childIO.send(JSON.stringify(data))
}
2021-11-25 17:20:50 +00:00
s.cx = sendDataToMasterNode;
// replace internal functions with bridges to master node
2021-11-25 17:20:50 +00:00
s.tx = function(x,y){
sendDataToMasterNode({f:'s.tx',data:x,to:y})
}
s.userLog = function(x,y){
sendDataToMasterNode({f:'s.userLog',mon:x,data:y})
}
2018-10-18 04:45:15 +00:00
s.queuedSqlCallbacks = {}
s.sqlQuery = function(query,values,onMoveOn){
var callbackId = s.gid()
if(!values){values=[]}
if(typeof values === 'function'){
var onMoveOn = values;
var values = [];
}
if(typeof onMoveOn === 'function')s.queuedSqlCallbacks[callbackId] = onMoveOn;
2021-11-25 17:20:50 +00:00
sendDataToMasterNode({f:'sql',query:query,values:values,callbackId:callbackId});
2018-10-18 04:42:32 +00:00
}
s.knexQuery = function(options,onMoveOn){
var callbackId = s.gid()
if(typeof onMoveOn === 'function')s.queuedSqlCallbacks[callbackId] = onMoveOn;
2021-11-25 17:20:50 +00:00
sendDataToMasterNode({f:'knex',options:options,callbackId:callbackId});
}
2018-09-28 05:37:08 +00:00
}
}