276 lines
10 KiB
JavaScript
276 lines
10 KiB
JavaScript
const { parentPort } = require('worker_threads');
|
|
const fetch = require('node-fetch');
|
|
const socketIOClient = require('socket.io-client');
|
|
const p2pClientConnectionStaticName = 'Commander'
|
|
const p2pClientConnections = {}
|
|
const runningRequests = {}
|
|
const connectedUserWebSockets = {}
|
|
let connectionToP2PServer
|
|
const s = {
|
|
debugLog: (...args) => {
|
|
parentPort.postMessage({
|
|
f: 'debugLog',
|
|
data: args
|
|
})
|
|
},
|
|
systemLog: (...args) => {
|
|
parentPort.postMessage({
|
|
f: 'systemLog',
|
|
data: args
|
|
})
|
|
},
|
|
}
|
|
parentPort.on('message',(data) => {
|
|
switch(data.f){
|
|
case'init':
|
|
initialize(data.config,data.lang)
|
|
break;
|
|
case'exit':
|
|
s.debugLog('Closing P2P Connection...')
|
|
process.exit(0)
|
|
break;
|
|
}
|
|
})
|
|
|
|
const initialize = (config,lang) => {
|
|
const selectedP2PServerId = config.p2pServerList[config.p2pHostSelected] ? config.p2pHostSelected : Object.keys(config.p2pServerList)[0]
|
|
const selectedHost = config.p2pServerList[selectedP2PServerId].host + ':' + config.p2pServerList[selectedP2PServerId].p2pPort
|
|
const parseJSON = function(string){
|
|
var parsed = string
|
|
try{
|
|
parsed = JSON.parse(string)
|
|
}catch(err){
|
|
|
|
}
|
|
return parsed
|
|
}
|
|
const createQueryStringFromObject = function(obj){
|
|
var queryString = ''
|
|
var keys = Object.keys(obj)
|
|
keys.forEach(function(key){
|
|
var value = obj[key]
|
|
queryString += `&${key}=${value}`
|
|
})
|
|
return queryString
|
|
}
|
|
const doRequest = function(url,method,data,callback,onDataReceived){
|
|
var requestEndpoint = `${config.sslEnabled ? `https` : 'http'}://localhost:${config.sslEnabled ? config.ssl.port : config.port}` + url
|
|
if(method === 'GET' && data){
|
|
requestEndpoint += '?' + createQueryStringFromObject(data)
|
|
}
|
|
const theRequest = fetch(requestEndpoint,{
|
|
method: method,
|
|
headers: {
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'application/json'
|
|
},
|
|
body: method !== 'GET' ? JSON.stringify(data ? data : null) : null
|
|
})
|
|
.then(res => {
|
|
res.body.on('data', onDataReceived);
|
|
return res
|
|
});
|
|
if(typeof callback === 'function'){
|
|
theRequest.then(res => res.json())
|
|
.then(json => {
|
|
callback(null,json);
|
|
})
|
|
}
|
|
theRequest.catch((err) => {
|
|
console.error(err);
|
|
if(typeof callback === 'function')callback(err,null);
|
|
});
|
|
return theRequest
|
|
}
|
|
const createShinobiSocketConnection = (connectionId) => {
|
|
const masterConnectionToMachine = socketIOClient(`ws://localhost:${config.port}`, {transports:['websocket']})
|
|
p2pClientConnections[connectionId || p2pClientConnectionStaticName] = masterConnectionToMachine
|
|
return masterConnectionToMachine
|
|
}
|
|
const killAllClientConnections = () => {
|
|
Object.keys(p2pClientConnections).forEach((key) => {
|
|
try{
|
|
p2pClientConnections[key].disconnect()
|
|
}catch(err){
|
|
|
|
}
|
|
setTimeout(() => {
|
|
delete(p2pClientConnections[key])
|
|
},1000)
|
|
})
|
|
}
|
|
//
|
|
function startBridge(noLog){
|
|
console.log('p2p',`Connecting to ${selectedHost}...`)
|
|
if(connectionToP2PServer && connectionToP2PServer.connected){
|
|
connectionToP2PServer.allowDisconnect = true;
|
|
connectionToP2PServer.disconnect()
|
|
}
|
|
connectionToP2PServer = socketIOClient('ws://' + selectedHost, {
|
|
transports:['websocket'],
|
|
reconnection: false
|
|
});
|
|
if(!config.p2pApiKey){
|
|
if(!noLog)s.systemLog('p2p',`Please fill 'p2pApiKey' in your conf.json.`)
|
|
}
|
|
// if(!config.p2pGroupId){
|
|
// if(!noLog)s.systemLog('p2p',`Please fill 'p2pGroupId' in your conf.json.`)
|
|
// }
|
|
connectionToP2PServer.on('connect', () => {
|
|
if(!noLog)s.systemLog('p2p',`Connected ${selectedHost}!`);
|
|
connectionToP2PServer.emit('initMachine',{
|
|
port: config.port,
|
|
apiKey: config.p2pApiKey,
|
|
groupId: config.p2pGroupId,
|
|
targetUserId: config.p2pTargetUserId,
|
|
targetGroupId: config.p2pTargetGroupId
|
|
})
|
|
})
|
|
connectionToP2PServer.on('httpClose',(requestId) => {
|
|
if(runningRequests[requestId] && runningRequests[requestId].abort){
|
|
runningRequests[requestId].abort()
|
|
delete(runningRequests[requestId])
|
|
}
|
|
})
|
|
connectionToP2PServer.on('http',(rawRequest) => {
|
|
runningRequests[rawRequest.rid] = doRequest(
|
|
rawRequest.url,
|
|
rawRequest.method,
|
|
rawRequest.data,
|
|
rawRequest.focus !== 'mp4' && rawRequest.focus !== 'flv' && rawRequest.focus !== 'mjpeg' && rawRequest.focus !== 'h264' ? function(err,json,resp){
|
|
connectionToP2PServer.emit('httpResponse',{
|
|
err: err,
|
|
json: rawRequest.bodyOnEnd ? json : null,
|
|
headers: resp.headers,
|
|
rid: rawRequest.rid
|
|
})
|
|
} : null,
|
|
(data) => {
|
|
if(!rawRequest.bodyOnEnd)connectionToP2PServer.emit('httpResponseChunk',{
|
|
data: data,
|
|
rid: rawRequest.rid
|
|
})
|
|
})
|
|
})
|
|
// const masterConnectionToMachine = createShinobiSocketConnection()
|
|
// masterConnectionToMachine.on('connect', () => {
|
|
// masterConnectionToMachine.emit('f',{
|
|
// f: 'init',
|
|
// auth: config.p2pTargetAuth,
|
|
// ke: config.p2pTargetGroupId,
|
|
// uid: config.p2pTargetUserId
|
|
// })
|
|
// })
|
|
// masterConnectionToMachine.on('f',(data) => {
|
|
// connectionToP2PServer.emit('f',data)
|
|
// })
|
|
|
|
connectionToP2PServer.on('wsInit',(rawRequest) => {
|
|
const user = rawRequest.user
|
|
const clientConnectionToMachine = createShinobiSocketConnection(rawRequest.cnid)
|
|
connectedUserWebSockets[user.auth_token] = user;
|
|
clientConnectionToMachine.on('connect', () => {
|
|
s.debugLog('init',user.auth_token)
|
|
clientConnectionToMachine.emit('f',{
|
|
f: 'init',
|
|
auth: user.auth_token,
|
|
ke: user.ke,
|
|
uid: user.uid,
|
|
ipAddress: rawRequest.ipAddress
|
|
})
|
|
});
|
|
([
|
|
'f',
|
|
]).forEach((target) => {
|
|
connectionToP2PServer.on(target,(data) => {
|
|
clientConnectionToMachine.emit(target,data)
|
|
})
|
|
clientConnectionToMachine.on(target,(data) => {
|
|
connectionToP2PServer.emit(target,{data: data, cnid: rawRequest.cnid})
|
|
})
|
|
})
|
|
});
|
|
([
|
|
'a',
|
|
'r',
|
|
'gps',
|
|
'e',
|
|
'super',
|
|
]).forEach((target) => {
|
|
connectionToP2PServer.on(target,(data) => {
|
|
var clientConnectionToMachine
|
|
if(data.f === 'init'){
|
|
clientConnectionToMachine = createShinobiSocketConnection(data.cnid)
|
|
clientConnectionToMachine.on('connect', () => {
|
|
clientConnectionToMachine.on(target,(fromData) => {
|
|
connectionToP2PServer.emit(target,{data: fromData, cnid: data.cnid})
|
|
})
|
|
clientConnectionToMachine.on('f',(fromData) => {
|
|
connectionToP2PServer.emit('f',{data: fromData, cnid: data.cnid})
|
|
})
|
|
clientConnectionToMachine.emit(target,data)
|
|
});
|
|
}else{
|
|
clientConnectionToMachine = p2pClientConnections[data.cnid]
|
|
clientConnectionToMachine.emit(target,data)
|
|
}
|
|
})
|
|
|
|
});
|
|
config.workerStreamOutHandlers.forEach((target) => {
|
|
connectionToP2PServer.on(target,(initData) => {
|
|
if(connectedUserWebSockets[initData.auth]){
|
|
const clientConnectionToMachine = createShinobiSocketConnection(initData.auth + initData.ke + initData.id)
|
|
clientConnectionToMachine.on('connect', () => {
|
|
clientConnectionToMachine.emit(target,initData)
|
|
});
|
|
clientConnectionToMachine.on('data',(data) => {
|
|
connectionToP2PServer.emit('data',{data: data, cnid: initData.cnid})
|
|
});
|
|
}else{
|
|
s.debugLog('disconnect now!')
|
|
}
|
|
})
|
|
});
|
|
connectionToP2PServer.on('wsDestroyStream',(clientKey) => {
|
|
if(p2pClientConnections[clientKey]){
|
|
p2pClientConnections[clientKey].disconnect();
|
|
}
|
|
delete(p2pClientConnections[clientKey])
|
|
});
|
|
connectionToP2PServer.on('wsDestroy',(rawRequest) => {
|
|
if(p2pClientConnections[rawRequest.cnid]){
|
|
p2pClientConnections[rawRequest.cnid].disconnect();
|
|
}
|
|
delete(p2pClientConnections[rawRequest.cnid])
|
|
});
|
|
|
|
connectionToP2PServer.on('allowDisconnect',(bool) => {
|
|
connectionToP2PServer.allowDisconnect = true;
|
|
connectionToP2PServer.disconnect()
|
|
s.debugLog('p2p','Server Forced Disconnection')
|
|
});
|
|
const onDisconnect = () => {
|
|
if(!noLog)s.systemLog('p2p','P2P Disconnected');
|
|
killAllClientConnections()
|
|
if(!connectionToP2PServer.allowDisconnect){
|
|
if(!noLog)s.systemLog('p2p','Attempting Reconnection...');
|
|
setTimeout(() => {
|
|
startBridge()
|
|
},3000)
|
|
}else{
|
|
if(!noLog)s.systemLog('p2p','Closing Process');
|
|
process.exit()
|
|
}
|
|
}
|
|
connectionToP2PServer.on('error',onDisconnect)
|
|
connectionToP2PServer.on('disconnect',onDisconnect)
|
|
}
|
|
startBridge()
|
|
setInterval(function(){
|
|
if(!connectionToP2PServer || !connectionToP2PServer.connected){
|
|
connectionToP2PServer.connect()
|
|
}
|
|
},1000 * 60 * 15)
|
|
}
|