Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spanner: set google-cloud-resource-prefix metadata #332

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion spanner/src/apiv1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod tests {

use crate::apiv1::conn_pool::ConnectionManager;
use crate::apiv1::spanner_client::Client;
use crate::session::client_metadata;

const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";

Expand All @@ -31,7 +32,7 @@ mod tests {
)
.await
.unwrap();
cm.conn()
cm.conn().with_metadata(client_metadata(&DATABASE))
}

async fn create_session(client: &mut Client) -> Session {
Expand Down
186 changes: 90 additions & 96 deletions spanner/src/apiv1/spanner_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::time::Duration;

use google_cloud_gax::conn::Channel;
use google_cloud_gax::create_request;
use google_cloud_gax::grpc::metadata::{KeyAndValueRef, MetadataMap};
use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
use google_cloud_gax::retry::{invoke_fn, RetrySetting};
use google_cloud_gax::{create_request, grpc};
use google_cloud_googleapis::spanner::v1::spanner_client::SpannerClient;
use google_cloud_googleapis::spanner::v1::{
BatchCreateSessionsRequest, BatchCreateSessionsResponse, BeginTransactionRequest, CommitRequest, CommitResponse,
Expand Down Expand Up @@ -43,6 +44,7 @@ fn default_setting() -> RetrySetting {
#[derive(Clone)]
pub struct Client {
inner: SpannerClient<Channel>,
metadata: MetadataMap,
}

impl Client {
Expand All @@ -51,6 +53,15 @@ impl Client {
// https://github.com/googleapis/google-cloud-go/blob/65a9ba55ed3777f520bd881d891e8917323549a5/spanner/apiv1/spanner_client.go#L73
Client {
inner: inner.max_decoding_message_size(i32::MAX as usize),
metadata: Default::default(),
}
}

/// set metadata for spanner client
pub(crate) fn with_metadata(self, metadata: MetadataMap) -> Client {
Client {
inner: self.inner,
metadata,
}
}

Expand Down Expand Up @@ -83,14 +94,11 @@ impl Client {
let database = &req.database;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("database={database}"), req.clone());
spanner_client
.create_session(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("database={database}"), req.clone());
this.inner.create_session(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -109,14 +117,11 @@ impl Client {
let database = &req.database;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("database={database}"), req.clone());
spanner_client
.batch_create_sessions(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("database={database}"), req.clone());
this.inner.batch_create_sessions(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -133,14 +138,11 @@ impl Client {
let name = &req.name;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("name={name}"), req.clone());
spanner_client
.get_session(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("name={name}"), req.clone());
this.inner.get_session(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -156,14 +158,11 @@ impl Client {
let database = &req.database;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("database={database}"), req.clone());
spanner_client
.list_sessions(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("database={database}"), req.clone());
this.inner.list_sessions(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -181,14 +180,11 @@ impl Client {
let name = &req.name;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("name={name}"), req.clone());
spanner_client
.delete_session(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("name={name}"), req.clone());
this.inner.delete_session(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -214,14 +210,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.execute_sql(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.execute_sql(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -241,14 +234,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.execute_streaming_sql(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -274,25 +264,25 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
let result = spanner_client.execute_batch_dml(request).await;
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
let result = this.inner.execute_batch_dml(request).await;
match result {
Ok(response) => match response.get_ref().status.as_ref() {
Some(s) => {
let code = Code::from(s.code);
if code == Code::Ok {
Ok(response)
} else {
Err((Status::new(code, s.message.to_string()), spanner_client))
Err((Status::new(code, s.message.to_string()), this))
}
}
None => Ok(response),
},
Err(err) => Err((err, spanner_client)),
Err(err) => Err((err, this)),
}
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -316,11 +306,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client.read(request).await.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.read(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -340,14 +330,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.streaming_read(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.streaming_read(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -366,14 +353,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.begin_transaction(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.begin_transaction(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand Down Expand Up @@ -402,11 +386,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client.commit(request).await.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.commit(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -429,11 +413,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client.rollback(request).await.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.rollback(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand All @@ -459,14 +443,11 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.partition_query(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.partition_query(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}
Expand Down Expand Up @@ -494,15 +475,28 @@ impl Client {
let session = &req.session;
invoke_fn(
Some(setting),
|spanner_client| async {
let request = create_request(format!("session={session}"), req.clone());
spanner_client
.partition_read(request)
.await
.map_err(|e| (e, spanner_client))
|this| async {
let request = this.create_request(format!("session={session}"), req.clone());
this.inner.partition_read(request).await.map_err(|e| (e, this))
},
&mut self.inner,
self,
)
.await
}

fn create_request<T>(&self, param_string: String, into_request: impl grpc::IntoRequest<T>) -> grpc::Request<T> {
let mut req = create_request(param_string, into_request);
let target = req.metadata_mut();
for entry in self.metadata.iter() {
match entry {
KeyAndValueRef::Ascii(k, v) => {
target.append(k, v.clone());
}
KeyAndValueRef::Binary(k, v) => {
target.append_bin(k, v.clone());
}
}
}
req
}
}
Loading
Loading