#!/usr/bin/env python3 import re import sys INSERT_RE = re.compile( r"(?PINSERT\s+INTO\s+[^;]*?\()(?P[^)]*)(?P\)\s+VALUES)", re.IGNORECASE | re.DOTALL, ) TABLE_RE = re.compile( r"(?PINSERT\s+INTO\s+)(?P(?:\"[^\"]+\"\.|[A-Za-z_][\w$]*\.)?\"[^\"]+\"|(?:\"[^\"]+\"\.|[A-Za-z_][\w$]*\.)?[A-Za-z_][\w$]*)", re.IGNORECASE, ) CREATE_DB_RE = re.compile(r"^CREATE\s+DATABASE\s+.+?;$", re.IGNORECASE | re.MULTILINE) CONNECT_RE = re.compile(r"^\\connect\\s+.+?$", re.IGNORECASE | re.MULTILINE) TABLE_NAME_MAP = { "ModelType": "model_types", "TypeMachine": "type_machines", "TypeMachineComponentRequirement": "type_machine_component_requirements", "TypeMachinePieceRequirement": "type_machine_piece_requirements", "TypeMachineProductRequirement": "type_machine_product_requirements", "MachinePieceLink": "machine_piece_links", "MachineComponentLink": "machine_component_links", "MachineProductLink": "machine_product_links", "Machine": "machines", "Product": "products", "Piece": "pieces", "Composant": "composants", "Profile": "profiles", "CustomField": "custom_fields", "CustomFieldValue": "custom_field_values", "Document": "documents", "Constructeur": "constructeurs", "Site": "sites", } SKIP_TABLES = { "_prisma_migrations", } def to_snake(name: str) -> str: out = [] length = len(name) for i, ch in enumerate(name): if ch.isupper(): prev = name[i - 1] if i > 0 else "" nxt = name[i + 1] if i + 1 < length else "" if i > 0 and (prev.islower() or prev.isdigit() or (prev.isupper() and nxt.islower())): out.append("_") out.append(ch.lower()) else: out.append(ch) return "".join(out) def to_lower_compact(name: str) -> str: return name.replace("_", "").lower() def remap_table(ident: str, mode: str) -> str: mapped = TABLE_NAME_MAP.get(ident) if mapped is not None: return mapped if mode == "snake": return to_snake(ident) if mode == "lower": return to_lower_compact(ident) raise ValueError(f"Unsupported mode: {mode}") def extract_table_ident(prefix: str) -> str | None: match = TABLE_RE.search(prefix) if not match: return None table = match.group("table") # Handle quoted schema like "public"."TableName" if '"."' in table: parts = table.split('"."', 1) ident = parts[1].strip('"') elif "." in table: _, ident = table.split(".", 1) ident = ident.strip('"') else: ident = table.strip('"') return ident def normalize_table_name(prefix: str, mode: str) -> str: def repl(match: re.Match[str]) -> str: table = match.group("table") schema = "" ident = table # Handle quoted schema like "public"."TableName" if '"."' in table: parts = table.split('"."', 1) schema_name = parts[0].strip('"') ident = parts[1].strip('"') schema = f'"{schema_name}".' elif "." in table: schema_part, ident = table.split(".", 1) schema_name = schema_part.strip('"') schema = f'"{schema_name}".' ident = ident.strip('"') else: ident = table.strip('"') mapped = remap_table(ident, mode) return f'{match.group("before")}{schema}"{mapped}"' return TABLE_RE.sub(repl, prefix) def remap_columns(cols: str, mode: str) -> str: def repl(match: re.Match[str]) -> str: name = match.group(1) if mode == "snake": if any(ch.isupper() for ch in name): return f"\"{to_snake(name)}\"" return match.group(0) if mode == "lower": return f"\"{to_lower_compact(name)}\"" raise ValueError(f"Unsupported mode: {mode}") return re.sub(r"\"([^\"]+)\"", repl, cols) def normalize_dump(sql: str, mode: str) -> str: sql = CREATE_DB_RE.sub("", sql) sql = CONNECT_RE.sub("", sql) def repl(match: re.Match[str]) -> str: raw_prefix = match.group("prefix") ident = extract_table_ident(raw_prefix) if ident is not None: mapped = remap_table(ident, mode) if ident in SKIP_TABLES or mapped in SKIP_TABLES: return "" prefix = normalize_table_name(raw_prefix, mode) cols = remap_columns(match.group("cols"), mode) return f"{prefix}{cols}{match.group('suffix')}" return INSERT_RE.sub(repl, sql) def main() -> int: if len(sys.argv) not in (3, 4): print("Usage: scripts/normalize-dump.py [--snake|--lower]", file=sys.stderr) return 1 src = sys.argv[1] dst = sys.argv[2] mode = "lower" if len(sys.argv) == 4: if sys.argv[3] == "--snake": mode = "snake" elif sys.argv[3] == "--lower": mode = "lower" else: print("Invalid mode. Use --snake or --lower.", file=sys.stderr) return 1 with open(src, "r", encoding="utf-8") as f: data = f.read() normalized = normalize_dump(data, mode) with open(dst, "w", encoding="utf-8") as f: f.write(normalized) return 0 if __name__ == "__main__": raise SystemExit(main())