Merge branch 'codex/live-captions' into master
This commit is contained in:
1
index.js
1
index.js
@@ -29,6 +29,7 @@ const limiter = rateLimit({
|
|||||||
limit: 500, // Limit each IP to 500 requests per `window` (here, per 15 minutes).
|
limit: 500, // Limit each IP to 500 requests per `window` (here, per 15 minutes).
|
||||||
standardHeaders: 'draft-8', // draft-6: `RateLimit-*` headers; draft-7 & draft-8: combined `RateLimit` header
|
standardHeaders: 'draft-8', // draft-6: `RateLimit-*` headers; draft-7 & draft-8: combined `RateLimit` header
|
||||||
legacyHeaders: false, // Disable the `X-RateLimit-*` headers.
|
legacyHeaders: false, // Disable the `X-RateLimit-*` headers.
|
||||||
|
skip: (req) => req.path.startsWith("/live-captions"),
|
||||||
keyGenerator: (req) => {
|
keyGenerator: (req) => {
|
||||||
const forwarded = req.headers["x-forwarded-for"]?.split(",")[0]; // Take the first IP in the list
|
const forwarded = req.headers["x-forwarded-for"]?.split(",")[0]; // Take the first IP in the list
|
||||||
const ip = forwarded || req.ip; // Fallback to req.ip
|
const ip = forwarded || req.ip; // Fallback to req.ip
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
"scripts": {
|
"scripts": {
|
||||||
"test": "npx mocha test/auth.test.js",
|
"test": "npx mocha test/auth.test.js",
|
||||||
"start": "node index.js",
|
"start": "node index.js",
|
||||||
|
"dev": "node --watch index.js",
|
||||||
"live-captions:test-sender": "node scripts/liveCaptionsTestSender.js",
|
"live-captions:test-sender": "node scripts/liveCaptionsTestSender.js",
|
||||||
"docker": "docker compose up -d",
|
"docker": "docker compose up -d",
|
||||||
"docker_restore": "docker-compose exec mongo mongorestore --db EMI_SOCIAL /dump/EMI_SOCIAL/",
|
"docker_restore": "docker-compose exec mongo mongorestore --db EMI_SOCIAL /dump/EMI_SOCIAL/",
|
||||||
|
|||||||
@@ -1,19 +1,36 @@
|
|||||||
var express = require('express');
|
var express = require('express');
|
||||||
var router = express.Router();
|
var router = express.Router();
|
||||||
|
const { rateLimit } = require("express-rate-limit");
|
||||||
|
|
||||||
const sessionChecker = require("../middleware/sessionChecker.js");
|
const sessionChecker = require("../middleware/sessionChecker.js");
|
||||||
|
|
||||||
const MAX_BUFFER_SIZE = 300;
|
const MAX_BUFFER_SIZE = 300;
|
||||||
const DEFAULT_INITIAL_LIMIT = 40;
|
const DEFAULT_INITIAL_LIMIT = 40;
|
||||||
const MAX_INITIAL_LIMIT = 120;
|
const MAX_INITIAL_LIMIT = 120;
|
||||||
const CAPTION_META_KEYS = new Set(["sequence", "createdAt", "original"]);
|
const INACTIVITY_RESET_MS = 10 * 60 * 1000;
|
||||||
|
const CAPTION_META_KEYS = new Set(["sequence", "createdAt", "original", "draft", "sourceLang", "lang", "isDraft", "status", "translations"]);
|
||||||
|
|
||||||
const liveCaptionState = {
|
const liveCaptionState = {
|
||||||
startedAt: Date.now(),
|
startedAt: Date.now(),
|
||||||
|
lastIngestAt: 0,
|
||||||
latestSequence: 0,
|
latestSequence: 0,
|
||||||
captions: [],
|
captions: [],
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const liveCaptionsLimiter = rateLimit({
|
||||||
|
windowMs: 10 * 60 * 1000,
|
||||||
|
limit: 6000,
|
||||||
|
standardHeaders: "draft-8",
|
||||||
|
legacyHeaders: false,
|
||||||
|
keyGenerator: (req) => {
|
||||||
|
const forwarded = req.headers["x-forwarded-for"]?.split(",")[0];
|
||||||
|
const ip = forwarded || req.ip || "";
|
||||||
|
return ip.includes(":") ? ip.split(":")[0] : ip;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
router.use(liveCaptionsLimiter);
|
||||||
|
|
||||||
const normalizeLang = (lang = "") => {
|
const normalizeLang = (lang = "") => {
|
||||||
const value = String(lang || "").trim().toLowerCase();
|
const value = String(lang || "").trim().toLowerCase();
|
||||||
if (!value) return "";
|
if (!value) return "";
|
||||||
@@ -33,8 +50,23 @@ const normalizeTranslations = (translations) => {
|
|||||||
return normalized;
|
return normalized;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const readText = (value) => {
|
||||||
|
if (typeof value === "string") return value.trim();
|
||||||
|
return "";
|
||||||
|
};
|
||||||
|
|
||||||
|
const extractDraftText = (body = {}) => {
|
||||||
|
const directDraft = readText(body?.draft);
|
||||||
|
if (directDraft) return directDraft;
|
||||||
|
const nestedDraft = readText(body?.draft?.text);
|
||||||
|
if (nestedDraft) return nestedDraft;
|
||||||
|
const fallbackText = readText(body?.text);
|
||||||
|
if (fallbackText) return fallbackText;
|
||||||
|
return "";
|
||||||
|
};
|
||||||
|
|
||||||
const buildTranslationsFromFlatPayload = (payload) => {
|
const buildTranslationsFromFlatPayload = (payload) => {
|
||||||
const ignoredKeys = new Set(["original", "sourceLang", "translations"]);
|
const ignoredKeys = new Set(["original", "draft", "sourceLang", "lang", "isDraft", "status", "translations"]);
|
||||||
const normalized = {};
|
const normalized = {};
|
||||||
for (const [key, value] of Object.entries(payload || {})) {
|
for (const [key, value] of Object.entries(payload || {})) {
|
||||||
if (ignoredKeys.has(key)) continue;
|
if (ignoredKeys.has(key)) continue;
|
||||||
@@ -67,8 +99,22 @@ const getAvailableLanguages = () => {
|
|||||||
return Array.from(langs).filter(Boolean).sort();
|
return Array.from(langs).filter(Boolean).sort();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const resetLiveCaptionState = () => {
|
||||||
|
liveCaptionState.startedAt = Date.now();
|
||||||
|
liveCaptionState.lastIngestAt = 0;
|
||||||
|
liveCaptionState.latestSequence = 0;
|
||||||
|
liveCaptionState.captions = [];
|
||||||
|
};
|
||||||
|
|
||||||
|
const maybeResetForInactivity = () => {
|
||||||
|
if (!liveCaptionState.lastIngestAt) return;
|
||||||
|
if ((Date.now() - liveCaptionState.lastIngestAt) < INACTIVITY_RESET_MS) return;
|
||||||
|
resetLiveCaptionState();
|
||||||
|
};
|
||||||
|
|
||||||
router.get("/stream", sessionChecker, async (req, res) => {
|
router.get("/stream", sessionChecker, async (req, res) => {
|
||||||
try {
|
try {
|
||||||
|
maybeResetForInactivity();
|
||||||
const sinceSequence = Number.parseInt(req.query?.sinceSequence, 10);
|
const sinceSequence = Number.parseInt(req.query?.sinceSequence, 10);
|
||||||
const requestedLimit = Number.parseInt(req.query?.limit, 10);
|
const requestedLimit = Number.parseInt(req.query?.limit, 10);
|
||||||
const initialLimit = Number.isFinite(requestedLimit)
|
const initialLimit = Number.isFinite(requestedLimit)
|
||||||
@@ -103,16 +149,22 @@ router.get("/stream", sessionChecker, async (req, res) => {
|
|||||||
router.post("/ingest", async (req, res) => {
|
router.post("/ingest", async (req, res) => {
|
||||||
try {
|
try {
|
||||||
// TODO: Add basic auth/API key validation before production roll-out.
|
// TODO: Add basic auth/API key validation before production roll-out.
|
||||||
const original = typeof req.body?.original === "string" ? req.body.original.trim() : "";
|
const draft = extractDraftText(req.body || {});
|
||||||
|
const originalFromPayload = readText(req.body?.original);
|
||||||
|
const original = originalFromPayload || draft;
|
||||||
|
const requestedLang = normalizeLang(req.body?.lang);
|
||||||
|
const sourceLangFromRequest = normalizeLang(req.body?.sourceLang || (requestedLang && requestedLang !== "draft" ? requestedLang : ""));
|
||||||
|
const isDraft = !!draft || requestedLang === "draft" || sourceLangFromRequest === "draft" || req.body?.isDraft === true || req.body?.status === "draft";
|
||||||
const mapFromNested = normalizeTranslations(req.body?.translations);
|
const mapFromNested = normalizeTranslations(req.body?.translations);
|
||||||
const mapFromFlat = buildTranslationsFromFlatPayload(req.body);
|
const mapFromFlat = buildTranslationsFromFlatPayload(req.body);
|
||||||
const translations = { ...mapFromNested, ...mapFromFlat };
|
const translations = isDraft ? {} : { ...mapFromNested, ...mapFromFlat };
|
||||||
const sourceLang = normalizeLang(req.body?.sourceLang || inferSourceLangFromTranslations(original, translations));
|
const inferredSource = inferSourceLangFromTranslations(original, translations);
|
||||||
|
const sourceLang = isDraft ? "" : (sourceLangFromRequest || inferredSource);
|
||||||
|
|
||||||
if (!original) {
|
if (!original) {
|
||||||
return res.status(400).json({ status: "Original text is required" });
|
return res.status(400).json({ status: "Original text is required" });
|
||||||
}
|
}
|
||||||
if (sourceLang && sourceLang !== "original" && !translations[sourceLang]) {
|
if (sourceLang && sourceLang !== "original" && sourceLang !== "draft" && !translations[sourceLang]) {
|
||||||
translations[sourceLang] = original;
|
translations[sourceLang] = original;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,10 +173,15 @@ router.post("/ingest", async (req, res) => {
|
|||||||
sequence,
|
sequence,
|
||||||
createdAt: new Date().toISOString(),
|
createdAt: new Date().toISOString(),
|
||||||
original,
|
original,
|
||||||
|
sourceLang: sourceLang || undefined,
|
||||||
|
lang: isDraft ? "draft" : (sourceLang || undefined),
|
||||||
|
isDraft,
|
||||||
|
status: isDraft ? "draft" : "final",
|
||||||
...translations,
|
...translations,
|
||||||
};
|
};
|
||||||
|
|
||||||
liveCaptionState.latestSequence = sequence;
|
liveCaptionState.latestSequence = sequence;
|
||||||
|
liveCaptionState.lastIngestAt = Date.now();
|
||||||
liveCaptionState.captions.push(caption);
|
liveCaptionState.captions.push(caption);
|
||||||
if (liveCaptionState.captions.length > MAX_BUFFER_SIZE) {
|
if (liveCaptionState.captions.length > MAX_BUFFER_SIZE) {
|
||||||
liveCaptionState.captions.splice(0, liveCaptionState.captions.length - MAX_BUFFER_SIZE);
|
liveCaptionState.captions.splice(0, liveCaptionState.captions.length - MAX_BUFFER_SIZE);
|
||||||
@@ -145,9 +202,7 @@ router.post("/ingest", async (req, res) => {
|
|||||||
router.post("/reset", async (_, res) => {
|
router.post("/reset", async (_, res) => {
|
||||||
try {
|
try {
|
||||||
// TODO: Add admin authorization before exposing this endpoint.
|
// TODO: Add admin authorization before exposing this endpoint.
|
||||||
liveCaptionState.startedAt = Date.now();
|
resetLiveCaptionState();
|
||||||
liveCaptionState.latestSequence = 0;
|
|
||||||
liveCaptionState.captions = [];
|
|
||||||
return res.json({ status: "ok" });
|
return res.json({ status: "ok" });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Error resetting live captions state", error);
|
console.error("Error resetting live captions state", error);
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ const axios = require("axios");
|
|||||||
|
|
||||||
const baseUrl = (process.env.CAPTION_TEST_BASE_URL || process.env.BASE_URL || "http://localhost:3000").replace(/\/+$/, "");
|
const baseUrl = (process.env.CAPTION_TEST_BASE_URL || process.env.BASE_URL || "http://localhost:3000").replace(/\/+$/, "");
|
||||||
const ingestUrl = `${baseUrl}/live-captions/ingest`;
|
const ingestUrl = `${baseUrl}/live-captions/ingest`;
|
||||||
const intervalMs = 5000;
|
const intervalMs = 6000;
|
||||||
|
|
||||||
const samples = [
|
const samples = [
|
||||||
{
|
{
|
||||||
@@ -37,9 +37,8 @@ const samples = [
|
|||||||
let sampleIndex = 0;
|
let sampleIndex = 0;
|
||||||
let timer = null;
|
let timer = null;
|
||||||
|
|
||||||
const sendNextSample = async () => {
|
const postPayload = async (payload) => {
|
||||||
const payload = samples[sampleIndex];
|
const kind = payload?.draft ? "draft" : "final";
|
||||||
sampleIndex = (sampleIndex + 1) % samples.length;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const response = await axios.post(ingestUrl, payload, {
|
const response = await axios.post(ingestUrl, payload, {
|
||||||
@@ -47,7 +46,8 @@ const sendNextSample = async () => {
|
|||||||
timeout: 10000,
|
timeout: 10000,
|
||||||
});
|
});
|
||||||
const seq = response?.data?.caption?.sequence || response?.data?.latestSequence || "?";
|
const seq = response?.data?.caption?.sequence || response?.data?.latestSequence || "?";
|
||||||
console.log(`[live-captions:test-sender] sent sequence=${seq} original="${payload.original}"`);
|
const text = payload?.draft || payload?.original || "";
|
||||||
|
console.log(`[live-captions:test-sender] sent ${kind} sequence=${seq} text="${text}"`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const status = error?.response?.status;
|
const status = error?.response?.status;
|
||||||
const body = error?.response?.data;
|
const body = error?.response?.data;
|
||||||
@@ -56,6 +56,21 @@ const sendNextSample = async () => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const sendNextSample = async () => {
|
||||||
|
const payload = samples[sampleIndex];
|
||||||
|
|
||||||
|
const draftWords = String(payload?.original || "").split(" ").filter(Boolean);
|
||||||
|
if (draftWords.length > 2) {
|
||||||
|
await postPayload({ draft: draftWords.slice(0, 2).join(" ") });
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 550));
|
||||||
|
await postPayload({ draft: draftWords.slice(0, 4).join(" ") });
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 550));
|
||||||
|
}
|
||||||
|
|
||||||
|
await postPayload(payload);
|
||||||
|
sampleIndex = (sampleIndex + 1) % samples.length;
|
||||||
|
};
|
||||||
|
|
||||||
const start = async () => {
|
const start = async () => {
|
||||||
console.log(`[live-captions:test-sender] posting to ${ingestUrl} every ${intervalMs / 1000}s`);
|
console.log(`[live-captions:test-sender] posting to ${ingestUrl} every ${intervalMs / 1000}s`);
|
||||||
await sendNextSample();
|
await sendNextSample();
|
||||||
|
|||||||
Reference in New Issue
Block a user