""" ============================================================= Audio Recorder Server for Windows Controlled by ESP32-S3 via HTTP or web browser ============================================================= SETUP ----- 1. Install Python 3.8+ from python.org 2. Install dependencies: pip install flask sounddevice soundfile numpy 3. Run: python audio_recorder_server.py API ENDPOINTS ------------- GET /api/start → Start a new recording GET /api/stop → Stop recording GET /api/pause → Pause / resume recording GET /api/resume → Resume paused recording GET /api/save → Save the last stopped recording to disk GET /api/status → Get current state, elapsed time, file list GET /api/devices → List available audio input devices POST /api/setdevice → Set input device {device: index_or_null} ============================================================= """ import os import sys import time import json import queue import signal import threading import argparse import datetime import numpy as np from pathlib import Path import sounddevice as sd import soundfile as sf try: import lameenc MP3_AVAILABLE = True except ImportError: MP3_AVAILABLE = False print("[Server] lameenc not installed — MP3 encoding unavailable") from flask import Flask, jsonify, request, send_from_directory # ─── ARGUMENT PARSING ──────────────────────────────────────────────────────── parser = argparse.ArgumentParser(description="Audio Recorder Server") parser.add_argument("--port", type=int, default=5000) parser.add_argument("--host", type=str, default="0.0.0.0") parser.add_argument("--outdir", type=str, default="./recordings") parser.add_argument("--device", type=int, default=None) parser.add_argument("--samplerate", type=int, default=44100) parser.add_argument("--channels", type=int, default=2) parser.add_argument("--format", type=str, default="WAV", choices=["WAV", "FLAC", "OGG", "MP3"]) parser.add_argument("--list-devices", action="store_true") # ─── CONFIG GLOBALS 
# Runtime configuration. These defaults are overwritten by init_from_args()
# at startup and may be changed later through POST /api/setdevice.
OUTPUT_DIR = Path("./recordings")
SAMPLE_RATE = 44100
CHANNELS = 2
DEVICE = None
FILE_FORMAT = "WAV"
PORT = 5000

# Formats accepted by /api/setdevice and the file suffixes listed as recordings.
SUPPORTED_FORMATS = ("WAV", "FLAC", "OGG", "MP3")
AUDIO_SUFFIXES = (".wav", ".flac", ".ogg", ".mp3")


def init_from_args():
    """Parse the command line and copy the result into the config globals.

    With ``--list-devices`` the input devices are printed and the process
    exits. Otherwise the output directory is created if needed.

    Returns:
        The parsed ``argparse.Namespace``.
    """
    global OUTPUT_DIR, SAMPLE_RATE, CHANNELS, DEVICE, FILE_FORMAT, PORT
    args = parser.parse_args()

    if args.list_devices:
        print("\n=== Available Audio Input Devices ===\n")
        for i, d in enumerate(sd.query_devices()):
            if d['max_input_channels'] > 0:
                print(f" [{i:2d}] {d['name']}")
                print(f" Channels: {d['max_input_channels']} |"
                      f" Rate: {int(d['default_samplerate'])} Hz")
        sys.exit(0)

    OUTPUT_DIR = Path(args.outdir)
    SAMPLE_RATE = args.samplerate
    CHANNELS = args.channels
    DEVICE = args.device
    FILE_FORMAT = args.format.upper()
    PORT = args.port
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    return args


# ─── HELPERS ─────────────────────────────────────────────────────────────────

def get_device_name():
    """Return a display name for the selected input device.

    The system default is prefixed with ``"Default: "``. Any failure while
    querying the device yields ``"Unknown device"`` so that status requests
    never fail because of a device lookup.
    """
    try:
        if DEVICE is None:
            default_input = sd.default.device[0]
            return "Default: " + sd.query_devices(default_input)['name']
        return sd.query_devices(DEVICE)['name']
    except Exception:
        return "Unknown device"


# ─── DEVICE CAPABILITY RESOLVER ─────────────────────────────────────────────

