From f4b6c628c9ddb7dc0e366de8f19382fe7f861c40 Mon Sep 17 00:00:00 2001 From: yoshidan Date: Sat, 25 May 2024 12:29:54 +0900 Subject: [PATCH] tweak doc --- pubsub/README.md | 37 ++++++++++++++++++------------------- pubsub/src/lib.rs | 23 +++++++++++------------ 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/pubsub/README.md b/pubsub/README.md index 43c8c151..63ce7538 100644 --- a/pubsub/README.md +++ b/pubsub/README.md @@ -179,31 +179,30 @@ async fn run(config: ClientConfig) -> Result<(), Status> { ### Subscribe Message (Alternative Way) -```no_run +```rust use google_cloud_pubsub::client::{Client, ClientConfig}; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::subscription::SubscriptionConfig; use google_cloud_gax::grpc::Status; -use std::time::Duration; use tokio_util::sync::CancellationToken; use futures_util::StreamExt; -async fn run(config: ClientConfig) -> Result<(), Status> { - // Creating Client, Topic and Subscription... - let client = Client::new(config).await.unwrap(); - let subscription = client.subscription("test-subscription"); - - // Read the messages as a stream - // (needs futures_util::StreamExt as import) - // Note: This blocks the current thread but helps working with non clonable data - let mut stream = subscription.subscribe(None).await?; - while let Some(message) = stream.next().await { - // Handle data. - println!("Got Message: {:?}", message.message); - - // Ack or Nack message. - let _ = message.ack().await; - } - Ok(()) +async fn run(config: ClientConfig, cancel: CancellationToken) -> Result<(), Status> { + // Creating Client, Topic and Subscription... + let client = Client::new(config).await.unwrap(); + let subscription = client.subscription("test-subscription"); + + // Read the messages as a stream + let mut stream = subscription.subscribe(None).await.unwrap(); + while let Some(message) = tokio::select! { + msg = stream.next() => msg, + _ = cancel.cancelled() => None + } { + message.ack().await.unwrap(); + } + stream.dispose().await; + + // Wait for stream dispose + Ok(()) } ``` \ No newline at end of file diff --git a/pubsub/src/lib.rs b/pubsub/src/lib.rs index 23ca124b..7c4971bb 100644 --- a/pubsub/src/lib.rs +++ b/pubsub/src/lib.rs @@ -169,31 +169,30 @@ //! //! ### Subscribe Message (Alternative Way) //! -//! ```no_run +//! ``` //! use google_cloud_pubsub::client::{Client, ClientConfig}; //! use google_cloud_googleapis::pubsub::v1::PubsubMessage; //! use google_cloud_pubsub::subscription::SubscriptionConfig; //! use google_cloud_gax::grpc::Status; -//! use std::time::Duration; //! use tokio_util::sync::CancellationToken; //! use futures_util::StreamExt; //! -//! async fn run(config: ClientConfig) -> Result<(), Status> { +//! async fn run(config: ClientConfig, cancel: CancellationToken) -> Result<(), Status> { //! // Creating Client, Topic and Subscription... //! let client = Client::new(config).await.unwrap(); //! let subscription = client.subscription("test-subscription"); //! //! // Read the messages as a stream -//! // (needs futures_util::StreamExt as import) -//! // Note: This blocks the current thread but helps working with non clonable data -//! let mut stream = subscription.subscribe(None).await?; -//! while let Some(message) = stream.next().await { -//! // Handle data. -//! println!("Got Message: {:?}", message.message); -//! -//! // Ack or Nack message. -//! let _ = message.ack().await; +//! let mut stream = subscription.subscribe(None).await.unwrap(); +//! while let Some(message) = tokio::select! { +//! msg = stream.next() => msg, +//! _ = cancel.cancelled() => None +//! } { +//! message.ack().await.unwrap(); //! } +//! stream.dispose().await; +//! +//! // Wait for stream dispose //! Ok(()) //! } //! ```