addOption('file', null, InputOption::VALUE_REQUIRED, "Chemin d'un JSON local (court-circuite l'appel HTTP, utile pour tests/rejeu).")
            ->addOption('ppp', null, InputOption::VALUE_REQUIRED, "Taille de page demandee a l'API.", (string) self::DEFAULT_PPP)
            ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Analyse sans ecriture en base.')
        ;
    }

    /**
     * Runs one QUALIMAT synchronisation.
     *
     * Steps: fetch the items (local file or remote API), map them to rows,
     * then - unless --dry-run is set - upsert the rows, soft-delete the rows
     * not seen by this run and write a journal entry, all in one transaction.
     *
     * Two safety rules protect the table against incomplete input:
     *  - an empty row set aborts the sync, because the soft-delete step would
     *    otherwise deactivate every active carrier;
     *  - a potentially truncated API result (item count equal to --ppp) is
     *    still upserted, but the soft-delete step is skipped for that run.
     *
     * @return int Command::SUCCESS, or Command::FAILURE when fetching fails,
     *             when there is nothing to sync, or when the transaction is
     *             rolled back
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);
        $ppp = max(1, (int) $input->getOption('ppp'));
        $dryRun = (bool) $input->getOption('dry-run');
        $file = $input->getOption('file');

        // 1. Fetch the items (local file or API).
        try {
            $items = null !== $file ? $this->readLocal((string) $file) : $this->fetchRemote($ppp);
        } catch (Throwable $e) {
            $io->error('Recuperation impossible : '.$e->getMessage());

            return Command::FAILURE;
        }

        $total = count($items);
        $io->section(sprintf('QUALIMAT — %d items recus', $total));

        // Truncation guard: receiving exactly ppp items suggests the API cut the dataset.
        $truncated = null === $file && $total === $ppp;
        if ($truncated) {
            $io->warning(sprintf("Le nombre d'items recus (%d) egale --ppp : resultat potentiellement tronque, augmente --ppp.", $ppp));
        }

        // 2. Mapping / normalisation (items without a SIRET are skipped).
        ['rows' => $rows, 'skipped' => $skipped] = QualimatRowMapper::mapMany($items);
        $io->writeln(sprintf('%d lignes exploitables, %d ignorees (sans SIRET).', count($rows), $skipped));

        if ($dryRun) {
            $this->renderPreview($io, $rows);
            $io->note(sprintf('Dry-run : aucune ecriture. (%d lignes au total)', count($rows)));

            return Command::SUCCESS;
        }

        // With no row to upsert, the soft-delete step would deactivate every
        // active carrier: refuse to sync rather than wipe the table.
        if ([] === $rows) {
            $io->error('Aucune ligne exploitable : sync annulee pour ne pas desactiver tous les transporteurs.');

            return Command::FAILURE;
        }

        if ($truncated) {
            $io->note('Resultat potentiellement tronque : desactivation (soft-delete) ignoree pour ce run.');
        }

        // 3. Transactional sync: upsert -> soft-delete -> journal.
        // Parenthesised instantiation keeps the file parseable before PHP 8.4.
        $run = (new DateTimeImmutable())->format('Y-m-d H:i:s.u');

        $this->connection->beginTransaction();
        try {
            $upserted = $this->upsertAll($rows, $run);
            // Never soft-delete from a dataset that may be incomplete.
            $deactivated = $truncated ? 0 : $this->deactivateMissing($run);
            $this->log($run, $total, $upserted, $skipped, $deactivated);
            $this->connection->commit();
        } catch (Throwable $e) {
            $this->connection->rollBack();
            $io->error('Sync annulee (rollback) : '.$e->getMessage());

            return Command::FAILURE;
        }

        $io->success(sprintf('%d upsert, %d ignore(s), %d desactive(s).', $upserted, $skipped, $deactivated));

        return Command::SUCCESS;
    }

    /**
     * Performs the GET call against the QUALIMAT API and returns the items.
     *
     * The decoded payload must be a JSON list. Any other shape is rejected
     * instead of being treated as "no items", since an empty result would
     * later drive the soft-delete step.
     *
     * @param int $ppp page size sent as the "ppp" query parameter
     *
     * @return list<array<string, mixed>> decoded items (individual items are not validated here)
     *
     * @throws RuntimeException when the decoded payload is not a list
     */
    private function fetchRemote(int $ppp): array
    {
        $response = $this->httpClient->request('GET', self::API_URL, [
            'query' => ['type' => 'operateur_transport', 'ppp' => $ppp],
            'timeout' => 60,
        ]);

        // toArray() throws on a non-2xx status or a non-JSON body.
        $data = $response->toArray();

        if (!array_is_list($data)) {
            throw new RuntimeException("Reponse API inattendue : un tableau d'items etait attendu.");
        }

        return $data;
    }

    /**
     * Reads a local JSON export (an array of objects).
     *
     * @param string $path location of the JSON file
     *
     * @return list<array<string, mixed>> decoded items (individual items are not validated here)
     *
     * @throws RuntimeException when the file cannot be read or the JSON is not a list
     * @throws \JsonException   when the content is not valid JSON
     */
    private function readLocal(string $path): array
    {
        // The warning is suppressed on purpose: a failed read is turned into an exception below.
        $raw = @file_get_contents($path);
        if (false === $raw) {
            throw new RuntimeException(sprintf('Fichier illisible : %s', $path));
        }

        $data = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
        if (!is_array($data) || !array_is_list($data)) {
            throw new RuntimeException("Le JSON doit etre un tableau d'objets.");
        }

        return $data;
    }

    /**
     * Upserts every valid row (natural key = siret). Each written row is set
     * to is_active = TRUE and its last_synced_at is stamped with the current
     * run.
     *
     * Every row must provide the keys siret, name, address, postal_code, city,
     * phone, department, status and validity_date.
     *
     * @param list<array<string, mixed>> $rows rows to write
     * @param string                     $run  timestamp string identifying the current run
     *
     * @return int number of upsert statements executed (one per row)
     */
    private function upsertAll(array $rows, string $run): int
    {
        $sql = <<<'SQL'
            INSERT INTO qualimat_carrier
                (siret, name, address, postal_code, city, phone, department, status, validity_date, is_active, last_synced_at)
            VALUES
                (:siret, :name, :address, :postal_code, :city, :phone, :department, :status, :validity_date, TRUE, :run)
            ON CONFLICT (siret) DO UPDATE SET
                name = EXCLUDED.name,
                address = EXCLUDED.address,
                postal_code = EXCLUDED.postal_code,
                city = EXCLUDED.city,
                phone = EXCLUDED.phone,
                department = EXCLUDED.department,
                status = EXCLUDED.status,
                validity_date = EXCLUDED.validity_date,
                is_active = TRUE,
                last_synced_at = EXCLUDED.last_synced_at
            SQL;

        $count = 0;
        foreach ($rows as $r) {
            $this->connection->executeStatement($sql, [
                'siret' => $r['siret'],
                'name' => $r['name'],
                'address' => $r['address'],
                'postal_code' => $r['postal_code'],
                'city' => $r['city'],
                'phone' => $r['phone'],
                'department' => $r['department'],
                'status' => $r['status'],
                'validity_date' => $r['validity_date'],
                'run' => $run,
            ]);
            ++$count;
        }

        return $count;
    }

    /**
     * Soft-delete: every active row not seen by this run (older stamp) is
     * switched to is_active = FALSE.
     *
     * Must only be called after a complete upsert pass: with no upserted row,
     * every active row matches the condition.
     *
     * @param string $run timestamp string identifying the current run
     *
     * @return int value returned by executeStatement() for the UPDATE, cast to int
     */
    private function deactivateMissing(string $run): int
    {
        return (int) $this->connection->executeStatement(
            'UPDATE qualimat_carrier SET is_active = FALSE WHERE is_active = TRUE AND last_synced_at < :run',
            ['run' => $run],
        );
    }

    /**
     * Writes one journal entry for the run into qualimat_sync_log.
     *
     * @param string $run         run timestamp, stored as fetched_at
     * @param int    $total       number of items received
     * @param int    $upserted    number of rows upserted
     * @param int    $skipped     number of items skipped by the mapper
     * @param int    $deactivated number of rows soft-deleted
     */
    private function log(string $run, int $total, int $upserted, int $skipped, int $deactivated): void
    {
        $this->connection->executeStatement(
            <<<'SQL'
                INSERT INTO qualimat_sync_log (fetched_at, rows_total, rows_upserted, rows_skipped, rows_deactivated)
                VALUES (:run, :total, :upserted, :skipped, :deactivated)
                SQL,
            [
                'run' => $run,
                'total' => $total,
                'upserted' => $upserted,
                'skipped' => $skipped,
                'deactivated' => $deactivated,
            ],
        );
    }

    /**
     * Prints a table previewing the first 15 rows (used by --dry-run).
     * Name and city are shortened to 40 and 25 columns respectively.
     *
     * @param list<array<string, mixed>> $rows mapped rows
     */
    private function renderPreview(SymfonyStyle $io, array $rows): void
    {
        $io->table(
            ['SIRET', 'Nom', 'CP', 'Ville', 'Statut', 'Validite'],
            array_map(static fn (array $r): array => [
                (string) $r['siret'],
                mb_strimwidth((string) $r['name'], 0, 40, '…'),
                (string) ($r['postal_code'] ?? ''),
                mb_strimwidth((string) ($r['city'] ?? ''), 0, 25, '…'),
                (string) $r['status'],
                (string) ($r['validity_date'] ?? ''),
            ], array_slice($rows, 0, 15)),
        );
    }
}