expria-backend/src/lib/geminiLive.ts
Hermann_Kitio 5263372839
Some checks are pending
CI / quality (push) Waiting to run
feat(infra): route Gemini WS through SOCKS5 proxy (WARP)
- Add socks-proxy-agent dependency
- Add resolveGeminiProxyAgent() helper reading GEMINI_PROXY_URL env
- Apply agent to T1 and T2 Gemini WS factory defaults
- No proxy when GEMINI_PROXY_URL is unset (local dev unchanged)
- Tests: 311/311 green
2026-06-30 20:30:15 +03:00

532 lines
16 KiB
TypeScript

/**
* geminiLive.ts — Sprint 6d (revert WS brut).
*
* Historiquement, le proxy s'appuyait sur un SDK Gemini de haut niveau, mais
* celui-ci fermait la session sans setupComplete ni raison exploitable. On
* utilise désormais le WebSocket brut (package `ws`), qui permet de loguer
* précisément ce que Gemini répond et de maîtriser le contenu exact du setup
* frame (model, systemInstruction, transcriptions, VAD).
*
* Interface publique (consommée par `routes/t2live.ts`) — INCHANGÉE :
* - openGeminiLiveSession(clientWs, opts)
* - WebSocketLike, OpenGeminiLiveSessionOptions
* - buildT2SystemPrompt({role, contexte})
* - GEMINI_LIVE_MODEL, T2_SESSION_TIMEOUT_MS, T2_SESSION_WARNING_MS
*/
import { WebSocket as NodeWebSocket } from "ws";
import { SocksProxyAgent } from "socks-proxy-agent";
/**
* Résout l'agent proxy SOCKS5 pour les connexions WebSocket vers Gemini Live.
*
* Contexte : l'IP du VPS de production (datacenter) est bloquée par Google.
* Cloudflare WARP tourne en mode proxy sur le VPS (socks5://127.0.0.1:40000) ;
* router UNIQUEMENT le trafic Gemini via ce proxy le débloque, sans affecter
* le reste (Supabase, DeepSeek, clients).
*
* `GEMINI_PROXY_URL` est optionnelle : absente → connexion directe (dev local
* intact). Présente (ex: socks5://127.0.0.1:40000) → SocksProxyAgent.
*/
export function resolveGeminiProxyAgent(): SocksProxyAgent | undefined {
const url = process.env.GEMINI_PROXY_URL;
return url ? new SocksProxyAgent(url) : undefined;
}
export const GEMINI_LIVE_URL =
"wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent";
/**
* Modèle Live cible. `gemini-2.0-flash-live-001` est le modèle Live confirmé
* par la doc Google pour les clés API Developer + Express. Format `models/...`
* requis dans le setup frame natif.
*/
export const GEMINI_LIVE_MODEL = "gemini-3.1-flash-live-preview";
/** Timeout total session WS T2 Live : 3 min 30 (durée TCF) + marge évaluation. */
export const T2_SESSION_TIMEOUT_MS = 210_000;
/** Warning au client : 30 s avant le timeout. */
export const T2_SESSION_WARNING_MS = 180_000;
/**
* Construit le prompt système T2 Live à partir du sujet (role + contexte).
* Cf. docs/Prompt_t2live.md §3. Conservé en signature pour usage futur quand
* `systemInstruction` sera réintégré dans le setup frame.
*/
export function buildT2SystemPrompt(input: {
role: string;
contexte: string;
}): string {
const { role, contexte } = input;
return `RÔLE : Tu incarnes ${role}.
CONTEXTE : ${contexte}
RÈGLES ABSOLUES :
1. Tu parles TOUJOURS en français naturel et courant, niveau B2-C1.
2. Tu NE corriges JAMAIS les erreurs du candidat. Tu continues naturellement.
3. Tu attends que le candidat finisse sa question avant de répondre.
4. Tes réponses sont courtes (15 à 25 mots maximum) pour laisser la place au dialogue.
5. Ne donne pas toutes les informations d'un coup. Force le candidat à poser des questions précises.
6. Si le candidat est vague, réponds brièvement sans chercher à compléter — c'est à lui de reformuler.
7. STRICTE INTERDICTION DE POSER DES QUESTIONS. Tu n'as pas le droit d'utiliser de point d'interrogation. Tes phrases se terminent par un point.
8. SILENCE TOTAL APRÈS LA RÉPONSE. Réponds de manière factuelle, puis arrête-toi immédiatement. Ne suggère rien, ne relance pas, ne dis pas "et vous ?".
9. RÔLE PASSIF : tu es une source d'information inerte. Tu n'aides pas le candidat à tenir la conversation. S'il ne parle plus, le silence s'installe.
10. AUCUNE FORMULE DE POLITESSE DE FIN : bannis "n'hésitez pas", "j'espère que ça vous aide", "qu'en pensez-vous ?".
11. JAMAIS de listes ni de structure numérotée — parle naturellement.
12. Ne mentionne jamais que tu es une IA ou un modèle.
13. Tu ne prends PAS la parole en premier. Tu attends que le candidat s'adresse à toi.`;
}
/**
* Subset minimal d'une WebSocket — compatible avec :
* - le wrapper exposé par @hono/node-ws (côté client navigateur)
* - la WebSocket de `ws` (côté Gemini)
* - les fakes basés sur EventEmitter dans les tests
*/
export interface WebSocketLike {
send(data: unknown): void;
close(code?: number, reason?: string): void;
on(event: "message", listener: (data: unknown) => void): void;
on(event: "close", listener: (code?: number, reason?: unknown) => void): void;
on(event: "error", listener: (err: unknown) => void): void;
on(event: "open", listener: () => void): void;
}
export interface OpenGeminiLiveSessionOptions {
/** Rôle joué par l'IA, injecté dans le prompt système. */
role: string;
/** Contexte de la situation, injecté dans le prompt système. */
contexte: string;
/** Callback déclenché en fin de session avec le transcript reconstruit. */
onSessionEnd?: (transcript: string) => void | Promise<void>;
/** Override timeout (par défaut T2_SESSION_TIMEOUT_MS). */
timeoutMs?: number;
/** Override warning (par défaut T2_SESSION_WARNING_MS). */
warningMs?: number;
/** Surcharge la clé API (par défaut : process.env.GEMINI_API_KEY). */
apiKey?: string;
/**
* Injection pour les tests — fabrique de WebSocket vers Gemini.
*/
clientFactory?: (url: string) => WebSocketLike;
}
/**
* Forme minimale d'un message Gemini Live JSON entrant.
*/
export interface GeminiServerMessage {
setupComplete?: unknown;
serverContent?: {
modelTurn?: {
parts?: Array<{
inlineData?: { data?: string; mimeType?: string };
}>;
};
inputTranscription?: { text?: string };
outputTranscription?: { text?: string };
interrupted?: boolean;
turnComplete?: boolean;
};
}
export interface TranscriptEntry {
speaker: "candidat" | "examinateur";
text: string;
}
export function reconstructTranscript(entries: TranscriptEntry[]): string {
return entries
.map((e) =>
e.speaker === "candidat"
? `Candidat : ${e.text}`
: `Examinateur : ${e.text}`,
)
.join("\n");
}
/**
* Détecte un signal de fin de session envoyé par le client : `{type:'end'}`.
*/
export function isEndSignal(data: unknown): boolean {
let text: string;
if (typeof data === "string") {
text = data;
} else if (data instanceof Buffer) {
try {
text = data.toString("utf8");
} catch {
return false;
}
} else {
return false;
}
if (!text.startsWith("{")) return false;
try {
const parsed = JSON.parse(text) as { type?: string };
return parsed.type === "end";
} catch {
return false;
}
}
/**
* Parse un message client `{type:'audio', data: base64}` et renvoie le base64
* si le format est valide, sinon null.
*/
export function parseAudioChunk(data: unknown): string | null {
let text: string;
if (typeof data === "string") {
text = data;
} else if (data instanceof Buffer) {
try {
text = data.toString("utf8");
} catch {
return null;
}
} else {
return null;
}
if (!text.startsWith("{")) return null;
try {
const parsed = JSON.parse(text) as { type?: string; data?: unknown };
if (parsed.type === "audio" && typeof parsed.data === "string") {
return parsed.data;
}
return null;
} catch {
return null;
}
}
/**
* Tente de parser un message Gemini en JSON. Retourne null si binaire / non-JSON.
*/
export function tryParseGeminiJson(data: unknown): GeminiServerMessage | null {
let text: string;
if (typeof data === "string") {
text = data;
} else if (data instanceof Buffer) {
try {
text = data.toString("utf8");
if (!text.startsWith("{")) return null;
} catch {
return null;
}
} else if (typeof data === "object" && data !== null && "toString" in data) {
try {
text = (data as { toString: () => string }).toString();
if (!text.startsWith("{")) return null;
} catch {
return null;
}
} else {
return null;
}
try {
return JSON.parse(text) as GeminiServerMessage;
} catch {
return null;
}
}
/**
* VAD automatique par défaut (T2 Live) : START/END_SENSITIVITY_LOW, 2 s de
* silence avant que l'IA réponde — cf. IMPLEMENTATION_T2_LIVE.md §3.
*/
export const T2_AUTOMATIC_ACTIVITY_DETECTION = {
disabled: false,
startOfSpeechSensitivity: "START_SENSITIVITY_LOW",
endOfSpeechSensitivity: "END_SENSITIVITY_LOW",
silenceDurationMs: 2000,
} as const;
/**
* Construit le setup frame Gemini Live : model + responseModalities AUDIO,
* systemInstruction, input/outputAudioTranscription, et
* realtimeInputConfig.automaticActivityDetection.
*
* `automaticActivityDetection` est paramétrable (défaut = VAD T2 inchangé).
* T1 Live (VAD manuel) passera `{ disabled: true }` pour piloter les bornes de
* tour côté backend (activityStart / activityEnd).
*/
export function buildSetupFrame(
systemPrompt: string,
automaticActivityDetection: Record<
string,
unknown
> = T2_AUTOMATIC_ACTIVITY_DETECTION,
): string {
return JSON.stringify({
setup: {
model: `models/${GEMINI_LIVE_MODEL}`,
generationConfig: {
responseModalities: ["AUDIO"],
},
systemInstruction: {
parts: [{ text: systemPrompt }],
},
inputAudioTranscription: {},
outputAudioTranscription: {},
realtimeInputConfig: {
automaticActivityDetection,
},
},
});
}
/**
* Ouvre une session Gemini Live via WebSocket brut (`ws://...?key=...`) et
* proxifie les messages dans les deux sens entre le client (navigateur) et
* Gemini.
*
* - URL : GEMINI_LIVE_URL?key=apiKey
* - À l'open Gemini : envoi du setup frame minimal.
* - Forward client → Gemini : parse `{type:'audio', data: base64}` →
* message JSON `{ realtimeInput: { audio: { data, mimeType } } }`.
* - Forward Gemini → client : forward verbatim (string ou Buffer).
* - Accumule input/outputTranscription pour la correction finale.
* - Détecte `{type:'end'}` du client → fin de session.
* - Timer 210 s : warning à 180 s, fin auto à 210 s.
* - En fin : `onSessionEnd(transcript)` puis ferme Gemini. Le client WS
* n'est PAS fermé ici — c'est l'appelant qui décide.
* - Erreur Gemini / close prématurée → close client 4006 GEMINI_DISCONNECTED.
* - GEMINI_API_KEY absente → close client 4005 GEMINI_CONFIG.
*/
export function openGeminiLiveSession(
clientWs: WebSocketLike,
opts: OpenGeminiLiveSessionOptions,
): void {
const apiKey = opts.apiKey ?? process.env.GEMINI_API_KEY;
if (!apiKey) {
clientWs.close(4005, "GEMINI_CONFIG");
return;
}
const timeoutMs = opts.timeoutMs ?? T2_SESSION_TIMEOUT_MS;
const warningMs = opts.warningMs ?? T2_SESSION_WARNING_MS;
const systemPrompt = buildT2SystemPrompt({
role: opts.role,
contexte: opts.contexte,
});
const url = `${GEMINI_LIVE_URL}?key=${apiKey}`;
const proxyAgent = resolveGeminiProxyAgent();
const factory =
opts.clientFactory ??
((u: string) =>
new NodeWebSocket(
u,
proxyAgent ? { agent: proxyAgent } : undefined,
) as unknown as WebSocketLike);
console.log("[T2] Gemini WS URL:", GEMINI_LIVE_URL + "?key=***");
console.log("[T2] Gemini WS model:", GEMINI_LIVE_MODEL);
const geminiWs = factory(url);
const transcriptEntries: TranscriptEntry[] = [];
let sessionEnded = false;
let warningTimer: ReturnType<typeof setTimeout> | null = null;
let timeoutTimer: ReturnType<typeof setTimeout> | null = null;
const clearTimers = () => {
if (warningTimer !== null) {
clearTimeout(warningTimer);
warningTimer = null;
}
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
};
const endSession = async () => {
if (sessionEnded) return;
sessionEnded = true;
clearTimers();
try {
geminiWs.close(1000);
} catch {
/* ignore */
}
if (opts.onSessionEnd) {
try {
await opts.onSessionEnd(reconstructTranscript(transcriptEntries));
} catch (err) {
console.error(
"[T2] onSessionEnd threw:",
err instanceof Error ? err.message : String(err),
);
}
}
};
geminiWs.on("open", () => {
console.log("[T2] Gemini WS open");
const frame = buildSetupFrame(systemPrompt);
console.log("[T2] Gemini setup frame:", frame);
try {
geminiWs.send(frame);
} catch (err) {
console.error(
"[T2] Gemini setup frame send failed:",
err instanceof Error ? err.message : String(err),
);
try {
clientWs.close(4006, "GEMINI_DISCONNECTED");
} catch {
/* ignore */
}
return;
}
// Timers démarrés à l'ouverture de la WS (avant setupComplete éventuel).
warningTimer = setTimeout(() => {
if (sessionEnded) return;
try {
clientWs.send(
JSON.stringify({
type: "warning",
message: "30 secondes restantes",
}),
);
} catch {
/* ignore */
}
}, warningMs);
timeoutTimer = setTimeout(() => {
void endSession();
}, timeoutMs);
});
geminiWs.on("message", (data) => {
const preview =
typeof data === "string"
? data.slice(0, 300)
: data instanceof Buffer
? data.toString("utf8").slice(0, 300)
: "[binary]";
console.log("[T2] Gemini WS message:", preview);
// Accumuler input/outputTranscription.
const parsed = tryParseGeminiJson(data);
if (parsed) {
const sc = parsed.serverContent;
if (
sc?.inputTranscription?.text &&
sc.inputTranscription.text.length > 0
) {
transcriptEntries.push({
speaker: "candidat",
text: sc.inputTranscription.text,
});
}
if (
sc?.outputTranscription?.text &&
sc.outputTranscription.text.length > 0
) {
transcriptEntries.push({
speaker: "examinateur",
text: sc.outputTranscription.text,
});
}
}
// Forward verbatim au client (string ou Buffer audio inlineData).
try {
clientWs.send(data);
} catch {
void endSession();
}
});
geminiWs.on("close", (code, reason) => {
const reasonStr =
reason instanceof Buffer
? reason.toString("utf8")
: typeof reason === "string"
? reason
: "";
console.log(
"[T2] Gemini WS close:",
JSON.stringify({ code, reason: reasonStr }),
);
if (!sessionEnded) {
clearTimers();
sessionEnded = true;
try {
clientWs.close(4006, "GEMINI_DISCONNECTED");
} catch {
/* ignore */
}
}
});
geminiWs.on("error", (err) => {
console.log(
"[T2] Gemini WS error:",
JSON.stringify(err instanceof Error ? { message: err.message } : err),
);
if (!sessionEnded) {
clearTimers();
sessionEnded = true;
try {
clientWs.close(4006, "GEMINI_DISCONNECTED");
} catch {
/* ignore */
}
}
});
// ── Forward client → Gemini ──────────────────────────────────────────
clientWs.on("message", (data) => {
if (isEndSignal(data)) {
void endSession();
return;
}
const audioBase64 = parseAudioChunk(data);
if (audioBase64 !== null && !sessionEnded) {
try {
geminiWs.send(
JSON.stringify({
realtimeInput: {
audio: {
data: audioBase64,
mimeType: "audio/pcm;rate=16000",
},
},
}),
);
} catch (err) {
console.log(
"[T2] Gemini WS send (audio) failed:",
err instanceof Error ? err.message : String(err),
);
void endSession();
}
}
// Tout autre message client est ignoré.
});
clientWs.on("close", () => {
clearTimers();
sessionEnded = true;
try {
geminiWs.close(1000);
} catch {
/* ignore */
}
});
clientWs.on("error", () => {
clearTimers();
sessionEnded = true;
try {
geminiWs.close(1011);
} catch {
/* ignore */
}
});
}