From c7f00c0624966f76c629726c38e7d0010c85a2fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Wed, 26 Nov 2025 11:00:38 +0200 Subject: [PATCH 01/48] Add build and deploy gh workflow (#1) --- .github/workflows/build.yaml | 101 +++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 .github/workflows/build.yaml diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..d2973bf --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,101 @@ +name: "Deploys a Cloudflare Worker App" +on: + # push: + # branches: + # - master + # - main + workflow_dispatch: + inputs: + cf_env: + description: Choose environment + type: choice + required: true + options: + - dev + - staging + - prod + default: dev + branch: + description: Choose branch or tag, defaults to main + type: string + required: true + default: main + emsdk_version: + description: Emscripten SDK version, defaults to latest + type: string + required: false + default: latest + preCommand: + description: Provide a bash script to execute before running wrangler + type: string + required: false + default: echo "No script provided for execution before running Wrangler. Moving along." + postCommand: + description: Provide a bash script to execute after running wrangler + type: string + required: false + default: echo "Nothing to execute after running Wrangler. Finishing..." +jobs: + deploy: + runs-on: "ubuntu-latest" + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + ref: ${{ inputs.branch }} + + - name: Set environment variables + run: | + if [[ "${{ inputs.cf_env }}" == "dev" ]]; then + echo "CF_ACCOUNT=DEV_ACCOUNT_ID" >> "$GITHUB_ENV" + echo "CF_TOKEN=JITSI_CF_DEV_TOKEN" >> "$GITHUB_ENV" + elif [[ "${{ inputs.cf_env }}" == "staging" ]]; then + echo "CF_ACCOUNT=STAGE_ACCOUNT_ID" >> "$GITHUB_ENV" + echo "CF_TOKEN=JITSI_CF_STAGING_TOKEN" >> "$GITHUB_ENV" + elif [[ "${{ inputs.cf_env }}" == "prod" ]]; then + echo "CF_ACCOUNT=PROD_ACCOUNT_ID" >> "$GITHUB_ENV" + echo "CF_TOKEN=JITSI_CF_PROD_TOKEN" >> "$GITHUB_ENV" + else + echo "Invalid environment specified: ${{ inputs.cf_env }}, exiting." + exit 1 + fi + + - name: Install Linux deps + run: | + sudo apt update + sudo apt -y install wget unzip + + - name: Setup Emscripten SDK + run: | + wget https://github.com/emscripten-core/emsdk/archive/main.zip + unzip main.zip + cd emsdk-main + ./emsdk install ${{ inputs.emsdk_version }} + ./emsdk activate ${{ inputs.emsdk_version }} + source ./emsdk_env.sh + echo "EMSDK=$EMSDK" >> $GITHUB_ENV + echo "EM_CONFIG=$EM_CONFIG" >> $GITHUB_ENV + echo "$EMSDK:$EMSDK/upstream/emscripten" >> $GITHUB_PATH + + - uses: actions/setup-node@v4 + with: + node-version: 22 + cache: npm + + - name: Install dependencies + run: | + npm ci + + - name: Build ts app + run: | + npm run build + + - name: Wrangler Deploy + uses: cloudflare/wrangler-action@v3 + with: + wranglerVersion: "4.40.2" + apiToken: ${{ secrets[env.CF_TOKEN] }} + accountId: ${{ secrets[env.CF_ACCOUNT] }} + preCommands: ${{ inputs.preCommand }} + postCommands: ${{ inputs.postCommand }} + command: deploy From 6c4cc9c299b277d6691da524d5c28590d3a009e8 Mon Sep 17 00:00:00 2001 From: RazvanP Date: Wed, 26 Nov 2025 11:05:08 +0200 Subject: [PATCH 02/48] fix: add libopus dep --- .github/workflows/build.yaml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index d2973bf..18c34d9 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -63,7 +63,7 @@ jobs: - name: Install Linux deps run: | sudo apt update - sudo apt -y install wget unzip + sudo apt -y install wget unzip libopus-dev - name: Setup Emscripten SDK run: | @@ -77,11 +77,6 @@ jobs: echo "EM_CONFIG=$EM_CONFIG" >> $GITHUB_ENV echo "$EMSDK:$EMSDK/upstream/emscripten" >> $GITHUB_PATH - - uses: actions/setup-node@v4 - with: - node-version: 22 - cache: npm - - name: Install dependencies run: | npm ci From 87562c2e6e5af7661146b9ca1f280affbc4b3695 Mon Sep 17 00:00:00 2001 From: RazvanP Date: Wed, 26 Nov 2025 11:06:16 +0200 Subject: [PATCH 03/48] chore: move node install --- .github/workflows/build.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 18c34d9..a4aa009 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -64,7 +64,10 @@ jobs: run: | sudo apt update sudo apt -y install wget unzip libopus-dev - + - uses: actions/setup-node@v4 + with: + node-version: 22 + cache: npm - name: Setup Emscripten SDK run: | wget https://github.com/emscripten-core/emsdk/archive/main.zip From 7d0e38b584e8bf53366a8b4372e0a0ed1dc35e04 Mon Sep 17 00:00:00 2001 From: RazvanP Date: Wed, 26 Nov 2025 11:07:08 +0200 Subject: [PATCH 04/48] chore: rename gh workflow --- .github/workflows/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index a4aa009..40099ea 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -1,4 +1,4 @@ -name: "Deploys a Cloudflare Worker App" +name: "Build and deploy" on: # push: # branches: From 9505f2b195795b5300b965b9485343094668dd83 Mon Sep 17 00:00:00 2001 From: RazvanP Date: Wed, 26 Nov 2025 11:18:48 +0200 Subject: [PATCH 05/48] fix: activate emsdk --- .github/workflows/build.yaml | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 40099ea..86a6363 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -64,10 +64,16 @@ jobs: run: | sudo apt update sudo apt -y install wget unzip libopus-dev + - uses: actions/setup-node@v4 with: node-version: 22 cache: npm + + - name: Install Node Dependencies + run: | + npm ci + - name: Setup Emscripten SDK run: | wget https://github.com/emscripten-core/emsdk/archive/main.zip @@ -80,12 +86,9 @@ jobs: echo "EM_CONFIG=$EM_CONFIG" >> $GITHUB_ENV echo "$EMSDK:$EMSDK/upstream/emscripten" >> $GITHUB_PATH - - name: Install dependencies - run: | - npm ci - - name: Build ts app run: | + source "$EMSDK/emsdk_env.sh" npm run build - name: Wrangler Deploy From 75f44483e19a064c556aebc1664a6882167fa1f7 Mon Sep 17 00:00:00 2001 From: RazvanP Date: Wed, 26 Nov 2025 11:25:16 +0200 Subject: [PATCH 06/48] fix: also checkout submodules on build --- .github/workflows/build.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 86a6363..ca0f3a3 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -43,6 +43,7 @@ jobs: uses: actions/checkout@v6 with: ref: ${{ inputs.branch }} + submodules: 'true' - name: Set environment variables run: | From 55628485933ad08cfd0ff2b71914d2a42a734009 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 26 Nov 2025 11:12:28 -0500 Subject: [PATCH 07/48] Fix logging of OpenAI error message. --- src/OutgoingConnection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 9e33439..b575eb1 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -416,7 +416,7 @@ export class OutgoingConnection { // Reset completed this.setTag(this.pendingTags.shift()!); } else if (parsedMessage.type === 'error') { - console.error(`OpenAI sent error message for ${this._tag}: ${parsedMessage}`); + console.error(`OpenAI sent error message for ${this._tag}: ${data}`); this.doClose(true); } // TODO: are there any other messages we care about? From 79ebc9467ec5615e3248952d9a900a9403a0afa2 Mon Sep 17 00:00:00 2001 From: RazvanP Date: Wed, 26 Nov 2025 18:56:52 +0200 Subject: [PATCH 08/48] fix: stale context --- package-lock.json | 2 +- package.json | 2 +- src/index.ts | 36 +++++++++++++++++++----------------- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/package-lock.json b/package-lock.json index 91e61d4..241351e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,7 +11,7 @@ "devDependencies": { "@cloudflare/vitest-pool-workers": "^0.8.19", "@types/node": "^24.7.2", - "prettier": "3.6.2", + "prettier": "^3.6.2", "typescript": "^5.9.3", "vitest": "~3.2.0", "wrangler": "^4.38.0" diff --git a/package.json b/package.json index 3482876..d26c7d6 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,7 @@ "devDependencies": { "@cloudflare/vitest-pool-workers": "^0.8.19", "@types/node": "^24.7.2", - "prettier": "3.6.2", + "prettier": "^3.6.2", "typescript": "^5.9.3", "vitest": "~3.2.0", "wrangler": "^4.38.0" diff --git a/src/index.ts b/src/index.ts index 1db7e82..c206db8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -124,23 +124,25 @@ export default { text: data.transcript.map((t) => t.text).join(' '), timestamp: data.timestamp, }; - ctx.waitUntil( - dispatcher - ?.dispatch(dispatcherMessage) - .then((response) => { - if (!response.success || response.errors) { - console.error('Dispatcher error:', { - message: response.message, - errors: response.errors, - dispatcherMessage, - }); - } - }) - .catch((error) => { - const message = error instanceof Error ? error.message : String(error); - console.error('Dispatcher RPC failed:', message, dispatcherMessage); - }), - ); + // Note: We intentionally don't use ctx.waitUntil() here because the + // ExecutionContext from the initial WebSocket upgrade request becomes + // stale after the response is sent. Using it would cause "IoContext + // timed out due to inactivity" errors when transcription events fire. + dispatcher + ?.dispatch(dispatcherMessage) + .then((response) => { + if (!response.success || response.errors) { + console.error('Dispatcher error:', { + message: response.message, + errors: response.errors, + dispatcherMessage, + }); + } + }) + .catch((error) => { + const message = error instanceof Error ? error.message : String(error); + console.error('Dispatcher RPC failed:', message, dispatcherMessage); + }); } }); From 95f8d5e34e16a3fdfbf2797a84ccb03f069ce129 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 26 Nov 2025 12:06:50 -0500 Subject: [PATCH 09/48] Temp: log transcription events chattily. --- src/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/index.ts b/src/index.ts index c206db8..42f0d19 100644 --- a/src/index.ts +++ b/src/index.ts @@ -110,6 +110,7 @@ export default { } session.on('transcription', (data: TranscriptionMessage) => { + console.log('Transcription event:', JSON.stringify(data)); const message = outbound || transcriptionator || sendBack ? JSON.stringify(data) : ''; outbound?.send(message); transcriptionator?.broadcastMessage(message); @@ -138,6 +139,7 @@ export default { dispatcherMessage, }); } + console.log('Sent to dispatcher: ', JSON.stringify(dispatcherMessage), '; Response:', JSON.stringify(response)); }) .catch((error) => { const message = error instanceof Error ? error.message : String(error); From f797f1c1ef4ae9aaf00b347433a42a5179b65e4d Mon Sep 17 00:00:00 2001 From: RazvanP Date: Wed, 26 Nov 2025 19:22:26 +0200 Subject: [PATCH 10/48] fix: remove libopus --- .github/workflows/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index ca0f3a3..ceaae43 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -64,7 +64,7 @@ jobs: - name: Install Linux deps run: | sudo apt update - sudo apt -y install wget unzip libopus-dev + sudo apt -y install wget unzip - uses: actions/setup-node@v4 with: From 7a16b8263d7a60f5190e0706da16ebf061adfa67 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 26 Nov 2025 12:27:04 -0500 Subject: [PATCH 11/48] Temp: Remove previous chattiness. Log opus decoding chattily, and log errors. --- src/OutgoingConnection.ts | 6 ++++++ src/index.ts | 2 -- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index b575eb1..490151c 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -284,8 +284,14 @@ export class OutgoingConnection { try { // Decode the Opus audio data + console.log(`Decoding opus frame of size ${opusFrame.length} for tag: ${this._tag}`); const decodedAudio = this.opusDecoder.decodeFrame(opusFrame); + if (decodedAudio.errors.length > 0) { + console.error(`Opus decoding errors for tag ${this._tag}:`, decodedAudio.errors); + return; + } this.lastOpusFrameSize = decodedAudio.samplesDecoded; + console.log(`Decoded opus frame to ${decodedAudio.pcmData.length} samples for tag: ${this._tag}`); this.sendOrEnqueueDecodedAudio(decodedAudio.pcmData); } catch (error) { console.error(`Error processing audio data for tag ${this._tag}:`, error); diff --git a/src/index.ts b/src/index.ts index 42f0d19..c206db8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -110,7 +110,6 @@ export default { } session.on('transcription', (data: TranscriptionMessage) => { - console.log('Transcription event:', JSON.stringify(data)); const message = outbound || transcriptionator || sendBack ? JSON.stringify(data) : ''; outbound?.send(message); transcriptionator?.broadcastMessage(message); @@ -139,7 +138,6 @@ export default { dispatcherMessage, }); } - console.log('Sent to dispatcher: ', JSON.stringify(dispatcherMessage), '; Response:', JSON.stringify(response)); }) .catch((error) => { const message = error instanceof Error ? error.message : String(error); From aa50e594ffaff2267d29ba5c98d1c04451ce8e10 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 26 Nov 2025 12:37:28 -0500 Subject: [PATCH 12/48] Temp: Move chattiness to Opus WASM module loading. --- src/OpusDecoder/OpusDecoder.ts | 2 ++ src/OutgoingConnection.ts | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/OpusDecoder/OpusDecoder.ts b/src/OpusDecoder/OpusDecoder.ts index 7923649..82cad2f 100644 --- a/src/OpusDecoder/OpusDecoder.ts +++ b/src/OpusDecoder/OpusDecoder.ts @@ -136,6 +136,8 @@ export class OpusDecoder 0) { console.error(`Opus decoding errors for tag ${this._tag}:`, decodedAudio.errors); return; } this.lastOpusFrameSize = decodedAudio.samplesDecoded; - console.log(`Decoded opus frame to ${decodedAudio.pcmData.length} samples for tag: ${this._tag}`); this.sendOrEnqueueDecodedAudio(decodedAudio.pcmData); } catch (error) { console.error(`Error processing audio data for tag ${this._tag}:`, error); From 01f4300160494f6f881b1477142c0d2cad92674b Mon Sep 17 00:00:00 2001 From: RazvanP Date: Wed, 26 Nov 2025 19:42:00 +0200 Subject: [PATCH 13/48] move build to emscripten step --- .github/workflows/build.yaml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index ceaae43..3166f00 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -83,14 +83,16 @@ jobs: ./emsdk install ${{ inputs.emsdk_version }} ./emsdk activate ${{ inputs.emsdk_version }} source ./emsdk_env.sh + cd .. + npm run build echo "EMSDK=$EMSDK" >> $GITHUB_ENV echo "EM_CONFIG=$EM_CONFIG" >> $GITHUB_ENV echo "$EMSDK:$EMSDK/upstream/emscripten" >> $GITHUB_PATH - - name: Build ts app - run: | - source "$EMSDK/emsdk_env.sh" - npm run build + # - name: Build ts app + # run: | + # source "$EMSDK/emsdk_env.sh" + # npm run build - name: Wrangler Deploy uses: cloudflare/wrangler-action@v3 From d7abd850c47786bda19749ef5f84797452135546 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 26 Nov 2025 15:55:39 -0500 Subject: [PATCH 14/48] Properly handle errors in promise. --- src/OpusDecoder/OpusDecoder.ts | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/OpusDecoder/OpusDecoder.ts b/src/OpusDecoder/OpusDecoder.ts index 82cad2f..dae7ea7 100644 --- a/src/OpusDecoder/OpusDecoder.ts +++ b/src/OpusDecoder/OpusDecoder.ts @@ -68,9 +68,14 @@ export class OpusDecoder((resolve, reject) => { OpusDecoderModule({ instantiateWasm(info: WebAssembly.Imports, receive: (instance: WebAssembly.Instance) => void) { - let instance = new WebAssembly.Instance(wasm, info); - receive(instance); - return instance.exports; + try { + let instance = new WebAssembly.Instance(wasm, info); + receive(instance); + return instance.exports; + } catch (error) { + reject(error); + throw error; + } }, }).then((module: any) => { resolve({ @@ -83,6 +88,8 @@ export class OpusDecoder { + reject(error); }); }); From 36f7c4627bf5f6e82720d27ec8f8a97e6a5ad917 Mon Sep 17 00:00:00 2001 From: RazvanP Date: Thu, 27 Nov 2025 08:56:40 +0200 Subject: [PATCH 15/48] chore: revert build changes, set the default to working emscripten 4.0.15 --- .github/workflows/build.yaml | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 3166f00..7d2b8e2 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -21,10 +21,10 @@ on: required: true default: main emsdk_version: - description: Emscripten SDK version, defaults to latest + description: Emscripten SDK version, defaults to 4.0.15 type: string required: false - default: latest + default: "4.0.15" preCommand: description: Provide a bash script to execute before running wrangler type: string @@ -83,21 +83,19 @@ jobs: ./emsdk install ${{ inputs.emsdk_version }} ./emsdk activate ${{ inputs.emsdk_version }} source ./emsdk_env.sh - cd .. - npm run build echo "EMSDK=$EMSDK" >> $GITHUB_ENV echo "EM_CONFIG=$EM_CONFIG" >> $GITHUB_ENV echo "$EMSDK:$EMSDK/upstream/emscripten" >> $GITHUB_PATH - # - name: Build ts app - # run: | - # source "$EMSDK/emsdk_env.sh" - # npm run build + - name: Build TS App + run: | + source "$EMSDK/emsdk_env.sh" + npm run build - name: Wrangler Deploy uses: cloudflare/wrangler-action@v3 with: - wranglerVersion: "4.40.2" + wranglerVersion: "4.51.0" apiToken: ${{ secrets[env.CF_TOKEN] }} accountId: ${{ secrets[env.CF_ACCOUNT] }} preCommands: ${{ inputs.preCommand }} From dfe945271a4517efaa307c8e3e1af0885db3c093 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Mon, 1 Dec 2025 17:00:06 -0500 Subject: [PATCH 16/48] Get emscripten code working with latest emscripten (4.0.20). Add source maps for emscripten-generated code. --- .github/workflows/build.yaml | 4 ++-- Makefile | 11 +++++++++-- src/OpusDecoder/OpusDecoder.ts | 8 ++++++++ 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 7d2b8e2..a3eab28 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -21,10 +21,10 @@ on: required: true default: main emsdk_version: - description: Emscripten SDK version, defaults to 4.0.15 + description: Emscripten SDK version, defaults to 4.0.20 type: string required: false - default: "4.0.15" + default: "4.0.20" preCommand: description: Provide a bash script to execute before running wrangler type: string diff --git a/Makefile b/Makefile index 5ecd1cd..b3eb956 100644 --- a/Makefile +++ b/Makefile @@ -7,15 +7,17 @@ OPUS_DECODER_DIST=./dist OPUS_DECODER_EMSCRIPTEN_BUILD=$(OPUS_DECODER_BUILD)/EmscriptenWasm.tmp.js OPUS_DECODER_EMSCRIPTEN_WASM=$(OPUS_DECODER_BUILD)/EmscriptenWasm.tmp.wasm +OPUS_DECODER_EMSCRIPTEN_WASM_MAP=$(OPUS_DECODER_BUILD)/EmscriptenWasm.tmp.wasm.map OPUS_DECODER_MODULE=$(OPUS_DECODER_DIST)/opus-decoder.js OPUS_DECODER_WASM=$(OPUS_DECODER_DIST)/opus-decoder.wasm +OPUS_DECODER_WASM_MAP=$(OPUS_DECODER_DIST)/opus-decoder.wasm.map LIBOPUS_SRC=$(OPUS_DECODER_SRC)/opus LIBOPUS_BUILD=$(OPUS_DECODER_BUILD)/build-opus-wasm LIBOPUS_WASM_LIB=$(OPUS_DECODER_BUILD)/libopus.a clean: - rm -rf $(OPUS_DECODER_EMSCRIPTEN_BUILD) $(OPUS_DECODER_EMSCRIPTEN_WASM) $(OPUS_DECODER_MODULE) $(OPUS_DECODER_WASM) $(LIBOPUS_WASM_LIB) + rm -rf $(OPUS_DECODER_EMSCRIPTEN_BUILD) $(OPUS_DECODER_EMSCRIPTEN_WASM) $(OPUS_DECODER_EMSCRIPTEN_WASM_MAP) $(OPUS_DECODER_MODULE) $(OPUS_DECODER_WASM) $(OPUS_DECODER_WASM_MAP) $(LIBOPUS_WASM_LIB) +emmake $(MAKE) -C $(LIBOPUS_BUILD) clean configure: libopus-configure @@ -35,6 +37,10 @@ opus-decoder: opus-wasmlib $(OPUS_DECODER_EMSCRIPTEN_BUILD) else \ echo "Warning: WASM file not found, you may need to adjust emscripten settings"; \ fi + @if [ -f "$(OPUS_DECODER_EMSCRIPTEN_WASM_MAP)" ]; then \ + cp $(OPUS_DECODER_EMSCRIPTEN_WASM_MAP) $(OPUS_DECODER_WASM_MAP); \ + echo "Copied WASM source map to $(OPUS_DECODER_WASM_MAP)"; \ + fi # libopus opus-wasmlib: $(LIBOPUS_WASM_LIB) @@ -44,12 +50,13 @@ define EMCC_OPTS -O2 \ -msimd128 \ --minify 0 \ +-gsource-map \ -s WASM=1 \ -s TEXTDECODER=2 \ -s SINGLE_FILE=0 \ -s MALLOC="emmalloc" \ -s NO_FILESYSTEM=1 \ --s ENVIRONMENT=web \ +-s ENVIRONMENT=node \ -s ASSERTIONS=1 \ -s ABORTING_MALLOC=0 \ -s EXIT_RUNTIME=0 \ diff --git a/src/OpusDecoder/OpusDecoder.ts b/src/OpusDecoder/OpusDecoder.ts index dae7ea7..3520af1 100644 --- a/src/OpusDecoder/OpusDecoder.ts +++ b/src/OpusDecoder/OpusDecoder.ts @@ -5,6 +5,14 @@ // submodules and compiled into the dist files, may have different // licensing terms." +// Provide Node.js globals for emscripten module. HACK. +if (typeof globalThis.__filename === 'undefined') { + globalThis.__filename = './opus-decoder.js'; +} +if (typeof globalThis.__dirname === 'undefined') { + globalThis.__dirname = '.' +} + import OpusDecoderModule from '../../dist/opus-decoder.js'; // @ts-ignore import wasm from '../../dist/opus-decoder.wasm'; From 9e6fba2f5f32ce081bd17eab31761585f8e0cdc5 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 3 Dec 2025 16:49:11 -0500 Subject: [PATCH 17/48] Format. --- src/OpusDecoder/OpusDecoder.ts | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/OpusDecoder/OpusDecoder.ts b/src/OpusDecoder/OpusDecoder.ts index 3520af1..b397b18 100644 --- a/src/OpusDecoder/OpusDecoder.ts +++ b/src/OpusDecoder/OpusDecoder.ts @@ -10,7 +10,7 @@ if (typeof globalThis.__filename === 'undefined') { globalThis.__filename = './opus-decoder.js'; } if (typeof globalThis.__dirname === 'undefined') { - globalThis.__dirname = '.' + globalThis.__dirname = '.'; } import OpusDecoderModule from '../../dist/opus-decoder.js'; @@ -85,20 +85,22 @@ export class OpusDecoder { - resolve({ - opus_frame_decoder_create: module._opus_frame_decoder_create, - opus_frame_decoder_destroy: module._opus_frame_decoder_destroy, - opus_frame_decoder_reset: module._opus_frame_decoder_reset, - opus_frame_decode: module._opus_frame_decode, - malloc: module._malloc, - free: module._free, - HEAP: module.wasmMemory.buffer, - module, + }) + .then((module: any) => { + resolve({ + opus_frame_decoder_create: module._opus_frame_decoder_create, + opus_frame_decoder_destroy: module._opus_frame_decoder_destroy, + opus_frame_decoder_reset: module._opus_frame_decoder_reset, + opus_frame_decode: module._opus_frame_decode, + malloc: module._malloc, + free: module._free, + HEAP: module.wasmMemory.buffer, + module, + }); + }) + .catch((error) => { + reject(error); }); - }).catch((error) => { - reject(error); - }); }); private _sampleRate: OpusDecoderSampleRate; From 1cc1287bfcbf489fb83d92758c6b4a16b4661eef Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 3 Dec 2025 16:49:26 -0500 Subject: [PATCH 18/48] Include confidence and message IDs in transcription messages. --- src/OutgoingConnection.ts | 29 +++++++++++++++++++++++++---- src/transcriberproxy.ts | 4 +++- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index f3d266b..e89084c 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -172,6 +172,7 @@ export class OutgoingConnection { }, }, }, + include: ['item.input_audio_transcription.logprobs'], }, }; @@ -379,10 +380,22 @@ export class OutgoingConnection { } } - private getTranscriptionMessage(transcript: string, timestamp: number, isInterim: boolean): TranscriptionMessage { + private getTranscriptionMessage( + transcript: string, + confidence: number | undefined, + timestamp: number, + message_id: string, + isInterim: boolean, + ): TranscriptionMessage { const message: TranscriptionMessage = { - transcript: [{ text: transcript }], + transcript: [ + { + ...(confidence !== undefined && { confidence }), + text: transcript, + }, + ], is_interim: isInterim, + message_id, type: 'transcription-result', participant: this.participant, timestamp, @@ -404,7 +417,8 @@ export class OutgoingConnection { if (this.lastTranscriptTime !== undefined) { this.lastTranscriptTime = now; } - const transcription = this.getTranscriptionMessage(parsedMessage.delta, now, true); + const confidence = parsedMessage.logprobs?.logprob !== undefined ? Math.exp(parsedMessage.logprobs.logprob) : undefined; + const transcription = this.getTranscriptionMessage(parsedMessage.delta, confidence, now, parsedMessage.message_id, true); this.onInterimTranscription?.(transcription); } else if (parsedMessage.type === 'conversation.item.input_audio_transcription.completed') { let transcriptTime; @@ -414,7 +428,14 @@ export class OutgoingConnection { } else { transcriptTime = Date.now(); } - const transcription = this.getTranscriptionMessage(parsedMessage.transcript, transcriptTime, false); + const confidence = parsedMessage.logprobs?.logprob !== undefined ? Math.exp(parsedMessage.logprobs.logprob) : undefined; + const transcription = this.getTranscriptionMessage( + parsedMessage.transcript, + confidence, + transcriptTime, + parsedMessage.message_id, + false, + ); this.onCompleteTranscription?.(transcription); } else if (parsedMessage.type === 'input_audio_buffer.cleared') { // Reset completed diff --git a/src/transcriberproxy.ts b/src/transcriberproxy.ts index 9a42d4d..ad047dc 100644 --- a/src/transcriberproxy.ts +++ b/src/transcriberproxy.ts @@ -2,8 +2,10 @@ import { OutgoingConnection } from './OutgoingConnection'; import { EventEmitter } from 'node:events'; export interface TranscriptionMessage { - transcript: Array<{ text: string }>; + transcript: Array<{ confidence?: number; text: string }>; is_interim: boolean; + language?: string; + message_id: string; type: 'transcription-result'; participant: { id: string; ssrc?: string }; timestamp: number; From d46c073d4a276ef681c96f7472b251254385162a Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 3 Dec 2025 17:27:42 -0500 Subject: [PATCH 19/48] Clean up some logic around resetting an outbound stream. --- src/OutgoingConnection.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index e89084c..c1fe20a 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -115,6 +115,10 @@ export class OutgoingConnection { // Reset the pending audio buffer this.pendingAudioFrames = []; this.pendingAudioDataBuffer.resize(0); + + this.lastChunkNo = -1; + this.lastTimestamp = -1; + this.lastOpusFrameSize = -1; } private async initializeOpusDecoder(): Promise { @@ -263,7 +267,7 @@ export class OutgoingConnection { } /* Make sure numbers make sense */ - const chunkDeltaInSamples = chunkDelta * this.lastOpusFrameSize; + const chunkDeltaInSamples = this.lastOpusFrameSize > 0 ? chunkDelta * this.lastOpusFrameSize : Infinity; const timestampDeltaInSamples = (timestampDelta / 48000) * 24000; const maxConcealment = 120 * 24; /* 120 ms at 24 kHz */ From d9e259e09f33b4885557c5d7936e86f43859de67 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 3 Dec 2025 17:43:25 -0500 Subject: [PATCH 20/48] Correctly reference the syntax of incoming messages from OpenAI. --- src/OutgoingConnection.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index c1fe20a..accdc6c 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -421,8 +421,8 @@ export class OutgoingConnection { if (this.lastTranscriptTime !== undefined) { this.lastTranscriptTime = now; } - const confidence = parsedMessage.logprobs?.logprob !== undefined ? Math.exp(parsedMessage.logprobs.logprob) : undefined; - const transcription = this.getTranscriptionMessage(parsedMessage.delta, confidence, now, parsedMessage.message_id, true); + const confidence = parsedMessage.logprobs?.[0]?.logprob !== undefined ? Math.exp(parsedMessage.logprobs[0].logprob) : undefined; + const transcription = this.getTranscriptionMessage(parsedMessage.delta, confidence, now, parsedMessage.item_id, true); this.onInterimTranscription?.(transcription); } else if (parsedMessage.type === 'conversation.item.input_audio_transcription.completed') { let transcriptTime; @@ -432,12 +432,12 @@ export class OutgoingConnection { } else { transcriptTime = Date.now(); } - const confidence = parsedMessage.logprobs?.logprob !== undefined ? Math.exp(parsedMessage.logprobs.logprob) : undefined; + const confidence = parsedMessage.logprobs?.[0]?.logprob !== undefined ? Math.exp(parsedMessage.logprobs[0].logprob) : undefined; const transcription = this.getTranscriptionMessage( parsedMessage.transcript, confidence, transcriptTime, - parsedMessage.message_id, + parsedMessage.item_id, false, ); this.onCompleteTranscription?.(transcription); From 98aff28dad45da1a6efe25d9e319b8ac34c2987f Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Thu, 4 Dec 2025 12:39:25 -0500 Subject: [PATCH 21/48] Change default OpenAI model to gpt-4o-mini-transcribe. Make it overridable from env. --- src/OutgoingConnection.ts | 2 +- worker-configuration.d.ts | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index accdc6c..34c3528 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -165,7 +165,7 @@ export class OutgoingConnection { type: 'near_field', }, transcription: { - model: 'gpt-4o-transcribe', + model: env.OPENAI_MODEL || 'gpt-4o-mini-transcribe', language: 'en', // TODO parameterize this }, turn_detection: { diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index f419889..8a075f1 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -8,6 +8,7 @@ declare namespace Cloudflare { } interface Env { OPENAI_API_KEY: string; + OPENAI_MODEL?: string; TRANSCRIPTIONATOR: DurableObjectNamespace; TRANSCRIPTION_DISPATCHER: Fetcher /* transcription-dispatcher */; } From 4c22673af11d641adac3af60bbebd09077593cae Mon Sep 17 00:00:00 2001 From: Boris Grozev Date: Thu, 4 Dec 2025 11:49:47 -0600 Subject: [PATCH 22/48] Make turn_detection configurable with an env var. --- src/OutgoingConnection.ts | 8 ++------ src/utils.ts | 20 ++++++++++++++++++++ worker-configuration.d.ts | 1 + 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 34c3528..2d3b5f8 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -1,5 +1,6 @@ import { OpusDecoder } from './OpusDecoder/OpusDecoder'; import type { TranscriptionMessage } from './transcriberproxy'; +import { getTurnDetectionConfig } from './utils'; // Type definition augmentation for Uint8Array - Cloudflare Worker's JS has these methods but TypeScript doesn't have // declarations for them as of version 5.9.3. @@ -168,12 +169,7 @@ export class OutgoingConnection { model: env.OPENAI_MODEL || 'gpt-4o-mini-transcribe', language: 'en', // TODO parameterize this }, - turn_detection: { - type: 'server_vad', - threshold: 0.5, - prefix_padding_ms: 300, - silence_duration_ms: 500, - }, + turn_detection: getTurnDetectionConfig(env), }, }, include: ['item.input_audio_transcription.logprobs'], diff --git a/src/utils.ts b/src/utils.ts index 15a43a3..2e5b2dd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -27,3 +27,23 @@ export function extractSessionParameters(url: string): ISessionParameters { sendBack: !!sendBack, }; } + +export function getTurnDetectionConfig(env: Env) { + const defaultTurnDetection = { + type: 'server_vad', + threshold: 0.5, + prefix_padding_ms: 300, + silence_duration_ms: 500, + }; + + if (env.OPENAI_TURN_DETECTION) { + try { + return JSON.parse(env.OPENAI_TURN_DETECTION); + } catch (error) { + console.warn(`Invalid OPENAI_TURN_DETECTION JSON, using defaults: ${error}`); + return defaultTurnDetection; + } + } + + return defaultTurnDetection; +} diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index 8a075f1..faeeb12 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -9,6 +9,7 @@ declare namespace Cloudflare { interface Env { OPENAI_API_KEY: string; OPENAI_MODEL?: string; + OPENAI_TURN_DETECTION?: string; TRANSCRIPTIONATOR: DurableObjectNamespace; TRANSCRIPTION_DISPATCHER: Fetcher /* transcription-dispatcher */; } From 231fc9131fd985cac0b544be888cfbdd27ec76cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Mon, 8 Dec 2025 18:08:51 +0200 Subject: [PATCH 23/48] Add metrics (#3) * Add metrics * Debug logs * Remove metric debug log --- src/OutgoingConnection.ts | 23 +++++++++++++++ src/index.ts | 8 ++++++ src/metrics.ts | 60 +++++++++++++++++++++++++++++++++++++++ worker-configuration.d.ts | 1 + wrangler.jsonc | 7 +++++ 5 files changed, 99 insertions(+) create mode 100644 src/metrics.ts diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 2d3b5f8..80a02aa 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -1,6 +1,7 @@ import { OpusDecoder } from './OpusDecoder/OpusDecoder'; import type { TranscriptionMessage } from './transcriberproxy'; import { getTurnDetectionConfig } from './utils'; +import { writeMetric } from './metrics'; // Type definition augmentation for Uint8Array - Cloudflare Worker's JS has these methods but TypeScript doesn't have // declarations for them as of version 5.9.3. @@ -94,9 +95,13 @@ export class OutgoingConnection { onInterimTranscription?: (message: TranscriptionMessage) => void = undefined; onCompleteTranscription?: (message: TranscriptionMessage) => void = undefined; onClosed?: (tag: string) => void = undefined; + onOpenAIError?: (errorType: string, errorMessage: string) => void = undefined; + + private env: Env; constructor(tag: string, env: Env) { this.setTag(tag); + this.env = env; this.initializeOpusDecoder(); this.initializeOpenAIWebSocket(env); @@ -191,6 +196,12 @@ export class OutgoingConnection { openaiWs.addEventListener('error', (error) => { console.error(`OpenAI WebSocket error for tag ${this._tag}:`, error); + writeMetric(this.env.METRICS, { + name: 'openai_api_error', + worker: 'opus-transcriber-proxy', + errorType: 'websocket_error', + }); + this.onOpenAIError?.('websocket_error', 'WebSocket connection error'); this.doClose(true); this.connectionStatus = 'failed'; }); @@ -202,6 +213,12 @@ export class OutgoingConnection { }); } catch (error) { console.error(`Failed to create OpenAI WebSocket connection for tag ${this._tag}:`, error); + writeMetric(this.env.METRICS, { + name: 'openai_api_error', + worker: 'opus-transcriber-proxy', + errorType: 'connection_failed', + }); + this.onOpenAIError?.('connection_failed', error instanceof Error ? error.message : 'Unknown error'); this.connectionStatus = 'failed'; } } @@ -442,6 +459,12 @@ export class OutgoingConnection { this.setTag(this.pendingTags.shift()!); } else if (parsedMessage.type === 'error') { console.error(`OpenAI sent error message for ${this._tag}: ${data}`); + writeMetric(this.env.METRICS, { + name: 'openai_api_error', + worker: 'opus-transcriber-proxy', + errorType: 'api_error', + }); + this.onOpenAIError?.('api_error', parsedMessage.error?.message || data); this.doClose(true); } // TODO: are there any other messages we care about? diff --git a/src/index.ts b/src/index.ts index c206db8..b2dd381 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,6 +2,7 @@ import { extractSessionParameters } from './utils'; import { TranscriberProxy, type TranscriptionMessage } from './transcriberproxy'; import { Transcriptionator } from './transcriptionator'; import { WorkerEntrypoint } from 'cloudflare:workers'; +import { writeMetric } from './metrics'; export interface DispatcherTranscriptionMessage { sessionId: string; @@ -110,6 +111,13 @@ export default { } session.on('transcription', (data: TranscriptionMessage) => { + // Track successful transcription + writeMetric(env.METRICS, { + name: 'transcription_success', + worker: 'opus-transcriber-proxy', + sessionId: sessionId ?? undefined, + }); + const message = outbound || transcriptionator || sendBack ? JSON.stringify(data) : ''; outbound?.send(message); transcriptionator?.broadcastMessage(message); diff --git a/src/metrics.ts b/src/metrics.ts new file mode 100644 index 0000000..91d16dc --- /dev/null +++ b/src/metrics.ts @@ -0,0 +1,60 @@ +/** + * Metrics service for writing to Cloudflare Analytics Engine + * Provides consistent metric structure across the transcription pipeline + * + * Note: Environment is not tracked since each Cloudflare account represents + * a separate environment (dev, staging, prod). + */ + +export type MetricName = + | 'ingester_success' + | 'ingester_failure' + | 'dispatcher_success' + | 'dispatcher_failure' + | 'transcription_success' + | 'transcription_failure' + | 'openai_api_error'; + +export interface MetricEvent { + name: MetricName; + worker: 'webhook-ingester' | 'transcription-dispatcher' | 'opus-transcriber-proxy'; + errorType?: string; + sessionId?: string; + targetName?: string; + latencyMs?: number; +} + +/** + * Writes a metric data point to Analytics Engine + * + * Schema: + * - blob1: metric_name (e.g., 'ingester_success', 'transcription_failure') + * - blob2: worker_name (e.g., 'webhook-ingester') + * - blob3: error_type (optional, for failures) + * - blob4: session_id (optional, for correlation) + * - blob5: target_name (optional, for dispatcher) + * - double1: count (always 1) + * - double2: latency_ms (optional) + * - index1: session_id (for sampling) + */ +export function writeMetric( + analytics: AnalyticsEngineDataset | undefined, + event: MetricEvent +): void { + if (!analytics) { + console.warn('Analytics Engine not configured, skipping metric:', event.name); + return; + } + + analytics.writeDataPoint({ + blobs: [ + event.name, + event.worker, + event.errorType ?? '', + event.sessionId ?? '', + event.targetName ?? '', + ], + doubles: [1, event.latencyMs ?? 0], + indexes: [event.sessionId ?? ''], + }); +} diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index faeeb12..b8d9fb3 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -12,6 +12,7 @@ declare namespace Cloudflare { OPENAI_TURN_DETECTION?: string; TRANSCRIPTIONATOR: DurableObjectNamespace; TRANSCRIPTION_DISPATCHER: Fetcher /* transcription-dispatcher */; + METRICS?: AnalyticsEngineDataset; } } interface Env extends Cloudflare.Env {} diff --git a/wrangler.jsonc b/wrangler.jsonc index e3b60a3..ff12da7 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -43,6 +43,13 @@ * https://developers.cloudflare.com/workers/wrangler/configuration/#service-bindings */ "services": [{ "binding": "TRANSCRIPTION_DISPATCHER", "service": "transcription-dispatcher" }], + /** + * Analytics Engine binding for transcription metrics + * https://developers.cloudflare.com/analytics/analytics-engine/get-started/ + */ + "analytics_engine_datasets": [ + { "binding": "METRICS", "dataset": "transcription-metrics" } + ], "durable_objects": { "bindings": [ { From 735239a02195edb0d509e89fa9154389efa88477 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Mon, 8 Dec 2025 13:38:48 -0500 Subject: [PATCH 24/48] Close the worker websocket on Opus or OpenAI failure. (#4) --- src/OutgoingConnection.ts | 8 ++++++++ src/index.ts | 8 ++++++++ src/transcriberproxy.ts | 3 +++ 3 files changed, 19 insertions(+) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 80a02aa..1a2d053 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -96,6 +96,7 @@ export class OutgoingConnection { onCompleteTranscription?: (message: TranscriptionMessage) => void = undefined; onClosed?: (tag: string) => void = undefined; onOpenAIError?: (errorType: string, errorMessage: string) => void = undefined; + onError?: (tag: string, error: any) => void = undefined; private env: Env; @@ -142,6 +143,8 @@ export class OutgoingConnection { } catch (error) { console.error(`Failed to create Opus decoder for tag ${this._tag}:`, error); this.decoderStatus = 'failed'; + this.doClose(true); + this.onError?.(this._tag, `Error initializing Opus decoder: ${error instanceof Error ? error.message : String(error)}`); } } @@ -204,6 +207,7 @@ export class OutgoingConnection { this.onOpenAIError?.('websocket_error', 'WebSocket connection error'); this.doClose(true); this.connectionStatus = 'failed'; + this.onError?.(this._tag, `Error connecting to OpenAI service: ${error instanceof Error ? error.message : String(error)}`); }); openaiWs.addEventListener('close', () => { @@ -291,6 +295,7 @@ export class OutgoingConnection { this.sendOrEnqueueDecodedAudio(concealedAudio.pcmData); } catch (error) { console.error(`Error concealing ${samplesToConceal} samples for tag ${this._tag}:`, error); + // Don't call onError for concealment errors, as they may be transient } } @@ -305,6 +310,7 @@ export class OutgoingConnection { const decodedAudio = this.opusDecoder.decodeFrame(opusFrame); if (decodedAudio.errors.length > 0) { console.error(`Opus decoding errors for tag ${this._tag}:`, decodedAudio.errors); + // Don't call onError for decoding errors, as they may be transient return; } this.lastOpusFrameSize = decodedAudio.samplesDecoded; @@ -371,6 +377,7 @@ export class OutgoingConnection { this.openaiWebSocket.send(audioMessageString); } catch (error) { console.error(`Failed to send audio to OpenAI for tag ${this._tag}`, error); + // TODO should this call onError? } } @@ -466,6 +473,7 @@ export class OutgoingConnection { }); this.onOpenAIError?.('api_error', parsedMessage.error?.message || data); this.doClose(true); + this.onError?.(this._tag, `OpenAI service sent error message: ${data}`); } // TODO: are there any other messages we care about? } diff --git a/src/index.ts b/src/index.ts index b2dd381..dd00156 100644 --- a/src/index.ts +++ b/src/index.ts @@ -99,6 +99,14 @@ export default { server.close(); }); + session.on('error', (tag, error) => { + const message = `Error in session ${tag}: ${error}`; + console.error(message); + outbound?.close(1001, message); + transcriptionator?.notifySessionClosed(); + server.close(1011, message); + }); + if (outbound || transcriptionator || sendBack) { session.on('interim_transcription', (data: TranscriptionMessage) => { const message = JSON.stringify(data); diff --git a/src/transcriberproxy.ts b/src/transcriberproxy.ts index ad047dc..840a091 100644 --- a/src/transcriberproxy.ts +++ b/src/transcriberproxy.ts @@ -75,6 +75,9 @@ export class TranscriberProxy extends EventEmitter { newConnection.onClosed = (tag) => { this.outgoingConnections.delete(tag); }; + newConnection.onError = (tag, error) => { + this.emit('error', tag, error); + }; this.outgoingConnections.set(tag, newConnection); console.log(`Created outgoing connection entry for tag: ${tag}`); From 5ec073088ade7907232a5f8bf7f95b3eb692d2ab Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Mon, 8 Dec 2025 16:20:26 -0500 Subject: [PATCH 25/48] Make sendBackInterim a separate URL parameter. (#5) --- src/index.ts | 6 +++--- src/utils.ts | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/index.ts b/src/index.ts index dd00156..573f9d6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,14 +38,14 @@ export default { const parameters = extractSessionParameters(request.url); console.log('Session parameters:', JSON.stringify(parameters)); - const { url, sessionId, transcribe, connect, useTranscriptionator, useDispatcher, sendBack } = parameters; + const { url, sessionId, transcribe, connect, useTranscriptionator, useDispatcher, sendBack, sendBackInterim } = parameters; if (!url.pathname.endsWith('/events') && !url.pathname.endsWith('/transcribe')) { return new Response('Bad URL', { status: 400 }); } if (transcribe) { - if (!useTranscriptionator && !useDispatcher && !sendBack && !connect) { + if (!useTranscriptionator && !useDispatcher && !sendBack && !sendBackInterim && !connect) { return new Response('No transcription output method specified', { status: 400 }); } @@ -107,7 +107,7 @@ export default { server.close(1011, message); }); - if (outbound || transcriptionator || sendBack) { + if (outbound || transcriptionator || sendBackInterim) { session.on('interim_transcription', (data: TranscriptionMessage) => { const message = JSON.stringify(data); outbound?.send(message); diff --git a/src/utils.ts b/src/utils.ts index 2e5b2dd..77e08a4 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -5,7 +5,8 @@ export interface ISessionParameters { connect: string | null; useTranscriptionator: boolean; useDispatcher: boolean; - sendBack?: boolean; + sendBack: boolean; + sendBackInterim: boolean; } export function extractSessionParameters(url: string): ISessionParameters { @@ -16,6 +17,7 @@ export function extractSessionParameters(url: string): ISessionParameters { const useTranscriptionator = parsedUrl.searchParams.get('useTranscriptionator'); const useDispatcher = parsedUrl.searchParams.get('useDispatcher'); const sendBack = parsedUrl.searchParams.get('sendBack'); + const sendBackInterim = parsedUrl.searchParams.get('sendBackInterim'); return { url: parsedUrl, @@ -25,6 +27,7 @@ export function extractSessionParameters(url: string): ISessionParameters { useTranscriptionator: !!useTranscriptionator, useDispatcher: !!useDispatcher, sendBack: !!sendBack, + sendBackInterim: !!sendBackInterim, }; } From eb24dfc82b3b08629af84495e70e167b5d3ac524 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Mon, 8 Dec 2025 16:35:05 -0500 Subject: [PATCH 26/48] Catch and handle errors from EventEmitter callbacks. (#6) I'm not sure what Cloudflare does if you don't do this, but it may relate to how we saw a callback stop firing. --- src/index.ts | 15 ++++++++++----- src/transcriberproxy.ts | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/index.ts b/src/index.ts index 573f9d6..da8be7f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -100,11 +100,16 @@ export default { }); session.on('error', (tag, error) => { - const message = `Error in session ${tag}: ${error}`; - console.error(message); - outbound?.close(1001, message); - transcriptionator?.notifySessionClosed(); - server.close(1011, message); + try { + const message = `Error in session ${tag}: ${error instanceof Error ? error.message : String(error)}`; + console.error(message); + outbound?.close(1001, message); + transcriptionator?.notifySessionClosed();q + server.close(1011, message); + } catch (closeError) { + // Error handlers do not themselves catch errors, so log to console + console.error(`Failed to close connections after error in session ${tag}: ${closeError instanceof Error ? closeError.message : String(closeError)}`); + } }); if (outbound || transcriptionator || sendBackInterim) { diff --git a/src/transcriberproxy.ts b/src/transcriberproxy.ts index 840a091..bca0da8 100644 --- a/src/transcriberproxy.ts +++ b/src/transcriberproxy.ts @@ -22,7 +22,7 @@ export class TranscriberProxy extends EventEmitter { private env: Env; constructor(ws: WebSocket, env: Env) { - super(); + super({captureRejections: true}); this.ws = ws; this.env = env; this.outgoingConnections = new Map(); From f81388a1b4180e6c5f720078a8c499c09c66d741 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Mon, 8 Dec 2025 16:35:22 -0500 Subject: [PATCH 27/48] Explicitly compare URL param values with 'true'. (#7) Previously we just checked if they were truthy, but "false" is truthy... --- src/utils.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index 77e08a4..565da4c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -24,10 +24,10 @@ export function extractSessionParameters(url: string): ISessionParameters { sessionId, transcribe, connect, - useTranscriptionator: !!useTranscriptionator, - useDispatcher: !!useDispatcher, - sendBack: !!sendBack, - sendBackInterim: !!sendBackInterim, + useTranscriptionator: useTranscriptionator === 'true', + useDispatcher: useDispatcher === 'true', + sendBack: sendBack === 'true', + sendBackInterim: sendBackInterim === 'true', }; } From 9868f1ccdd856ca3e6921a14b847286e5d2bdd99 Mon Sep 17 00:00:00 2001 From: Aaron van Meerten Date: Tue, 9 Dec 2025 08:58:52 +0100 Subject: [PATCH 28/48] feat: logpush for opus-transcriber-proxy (#8) Enable logpush feature for the worker --- wrangler.jsonc | 1 + 1 file changed, 1 insertion(+) diff --git a/wrangler.jsonc b/wrangler.jsonc index ff12da7..b20684e 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -13,6 +13,7 @@ "observability": { "enabled": true }, + "logpush": true, /** * Smart Placement * Docs: https://developers.cloudflare.com/workers/configuration/smart-placement/#smart-placement From 3a3e0d13e5e67aabe062bb84bdbc902883356c50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Tue, 9 Dec 2025 10:55:41 +0200 Subject: [PATCH 29/48] Add a connection language param, leave empty for auto (#9) --- src/OutgoingConnection.ts | 18 ++++++++++++------ src/index.ts | 6 +++--- src/transcriberproxy.ts | 10 ++++++++-- src/utils.ts | 3 +++ 4 files changed, 26 insertions(+), 11 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 1a2d053..59fce37 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -1,5 +1,5 @@ import { OpusDecoder } from './OpusDecoder/OpusDecoder'; -import type { TranscriptionMessage } from './transcriberproxy'; +import type { TranscriptionMessage, TranscriberProxyOptions } from './transcriberproxy'; import { getTurnDetectionConfig } from './utils'; import { writeMetric } from './metrics'; @@ -99,10 +99,12 @@ export class OutgoingConnection { onError?: (tag: string, error: any) => void = undefined; private env: Env; + private options: TranscriberProxyOptions; - constructor(tag: string, env: Env) { + constructor(tag: string, env: Env, options: TranscriberProxyOptions) { this.setTag(tag); this.env = env; + this.options = options; this.initializeOpusDecoder(); this.initializeOpenAIWebSocket(env); @@ -160,6 +162,13 @@ export class OutgoingConnection { console.log(`OpenAI WebSocket connected for tag: ${this._tag}`); this.connectionStatus = 'connected'; + const transcriptionConfig: { model: string; language?: string } = { + model: env.OPENAI_MODEL || 'gpt-4o-mini-transcribe', + }; + if (this.options.language !== null) { + transcriptionConfig.language = this.options.language; + } + const sessionConfig = { type: 'session.update', session: { @@ -173,10 +182,7 @@ export class OutgoingConnection { noise_reduction: { type: 'near_field', }, - transcription: { - model: env.OPENAI_MODEL || 'gpt-4o-mini-transcribe', - language: 'en', // TODO parameterize this - }, + transcription: transcriptionConfig, turn_detection: getTurnDetectionConfig(env), }, }, diff --git a/src/index.ts b/src/index.ts index da8be7f..daec42a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,7 +38,7 @@ export default { const parameters = extractSessionParameters(request.url); console.log('Session parameters:', JSON.stringify(parameters)); - const { url, sessionId, transcribe, connect, useTranscriptionator, useDispatcher, sendBack, sendBackInterim } = parameters; + const { url, sessionId, transcribe, connect, useTranscriptionator, useDispatcher, sendBack, sendBackInterim, language } = parameters; if (!url.pathname.endsWith('/events') && !url.pathname.endsWith('/transcribe')) { return new Response('Bad URL', { status: 400 }); @@ -54,7 +54,7 @@ export default { server.accept(); - const session = new TranscriberProxy(server, env); + const session = new TranscriberProxy(server, env, { language }); let outbound: WebSocket | undefined; let transcriptionator: DurableObjectStub | undefined; @@ -104,7 +104,7 @@ export default { const message = `Error in session ${tag}: ${error instanceof Error ? error.message : String(error)}`; console.error(message); outbound?.close(1001, message); - transcriptionator?.notifySessionClosed();q + transcriptionator?.notifySessionClosed(); server.close(1011, message); } catch (closeError) { // Error handlers do not themselves catch errors, so log to console diff --git a/src/transcriberproxy.ts b/src/transcriberproxy.ts index bca0da8..6ddc6b3 100644 --- a/src/transcriberproxy.ts +++ b/src/transcriberproxy.ts @@ -11,6 +11,10 @@ export interface TranscriptionMessage { timestamp: number; } +export interface TranscriberProxyOptions { + language: string | null; +} + export class TranscriberProxy extends EventEmitter { private readonly ws: WebSocket; private outgoingConnections: Map; @@ -20,11 +24,13 @@ export class TranscriberProxy extends EventEmitter { // three concurrent speakers. private MAX_OUTGOING_CONNECTIONS = 4; private env: Env; + private options: TranscriberProxyOptions; - constructor(ws: WebSocket, env: Env) { + constructor(ws: WebSocket, env: Env, options: TranscriberProxyOptions) { super({captureRejections: true}); this.ws = ws; this.env = env; + this.options = options; this.outgoingConnections = new Map(); this.ws.addEventListener('close', () => { @@ -64,7 +70,7 @@ export class TranscriberProxy extends EventEmitter { } if (this.outgoingConnections.size < this.MAX_OUTGOING_CONNECTIONS) { - const newConnection = new OutgoingConnection(tag, this.env); + const newConnection = new OutgoingConnection(tag, this.env, this.options); newConnection.onInterimTranscription = (message) => { this.emit('interim_transcription', message); diff --git a/src/utils.ts b/src/utils.ts index 565da4c..53d91be 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -7,6 +7,7 @@ export interface ISessionParameters { useDispatcher: boolean; sendBack: boolean; sendBackInterim: boolean; + language: string | null; } export function extractSessionParameters(url: string): ISessionParameters { @@ -18,6 +19,7 @@ export function extractSessionParameters(url: string): ISessionParameters { const useDispatcher = parsedUrl.searchParams.get('useDispatcher'); const sendBack = parsedUrl.searchParams.get('sendBack'); const sendBackInterim = parsedUrl.searchParams.get('sendBackInterim'); + const lang = parsedUrl.searchParams.get('lang'); return { url: parsedUrl, @@ -28,6 +30,7 @@ export function extractSessionParameters(url: string): ISessionParameters { useDispatcher: useDispatcher === 'true', sendBack: sendBack === 'true', sendBackInterim: sendBackInterim === 'true', + language: lang }; } From 38d73f61dfa1efd5352561ae69646b78de9bb3e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Tue, 9 Dec 2025 10:58:38 +0200 Subject: [PATCH 30/48] disable dev domain (#10) --- wrangler.jsonc | 1 + 1 file changed, 1 insertion(+) diff --git a/wrangler.jsonc b/wrangler.jsonc index b20684e..5e2e1bd 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -14,6 +14,7 @@ "enabled": true }, "logpush": true, + "workers_dev": false, /** * Smart Placement * Docs: https://developers.cloudflare.com/workers/configuration/smart-placement/#smart-placement From b6a6be56938f5891b61e28d40b32a3a315ae5b84 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Wed, 10 Dec 2025 16:48:29 -0500 Subject: [PATCH 31/48] Fix PLC logic. (#11) * Fix PLC logic. * Add metrics for various Opus events. * Run 'prettier'. --- src/OutgoingConnection.ts | 46 +++++++++++++++++++++++++------ src/index.ts | 4 ++- src/metrics.ts | 58 ++++++++++++++++++--------------------- src/transcriberproxy.ts | 2 +- src/utils.ts | 2 +- 5 files changed, 69 insertions(+), 43 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 59fce37..fc49ead 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -254,16 +254,21 @@ export class OutgoingConnection { } if (Number.isInteger(mediaEvent.media?.chunk) && Number.isInteger(mediaEvent.media.timestamp)) { - if (this.lastChunkNo != -1 && mediaEvent.media.chunk != this.lastChunkNo - 1) { + if (this.lastChunkNo != -1 && mediaEvent.media.chunk != this.lastChunkNo + 1) { const chunkDelta = mediaEvent.media.chunk - this.lastChunkNo; - const timestampDelta = mediaEvent.media.timestamp - this.lastTimestamp; - if (chunkDelta <= 0 || timestampDelta <= 0) { - // Packets reordered, drop this packet + if (chunkDelta <= 0) { + // Packets reordered or replayed, drop this packet + writeMetric(this.env.METRICS, { + name: 'opus_packet_discarded', + worker: 'opus-transcriber-proxy', + }); + return; } // Packets lost, do concealment if (this.decoderStatus == 'ready') { + const timestampDelta = mediaEvent.media.timestamp - this.lastTimestamp; // TODO: enqueue concealment actions? Not sure this is needed in practice. this.doConcealment(opusFrame, chunkDelta, timestampDelta); } @@ -289,16 +294,36 @@ export class OutgoingConnection { return; } + const lostFrames = chunkDelta - 1; + if (lostFrames <= 0) { + return; + } + if (this.lastOpusFrameSize <= 0) { + // Not sure how we could have gotten here if we've never decoded anything + return; + } + /* Make sure numbers make sense */ - const chunkDeltaInSamples = this.lastOpusFrameSize > 0 ? chunkDelta * this.lastOpusFrameSize : Infinity; - const timestampDeltaInSamples = (timestampDelta / 48000) * 24000; + const lostFramesInSamples = lostFrames * this.lastOpusFrameSize; + const timestampDeltaInSamples = timestampDelta > 0 ? (timestampDelta / 48000) * 24000 : Infinity; const maxConcealment = 120 * 24; /* 120 ms at 24 kHz */ - const samplesToConceal = Math.min(chunkDeltaInSamples, timestampDeltaInSamples, maxConcealment); + const samplesToConceal = Math.min(lostFramesInSamples, timestampDeltaInSamples, maxConcealment); try { const concealedAudio = this.opusDecoder.conceal(opusFrame, samplesToConceal); - this.sendOrEnqueueDecodedAudio(concealedAudio.pcmData); + if (concealedAudio.errors.length > 0) { + writeMetric(this.env.METRICS, { + name: 'opus_decode_failure', + worker: 'opus-transcriber-proxy', + }); + } else { + this.sendOrEnqueueDecodedAudio(concealedAudio.pcmData); + writeMetric(this.env.METRICS, { + name: 'opus_loss_concealment', + worker: 'opus-transcriber-proxy', + }); + } } catch (error) { console.error(`Error concealing ${samplesToConceal} samples for tag ${this._tag}:`, error); // Don't call onError for concealment errors, as they may be transient @@ -316,6 +341,11 @@ export class OutgoingConnection { const decodedAudio = this.opusDecoder.decodeFrame(opusFrame); if (decodedAudio.errors.length > 0) { console.error(`Opus decoding errors for tag ${this._tag}:`, decodedAudio.errors); + writeMetric(this.env.METRICS, { + name: 'opus_decode_failure', + worker: 'opus-transcriber-proxy', + }); + // Don't call onError for decoding errors, as they may be transient return; } diff --git a/src/index.ts b/src/index.ts index daec42a..cce5be5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -108,7 +108,9 @@ export default { server.close(1011, message); } catch (closeError) { // Error handlers do not themselves catch errors, so log to console - console.error(`Failed to close connections after error in session ${tag}: ${closeError instanceof Error ? closeError.message : String(closeError)}`); + console.error( + `Failed to close connections after error in session ${tag}: ${closeError instanceof Error ? closeError.message : String(closeError)}`, + ); } }); diff --git a/src/metrics.ts b/src/metrics.ts index 91d16dc..1f1e843 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -7,21 +7,24 @@ */ export type MetricName = - | 'ingester_success' - | 'ingester_failure' - | 'dispatcher_success' - | 'dispatcher_failure' - | 'transcription_success' - | 'transcription_failure' - | 'openai_api_error'; + | 'ingester_success' + | 'ingester_failure' + | 'dispatcher_success' + | 'dispatcher_failure' + | 'transcription_success' + | 'transcription_failure' + | 'openai_api_error' + | 'opus_loss_concealment' + | 'opus_decode_failure' + | 'opus_packet_discarded'; export interface MetricEvent { - name: MetricName; - worker: 'webhook-ingester' | 'transcription-dispatcher' | 'opus-transcriber-proxy'; - errorType?: string; - sessionId?: string; - targetName?: string; - latencyMs?: number; + name: MetricName; + worker: 'webhook-ingester' | 'transcription-dispatcher' | 'opus-transcriber-proxy'; + errorType?: string; + sessionId?: string; + targetName?: string; + latencyMs?: number; } /** @@ -37,24 +40,15 @@ export interface MetricEvent { * - double2: latency_ms (optional) * - index1: session_id (for sampling) */ -export function writeMetric( - analytics: AnalyticsEngineDataset | undefined, - event: MetricEvent -): void { - if (!analytics) { - console.warn('Analytics Engine not configured, skipping metric:', event.name); - return; - } +export function writeMetric(analytics: AnalyticsEngineDataset | undefined, event: MetricEvent): void { + if (!analytics) { + console.warn('Analytics Engine not configured, skipping metric:', event.name); + return; + } - analytics.writeDataPoint({ - blobs: [ - event.name, - event.worker, - event.errorType ?? '', - event.sessionId ?? '', - event.targetName ?? '', - ], - doubles: [1, event.latencyMs ?? 0], - indexes: [event.sessionId ?? ''], - }); + analytics.writeDataPoint({ + blobs: [event.name, event.worker, event.errorType ?? '', event.sessionId ?? '', event.targetName ?? ''], + doubles: [1, event.latencyMs ?? 0], + indexes: [event.sessionId ?? ''], + }); } diff --git a/src/transcriberproxy.ts b/src/transcriberproxy.ts index 6ddc6b3..9cea5fb 100644 --- a/src/transcriberproxy.ts +++ b/src/transcriberproxy.ts @@ -27,7 +27,7 @@ export class TranscriberProxy extends EventEmitter { private options: TranscriberProxyOptions; constructor(ws: WebSocket, env: Env, options: TranscriberProxyOptions) { - super({captureRejections: true}); + super({ captureRejections: true }); this.ws = ws; this.env = env; this.options = options; diff --git a/src/utils.ts b/src/utils.ts index 53d91be..eb4d2c3 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -30,7 +30,7 @@ export function extractSessionParameters(url: string): ISessionParameters { useDispatcher: useDispatcher === 'true', sendBack: sendBack === 'true', sendBackInterim: sendBackInterim === 'true', - language: lang + language: lang, }; } From 98ceeb2f8f3e91c4786a36ccfafe7469b10ba2a6 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Thu, 11 Dec 2025 16:46:20 -0500 Subject: [PATCH 32/48] Add some metrics for normal correct behavior. (#12) * Add some metrics for normal correct behavior. * Only log happy-path metrics if env.DEBUG is true. --- src/OutgoingConnection.ts | 31 +++++++++++++++++++++++++++++++ src/metrics.ts | 9 +++++++-- worker-configuration.d.ts | 1 + 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index fc49ead..467c7de 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -253,6 +253,13 @@ export class OutgoingConnection { return; } + if (this.env.DEBUG === 'true') { + writeMetric(this.env.METRICS, { + name: 'opus_packet_received', + worker: 'opus-transcriber-proxy', + }); + } + if (Number.isInteger(mediaEvent.media?.chunk) && Number.isInteger(mediaEvent.media.timestamp)) { if (this.lastChunkNo != -1 && mediaEvent.media.chunk != this.lastChunkNo + 1) { const chunkDelta = mediaEvent.media.chunk - this.lastChunkNo; @@ -282,6 +289,12 @@ export class OutgoingConnection { } else if (this.decoderStatus === 'pending') { // Queue the binary data until decoder is ready this.pendingOpusFrames.push(opusFrame); + if (this.env.DEBUG === 'true') { + writeMetric(this.env.METRICS, { + name: 'opus_packet_queued', + worker: 'opus-transcriber-proxy', + }); + } // console.log(`Queued opus frame for tag: ${this.tag} (queue size: ${this.pendingOpusFrames.length})`); } else { console.log(`Not queueing opus frame for tag: ${this._tag}: decoder ${this.decoderStatus}`); @@ -349,6 +362,12 @@ export class OutgoingConnection { // Don't call onError for decoding errors, as they may be transient return; } + if (this.env.DEBUG === 'true') { + writeMetric(this.env.METRICS, { + name: 'opus_packet_decoded', + worker: 'opus-transcriber-proxy', + }); + } this.lastOpusFrameSize = decodedAudio.samplesDecoded; this.sendOrEnqueueDecodedAudio(decodedAudio.pcmData); } catch (error) { @@ -376,6 +395,12 @@ export class OutgoingConnection { this.pendingAudioDataBuffer.resize(uint8Data.byteLength); this.pendingAudioData.set(uint8Data); } + if (this.env.DEBUG === 'true') { + writeMetric(this.env.METRICS, { + name: 'openai_audio_queued', + worker: 'opus-transcriber-proxy', + }); + } } else { console.log(`Not queueing audio data for tag: ${this._tag}: connection ${this.connectionStatus}`); } @@ -411,6 +436,12 @@ export class OutgoingConnection { const audioMessageString = JSON.stringify(audioMessage); this.openaiWebSocket.send(audioMessageString); + if (this.env.DEBUG === 'true') { + writeMetric(this.env.METRICS, { + name: 'openai_audio_sent', + worker: 'opus-transcriber-proxy', + }); + } } catch (error) { console.error(`Failed to send audio to OpenAI for tag ${this._tag}`, error); // TODO should this call onError? diff --git a/src/metrics.ts b/src/metrics.ts index 1f1e843..70192e8 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -13,10 +13,15 @@ export type MetricName = | 'dispatcher_failure' | 'transcription_success' | 'transcription_failure' - | 'openai_api_error' + | 'opus_packet_received' + | 'opus_packet_queued' | 'opus_loss_concealment' + | 'opus_packet_decoded' | 'opus_decode_failure' - | 'opus_packet_discarded'; + | 'opus_packet_discarded' + | 'openai_audio_queued' + | 'openai_audio_sent' + | 'openai_api_error'; export interface MetricEvent { name: MetricName; diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index b8d9fb3..935c90f 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -10,6 +10,7 @@ declare namespace Cloudflare { OPENAI_API_KEY: string; OPENAI_MODEL?: string; OPENAI_TURN_DETECTION?: string; + DEBUG?: string; TRANSCRIPTIONATOR: DurableObjectNamespace; TRANSCRIPTION_DISPATCHER: Fetcher /* transcription-dispatcher */; METRICS?: AnalyticsEngineDataset; From 6b687f4f16f856ace5932b6415c64b56ebefbeaf Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Thu, 11 Dec 2025 16:52:32 -0500 Subject: [PATCH 33/48] Commit audio to OpenAI when resetting a connection, before clearing it. (#13) --- src/OutgoingConnection.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 467c7de..7c8c856 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -113,6 +113,10 @@ export class OutgoingConnection { reset(newTag: string) { if (this.connectionStatus == 'connected') { this.pendingTags.push(newTag); + + const commitMessage = { type: 'input_audio_buffer.commit' }; + this.openaiWebSocket?.send(JSON.stringify(commitMessage)); + const clearMessage = { type: 'input_audio_buffer.clear' }; this.openaiWebSocket?.send(JSON.stringify(clearMessage)); } else { From 3c3a69b0c67d678a05aea5cfb4e459528bb94e52 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Thu, 11 Dec 2025 16:58:19 -0500 Subject: [PATCH 34/48] Receive and process transcription failure messages, and add a metric for them. (#14) --- src/OutgoingConnection.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 7c8c856..c3494a5 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -532,6 +532,12 @@ export class OutgoingConnection { false, ); this.onCompleteTranscription?.(transcription); + } else if (parsedMessage.type === 'conversation.item.input_audio_transcription.failed') { + console.error(`OpenAI failed to transcribe audio for tag ${this._tag}: ${data}`); + writeMetric(this.env.METRICS, { + name: 'transcription_failure', + worker: 'opus-transcriber-proxy', + }); } else if (parsedMessage.type === 'input_audio_buffer.cleared') { // Reset completed this.setTag(this.pendingTags.shift()!); From dac9ee1b9c87c56c24be5d2d40706696bdd164b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Mon, 15 Dec 2025 10:27:25 +0200 Subject: [PATCH 35/48] Rpurdel/fixes (#15) * fix: rm var shadowing * fix: avoid race condition between reset triggered and OpenAI response * fix: memory leaks on error, don't close already closed connection * fix: check if shift returns undefined * fix: if undefined set to now on interim --- src/OutgoingConnection.ts | 40 ++++++++++++++++++++++++++++++++------- src/index.ts | 2 +- src/transcriptionator.ts | 6 +++++- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index c3494a5..f3c925c 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -78,6 +78,7 @@ export class OutgoingConnection { private openaiWebSocket?: WebSocket; private pendingOpusFrames: Uint8Array[] = []; private pendingAudioDataBuffer = new ArrayBuffer(0, { maxByteLength: MAX_AUDIO_BLOCK_BYTES }); + private switchAudioBuffer = new ArrayBuffer(0, { maxByteLength: MAX_AUDIO_BLOCK_BYTES }); private pendingAudioData: Uint8Array = new Uint8Array(this.pendingAudioDataBuffer); private pendingAudioFrames: string[] = []; @@ -91,6 +92,8 @@ export class OutgoingConnection { private lastOpusFrameSize: number = -1; private lastTranscriptTime?: number = undefined; + private resetting: boolean = false; + private switchAudioData: Uint8Array = new Uint8Array(this.switchAudioBuffer); onInterimTranscription?: (message: TranscriptionMessage) => void = undefined; onCompleteTranscription?: (message: TranscriptionMessage) => void = undefined; @@ -112,6 +115,7 @@ export class OutgoingConnection { reset(newTag: string) { if (this.connectionStatus == 'connected') { + this.resetting = true; this.pendingTags.push(newTag); const commitMessage = { type: 'input_audio_buffer.commit' }; @@ -128,10 +132,6 @@ export class OutgoingConnection { // Reset the pending audio buffer this.pendingAudioFrames = []; this.pendingAudioDataBuffer.resize(0); - - this.lastChunkNo = -1; - this.lastTimestamp = -1; - this.lastOpusFrameSize = -1; } private async initializeOpusDecoder(): Promise { @@ -383,6 +383,16 @@ export class OutgoingConnection { const uint8Data = new Uint8Array(pcmData.buffer, pcmData.byteOffset, pcmData.byteLength); if (this.connectionStatus === 'connected' && this.openaiWebSocket) { + // If in the process of switching (after reset), buffer audio for the new participant + if (this.resetting) { + if (this.switchAudioData.length + uint8Data.length <= MAX_AUDIO_BLOCK_BYTES) { + const oldLength = this.switchAudioData.length; + this.switchAudioBuffer.resize(this.switchAudioData.byteLength + uint8Data.byteLength); + this.switchAudioData.set(uint8Data, oldLength); + } + // If exceeds buffer, drop the audio (unlikely given short reset window) + return; + } const encodedAudio = uint8Data.toBase64(); this.sendAudioToOpenAI(encodedAudio); } else if (this.connectionStatus === 'pending') { @@ -509,7 +519,7 @@ export class OutgoingConnection { } if (parsedMessage.type === 'conversation.item.input_audio_transcription.delta') { const now = Date.now(); - if (this.lastTranscriptTime !== undefined) { + if (this.lastTranscriptTime === undefined) { this.lastTranscriptTime = now; } const confidence = parsedMessage.logprobs?.[0]?.logprob !== undefined ? Math.exp(parsedMessage.logprobs[0].logprob) : undefined; @@ -539,8 +549,24 @@ export class OutgoingConnection { worker: 'opus-transcriber-proxy', }); } else if (parsedMessage.type === 'input_audio_buffer.cleared') { - // Reset completed - this.setTag(this.pendingTags.shift()!); + // Reset completed - update state atomically + this.lastChunkNo = -1; + this.lastTimestamp = -1; + this.lastOpusFrameSize = -1; + const nextTag = this.pendingTags.shift(); + if (nextTag !== undefined) { + this.setTag(nextTag); + } else { + console.error('Received cleared event but no pending tag available.'); + } + this.resetting = false; + + // Send any audio that was buffered during the reset + if (this.switchAudioData.length > 0) { + const encodedAudio = safeToBase64(this.switchAudioData); + this.sendAudioToOpenAI(encodedAudio); + this.switchAudioBuffer.resize(0); + } } else if (parsedMessage.type === 'error') { console.error(`OpenAI sent error message for ${this._tag}: ${data}`); writeMetric(this.env.METRICS, { diff --git a/src/index.ts b/src/index.ts index cce5be5..39fabb4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -62,7 +62,7 @@ export default { if (connect) { try { - const outbound = new WebSocket(connect, ['transcription']); + outbound = new WebSocket(connect, ['transcription']); // TODO: pass auth info to this websocket outbound.addEventListener('close', () => { diff --git a/src/transcriptionator.ts b/src/transcriptionator.ts index da5fd2a..65eb416 100644 --- a/src/transcriptionator.ts +++ b/src/transcriptionator.ts @@ -22,7 +22,11 @@ export class Transcriptionator extends DurableObject { this.observers.add(server); server.addEventListener('close', () => { this.observers.delete(server); - server.close(); + }); + + server.addEventListener('error', () => { + this.observers.delete(server); + console.error(`Observer connection closed with error.`); }); return new Response(null, { From 06faf9066a6ac27e3c8f5ef8f78606c348c178b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Mon, 15 Dec 2025 11:53:01 +0200 Subject: [PATCH 36/48] chore: log unhandled openai events (#16) * chore: log unhandled openai events * chore: run prettier --- src/OutgoingConnection.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index f3c925c..de1e19c 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -117,7 +117,7 @@ export class OutgoingConnection { if (this.connectionStatus == 'connected') { this.resetting = true; this.pendingTags.push(newTag); - + const commitMessage = { type: 'input_audio_buffer.commit' }; this.openaiWebSocket?.send(JSON.stringify(commitMessage)); @@ -577,8 +577,14 @@ export class OutgoingConnection { this.onOpenAIError?.('api_error', parsedMessage.error?.message || data); this.doClose(true); this.onError?.(this._tag, `OpenAI service sent error message: ${data}`); + } else if ( + parsedMessage.type !== 'session.created' && + parsedMessage.type !== 'session.updated' && + parsedMessage.type !== 'input_audio_buffer.committed' + ) { + // Log unexpected message types that might indicate issues + console.warn(`Unhandled OpenAI message type for ${this._tag}: ${parsedMessage.type}`); } - // TODO: are there any other messages we care about? } close(): void { From 39bbfe2032c13c9b1c06362b747b6a3bd2125970 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Mon, 15 Dec 2025 16:32:23 -0500 Subject: [PATCH 37/48] Add some more normal OpenAI messages that shouldn't trigger an unexpected message log. (#18) --- src/OutgoingConnection.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index de1e19c..d77f82a 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -580,7 +580,11 @@ export class OutgoingConnection { } else if ( parsedMessage.type !== 'session.created' && parsedMessage.type !== 'session.updated' && - parsedMessage.type !== 'input_audio_buffer.committed' + parsedMessage.type !== 'input_audio_buffer.committed' && + parsedMessage.type !== 'input_audio_buffer.speech_started' && + parsedMessage.type !== 'input_audio_buffer.speech_stopped' && + parsedMessage.type !== 'conversation.item.added' && + parsedMessage.type !== 'conversation.item.done' ) { // Log unexpected message types that might indicate issues console.warn(`Unhandled OpenAI message type for ${this._tag}: ${parsedMessage.type}`); From 7818da18ecb1ec12aec3a1a7ea9added2caa60d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Tue, 16 Dec 2025 10:00:18 +0200 Subject: [PATCH 38/48] revert tag switch changes (#19) --- src/OutgoingConnection.ts | 30 +++++------------------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index d77f82a..a1f0a38 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -78,7 +78,6 @@ export class OutgoingConnection { private openaiWebSocket?: WebSocket; private pendingOpusFrames: Uint8Array[] = []; private pendingAudioDataBuffer = new ArrayBuffer(0, { maxByteLength: MAX_AUDIO_BLOCK_BYTES }); - private switchAudioBuffer = new ArrayBuffer(0, { maxByteLength: MAX_AUDIO_BLOCK_BYTES }); private pendingAudioData: Uint8Array = new Uint8Array(this.pendingAudioDataBuffer); private pendingAudioFrames: string[] = []; @@ -92,8 +91,6 @@ export class OutgoingConnection { private lastOpusFrameSize: number = -1; private lastTranscriptTime?: number = undefined; - private resetting: boolean = false; - private switchAudioData: Uint8Array = new Uint8Array(this.switchAudioBuffer); onInterimTranscription?: (message: TranscriptionMessage) => void = undefined; onCompleteTranscription?: (message: TranscriptionMessage) => void = undefined; @@ -115,7 +112,6 @@ export class OutgoingConnection { reset(newTag: string) { if (this.connectionStatus == 'connected') { - this.resetting = true; this.pendingTags.push(newTag); const commitMessage = { type: 'input_audio_buffer.commit' }; @@ -132,6 +128,10 @@ export class OutgoingConnection { // Reset the pending audio buffer this.pendingAudioFrames = []; this.pendingAudioDataBuffer.resize(0); + + this.lastChunkNo = -1; + this.lastTimestamp = -1; + this.lastOpusFrameSize = -1; } private async initializeOpusDecoder(): Promise { @@ -383,16 +383,6 @@ export class OutgoingConnection { const uint8Data = new Uint8Array(pcmData.buffer, pcmData.byteOffset, pcmData.byteLength); if (this.connectionStatus === 'connected' && this.openaiWebSocket) { - // If in the process of switching (after reset), buffer audio for the new participant - if (this.resetting) { - if (this.switchAudioData.length + uint8Data.length <= MAX_AUDIO_BLOCK_BYTES) { - const oldLength = this.switchAudioData.length; - this.switchAudioBuffer.resize(this.switchAudioData.byteLength + uint8Data.byteLength); - this.switchAudioData.set(uint8Data, oldLength); - } - // If exceeds buffer, drop the audio (unlikely given short reset window) - return; - } const encodedAudio = uint8Data.toBase64(); this.sendAudioToOpenAI(encodedAudio); } else if (this.connectionStatus === 'pending') { @@ -549,24 +539,14 @@ export class OutgoingConnection { worker: 'opus-transcriber-proxy', }); } else if (parsedMessage.type === 'input_audio_buffer.cleared') { - // Reset completed - update state atomically - this.lastChunkNo = -1; - this.lastTimestamp = -1; - this.lastOpusFrameSize = -1; + // Reset completed const nextTag = this.pendingTags.shift(); if (nextTag !== undefined) { this.setTag(nextTag); } else { console.error('Received cleared event but no pending tag available.'); } - this.resetting = false; - // Send any audio that was buffered during the reset - if (this.switchAudioData.length > 0) { - const encodedAudio = safeToBase64(this.switchAudioData); - this.sendAudioToOpenAI(encodedAudio); - this.switchAudioBuffer.resize(0); - } } else if (parsedMessage.type === 'error') { console.error(`OpenAI sent error message for ${this._tag}: ${data}`); writeMetric(this.env.METRICS, { From 12568b46dc5e396420b6bab1c1a78a8fb54068f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Tue, 16 Dec 2025 15:48:23 +0200 Subject: [PATCH 39/48] feat: heartbeat (#21) * feat: add pong to ping * feat: extend cpu limit to 5 minutes --- src/transcriberproxy.ts | 6 ++++-- wrangler.jsonc | 3 +++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/transcriberproxy.ts b/src/transcriberproxy.ts index 9cea5fb..1b1e511 100644 --- a/src/transcriberproxy.ts +++ b/src/transcriberproxy.ts @@ -46,8 +46,10 @@ export class TranscriberProxy extends EventEmitter { console.error('Failed to parse message as JSON:', parseError); parsedMessage = { raw: event.data, parseError: true }; } - // TODO: are there any other events that need to be handled? - if (parsedMessage && parsedMessage.event === 'media') { + + if (parsedMessage && parsedMessage.event === 'ping') { + this.ws.send(JSON.stringify({ event: 'pong' })); + } else if (parsedMessage && parsedMessage.event === 'media') { this.handleMediaEvent(parsedMessage); } }); diff --git a/wrangler.jsonc b/wrangler.jsonc index 5e2e1bd..076f348 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -45,6 +45,9 @@ * https://developers.cloudflare.com/workers/wrangler/configuration/#service-bindings */ "services": [{ "binding": "TRANSCRIPTION_DISPATCHER", "service": "transcription-dispatcher" }], + "limits": { + "cpu_ms": 300000, + }, /** * Analytics Engine binding for transcription metrics * https://developers.cloudflare.com/analytics/analytics-engine/get-started/ From 29f927c20f4eae9ce6a329baf8b8cbd34e3bce41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Tue, 16 Dec 2025 15:54:12 +0200 Subject: [PATCH 40/48] feat: transcription flush (#20) * feat: transcription flush * fix: add missing comma --- src/OutgoingConnection.ts | 37 +++++++++++++++++++++++++++++++++++++ worker-configuration.d.ts | 1 + wrangler.jsonc | 8 +++++++- 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index a1f0a38..0e0340c 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -92,6 +92,9 @@ export class OutgoingConnection { private lastTranscriptTime?: number = undefined; + // Idle commit timeout - forces transcription when audio stops + private idleCommitTimeout: ReturnType | null = null; + onInterimTranscription?: (message: TranscriptionMessage) => void = undefined; onCompleteTranscription?: (message: TranscriptionMessage) => void = undefined; onClosed?: (tag: string) => void = undefined; @@ -440,6 +443,7 @@ export class OutgoingConnection { const audioMessageString = JSON.stringify(audioMessage); this.openaiWebSocket.send(audioMessageString); + this.resetIdleCommitTimeout(); if (this.env.DEBUG === 'true') { writeMetric(this.env.METRICS, { name: 'openai_audio_sent', @@ -531,6 +535,7 @@ export class OutgoingConnection { parsedMessage.item_id, false, ); + this.clearIdleCommitTimeout(); this.onCompleteTranscription?.(transcription); } else if (parsedMessage.type === 'conversation.item.input_audio_transcription.failed') { console.error(`OpenAI failed to transcribe audio for tag ${this._tag}: ${data}`); @@ -571,11 +576,43 @@ export class OutgoingConnection { } } + private resetIdleCommitTimeout(): void { + this.clearIdleCommitTimeout(); + + const timeoutSeconds = parseInt(this.env.FORCE_COMMIT_TIMEOUT || '0', 10); + if (timeoutSeconds <= 0) { + return; + } + + this.idleCommitTimeout = setTimeout(() => { + this.forceCommit(); + }, timeoutSeconds * 1000); + } + + private clearIdleCommitTimeout(): void { + if (this.idleCommitTimeout !== null) { + clearTimeout(this.idleCommitTimeout); + this.idleCommitTimeout = null; + } + } + + private forceCommit(): void { + if (this.connectionStatus !== 'connected' || !this.openaiWebSocket) { + return; + } + + console.log(`Forcing commit for idle connection ${this._tag}`); + const commitMessage = { type: 'input_audio_buffer.commit' }; + this.openaiWebSocket.send(JSON.stringify(commitMessage)); + this.idleCommitTimeout = null; + } + close(): void { this.doClose(false); } private doClose(notify: boolean): void { + this.clearIdleCommitTimeout(); this.opusDecoder?.free(); this.openaiWebSocket?.close(); this.decoderStatus = 'closed'; diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index 935c90f..a061cde 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -11,6 +11,7 @@ declare namespace Cloudflare { OPENAI_MODEL?: string; OPENAI_TURN_DETECTION?: string; DEBUG?: string; + FORCE_COMMIT_TIMEOUT?: string; TRANSCRIPTIONATOR: DurableObjectNamespace; TRANSCRIPTION_DISPATCHER: Fetcher /* transcription-dispatcher */; METRICS?: AnalyticsEngineDataset; diff --git a/wrangler.jsonc b/wrangler.jsonc index 076f348..61fb071 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -30,7 +30,13 @@ * Environment Variables * https://developers.cloudflare.com/workers/wrangler/configuration/#environment-variables */ - // "vars": { "MY_VARIABLE": "production_value" } + "vars": { + /** + After how many seconds of no received audio and no final transcription + a flush is triggered. + **/ + "FORCE_COMMIT_TIMEOUT": "2" + }, /** * Note: Use secrets to store sensitive data. * https://developers.cloudflare.com/workers/configuration/secrets/ From 4939af652c663ca90e5d18764d4ba52872a44753 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Tue, 16 Dec 2025 14:32:56 -0500 Subject: [PATCH 41/48] Ignore the OpenAI error input_audio_buffer_commit_empty. (#22) This can happen if our manual commit happens to hit right after the VAD detected that speech stopped. --- src/OutgoingConnection.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 0e0340c..15bfe18 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -551,8 +551,14 @@ export class OutgoingConnection { } else { console.error('Received cleared event but no pending tag available.'); } - } else if (parsedMessage.type === 'error') { + if (parsedMessage.error?.type === 'invalid_request_error' && parsedMessage.error?.code === 'input_audio_buffer_commit_empty') { + // This error indicates that we tried to commit an empty audio buffer, which can happen + // if the VAD detected speech stopped just before we did a manual commit. Ignore. + // TODO should we log this at all? + console.log(`OpenAI reported empty audio buffer commit for ${this._tag}, ignoring.`); + return; + } console.error(`OpenAI sent error message for ${this._tag}: ${data}`); writeMetric(this.env.METRICS, { name: 'openai_api_error', From 9954e3b9c6a8b22ef28bb063a9a5e820e627349d Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Tue, 16 Dec 2025 14:33:11 -0500 Subject: [PATCH 42/48] Avoid double-free of Opus decoder. (#23) --- src/OpusDecoder/OpusDecoder.ts | 37 +++++++++++++++++++++++++++++++--- src/OutgoingConnection.ts | 6 +++++- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/OpusDecoder/OpusDecoder.ts b/src/OpusDecoder/OpusDecoder.ts index b397b18..3f37172 100644 --- a/src/OpusDecoder/OpusDecoder.ts +++ b/src/OpusDecoder/OpusDecoder.ts @@ -115,7 +115,7 @@ export class OpusDecoder; private _output!: TypedArrayAllocation; - private _decoder!: number; + private _decoder: number | undefined; constructor( options: { @@ -169,6 +169,9 @@ export class OpusDecoder { const errors: DecodeError[] = []; + if (this._decoder === undefined) { + this.addError(errors, 'Decoder freed or not initialized', 0, 0, 0, 0); + console.error('Decoder freed or not initialized'); + return { + errors, + pcmData: new Int16Array(0), + channels: this._channels, + samplesDecoded: 0, + sampleRate: this._sampleRate, + } as OpusDecodedAudio; + } + this._input.buf.set(opusFrame); let samplesDecoded = this.wasm.opus_frame_decode( @@ -256,10 +274,23 @@ export class OpusDecoder { + const errors: DecodeError[] = []; + + if (this._decoder === undefined) { + this.addError(errors, 'Decoder freed or not initialized', 0, 0, 0, 0); + console.error('Decoder freed or not initialized'); + return { + errors, + pcmData: new Int16Array(0), + channels: this._channels, + samplesDecoded: 0, + sampleRate: this._sampleRate, + } as OpusDecodedAudio; + } + if (samplesToConceal > this._outputChannelSize) { samplesToConceal = this._outputChannelSize; } - const errors: DecodeError[] = []; let samplesDecoded: number; let inLength: number; if (opusFrame !== undefined) { diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 15bfe18..873fa90 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -620,9 +620,13 @@ export class OutgoingConnection { private doClose(notify: boolean): void { this.clearIdleCommitTimeout(); this.opusDecoder?.free(); - this.openaiWebSocket?.close(); + this.opusDecoder = undefined; this.decoderStatus = 'closed'; + + this.openaiWebSocket?.close(); + this.openaiWebSocket = undefined; this.connectionStatus = 'closed'; + if (notify) { this.onClosed?.(this._tag); } From f3beeb9ffd0caa6ae7f6f533957d4a1359cee4a4 Mon Sep 17 00:00:00 2001 From: bgrozev Date: Tue, 16 Dec 2025 15:17:50 -0600 Subject: [PATCH 43/48] Adjust turn detection settings (#24) * Use OPENAI_TURN_DETECTION as JSON. * Adjust default turn detection. * Make sure the turn detection is an object and has a type field. --- src/utils.ts | 27 +++++++++++++++++++-------- worker-configuration.d.ts | 2 +- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index eb4d2c3..61607cd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -37,19 +37,30 @@ export function extractSessionParameters(url: string): ISessionParameters { export function getTurnDetectionConfig(env: Env) { const defaultTurnDetection = { type: 'server_vad', - threshold: 0.5, + threshold: 0.85, prefix_padding_ms: 300, - silence_duration_ms: 500, + silence_duration_ms: 300, }; + let turnDetection = defaultTurnDetection; + if (env.OPENAI_TURN_DETECTION) { - try { - return JSON.parse(env.OPENAI_TURN_DETECTION); - } catch (error) { - console.warn(`Invalid OPENAI_TURN_DETECTION JSON, using defaults: ${error}`); - return defaultTurnDetection; + if (typeof env.OPENAI_TURN_DETECTION === 'string') { + try { + turnDetection = JSON.parse(env.OPENAI_TURN_DETECTION); + } catch (error) { + console.warn(`Invalid OPENAI_TURN_DETECTION JSON, using defaults: ${error}`); + return defaultTurnDetection; + } } + // JSON object from CF + turnDetection = env.OPENAI_TURN_DETECTION; + } + + if (typeof turnDetection !== 'object' || typeof turnDetection.type !== string) { + console.warn(`Invalid OPENAI_TURN_DETECTION JSON, using defaults`); + return defaultTurnDetection; } - return defaultTurnDetection; + return turnDetection; } diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index a061cde..fff7341 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -9,7 +9,7 @@ declare namespace Cloudflare { interface Env { OPENAI_API_KEY: string; OPENAI_MODEL?: string; - OPENAI_TURN_DETECTION?: string; + OPENAI_TURN_DETECTION?: any; DEBUG?: string; FORCE_COMMIT_TIMEOUT?: string; TRANSCRIPTIONATOR: DurableObjectNamespace; From 4dbf78b66b89390531fa99c68f65084b1cd66b39 Mon Sep 17 00:00:00 2001 From: bgrozev Date: Tue, 16 Dec 2025 15:36:49 -0600 Subject: [PATCH 44/48] Fix typeof check. (#25) --- src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils.ts b/src/utils.ts index 61607cd..d9907e5 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -57,7 +57,7 @@ export function getTurnDetectionConfig(env: Env) { turnDetection = env.OPENAI_TURN_DETECTION; } - if (typeof turnDetection !== 'object' || typeof turnDetection.type !== string) { + if (typeof turnDetection !== 'object' || typeof turnDetection.type !== 'string') { console.warn(`Invalid OPENAI_TURN_DETECTION JSON, using defaults`); return defaultTurnDetection; } From 0650005cdcf245c05f8df2d02f3e8007706e902a Mon Sep 17 00:00:00 2001 From: bgrozev Date: Wed, 17 Dec 2025 19:17:56 -0600 Subject: [PATCH 45/48] Mirror ping id, add an event field. (#28) * Mirror the "id" field when responding to ping. * Add an "event" field to transcription messages, so they parse as MediaJson. --- src/OutgoingConnection.ts | 1 + src/transcriberproxy.ts | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 873fa90..b0e41a7 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -496,6 +496,7 @@ export class OutgoingConnection { is_interim: isInterim, message_id, type: 'transcription-result', + event: 'transcription-result', participant: this.participant, timestamp, }; diff --git a/src/transcriberproxy.ts b/src/transcriberproxy.ts index 1b1e511..49b2cb5 100644 --- a/src/transcriberproxy.ts +++ b/src/transcriberproxy.ts @@ -7,6 +7,7 @@ export interface TranscriptionMessage { language?: string; message_id: string; type: 'transcription-result'; + event: 'transcription-result'; participant: { id: string; ssrc?: string }; timestamp: number; } @@ -48,7 +49,11 @@ export class TranscriberProxy extends EventEmitter { } if (parsedMessage && parsedMessage.event === 'ping') { - this.ws.send(JSON.stringify({ event: 'pong' })); + const pongMessage: { event: string; id?: number } = { event: 'pong' }; + if (typeof parsedMessage.id === 'number') { + pongMessage.id = parsedMessage.id; + } + this.ws.send(JSON.stringify(pongMessage)); } else if (parsedMessage && parsedMessage.event === 'media') { this.handleMediaEvent(parsedMessage); } From 985cec872e9aed869e1e15302041d64742a7774f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C4=83zvan=20Purdel?= <104128753+rpurdel@users.noreply.github.com> Date: Thu, 18 Dec 2025 11:46:38 +0200 Subject: [PATCH 46/48] log actual OpenAI connection error instead of [object ErrorEvent] (#26) * fix: log the actual error instead of [object ErrorEvent] * fix: get reason and code from close event --- src/OutgoingConnection.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index b0e41a7..57eb184 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -210,8 +210,10 @@ export class OutgoingConnection { this.handleOpenAIMessage(event.data); }); - openaiWs.addEventListener('error', (error) => { - console.error(`OpenAI WebSocket error for tag ${this._tag}:`, error); + openaiWs.addEventListener('error', (event) => { + // Extract useful info from ErrorEvent (event.message is often empty for WebSocket errors) + const errorMessage = event instanceof ErrorEvent ? event.message || 'WebSocket error' : 'WebSocket error'; + console.error(`OpenAI WebSocket error for tag ${this._tag}: ${errorMessage}`); writeMetric(this.env.METRICS, { name: 'openai_api_error', worker: 'opus-transcriber-proxy', @@ -220,11 +222,11 @@ export class OutgoingConnection { this.onOpenAIError?.('websocket_error', 'WebSocket connection error'); this.doClose(true); this.connectionStatus = 'failed'; - this.onError?.(this._tag, `Error connecting to OpenAI service: ${error instanceof Error ? error.message : String(error)}`); + this.onError?.(this._tag, `Error connecting to OpenAI service: ${errorMessage}`); }); - openaiWs.addEventListener('close', () => { - console.log(`OpenAI WebSocket closed for tag: ${this._tag}`); + openaiWs.addEventListener('close', (event) => { + console.log(`OpenAI WebSocket closed for tag ${this._tag}: code=${event.code} reason=${event.reason || 'none'} wasClean=${event.wasClean}`); this.doClose(true); this.connectionStatus = 'failed'; }); From 7eeff8d2e13620079d69259317fcc7551a2d0645 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Thu, 18 Dec 2025 15:15:54 -0500 Subject: [PATCH 47/48] Add MetricCache. (#29) Aggregate happy-path metrics so as to not overwhelm the metrics system. --- src/MetricCache.ts | 75 +++++++++++++++++++++++++++++++++++++++ src/OutgoingConnection.ts | 56 ++++++++++++++--------------- src/metrics.ts | 6 ++-- 3 files changed, 104 insertions(+), 33 deletions(-) create mode 100644 src/MetricCache.ts diff --git a/src/MetricCache.ts b/src/MetricCache.ts new file mode 100644 index 0000000..4717642 --- /dev/null +++ b/src/MetricCache.ts @@ -0,0 +1,75 @@ +import { writeMetric, MetricEvent } from './metrics'; + +/** + * Aggregates metric counts and periodically flushes them to Analytics Engine. + * Reduces write frequency by batching metrics over a time interval. + */ +export class MetricCache { + private analytics: AnalyticsEngineDataset | undefined; + private intervalMs: number; + private metrics: Map; + + /** + * @param analytics - The Analytics Engine dataset to write to + * @param intervalMs - Time interval in milliseconds between metric writes (default: 1000ms) + */ + constructor(analytics: AnalyticsEngineDataset | undefined, intervalMs: number = 1000) { + this.analytics = analytics; + this.intervalMs = intervalMs; + this.metrics = new Map(); + } + + /** + * Increments the count for a metric. If the time interval has elapsed since + * the last write, flushes the accumulated count to Analytics Engine. + * + * @param event - The metric event to increment + */ + increment(event: MetricEvent): void { + const key = this.getKey(event); + const now = Date.now(); + const metric = this.metrics.get(key); + + if (!metric) { + // First time seeing this metric + this.metrics.set(key, { event, count: 1, lastWriteTime: now }); + } else { + // Increment existing metric + metric.count++; + + // Check if it's time to flush + if (now - metric.lastWriteTime >= this.intervalMs) { + writeMetric(this.analytics, metric.event, metric.count); + metric.count = 0; + metric.lastWriteTime = now; + } + } + } + + /** + * Flushes all accumulated metrics immediately, regardless of time interval. + * Useful for cleanup on shutdown or before long idle periods. + */ + flush(): void { + for (const [_, metric] of this.metrics) { + if (metric.count > 0) { + writeMetric(this.analytics, metric.event, metric.count); + metric.count = 0; + metric.lastWriteTime = Date.now(); + } + } + } + + /** + * Generates a unique key for a metric event based on its distinguishing properties. + * Does not include sessionId to allow aggregation across sessions. + */ + private getKey(event: MetricEvent): string { + return JSON.stringify({ + name: event.name, + worker: event.worker, + errorType: event.errorType ?? '', + targetName: event.targetName ?? '', + }); + } +} diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 57eb184..bc99c76 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -2,6 +2,7 @@ import { OpusDecoder } from './OpusDecoder/OpusDecoder'; import type { TranscriptionMessage, TranscriberProxyOptions } from './transcriberproxy'; import { getTurnDetectionConfig } from './utils'; import { writeMetric } from './metrics'; +import { MetricCache } from './metricCache'; // Type definition augmentation for Uint8Array - Cloudflare Worker's JS has these methods but TypeScript doesn't have // declarations for them as of version 5.9.3. @@ -103,17 +104,20 @@ export class OutgoingConnection { private env: Env; private options: TranscriberProxyOptions; + private metricCache: MetricCache; constructor(tag: string, env: Env, options: TranscriberProxyOptions) { this.setTag(tag); this.env = env; this.options = options; + this.metricCache = new MetricCache(env.METRICS); this.initializeOpusDecoder(); this.initializeOpenAIWebSocket(env); } reset(newTag: string) { + this.metricCache.flush(); if (this.connectionStatus == 'connected') { this.pendingTags.push(newTag); @@ -262,12 +266,10 @@ export class OutgoingConnection { return; } - if (this.env.DEBUG === 'true') { - writeMetric(this.env.METRICS, { - name: 'opus_packet_received', - worker: 'opus-transcriber-proxy', - }); - } + this.metricCache.increment({ + name: 'opus_packet_received', + worker: 'opus-transcriber-proxy', + }); if (Number.isInteger(mediaEvent.media?.chunk) && Number.isInteger(mediaEvent.media.timestamp)) { if (this.lastChunkNo != -1 && mediaEvent.media.chunk != this.lastChunkNo + 1) { @@ -298,12 +300,10 @@ export class OutgoingConnection { } else if (this.decoderStatus === 'pending') { // Queue the binary data until decoder is ready this.pendingOpusFrames.push(opusFrame); - if (this.env.DEBUG === 'true') { - writeMetric(this.env.METRICS, { - name: 'opus_packet_queued', - worker: 'opus-transcriber-proxy', - }); - } + this.metricCache.increment({ + name: 'opus_packet_queued', + worker: 'opus-transcriber-proxy', + }); // console.log(`Queued opus frame for tag: ${this.tag} (queue size: ${this.pendingOpusFrames.length})`); } else { console.log(`Not queueing opus frame for tag: ${this._tag}: decoder ${this.decoderStatus}`); @@ -371,12 +371,10 @@ export class OutgoingConnection { // Don't call onError for decoding errors, as they may be transient return; } - if (this.env.DEBUG === 'true') { - writeMetric(this.env.METRICS, { - name: 'opus_packet_decoded', - worker: 'opus-transcriber-proxy', - }); - } + this.metricCache.increment({ + name: 'opus_packet_decoded', + worker: 'opus-transcriber-proxy', + }); this.lastOpusFrameSize = decodedAudio.samplesDecoded; this.sendOrEnqueueDecodedAudio(decodedAudio.pcmData); } catch (error) { @@ -404,12 +402,10 @@ export class OutgoingConnection { this.pendingAudioDataBuffer.resize(uint8Data.byteLength); this.pendingAudioData.set(uint8Data); } - if (this.env.DEBUG === 'true') { - writeMetric(this.env.METRICS, { - name: 'openai_audio_queued', - worker: 'opus-transcriber-proxy', - }); - } + this.metricCache.increment({ + name: 'openai_audio_queued', + worker: 'opus-transcriber-proxy', + }); } else { console.log(`Not queueing audio data for tag: ${this._tag}: connection ${this.connectionStatus}`); } @@ -446,12 +442,10 @@ export class OutgoingConnection { this.openaiWebSocket.send(audioMessageString); this.resetIdleCommitTimeout(); - if (this.env.DEBUG === 'true') { - writeMetric(this.env.METRICS, { - name: 'openai_audio_sent', - worker: 'opus-transcriber-proxy', - }); - } + this.metricCache.increment({ + name: 'openai_audio_sent', + worker: 'opus-transcriber-proxy', + }); } catch (error) { console.error(`Failed to send audio to OpenAI for tag ${this._tag}`, error); // TODO should this call onError? @@ -611,6 +605,7 @@ export class OutgoingConnection { } console.log(`Forcing commit for idle connection ${this._tag}`); + this.metricCache.flush(); const commitMessage = { type: 'input_audio_buffer.commit' }; this.openaiWebSocket.send(JSON.stringify(commitMessage)); this.idleCommitTimeout = null; @@ -622,6 +617,7 @@ export class OutgoingConnection { private doClose(notify: boolean): void { this.clearIdleCommitTimeout(); + this.metricCache.flush(); this.opusDecoder?.free(); this.opusDecoder = undefined; this.decoderStatus = 'closed'; diff --git a/src/metrics.ts b/src/metrics.ts index 70192e8..2ed99e0 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -41,11 +41,11 @@ export interface MetricEvent { * - blob3: error_type (optional, for failures) * - blob4: session_id (optional, for correlation) * - blob5: target_name (optional, for dispatcher) - * - double1: count (always 1) + * - double1: count (default 1) * - double2: latency_ms (optional) * - index1: session_id (for sampling) */ -export function writeMetric(analytics: AnalyticsEngineDataset | undefined, event: MetricEvent): void { +export function writeMetric(analytics: AnalyticsEngineDataset | undefined, event: MetricEvent, count: number = 1): void { if (!analytics) { console.warn('Analytics Engine not configured, skipping metric:', event.name); return; @@ -53,7 +53,7 @@ export function writeMetric(analytics: AnalyticsEngineDataset | undefined, event analytics.writeDataPoint({ blobs: [event.name, event.worker, event.errorType ?? '', event.sessionId ?? '', event.targetName ?? ''], - doubles: [1, event.latencyMs ?? 0], + doubles: [count, event.latencyMs ?? 0], indexes: [event.sessionId ?? ''], }); } From b73e9ad67f4d7c014f50c0a2103f9fa813da3475 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Thu, 18 Dec 2025 15:25:10 -0500 Subject: [PATCH 48/48] Fix capitalization --- src/OutgoingConnection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index bc99c76..0009b41 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -2,7 +2,7 @@ import { OpusDecoder } from './OpusDecoder/OpusDecoder'; import type { TranscriptionMessage, TranscriberProxyOptions } from './transcriberproxy'; import { getTurnDetectionConfig } from './utils'; import { writeMetric } from './metrics'; -import { MetricCache } from './metricCache'; +import { MetricCache } from './MetricCache'; // Type definition augmentation for Uint8Array - Cloudflare Worker's JS has these methods but TypeScript doesn't have // declarations for them as of version 5.9.3.