عميل Rust الرسمي للاتصال بـ ClickHouse، وقد طوّره في الأصل Paul Loyd. تتوفر الشفرة المصدرية للعميل في مستودع GitHub.
- يستخدم
serde لترميز الصفوف وفك ترميزها.
- يدعم سمات
serde: skip_serializing وskip_deserializing وrename.
- يستخدم تنسيق
RowBinary عبر بروتوكول HTTP.
- توجد خطط للانتقال إلى
Native عبر TCP.
- يدعم TLS (عبر ميزتَي
native-tls وrustls-tls).
- يدعم الضغط وفك الضغط (LZ4).
- يوفّر واجهات API للاستعلام عن البيانات أو إدراجها، وتنفيذ عبارات DDL، والتجميع على جانب العميل.
- يوفّر كائنات محاكاة ملائمة لاختبار الوحدات.
لاستخدام هذه الحزمة، أضف ما يلي إلى ملف Cargo.toml:
[dependencies]
clickhouse = "0.12.2"
[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
انظر أيضًا: صفحة crates.io.
lz4 (مفعّلة افتراضيًا) — تفعّل الخيارين Compression::Lz4 وCompression::Lz4Hc(_). وإذا كانت مفعّلة، فسيُستخدم Compression::Lz4 افتراضيًا لجميع queries باستثناء WATCH.
native-tls — تدعم عناوين URL ذات المخطط HTTPS عبر hyper-tls، والذي يرتبط بـ OpenSSL.
rustls-tls — تدعم عناوين URL ذات المخطط HTTPS عبر hyper-rustls، والذي لا يرتبط بـ OpenSSL.
inserter — تفعّل client.inserter().
test-util — تضيف كائنات محاكاة. راجع المثال. استخدمها فقط في dev-dependencies.
watch — تفعّل إمكانات client.watch. راجع القسم المقابل لمزيد من التفاصيل.
uuid — تضيف serde::uuid للعمل مع حزمة uuid.
time — تضيف serde::time للعمل مع حزمة time.
عند الاتصال بـ ClickHouse عبر عنوان URL باستخدام HTTPS، يجب تفعيل إحدى الميزتين native-tls أو rustls-tls.
وإذا كانت الميزتان كلتاهما مفعّلتين، فستكون الأولوية للميزة rustls-tls.
يتوافق العميل مع إصدارات LTS من ClickHouse أو الإصدارات الأحدث، وكذلك مع ClickHouse Cloud.
يتعامل ClickHouse server الأقدم من v22.6 مع RowBinary على نحو غير صحيح في بعض الحالات النادرة.
يمكنك استخدام v0.11+ وتمكين الميزة wa-37420 لحل هذه المشكلة. ملاحظة: لا ينبغي استخدام هذه الميزة مع إصدارات ClickHouse الأحدث.
نسعى إلى تغطية سيناريوهات متنوعة لاستخدام العميل من خلال الأمثلة الموجودة في مستودع client. وتتوفر لمحة عامة في README الخاص بالأمثلة.
إذا كان هناك أي شيء غير واضح أو مفقود في الأمثلة أو في الوثائق التالية، فلا تتردد في التواصل معنا.
تُعد حزمة ch2rs مفيدة لتوليد نوع صف من ClickHouse.
أعِد استخدام العملاء المُنشأين أو انسخهم للاستفادة مجددًا من مجمع اتصالات hyper الأساسي.
use clickhouse::Client;
let client = Client::default()
// should include both protocol and port
.with_url("http://localhost:8123")
.with_user("name")
.with_password("123")
.with_database("test");
اتصال HTTPS أو ClickHouse Cloud
يعمل HTTPS مع ميزتَي Cargo rustls-tls أو native-tls.
بعد ذلك، أنشئ العميل كالمعتاد. في هذا المثال، تُستخدم متغيرات البيئة لتخزين تفاصيل الاتصال:
يجب أن يتضمن عنوان URL كلاً من البروتوكول والمنفذ، على سبيل المثال: https://instance.clickhouse.cloud:8443.
fn read_env_var(key: &str) -> String {
env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}
let client = Client::default()
.with_url(read_env_var("CLICKHOUSE_URL"))
.with_user(read_env_var("CLICKHOUSE_USER"))
.with_password(read_env_var("CLICKHOUSE_PASSWORD"));
راجع أيضًا:
use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let table_name = "some";
let mut cursor = client
.query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
.bind(Identifier(table_name))
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- يُستبدل العنصر النائب
?fields بـ no, name (حقول Row).
- يُستبدل العنصر النائب
? بالقيم في استدعاءات bind() التالية.
- يمكن استخدام الطريقتين الملائمتين
fetch_one::<Row>() وfetch_all::<Row>() للحصول على الصف الأول أو جميع الصفوف، على التوالي.
- يمكن استخدام
sql::Identifier لربط أسماء الجداول.
ملاحظة: بما أن الاستجابة بالكامل تُبث كتدفق، فقد تُرجع المؤشرات خطأً حتى بعد إرجاع بعض الصفوف. إذا حدث ذلك في حالة الاستخدام لديك، يمكنك تجربة query(...).with_option("wait_end_of_query", "1") لتمكين تخزين الاستجابة مؤقتًا على جانب الخادم. مزيد من التفاصيل. وقد يكون الخيار buffer_size مفيدًا أيضًا.
استخدم wait_end_of_query بحذر عند تحديد الصفوف، إذ قد يؤدي ذلك إلى زيادة استهلاك الذاكرة على جانب الخادم، ومن المرجح أن يُضعف الأداء العام.
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
- إذا لم تُستدعَ
end()، تُلغى عملية INSERT.
- تُرسَل الصفوف تدريجيًا كتدفّق لتوزيع حمل الشبكة.
- يُدرج ClickHouse الدُفعات بصورة ذرّية فقط إذا كانت جميع الصفوف ضمن partition نفسها وكان عددها أقل من
max_insert_block_size.
الإدراج غير المتزامن (التجميع على جانب الخادم)
يمكنك استخدام عمليات الإدراج غير المتزامنة في ClickHouse لتجنّب تجميع البيانات الواردة على جانب العميل. ويمكن تنفيذ ذلك ببساطة من خلال تمرير الخيار async_insert إلى الطريقة insert (أو حتى إلى مثيل Client نفسه، بحيث يسري ذلك على جميع استدعاءات insert).
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");
انظر أيضًا:
ميزة Inserter (التجميع من جهة العميل)
يتطلب تفعيل ميزة inserter في Cargo.
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
// don't forget to finalize the inserter during the application shutdown
// and commit the remaining rows. `.end()` will provide stats as well.
inserter.end().await?;
- يُنهي
Inserter عملية الإدراج النشطة في commit() إذا تم بلوغ أيٍّ من الحدود (max_bytes أو max_rows أو period).
- يمكن إزاحة الفاصل الزمني بين إنهاء عمليات
INSERT النشطة باستخدام with_period_bias لتجنّب ارتفاعات الحمل الناتجة عن المُدرِجات المتوازية.
- يمكن استخدام
Inserter::time_left() لاكتشاف وقت انتهاء الفترة الحالية. استدعِ Inserter::commit() مرة أخرى للتحقق من الحدود إذا كان التدفق لديك يُصدر العناصر على فترات متباعدة.
- تُنفَّذ الحدود الزمنية باستخدام crate quanta لتسريع
inserter. ولا تُستخدم إذا كان test-util مفعّلًا (وبالتالي يمكن إدارة الوقت بواسطة tokio::time::advance() في الاختبارات المخصّصة).
- تُدرَج جميع الصفوف بين استدعاءات
commit() ضمن تعليمة INSERT نفسها.
لا تنسَ تنفيذ flush إذا كنت تريد إنهاء عملية الإدراج أو إتمامها:
في عملية نشر بعقدة واحدة، يكفي تنفيذ عبارات DDLs كما يلي:
client.query("DROP TABLE IF EXISTS some").execute().await?;
ومع ذلك، في عمليات النشر العنقودية التي تستخدم موازن حمل أو ClickHouse Cloud، يُوصى بانتظار اكتمال تطبيق DDL على جميع النُسخ المتماثلة باستخدام الخيار wait_end_of_query. ويمكن القيام بذلك على النحو التالي:
client
.query("DROP TABLE IF EXISTS some")
.with_option("wait_end_of_query", "1")
.execute()
.await?;
يمكنك استخدام مجموعة متنوعة من إعدادات ClickHouse عبر الأسلوب with_option. على سبيل المثال:
let numbers = client
.query("SELECT number FROM system.numbers")
// This setting will be applied to this particular query only;
// it will override the global client setting.
.with_option("limit", "3")
.fetch_all::<u64>()
.await?;
إلى جانب query، ينطبق الأمر نفسه على الطريقتين insert وinserter؛ كما يمكن أيضًا استدعاء الطريقة نفسها على مثيل Client لضبط الإعدادات العامة لجميع الاستعلامات.
باستخدام .with_option، يمكنك تعيين الخيار query_id لتحديد الاستعلامات في سجل استعلامات ClickHouse.
let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.with_option("query_id", "some-query-id")
.fetch_all::<u64>()
.await?;
وبالإضافة إلى query، فهو يعمل بطريقة مماثلة مع الطريقتين insert وinserter.
إذا عيّنت query_id يدويًا، فتأكد من أنه فريد. تُعد UUIDs خيارًا جيدًا لهذا الغرض.
انظر أيضًا: مثال query_id في مستودع العميل.
على غرار query_id، يمكنك تعيين session_id لتنفيذ التعليمات ضمن الجلسة نفسها. ويمكن تعيين session_id إما على مستوى العميل بشكل عام، أو لكل استدعاء query أو insert أو inserter على حدة.
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("session_id", "my-session");
في عمليات النشر العنقودية، ونظرًا إلى عدم توفّر “الجلسات الثابتة”، يجب أن تكون متصلًا بـ عقدة معيّنة في العنقود لكي تتمكّن من استخدام هذه الميزة بشكل صحيح، لأن موازن حمل من نوع round-robin، على سبيل المثال، لا يضمن أن الطلبات اللاحقة ستُعالَج على عقدة ClickHouse نفسها.
راجع أيضًا: مثال session_id في مستودع العميل.
إذا كنت تستخدم المصادقة عبر وكيل أو تحتاج إلى تمرير رؤوس مخصّصة، فيمكنك القيام بذلك كما يلي:
let client = Client::default()
.with_url("http://localhost:8123")
.with_header("X-My-Header", "hello");
راجع أيضًا: مثال على رؤوس HTTP مخصصة في مستودع مكتبة العميل.
قد يكون هذا مفيدًا لتعديل إعدادات تجمّع اتصالات HTTP الداخلي.
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;
let connector = HttpConnector::new(); // or HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
// For how long keep a particular idle socket alive on the client side (in milliseconds).
// It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout,
// which was by default 3 seconds for pre-23.11 versions, and 10 seconds after that.
.pool_idle_timeout(Duration::from_millis(2_500))
// Sets the maximum idle Keep-Alive connections allowed in the pool.
.pool_max_idle_per_host(4)
.build(connector);
let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
يعتمد هذا المثال على واجهة Hyper API القديمة، وقد تتغير في المستقبل.
انظر أيضًا: مثال على عميل HTTP مخصص في مستودع العميل.
راجع أيضًا الأمثلة الإضافية التالية:
- يقابل
(U)Int(8|16|32|64|128) الأنواع المناظرة (u|i)(8|16|32|64|128) ذهابًا وإيابًا، أو newtypes الملتفة حولها.
- لا يدعم
(U)Int256 مباشرةً، ولكن يوجد حل بديل لذلك.
- يقابل
Float(32|64) الأنواع المناظرة f(32|64) ذهابًا وإيابًا، أو newtypes الملتفة حولها.
- يقابل
Decimal(32|64|128) الأنواع المناظرة i(32|64|128) ذهابًا وإيابًا، أو newtypes الملتفة حولها. ومن العملي أكثر استخدام fixnum أو أي تنفيذ آخر للأعداد العشرية الثابتة الموقعة.
- يقابل
Boolean النوع bool ذهابًا وإيابًا، أو newtypes الملتفة حوله.
- يقابل
String أي نوع من أنواع السلاسل النصية أو البايتات ذهابًا وإيابًا، مثل &str و&[u8] وString وVec<u8> أو SmartString. كما أن الأنواع الجديدة مدعومة أيضًا. ولتخزين البايتات، يُفضَّل استخدام serde_bytes، لأنه أكثر كفاءة.
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
str: &'a str,
string: String,
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
#[serde(with = "serde_bytes")]
byte_slice: &'a [u8],
}
FixedString(N) مدعوم بوصفه مصفوفة من البايتات، مثل [u8; N].
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
fixed_str: [u8; 16], // FixedString(16)
}
use serde_repr::{Deserialize_repr, Serialize_repr};
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
level: Level,
}
#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
Debug = 1,
Info = 2,
Warn = 3,
Error = 4,
}
- يُحوَّل
UUID من/إلى uuid::Uuid باستخدام serde::uuid. ويتطلب ذلك feature uuid.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::uuid")]
uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4")]
ipv4: std::net::Ipv4Addr,
}
- يمكن تحويل
Date من/إلى u16 أو newtype مبني عليه، وهو يمثّل عدد الأيام المنقضية منذ 1970-01-01. كما أن time::Date مدعوم أيضًا باستخدام serde::time::date، وهذا يتطلب تفعيل الميزة time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: u16,
#[serde(with = "clickhouse::serde::time::date")]
date: Date,
}
- يمكن تحويل
Date32 من/إلى i32 أو newtype مبني عليه، وهو يمثّل عدد الأيام المنقضية منذ 1970-01-01. كما أن time::Date مدعوم عند استخدام serde::time::date32، ويتطلب ذلك تفعيل الميزة time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: i32,
#[serde(with = "clickhouse::serde::time::date32")]
date: Date,
}
- يقابل
DateTime القيمة u32 ذهابًا وإيابًا، أو newtype مبنيًا عليها، ويمثل عدد الثواني المنقضية منذ حقبة UNIX. كما أن time::OffsetDateTime مدعوم عند استخدام serde::time::datetime، لكن ذلك يتطلب تفعيل الميزة time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: u32,
#[serde(with = "clickhouse::serde::time::datetime")]
dt: OffsetDateTime,
}
- يُحوَّل
DateTime64(_) من/إلى i32 أو newtype يغلّفه، ويمثل وقتًا منقضيًا منذ حقبة UNIX. كما أن time::OffsetDateTime مدعوم عبر استخدام serde::time::datetime64::*، ويتطلب ذلك تفعيل الميزة time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
#[serde(with = "clickhouse::serde::time::datetime64::secs")]
dt64s: OffsetDateTime, // `DateTime64(0)`
#[serde(with = "clickhouse::serde::time::datetime64::millis")]
dt64ms: OffsetDateTime, // `DateTime64(3)`
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
dt64us: OffsetDateTime, // `DateTime64(6)`
#[serde(with = "clickhouse::serde::time::datetime64::nanos")]
dt64ns: OffsetDateTime, // `DateTime64(9)`
}
-
Tuple(A, B, ...) يمكن تحويله من/إلى (A, B, ...) أو نوع جديد يغلّفه.
-
Array(_) يمكن تحويله من/إلى أي slice، مثل Vec<_> و&[_]. الأنواع الجديدة مدعومة أيضًا.
-
Map(K, V) يتعامل مثل Array((K, V)).
-
LowCardinality(_) مدعوم بسلاسة.
-
Nullable(_) يمكن تحويله من/إلى Option<_>. بالنسبة إلى مساعدات clickhouse::serde::*، أضف ::option.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4::option")]
ipv4_opt: Option<Ipv4Addr>,
}
- يُدعَم
Nested عبر توفير عدة مصفوفات مع إعادة تسميتها.
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(rename = "items.name")]
items_name: Vec<String>,
#[serde(rename = "items.count")]
items_count: Vec<u32>,
}
- الأنواع
Geo مدعومة. ويتصرف Point مثل زوج مرتب (f64, f64)، أما بقية الأنواع فهي مجرد مقاطع من النقاط.
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
point: Point,
ring: Ring,
polygon: Polygon,
multi_polygon: MultiPolygon,
line_string: LineString,
multi_line_string: MultiLineString,
}
- أنواع البيانات
Variant وDynamic وJSON الجديدة غير مدعومة بعد.
توفّر الحزمة أدوات مساعدة لمحاكاة خادم CH واختبار استعلامات DDL وSELECT وINSERT وWATCH. يمكن تفعيل هذه الإمكانية باستخدام الميزة test-util. استخدمها فقط كتبعية للتطوير.
اطّلع على المثال.
السبب الأكثر شيوعًا لخطأ CANNOT_READ_ALL_DATA هو أن تعريف الصف في جهة التطبيق لا يطابق التعريف المقابل له في ClickHouse.
تأمل الجدول التالي:
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
ثم إذا كان EventLog مُعرَّفًا من جهة التطبيق بأنواع غير متطابقة، على سبيل المثال:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: String, // <- should be u32 instead!
}
عند إدخال البيانات، قد يظهر الخطأ التالي:
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
في هذا المثال، يُعالَج ذلك بالتعريف الصحيح للبنية EventLog:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: u32
}
- أنواع البيانات
Variant وDynamic وJSON (الجديدة) غير مدعومة حتى الآن.
- ربط المعلّمات على جهة الخادم غير مدعوم حتى الآن؛ راجع هذه المشكلة لمتابعتها.
إذا كانت لديك أي أسئلة أو كنت بحاجة إلى مساعدة، فلا تتردد في التواصل معنا عبر Community Slack أو من خلال مشكلات GitHub.