fix(transport) : securiser la synchro QUALIMAT (revue ERP-39)
- garde-fou anti-desactivation de masse : fetchRemote leve sur un payload non-list (2xx inattendu) et la commande abandonne sans ecriture si aucune ligne exploitable, au lieu de soft-delete tout le referentiel - verrou consultatif pg_try_advisory_lock pour serialiser les runs (anti-overlap) - deduplication par SIRET dans le mapper (rows_upserted = transporteurs distincts) - upsert par paquets (INSERT groupe) au lieu d'un aller-retour par ligne - migration des tables qualimat deplacee vers le namespace modulaire Transport (+ enregistrement du path dans doctrine_migrations.yaml) - tests : deduplication + abandon sur source vide
This commit is contained in:
@@ -13,7 +13,11 @@ final class QualimatRowMapper
|
||||
{
|
||||
/**
|
||||
* Mappe un lot d'items. Les items sans SIRET exploitable sont ignores et
|
||||
* comptes a part (cf. `rows_skipped` du journal).
|
||||
* comptes a part (cf. `rows_skipped` du journal). Les doublons de SIRET
|
||||
* (source "sale" : memes chiffres a separateurs pres) sont fusionnes,
|
||||
* derniere occurrence gagnante — l'upsert ne verrait qu'une ligne de toute
|
||||
* facon, et le compte `rows_upserted` reflete ainsi les transporteurs
|
||||
* distincts.
|
||||
*
|
||||
* @param array<int, array<string, mixed>> $items
|
||||
*
|
||||
@@ -21,7 +25,7 @@ final class QualimatRowMapper
|
||||
*/
|
||||
public static function mapMany(array $items): array
|
||||
{
|
||||
$rows = [];
|
||||
$bySiret = [];
|
||||
$skipped = 0;
|
||||
|
||||
foreach ($items as $item) {
|
||||
@@ -33,10 +37,12 @@ final class QualimatRowMapper
|
||||
continue;
|
||||
}
|
||||
|
||||
$rows[] = $row;
|
||||
// Cle = SIRET normalise : une occurrence ulterieure ecrase la
|
||||
// precedente (derniere gagnante).
|
||||
$bySiret[$row['siret']] = $row;
|
||||
}
|
||||
|
||||
return ['rows' => $rows, 'skipped' => $skipped];
|
||||
return ['rows' => array_values($bySiret), 'skipped' => $skipped];
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -41,6 +41,14 @@ final class SyncQualimatCommand extends Command
|
||||
private const string API_URL = 'https://www.qualimat.org/wp-json/qualimat/v1/getOperateurs';
|
||||
private const int DEFAULT_PPP = 10000;
|
||||
|
||||
// Cle arbitraire (mais stable) du verrou consultatif Postgres serialisant
|
||||
// les runs de `app:qualimat:sync` entre eux. Propre a cette commande.
|
||||
private const int ADVISORY_LOCK_KEY = 3_900_000_039;
|
||||
|
||||
// Nombre de lignes par INSERT groupe. 10 parametres/ligne, large marge sous
|
||||
// la limite Postgres de 65535 parametres par requete.
|
||||
private const int UPSERT_CHUNK = 1000;
|
||||
|
||||
public function __construct(
|
||||
private readonly Connection $connection,
|
||||
private readonly HttpClientInterface $httpClient,
|
||||
@@ -64,9 +72,30 @@ final class SyncQualimatCommand extends Command
|
||||
$dryRun = (bool) $input->getOption('dry-run');
|
||||
$file = $input->getOption('file');
|
||||
|
||||
// Verrou consultatif (session) : empeche deux runs de se chevaucher
|
||||
// (cron qui deborde, invocation manuelle parallele). Sans lui, le run le
|
||||
// plus tardif desactiverait les lignes que l'autre vient d'inserer.
|
||||
if (!$this->acquireLock()) {
|
||||
$io->error('Une synchronisation QUALIMAT est deja en cours (verrou non disponible).');
|
||||
|
||||
return Command::FAILURE;
|
||||
}
|
||||
|
||||
try {
|
||||
return $this->doSync($io, $ppp, $dryRun, $file);
|
||||
} finally {
|
||||
$this->releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Coeur de la synchronisation, execute sous verrou consultatif.
|
||||
*/
|
||||
private function doSync(SymfonyStyle $io, int $ppp, bool $dryRun, ?string $file): int
|
||||
{
|
||||
// 1. Recuperation des items (fichier local ou API).
|
||||
try {
|
||||
$items = null !== $file ? $this->readLocal((string) $file) : $this->fetchRemote($ppp);
|
||||
$items = null !== $file ? $this->readLocal($file) : $this->fetchRemote($ppp);
|
||||
} catch (Throwable $e) {
|
||||
$io->error('Recuperation impossible : '.$e->getMessage());
|
||||
|
||||
@@ -81,7 +110,8 @@ final class SyncQualimatCommand extends Command
|
||||
$io->warning(sprintf("Le nombre d'items recus (%d) egale --ppp : resultat potentiellement tronque, augmente --ppp.", $ppp));
|
||||
}
|
||||
|
||||
// 2. Mapping / normalisation (les items sans SIRET sont ignores).
|
||||
// 2. Mapping / normalisation (les items sans SIRET sont ignores, les
|
||||
// doublons de SIRET sont fusionnes : derniere occurrence gagnante).
|
||||
['rows' => $rows, 'skipped' => $skipped] = QualimatRowMapper::mapMany($items);
|
||||
$io->writeln(sprintf('%d lignes exploitables, %d ignorees (sans SIRET).', count($rows), $skipped));
|
||||
|
||||
@@ -92,6 +122,15 @@ final class SyncQualimatCommand extends Command
|
||||
return Command::SUCCESS;
|
||||
}
|
||||
|
||||
// Garde-fou « zero ligne » : une source vide (incident amont, liste []
|
||||
// legitime) ne doit JAMAIS atteindre le soft-delete, qui desactiverait
|
||||
// tout le referentiel. On abandonne sans rien ecrire.
|
||||
if ([] === $rows) {
|
||||
$io->error('Aucune ligne exploitable : synchronisation abandonnee (desactivation de masse evitee).');
|
||||
|
||||
return Command::FAILURE;
|
||||
}
|
||||
|
||||
// 3. Sync transactionnelle : upsert -> soft-delete -> journal.
|
||||
$run = new DateTimeImmutable()->format('Y-m-d H:i:s.u');
|
||||
|
||||
@@ -114,6 +153,23 @@ final class SyncQualimatCommand extends Command
|
||||
return Command::SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tente de prendre le verrou consultatif de session. Retourne false si un
|
||||
* autre run le detient deja (Postgres `pg_try_advisory_lock`, non bloquant).
|
||||
*/
|
||||
private function acquireLock(): bool
|
||||
{
|
||||
return (bool) $this->connection->fetchOne('SELECT pg_try_advisory_lock(:key)', ['key' => self::ADVISORY_LOCK_KEY]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Relache le verrou consultatif pris par acquireLock().
|
||||
*/
|
||||
private function releaseLock(): void
|
||||
{
|
||||
$this->connection->executeStatement('SELECT pg_advisory_unlock(:key)', ['key' => self::ADVISORY_LOCK_KEY]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rejoue l'appel GET de l'API QUALIMAT et retourne le tableau d'items.
|
||||
*
|
||||
@@ -129,7 +185,15 @@ final class SyncQualimatCommand extends Command
|
||||
// toArray() leve une exception sur un statut non-2xx ou un corps non-JSON.
|
||||
$data = $response->toArray();
|
||||
|
||||
return array_is_list($data) ? $data : [];
|
||||
// Un 2xx au corps inattendu (objet d'erreur, enveloppe {"data":[...]}, etc.)
|
||||
// ne doit PAS etre interprete comme « 0 transporteur » : ce serait masquer
|
||||
// un changement de contrat de l'API et declencher la desactivation de masse
|
||||
// (cf. garde-fou « zero ligne » dans execute()). On echoue franchement.
|
||||
if (!array_is_list($data)) {
|
||||
throw new RuntimeException("Reponse inattendue de l'API QUALIMAT : un tableau d'items etait attendu.");
|
||||
}
|
||||
|
||||
return $data;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -155,47 +219,60 @@ final class SyncQualimatCommand extends Command
|
||||
}
|
||||
|
||||
/**
|
||||
* Upsert de toutes les lignes valides (cle naturelle = siret). Marque
|
||||
* is_active=TRUE et tamponne last_synced_at avec le run courant.
|
||||
* Upsert de toutes les lignes valides (cle naturelle = siret) par paquets
|
||||
* (INSERT groupe), au lieu d'un aller-retour par ligne. Marque is_active=TRUE
|
||||
* et tamponne last_synced_at avec le run courant. Les lignes etant deja
|
||||
* dedoublonnees par SIRET en amont, le compte retourne = transporteurs
|
||||
* distincts effectivement synchronises.
|
||||
*
|
||||
* @param list<array<string, mixed>> $rows
|
||||
*/
|
||||
private function upsertAll(array $rows, string $run): int
|
||||
{
|
||||
$sql = <<<'SQL'
|
||||
INSERT INTO qualimat_carrier
|
||||
(siret, name, address, postal_code, city, phone, department, status, validity_date, is_active, last_synced_at)
|
||||
VALUES
|
||||
(:siret, :name, :address, :postal_code, :city, :phone, :department, :status, :validity_date, TRUE, :run)
|
||||
ON CONFLICT (siret) DO UPDATE SET
|
||||
name = EXCLUDED.name,
|
||||
address = EXCLUDED.address,
|
||||
postal_code = EXCLUDED.postal_code,
|
||||
city = EXCLUDED.city,
|
||||
phone = EXCLUDED.phone,
|
||||
department = EXCLUDED.department,
|
||||
status = EXCLUDED.status,
|
||||
validity_date = EXCLUDED.validity_date,
|
||||
is_active = TRUE,
|
||||
last_synced_at = EXCLUDED.last_synced_at
|
||||
SQL;
|
||||
|
||||
$count = 0;
|
||||
|
||||
foreach ($rows as $r) {
|
||||
$this->connection->executeStatement($sql, [
|
||||
'siret' => $r['siret'],
|
||||
'name' => $r['name'],
|
||||
'address' => $r['address'],
|
||||
'postal_code' => $r['postal_code'],
|
||||
'city' => $r['city'],
|
||||
'phone' => $r['phone'],
|
||||
'department' => $r['department'],
|
||||
'status' => $r['status'],
|
||||
'validity_date' => $r['validity_date'],
|
||||
'run' => $run,
|
||||
]);
|
||||
++$count;
|
||||
foreach (array_chunk($rows, self::UPSERT_CHUNK) as $chunk) {
|
||||
$placeholders = [];
|
||||
$params = [];
|
||||
|
||||
foreach ($chunk as $r) {
|
||||
// 10 valeurs liees + is_active force a TRUE (litteral).
|
||||
$placeholders[] = '(?, ?, ?, ?, ?, ?, ?, ?, ?, TRUE, ?)';
|
||||
$params[] = $r['siret'];
|
||||
$params[] = $r['name'];
|
||||
$params[] = $r['address'];
|
||||
$params[] = $r['postal_code'];
|
||||
$params[] = $r['city'];
|
||||
$params[] = $r['phone'];
|
||||
$params[] = $r['department'];
|
||||
$params[] = $r['status'];
|
||||
$params[] = $r['validity_date'];
|
||||
$params[] = $run;
|
||||
}
|
||||
|
||||
$sql = sprintf(
|
||||
<<<'SQL'
|
||||
INSERT INTO qualimat_carrier
|
||||
(siret, name, address, postal_code, city, phone, department, status, validity_date, is_active, last_synced_at)
|
||||
VALUES
|
||||
%s
|
||||
ON CONFLICT (siret) DO UPDATE SET
|
||||
name = EXCLUDED.name,
|
||||
address = EXCLUDED.address,
|
||||
postal_code = EXCLUDED.postal_code,
|
||||
city = EXCLUDED.city,
|
||||
phone = EXCLUDED.phone,
|
||||
department = EXCLUDED.department,
|
||||
status = EXCLUDED.status,
|
||||
validity_date = EXCLUDED.validity_date,
|
||||
is_active = TRUE,
|
||||
last_synced_at = EXCLUDED.last_synced_at
|
||||
SQL,
|
||||
implode(",\n ", $placeholders),
|
||||
);
|
||||
|
||||
$this->connection->executeStatement($sql, $params);
|
||||
$count += count($chunk);
|
||||
}
|
||||
|
||||
return $count;
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Module\Transport\Infrastructure\Doctrine\Migrations;
|
||||
|
||||
use Doctrine\DBAL\Schema\Schema;
|
||||
use Doctrine\Migrations\AbstractMigration;
|
||||
|
||||
/**
|
||||
* ERP-39 (Module Transport) : referentiel des transporteurs agrees QUALIMAT.
|
||||
*
|
||||
* Tables alimentees par la commande de synchronisation `app:qualimat:sync`
|
||||
* (upsert sur le SIRET + soft-delete des absents + journal). Aucune FK
|
||||
* cross-module (referentiel autonome) : migration au namespace modulaire
|
||||
* Transport. Tables autonomes, sans dependance d'ordre vis-a-vis des autres
|
||||
* migrations, donc insensible au tri cross-namespace de Doctrine Migrations.
|
||||
*/
|
||||
final class Version20260612150000 extends AbstractMigration
|
||||
{
|
||||
public function getDescription(): string
|
||||
{
|
||||
return 'ERP-39 : tables qualimat_carrier + qualimat_sync_log (referentiel transporteurs QUALIMAT, synchro console).';
|
||||
}
|
||||
|
||||
public function up(Schema $schema): void
|
||||
{
|
||||
$this->addSql(<<<'SQL'
|
||||
CREATE TABLE qualimat_carrier (
|
||||
id BIGINT GENERATED BY DEFAULT AS IDENTITY NOT NULL,
|
||||
siret VARCHAR(20) NOT NULL,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
address VARCHAR(255) DEFAULT NULL,
|
||||
postal_code VARCHAR(10) DEFAULT NULL,
|
||||
city VARCHAR(255) DEFAULT NULL,
|
||||
phone VARCHAR(32) DEFAULT NULL,
|
||||
department VARCHAR(64) DEFAULT NULL,
|
||||
status VARCHAR(32) NOT NULL,
|
||||
validity_date DATE DEFAULT NULL,
|
||||
is_active BOOLEAN DEFAULT TRUE NOT NULL,
|
||||
last_synced_at TIMESTAMP(6) WITHOUT TIME ZONE NOT NULL,
|
||||
PRIMARY KEY (id),
|
||||
CONSTRAINT uq_qualimat_carrier_siret UNIQUE (siret)
|
||||
)
|
||||
SQL);
|
||||
$this->addSql('CREATE INDEX idx_qualimat_carrier_active ON qualimat_carrier (is_active)');
|
||||
|
||||
$this->comment('qualimat_carrier', '_table', "Referentiel des transporteurs agrees QUALIMAT, synchronise quotidiennement depuis l'API qualimat.org (type=operateur_transport).");
|
||||
$this->comment('qualimat_carrier', 'id', 'Cle technique auto-incrementee.');
|
||||
$this->comment('qualimat_carrier', 'siret', 'SIRET normalise (chiffres sans espaces). Cle naturelle de synchro (unique). Source parfois incomplete (longueur variable), non contrainte a 14.');
|
||||
$this->comment('qualimat_carrier', 'name', 'Raison sociale du transporteur (champs Nom = Societe de la source, identiques).');
|
||||
$this->comment('qualimat_carrier', 'address', 'Adresse postale (voie). Nullable.');
|
||||
$this->comment('qualimat_carrier', 'postal_code', 'Code postal. Nullable.');
|
||||
$this->comment('qualimat_carrier', 'city', 'Ville. Nullable.');
|
||||
$this->comment('qualimat_carrier', 'phone', 'Telephone au format source "indicatif|numero" (ex: +33|0608890316). Nullable.');
|
||||
$this->comment('qualimat_carrier', 'department', 'Departement au format source "code - libelle" (ex: 65 - Hautes-Pyrenees). Nullable.');
|
||||
$this->comment('qualimat_carrier', 'status', "Statut d'agrement QUALIMAT (valeurs connues : Audite, Valide, Suspendu). Valeur brute de la source, non contrainte.");
|
||||
$this->comment('qualimat_carrier', 'validity_date', 'Date de fin de validite de la certification (convertie depuis dd/mm/yyyy). Nullable.');
|
||||
$this->comment('qualimat_carrier', 'is_active', 'Faux = transporteur absent du dernier import (soft-delete). Toute ligne non revue par le dernier run passe a FALSE.');
|
||||
$this->comment('qualimat_carrier', 'last_synced_at', 'Horodatage du run de synchro ayant vu cette ligne en dernier (soft-delete : last_synced_at < run courant).');
|
||||
|
||||
$this->addSql(<<<'SQL'
|
||||
CREATE TABLE qualimat_sync_log (
|
||||
id BIGINT GENERATED BY DEFAULT AS IDENTITY NOT NULL,
|
||||
fetched_at TIMESTAMP(6) WITHOUT TIME ZONE NOT NULL,
|
||||
rows_total INT NOT NULL,
|
||||
rows_upserted INT NOT NULL,
|
||||
rows_skipped INT NOT NULL,
|
||||
rows_deactivated INT NOT NULL,
|
||||
created_at TIMESTAMP(6) WITHOUT TIME ZONE DEFAULT NOW() NOT NULL,
|
||||
PRIMARY KEY (id)
|
||||
)
|
||||
SQL);
|
||||
|
||||
$this->comment('qualimat_sync_log', '_table', 'Journal des synchronisations QUALIMAT (une ligne par run de la commande app:qualimat:sync).');
|
||||
$this->comment('qualimat_sync_log', 'id', 'Cle technique auto-incrementee.');
|
||||
$this->comment('qualimat_sync_log', 'fetched_at', "Horodatage de l'appel a l'API source (= run de synchro).");
|
||||
$this->comment('qualimat_sync_log', 'rows_total', "Nombre d'items renvoyes par l'API.");
|
||||
$this->comment('qualimat_sync_log', 'rows_upserted', 'Nombre de transporteurs inseres ou mis a jour.');
|
||||
$this->comment('qualimat_sync_log', 'rows_skipped', "Nombre d'items ignores (sans SIRET exploitable).");
|
||||
$this->comment('qualimat_sync_log', 'rows_deactivated', 'Nombre de transporteurs passes a is_active=false (absents de cet import).');
|
||||
$this->comment('qualimat_sync_log', 'created_at', 'Horodatage de fin du run (insertion du journal).');
|
||||
}
|
||||
|
||||
public function down(Schema $schema): void
|
||||
{
|
||||
$this->addSql('DROP TABLE IF EXISTS qualimat_sync_log');
|
||||
$this->addSql('DROP TABLE IF EXISTS qualimat_carrier');
|
||||
}
|
||||
|
||||
/**
|
||||
* Pose un COMMENT ON TABLE/COLUMN en dollar-quoting Postgres ($_$...$_$)
|
||||
* pour eviter tout echappement d'apostrophes dans les descriptions.
|
||||
*/
|
||||
private function comment(string $table, string $column, string $description): void
|
||||
{
|
||||
$quotedTable = '"'.str_replace('"', '""', $table).'"';
|
||||
|
||||
if ('_table' === $column) {
|
||||
$this->addSql(sprintf('COMMENT ON TABLE %s IS $_$%s$_$', $quotedTable, $description));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$this->addSql(sprintf(
|
||||
'COMMENT ON COLUMN %s.%s IS $_$%s$_$',
|
||||
$quotedTable,
|
||||
'"'.str_replace('"', '""', $column).'"',
|
||||
$description,
|
||||
));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user