385 lines
15 KiB
JavaScript
385 lines
15 KiB
JavaScript
"use strict";
|
||
//
|
||
// Shinobi – Plugin Base (Refactored 2025‑05‑24)
|
||
// Copyright (C) 2016‑2025 Moe Alam, moeiscool
|
||
//
|
||
// • Memory‑leak hardening
|
||
// • Clear separation of concerns
|
||
// • Re‑usable helper utilities
|
||
//
|
||
const fs = require("fs");
|
||
const { exec, spawn } = require("child_process");
|
||
const path = require("path");
|
||
const moment = require("moment");
|
||
const express = require("express");
|
||
const http = require("http");
|
||
const { Server: SocketIOServer } = require("socket.io");
|
||
const SocketIOClient = require("socket.io-client");
|
||
const CWSServer = require("cws").Server;
|
||
|
||
// ────────────────────────────────────────────────────────────────────────────────
|
||
// Globals guarded so that multiple require() calls don’t duplicate listeners
|
||
// ────────────────────────────────────────────────────────────────────────────────
|
||
if (!process.__PLUGIN_BASE_LISTENERS__) {
|
||
process.__PLUGIN_BASE_LISTENERS__ = true;
|
||
process.once("uncaughtException", (err) => console.error("uncaughtException", err));
|
||
["SIGINT", "SIGTERM"].forEach((sig) =>
|
||
process.once(sig, () => {
|
||
console.log("\nReceived ", sig, " – exiting cleanly.");
|
||
process.exit(0);
|
||
}),
|
||
);
|
||
}
|
||
|
||
// ────────────────────────────────────────────────────────────────────────────────
|
||
// Utilities
|
||
// ────────────────────────────────────────────────────────────────────────────────
|
||
const bytesToMiB = (b) => (b / (1024 * 1024)).toFixed(2);
|
||
|
||
const ensureTrailing = (str, t = "/") => (str.endsWith(t) ? str : str + t);
|
||
|
||
const gid = (len = 10) => {
|
||
const chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
|
||
return Array.from({ length: len }, () => chars[(Math.random() * chars.length) | 0]).join("");
|
||
};
|
||
|
||
const safeJSON = (x) => JSON.stringify(x, null, 2);
|
||
|
||
// ────────────────────────────────────────────────────────────────────────────────
|
||
// Main exported factory
|
||
// ────────────────────────────────────────────────────────────────────────────────
|
||
module.exports = function makePluginBase(__dirname, cfg = {}) {
|
||
const config = { ...cfg }; // don’t mutate caller’s object
|
||
if (!config.plug) throw new Error("config.plug is required");
|
||
|
||
// basic defaults ----------------------------------------------------------------
|
||
Object.assign(config, {
|
||
dirname: ".",
|
||
port: 8080,
|
||
hostPort: 8082,
|
||
systemLog: true,
|
||
connectionType: "websocket",
|
||
streamDir: config.streamDir || (process.platform === "win32" ? config.windowsTempDir : "/dev/shm"),
|
||
});
|
||
config.streamDir = ensureTrailing(config.streamDir || path.join(config.dirname, "streams"));
|
||
|
||
// logging helpers ----------------------------------------------------------------
|
||
const plugLog = (...msg) => console.log(new Date(), config.plug, ...msg);
|
||
const sysLog = (...msg) => config.systemLog && console.log(moment().format(), ...msg);
|
||
const dbgLog = (...msg) => config.debugLog && console.log(new Date(), ...msg);
|
||
|
||
// leak‑detector (does nothing if the lib is missing)
|
||
try {
|
||
require("../libs/basic/leakDetector").start({
|
||
sampleInterval: 15_000,
|
||
absGrowthLimit: 10 * 1024 * 1024,
|
||
consecutiveHits: 4,
|
||
snapshotDir: "/tmp",
|
||
});
|
||
} catch (_) {}
|
||
|
||
// ensure stream directory exists --------------------------------------------------
|
||
fs.mkdirSync(config.streamDir, { recursive: true });
|
||
|
||
// in‑memory state -----------------------------------------------------------------
|
||
const s = {
|
||
group: {},
|
||
monitors: new Map(), // keyed by `${ke}${mid}`
|
||
monitorInfo: new Map(),
|
||
callbacks: new Map(), // Python RPC callbacks with timeout
|
||
ext: {
|
||
cameraInit: new Set(),
|
||
pluginEvent: new Set(),
|
||
cpuUsage: new Set(),
|
||
},
|
||
dir: { streams: config.streamDir },
|
||
isWin: process.platform === "win32",
|
||
getWebsocket: () => io,
|
||
};
|
||
|
||
// ───────────────────────────── image buffering & guard ──────────────────────────
|
||
const imageBuffers = new Map(); // Map<monitorKey,{chunks:Buffer[], ts:number}>
|
||
const MAX_IMG_AGE_MS = 5_000; // drop partial frames after 5 s
|
||
|
||
function appendImageChunk(monitorKey, chunk) {
|
||
const entry = imageBuffers.get(monitorKey) || { chunks: [], ts: Date.now() };
|
||
entry.chunks.push(chunk);
|
||
entry.ts = Date.now();
|
||
imageBuffers.set(monitorKey, entry);
|
||
|
||
// older than MAX_IMG_AGE_MS ➜ discard
|
||
for (const [k, v] of imageBuffers) {
|
||
if (Date.now() - v.ts > MAX_IMG_AGE_MS) imageBuffers.delete(k);
|
||
}
|
||
|
||
if (chunk.at(-2) === 0xff && chunk.at(-1) === 0xd9) {
|
||
const buf = Buffer.concat(entry.chunks);
|
||
imageBuffers.delete(monitorKey);
|
||
return buf;
|
||
}
|
||
return null;
|
||
}
|
||
|
||
// ───────────────────────────────── cpu / nvidia helpers ─────────────────────────
|
||
const getCpuUsage = (cb) => {
|
||
const cmd = (() => {
|
||
switch (process.platform) {
|
||
case "win32":
|
||
return "wmic cpu get loadpercentage /value";
|
||
case "darwin":
|
||
return "ps -A -o %cpu | awk '{s+=$1} END {print s}'";
|
||
case "linux":
|
||
return "top -b -n 2 | awk 'toupper($0) ~ /^.?CPU/ {gsub(\"id,\",\"100\",$8); gsub(\"%\",$8); print 100-$8}' | tail -n 1";
|
||
default:
|
||
return null;
|
||
}
|
||
})();
|
||
|
||
if (!cmd) return cb(0);
|
||
const child = spawn(cmd, { shell: true });
|
||
let out = "";
|
||
child.stdout.on("data", (d) => (out += d.toString()));
|
||
child.on("close", () => cb(parseFloat(out.trim())));
|
||
};
|
||
|
||
// ───────────────────────────────── event dispatch helpers ────────────────────────
|
||
function runExtenders(set, ...args) {
|
||
for (const fn of set) {
|
||
try {
|
||
fn(...args);
|
||
} catch (e) {
|
||
console.error("extender error", e);
|
||
}
|
||
}
|
||
}
|
||
|
||
// ───────────────────────────────── processImage wrapper ─────────────────────────
|
||
let overallProcessingCount = 0;
|
||
async function processImage(buffer, d, tx, frameLocation) {
|
||
const socket = s.getWebsocket();
|
||
++overallProcessingCount;
|
||
socket?.emit("processCount", overallProcessingCount);
|
||
|
||
// async detectObject ➜ user‑provided
|
||
s.detectObject?.(buffer, d, tx, frameLocation, () => {
|
||
--overallProcessingCount;
|
||
socket?.emit("processCount", overallProcessingCount);
|
||
});
|
||
}
|
||
|
||
// placeholder overridable
|
||
s.detectObject = (_buf, _d, _tx, _frameLoc, done) => {
|
||
console.warn("detectObject handler not set");
|
||
done?.();
|
||
};
|
||
|
||
// ─────────────────────────────────── socket.io setup ────────────────────────────
|
||
const app = express();
|
||
const server = http.createServer(app);
|
||
let io; // will hold either server or client instance
|
||
|
||
// reusable cleanup of socket listeners -------------------------------------------
|
||
function cleanupSocket(sock) {
|
||
if (!sock) return;
|
||
sock.removeAllListeners();
|
||
sock.disconnect?.();
|
||
}
|
||
|
||
// server / host mode --------------------------------------------------------------
|
||
function startHost() {
|
||
const sio = new SocketIOServer(server, { transports: ["websocket"] });
|
||
sio.engine.ws = new CWSServer({ noServer: true, perMessageDeflate: false });
|
||
|
||
s.connectedClients = new Map();
|
||
|
||
sio.on("connection", (cn) => {
|
||
plugLog("Client connected", cn.id);
|
||
const tx = (data) => cn.emit("ocv", { ...data, pluginKey: config.key, plug: config.plug });
|
||
s.connectedClients.set(cn.id, { cn, tx });
|
||
|
||
cn.on("f", (d) => mainEventController(d, cn, tx));
|
||
cn.once("disconnect", () => {
|
||
plugLog("Client disconnected", cn.id);
|
||
s.connectedClients.delete(cn.id);
|
||
cleanupSocket(cn);
|
||
});
|
||
});
|
||
|
||
s.disconnectWebSocket = () => {
|
||
for (const { cn } of s.connectedClients.values()) cleanupSocket(cn);
|
||
s.connectedClients.clear();
|
||
};
|
||
|
||
io = sio;
|
||
}
|
||
|
||
// client mode --------------------------------------------------------------------
|
||
function startClient() {
|
||
let retry = 0;
|
||
const MAX_RETRY = parseInt(config.maxRetryConnection, 10) || 5;
|
||
const url = `ws://${config.host || "localhost"}:${config.port}`;
|
||
|
||
function connect() {
|
||
const sock = SocketIOClient(url, { transports: ["websocket"] });
|
||
io = sock;
|
||
|
||
const reconnect = () => {
|
||
if (++retry > MAX_RETRY){
|
||
s.disconnectWebSocket()
|
||
return plugLog("Max retries reached");
|
||
}
|
||
setTimeout(connect, 3_000);
|
||
};
|
||
|
||
sock.once("connect", () => {
|
||
retry = 0;
|
||
plugLog("Connected to host");
|
||
sock.emit("ocv", { f: "init", plug: config.plug, notice: config.notice, type: config.type, connectionType: config.connectionType, pluginKey: config.key });
|
||
});
|
||
|
||
["disconnect", "connect_error", "error"].forEach((ev) => sock.on(ev, reconnect));
|
||
|
||
sock.on("f", (d) => mainEventController(d, null, (data) => sock.emit("ocv", { ...data, pluginKey: config.key, plug: config.plug })));
|
||
|
||
s.disconnectWebSocket = () => cleanupSocket(sock);
|
||
}
|
||
|
||
connect();
|
||
}
|
||
|
||
// choose host vs client -----------------------------------------------------------
|
||
if (config.mode === "host") startHost();
|
||
else startClient();
|
||
|
||
// start http server with retry ----------------------------------------------------
|
||
(function listen(tries = 0) {
|
||
server.once("error", (err) => {
|
||
if (err.code === "EADDRINUSE" && tries < 5) {
|
||
const old = config.hostPort;
|
||
config.hostPort = parseInt(config.hostPort, 10) + 1;
|
||
plugLog(`Port ${old} busy – trying ${config.hostPort}`);
|
||
return listen(tries + 1);
|
||
}
|
||
throw err;
|
||
});
|
||
|
||
server.listen(config.hostPort, () => plugLog(`HTTP server on ${config.hostPort}`));
|
||
})();
|
||
|
||
// ─────────────────────────── main event controller ──────────────────────────────
|
||
function mainEventController(d, cn, tx) {
|
||
switch (d.f) {
|
||
case "init_plugin_as_host": {
|
||
if (!cn) return plugLog("No CN for init_plugin_as_host", d);
|
||
if (d.key !== config.key) {
|
||
plugLog("Plugin key mismatch from", cn.request?.connection?.remoteAddress);
|
||
cn.emit("init", { ok: false });
|
||
cn.disconnect();
|
||
return;
|
||
}
|
||
cn.emit("init", { ok: true, plug: config.plug, notice: config.notice, type: config.type });
|
||
break;
|
||
}
|
||
|
||
case "monitorUpdate": {
|
||
const m = d.monitorConfig;
|
||
const mKey = `${m.ke}${m.mid}`;
|
||
s.monitors.set(mKey, { ...m });
|
||
|
||
const det = m.details;
|
||
const separate = det.detector_use_detect_object === "1";
|
||
const width = parseFloat(separate && det.detector_scale_x_object ? det.detector_scale_x_object : det.detector_scale_x);
|
||
const height = parseFloat(separate && det.detector_scale_y_object ? det.detector_scale_y_object : det.detector_scale_y);
|
||
s.monitorInfo.set(mKey, { separate, width, height });
|
||
|
||
imageBuffers.delete(mKey);
|
||
runExtenders(s.ext.cameraInit, m, cn, tx);
|
||
break;
|
||
}
|
||
|
||
case "frame": {
|
||
const mKey = `${d.id}${d.ke}`;
|
||
const buf = appendImageChunk(mKey, d.frame);
|
||
if (buf) processImage(buf, d, tx);
|
||
break;
|
||
}
|
||
|
||
case "frameFromRam": {
|
||
processImage(d.buffer, d, tx, d.frameLocation);
|
||
break;
|
||
}
|
||
}
|
||
|
||
runExtenders(s.ext.pluginEvent, d, cn, tx);
|
||
}
|
||
|
||
// expose API to caller -----------------------------------------------------------
|
||
s.onCameraInit = (fn) => s.ext.cameraInit.add(fn);
|
||
s.onPluginEventExtender = (fn) => s.ext.pluginEvent.add(fn);
|
||
s.onGetCpuUsageExtensions = new Set();
|
||
|
||
// cpu usage polling for cluster mode --------------------------------------------
|
||
if (config.clusterMode) {
|
||
setInterval(() => {
|
||
if (config.clusterBasedOnGpu) return; // left as exercise to parse nvidia‑smi
|
||
getCpuUsage((pct) => io?.emit("cpuUsage", pct));
|
||
}, 10_000);
|
||
}
|
||
|
||
// python integration (simplified) -----------------------------------------------
|
||
if (config.createPythonBridge) {
|
||
startPythonBridge();
|
||
}
|
||
|
||
function startPythonBridge() {
|
||
const pyPort = config.pythonPort || 7990;
|
||
const script = config.pythonScript || path.join(config.dirname, "pumpkin.py");
|
||
|
||
let pyProc;
|
||
const launch = () => {
|
||
pyProc = spawn("python3", [script, pyPort], { env: { ...process.env, PYTHONUNBUFFERED: 1 } });
|
||
pyProc.stdout.on("data", (d) => dbgLog("PY", d.toString()));
|
||
pyProc.stderr.on("data", (d) => console.error("PYERR", d.toString()));
|
||
pyProc.once("close", () => setTimeout(launch, 3_000));
|
||
};
|
||
launch();
|
||
|
||
// RPC bridge
|
||
s.createCameraBridgeToPython = (uniqueId) => {
|
||
const sock = SocketIOClient(`ws://localhost:${pyPort}`, { transports: ["websocket"] });
|
||
const pending = new Map();
|
||
|
||
function sendToPython(data, cb, timeout = 5_000) {
|
||
const id = data.id || gid();
|
||
data.id = id;
|
||
pending.set(id, cb);
|
||
sock.emit("f", data);
|
||
setTimeout(() => pending.delete(id), timeout); // auto‑cleanup
|
||
}
|
||
|
||
sock.on("f", (d) => {
|
||
pending.get(d.id)?.(d.data);
|
||
pending.delete(d.id);
|
||
});
|
||
|
||
sock.on("disconnect", () => setTimeout(() => sock.connect(), 3_000));
|
||
|
||
return {
|
||
refreshTracker: (trackerId) => sock.emit("refreshTracker", { trackerId }),
|
||
sendToPython,
|
||
destroy: () => cleanupSocket(sock),
|
||
};
|
||
};
|
||
}
|
||
|
||
// helper to get scaled dims ------------------------------------------------------
|
||
s.getImageDimensions = (d) => {
|
||
const det = d.mon;
|
||
const height = det.detector_scale_y_object || det.detector_scale_y;
|
||
const width = det.detector_scale_x_object || det.detector_scale_x;
|
||
return { height: parseFloat(height), width: parseFloat(width) };
|
||
};
|
||
|
||
return s;
|
||
};
|