def resolve_device_settings(device, wanted_rate, wanted_ch):
    """Pick a (sample_rate, channels) pair the device will accept.

    The channel count is clamped to the device's input channel count (and to
    at least 1). Sample rates are tried in order: the requested rate, the
    device default, then a few common fallbacks. The first one accepted by
    ``sd.check_input_settings`` wins.

    Args:
        device: Input device index, or ``None`` for the system default.
        wanted_rate: Preferred sample rate in Hz.
        wanted_ch: Preferred number of channels.

    Returns:
        ``(rate, channels)``. If no candidate rate is accepted, the device's
        default rate is returned. If the device cannot be queried at all, the
        requested values are returned unchanged.
    """
    try:
        info = sd.query_devices(device, 'input')
        max_ch = int(info['max_input_channels'])
        default_rate = int(info['default_samplerate'])

        # Clamp channels to what the device supports, never below mono.
        actual_ch = max(1, min(wanted_ch, max_ch))

        # dict.fromkeys removes duplicate rates while keeping priority order.
        candidate_rates = dict.fromkeys(
            [wanted_rate, default_rate, 48000, 44100, 22050, 16000])
        for rate in candidate_rates:
            try:
                sd.check_input_settings(device=device, channels=actual_ch,
                                        samplerate=rate, dtype='float32')
                return rate, actual_ch
            except Exception:
                continue

        # Last resort — nothing validated, fall back to the device default.
        return default_rate, actual_ch
    except Exception as e:
        print(f"[Resolver] Could not query device {device}: {e} — using defaults")
        return wanted_rate, wanted_ch


# ─── RECORDER ────────────────────────────────────────────────────────────────

class RecorderState:
    """String constants for the recorder state, as reported in JSON."""
    IDLE = "idle"
    RECORDING = "recording"
    PAUSED = "paused"
    SAVING = "saving"


