Apache Avro est un format de sérialisation orienté ligne qui utilise un encodage binaire pour traiter efficacement les données. Le format AvroConfluent prend en charge la lecture et l’écriture de messages encodés en Avro à l’aide du Confluent Schema Registry (ou de services compatibles avec l’API).
Chaque message utilise le wire format de Confluent : un octet magique (0x00), suivi d’un schema ID de 4 octets en big-endian, puis de la donnée binaire Avro. En lecture, ClickHouse résout le schema ID en interrogeant le registry. En écriture, ClickHouse enregistre le schéma dérivé des colonnes de sortie et ajoute l’ID obtenu en préfixe de chaque ligne. Les schémas sont mis en cache pour des performances optimales.
Correspondance des types de données
Le tableau ci-dessous indique tous les types de données pris en charge par le format Apache Avro, ainsi que les types de données ClickHouse correspondants dans les requêtes INSERT et SELECT.
Type de données Avro INSERT | Type de données ClickHouse | Type de données Avro SELECT |
|---|
boolean, int, long, float, double | Int(8\16\32), UInt(8\16\32) | int |
boolean, int, long, float, double | Int64, UInt64 | long |
boolean, int, long, float, double | Float32 | float |
boolean, int, long, float, double | Float64 | double |
bytes, string, fixed, enum | String | bytes ou string * |
bytes, string, fixed | FixedString(N) | fixed(N) |
enum | Enum(8\16) | enum |
array(T) | Array(T) | array(T) |
map(V, K) | Map(V, K) | map(string, K) |
union(null, T), union(T, null) | Nullable(T) | union(null, T) |
union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) ** |
null | Nullable(Nothing) | null |
int (date) *** | Date, Date32 | int (date) *** |
long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) *** |
long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) *** |
bytes (decimal) *** | DateTime64(N) | bytes (decimal) *** |
int | IPv4 | int |
fixed(16) | IPv6 | fixed(16) |
bytes (decimal) *** | Decimal(P, S) | bytes (decimal) *** |
string (uuid) *** | UUID | string (uuid) *** |
fixed(16) | Int128/UInt128 | fixed(16) |
fixed(32) | Int256/UInt256 | fixed(32) |
record | Tuple | record |
** Le type Variant accepte implicitement null comme valeur de champ. Ainsi, par exemple, le union(T1, T2, null) d’Avro sera converti en Variant(T1, T2).
Par conséquent, lors de la génération d’Avro à partir de ClickHouse, nous devons toujours inclure le type null dans le union d’Avro, car nous ne savons pas, lors de l’inférence du schéma, si une valeur est effectivement null.
*** Types logiques Avro
Types de données logiques Avro non pris en charge :
time-millis
time-micros
duration
| Paramètre | Description | Valeur par défaut |
|---|
input_format_avro_allow_missing_fields | Indique s’il faut utiliser une valeur par défaut au lieu de renvoyer une erreur lorsqu’un champ est introuvable dans le schéma. | 0 |
input_format_avro_null_as_default | Indique s’il faut utiliser une valeur par défaut au lieu de renvoyer une erreur lors de l’insertion d’une valeur null dans une colonne non nullable. | 0 |
format_avro_schema_registry_url | L’URL du Confluent Schema Registry. Pour l’authentification de base, des identifiants encodés dans l’URL peuvent être inclus directement dans le chemin de l’URL. | |
format_avro_schema_registry_connection_timeout | Délai d’expiration de connexion, en secondes, pour le client HTTP du Schema Registry (utilisé à la fois pour la récupération et l’enregistrement du schéma). Doit être supérieur à 0 et inférieur à 600 (10 minutes). | 1 |
format_avro_schema_registry_send_timeout | Délai d’expiration d’envoi, en secondes, pour le client HTTP du Schema Registry. Doit être supérieur à 0 et inférieur à 600 (10 minutes). | 1 |
format_avro_schema_registry_receive_timeout | Délai d’expiration de réception, en secondes, pour le client HTTP du Schema Registry. Doit être supérieur à 0 et inférieur à 600 (10 minutes). | 1 |
output_format_avro_confluent_subject | En sortie : nom du sujet sous lequel le schéma est enregistré dans le Schema Registry. Obligatoire lors de l’écriture. | |
output_format_avro_string_column_pattern | En sortie : expression régulière des colonnes String à sérialiser en string Avro (par défaut, bytes). | |
Lecture à partir de Kafka
Pour lire un topic Kafka encodé en Avro à l’aide du Kafka table engine, utilisez le paramètre format_avro_schema_registry_url pour fournir l’URL du registre de schémas.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';
SELECT * FROM topic1_stream;
Pour écrire des messages AvroConfluent dans un topic Kafka, définissez l’URL du registre de schémas ainsi que le nom du sujet. Le schéma est automatiquement enregistré dans le registre de schémas lors de la première écriture.
CREATE TABLE topic1_sink
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';
INSERT INTO topic1_sink VALUES ('hello', 'world');
Utiliser l’authentification de base
Si votre registre de schémas nécessite une authentification de base (par exemple, si vous utilisez Confluent Cloud), vous pouvez fournir des identifiants encodés au format URL dans le paramètre format_avro_schema_registry_url.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
Pour surveiller la progression de l’ingestion et déboguer les erreurs du consumer Kafka, vous pouvez interroger la table système system.kafka_consumers. Si votre déploiement comporte plusieurs répliques (par exemple, ClickHouse Cloud), vous devez utiliser la fonction de table clusterAllReplicas.
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
Si vous rencontrez des problèmes de résolution du schéma, vous pouvez utiliser kafkacat avec clickhouse-local pour effectuer le dépannage :
$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table'
1 a
2 b
3 c
Dernière modification le 25 juin 2026