6 Commits

Author SHA1 Message Date
Adolfo Reyna
ef7c24c884 feat: make main live captions stream public 2026-03-22 22:32:01 -04:00
Adolfo Reyna
f47ca67dae feat: add public (unauthenticated) live captions stream 2026-03-22 21:59:54 -04:00
Adolfo Reyna
3737edab72 Merge branch 'codex/live-captions' into master 2026-02-28 21:52:54 -05:00
Adolfo Reyna
5195317c0c Expire live caption state after inactivity and tune stream limits 2026-02-28 21:51:28 -05:00
Adolfo Reyna
8aa1f3addd Add draft caption ingest support and dev watch workflow 2026-02-28 21:30:38 -05:00
af4471b463 Merge pull request 'codex/live-captions' (#5) from codex/live-captions into master
Reviewed-on: #5
2026-02-27 04:00:57 +00:00
4 changed files with 121 additions and 15 deletions

View File

@@ -29,6 +29,7 @@ const limiter = rateLimit({
limit: 500, // Limit each IP to 100 requests per `window` (here, per 15 minutes). limit: 500, // Limit each IP to 100 requests per `window` (here, per 15 minutes).
standardHeaders: 'draft-8', // draft-6: `RateLimit-*` headers; draft-7 & draft-8: combined `RateLimit` header standardHeaders: 'draft-8', // draft-6: `RateLimit-*` headers; draft-7 & draft-8: combined `RateLimit` header
legacyHeaders: false, // Disable the `X-RateLimit-*` headers. legacyHeaders: false, // Disable the `X-RateLimit-*` headers.
skip: (req) => req.path.startsWith("/live-captions"),
keyGenerator: (req) => { keyGenerator: (req) => {
const forwarded = req.headers["x-forwarded-for"]?.split(",")[0]; // Take the first IP in the list const forwarded = req.headers["x-forwarded-for"]?.split(",")[0]; // Take the first IP in the list
const ip = forwarded || req.ip; // Fallback to req.ip const ip = forwarded || req.ip; // Fallback to req.ip

View File

@@ -6,6 +6,7 @@
"scripts": { "scripts": {
"test": "npx mocha test/auth.test.js", "test": "npx mocha test/auth.test.js",
"start": "node index.js", "start": "node index.js",
"dev": "node --watch index.js",
"live-captions:test-sender": "node scripts/liveCaptionsTestSender.js", "live-captions:test-sender": "node scripts/liveCaptionsTestSender.js",
"docker": "docker compose up -d", "docker": "docker compose up -d",
"docker_restore": "docker-compose exec mongo mongorestore --db EMI_SOCIAL /dump/EMI_SOCIAL/", "docker_restore": "docker-compose exec mongo mongorestore --db EMI_SOCIAL /dump/EMI_SOCIAL/",

View File

@@ -1,19 +1,36 @@
var express = require('express'); var express = require('express');
var router = express.Router(); var router = express.Router();
const { rateLimit } = require("express-rate-limit");
const sessionChecker = require("../middleware/sessionChecker.js"); const sessionChecker = require("../middleware/sessionChecker.js");
const MAX_BUFFER_SIZE = 300; const MAX_BUFFER_SIZE = 300;
const DEFAULT_INITIAL_LIMIT = 40; const DEFAULT_INITIAL_LIMIT = 40;
const MAX_INITIAL_LIMIT = 120; const MAX_INITIAL_LIMIT = 120;
const CAPTION_META_KEYS = new Set(["sequence", "createdAt", "original"]); const INACTIVITY_RESET_MS = 10 * 60 * 1000;
const CAPTION_META_KEYS = new Set(["sequence", "createdAt", "original", "draft", "sourceLang", "lang", "isDraft", "status", "translations"]);
const liveCaptionState = { const liveCaptionState = {
startedAt: Date.now(), startedAt: Date.now(),
lastIngestAt: 0,
latestSequence: 0, latestSequence: 0,
captions: [], captions: [],
}; };
const liveCaptionsLimiter = rateLimit({
windowMs: 10 * 60 * 1000,
limit: 6000,
standardHeaders: "draft-8",
legacyHeaders: false,
keyGenerator: (req) => {
const forwarded = req.headers["x-forwarded-for"]?.split(",")[0];
const ip = forwarded || req.ip || "";
return ip.includes(":") ? ip.split(":")[0] : ip;
},
});
router.use(liveCaptionsLimiter);
const normalizeLang = (lang = "") => { const normalizeLang = (lang = "") => {
const value = String(lang || "").trim().toLowerCase(); const value = String(lang || "").trim().toLowerCase();
if (!value) return ""; if (!value) return "";
@@ -33,8 +50,23 @@ const normalizeTranslations = (translations) => {
return normalized; return normalized;
}; };
const readText = (value) => {
if (typeof value === "string") return value.trim();
return "";
};
const extractDraftText = (body = {}) => {
const directDraft = readText(body?.draft);
if (directDraft) return directDraft;
const nestedDraft = readText(body?.draft?.text);
if (nestedDraft) return nestedDraft;
const fallbackText = readText(body?.text);
if (fallbackText) return fallbackText;
return "";
};
const buildTranslationsFromFlatPayload = (payload) => { const buildTranslationsFromFlatPayload = (payload) => {
const ignoredKeys = new Set(["original", "sourceLang", "translations"]); const ignoredKeys = new Set(["original", "draft", "sourceLang", "lang", "isDraft", "status", "translations"]);
const normalized = {}; const normalized = {};
for (const [key, value] of Object.entries(payload || {})) { for (const [key, value] of Object.entries(payload || {})) {
if (ignoredKeys.has(key)) continue; if (ignoredKeys.has(key)) continue;
@@ -67,8 +99,22 @@ const getAvailableLanguages = () => {
return Array.from(langs).filter(Boolean).sort(); return Array.from(langs).filter(Boolean).sort();
}; };
router.get("/stream", sessionChecker, async (req, res) => { const resetLiveCaptionState = () => {
liveCaptionState.startedAt = Date.now();
liveCaptionState.lastIngestAt = 0;
liveCaptionState.latestSequence = 0;
liveCaptionState.captions = [];
};
const maybeResetForInactivity = () => {
if (!liveCaptionState.lastIngestAt) return;
if ((Date.now() - liveCaptionState.lastIngestAt) < INACTIVITY_RESET_MS) return;
resetLiveCaptionState();
};
router.get("/stream", async (req, res) => {
try { try {
maybeResetForInactivity();
const sinceSequence = Number.parseInt(req.query?.sinceSequence, 10); const sinceSequence = Number.parseInt(req.query?.sinceSequence, 10);
const requestedLimit = Number.parseInt(req.query?.limit, 10); const requestedLimit = Number.parseInt(req.query?.limit, 10);
const initialLimit = Number.isFinite(requestedLimit) const initialLimit = Number.isFinite(requestedLimit)
@@ -103,16 +149,22 @@ router.get("/stream", sessionChecker, async (req, res) => {
router.post("/ingest", async (req, res) => { router.post("/ingest", async (req, res) => {
try { try {
// TODO: Add basic auth/API key validation before production roll-out. // TODO: Add basic auth/API key validation before production roll-out.
const original = typeof req.body?.original === "string" ? req.body.original.trim() : ""; const draft = extractDraftText(req.body || {});
const originalFromPayload = readText(req.body?.original);
const original = originalFromPayload || draft;
const requestedLang = normalizeLang(req.body?.lang);
const sourceLangFromRequest = normalizeLang(req.body?.sourceLang || (requestedLang && requestedLang !== "draft" ? requestedLang : ""));
const isDraft = !!draft || requestedLang === "draft" || sourceLangFromRequest === "draft" || req.body?.isDraft === true || req.body?.status === "draft";
const mapFromNested = normalizeTranslations(req.body?.translations); const mapFromNested = normalizeTranslations(req.body?.translations);
const mapFromFlat = buildTranslationsFromFlatPayload(req.body); const mapFromFlat = buildTranslationsFromFlatPayload(req.body);
const translations = { ...mapFromNested, ...mapFromFlat }; const translations = isDraft ? {} : { ...mapFromNested, ...mapFromFlat };
const sourceLang = normalizeLang(req.body?.sourceLang || inferSourceLangFromTranslations(original, translations)); const inferredSource = inferSourceLangFromTranslations(original, translations);
const sourceLang = isDraft ? "" : (sourceLangFromRequest || inferredSource);
if (!original) { if (!original) {
return res.status(400).json({ status: "Original text is required" }); return res.status(400).json({ status: "Original text is required" });
} }
if (sourceLang && sourceLang !== "original" && !translations[sourceLang]) { if (sourceLang && sourceLang !== "original" && sourceLang !== "draft" && !translations[sourceLang]) {
translations[sourceLang] = original; translations[sourceLang] = original;
} }
@@ -121,10 +173,15 @@ router.post("/ingest", async (req, res) => {
sequence, sequence,
createdAt: new Date().toISOString(), createdAt: new Date().toISOString(),
original, original,
sourceLang: sourceLang || undefined,
lang: isDraft ? "draft" : (sourceLang || undefined),
isDraft,
status: isDraft ? "draft" : "final",
...translations, ...translations,
}; };
liveCaptionState.latestSequence = sequence; liveCaptionState.latestSequence = sequence;
liveCaptionState.lastIngestAt = Date.now();
liveCaptionState.captions.push(caption); liveCaptionState.captions.push(caption);
if (liveCaptionState.captions.length > MAX_BUFFER_SIZE) { if (liveCaptionState.captions.length > MAX_BUFFER_SIZE) {
liveCaptionState.captions.splice(0, liveCaptionState.captions.length - MAX_BUFFER_SIZE); liveCaptionState.captions.splice(0, liveCaptionState.captions.length - MAX_BUFFER_SIZE);
@@ -145,9 +202,7 @@ router.post("/ingest", async (req, res) => {
router.post("/reset", async (_, res) => { router.post("/reset", async (_, res) => {
try { try {
// TODO: Add admin authorization before exposing this endpoint. // TODO: Add admin authorization before exposing this endpoint.
liveCaptionState.startedAt = Date.now(); resetLiveCaptionState();
liveCaptionState.latestSequence = 0;
liveCaptionState.captions = [];
return res.json({ status: "ok" }); return res.json({ status: "ok" });
} catch (error) { } catch (error) {
console.error("Error resetting live captions state", error); console.error("Error resetting live captions state", error);
@@ -155,4 +210,38 @@ router.post("/reset", async (_, res) => {
} }
}); });
router.get("/public/stream", async (req, res) => {
try {
maybeResetForInactivity();
const sinceSequence = Number.parseInt(req.query?.sinceSequence, 10);
const requestedLimit = Number.parseInt(req.query?.limit, 10);
const initialLimit = Number.isFinite(requestedLimit)
? Math.max(1, Math.min(requestedLimit, MAX_INITIAL_LIMIT))
: DEFAULT_INITIAL_LIMIT;
let captions = [];
if (Number.isFinite(sinceSequence) && sinceSequence >= 0) {
captions = liveCaptionState.captions.filter((item) => item.sequence > sinceSequence);
} else {
captions = liveCaptionState.captions.slice(-initialLimit);
}
return res.json({
status: "ok",
latestSequence: liveCaptionState.latestSequence,
startedAt: new Date(liveCaptionState.startedAt).toISOString(),
availableLanguages: getAvailableLanguages(),
captions,
});
} catch (error) {
console.error("Error getting public live captions stream", error);
return res.status(500).json({
status: "Internal server error",
latestSequence: liveCaptionState.latestSequence,
captions: [],
availableLanguages: [],
});
}
});
module.exports = router; module.exports = router;

View File

@@ -5,7 +5,7 @@ const axios = require("axios");
const baseUrl = (process.env.CAPTION_TEST_BASE_URL || process.env.BASE_URL || "http://localhost:3000").replace(/\/+$/, ""); const baseUrl = (process.env.CAPTION_TEST_BASE_URL || process.env.BASE_URL || "http://localhost:3000").replace(/\/+$/, "");
const ingestUrl = `${baseUrl}/live-captions/ingest`; const ingestUrl = `${baseUrl}/live-captions/ingest`;
const intervalMs = 5000; const intervalMs = 6000;
const samples = [ const samples = [
{ {
@@ -37,9 +37,8 @@ const samples = [
let sampleIndex = 0; let sampleIndex = 0;
let timer = null; let timer = null;
const sendNextSample = async () => { const postPayload = async (payload) => {
const payload = samples[sampleIndex]; const kind = payload?.draft ? "draft" : "final";
sampleIndex = (sampleIndex + 1) % samples.length;
try { try {
const response = await axios.post(ingestUrl, payload, { const response = await axios.post(ingestUrl, payload, {
@@ -47,7 +46,8 @@ const sendNextSample = async () => {
timeout: 10000, timeout: 10000,
}); });
const seq = response?.data?.caption?.sequence || response?.data?.latestSequence || "?"; const seq = response?.data?.caption?.sequence || response?.data?.latestSequence || "?";
console.log(`[live-captions:test-sender] sent sequence=${seq} original="${payload.original}"`); const text = payload?.draft || payload?.original || "";
console.log(`[live-captions:test-sender] sent ${kind} sequence=${seq} text="${text}"`);
} catch (error) { } catch (error) {
const status = error?.response?.status; const status = error?.response?.status;
const body = error?.response?.data; const body = error?.response?.data;
@@ -56,6 +56,21 @@ const sendNextSample = async () => {
} }
}; };
const sendNextSample = async () => {
const payload = samples[sampleIndex];
const draftWords = String(payload?.original || "").split(" ").filter(Boolean);
if (draftWords.length > 2) {
await postPayload({ draft: draftWords.slice(0, 2).join(" ") });
await new Promise((resolve) => setTimeout(resolve, 550));
await postPayload({ draft: draftWords.slice(0, 4).join(" ") });
await new Promise((resolve) => setTimeout(resolve, 550));
}
await postPayload(payload);
sampleIndex = (sampleIndex + 1) % samples.length;
};
const start = async () => { const start = async () => {
console.log(`[live-captions:test-sender] posting to ${ingestUrl} every ${intervalMs / 1000}s`); console.log(`[live-captions:test-sender] posting to ${ingestUrl} every ${intervalMs / 1000}s`);
await sendNextSample(); await sendNextSample();