class AudioRecorder:
    """Captures audio blocks in memory and writes them to disk on request.

    ``self.lock`` guards the state and timing fields. It is a plain
    (non-reentrant) lock that the audio callback also takes, so no method
    calls into the stream or into another locking method while holding it.
    """

    # Amount subtracted from the held peak on each callback without a new peak.
    PEAK_DECAY_PER_BLOCK = 0.012

    def __init__(self):
        self.state = RecorderState.IDLE
        self.audio_data = []
        self.stream = None
        self.lock = threading.Lock()
        self.start_time = None
        self.pause_time = None
        self.paused_secs = 0.0
        self.current_file = ""
        self.last_saved = ""
        self.error_msg = ""
        self.level = 0.0        # input level 0.0–1.0, updated per callback
        self.peak = 0.0         # peak hold, decays slowly
        self._peak_decay = 0.0
        # Settings the stream was actually opened with; save() needs them.
        self._actual_rate = SAMPLE_RATE
        self._actual_ch = CHANNELS

    @property
    def elapsed_seconds(self):
        """Seconds recorded so far, excluding paused time; 0 when not active."""
        # Snapshot under the lock so a concurrent pause()/resume() cannot be
        # observed half-applied (state PAUSED but pause_time still None).
        with self.lock:
            state = self.state
            started = self.start_time
            paused_at = self.pause_time
            paused_total = self.paused_secs
        if started is None:
            return 0
        if state == RecorderState.PAUSED and paused_at is not None:
            return paused_at - started - paused_total
        if state == RecorderState.RECORDING:
            return time.time() - started - paused_total
        return 0

    @property
    def elapsed_str(self):
        """Elapsed time formatted as HH:MM:SS."""
        s = int(self.elapsed_seconds)
        return f"{s//3600:02d}:{(s%3600)//60:02d}:{s%60:02d}"

    def _audio_callback(self, indata, frames, time_info, status):
        """Stream callback: update the level meter and store the block.

        Blocks are stored only while the state is RECORDING, so audio that
        arrives while paused is metered but discarded.
        """
        # Mix all channels to mono; a 1-D block is already mono.
        mono = indata.mean(axis=1) if indata.ndim > 1 else indata
        rms = float(np.sqrt(np.mean(mono ** 2)))

        # Convert to 0–1 on a log scale so quiet signals are visible:
        # -60 dB floor → 0.0, 0 dB → 1.0
        if rms > 0:
            db = 20 * np.log10(rms + 1e-9)
            level = float(max(0.0, min(1.0, (db + 60) / 60)))
        else:
            level = 0.0
        self.level = level

        # Peak hold — snap up instantly, decay by a fixed step per block.
        if level >= self.peak:
            self.peak = level
        else:
            self.peak = max(0.0, self.peak - self.PEAK_DECAY_PER_BLOCK)

        with self.lock:
            if self.state == RecorderState.RECORDING:
                self.audio_data.append(indata.copy())

    def start(self):
        """Open the input stream and begin a new recording.

        Any previous unsaved audio is discarded.

        Returns:
            ``(ok, message)``; ``ok`` is False when the recorder is not idle
            or the stream could not be opened.
        """
        with self.lock:
            if self.state != RecorderState.IDLE:
                return False, f"Cannot start — currently {self.state}"
            self.audio_data = []
            self.paused_secs = 0.0
            self.start_time = time.time()
            self.pause_time = None
            self.error_msg = ""
            self.current_file = self._make_filename()

        # The lock is released before touching the stream: the callback takes
        # the same lock, and threading.Lock is not reentrant.
        use_device = DEVICE  # None = system default
        stream = None
        try:
            # Query actual device capabilities and clamp to what it supports
            actual_rate, actual_ch = resolve_device_settings(
                use_device, SAMPLE_RATE, CHANNELS)
            print(f"[Recorder] Using device={use_device} rate={actual_rate} ch={actual_ch}")

            stream = sd.InputStream(
                samplerate=actual_rate,
                channels=actual_ch,
                device=use_device,
                callback=self._audio_callback,
                dtype='float32',
                blocksize=4096,
            )
            stream.start()
        except Exception as e:
            # Do not leak a stream that was created but failed to start.
            if stream is not None:
                try:
                    stream.close()
                except Exception:
                    pass
            self.error_msg = str(e)
            print(f"[Recorder] Start error: {e}")
            return False, str(e)

        with self.lock:
            self.stream = stream
            # Store actual settings used (for correct save)
            self._actual_rate = actual_rate
            self._actual_ch = actual_ch
            self.state = RecorderState.RECORDING
        print(f"[Recorder] Started: {self.current_file} device={use_device}")
        return True, "Recording started"

    def pause(self):
        """Pause a running recording. Returns ``(ok, message)``."""
        with self.lock:
            if self.state != RecorderState.RECORDING:
                return False, f"Cannot pause — currently {self.state}"
            self.state = RecorderState.PAUSED
            self.pause_time = time.time()
        return True, "Paused"

    def resume(self):
        """Resume a paused recording. Returns ``(ok, message)``."""
        with self.lock:
            if self.state != RecorderState.PAUSED:
                return False, f"Cannot resume — currently {self.state}"
            self.paused_secs += time.time() - self.pause_time
            self.pause_time = None
            self.state = RecorderState.RECORDING
        return True, "Resumed"

    def stop(self):
        """Stop recording and close the stream; captured audio is kept.

        Returns ``(ok, message)``.
        """
        with self.lock:
            if self.state not in (RecorderState.RECORDING, RecorderState.PAUSED):
                return False, f"Cannot stop — currently {self.state}"
            self.state = RecorderState.IDLE
            stream, self.stream = self.stream, None
            block_count = len(self.audio_data)

        # Stop outside the lock: stopping waits for the callback, and the
        # callback needs the lock, so stopping while holding it can deadlock.
        if stream is not None:
            try:
                stream.stop()
                stream.close()
            except Exception as e:
                self.error_msg = str(e)
                print(f"[Recorder] Stream close error: {e}")
        print(f"[Recorder] Stopped. Blocks: {block_count}")
        return True, "Stopped"

    def save(self):
        """Write the captured audio to OUTPUT_DIR on a background thread.

        The encoding follows the extension chosen when the recording started,
        so changing FILE_FORMAT afterwards cannot produce a file whose content
        disagrees with its name. An existing file is never overwritten; a
        numeric suffix is added instead.

        Returns:
            ``(ok, message)``; ``ok`` is False when the recorder is busy,
            there is nothing to save, or the output path cannot be prepared.
        """
        with self.lock:
            if self.state != RecorderState.IDLE:
                return False, f"Cannot save — currently {self.state}"
            if not self.audio_data:
                return False, "No audio data to save"
            self.state = RecorderState.SAVING
            blocks = self.audio_data
            filename = self.current_file
            rate = self._actual_rate
            channels = self._actual_ch

        try:
            OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
            filepath = self._unique_path(OUTPUT_DIR / filename)
        except OSError as e:
            with self.lock:
                self.error_msg = str(e)
                self.state = RecorderState.IDLE
            print(f"[Recorder] Save error: {e}")
            return False, str(e)
        self.current_file = filepath.name

        def _save_thread():
            try:
                audio_np = np.concatenate(blocks, axis=0)
                self._write_audio(filepath, audio_np, rate, channels)
                kb = filepath.stat().st_size // 1024
                with self.lock:
                    self.last_saved = filepath.name
                    self.audio_data = []
                    self.state = RecorderState.IDLE
                print(f"[Recorder] Saved: {filepath} ({kb} KB)")
            except Exception as e:
                # The audio stays in memory so the save can be retried.
                with self.lock:
                    self.error_msg = str(e)
                    self.state = RecorderState.IDLE
                print(f"[Recorder] Save error: {e}")

        threading.Thread(target=_save_thread, daemon=True).start()
        return True, f"Saving {self.current_file}"

    @staticmethod
    def _unique_path(path):
        """Return ``path``, or ``stem_N.suffix`` for the first free N >= 1."""
        if not path.exists():
            return path
        n = 1
        while True:
            candidate = path.with_name(f"{path.stem}_{n}{path.suffix}")
            if not candidate.exists():
                return candidate
            n += 1

    @staticmethod
    def _write_audio(filepath, audio_np, rate, channels):
        """Encode ``audio_np`` to ``filepath`` in the format its suffix names."""
        ext = filepath.suffix.lstrip(".").lower()
        if ext == "mp3":
            if not MP3_AVAILABLE:
                raise RuntimeError("lameenc not installed. Run: pip install lameenc")
            # Convert float32 [-1,1] to int16 PCM for lameenc
            pcm16 = (np.clip(audio_np, -1.0, 1.0) * 32767).astype(np.int16)
            enc = lameenc.Encoder()
            enc.set_bit_rate(192)            # 192 kbps — good quality/size balance
            enc.set_in_sample_rate(rate)
            enc.set_channels(channels)
            enc.set_quality(2)               # 2=high quality, 7=fastest
            mp3_data = enc.encode(pcm16.tobytes()) + enc.flush()
            with open(str(filepath), 'wb') as mp3f:
                mp3f.write(mp3_data)
            return

        fmt_map = {"wav": ("WAV", "PCM_16"),
                   "flac": ("FLAC", "PCM_16"),
                   "ogg": ("OGG", "VORBIS")}
        fmt, sub = fmt_map.get(ext, ("WAV", "PCM_16"))
        sf.write(str(filepath), audio_np, rate, format=fmt, subtype=sub)

    def list_files(self):
        """Return up to 50 recordings as dicts with name, size and mtime.

        Entries are ordered by name, descending. A missing output directory
        yields an empty list, and files that vanish mid-listing are skipped.
        """
        try:
            entries = sorted(OUTPUT_DIR.iterdir(), reverse=True)
        except OSError:
            return []

        files = []
        for f in entries:
            if f.suffix.lower() not in AUDIO_SUFFIXES:
                continue
            try:
                info = f.stat()
            except OSError:
                continue
            kb = info.st_size // 1024
            # True division: floor division here always printed ".0 MB".
            size_str = f"{kb} KB" if kb < 1024 else f"{kb / 1024:.1f} MB"
            files.append({
                "name": f.name,
                "size": size_str,
                "modified": datetime.datetime.fromtimestamp(
                    info.st_mtime).strftime("%Y-%m-%d %H:%M"),
            })
            if len(files) == 50:
                break
        return files

    def status_dict(self):
        """Return the full recorder status as a JSON-serialisable dict."""
        return {
            "state": self.state,
            "elapsed": self.elapsed_str,
            "elapsed_seconds": int(self.elapsed_seconds),
            "file": self.current_file,
            "last_saved": self.last_saved,
            "error": self.error_msg,
            "device": DEVICE,
            "device_name": get_device_name(),
            "mp3_available": MP3_AVAILABLE,
            "files": self.list_files(),
        }

    def _make_filename(self):
        """Build ``YYYYMMDD_DAY_HHMM.ext`` from the current local time."""
        now = datetime.datetime.now()
        day = now.strftime("%a").upper()  # MON, TUE, WED ...
        return f"{now:%Y%m%d}_{day}_{now:%H%M}.{FILE_FORMAT.lower()}"


