الانتقال إلى المحتوى الرئيسي
يوفّر هذا المحرك تكاملًا مع منظومة Azure Blob Storage، ما يتيح استيراد البيانات المتدفقة.

إنشاء جدول

CREATE TABLE test (name String, value UInt32)
    ENGINE = AzureQueue(...)
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    ...
معلمات المحرّك معلمات AzureQueue هي نفسها المعلمات التي يدعمها محرك الجدول AzureBlobStorage. راجع قسم المعلمات هنا. وعلى غرار محرك الجدول AzureBlobStorage، يمكن للمستخدمين استخدام المحاكي Azurite لاستخدام Azure Storage محليًا أثناء التطوير. مزيد من التفاصيل هنا. مثال
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS mode = 'unordered'

الإعدادات

مجموعة الإعدادات المدعومة هي إلى حدّ كبير نفسها الخاصة بمحرك الجدول S3Queue، ولكن من دون البادئة s3queue_. راجع القائمة الكاملة للإعدادات. للحصول على قائمة بالإعدادات المهيأة للجدول، استخدم الجدول system.azure_queue_settings. هذا متاح بدءًا من 24.10. فيما يلي الإعدادات المتوافقة فقط مع AzureQueue ولا تنطبق على S3Queue.

after_processing_move_connection_string

سلسلة الاتصال الخاصة بـ Azure Blob Storage لنقل الملفات التي تمت معالجتها بنجاح إليها، إذا كانت الوجهة حاوية Azure أخرى. القيم الممكنة:
  • String.
القيمة الافتراضية: سلسلة فارغة.

after_processing_move_container

اسم الحاوية التي تُنقل إليها الملفات التي تمت معالجتها بنجاح، إذا كانت الوجهة حاوية Azure أخرى. القيم المحتملة:
  • String.
القيمة الافتراضية: سلسلة فارغة. مثال:
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_move_connection_string = 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
    after_processing_move_container = 'dst-container';

SELECT من محرك الجدول AzureQueue

تكون استعلامات SELECT محظورة افتراضيًا على جداول AzureQueue. ويتبع ذلك نمط قائمة الانتظار الشائع، حيث تُقرأ البيانات مرة واحدة ثم تُزال من قائمة الانتظار. ويُحظر SELECT لمنع فقدان البيانات عن طريق الخطأ. ومع ذلك، قد يكون هذا مفيدًا أحيانًا. وللقيام بذلك، تحتاج إلى تعيين الإعداد stream_like_engine_allow_direct_select إلى True. يحتوي محرك AzureQueue على إعداد خاص لاستعلامات SELECT: commit_on_select. عيّنه إلى False للاحتفاظ بالبيانات في قائمة الانتظار بعد قراءتها، أو إلى True لإزالتها.

الوصف

لا يُعد SELECT مفيدًا كثيرًا في streaming import (إلا لأغراض debugging)، لأن كل ملف لا يمكن استيراده إلا مرة واحدة. ومن العملي أكثر إنشاء مسارات معالجة آنية باستخدام العروض المادية. وللقيام بذلك:
  1. استخدم المحرّك لإنشاء جدول للاستهلاك من المسار المحدد في Azure Blob Storage واعتبره تدفق بيانات.
  2. أنشئ جدولًا بالبنية المطلوبة.
  3. أنشئ عرضًا ماديًا يحوّل البيانات من المحرّك ويضعها في جدول تم إنشاؤه مسبقًا.
عند ربط MATERIALIZED VIEW بالمحرّك، يبدأ في جمع البيانات في الخلفية. تكون وسيطات المحرّك بالصيغة AzureQueue(connection_string, container_name, blobpath, format[, compression]). مثال:
CREATE TABLE azure_queue_engine_table (key UInt64, data String)
  ENGINE=AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
  SETTINGS
      mode = 'unordered';

CREATE TABLE stats (key UInt64, data String)
  ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO stats
  AS SELECT key, data FROM azure_queue_engine_table;

SELECT * FROM stats ORDER BY key;

الأعمدة الافتراضية

  • _path — مسار الملف.
  • _file — اسم الملف.
لمزيد من المعلومات عن الأعمدة الافتراضية، راجع هنا.

الاستقصاء الداخلي

فعِّل التسجيل للجدول عبر إعداد الجدول enable_logging_to_queue_log=1. قدرات الاستقصاء الداخلي مماثلة لتلك الخاصة بـ محرك جدول S3Queue، مع بعض الاختلافات الواضحة:
  1. استخدم system.azure_queue_metadata_cache للحالة الموجودة في الذاكرة الخاصة بالطابور في إصدارات الخادم >= 25.1. أمّا في الإصدارات الأقدم، فاستخدم system.s3queue_metadata_cache (وسيحتوي أيضًا على معلومات عن جداول azure).
  2. فعِّل system.azure_queue_log عبر إعداد ClickHouse الرئيسي، على سبيل المثال:
  <azure_queue_log>
    <database>system</database>
    <table>azure_queue_log</table>
  </azure_queue_log>
يحتوي هذا الجدول الدائم على المعلومات نفسها الموجودة في system.s3queue_metadata_cache، لكنها تخص الملفات التي تمت معالجتها والملفات الفاشلة. يحتوي الجدول على البنية التالية:

CREATE TABLE system.azure_queue_log
(
    `hostname` LowCardinality(String) COMMENT 'Hostname',
    `event_date` Date COMMENT 'Event date of writing this log row',
    `event_time` DateTime COMMENT 'Event time of writing this log row',
    `database` String COMMENT 'The name of a database where current S3Queue table lives.',
    `table` String COMMENT 'The name of S3Queue table.',
    `uuid` String COMMENT 'The UUID of S3Queue table',
    `file_name` String COMMENT 'File name of the processing file',
    `rows_processed` UInt64 COMMENT 'Number of processed rows',
    `status` Enum8('Processed' = 0, 'Failed' = 1) COMMENT 'Status of the processing file',
    `processing_start_time` Nullable(DateTime) COMMENT 'Time of the start of processing the file',
    `processing_end_time` Nullable(DateTime) COMMENT 'Time of the end of processing the file',
    `exception` String COMMENT 'Exception message if happened'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
COMMENT 'Contains logging entries with the information files processes by S3Queue engine.'

مثال:
SELECT *
FROM system.azure_queue_log
LIMIT 1
FORMAT Vertical

Row 1:
──────
hostname:              clickhouse
event_date:            2024-12-16
event_time:            2024-12-16 13:42:47
database:              default
table:                 azure_queue_engine_table
uuid:                  1bc52858-00c0-420d-8d03-ac3f189f27c8
file_name:             test_1.csv
rows_processed:        3
status:                Processed
processing_start_time: 2024-12-16 13:42:47
processing_end_time:   2024-12-16 13:42:47
exception:

1 row in set. Elapsed: 0.002 sec.

آخر تعديل في ٢٥ يونيو ٢٠٢٦