Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates unmaintained tokio-retry to tokio-retry2 #311

Merged
merged 1 commit into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion foundation/gax/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ thiserror = "1.0"
tower = { version = "0.4", features = ["filter"] }
http = "1.1"
google-cloud-token = { version = "0.1.2", path = "../token" }
tokio-retry = "0.3"
tokio-retry2 = "0.5.3"
17 changes: 11 additions & 6 deletions foundation/gax/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::future::Future;
use std::iter::Take;
use std::time::Duration;

pub use tokio_retry::strategy::ExponentialBackoff;
pub use tokio_retry::Condition;
use tokio_retry::{Action, RetryIf};
pub use tokio_retry2::strategy::ExponentialBackoff;
use tokio_retry2::{Action, RetryIf};
pub use tokio_retry2::{Condition, MapErr};

use crate::grpc::{Code, Status};

Expand All @@ -21,6 +21,7 @@ impl TryAs<Status> for Status {
pub trait Retry<E: TryAs<Status>, T: Condition<E>> {
fn strategy(&self) -> Take<ExponentialBackoff>;
fn condition(&self) -> T;
fn notify(error: &E, duration: Duration);
}

pub struct CodeCondition {
Expand Down Expand Up @@ -70,6 +71,10 @@ impl Retry<Status, CodeCondition> for RetrySetting {
fn condition(&self) -> CodeCondition {
CodeCondition::new(self.codes.clone())
}

fn notify(_error: &Status, _duration: Duration) {
tracing::trace!("retry fn");
}
}

impl Default for RetrySetting {
Expand All @@ -92,7 +97,7 @@ where
RT: Retry<E, C> + Default,
{
let retry = retry.unwrap_or_default();
RetryIf::spawn(retry.strategy(), action, retry.condition()).await
RetryIf::spawn(retry.strategy(), action, retry.condition(), RT::notify).await
}
/// Repeats retries when the specified error is detected.
/// The argument specified by 'v' can be reused for each retry.
Expand All @@ -117,7 +122,6 @@ where
if retry.condition().should_retry(&status) {
let duration = strategy.next().ok_or(status)?;
tokio::time::sleep(duration).await;
tracing::trace!("retry fn");
} else {
return Err(status);
}
Expand All @@ -128,6 +132,7 @@ where
mod tests {
use std::sync::{Arc, Mutex};

use tokio_retry2::MapErr;
use tonic::{Code, Status};

use crate::retry::{invoke, RetrySetting};
Expand All @@ -140,7 +145,7 @@ mod tests {
let mut lock = counter.lock().unwrap();
*lock += 1;
let result: Result<i32, Status> = Err(Status::new(Code::Aborted, "error"));
result
result.map_transient_err()
};
let actual = invoke(Some(retry), action).await.unwrap_err();
let expected = Status::new(Code::Aborted, "error");
Expand Down
10 changes: 5 additions & 5 deletions foundation/longrunning/src/autogen/operations_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tonic::Response;
use google_cloud_gax::conn::{Channel, Error};
use google_cloud_gax::create_request;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke, RetrySetting};
use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
use google_cloud_googleapis::longrunning::operations_client::OperationsClient as InternalOperationsClient;
use google_cloud_googleapis::longrunning::{
CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, Operation, WaitOperationRequest,
Expand Down Expand Up @@ -44,7 +44,7 @@ impl OperationsClient {
let name = &req.name;
let action = || async {
let request = create_request(format!("name={name}"), req.clone());
self.inner.clone().get_operation(request).await
self.inner.clone().get_operation(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand All @@ -62,7 +62,7 @@ impl OperationsClient {
let name = &req.name;
let action = || async {
let request = create_request(format!("name={name}"), req.clone());
self.inner.clone().delete_operation(request).await
self.inner.clone().delete_operation(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand All @@ -86,7 +86,7 @@ impl OperationsClient {
let name = &req.name;
let action = || async {
let request = create_request(format!("name={name}"), req.clone());
self.inner.clone().cancel_operation(request).await
self.inner.clone().cancel_operation(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand All @@ -108,7 +108,7 @@ impl OperationsClient {
let setting = retry.unwrap_or_else(default_retry_setting);
let action = || async {
let request = create_request("".to_string(), req.clone());
self.inner.clone().wait_operation(request).await
self.inner.clone().wait_operation(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand Down
48 changes: 30 additions & 18 deletions kms/src/grpc/apiv1/kms_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::grpc::apiv1::conn_pool::ConnectionManager;

use google_cloud_gax::create_request;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke, RetrySetting};
use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};

use crate::grpc::kms::v1::CreateCryptoKeyVersionRequest;
use crate::grpc::kms::v1::CreateKeyRingRequest;
Expand Down Expand Up @@ -62,7 +62,7 @@ impl Client {
) -> Result<GenerateRandomBytesResponse, Status> {
let action = || async {
let request = create_request(format!("location={}", req.location), req.clone());
self.cm.conn().generate_random_bytes(request).await
self.cm.conn().generate_random_bytes(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -81,7 +81,7 @@ impl Client {
) -> Result<CryptoKey, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().create_crypto_key(request).await
self.cm.conn().create_crypto_key(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -100,7 +100,11 @@ impl Client {
) -> Result<CryptoKeyVersion, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().create_crypto_key_version(request).await
self.cm
.conn()
.create_crypto_key_version(request)
.await
.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -119,7 +123,7 @@ impl Client {
) -> Result<KeyRing, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().create_key_ring(request).await
self.cm.conn().create_key_ring(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -138,7 +142,11 @@ impl Client {
) -> Result<CryptoKeyVersion, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().destroy_crypto_key_version(request).await
self.cm
.conn()
.destroy_crypto_key_version(request)
.await
.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -157,7 +165,7 @@ impl Client {
) -> Result<CryptoKey, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().get_crypto_key(request).await
self.cm.conn().get_crypto_key(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -176,7 +184,7 @@ impl Client {
) -> Result<CryptoKeyVersion, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().get_crypto_key_version(request).await
self.cm.conn().get_crypto_key_version(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -191,7 +199,7 @@ impl Client {
pub async fn get_key_ring(&self, req: GetKeyRingRequest, retry: Option<RetrySetting>) -> Result<KeyRing, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().get_key_ring(request).await
self.cm.conn().get_key_ring(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -210,7 +218,11 @@ impl Client {
) -> Result<ListCryptoKeyVersionsResponse, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().list_crypto_key_versions(request).await
self.cm
.conn()
.list_crypto_key_versions(request)
.await
.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -229,7 +241,7 @@ impl Client {
) -> Result<ListCryptoKeysResponse, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().list_crypto_keys(request).await
self.cm.conn().list_crypto_keys(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -248,7 +260,7 @@ impl Client {
) -> Result<ListKeyRingsResponse, Status> {
let action = || async {
let request = create_request(format!("parent={}", req.parent), req.clone());
self.cm.conn().list_key_rings(request).await
self.cm.conn().list_key_rings(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -263,7 +275,7 @@ impl Client {
pub async fn encrypt(&self, req: EncryptRequest, retry: Option<RetrySetting>) -> Result<EncryptResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().encrypt(request).await
self.cm.conn().encrypt(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -278,7 +290,7 @@ impl Client {
pub async fn decrypt(&self, req: DecryptRequest, retry: Option<RetrySetting>) -> Result<DecryptResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().decrypt(request).await
self.cm.conn().decrypt(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -297,7 +309,7 @@ impl Client {
) -> Result<AsymmetricSignResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().asymmetric_sign(request).await
self.cm.conn().asymmetric_sign(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -312,7 +324,7 @@ impl Client {
pub async fn mac_sign(&self, req: MacSignRequest, retry: Option<RetrySetting>) -> Result<MacSignResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().mac_sign(request).await
self.cm.conn().mac_sign(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -331,7 +343,7 @@ impl Client {
) -> Result<MacVerifyResponse, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().mac_verify(request).await
self.cm.conn().mac_verify(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand All @@ -350,7 +362,7 @@ impl Client {
) -> Result<PublicKey, Status> {
let action = || async {
let request = create_request(format!("name={}", req.name), req.clone());
self.cm.conn().get_public_key(request).await
self.cm.conn().get_public_key(request).await.map_transient_err()
};
invoke(Some(retry.unwrap_or_else(default_setting)), action)
.await
Expand Down
32 changes: 22 additions & 10 deletions pubsub/src/apiv1/publisher_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use google_cloud_gax::conn::Channel;
use google_cloud_gax::create_request;
use google_cloud_gax::grpc::Response;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke, RetrySetting};
use google_cloud_gax::retry::{invoke, MapErr, RetrySetting};
use google_cloud_googleapis::pubsub::v1::publisher_client::PublisherClient as InternalPublisherClient;
use google_cloud_googleapis::pubsub::v1::{
DeleteTopicRequest, DetachSubscriptionRequest, DetachSubscriptionResponse, GetTopicRequest,
Expand Down Expand Up @@ -41,7 +41,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("name={name}"), req.clone());
client.create_topic(request).await
client.create_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand All @@ -61,7 +61,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("name={name}"), req.clone());
client.update_topic(request).await
client.update_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand Down Expand Up @@ -92,7 +92,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("name={name}"), req.clone());
client.publish(request).await
client.publish(request).await.map_transient_err()
};
invoke(Some(setting), action).await
}
Expand All @@ -108,7 +108,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.get_topic(request).await
client.get_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand All @@ -127,7 +127,11 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("project={project}"), req.clone());
client.list_topics(request).await.map(|d| d.into_inner())
client
.list_topics(request)
.await
.map(|d| d.into_inner())
.map_transient_err()
};
let response = invoke(retry.clone(), action).await?;
all.extend(response.topics.into_iter());
Expand All @@ -152,7 +156,11 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.list_topic_subscriptions(request).await.map(|d| d.into_inner())
client
.list_topic_subscriptions(request)
.await
.map(|d| d.into_inner())
.map_transient_err()
};
let response = invoke(retry.clone(), action).await?;
all.extend(response.subscriptions.into_iter());
Expand Down Expand Up @@ -181,7 +189,11 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.list_topic_snapshots(request).await.map(|d| d.into_inner())
client
.list_topic_snapshots(request)
.await
.map(|d| d.into_inner())
.map_transient_err()
};
let response = invoke(retry.clone(), action).await?;
all.extend(response.snapshots.into_iter());
Expand All @@ -207,7 +219,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("topic={topic}"), req.clone());
client.delete_topic(request).await
client.delete_topic(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand All @@ -226,7 +238,7 @@ impl PublisherClient {
let action = || async {
let mut client = self.client();
let request = create_request(format!("subscription={subscription}"), req.clone());
client.detach_subscription(request).await
client.detach_subscription(request).await.map_transient_err()
};
invoke(retry, action).await
}
Expand Down
Loading
Loading