From a5558100c0b9271bbfaf82cf2c3618d157565e86 Mon Sep 17 00:00:00 2001 From: yoshidan Date: Tue, 27 Feb 2024 18:03:22 +0900 Subject: [PATCH 1/2] update version --- bigquery/Cargo.toml | 6 ++-- foundation/auth/Cargo.toml | 2 +- pubsub/Cargo.toml | 2 +- pubsub/src/client.rs | 67 ++++++++++++++++++++++++++++++++++++++ spanner/Cargo.toml | 2 +- storage/Cargo.toml | 2 +- 6 files changed, 74 insertions(+), 7 deletions(-) diff --git a/bigquery/Cargo.toml b/bigquery/Cargo.toml index a02c3bad..01ca0a5b 100644 --- a/bigquery/Cargo.toml +++ b/bigquery/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "google-cloud-bigquery" -version = "0.6.0" +version = "0.7.0" edition = "2021" authors = ["yoshidan "] repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/bigquery" @@ -22,9 +22,9 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version="1.32", features=["macros"] } time = { version = "0.3", features = ["std", "macros", "formatting", "parsing", "serde"] } -arrow = { version="44.0", default-features = false, features = ["ipc"] } +arrow = { version="50.0", default-features = false, features = ["ipc"] } base64 = "0.21" -bigdecimal = { version="0.3", features=["serde"] } +bigdecimal = { version="0.4", features=["serde"] } num-bigint = "0.4" backon = "0.4" reqwest-middleware = "0.2" diff --git a/foundation/auth/Cargo.toml b/foundation/auth/Cargo.toml index ec2b37ab..9b65d24b 100644 --- a/foundation/auth/Cargo.toml +++ b/foundation/auth/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "google-cloud-auth" -version = "0.13.0" +version = "0.13.1" authors = ["yoshidan "] edition = "2021" repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/foundation/auth" diff --git a/pubsub/Cargo.toml b/pubsub/Cargo.toml index be660a25..e333b11f 100644 --- a/pubsub/Cargo.toml +++ b/pubsub/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "google-cloud-pubsub" -version = "0.22.1" +version = "0.23.0" authors = ["yoshidan "] edition = "2021" repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/pubsub" diff --git a/pubsub/src/client.rs b/pubsub/src/client.rs index ff8c5c52..15c86f38 100644 --- a/pubsub/src/client.rs +++ b/pubsub/src/client.rs @@ -458,12 +458,16 @@ mod tests { #[cfg(test)] mod tests_in_gcp { + use std::collections::{HashMap, HashSet}; + use std::sync::{Arc, Mutex}; use crate::client::{Client, ClientConfig}; use crate::publisher::PublisherConfig; use google_cloud_gax::conn::Environment; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use serial_test::serial; use std::time::Duration; + use tokio_util::sync::CancellationToken; + use google_cloud_gax::grpc::codegen::tokio_stream::StreamExt; fn make_msg(key: &str) -> PubsubMessage { PubsubMessage { @@ -581,4 +585,67 @@ mod tests_in_gcp { tracing::info!("msg id {}", awaiter.get().await.unwrap()); } } + #[tokio::test] + #[serial] + #[ignore] + async fn test_subscribe_exactly_once_delivery() { + let client = Client::new(ClientConfig::default().with_auth().await.unwrap()) + .await + .unwrap(); + + // Check if the subscription is exactly_once_delivery + let subscription = client.subscription("eod-test"); + let config = subscription.config(None).await.unwrap().1; + assert!(config.enable_exactly_once_delivery); + + // publish message + let ctx = CancellationToken::new(); + let ctx_pub = ctx.clone(); + let publisher = client.topic("eod-test").new_publisher(None); + let pub_task = tokio::spawn(async move { + tracing::info!("start publisher"); + loop { + if ctx_pub.is_cancelled() { + tracing::info!("finish publisher"); + return; + } + publisher.publish_blocking(PubsubMessage { + data: "msg".into(), + ..Default::default() + }).get().await.unwrap(); + } + }); + + // subscribe message + let ctx_sub = ctx.clone(); + let sub_task = tokio::spawn(async move { + tracing::info!("start subscriber"); + let mut stream = subscription.subscribe(None).await.unwrap(); + let mut msgs = HashMap::new(); + while let Some(message) = stream.next().await { + if ctx_sub.is_cancelled() { + break; + } + let msg_id = &message.message.message_id; + *msgs.entry(msg_id.clone()).or_insert(0) += 1; + message.ack().await.unwrap(); + } + tracing::info!("finish subscriber"); + return msgs; + }); + + tokio::time::sleep(Duration::from_secs(30)).await; + + // check redelivered messages + ctx.cancel(); + let _ = pub_task.await.unwrap(); + let received_msgs = sub_task.await.unwrap(); + assert!(received_msgs.len() > 0); + + tracing::info!("Number of received messages = {}", received_msgs.len()); + for (msg_id , count) in received_msgs { + assert_eq!(count, 1, "msg_id = {msg_id}, count = {count}"); + } + } + } diff --git a/spanner/Cargo.toml b/spanner/Cargo.toml index acde0a2d..014775cd 100644 --- a/spanner/Cargo.toml +++ b/spanner/Cargo.toml @@ -21,7 +21,7 @@ parking_lot = "0.12" base64 = "0.21" serde = { version = "1.0", optional = true, features = ["derive"] } tokio-util = "0.7" -bigdecimal = { version="0.3", features=["serde"] } +bigdecimal = { version="0.4", features=["serde"] } google-cloud-token = { version = "0.1.1", path = "../foundation/token" } google-cloud-longrunning = { version = "0.17.0", path = "../foundation/longrunning" } diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 337254c3..c3da70c6 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "google-cloud-storage" -version = "0.15.0" +version = "0.16.0" edition = "2021" authors = ["yoshidan "] repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/storage" From 51079b8f0239b38911b2e1b520f4ffab83b19092 Mon Sep 17 00:00:00 2001 From: yoshidan Date: Tue, 27 Feb 2024 18:06:26 +0900 Subject: [PATCH 2/2] fix clippy --- pubsub/src/client.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/pubsub/src/client.rs b/pubsub/src/client.rs index 15c86f38..00bab54c 100644 --- a/pubsub/src/client.rs +++ b/pubsub/src/client.rs @@ -458,16 +458,16 @@ mod tests { #[cfg(test)] mod tests_in_gcp { - use std::collections::{HashMap, HashSet}; - use std::sync::{Arc, Mutex}; use crate::client::{Client, ClientConfig}; use crate::publisher::PublisherConfig; use google_cloud_gax::conn::Environment; + use google_cloud_gax::grpc::codegen::tokio_stream::StreamExt; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use serial_test::serial; + use std::collections::HashMap; + use std::time::Duration; use tokio_util::sync::CancellationToken; - use google_cloud_gax::grpc::codegen::tokio_stream::StreamExt; fn make_msg(key: &str) -> PubsubMessage { PubsubMessage { @@ -609,10 +609,14 @@ mod tests_in_gcp { tracing::info!("finish publisher"); return; } - publisher.publish_blocking(PubsubMessage { - data: "msg".into(), - ..Default::default() - }).get().await.unwrap(); + publisher + .publish_blocking(PubsubMessage { + data: "msg".into(), + ..Default::default() + }) + .get() + .await + .unwrap(); } }); @@ -631,21 +635,20 @@ mod tests_in_gcp { message.ack().await.unwrap(); } tracing::info!("finish subscriber"); - return msgs; + msgs }); tokio::time::sleep(Duration::from_secs(30)).await; // check redelivered messages ctx.cancel(); - let _ = pub_task.await.unwrap(); + pub_task.await.unwrap(); let received_msgs = sub_task.await.unwrap(); - assert!(received_msgs.len() > 0); + assert!(!received_msgs.is_empty()); tracing::info!("Number of received messages = {}", received_msgs.len()); - for (msg_id , count) in received_msgs { + for (msg_id, count) in received_msgs { assert_eq!(count, 1, "msg_id = {msg_id}, count = {count}"); } } - }