more stable p2p v2 connection to server+

cron-as-worker-process
Moe 2022-06-22 23:55:04 -07:00
parent 7cebb5eafc
commit c4632a0564
1 changed files with 107 additions and 31 deletions

View File

@ -28,7 +28,7 @@ parentPort.on('message',(data) => {
config = Object.assign({},data.config) config = Object.assign({},data.config)
lang = Object.assign({},data.lang) lang = Object.assign({},data.lang)
remoteConnectionPort = config.ssl ? config.ssl.port || 443 : config.port || 8080 remoteConnectionPort = config.ssl ? config.ssl.port || 443 : config.port || 8080
initialize(config,data.lang) initialize()
break; break;
case'exit': case'exit':
s.debugLog('Closing P2P Connection...') s.debugLog('Closing P2P Connection...')
@ -43,6 +43,11 @@ var onClosedTimeout = null
let stayDisconnected = false let stayDisconnected = false
const requestConnections = {} const requestConnections = {}
const requestConnectionsData = {} const requestConnectionsData = {}
function getRequestConnection(requestId){
return requestConnections[requestId] || {
write: () => {}
}
}
function clearAllTimeouts(){ function clearAllTimeouts(){
clearInterval(heartbeatTimer) clearInterval(heartbeatTimer)
clearTimeout(heartBeatCheckTimout) clearTimeout(heartBeatCheckTimout)
@ -50,7 +55,7 @@ function clearAllTimeouts(){
} }
function startConnection(p2pServerAddress,subscriptionId){ function startConnection(p2pServerAddress,subscriptionId){
console.log('P2P : Connecting to Konekta P2P Server...') console.log('P2P : Connecting to Konekta P2P Server...')
let tunnelToShinobi let tunnelToP2P
stayDisconnected = false stayDisconnected = false
const allMessageHandlers = [] const allMessageHandlers = []
async function startWebsocketConnection(key,callback){ async function startWebsocketConnection(key,callback){
@ -60,28 +65,28 @@ function startConnection(p2pServerAddress,subscriptionId){
return new Promise((resolve,reject) => { return new Promise((resolve,reject) => {
try{ try{
stayDisconnected = true stayDisconnected = true
if(tunnelToShinobi)tunnelToShinobi.close() if(tunnelToP2P)tunnelToP2P.close()
}catch(err){ }catch(err){
console.log(err) console.log(err)
} }
tunnelToShinobi = new WebSocket(p2pServerAddress || 'ws://172.16.101.218:81'); tunnelToP2P = new WebSocket(p2pServerAddress);
stayDisconnected = false; stayDisconnected = false;
tunnelToShinobi.on('open', function(){ tunnelToP2P.on('open', function(){
resolve(tunnelToShinobi) resolve(tunnelToP2P)
}) })
tunnelToShinobi.on('error', (err) => { tunnelToP2P.on('error', (err) => {
console.log(`P2P tunnelToShinobi Error : `,err) console.log(`P2P tunnelToP2P Error : `,err)
console.log(`P2P Restarting...`) console.log(`P2P Restarting...`)
// disconnectedConnection() // disconnectedConnection()
}) })
tunnelToShinobi.on('close', () => { tunnelToP2P.on('close', () => {
console.log(`P2P Connection Closed!`) console.log(`P2P Connection Closed!`)
clearAllTimeouts() clearAllTimeouts()
onClosedTimeout = setTimeout(() => { onClosedTimeout = setTimeout(() => {
disconnectedConnection(); disconnectedConnection();
},5000) },5000)
}); });
tunnelToShinobi.onmessage = function(event){ tunnelToP2P.onmessage = function(event){
const data = bson.deserialize(Buffer.from(event.data)) const data = bson.deserialize(Buffer.from(event.data))
allMessageHandlers.forEach((handler) => { allMessageHandlers.forEach((handler) => {
if(data.f === handler.key){ if(data.f === handler.key){
@ -92,8 +97,8 @@ function startConnection(p2pServerAddress,subscriptionId){
clearInterval(socketCheckTimer) clearInterval(socketCheckTimer)
socketCheckTimer = setInterval(() => { socketCheckTimer = setInterval(() => {
s.debugLog('Tunnel Ready State :',tunnelToShinobi.readyState) s.debugLog('Tunnel Ready State :',tunnelToP2P.readyState)
if(tunnelToShinobi.readyState !== 1){ if(tunnelToP2P.readyState !== 1){
s.debugLog('Tunnel NOT Ready! Reconnecting...') s.debugLog('Tunnel NOT Ready! Reconnecting...')
disconnectedConnection() disconnectedConnection()
} }
@ -107,7 +112,7 @@ function startConnection(p2pServerAddress,subscriptionId){
if(stayDisconnected)return; if(stayDisconnected)return;
s.debugLog('RESTARTING!') s.debugLog('RESTARTING!')
setTimeout(() => { setTimeout(() => {
if(tunnelToShinobi && tunnelToShinobi.readyState !== 1)startWebsocketConnection() if(tunnelToP2P && tunnelToP2P.readyState !== 1)startWebsocketConnection()
},2000) },2000)
} }
s.debugLog(p2pServerAddress) s.debugLog(p2pServerAddress)
@ -123,11 +128,11 @@ function startConnection(p2pServerAddress,subscriptionId){
}) })
}, 1000 * 10) }, 1000 * 10)
setTimeout(() => { setTimeout(() => {
if(tunnelToShinobi.readyState !== 1)refreshHeartBeatCheck() if(tunnelToP2P.readyState !== 1)refreshHeartBeatCheck()
},5000) },5000)
} }
function sendDataToTunnel(data){ function sendDataToTunnel(data){
tunnelToShinobi.send( tunnelToP2P.send(
bson.serialize(data) bson.serialize(data)
) )
} }
@ -145,31 +150,43 @@ function startConnection(p2pServerAddress,subscriptionId){
rid: requestId rid: requestId
}) })
} }
function createRemoteSocket(host,port,requestId){ async function createRemoteSocket(host,port,requestId,initData){
// if(requestConnections[requestId]){ // if(requestConnections[requestId]){
// remotesocket.off('data') // remotesocket.off('data')
// remotesocket.off('drain') // remotesocket.off('drain')
// remotesocket.off('close') // remotesocket.off('close')
// requestConnections[requestId].end() // requestConnections[requestId].end()
// } // }
const responseTunnel = await getResponseTunnel(requestId)
let remotesocket = new net.Socket(); let remotesocket = new net.Socket();
remotesocket.connect(port || remoteConnectionPort, host || 'localhost'); remotesocket.on('ready',() => {
requestConnections[requestId] = remotesocket remotesocket.write(initData.buffer)
})
remotesocket.on('data', function(data) { remotesocket.on('data', function(data) {
requestConnectionsData[requestId] = data.toString() requestConnectionsData[requestId] = data.toString()
outboundMessage('data',data,requestId) responseTunnel.send('data',data)
}) })
remotesocket.on('drain', function() { remotesocket.on('drain', function() {
outboundMessage('resume',{},requestId) responseTunnel.send('resume',{})
}); });
remotesocket.on('close', function() { remotesocket.on('close', function() {
delete(requestConnectionsData[requestId]) delete(requestConnectionsData[requestId])
outboundMessage('end',{},requestId) responseTunnel.send('end',{})
setTimeout(() => {
if(
responseTunnel &&
(responseTunnel.readyState === 0 || responseTunnel.readyState === 1)
){
responseTunnel.close()
}
},5000)
}); });
remotesocket.connect(port || remoteConnectionPort, host || 'localhost');
requestConnections[requestId] = remotesocket
return remotesocket return remotesocket
} }
function writeToServer(data,requestId){ function writeToServer(data,requestId){
var flushed = requestConnections[requestId].write(data.buffer) var flushed = getRequestConnection(requestId).write(data.buffer)
if (!flushed) { if (!flushed) {
outboundMessage('pause',{},requestId) outboundMessage('pause',{},requestId)
} }
@ -182,18 +199,14 @@ function startConnection(p2pServerAddress,subscriptionId){
} }
// onIncomingMessage('connect',(data,requestId) => { // onIncomingMessage('connect',(data,requestId) => {
// console.log('New Request Incoming',requestId) // console.log('New Request Incoming',requestId)
// createRemoteSocket('172.16.101.94', 8080, requestId) // await createRemoteSocket('172.16.101.94', 8080, requestId)
// }) // })
onIncomingMessage('connect',(data,requestId) => { onIncomingMessage('connect',async (data,requestId) => {
// const hostParts = data.host.split(':') // const hostParts = data.host.split(':')
// const host = hostParts[0] // const host = hostParts[0]
// const port = parseInt(hostParts[1]) || 80 // const port = parseInt(hostParts[1]) || 80
s.debugLog('New Request Incoming', null, null, requestId) s.debugLog('New Request Incoming', null, null, requestId)
const socket = createRemoteSocket(null, null, requestId) const socket = await createRemoteSocket(null, null, requestId, data.init)
socket.on('ready',() => {
s.debugLog('READY')
writeToServer(data.init,requestId)
})
}) })
onIncomingMessage('data',writeToServer) onIncomingMessage('data',writeToServer)
onIncomingMessage('shell',function(data,requestId){ onIncomingMessage('shell',function(data,requestId){
@ -246,10 +259,73 @@ function startConnection(p2pServerAddress,subscriptionId){
stayDisconnected = data && !data.retryLater stayDisconnected = data && !data.retryLater
}) })
} }
const responseTunnels = {}
function initialize(config,lang){ async function getResponseTunnel(originalRequestId){
return responseTunnels[originalRequestId] || await createResponseTunnel(originalRequestId)
}
function createResponseTunnel(originalRequestId){
const responseTunnelMessageHandlers = []
function onMessage(key,callback){
responseTunnelMessageHandlers.push({
key: key,
callback: callback,
})
}
return new Promise((resolve,reject) => {
const responseTunnel = new WebSocket(config.selectedHost);
function sendToResponseTunnel(data){
responseTunnel.send(
bson.serialize(data)
)
}
function sendData(key,data){
sendToResponseTunnel({
f: key,
data: data,
rid: originalRequestId
})
}
responseTunnel.on('open', function(){
sendToResponseTunnel({
responseTunnel: originalRequestId,
subscriptionId: config.p2pApiKey,
})
})
responseTunnel.on('close', function(){
delete(responseTunnels[originalRequestId])
})
onMessage('ready', function(){
const finalData = {
onMessage,
send: sendData,
sendRaw: sendToResponseTunnel,
close: responseTunnel.close
}
responseTunnels[originalRequestId] = finalData;
resolve(finalData)
})
responseTunnel.onmessage = function(event){
const data = bson.deserialize(Buffer.from(event.data))
responseTunnelMessageHandlers.forEach((handler) => {
if(data.f === handler.key){
handler.callback(data.data,data.rid)
}
})
}
})
}
function closeResponseTunnel(originalRequestId){
// also should be handled server side
try{
responseTunnels[originalRequestId].close()
}catch(err){
s.debugLog('closeResponseTunnel',err)
}
}
function initialize(){
const selectedP2PServerId = config.p2pServerList[config.p2pHostSelected] ? config.p2pHostSelected : Object.keys(config.p2pServerList)[0] const selectedP2PServerId = config.p2pServerList[config.p2pHostSelected] ? config.p2pHostSelected : Object.keys(config.p2pServerList)[0]
const p2pServerDetails = config.p2pServerList[selectedP2PServerId] const p2pServerDetails = config.p2pServerList[selectedP2PServerId]
const selectedHost = 'ws://' + p2pServerDetails.host + ':' + p2pServerDetails.p2pPort const selectedHost = 'ws://' + p2pServerDetails.host + ':' + p2pServerDetails.p2pPort
config.selectedHost = selectedHost
startConnection(selectedHost,config.p2pApiKey) startConnection(selectedHost,config.p2pApiKey)
} }