# ─── FLASK APP ────────────────────────────────────────────────────────────────
app = Flask(__name__)
recorder = AudioRecorder()

import logging
logging.getLogger('werkzeug').setLevel(logging.WARNING)


def _status_response(ok, msg):
    """Return the recorder status plus ``message``; HTTP 200 if ok, else 400."""
    payload = recorder.status_dict()
    payload["message"] = msg
    return jsonify(payload), (200 if ok else 400)


# ─── API ROUTES ───────────────────────────────────────────────────────────────

@app.route("/api/start")
def api_start():
    """Start a new recording."""
    ok, msg = recorder.start()
    return _status_response(ok, msg)


@app.route("/api/stop")
def api_stop():
    """Stop the current recording."""
    ok, msg = recorder.stop()
    return _status_response(ok, msg)


@app.route("/api/pause")
def api_pause():
    """Toggle: pause when recording, resume when paused."""
    if recorder.state == RecorderState.RECORDING:
        ok, msg = recorder.pause()
    elif recorder.state == RecorderState.PAUSED:
        ok, msg = recorder.resume()
    else:
        ok, msg = False, f"Cannot pause — currently {recorder.state}"
    return _status_response(ok, msg)


@app.route("/api/resume")
def api_resume():
    """Resume a paused recording."""
    ok, msg = recorder.resume()
    return _status_response(ok, msg)


