Guide: Dubbing Videos with the Synthesia API

This guide shows how to dub one or many videos through the Synthesia API using a single, resumable Python script. It explains the end-to-end logic and key concepts, covers the API endpoints involved, and provides a ready-to-run CLI with usage examples and troubleshooting tips.

Overview

End-to-end flow

When you dub a video with Synthesia programmatically, you typically perform four steps:

  1. Create media asset — call POST /v2/assets to register the video and obtain short‑lived S3 upload credentials.
  2. Upload the video — use those credentials to multipart-upload the file to S3 (reliable for large files).
  3. Start dubbing — call POST /v2/dubbing with your asset id, the source language, and the target language(s).
  4. Poll & download — call GET /v2/dubbing/{imported_video_id} until status=COMPLETE, then download the output MP4(s).
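The steps above exchange small JSON bodies. A sketch of the two request payloads, using the same field names as the full script later in this guide (treat the exact fields as illustrative; check the API reference for your account):

```python
def build_asset_payload(content_type: str, title: str) -> dict:
    # Body for POST /v2/assets: registers the upload and requests dubbing configuration.
    return {
        "contentType": content_type,
        "configuration": {"name": "dubbing", "detectLanguage": True},
        "title": title,
    }


def build_dubbing_payload(asset_id: str, source_language: str,
                          targets: list[str], title: str) -> dict:
    # Body for POST /v2/dubbing: points the dubbing job at the uploaded asset.
    return {
        "sourceAssetId": asset_id,
        "sourceLanguage": source_language,
        "targetLanguages": targets,
        "lipsyncEnabled": False,
        "videoDuration": "adaptive",
        "visibility": "private",
        "title": title,
    }
```

The asset response then supplies the temporary S3 credentials used in step 2, and the dubbing response supplies the id polled in step 4.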

Key concepts

  • Source language — language of the original audio (e.g., en).
  • Target languages — locales you want to dub into (e.g., fr-FR,es-ES). Use BCP‑47 language/locale codes.
  • Media asset — the uploaded video registered via POST /v2/assets. The response includes an asset id and temporary uploadCredentials for S3.
  • Imported (dubbed) video — the dubbing job/result referenced by createdImportedAsset.id returned from POST /v2/dubbing. Use this id to poll job status.
  • Temporary credentials — S3 credentials expire; if an upload fails due to expiration, create a new asset and re-upload. Region is eu-west-1 for the temporary bucket.
  • Resumable state — the script writes a small JSON file locally (default ~/.synthesia_dub_state.json) containing the asset_id and imported_id per file, so restarts pick up where they left off.
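Concretely, the state file is a plain JSON object with one entry per in-flight job. A hypothetical snapshot (all ids below are invented for illustration):

```python
import json

# Hypothetical contents of ~/.synthesia_dub_state.json. Each entry is keyed by a
# hash of (absolute path, source language, sorted targets, title) and records how
# far that job got, so a restart can skip completed steps.
state = {
    "example-job-key": {
        "asset_id": "asset-123",   # set once POST /v2/assets succeeds
        "imported_id": None,       # filled in when POST /v2/dubbing returns
        "targets": ["es-ES", "fr-FR"],
        "title": "launch-video",
        "source_path": "/videos/launch-video.mp4",
        "created_at": "2024-01-01T00:00:00+00:00",
    }
}

# The script persists this atomically: dump to a .tmp file, then os.replace().
restored = json.loads(json.dumps(state, indent=2))
assert restored == state
```

Once a job's downloads finish, its entry is removed, so a clean run leaves the file as an empty object.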

Dubbing multiple videos through the API

Prerequisites

  • Python 3.11+ (the script imports datetime.UTC, which was added in 3.11)
  • Install deps: pip install requests boto3
  • A Synthesia API key (environment variable recommended)

Python CLI — file or folder (resumable)

  • Accepts a single file or a directory (recursively by default).
  • Supports video/mp4, video/webm, video/quicktime (mov).
  • Saves output files next to the source using pattern: <basename>.dubbed-<lang>.mp4.
  • Persists progress to ~/.synthesia_dub_state.json so you can stop/restart safely.
#!/usr/bin/env python3
import argparse
import json
import mimetypes
import os
import pathlib
import sys
import time
import hashlib
import traceback
from datetime import datetime, UTC

import boto3
import requests
from boto3.s3.transfer import TransferConfig


ALLOWED_VIDEO_MIMES = {
    "video/mp4",
    "video/webm",
    "video/quicktime",  # mov
}


def now():
    return time.strftime("%Y-%m-%d %H:%M:%S")


def log(msg: str):
    print(f"[{now()}] {msg}", flush=True)


def parse_args():
    p = argparse.ArgumentParser(description="Dub videos with the Synthesia API (resumable, file or folder; S3 multipart).")
    p.add_argument("--api-key", default=os.environ.get("SYNTHESIA_API_KEY"),
                   help="Synthesia API key (defaults to the SYNTHESIA_API_KEY environment variable).")
    p.add_argument("--path", dest="input_path", required=True,
                   help="Absolute path to a source video file OR a directory containing videos.")
    p.add_argument("--targets", required=True, help="Comma-separated target languages (e.g. fr-FR,es-ES).")
    p.add_argument("--source-language", default="en", help="Original language code (default: en).")
    p.add_argument("--title", default=None, help="Optional title prefix for the dubbed videos. Defaults to file name.")
    p.add_argument("--state-path", default=os.path.expanduser("~/.synthesia_dub_state.json"),
                   help="Where to store resumable job state (default: ~/.synthesia_dub_state.json).")
    p.add_argument("--force-new", action="store_true",
                   help="Ignore any saved state for jobs and start fresh (creates new asset(s)).")
    p.add_argument("--poll-interval", type=int, default=10, help="Seconds between polling attempts (default: 10).")
    p.add_argument("--recursive", dest="recursive", action="store_true", default=True,
                   help="Recurse into subfolders when PATH is a directory (default: on).")
    p.add_argument("--no-recursive", dest="recursive", action="store_false",
                   help="Do NOT recurse when PATH is a directory.")
    args = p.parse_args()
    if not args.api_key:
        p.error("an API key is required: pass --api-key or set SYNTHESIA_API_KEY")
    return args


# --------------------------
# State helpers (resumable)
# --------------------------
def load_state(state_path: str) -> dict:
    # Ensure parent exists; create empty file if missing so users can see it
    state_path = os.path.abspath(os.path.expanduser(state_path))
    pathlib.Path(state_path).parent.mkdir(parents=True, exist_ok=True)
    if not os.path.exists(state_path):
        with open(state_path, "w", encoding="utf-8") as f:
            json.dump({}, f)
        log(f"State file created: {state_path}")
        return {}
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        # Corrupted state → back it up and start clean
        bak = state_path + ".bak"
        try:
            os.replace(state_path, bak)
            log(f"State file unreadable; backed up to {bak} and reinitialized.")
        except Exception:
            log("State file unreadable and backup failed; reinitializing.")
        with open(state_path, "w", encoding="utf-8") as f:
            json.dump({}, f)
        return {}


def save_state(state_path: str, state: dict):
    state_path = os.path.abspath(os.path.expanduser(state_path))
    pathlib.Path(state_path).parent.mkdir(parents=True, exist_ok=True)
    tmp = state_path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp, state_path)


def job_key(source_path: str, src_lang: str, targets: list, title: str | None) -> str:
    # Stable identity for this dubbing request
    h = hashlib.sha256()
    h.update(os.path.abspath(source_path).encode("utf-8"))
    h.update(b"|")
    h.update(src_lang.encode("utf-8"))
    h.update(b"|")
    h.update(",".join(sorted(targets)).encode("utf-8"))
    h.update(b"|")
    h.update((title or "").encode("utf-8"))
    return h.hexdigest()


# --------------------------
# API helpers
# --------------------------
def get_asset(api_key: str, asset_id: str) -> dict:
    r = requests.get(
        f"https://api.synthesia.io/v2/assets/{asset_id}",
        headers={"Authorization": api_key, "accept": "application/json"},
        timeout=30,
    )
    if r.status_code != 200:
        raise RuntimeError(f"[Get asset] {r.status_code}: {r.text}")
    return r.json()


def create_asset(api_key: str, source_path: str, title: str) -> dict:
    content_type = mimetypes.guess_type(source_path)[0] or "video/mp4"
    payload = {
        "contentType": content_type,
        "configuration": {"name": "dubbing", "detectLanguage": True},
        "title": title or pathlib.Path(source_path).name,
    }
    log(f"Creating Synthesia asset for {os.path.basename(source_path)} …")
    r = requests.post(
        "https://api.synthesia.io/v2/assets",
        headers={"Authorization": api_key, "accept": "application/json"},
        json=payload,
        timeout=60,
    )
    if r.status_code != 201:
        raise RuntimeError(f"[Create asset] {r.status_code}: {r.text}")
    data = r.json()
    log(f"Asset created: id={data.get('id')}")
    return data


def upload_to_s3(upload_credentials: dict, source_path: str, content_type: str):
    s3 = boto3.client(
        "s3",
        aws_access_key_id=upload_credentials["accessKeyId"],
        aws_secret_access_key=upload_credentials["secretAccessKey"],
        aws_session_token=upload_credentials["sessionToken"],
        region_name="eu-west-1",
    )
    cfg = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        multipart_chunksize=32 * 1024 * 1024,
        max_concurrency=6,
        use_threads=True,
    )
    bucket, key = upload_credentials["bucket"], upload_credentials["key"]
    log(f"Uploading {source_path} → s3://{bucket}/{key} …")
    s3.upload_file(source_path, bucket, key, ExtraArgs={"ContentType": content_type}, Config=cfg)
    log("Upload complete ✅")


def start_dubbing(api_key: str, asset_id: str, src_lang: str, targets: list, title: str) -> str:
    payload = {
        "sourceLanguage": src_lang,
        "targetLanguages": targets,
        "lipsyncEnabled": False,
        "videoDuration": "adaptive",
        "visibility": "private",
        "title": f"{title} (dubbed)" if title else "Dubbed video",
        "sourceAssetId": asset_id,
    }
    log(f"Starting dubbing job for {len(targets)} target(s)…")
    r = requests.post(
        "https://api.synthesia.io/v2/dubbing",
        headers={
            "Authorization": api_key,
            "accept": "application/json",
            "content-type": "application/json",
        },
        json=payload,
        timeout=60,
    )
    if r.status_code != 202:
        raise RuntimeError(f"[Start dubbing] {r.status_code}: {r.text}")
    data = r.json()
    imported_id = data["createdImportedAsset"]["id"]
    log(f"Dubbing started: imported_id={imported_id}")
    return imported_id


def poll_and_download(api_key: str, imported_id: str, targets: list, source_path: str, poll_interval: int = 10):
    def fetch_status():
        rr = requests.get(
            f"https://api.synthesia.io/v2/dubbing/{imported_id}",
            params={"targetLanguages": ",".join(targets)},
            headers={"Authorization": api_key, "accept": "application/json"},
            timeout=30,
        )
        rr.raise_for_status()
        return rr.json()

    def iter_assets(data):
        # Normalize payload → list of {language, downloadUrl}
        entries = data.get("dubbedAssets") or data.get("dubbedVideos") or data.get("dubbed") or []
        norm = []
        for dv in entries:
            lang = dv.get("language") or dv.get("locale") or "xx"
            url = dv.get("download") or dv.get("downloadUrl") or dv.get("url")
            if url:
                norm.append((lang, url))
        return norm

    log(f"Polling dubbing status every {poll_interval}s… (Ctrl-C to stop; you can resume later)")
    while True:
        data = fetch_status()
        status_raw = data.get("status", "")
        status = str(status_raw).strip().lower()
        log(f"Status: {status_raw}")

        if status in ("complete", "completed"):
            assets = iter_assets(data)
            if not assets:
                log(f"Complete but no downloadable entries. Keys: {list(data.keys())}")
                return

            out_dir = os.path.dirname(source_path)
            base_name = pathlib.Path(source_path).stem

            # Per-language download with refresh-on-403
            for lang, url in assets:
                out_path = os.path.join(out_dir, f"{base_name}.dubbed-{lang}.mp4")

                attempt = 0
                while True:
                    attempt += 1
                    try:
                        log(f"Downloading {lang} (attempt {attempt}) → {out_path}")
                        with requests.get(url, stream=True, timeout=60) as r:
                            if r.status_code in (403, 404) and attempt <= 3:
                                log(f"Received {r.status_code} for {lang}; refreshing signed URL…")
                                time.sleep(0.8)
                                # Refresh URLs and pick the matching language again
                                new_data = fetch_status()
                                new_assets = dict(iter_assets(new_data))
                                if lang in new_assets:
                                    url = new_assets[lang]
                                    continue
                            r.raise_for_status()
                            with open(out_path, "wb") as f:
                                for chunk in r.iter_content(chunk_size=1024 * 128):
                                    if chunk:
                                        f.write(chunk)
                        log(f"Saved: {out_path}")
                        break
                    except requests.HTTPError as e:
                        if attempt >= 3:
                            raise
                        log(f"HTTP error on {lang}: {e}. Retrying…")
                        time.sleep(1.2)

            log("All downloads complete ✅")
            return

        if status == "error":
            raise RuntimeError(json.dumps(data, indent=2))

        time.sleep(poll_interval)

# --------------------------
# File discovery
# --------------------------
def is_allowed_video(path: str) -> bool:
    mime, _ = mimetypes.guess_type(path)
    return (mime in ALLOWED_VIDEO_MIMES)


def discover_inputs(input_path: str, recursive: bool) -> list[str]:
    p = pathlib.Path(input_path)
    if p.is_file():
        return [str(p)]
    if not p.is_dir():
        raise FileNotFoundError(f"Path not found: {input_path}")

    files = []
    if recursive:
        it = p.rglob("*")
    else:
        it = p.glob("*")
    for f in it:
        if f.is_file() and is_allowed_video(str(f)):
            files.append(str(f))
    return files


# --------------------------
# Orchestrate a single file
# --------------------------
def process_one_file(
    api_key: str,
    source_path: str,
    src_lang: str,
    targets: list[str],
    title_prefix: str | None,
    state: dict,
    state_path: str,
    force_new: bool,
    poll_interval: int,
):
    if not os.path.isfile(source_path):
        raise FileNotFoundError(f"File not found: {source_path}")

    content_type = mimetypes.guess_type(source_path)[0] or "video/mp4"
    base_title = pathlib.Path(source_path).stem
    title = (title_prefix + " - " + base_title) if title_prefix else base_title

    key = job_key(source_path, src_lang, targets, title)
    job = None if force_new else state.get(key)

    # Resume path
    if job:
        log(f"[{os.path.basename(source_path)}] Found existing job state; attempting to resume.")
        asset_id = job.get("asset_id")
        imported_id = job.get("imported_id")

        if imported_id:
            log(f"[{os.path.basename(source_path)}] Resuming from imported_id={imported_id}")
            poll_and_download(api_key, imported_id, targets, source_path, poll_interval=poll_interval)

            state.pop(key, None)
            save_state(state_path, state)
            return

        if asset_id:
            log(f"[{os.path.basename(source_path)}] Validating existing asset_id={asset_id}")
            try:
                _ = get_asset(api_key, asset_id)
            except Exception as e:
                log(f"[{os.path.basename(source_path)}] Existing asset check failed: {e}. Creating a new asset.")
                job = None
        else:
            job = None
    else:
        log(f"[{os.path.basename(source_path)}] No prior job state found or --force-new; starting fresh.")

    # Fresh path
    if not job:
        asset = create_asset(api_key, source_path, title)
        asset_id = asset["id"]
        state[key] = {
            "asset_id": asset_id,
            "imported_id": None,
            "targets": targets,
            "title": title,
            "source_path": os.path.abspath(source_path),
            "created_at": datetime.now(UTC).isoformat(),
        }
        save_state(state_path, state)

        upload_credentials = asset.get("uploadCredentials")
        if not upload_credentials:
            raise RuntimeError("Missing uploadCredentials in asset response (required for S3 multipart upload).")

        try:
            upload_to_s3(upload_credentials, source_path, content_type)
        except Exception:
            # Drop the saved state so a re-run creates a fresh asset (and fresh
            # S3 credentials) instead of resuming an asset that was never uploaded.
            state.pop(key, None)
            save_state(state_path, state)
            log(f"[{os.path.basename(source_path)}] Upload failed; state cleared. Re-run to retry with a fresh asset.")
            raise

    # Start dubbing if needed
    job = state.get(key)
    asset_id = job["asset_id"]
    if not job.get("imported_id"):
        imported_id = start_dubbing(api_key, asset_id, src_lang, targets, title)
        job["imported_id"] = imported_id
        job["started_dubbing_at"] = datetime.now(UTC).isoformat()
        state[key] = job
        save_state(state_path, state)
    else:
        imported_id = job["imported_id"]
        log(f"[{os.path.basename(source_path)}] Resuming polling for imported_id={imported_id}")

    # Poll & download
    poll_and_download(api_key, imported_id, targets, source_path, poll_interval=poll_interval)


    # Cleanup
    state.pop(key, None)
    save_state(state_path, state)


# --------------------------
# Main
# --------------------------
def main():
    args = parse_args()
    try:
        # Parse + normalize
        targets = [t.strip() for t in args.targets.split(",") if t.strip()]
        if not targets:
            raise ValueError("No valid target languages provided.")

        # Discover input files
        files = discover_inputs(args.input_path, recursive=args.recursive)
        if not files:
            raise FileNotFoundError("No matching video files found. "
                                    "Allowed types: video/mp4, video/webm, video/quicktime.")
        log(f"Discovered {len(files)} file(s).")

        # Load state once
        state = load_state(args.state_path)
        successes, failures = 0, 0

        # Process sequentially
        for idx, f in enumerate(files, 1):
            log(f"----- [{idx}/{len(files)}] {f} -----")
            try:
                process_one_file(
                    api_key=args.api_key,
                    source_path=f,
                    src_lang=args.source_language,
                    targets=targets,
                    title_prefix=args.title,
                    state=state,
                    state_path=args.state_path,
                    force_new=args.force_new,
                    poll_interval=args.poll_interval,
                )
                successes += 1
            except KeyboardInterrupt:
                log("Interrupted by user. Progress saved; re-run to resume remaining files.")
                sys.exit(130)
            except Exception as e:
                failures += 1
                log(f"❌ ERROR on {f}: {e}")
                traceback.print_exc()

        # Summary
        log(f"Done. Success: {successes}, Failed: {failures}")

        # Exit code reflects failures (useful for CI)
        sys.exit(0 if failures == 0 else 1)

    except Exception as e:
        log(f"❌ ERROR: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()

Usage examples

Dub a single file into two languages:

 python synthesia_dub.py \
  --api-key "<SYNTHESIA_API_KEY>" \
  --path "/abs/path/to/video.mp4" \
  --targets "fr-FR,es-ES" \
  --source-language "en"

Dub every supported video under a folder (recursion is on by default; add --no-recursive to stay at the top level):

 python synthesia_dub.py \
  --api-key "<SYNTHESIA_API_KEY>" \
  --path "/abs/path/to/videos" \
  --targets "fr-FR"

Output files are saved next to each source file using the pattern <basename>.dubbed-<lang>.mp4 (e.g., launch-video.dubbed-fr-FR.mp4).
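If downstream tooling needs to predict where outputs land, the pattern is easy to reproduce with a small helper that mirrors the script's logic:

```python
import pathlib


def dubbed_output_path(source_path: str, lang: str) -> str:
    # Mirrors the script: the output sits next to the source,
    # named <basename>.dubbed-<lang>.mp4
    p = pathlib.Path(source_path)
    return str(p.with_name(f"{p.stem}.dubbed-{lang}.mp4"))
```

For example, `dubbed_output_path("/videos/launch-video.mp4", "fr-FR")` returns `/videos/launch-video.dubbed-fr-FR.mp4`.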


CLI options reference

  • --api-key — Synthesia API key (required).
  • --path — file or directory to process (required).
  • --targets — comma-separated target language locales (e.g., fr-FR,es-ES) (required).
  • --source-language — original audio language (default: en).
  • --title — optional title prefix for the dubbed asset; the file name is appended automatically.
  • --state-path — state file for resumability (default: ~/.synthesia_dub_state.json).
  • --force-new — ignore saved state for this run and create new asset(s).
  • --poll-interval — seconds between status checks (default: 10).
  • --recursive / --no-recursive — control folder recursion.

Troubleshooting

  • 401 Unauthorized — missing or wrong API key. Ensure the Authorization header is present and correct on every call.
  • 415 Unsupported Media Type — wrong MIME type. Use a supported type (video/mp4, video/webm, video/quicktime).
  • ExpiredToken during S3 upload — temporary credentials expired. Create a new asset with POST /v2/assets and re-upload.
  • AuthorizationHeaderMalformed (S3) — region mismatch. Use region eu-west-1 for the temporary bucket (the script sets this).
  • Polling never completes — long job or wrong id. Make sure you poll GET /v2/dubbing/{imported_id} with the id returned by POST /v2/dubbing.
  • No download URL — still processing. Keep polling until status=COMPLETE.
  • Script resumes the wrong file — arguments changed between runs. Avoid changing --targets, --source-language, or --title mid-run; they are part of the job key.