addOption('file', null, InputOption::VALUE_REQUIRED, "Chemin d'un JSON local (court-circuite l'appel HTTP, utile pour tests/rejeu).") ->addOption('ppp', null, InputOption::VALUE_REQUIRED, "Taille de page demandee a l'API.", (string) self::DEFAULT_PPP) ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Analyse sans ecriture en base.') ; } protected function execute(InputInterface $input, OutputInterface $output): int { $io = new SymfonyStyle($input, $output); $ppp = max(1, (int) $input->getOption('ppp')); $dryRun = (bool) $input->getOption('dry-run'); $file = $input->getOption('file'); // Verrou consultatif (session) : empeche deux runs de se chevaucher // (cron qui deborde, invocation manuelle parallele). Sans lui, le run le // plus tardif desactiverait les lignes que l'autre vient d'inserer. if (!$this->acquireLock()) { $io->error('Une synchronisation QUALIMAT est deja en cours (verrou non disponible).'); return Command::FAILURE; } try { return $this->doSync($io, $ppp, $dryRun, $file); } finally { $this->releaseLock(); } } /** * Coeur de la synchronisation, execute sous verrou consultatif. */ private function doSync(SymfonyStyle $io, int $ppp, bool $dryRun, ?string $file): int { // 1. Recuperation des items (fichier local ou API). try { $items = null !== $file ? $this->readLocal($file) : $this->fetchRemote($ppp); } catch (Throwable $e) { $io->error('Recuperation impossible : '.$e->getMessage()); return Command::FAILURE; } $total = count($items); $io->section(sprintf('QUALIMAT — %d items recus', $total)); // Garde-fou troncature : un retour egal a ppp signale un dataset coupe. if (null === $file && $total === $ppp) { $io->warning(sprintf("Le nombre d'items recus (%d) egale --ppp : resultat potentiellement tronque, augmente --ppp.", $ppp)); } // 2. Mapping / normalisation (les items sans SIRET sont ignores, les // doublons de SIRET sont fusionnes : derniere occurrence gagnante). ['rows' => $rows, 'skipped' => $skipped] = QualimatRowMapper::mapMany($items); $io->writeln(sprintf('%d lignes exploitables, %d ignorees (sans SIRET).', count($rows), $skipped)); if ($dryRun) { $this->renderPreview($io, $rows); $io->note(sprintf('Dry-run : aucune ecriture. (%d lignes au total)', count($rows))); return Command::SUCCESS; } // Garde-fou « zero ligne » : une source vide (incident amont, liste [] // legitime) ne doit JAMAIS atteindre le soft-delete, qui desactiverait // tout le referentiel. On abandonne sans rien ecrire. if ([] === $rows) { $io->error('Aucune ligne exploitable : synchronisation abandonnee (desactivation de masse evitee).'); return Command::FAILURE; } // 3. Sync transactionnelle : upsert -> soft-delete -> journal. $run = new DateTimeImmutable()->format('Y-m-d H:i:s.u'); $this->connection->beginTransaction(); try { $upserted = $this->upsertAll($rows, $run); $deactivated = $this->deactivateMissing($run); $this->log($run, $total, $upserted, $skipped, $deactivated); $this->connection->commit(); } catch (Throwable $e) { $this->connection->rollBack(); $io->error('Sync annulee (rollback) : '.$e->getMessage()); return Command::FAILURE; } $io->success(sprintf('%d upsert, %d ignore(s), %d desactive(s).', $upserted, $skipped, $deactivated)); return Command::SUCCESS; } /** * Tente de prendre le verrou consultatif de session. Retourne false si un * autre run le detient deja (Postgres `pg_try_advisory_lock`, non bloquant). */ private function acquireLock(): bool { return (bool) $this->connection->fetchOne('SELECT pg_try_advisory_lock(:key)', ['key' => self::ADVISORY_LOCK_KEY]); } /** * Relache le verrou consultatif pris par acquireLock(). */ private function releaseLock(): void { $this->connection->executeStatement('SELECT pg_advisory_unlock(:key)', ['key' => self::ADVISORY_LOCK_KEY]); } /** * Rejoue l'appel GET de l'API QUALIMAT et retourne le tableau d'items. * * @return array> */ private function fetchRemote(int $ppp): array { $response = $this->httpClient->request('GET', self::API_URL, [ 'query' => ['type' => 'operateur_transport', 'ppp' => $ppp], 'timeout' => 60, ]); // toArray() leve une exception sur un statut non-2xx ou un corps non-JSON. $data = $response->toArray(); // Un 2xx au corps inattendu (objet d'erreur, enveloppe {"data":[...]}, etc.) // ne doit PAS etre interprete comme « 0 transporteur » : ce serait masquer // un changement de contrat de l'API et declencher la desactivation de masse // (cf. garde-fou « zero ligne » dans execute()). On echoue franchement. if (!array_is_list($data)) { throw new RuntimeException("Reponse inattendue de l'API QUALIMAT : un tableau d'items etait attendu."); } return $data; } /** * Lit un export JSON local (tableau d'objets). * * @return array> */ private function readLocal(string $path): array { $raw = @file_get_contents($path); if (false === $raw) { throw new RuntimeException(sprintf('Fichier illisible : %s', $path)); } $data = json_decode($raw, true, 512, JSON_THROW_ON_ERROR); if (!is_array($data) || !array_is_list($data)) { throw new RuntimeException("Le JSON doit etre un tableau d'objets."); } return $data; } /** * Upsert de toutes les lignes valides (cle naturelle = siret) par paquets * (INSERT groupe), au lieu d'un aller-retour par ligne. Marque is_active=TRUE * et tamponne last_synced_at avec le run courant. Les lignes etant deja * dedoublonnees par SIRET en amont, le compte retourne = transporteurs * distincts effectivement synchronises. * * @param list> $rows */ private function upsertAll(array $rows, string $run): int { $count = 0; foreach (array_chunk($rows, self::UPSERT_CHUNK) as $chunk) { $placeholders = []; $params = []; foreach ($chunk as $r) { // 10 valeurs liees + is_active force a TRUE (litteral). $placeholders[] = '(?, ?, ?, ?, ?, ?, ?, ?, ?, TRUE, ?)'; $params[] = $r['siret']; $params[] = $r['name']; $params[] = $r['address']; $params[] = $r['postal_code']; $params[] = $r['city']; $params[] = $r['phone']; $params[] = $r['department']; $params[] = $r['status']; $params[] = $r['validity_date']; $params[] = $run; } $sql = sprintf( <<<'SQL' INSERT INTO qualimat_carrier (siret, name, address, postal_code, city, phone, department, status, validity_date, is_active, last_synced_at) VALUES %s ON CONFLICT (siret) DO UPDATE SET name = EXCLUDED.name, address = EXCLUDED.address, postal_code = EXCLUDED.postal_code, city = EXCLUDED.city, phone = EXCLUDED.phone, department = EXCLUDED.department, status = EXCLUDED.status, validity_date = EXCLUDED.validity_date, is_active = TRUE, last_synced_at = EXCLUDED.last_synced_at SQL, implode(",\n ", $placeholders), ); $this->connection->executeStatement($sql, $params); $count += count($chunk); } return $count; } /** * Soft-delete : toute ligne active non revue par ce run (tampon anterieur) * passe a is_active=false. */ private function deactivateMissing(string $run): int { return (int) $this->connection->executeStatement( 'UPDATE qualimat_carrier SET is_active = FALSE WHERE is_active = TRUE AND last_synced_at < :run', ['run' => $run], ); } private function log(string $run, int $total, int $upserted, int $skipped, int $deactivated): void { $this->connection->executeStatement( <<<'SQL' INSERT INTO qualimat_sync_log (fetched_at, rows_total, rows_upserted, rows_skipped, rows_deactivated) VALUES (:run, :total, :upserted, :skipped, :deactivated) SQL, [ 'run' => $run, 'total' => $total, 'upserted' => $upserted, 'skipped' => $skipped, 'deactivated' => $deactivated, ], ); } /** * @param list> $rows */ private function renderPreview(SymfonyStyle $io, array $rows): void { $io->table( ['SIRET', 'Nom', 'CP', 'Ville', 'Statut', 'Validite'], array_map(static fn (array $r): array => [ (string) $r['siret'], mb_strimwidth((string) $r['name'], 0, 40, '…'), (string) ($r['postal_code'] ?? ''), mb_strimwidth((string) ($r['city'] ?? ''), 0, 25, '…'), (string) $r['status'], (string) ($r['validity_date'] ?? ''), ], array_slice($rows, 0, 15)), ); } }