> ## Documentation Index
> Fetch the complete documentation index at: https://private-7c7dfe99-mintlify-8c05c8a2.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Migration de données d’Elastic vers ClickStack

> Migration de données d’Elastic vers ClickHouse Observability Stack

<div id="parallel-operation-strategy">
  ## Stratégie de fonctionnement en parallèle
</div>

Lors d'une migration d'Elastic vers ClickStack pour des cas d'usage d'observabilité, nous recommandons une approche de **fonctionnement en parallèle** plutôt que de tenter de migrer les données historiques. Cette stratégie présente plusieurs avantages :

1. **Risque minimal** : en faisant fonctionner les deux systèmes en parallèle, vous conservez l'accès aux données et aux tableaux de bord existants tout en validant ClickStack et en familiarisant vos utilisateurs avec le nouveau système.
2. **Expiration naturelle des données** : la plupart des données d'observabilité ont une durée de rétention limitée (généralement 30 jours ou moins), ce qui permet une transition naturelle à mesure que les données expirent dans Elastic.
3. **Migration simplifiée** : pas besoin d'outils ni de processus complexes pour transférer les données historiques d'un système à l'autre.

<br />

<Info>
  **Migration des données**

  Nous présentons, dans la section ["Migration des données"](#migrating-data), une approche pour migrer les données essentielles d'Elasticsearch vers ClickHouse. Cette approche ne doit pas être utilisée pour des jeux de données plus volumineux, car elle est rarement performante : elle est limitée par la capacité d'Elasticsearch à exporter efficacement, et seul le format JSON est pris en charge.
</Info>

<div id="implementation-steps">
  ### Étapes de mise en œuvre
</div>

<Steps>
  <Step>
    #### Configurer la double ingestion

    Configurez votre pipeline de collecte de données pour envoyer simultanément les données vers Elastic et ClickStack.

    La façon de procéder dépend de vos agents de collecte actuels ; consultez ["Migrating Agents"](/fr/clickstack/migration/elastic/migrating-agents).
  </Step>

  <Step>
    #### Ajuster les périodes de rétention

    Configurez les paramètres TTL d’Elastic pour qu’ils correspondent à la période de rétention souhaitée. Configurez le [TTL](/fr/clickstack/managing/production#configure-ttl) de ClickStack pour conserver les données pendant la même durée.
  </Step>

  <Step>
    #### Valider et comparer

    * Exécutez des requêtes sur les deux systèmes pour garantir la cohérence des données
    * Comparez les performances des requêtes et les résultats
    * Migrez les tableaux de bord et les alertes vers ClickStack. Il s’agit actuellement d’un processus manuel.
    * Vérifiez que tous les tableaux de bord et alertes critiques fonctionnent comme prévu dans ClickStack
  </Step>

  <Step>
    #### Transition progressive

    * À mesure que les données expirent naturellement dans Elastic, vous vous appuierez de plus en plus sur ClickStack
    * Une fois que vous avez confiance dans ClickStack, vous pouvez commencer à rediriger les requêtes et les tableaux de bord
  </Step>
</Steps>

<div id="long-term-retention">
  ### Rétention à long terme
</div>

Pour les organisations ayant besoin de durées de rétention plus longues :

* Continuez à faire fonctionner les deux systèmes en parallèle jusqu’à ce que toutes les données d’Elastic aient expiré
* Les fonctionnalités de [stockage hiérarchisé](/fr/reference/engines/table-engines/mergetree-family/mergetree#table_engine-mergetree-multiple-volumes) de ClickStack peuvent vous aider à gérer efficacement les données sur le long terme.
* Envisagez d’utiliser des [vues matérialisées](/fr/concepts/features/materialized-views/incremental-materialized-view) pour conserver des données historiques agrégées ou filtrées, tout en laissant expirer les données brutes.

<div id="migration-timeline">
  ### Calendrier de la migration
</div>

Le calendrier de la migration dépendra de vos exigences en matière de rétention des données :

* **Rétention de 30 jours** : la migration peut être achevée en un mois.
* **Rétention plus longue** : continuez à faire fonctionner les deux systèmes en parallèle jusqu’à ce que les données expirent dans Elastic.
* **Données historiques** : si cela est absolument nécessaire, envisagez d’utiliser [Migration des données](#migrating-data) pour importer des données historiques ciblées.

<div id="migration-settings">
  ## Migration des paramètres
</div>

Lors d’une migration d’Elastic vers ClickStack, vos paramètres d’indexation et de stockage devront être adaptés à l’architecture de ClickHouse. Alors qu’Elasticsearch s’appuie sur la mise à l’échelle horizontale et le sharding pour garantir les performances et la tolérance aux pannes, et comporte donc plusieurs segments par défaut, ClickHouse est optimisé pour la mise à l’échelle verticale et offre généralement de meilleures performances avec un nombre plus réduit de segments.

<div id="recommended-settings">
  ### Paramètres recommandés
</div>

Nous recommandons de commencer avec un **seul shard** et de privilégier une mise à l’échelle verticale. Cette configuration convient à la plupart des charges de travail d’observabilité et simplifie à la fois l’administration et l’optimisation des performances des requêtes.

* **[ClickHouse Cloud](https://clickhouse.com/cloud)** : utilise par défaut une architecture à shard unique avec plusieurs répliques. Le stockage et le compute évoluent indépendamment, ce qui en fait une solution idéale pour les cas d’usage d’observabilité avec des profils d’ingestion imprévisibles et des charges de travail à forte lecture.
* **ClickHouse OSS** : dans les déploiements autogérés, nous recommandons :
  * de commencer avec un seul shard
  * de privilégier une mise à l’échelle verticale avec davantage de CPU et de RAM
  * d’utiliser le [stockage hiérarchisé](/fr/guides/use-cases/observability/build-your-own/managing-data#storage-tiers) pour étendre le disque local avec un stockage d’objets compatible S3
  * d’utiliser [`ReplicatedMergeTree`](/fr/reference/engines/table-engines/mergetree-family/replication) si une haute disponibilité est requise
  * pour la tolérance aux pannes, [1 réplique de votre shard](/fr/reference/engines/table-engines/mergetree-family/replication) est généralement suffisante pour les charges de travail d’observabilité.

<div id="when-to-shard">
  ### Quand recourir au sharding
</div>

Le sharding peut être nécessaire si :

* Votre taux d’ingestion dépasse la capacité d’un seul nœud (généralement >500K lignes/s)
* Vous avez besoin d’une isolation des tenants ou d’une séparation des données par région
* Votre jeu de données total est trop volumineux pour un seul serveur, même avec le stockage objet

Si vous devez effectivement mettre en place du sharding, consultez [Mise à l’échelle horizontale](/fr/guides/oss/deployment-and-scaling/examples/2-shards-1-replica) pour obtenir des conseils sur les clés de sharding et la configuration des tables distribuées.

<div id="retention-and-ttl">
  ### Rétention et TTL
</div>

ClickHouse utilise des [clauses TTL](/fr/clickstack/managing/production#configure-ttl) sur les tables MergeTree pour gérer l’expiration des données. Les règles TTL peuvent :

* Supprimer automatiquement les données expirées
* Déplacer les données plus anciennes vers un stockage objet à froid
* Conserver uniquement les logs récents, fréquemment consultés, sur un disque rapide

Nous recommandons d’aligner votre configuration TTL de ClickHouse sur vos politiques de rétention Elastic existantes afin de maintenir un cycle de vie des données cohérent pendant la migration. Pour des exemples, consultez la [configuration TTL de production de ClickStack](/fr/clickstack/managing/production#configure-ttl).

<div id="migrating-data">
  ## Migration des données
</div>

Bien que nous recommandions, pour la plupart des données d’observabilité, un fonctionnement en parallèle, il existe certains cas où une migration directe des données d’Elasticsearch vers ClickHouse peut s’avérer nécessaire :

* De petites tables de correspondance utilisées pour l’enrichissement des données (par ex., des correspondances utilisateur, des catalogues de services)
* Des données métier stockées dans Elasticsearch qui doivent être corrélées avec les données d’observabilité, car les capacités SQL de ClickHouse et ses intégrations de business intelligence facilitent davantage la maintenance et l’interrogation des données que les options de requête plus limitées d’Elasticsearch.
* Des données de configuration qui doivent être conservées pendant la migration

Cette approche n’est viable que pour des jeux de données de moins de 10 millions de lignes, car les capacités d’export d’Elasticsearch se limitent au JSON sur HTTP et ne passent pas bien à l’échelle pour des jeux de données plus volumineux.

Les étapes suivantes permettent de migrer un seul index Elasticsearch à partir de ClickHouse.

<Steps>
  <Step>
    ### Migrer le schéma

    Créez une table dans ClickHouse pour l'index en cours de migration depuis Elasticsearch. Vous pouvez associer les [types Elasticsearch à leur équivalent ClickHouse](/fr/clickstack/migration/elastic/types). Vous pouvez également vous appuyer sur le type de données JSON dans ClickHouse, qui créera dynamiquement des colonnes du type approprié au fur et à mesure de l'insertion des données.

    Prenez le mapping Elasticsearch suivant pour un index contenant des données `syslog` :

    <Accordion title="Mapping Elasticsearch">
      ```javascripton theme={null}
      GET .ds-logs-system.syslog-default-2025.06.03-000001/_mapping
      {
        ".ds-logs-system.syslog-default-2025.06.03-000001": {
          "mappings": {
            "_meta": {
              "managed_by": "fleet",
              "managed": true,
              "package": {
                "name": "system"
              }
            },
            "_data_stream_timestamp": {
              "enabled": true
            },
            "dynamic_templates": [],
            "date_detection": false,
            "properties": {
              "@timestamp": {
                "type": "date",
                "ignore_malformed": false
              },
              "agent": {
                "properties": {
                  "ephemeral_id": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "id": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "name": {
                    "type": "keyword",
                    "fields": {
                      "text": {
                        "type": "match_only_text"
                      }
                    }
                  },
                  "type": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "version": {
                    "type": "keyword",
                    "ignore_above": 1024
                  }
                }
              },
              "cloud": {
                "properties": {
                  "account": {
                    "properties": {
                      "id": {
                        "type": "keyword",
                        "ignore_above": 1024
                      }
                    }
                  },
                  "availability_zone": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "image": {
                    "properties": {
                      "id": {
                        "type": "keyword",
                        "ignore_above": 1024
                      }
                    }
                  },
                  "instance": {
                    "properties": {
                      "id": {
                        "type": "keyword",
                        "ignore_above": 1024
                      }
                    }
                  },
                  "machine": {
                    "properties": {
                      "type": {
                        "type": "keyword",
                        "ignore_above": 1024
                      }
                    }
                  },
                  "provider": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "region": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "service": {
                    "properties": {
                      "name": {
                        "type": "keyword",
                        "fields": {
                          "text": {
                            "type": "match_only_text"
                          }
                        }
                      }
                    }
                  }
                }
              },
              "data_stream": {
                "properties": {
                  "dataset": {
                    "type": "constant_keyword",
                    "value": "system.syslog"
                  },
                  "namespace": {
                    "type": "constant_keyword",
                    "value": "default"
                  },
                  "type": {
                    "type": "constant_keyword",
                    "value": "logs"
                  }
                }
              },
              "ecs": {
                "properties": {
                  "version": {
                    "type": "keyword",
                    "ignore_above": 1024
                  }
                }
              },
              "elastic_agent": {
                "properties": {
                  "id": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "snapshot": {
                    "type": "boolean"
                  },
                  "version": {
                    "type": "keyword",
                    "ignore_above": 1024
                  }
                }
              },
              "event": {
                "properties": {
                  "agent_id_status": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "dataset": {
                    "type": "constant_keyword",
                    "value": "system.syslog"
                  },
                  "ingested": {
                    "type": "date",
                    "format": "strict_date_time_no_millis||strict_date_optional_time||epoch_millis",
                    "ignore_malformed": false
                  },
                  "module": {
                    "type": "constant_keyword",
                    "value": "system"
                  },
                  "timezone": {
                    "type": "keyword",
                    "ignore_above": 1024
                  }
                }
              },
              "host": {
                "properties": {
                  "architecture": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "containerized": {
                    "type": "boolean"
                  },
                  "hostname": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "id": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "ip": {
                    "type": "ip"
                  },
                  "mac": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "name": {
                    "type": "keyword",
                    "ignore_above": 1024
                  },
                  "os": {
                    "properties": {
                      "build": {
                        "type": "keyword",
                        "ignore_above": 1024
                      },
                      "codename": {
                        "type": "keyword",
                        "ignore_above": 1024
                      },
                      "family": {
                        "type": "keyword",
                        "ignore_above": 1024
                      },
                      "kernel": {
                        "type": "keyword",
                        "ignore_above": 1024
                      },
                      "name": {
                        "type": "keyword",
                        "fields": {
                          "text": {
                            "type": "match_only_text"
                          }
                        }
                      },
                      "platform": {
                        "type": "keyword",
                        "ignore_above": 1024
                      },
                      "type": {
                        "type": "keyword",
                        "ignore_above": 1024
                      },
                      "version": {
                        "type": "keyword",
                        "ignore_above": 1024
                      }
                    }
                  }
                }
              },
              "input": {
                "properties": {
                  "type": {
                    "type": "keyword",
                    "ignore_above": 1024
                  }
                }
              },
              "log": {
                "properties": {
                  "file": {
                    "properties": {
                      "path": {
                        "type": "keyword",
                        "fields": {
                          "text": {
                            "type": "match_only_text"
                          }
                        }
                      }
                    }
                  },
                  "offset": {
                    "type": "long"
                  }
                }
              },
              "message": {
                "type": "match_only_text"
              },
              "process": {
                "properties": {
                  "name": {
                    "type": "keyword",
                    "fields": {
                      "text": {
                        "type": "match_only_text"
                      }
                    }
                  },
                  "pid": {
                    "type": "long"
                  }
                }
              },
              "system": {
                "properties": {
                  "syslog": {
                    "type": "object"
                  }
                }
              }
            }
          }
        }
      }
      ```
    </Accordion>

    Le schéma de table ClickHouse équivalent :

    <Accordion title="Schéma ClickHouse">
      ```sql theme={null}
      SET enable_json_type = 1;

      CREATE TABLE logs_system_syslog
      (
          `@timestamp` DateTime,
          `agent` Tuple(
              ephemeral_id String,
              id String,
              name String,
              type String,
              version String),
          `cloud` Tuple(
              account Tuple(
                  id String),
              availability_zone String,
              image Tuple(
                  id String),
              instance Tuple(
                  id String),
              machine Tuple(
                  type String),
              provider String,
              region String,
              service Tuple(
                  name String)),
          `data_stream` Tuple(
              dataset String,
              namespace String,
              type String),
          `ecs` Tuple(
              version String),
          `elastic_agent` Tuple(
              id String,
              snapshot UInt8,
              version String),
          `event` Tuple(
              agent_id_status String,
              dataset String,
              ingested DateTime,
              module String,
              timezone String),
          `host` Tuple(
              architecture String,
              containerized UInt8,
              hostname String,
              id String,
              ip Array(Variant(IPv4, IPv6)),
              mac Array(String),
              name String,
              os Tuple(
                  build String,
                  codename String,
                  family String,
                  kernel String,
                  name String,
                  platform String,
                  type String,
                  version String)),
          `input` Tuple(
              type String),
          `log` Tuple(
              file Tuple(
                  path String),
              offset Int64),
          `message` String,
          `process` Tuple(
              name String,
              pid Int64),
          `system` Tuple(
              syslog JSON)
      )
      ENGINE = MergeTree
      ORDER BY (`host.name`, `@timestamp`)
      ```
    </Accordion>

    Notez que :

    * Les tuples sont utilisés pour représenter des structures imbriquées plutôt que par notation pointée
    * Utilisez les types ClickHouse appropriés selon le mapping :
      * `keyword` → `String`
      * `date` → `DateTime`
      * `boolean` → `UInt8`
      * `long` → `Int64`
      * `ip` → `Array(Variant(IPv4, IPv6))`. Nous utilisons ici un [`Variant(IPv4, IPv6)`](/fr/reference/data-types/variant), car ce champ contient à la fois des adresses [`IPv4`](/fr/reference/data-types/ipv4) et [`IPv6`](/fr/reference/data-types/ipv6).
      * `object` → `JSON` pour l'objet syslog, dont la structure est imprévisible.
    * Les colonnes `host.ip` et `host.mac` sont explicitement de type `Array`, contrairement à Elasticsearch où tous les types sont des tableaux.
    * Une clause `ORDER BY` est ajoutée à partir du timestamp et du nom d’hôte pour optimiser les requêtes basées sur le temps
    * `MergeTree`, particulièrement adapté aux données de logs, est utilisé comme moteur

    **Cette approche, qui consiste à définir le schéma de manière statique et à utiliser le type JSON de façon sélective là où cela s'avère nécessaire, [est recommandée](/fr/guides/clickhouse/data-formats/json/schema#handling-semi-structured-dynamic-structures).**

    Ce schéma strict présente plusieurs avantages :

    * **Validation des données** – l’application d’un schéma strict évite le risque d’explosion du nombre de colonnes, en dehors de structures spécifiques.
    * **Évite le risque d'explosion du nombre de colonnes** : bien que le type JSON puisse évoluer jusqu’à potentiellement des milliers de colonnes, les sous-colonnes étant stockées comme des colonnes dédiées, cela peut entraîner une explosion du nombre de fichiers de colonnes, avec la création d’un nombre excessif de fichiers, ce qui affecte les performances. Pour atténuer ce problème, le [type Dynamic](/fr/reference/data-types/dynamic) sous-jacent utilisé par JSON propose un paramètre [`max_dynamic_paths`](/fr/reference/data-types/newjson#reading-json-paths-as-sub-columns), qui limite le nombre de chemins uniques stockés dans des fichiers de colonnes distincts. Une fois ce seuil atteint, les chemins supplémentaires sont stockés dans un fichier de colonnes partagé à l’aide d’un format compact encodé, ce qui préserve les performances et l’efficacité du stockage tout en prenant en charge une ingestion flexible des données. L’accès à ce fichier de colonnes partagé est toutefois moins performant. Notez cependant que la colonne JSON peut être utilisée avec des [indices de type](/fr/guides/clickhouse/data-formats/json/schema#using-type-hints-and-skipping-paths). Les colonnes « annotées » offriront les mêmes performances que les colonnes dédiées.
    * **Introspection plus simple des chemins et des types** : bien que le type JSON prenne en charge des [fonctions d'introspection](/fr/reference/data-types/newjson#introspection-functions) pour déterminer les types et les chemins qui ont été inférés, les structures statiques peuvent être plus simples à explorer, par exemple avec `DESCRIBE`.

    <br />

    Vous pouvez également créer simplement une table avec une seule colonne `JSON`.

    ```sql theme={null}
    SET enable_json_type = 1;

    CREATE TABLE syslog_json
    (
     `json` JSON(`host.name` String, `@timestamp` DateTime)
    )
    ENGINE = MergeTree
    ORDER BY (`json.host.name`, `json.@timestamp`)
    ```

    <Note>
      Nous fournissons une indication de type pour les colonnes `host.name` et `timestamp` dans la définition JSON, car nous les utilisons dans la clé de tri/la clé primaire. Cela permet à ClickHouse de savoir que ces colonnes ne seront pas nulles et de déterminer quelles sous-colonnes utiliser (il peut y en avoir plusieurs pour chaque type, ce qui serait sinon ambigu).
    </Note>

    Cette dernière approche, bien que plus simple, convient davantage au prototypage et aux tâches d'ingénierie des données. En production, réservez `JSON` aux sous-structures dynamiques lorsque cela s'avère nécessaire.

    Pour plus de détails sur l'utilisation du type JSON dans les schémas et sur la façon de l'appliquer efficacement, nous vous recommandons le guide ["Designing your schema"](/fr/guides/clickhouse/data-formats/json/schema).
  </Step>

  <Step>
    ### Installer `elasticdump`

    Nous recommandons [`elasticdump`](https://github.com/elasticsearch-dump/elasticsearch-dump) pour exporter des données depuis Elasticsearch. Cet outil nécessite `node` et doit être installé sur une machine bénéficiant d'une bonne connectivité réseau avec Elasticsearch et ClickHouse. Nous recommandons un serveur dédié avec au moins 4 cœurs et 16 Go de RAM pour la plupart des exportations.

    ```shell theme={null}
    npm install elasticdump -g
    ```

    `elasticdump` présente plusieurs avantages pour la migration des données :

    * Il interagit directement avec l'API REST d'Elasticsearch, ce qui garantit une exportation correcte des données.
    * Il préserve la cohérence des données pendant le processus d'exportation grâce à l'API Point-in-Time (PIT) ; cela crée un instantané cohérent des données à un moment précis.
    * Il exporte directement les données au format JSON, qui peut être transmis en flux au ClickHouse client pour l'insertion.

    Dans la mesure du possible, nous recommandons d'exécuter ClickHouse, Elasticsearch et `elastic dump` dans la même zone de disponibilité ou le même centre de données afin de minimiser le trafic réseau sortant et de maximiser le débit.
  </Step>

  <Step>
    ### Installer ClickHouse client

    Assurez-vous que ClickHouse est [installé sur le serveur](/fr/get-started/setup/install) où se trouve `elasticdump`. **Ne démarrez pas de serveur ClickHouse** - ces étapes ne nécessitent que le client.
  </Step>

  <Step>
    ### Transférer les données en flux continu

    Pour transférer des données en flux continu entre Elasticsearch et ClickHouse, utilisez la commande `elasticdump` en envoyant directement sa sortie vers le `clickhouse client`. La commande suivante insère les données dans notre table bien structurée `logs_system_syslog`.

    ```shell theme={null}
    # export url and credentials
    export ELASTICSEARCH_INDEX=.ds-logs-system.syslog-default-2025.06.03-000001
    export ELASTICSEARCH_URL=
    export ELASTICDUMP_INPUT_USERNAME=
    export ELASTICDUMP_INPUT_PASSWORD=
    export CLICKHOUSE_HOST=
    export CLICKHOUSE_PASSWORD=
    export CLICKHOUSE_USER=default

    # command to run - modify as required
    elasticdump --input=${ELASTICSEARCH_URL} --type=data --input-index ${ELASTICSEARCH_INDEX} --output=$ --sourceOnly --searchAfter --pit=true | 
    clickhouse-client --host ${CLICKHOUSE_HOST} --secure --password ${CLICKHOUSE_PASSWORD} --user ${CLICKHOUSE_USER} --max_insert_block_size=1000 \
    --min_insert_block_size_bytes=0 --min_insert_block_size_rows=1000 --query="INSERT INTO test.logs_system_syslog FORMAT JSONEachRow"
    ```

    Notez l’utilisation des options suivantes avec `elasticdump` :

    * `type=data` - limite la réponse au seul contenu du document dans Elasticsearch.
    * `input-index` - notre index d’entrée Elasticsearch.
    * `output=$` - redirige tous les résultats vers stdout.
    * l’option `sourceOnly`, qui garantit l’omission des champs de métadonnées dans la réponse.
    * l’option `searchAfter`, pour utiliser l’[`API searchAfter`](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/paginate-search-results#search-after) afin de paginer efficacement les résultats.
    * `pit=true` pour garantir des résultats cohérents entre les requêtes à l’aide de la [Point in Time API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-open-point-in-time).

    <br />

    Voici les paramètres du client ClickHouse utilisés ici (hormis les informations d’authentification) :

    * `max_insert_block_size=1000` - le client ClickHouse enverra les données dès que ce nombre de lignes sera atteint. L’augmenter améliore le débit, au prix d’un temps de constitution du bloc plus long, ce qui allonge donc le délai avant que les données n’apparaissent dans ClickHouse.
    * `min_insert_block_size_bytes=0` - désactive le squashing des blocks côté server en fonction du nombre d’octets.
    * `min_insert_block_size_rows=1000` - regroupe les blocks provenant des clients côté server. Dans ce cas, nous le définissons sur `max_insert_block_size` afin que les lignes apparaissent immédiatement. Augmentez cette valeur pour améliorer le débit.
    * `query="INSERT INTO logs_system_syslog FORMAT JSONAsRow"` - insère les données au [format JSONEachRow](/fr/guides/clickhouse/data-formats/json/formats). Cela convient à un schéma bien défini tel que `logs_system_syslog.`

    <br />

    **Vous pouvez vous attendre à un débit de l’ordre de plusieurs milliers de lignes par seconde.**

    <Info>
      **Insertion dans une seule ligne JSON**

      Si vous insérez dans une seule colonne JSON (voir le schéma `syslog_json` ci-dessus), la même commande d’insertion peut être utilisée. Cependant, vous devez spécifier `JSONAsObject` comme format au lieu de `JSONEachRow`, par ex.

      ```shell theme={null}
      elasticdump --input=${ELASTICSEARCH_URL} --type=data --input-index ${ELASTICSEARCH_INDEX} --output=$ --sourceOnly --searchAfter --pit=true | 
      clickhouse-client --host ${CLICKHOUSE_HOST} --secure --password ${CLICKHOUSE_PASSWORD} --user ${CLICKHOUSE_USER} --max_insert_block_size=1000 \
      --min_insert_block_size_bytes=0 --min_insert_block_size_rows=1000 --query="INSERT INTO test.logs_system_syslog FORMAT JSONAsObject"
      ```

      Voir ["Lire du JSON comme un objet"](/fr/guides/clickhouse/data-formats/json/formats#reading-json-as-an-object) pour en savoir plus.
    </Info>
  </Step>

  <Step>
    ### Transformer les données (facultatif)

    Les commandes ci-dessus supposent une correspondance 1:1 entre les champs Elasticsearch et les colonnes ClickHouse. Il est souvent nécessaire de filtrer et de transformer les données Elasticsearch avant de les insérer dans ClickHouse.

    Pour cela, on peut utiliser la fonction de table [`input`](/fr/reference/functions/table-functions/input), qui permet d’exécuter n’importe quelle requête `SELECT` sur stdout.

    Supposons que nous souhaitions stocker uniquement les champs `timestamp` et `hostname` de nos données précédentes. Le schéma ClickHouse :

    ```sql theme={null}
    CREATE TABLE logs_system_syslog_v2
    (
        `timestamp` DateTime,
        `hostname` String
    )
    ENGINE = MergeTree
    ORDER BY (hostname, timestamp)
    ```

    Pour insérer des données depuis `elasticdump` dans cette table, nous pouvons simplement utiliser la fonction de table `input` pour que le type JSON détecte et sélectionne dynamiquement les colonnes requises. Notez que cette requête `SELECT` pourrait facilement inclure un filtre.

    ```shell theme={null}
    elasticdump --input=${ELASTICSEARCH_URL} --type=data --input-index ${ELASTICSEARCH_INDEX} --output=$ --sourceOnly --searchAfter --pit=true |
    clickhouse-client --host ${CLICKHOUSE_HOST} --secure --password ${CLICKHOUSE_PASSWORD} --user ${CLICKHOUSE_USER} --max_insert_block_size=1000 \
    --min_insert_block_size_bytes=0 --min_insert_block_size_rows=1000 --query="INSERT INTO test.logs_system_syslog_v2 SELECT json.\`@timestamp\` as timestamp, json.host.hostname as hostname FROM input('json JSON') FORMAT JSONAsObject"
    ```

    Notez qu’il faut échapper le nom du champ `@timestamp` et utiliser le format d’entrée `JSONAsObject`.
  </Step>
</Steps>
