diff --git a/bigquery/src/client.rs b/bigquery/src/client.rs index c1ab35f5..b24f492f 100644 --- a/bigquery/src/client.rs +++ b/bigquery/src/client.rs @@ -9,6 +9,7 @@ use google_cloud_token::{TokenSource, TokenSourceProvider}; use std::borrow::Cow; use std::collections::VecDeque; use std::fmt::Debug; +use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; @@ -29,17 +30,99 @@ use crate::query::{QueryOption, QueryResult}; use crate::storage; use crate::{http, query}; +#[cfg(feature = "auth")] +pub use google_cloud_auth; + const JOB_RETRY_REASONS: [&str; 3] = ["backendError", "rateLimitExceeded", "internalError"]; #[derive(Debug)] -pub struct ClientConfig { - http: Option, +pub struct HttpClientConfig { + client: Option, bigquery_endpoint: Cow<'static, str>, token_source_provider: Box, + debug: bool, +} + +impl HttpClientConfig { + pub fn new_with_emulator(http_addr: impl Into>) -> Self { + Self { + client: None, + bigquery_endpoint: http_addr.into(), + token_source_provider: Box::new(EmptyTokenSourceProvider {}), + debug: false, + } + } + + pub fn new(http_token_source_provider: Box) -> Self { + Self { + client: None, + bigquery_endpoint: "https://bigquery.googleapis.com".into(), + token_source_provider: http_token_source_provider, + debug: false, + } + } + + pub fn with_debug(mut self, value: bool) -> Self { + self.debug = value; + self + } + + pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self { + self.client = Some(value); + self + } + + pub fn with_endpoint(mut self, value: impl Into>) -> Self { + self.bigquery_endpoint = value.into(); + self + } + + pub fn create_client(self) -> Arc { + let ts = self.token_source_provider.token_source(); + Arc::new(BigqueryClient::new( + ts, + self.bigquery_endpoint.as_ref(), + self.client + .unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build()), + self.debug, + )) + } +} + +#[cfg(feature = "auth")] +impl HttpClientConfig { + fn bigquery_http_auth_config() -> google_cloud_auth::project::Config<'static> { + google_cloud_auth::project::Config::default().with_scopes(&http::bigquery_client::SCOPES) + } + + ///Creates new token provider for HTTP client + pub fn default_token_provider() -> impl Future< + Output = Result, + > + Send + + 'static { + google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_http_auth_config()) + } + + ///Creates new token provider for HTTP client with specified `credentials` + pub fn default_token_provider_with( + credentials: google_cloud_auth::credentials::CredentialsFile, + ) -> impl Future< + Output = Result, + > + Send + + 'static { + google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials( + HttpClientConfig::bigquery_http_auth_config(), + Box::new(credentials.clone()), + ) + } +} + +#[derive(Debug)] +pub struct ClientConfig { + http: HttpClientConfig, environment: Environment, streaming_read_config: ChannelConfig, streaming_write_config: StreamingWriteConfig, - debug: bool, } #[derive(Clone, Debug, Default)] @@ -129,13 +212,10 @@ impl TokenSourceProvider for EmptyTokenSourceProvider { impl ClientConfig { pub fn new_with_emulator(grpc_host: &str, http_addr: impl Into>) -> Self { Self { - http: None, - bigquery_endpoint: http_addr.into(), - token_source_provider: Box::new(EmptyTokenSourceProvider {}), + http: HttpClientConfig::new_with_emulator(http_addr), environment: Environment::Emulator(grpc_host.to_string()), streaming_read_config: ChannelConfig::default(), streaming_write_config: StreamingWriteConfig::default(), - debug: false, } } @@ -144,33 +224,35 @@ impl ClientConfig { grpc_token_source_provider: Box, ) -> Self { Self { - http: None, - bigquery_endpoint: "https://bigquery.googleapis.com".into(), - token_source_provider: http_token_source_provider, + http: HttpClientConfig::new(http_token_source_provider), environment: Environment::GoogleCloud(grpc_token_source_provider), streaming_read_config: ChannelConfig::default(), streaming_write_config: StreamingWriteConfig::default(), - debug: false, } } + pub fn with_debug(mut self, value: bool) -> Self { - self.debug = value; + self.http.debug = value; self } + pub fn with_streaming_read_config(mut self, value: ChannelConfig) -> Self { self.streaming_read_config = value; self } + pub fn with_streaming_write_config(mut self, value: StreamingWriteConfig) -> Self { self.streaming_write_config = value; self } + pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self { - self.http = Some(value); + self.http.client = Some(value); self } + pub fn with_endpoint(mut self, value: impl Into>) -> Self { - self.bigquery_endpoint = value.into(); + self.http.bigquery_endpoint = value.into(); self } } @@ -180,15 +262,12 @@ use crate::http::job::list::ListJobsRequest; use crate::grpc::apiv1::bigquery_client::StreamingReadClient; use crate::storage_write::stream::{buffered, committed, default, pending}; -#[cfg(feature = "auth")] -pub use google_cloud_auth; use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient; #[cfg(feature = "auth")] impl ClientConfig { pub async fn new_with_auth() -> Result<(Self, Option), google_cloud_auth::error::Error> { - let ts_http = - google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_http_auth_config()).await?; + let ts_http = HttpClientConfig::default_token_provider().await?; let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_grpc_auth_config()).await?; let project_id = ts_grpc.project_id.clone(); @@ -199,11 +278,7 @@ impl ClientConfig { pub async fn new_with_credentials( credentials: google_cloud_auth::credentials::CredentialsFile, ) -> Result<(Self, Option), google_cloud_auth::error::Error> { - let ts_http = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials( - Self::bigquery_http_auth_config(), - Box::new(credentials.clone()), - ) - .await?; + let ts_http = HttpClientConfig::default_token_provider_with(credentials.clone()).await?; let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials( Self::bigquery_grpc_auth_config(), Box::new(credentials), @@ -214,10 +289,6 @@ impl ClientConfig { Ok((config, project_id)) } - fn bigquery_http_auth_config() -> google_cloud_auth::project::Config<'static> { - google_cloud_auth::project::Config::default().with_scopes(&http::bigquery_client::SCOPES) - } - fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> { google_cloud_auth::project::Config::default() .with_audience(crate::grpc::apiv1::conn_pool::AUDIENCE) @@ -258,15 +329,7 @@ pub struct Client { impl Client { /// New client pub async fn new(config: ClientConfig) -> Result { - let ts = config.token_source_provider.token_source(); - let client = Arc::new(BigqueryClient::new( - ts, - config.bigquery_endpoint.into_owned().as_str(), - config - .http - .unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build()), - config.debug, - )); + let client = config.http.create_client(); Ok(Self { dataset_client: BigqueryDatasetClient::new(client.clone()),