Skip to content

Commit

Permalink
tweak doc
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshidan committed May 25, 2024
1 parent ef051dc commit f4b6c62
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 31 deletions.
37 changes: 18 additions & 19 deletions pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,31 +179,30 @@ async fn run(config: ClientConfig) -> Result<(), Status> {

### Subscribe Message (Alternative Way)

```no_run
```rust
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::subscription::SubscriptionConfig;
use google_cloud_gax::grpc::Status;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use futures_util::StreamExt;

async fn run(config: ClientConfig) -> Result<(), Status> {
// Creating Client, Topic and Subscription...
let client = Client::new(config).await.unwrap();
let subscription = client.subscription("test-subscription");
// Read the messages as a stream
// (needs futures_util::StreamExt as import)
// Note: This blocks the current thread but helps working with non clonable data
let mut stream = subscription.subscribe(None).await?;
while let Some(message) = stream.next().await {
// Handle data.
println!("Got Message: {:?}", message.message);
// Ack or Nack message.
let _ = message.ack().await;
}
Ok(())
async fn run(config: ClientConfig, cancel: CancellationToken) -> Result<(), Status> {
// Creating Client, Topic and Subscription...
let client = Client::new(config).await.unwrap();
let subscription = client.subscription("test-subscription");

// Read the messages as a stream
let mut stream = subscription.subscribe(None).await.unwrap();
while let Some(message) = tokio::select! {
msg = stream.next() => msg,
_ = cancel.cancelled() => None
} {
message.ack().await.unwrap();
}
stream.dispose().await;

// Wait for stream dispose
Ok(())
}
```
23 changes: 11 additions & 12 deletions pubsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,31 +169,30 @@
//!
//! ### Subscribe Message (Alternative Way)
//!
//! ```no_run
//! ```
//! use google_cloud_pubsub::client::{Client, ClientConfig};
//! use google_cloud_googleapis::pubsub::v1::PubsubMessage;
//! use google_cloud_pubsub::subscription::SubscriptionConfig;
//! use google_cloud_gax::grpc::Status;
//! use std::time::Duration;
//! use tokio_util::sync::CancellationToken;
//! use futures_util::StreamExt;
//!
//! async fn run(config: ClientConfig) -> Result<(), Status> {
//! async fn run(config: ClientConfig, cancel: CancellationToken) -> Result<(), Status> {
//! // Creating Client, Topic and Subscription...
//! let client = Client::new(config).await.unwrap();
//! let subscription = client.subscription("test-subscription");
//!
//! // Read the messages as a stream
//! // (needs futures_util::StreamExt as import)
//! // Note: This blocks the current thread but helps working with non clonable data
//! let mut stream = subscription.subscribe(None).await?;
//! while let Some(message) = stream.next().await {
//! // Handle data.
//! println!("Got Message: {:?}", message.message);
//!
//! // Ack or Nack message.
//! let _ = message.ack().await;
//! let mut stream = subscription.subscribe(None).await.unwrap();
//! while let Some(message) = tokio::select! {
//! msg = stream.next() => msg,
//! _ = cancel.cancelled() => None
//! } {
//! message.ack().await.unwrap();
//! }
//! stream.dispose().await;
//!
//! // Wait for stream dispose
//! Ok(())
//! }
//! ```
Expand Down

0 comments on commit f4b6c62

Please sign in to comment.