Passer au contenu principal
Ce moteur s’intègre à l’écosystème Azure Blob Storage, permettant l’importation de données en streaming.

Créer une table

CREATE TABLE test (name String, value UInt32)
    ENGINE = AzureQueue(...)
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    ...
Paramètres du moteur Les paramètres de AzureQueue sont les mêmes que ceux pris en charge par le moteur de table AzureBlobStorage. Consultez la section des paramètres ici. Comme avec le moteur de table AzureBlobStorage, les utilisateurs peuvent utiliser l’émulateur Azurite pour le développement local avec Azure Storage. Pour plus de détails, cliquez ici. Exemple
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS mode = 'unordered'

Paramètres

L’ensemble des paramètres pris en charge est pour l’essentiel le même que pour le moteur de table S3Queue, mais sans le préfixe s3queue_. Voir la liste complète des paramètres. Pour obtenir la liste des paramètres configurés pour la table, utilisez la table system.azure_queue_settings. Disponible à partir de 24.10. Vous trouverez ci-dessous les paramètres compatibles uniquement avec AzureQueue et non applicables à S3Queue.

after_processing_move_connection_string

Chaîne de connexion à Azure Blob Storage vers laquelle déplacer les fichiers traités avec succès, si la destination est un autre conteneur Azure. Valeurs possibles :
  • String.
Valeur par défaut : chaîne vide.

after_processing_move_container

Nom du conteneur vers lequel déplacer les fichiers traités avec succès si la destination est un autre conteneur Azure. Valeurs possibles :
  • String.
Valeur par défaut : chaîne vide. Exemple :
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_move_connection_string = 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
    after_processing_move_container = 'dst-container';

SELECT à partir du moteur de table AzureQueue

Les requêtes SELECT sont interdites par défaut sur les tables AzureQueue. Cela suit le modèle classique des files d’attente, où les données sont lues une seule fois, puis retirées de la file d’attente. SELECT est interdit afin d’éviter toute perte accidentelle de données. Cependant, cela peut parfois s’avérer utile. Pour ce faire, vous devez définir le paramètre stream_like_engine_allow_direct_select sur True. Le moteur AzureQueue dispose d’un paramètre spécial pour les requêtes SELECT : commit_on_select. Définissez-le sur False pour conserver les données dans la file d’attente après leur lecture, ou sur True pour les supprimer.

Description

SELECT n’est pas particulièrement utile pour l’importation en continu (sauf pour le débogage), car chaque fichier ne peut être importé qu’une seule fois. Il est plus pratique de mettre en place un traitement en temps réel à l’aide de vues matérialisées. Pour ce faire :
  1. Utilisez le moteur pour créer une table qui lit depuis le chemin spécifié dans Azure Blob Storage et traitez-la comme un flux de données.
  2. Créez une table avec la structure souhaitée.
  3. Créez une vue matérialisée qui convertit les données du moteur et les insère dans une table créée précédemment.
Lorsque la MATERIALIZED VIEW est associée au moteur, elle commence à collecter les données en arrière-plan. Les arguments du moteur ont la forme AzureQueue(connection_string, container_name, blobpath, format[, compression]). Exemple :
CREATE TABLE azure_queue_engine_table (key UInt64, data String)
  ENGINE=AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
  SETTINGS
      mode = 'unordered';

CREATE TABLE stats (key UInt64, data String)
  ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO stats
  AS SELECT key, data FROM azure_queue_engine_table;

SELECT * FROM stats ORDER BY key;

Colonnes virtuelles

  • _path — Chemin d’accès au fichier.
  • _file — Nom du fichier.
Pour plus d’informations sur les colonnes virtuelles, consultez ici.

Introspection

Activez la journalisation pour la table via le paramètre de table enable_logging_to_queue_log=1. Les capacités d’introspection sont les mêmes que pour le moteur de table S3Queue, avec toutefois quelques différences notables :
  1. Utilisez system.azure_queue_metadata_cache pour l’état en mémoire de la file d’attente pour les versions du serveur >= 25.1. Pour les versions antérieures, utilisez system.s3queue_metadata_cache (il contiendrait également des informations sur les tables azure).
  2. Activez system.azure_queue_log via la configuration principale de ClickHouse, par exemple.
  <azure_queue_log>
    <database>system</database>
    <table>azure_queue_log</table>
  </azure_queue_log>
Cette table persistante contient les mêmes informations que system.s3queue_metadata_cache, mais pour les fichiers traités et ceux ayant échoué. La table a la structure suivante :

CREATE TABLE system.azure_queue_log
(
    `hostname` LowCardinality(String) COMMENT 'Hostname',
    `event_date` Date COMMENT 'Event date of writing this log row',
    `event_time` DateTime COMMENT 'Event time of writing this log row',
    `database` String COMMENT 'The name of a database where current S3Queue table lives.',
    `table` String COMMENT 'The name of S3Queue table.',
    `uuid` String COMMENT 'The UUID of S3Queue table',
    `file_name` String COMMENT 'File name of the processing file',
    `rows_processed` UInt64 COMMENT 'Number of processed rows',
    `status` Enum8('Processed' = 0, 'Failed' = 1) COMMENT 'Status of the processing file',
    `processing_start_time` Nullable(DateTime) COMMENT 'Time of the start of processing the file',
    `processing_end_time` Nullable(DateTime) COMMENT 'Time of the end of processing the file',
    `exception` String COMMENT 'Exception message if happened'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
COMMENT 'Contains logging entries with the information files processes by S3Queue engine.'

Exemple :
SELECT *
FROM system.azure_queue_log
LIMIT 1
FORMAT Vertical

Row 1:
──────
hostname:              clickhouse
event_date:            2024-12-16
event_time:            2024-12-16 13:42:47
database:              default
table:                 azure_queue_engine_table
uuid:                  1bc52858-00c0-420d-8d03-ac3f189f27c8
file_name:             test_1.csv
rows_processed:        3
status:                Processed
processing_start_time: 2024-12-16 13:42:47
processing_end_time:   2024-12-16 13:42:47
exception:

1 row in set. Elapsed: 0.002 sec.

Dernière modification le 25 juin 2026