diff --git a/index.js b/index.js index a9c4741..9a072c7 100644 --- a/index.js +++ b/index.js @@ -29,6 +29,7 @@ const limiter = rateLimit({ limit: 500, // Limit each IP to 100 requests per `window` (here, per 15 minutes). standardHeaders: 'draft-8', // draft-6: `RateLimit-*` headers; draft-7 & draft-8: combined `RateLimit` header legacyHeaders: false, // Disable the `X-RateLimit-*` headers. + skip: (req) => req.path.startsWith("/live-captions"), keyGenerator: (req) => { const forwarded = req.headers["x-forwarded-for"]?.split(",")[0]; // Take the first IP in the list const ip = forwarded || req.ip; // Fallback to req.ip diff --git a/package.json b/package.json index 0487d8d..9d69aae 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "scripts": { "test": "npx mocha test/auth.test.js", "start": "node index.js", + "dev": "node --watch index.js", "live-captions:test-sender": "node scripts/liveCaptionsTestSender.js", "docker": "docker compose up -d", "docker_restore": "docker-compose exec mongo mongorestore --db EMI_SOCIAL /dump/EMI_SOCIAL/", diff --git a/routes/liveCaptions.js b/routes/liveCaptions.js index 04490d6..caa01ad 100644 --- a/routes/liveCaptions.js +++ b/routes/liveCaptions.js @@ -1,19 +1,36 @@ var express = require('express'); var router = express.Router(); +const { rateLimit } = require("express-rate-limit"); const sessionChecker = require("../middleware/sessionChecker.js"); const MAX_BUFFER_SIZE = 300; const DEFAULT_INITIAL_LIMIT = 40; const MAX_INITIAL_LIMIT = 120; -const CAPTION_META_KEYS = new Set(["sequence", "createdAt", "original"]); +const INACTIVITY_RESET_MS = 10 * 60 * 1000; +const CAPTION_META_KEYS = new Set(["sequence", "createdAt", "original", "draft", "sourceLang", "lang", "isDraft", "status", "translations"]); const liveCaptionState = { startedAt: Date.now(), + lastIngestAt: 0, latestSequence: 0, captions: [], }; +const liveCaptionsLimiter = rateLimit({ + windowMs: 10 * 60 * 1000, + limit: 6000, + standardHeaders: "draft-8", + legacyHeaders: false, + keyGenerator: (req) => { + const forwarded = req.headers["x-forwarded-for"]?.split(",")[0]; + const ip = forwarded || req.ip || ""; + return ip.includes(":") ? ip.split(":")[0] : ip; + }, +}); + +router.use(liveCaptionsLimiter); + const normalizeLang = (lang = "") => { const value = String(lang || "").trim().toLowerCase(); if (!value) return ""; @@ -33,8 +50,23 @@ const normalizeTranslations = (translations) => { return normalized; }; +const readText = (value) => { + if (typeof value === "string") return value.trim(); + return ""; +}; + +const extractDraftText = (body = {}) => { + const directDraft = readText(body?.draft); + if (directDraft) return directDraft; + const nestedDraft = readText(body?.draft?.text); + if (nestedDraft) return nestedDraft; + const fallbackText = readText(body?.text); + if (fallbackText) return fallbackText; + return ""; +}; + const buildTranslationsFromFlatPayload = (payload) => { - const ignoredKeys = new Set(["original", "sourceLang", "translations"]); + const ignoredKeys = new Set(["original", "draft", "sourceLang", "lang", "isDraft", "status", "translations"]); const normalized = {}; for (const [key, value] of Object.entries(payload || {})) { if (ignoredKeys.has(key)) continue; @@ -67,8 +99,22 @@ const getAvailableLanguages = () => { return Array.from(langs).filter(Boolean).sort(); }; +const resetLiveCaptionState = () => { + liveCaptionState.startedAt = Date.now(); + liveCaptionState.lastIngestAt = 0; + liveCaptionState.latestSequence = 0; + liveCaptionState.captions = []; +}; + +const maybeResetForInactivity = () => { + if (!liveCaptionState.lastIngestAt) return; + if ((Date.now() - liveCaptionState.lastIngestAt) < INACTIVITY_RESET_MS) return; + resetLiveCaptionState(); +}; + router.get("/stream", sessionChecker, async (req, res) => { try { + maybeResetForInactivity(); const sinceSequence = Number.parseInt(req.query?.sinceSequence, 10); const requestedLimit = Number.parseInt(req.query?.limit, 10); const initialLimit = Number.isFinite(requestedLimit) @@ -103,16 +149,22 @@ router.get("/stream", sessionChecker, async (req, res) => { router.post("/ingest", async (req, res) => { try { // TODO: Add basic auth/API key validation before production roll-out. - const original = typeof req.body?.original === "string" ? req.body.original.trim() : ""; + const draft = extractDraftText(req.body || {}); + const originalFromPayload = readText(req.body?.original); + const original = originalFromPayload || draft; + const requestedLang = normalizeLang(req.body?.lang); + const sourceLangFromRequest = normalizeLang(req.body?.sourceLang || (requestedLang && requestedLang !== "draft" ? requestedLang : "")); + const isDraft = !!draft || requestedLang === "draft" || sourceLangFromRequest === "draft" || req.body?.isDraft === true || req.body?.status === "draft"; const mapFromNested = normalizeTranslations(req.body?.translations); const mapFromFlat = buildTranslationsFromFlatPayload(req.body); - const translations = { ...mapFromNested, ...mapFromFlat }; - const sourceLang = normalizeLang(req.body?.sourceLang || inferSourceLangFromTranslations(original, translations)); + const translations = isDraft ? {} : { ...mapFromNested, ...mapFromFlat }; + const inferredSource = inferSourceLangFromTranslations(original, translations); + const sourceLang = isDraft ? "" : (sourceLangFromRequest || inferredSource); if (!original) { return res.status(400).json({ status: "Original text is required" }); } - if (sourceLang && sourceLang !== "original" && !translations[sourceLang]) { + if (sourceLang && sourceLang !== "original" && sourceLang !== "draft" && !translations[sourceLang]) { translations[sourceLang] = original; } @@ -121,10 +173,15 @@ router.post("/ingest", async (req, res) => { sequence, createdAt: new Date().toISOString(), original, + sourceLang: sourceLang || undefined, + lang: isDraft ? "draft" : (sourceLang || undefined), + isDraft, + status: isDraft ? "draft" : "final", ...translations, }; liveCaptionState.latestSequence = sequence; + liveCaptionState.lastIngestAt = Date.now(); liveCaptionState.captions.push(caption); if (liveCaptionState.captions.length > MAX_BUFFER_SIZE) { liveCaptionState.captions.splice(0, liveCaptionState.captions.length - MAX_BUFFER_SIZE); @@ -145,9 +202,7 @@ router.post("/ingest", async (req, res) => { router.post("/reset", async (_, res) => { try { // TODO: Add admin authorization before exposing this endpoint. - liveCaptionState.startedAt = Date.now(); - liveCaptionState.latestSequence = 0; - liveCaptionState.captions = []; + resetLiveCaptionState(); return res.json({ status: "ok" }); } catch (error) { console.error("Error resetting live captions state", error); diff --git a/scripts/liveCaptionsTestSender.js b/scripts/liveCaptionsTestSender.js index 80c7633..8957884 100644 --- a/scripts/liveCaptionsTestSender.js +++ b/scripts/liveCaptionsTestSender.js @@ -5,7 +5,7 @@ const axios = require("axios"); const baseUrl = (process.env.CAPTION_TEST_BASE_URL || process.env.BASE_URL || "http://localhost:3000").replace(/\/+$/, ""); const ingestUrl = `${baseUrl}/live-captions/ingest`; -const intervalMs = 5000; +const intervalMs = 6000; const samples = [ { @@ -37,9 +37,8 @@ const samples = [ let sampleIndex = 0; let timer = null; -const sendNextSample = async () => { - const payload = samples[sampleIndex]; - sampleIndex = (sampleIndex + 1) % samples.length; +const postPayload = async (payload) => { + const kind = payload?.draft ? "draft" : "final"; try { const response = await axios.post(ingestUrl, payload, { @@ -47,7 +46,8 @@ const sendNextSample = async () => { timeout: 10000, }); const seq = response?.data?.caption?.sequence || response?.data?.latestSequence || "?"; - console.log(`[live-captions:test-sender] sent sequence=${seq} original="${payload.original}"`); + const text = payload?.draft || payload?.original || ""; + console.log(`[live-captions:test-sender] sent ${kind} sequence=${seq} text="${text}"`); } catch (error) { const status = error?.response?.status; const body = error?.response?.data; @@ -56,6 +56,21 @@ const sendNextSample = async () => { } }; +const sendNextSample = async () => { + const payload = samples[sampleIndex]; + + const draftWords = String(payload?.original || "").split(" ").filter(Boolean); + if (draftWords.length > 2) { + await postPayload({ draft: draftWords.slice(0, 2).join(" ") }); + await new Promise((resolve) => setTimeout(resolve, 550)); + await postPayload({ draft: draftWords.slice(0, 4).join(" ") }); + await new Promise((resolve) => setTimeout(resolve, 550)); + } + + await postPayload(payload); + sampleIndex = (sampleIndex + 1) % samples.length; +}; + const start = async () => { console.log(`[live-captions:test-sender] posting to ${ingestUrl} every ${intervalMs / 1000}s`); await sendNextSample();