@app.route("/api/save")
def api_save():
    """Save the captured audio, stopping the recording first if needed."""
    if recorder.state in (RecorderState.RECORDING, RecorderState.PAUSED):
        recorder.stop()
    ok, msg = recorder.save()
    return _status_response(ok, msg)


@app.route("/api/status")
def api_status():
    """Return the full recorder status."""
    return jsonify(recorder.status_dict())


@app.route("/api/level")
def api_level():
    """Lightweight level poll — returns current level + peak (0.0–1.0)."""
    return jsonify({
        "level": round(recorder.level, 4),
        "peak": round(recorder.peak, 4),
        "state": recorder.state,
    })


@app.route("/api/devices")
def api_devices():
    """List input-capable devices and the settings resolved for the current one."""
    devices = [
        {
            "index": i,
            "name": d['name'],
            "channels": int(d['max_input_channels']),
            "rate": int(d['default_samplerate']),
            "selected": (DEVICE == i),
        }
        for i, d in enumerate(sd.query_devices())
        if d['max_input_channels'] > 0
    ]
    # Also include resolved settings for current device
    r, ch = resolve_device_settings(DEVICE, SAMPLE_RATE, CHANNELS)
    return jsonify({"devices": devices, "current": DEVICE,
                    "current_name": get_device_name(),
                    "resolved_rate": r, "resolved_ch": ch})


@app.route("/api/setdevice", methods=["POST"])
def api_set_device():
    """Select the input device and optionally channels/samplerate/format.

    JSON body: ``device`` (index; null, "" or -1 selects the system default),
    plus optional ``channels``, ``samplerate`` and ``format``. Everything is
    validated before any global is changed, so a rejected request leaves the
    configuration untouched. An unrecognised ``format`` is ignored.
    """
    global DEVICE, CHANNELS, SAMPLE_RATE, FILE_FORMAT
    if recorder.state != RecorderState.IDLE:
        return jsonify({"ok": False, "error": "Cannot change device while recording"}), 400

    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        data = {}

    raw = data.get("device", None)
    new_device = None
    if not (raw is None or raw == "" or str(raw) == "-1"):
        try:
            new_device = int(raw)
            sd.query_devices(new_device, 'input')  # raises if not a usable input
        except Exception as e:
            return jsonify({"ok": False, "error": str(e)}), 400

    # Optional: also update channels/samplerate/format
    try:
        new_channels = int(data["channels"]) if "channels" in data else CHANNELS
        new_rate = int(data["samplerate"]) if "samplerate" in data else SAMPLE_RATE
    except (TypeError, ValueError):
        return jsonify({"ok": False,
                        "error": "channels and samplerate must be integers"}), 400
    if new_channels < 1 or new_rate < 1:
        return jsonify({"ok": False,
                        "error": "channels and samplerate must be positive"}), 400

    new_format = FILE_FORMAT
    requested_format = str(data.get("format", "")).upper()
    if requested_format in SUPPORTED_FORMATS:
        new_format = requested_format

    DEVICE = new_device
    CHANNELS = new_channels
    SAMPLE_RATE = new_rate
    FILE_FORMAT = new_format
    name = get_device_name()

    print(f"[Server] Device={name} ch={CHANNELS} rate={SAMPLE_RATE} fmt={FILE_FORMAT}")
    return jsonify({"ok": True, "device": DEVICE, "device_name": name})


