From 0b9aaef38eaeb89b508dc64157c1032169617d03 Mon Sep 17 00:00:00 2001 From: tristan Date: Mon, 15 Jun 2026 10:20:53 +0200 Subject: [PATCH] fix(transport) : securiser la synchro QUALIMAT (revue ERP-39) - garde-fou anti-desactivation de masse : fetchRemote leve sur un payload non-list (2xx inattendu) et la commande abandonne sans ecriture si aucune ligne exploitable, au lieu de soft-delete tout le referentiel - verrou consultatif pg_try_advisory_lock pour serialiser les runs (anti-overlap) - deduplication par SIRET dans le mapper (rows_upserted = transporteurs distincts) - upsert par paquets (INSERT groupe) au lieu d'un aller-retour par ligne - migration des tables qualimat deplacee vers le namespace modulaire Transport (+ enregistrement du path dans doctrine_migrations.yaml) - tests : deduplication + abandon sur source vide --- config/packages/doctrine_migrations.yaml | 1 + .../Qualimat/QualimatRowMapper.php | 14 +- .../Console/SyncQualimatCommand.php | 151 +++++++++++++----- .../Migrations}/Version20260612150000.php | 7 +- .../Qualimat/QualimatRowMapperTest.php | 15 ++ .../Console/SyncQualimatCommandTest.php | 25 +++ 6 files changed, 169 insertions(+), 44 deletions(-) rename {migrations => src/Module/Transport/Infrastructure/Doctrine/Migrations}/Version20260612150000.php (95%) diff --git a/config/packages/doctrine_migrations.yaml b/config/packages/doctrine_migrations.yaml index c7bb23a..ef45b16 100644 --- a/config/packages/doctrine_migrations.yaml +++ b/config/packages/doctrine_migrations.yaml @@ -2,4 +2,5 @@ doctrine_migrations: migrations_paths: 'DoctrineMigrations': '%kernel.project_dir%/migrations' 'App\Module\Core\Infrastructure\Doctrine\Migrations': '%kernel.project_dir%/src/Module/Core/Infrastructure/Doctrine/Migrations' + 'App\Module\Transport\Infrastructure\Doctrine\Migrations': '%kernel.project_dir%/src/Module/Transport/Infrastructure/Doctrine/Migrations' enable_profiler: false diff --git a/src/Module/Transport/Application/Qualimat/QualimatRowMapper.php b/src/Module/Transport/Application/Qualimat/QualimatRowMapper.php index 91bb31a..c0e4943 100644 --- a/src/Module/Transport/Application/Qualimat/QualimatRowMapper.php +++ b/src/Module/Transport/Application/Qualimat/QualimatRowMapper.php @@ -13,7 +13,11 @@ final class QualimatRowMapper { /** * Mappe un lot d'items. Les items sans SIRET exploitable sont ignores et - * comptes a part (cf. `rows_skipped` du journal). + * comptes a part (cf. `rows_skipped` du journal). Les doublons de SIRET + * (source "sale" : memes chiffres a separateurs pres) sont fusionnes, + * derniere occurrence gagnante — l'upsert ne verrait qu'une ligne de toute + * facon, et le compte `rows_upserted` reflete ainsi les transporteurs + * distincts. * * @param array> $items * @@ -21,7 +25,7 @@ final class QualimatRowMapper */ public static function mapMany(array $items): array { - $rows = []; + $bySiret = []; $skipped = 0; foreach ($items as $item) { @@ -33,10 +37,12 @@ final class QualimatRowMapper continue; } - $rows[] = $row; + // Cle = SIRET normalise : une occurrence ulterieure ecrase la + // precedente (derniere gagnante). + $bySiret[$row['siret']] = $row; } - return ['rows' => $rows, 'skipped' => $skipped]; + return ['rows' => array_values($bySiret), 'skipped' => $skipped]; } /** diff --git a/src/Module/Transport/Infrastructure/Console/SyncQualimatCommand.php b/src/Module/Transport/Infrastructure/Console/SyncQualimatCommand.php index db1bde4..d03d2a4 100644 --- a/src/Module/Transport/Infrastructure/Console/SyncQualimatCommand.php +++ b/src/Module/Transport/Infrastructure/Console/SyncQualimatCommand.php @@ -41,6 +41,14 @@ final class SyncQualimatCommand extends Command private const string API_URL = 'https://www.qualimat.org/wp-json/qualimat/v1/getOperateurs'; private const int DEFAULT_PPP = 10000; + // Cle arbitraire (mais stable) du verrou consultatif Postgres serialisant + // les runs de `app:qualimat:sync` entre eux. Propre a cette commande. + private const int ADVISORY_LOCK_KEY = 3_900_000_039; + + // Nombre de lignes par INSERT groupe. 10 parametres/ligne, large marge sous + // la limite Postgres de 65535 parametres par requete. + private const int UPSERT_CHUNK = 1000; + public function __construct( private readonly Connection $connection, private readonly HttpClientInterface $httpClient, @@ -64,9 +72,30 @@ final class SyncQualimatCommand extends Command $dryRun = (bool) $input->getOption('dry-run'); $file = $input->getOption('file'); + // Verrou consultatif (session) : empeche deux runs de se chevaucher + // (cron qui deborde, invocation manuelle parallele). Sans lui, le run le + // plus tardif desactiverait les lignes que l'autre vient d'inserer. + if (!$this->acquireLock()) { + $io->error('Une synchronisation QUALIMAT est deja en cours (verrou non disponible).'); + + return Command::FAILURE; + } + + try { + return $this->doSync($io, $ppp, $dryRun, $file); + } finally { + $this->releaseLock(); + } + } + + /** + * Coeur de la synchronisation, execute sous verrou consultatif. + */ + private function doSync(SymfonyStyle $io, int $ppp, bool $dryRun, ?string $file): int + { // 1. Recuperation des items (fichier local ou API). try { - $items = null !== $file ? $this->readLocal((string) $file) : $this->fetchRemote($ppp); + $items = null !== $file ? $this->readLocal($file) : $this->fetchRemote($ppp); } catch (Throwable $e) { $io->error('Recuperation impossible : '.$e->getMessage()); @@ -81,7 +110,8 @@ final class SyncQualimatCommand extends Command $io->warning(sprintf("Le nombre d'items recus (%d) egale --ppp : resultat potentiellement tronque, augmente --ppp.", $ppp)); } - // 2. Mapping / normalisation (les items sans SIRET sont ignores). + // 2. Mapping / normalisation (les items sans SIRET sont ignores, les + // doublons de SIRET sont fusionnes : derniere occurrence gagnante). ['rows' => $rows, 'skipped' => $skipped] = QualimatRowMapper::mapMany($items); $io->writeln(sprintf('%d lignes exploitables, %d ignorees (sans SIRET).', count($rows), $skipped)); @@ -92,6 +122,15 @@ final class SyncQualimatCommand extends Command return Command::SUCCESS; } + // Garde-fou « zero ligne » : une source vide (incident amont, liste [] + // legitime) ne doit JAMAIS atteindre le soft-delete, qui desactiverait + // tout le referentiel. On abandonne sans rien ecrire. + if ([] === $rows) { + $io->error('Aucune ligne exploitable : synchronisation abandonnee (desactivation de masse evitee).'); + + return Command::FAILURE; + } + // 3. Sync transactionnelle : upsert -> soft-delete -> journal. $run = new DateTimeImmutable()->format('Y-m-d H:i:s.u'); @@ -114,6 +153,23 @@ final class SyncQualimatCommand extends Command return Command::SUCCESS; } + /** + * Tente de prendre le verrou consultatif de session. Retourne false si un + * autre run le detient deja (Postgres `pg_try_advisory_lock`, non bloquant). + */ + private function acquireLock(): bool + { + return (bool) $this->connection->fetchOne('SELECT pg_try_advisory_lock(:key)', ['key' => self::ADVISORY_LOCK_KEY]); + } + + /** + * Relache le verrou consultatif pris par acquireLock(). + */ + private function releaseLock(): void + { + $this->connection->executeStatement('SELECT pg_advisory_unlock(:key)', ['key' => self::ADVISORY_LOCK_KEY]); + } + /** * Rejoue l'appel GET de l'API QUALIMAT et retourne le tableau d'items. * @@ -129,7 +185,15 @@ final class SyncQualimatCommand extends Command // toArray() leve une exception sur un statut non-2xx ou un corps non-JSON. $data = $response->toArray(); - return array_is_list($data) ? $data : []; + // Un 2xx au corps inattendu (objet d'erreur, enveloppe {"data":[...]}, etc.) + // ne doit PAS etre interprete comme « 0 transporteur » : ce serait masquer + // un changement de contrat de l'API et declencher la desactivation de masse + // (cf. garde-fou « zero ligne » dans execute()). On echoue franchement. + if (!array_is_list($data)) { + throw new RuntimeException("Reponse inattendue de l'API QUALIMAT : un tableau d'items etait attendu."); + } + + return $data; } /** @@ -155,47 +219,60 @@ final class SyncQualimatCommand extends Command } /** - * Upsert de toutes les lignes valides (cle naturelle = siret). Marque - * is_active=TRUE et tamponne last_synced_at avec le run courant. + * Upsert de toutes les lignes valides (cle naturelle = siret) par paquets + * (INSERT groupe), au lieu d'un aller-retour par ligne. Marque is_active=TRUE + * et tamponne last_synced_at avec le run courant. Les lignes etant deja + * dedoublonnees par SIRET en amont, le compte retourne = transporteurs + * distincts effectivement synchronises. * * @param list> $rows */ private function upsertAll(array $rows, string $run): int { - $sql = <<<'SQL' - INSERT INTO qualimat_carrier - (siret, name, address, postal_code, city, phone, department, status, validity_date, is_active, last_synced_at) - VALUES - (:siret, :name, :address, :postal_code, :city, :phone, :department, :status, :validity_date, TRUE, :run) - ON CONFLICT (siret) DO UPDATE SET - name = EXCLUDED.name, - address = EXCLUDED.address, - postal_code = EXCLUDED.postal_code, - city = EXCLUDED.city, - phone = EXCLUDED.phone, - department = EXCLUDED.department, - status = EXCLUDED.status, - validity_date = EXCLUDED.validity_date, - is_active = TRUE, - last_synced_at = EXCLUDED.last_synced_at - SQL; - $count = 0; - foreach ($rows as $r) { - $this->connection->executeStatement($sql, [ - 'siret' => $r['siret'], - 'name' => $r['name'], - 'address' => $r['address'], - 'postal_code' => $r['postal_code'], - 'city' => $r['city'], - 'phone' => $r['phone'], - 'department' => $r['department'], - 'status' => $r['status'], - 'validity_date' => $r['validity_date'], - 'run' => $run, - ]); - ++$count; + foreach (array_chunk($rows, self::UPSERT_CHUNK) as $chunk) { + $placeholders = []; + $params = []; + + foreach ($chunk as $r) { + // 10 valeurs liees + is_active force a TRUE (litteral). + $placeholders[] = '(?, ?, ?, ?, ?, ?, ?, ?, ?, TRUE, ?)'; + $params[] = $r['siret']; + $params[] = $r['name']; + $params[] = $r['address']; + $params[] = $r['postal_code']; + $params[] = $r['city']; + $params[] = $r['phone']; + $params[] = $r['department']; + $params[] = $r['status']; + $params[] = $r['validity_date']; + $params[] = $run; + } + + $sql = sprintf( + <<<'SQL' + INSERT INTO qualimat_carrier + (siret, name, address, postal_code, city, phone, department, status, validity_date, is_active, last_synced_at) + VALUES + %s + ON CONFLICT (siret) DO UPDATE SET + name = EXCLUDED.name, + address = EXCLUDED.address, + postal_code = EXCLUDED.postal_code, + city = EXCLUDED.city, + phone = EXCLUDED.phone, + department = EXCLUDED.department, + status = EXCLUDED.status, + validity_date = EXCLUDED.validity_date, + is_active = TRUE, + last_synced_at = EXCLUDED.last_synced_at + SQL, + implode(",\n ", $placeholders), + ); + + $this->connection->executeStatement($sql, $params); + $count += count($chunk); } return $count; diff --git a/migrations/Version20260612150000.php b/src/Module/Transport/Infrastructure/Doctrine/Migrations/Version20260612150000.php similarity index 95% rename from migrations/Version20260612150000.php rename to src/Module/Transport/Infrastructure/Doctrine/Migrations/Version20260612150000.php index f82f5d7..17a5c8f 100644 --- a/migrations/Version20260612150000.php +++ b/src/Module/Transport/Infrastructure/Doctrine/Migrations/Version20260612150000.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace DoctrineMigrations; +namespace App\Module\Transport\Infrastructure\Doctrine\Migrations; use Doctrine\DBAL\Schema\Schema; use Doctrine\Migrations\AbstractMigration; @@ -12,8 +12,9 @@ use Doctrine\Migrations\AbstractMigration; * * Tables alimentees par la commande de synchronisation `app:qualimat:sync` * (upsert sur le SIRET + soft-delete des absents + journal). Aucune FK - * cross-module (referentiel autonome) : migration posee au namespace racine - * `DoctrineMigrations`, comme les autres migrations de creation de tables. + * cross-module (referentiel autonome) : migration au namespace modulaire + * Transport. Tables autonomes, sans dependance d'ordre vis-a-vis des autres + * migrations, donc insensible au tri cross-namespace de Doctrine Migrations. */ final class Version20260612150000 extends AbstractMigration { diff --git a/tests/Module/Transport/Application/Qualimat/QualimatRowMapperTest.php b/tests/Module/Transport/Application/Qualimat/QualimatRowMapperTest.php index f27d390..44ea13c 100644 --- a/tests/Module/Transport/Application/Qualimat/QualimatRowMapperTest.php +++ b/tests/Module/Transport/Application/Qualimat/QualimatRowMapperTest.php @@ -73,6 +73,21 @@ final class QualimatRowMapperTest extends TestCase self::assertSame(2, $result['skipped']); } + public function testMapManyDeduplicatesBySiretLastWins(): void + { + // Memes chiffres a separateurs pres : un seul transporteur, derniere + // occurrence gagnante (le compte ne doit pas surcompter les doublons). + $result = QualimatRowMapper::mapMany([ + ['Nom' => 'PREMIER', 'Siret' => '111 111 111 00011', 'Statut' => 'Audité'], + ['Nom' => 'DERNIER', 'Siret' => '11111111100011', 'Statut' => 'Valide'], + ]); + + self::assertCount(1, $result['rows']); + self::assertSame(0, $result['skipped']); + self::assertSame('DERNIER', $result['rows'][0]['name']); + self::assertSame('Valide', $result['rows'][0]['status']); + } + public function testEmptyOptionalFieldsBecomeNull(): void { $row = QualimatRowMapper::mapOne([ diff --git a/tests/Module/Transport/Infrastructure/Console/SyncQualimatCommandTest.php b/tests/Module/Transport/Infrastructure/Console/SyncQualimatCommandTest.php index 7084c00..ad3d497 100644 --- a/tests/Module/Transport/Infrastructure/Console/SyncQualimatCommandTest.php +++ b/tests/Module/Transport/Infrastructure/Console/SyncQualimatCommandTest.php @@ -7,6 +7,7 @@ namespace App\Tests\Module\Transport\Infrastructure\Console; use Doctrine\DBAL\Connection; use Symfony\Bundle\FrameworkBundle\Console\Application; use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase; +use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Tester\CommandTester; use const JSON_THROW_ON_ERROR; @@ -108,6 +109,30 @@ final class SyncQualimatCommandTest extends KernelTestCase self::assertSame(0, (int) $log['rows_skipped']); } + public function testEmptySourceAbortsWithoutMassDeactivation(): void + { + // Premier run : 2 transporteurs actifs. + $a = ['Nom' => 'A', 'Siret' => '111 111 111 00011', 'Validite' => '01/01/2030', 'Statut' => 'Audité']; + $b = ['Nom' => 'B', 'Siret' => '222 222 222 00022', 'Validite' => '01/01/2030', 'Statut' => 'Audité']; + $this->runSync([$a, $b])->assertCommandIsSuccessful(); + self::assertSame(2, $this->countRows('SELECT COUNT(*) FROM qualimat_carrier WHERE is_active = TRUE')); + + // Source ne contenant que des items inexploitables (zero ligne mappee) : + // la commande doit ECHOUER sans toucher le referentiel (pas de soft-delete + // de masse) et sans journaliser de run. + $logsBefore = $this->countRows('SELECT COUNT(*) FROM qualimat_sync_log'); + $tester = $this->runSync([ + ['Nom' => 'SANS SIRET 1', 'Siret' => null], + ['Nom' => 'SANS SIRET 2', 'Siret' => ' '], + ]); + + self::assertSame(Command::FAILURE, $tester->getStatusCode()); + // Les 2 transporteurs restent ACTIFS (aucune desactivation de masse). + self::assertSame(2, $this->countRows('SELECT COUNT(*) FROM qualimat_carrier WHERE is_active = TRUE')); + // Aucun journal supplementaire (abandon avant la transaction). + self::assertSame($logsBefore, $this->countRows('SELECT COUNT(*) FROM qualimat_sync_log')); + } + /** * @param array> $items */