@app.route("/recordings/<path:filename>")
def serve_recording(filename):
    """Send a saved recording as a download."""
    # Resolve first: Flask interprets a relative directory against the
    # application root, not the current working directory.
    return send_from_directory(str(OUTPUT_DIR.resolve()), filename,
                               as_attachment=True)


def _is_plain_filename(name):
    """True when ``name`` is a non-empty string with no path components."""
    if not isinstance(name, str) or not name:
        return False
    return not any(token in name for token in ("/", "\\", "..", ":"))


@app.route("/api/rename", methods=["POST"])
def api_rename():
    """Rename a recording inside OUTPUT_DIR.

    JSON body: ``old`` and ``new`` file names. The original extension is kept.
    Both names are rejected if they contain path separators, ``..`` or ``:``,
    so a request cannot reach outside the recordings directory.
    """
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        data = {}
    old_name = data.get("old", "")
    new_name = data.get("new", "")
    if not isinstance(old_name, str) or not isinstance(new_name, str):
        return jsonify({"ok": False, "error": "Invalid filename"}), 400
    old_name = old_name.strip()
    new_name = new_name.strip()
    if not old_name or not new_name:
        return jsonify({"ok": False, "error": "Missing filename"}), 400

    # Sanitise — no path separators allowed in either name
    if not _is_plain_filename(old_name) or not _is_plain_filename(new_name):
        return jsonify({"ok": False, "error": "Invalid filename"}), 400

    old_path = OUTPUT_DIR / old_name
    # Preserve extension from original file
    ext = Path(old_name).suffix
    if not new_name.lower().endswith(ext.lower()):
        new_name = new_name + ext
    new_path = OUTPUT_DIR / new_name

    if not old_path.exists():
        return jsonify({"ok": False, "error": "File not found"}), 404
    if new_path.exists():
        return jsonify({"ok": False, "error": "A file with that name already exists"}), 409

    try:
        old_path.rename(new_path)
    except OSError as e:
        return jsonify({"ok": False, "error": str(e)}), 500
    print(f"[Server] Renamed: {old_name} -> {new_name}")
    return jsonify({"ok": True, "new_name": new_name})


# ─── WEB UI ───────────────────────────────────────────────────────────────────

@app.route("/")
def index():
    """Serve the web UI."""
    # Return raw string — do NOT use render_template_string as Jinja2
    # will try to parse CSS/JS curly braces as template variables
    html = build_ui_html()
    return html, 200, {"Content-Type": "text/html; charset=utf-8"}


def build_ui_html():
    """Build the web UI — plain string, no Jinja2."""
    # NOTE(review): this string contains only the page's visible text; the
    # HTML/CSS/JS markup appears to have been stripped from this copy of the
    # file. Restore it from the original source — it is kept verbatim here.
    return """\
Audio Recorder

🎤 Audio Recorder

Windows PC Server
Idle  
00:00:00
 
Input: loading...
Saved Recordings
No recordings yet
Audio Input Device
Input Device
Microphone, line-in, or Stereo Mix
Channels
Sample Rate
File Format
WAV = lossless, MP3 = ~10x smaller
Tip: Capture System Audio
To record audio playing on your PC, enable Stereo Mix in Windows: Right-click speaker → Sounds → Recording tab → right-click empty area → Show Disabled Devices → enable Stereo Mix. Then select it above.
"""


# ─── STARTUP INFO ─────────────────────────────────────────────────────────────

def print_startup_info():
    """Print the server URL, output directory, device and format banner."""
    import socket
    try:
        # Connecting a UDP socket sends nothing; it only makes the OS choose
        # the outgoing interface, whose address is then read back.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            ip = s.getsockname()[0]
    except Exception:
        ip = "unknown"

    print()
    print("=" * 50)
    print(" Audio Recorder Server")
    print("=" * 50)
    print(f" URL : http://{ip}:{PORT}")
    print(f" Output : {OUTPUT_DIR.resolve()}")
    print(f" Device : {get_device_name()}")
    print(f" Format : {FILE_FORMAT} {SAMPLE_RATE}Hz {CHANNELS}ch")
    print("=" * 50)
    print()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, lambda s, f: sys.exit(0))
    signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0))
    args = init_from_args()
    print_startup_info()
    app.run(host=args.host, port=PORT, debug=False, threaded=True)