Skip to content

Commit

Permalink
Merge pull request #346 from DoumanAsh/split_bq_client
Browse files Browse the repository at this point in the history
bigquery: Separate HTTP part of client to allow user to use HTTP API
  • Loading branch information
yoshidan authored Feb 1, 2025
2 parents adf80d3 + 2b24963 commit 8345316
Showing 1 changed file with 99 additions and 36 deletions.
135 changes: 99 additions & 36 deletions bigquery/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use google_cloud_token::{TokenSource, TokenSourceProvider};
use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;

Expand All @@ -29,17 +30,99 @@ use crate::query::{QueryOption, QueryResult};
use crate::storage;
use crate::{http, query};

#[cfg(feature = "auth")]
pub use google_cloud_auth;

const JOB_RETRY_REASONS: [&str; 3] = ["backendError", "rateLimitExceeded", "internalError"];

#[derive(Debug)]
pub struct ClientConfig {
http: Option<reqwest_middleware::ClientWithMiddleware>,
pub struct HttpClientConfig {
client: Option<reqwest_middleware::ClientWithMiddleware>,
bigquery_endpoint: Cow<'static, str>,
token_source_provider: Box<dyn TokenSourceProvider>,
debug: bool,
}

impl HttpClientConfig {
pub fn new_with_emulator(http_addr: impl Into<Cow<'static, str>>) -> Self {
Self {
client: None,
bigquery_endpoint: http_addr.into(),
token_source_provider: Box::new(EmptyTokenSourceProvider {}),
debug: false,
}
}

pub fn new(http_token_source_provider: Box<dyn TokenSourceProvider>) -> Self {
Self {
client: None,
bigquery_endpoint: "https://bigquery.googleapis.com".into(),
token_source_provider: http_token_source_provider,
debug: false,
}
}

pub fn with_debug(mut self, value: bool) -> Self {
self.debug = value;
self
}

pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
self.client = Some(value);
self
}

pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
self.bigquery_endpoint = value.into();
self
}

pub fn create_client(self) -> Arc<BigqueryClient> {
let ts = self.token_source_provider.token_source();
Arc::new(BigqueryClient::new(
ts,
self.bigquery_endpoint.as_ref(),
self.client
.unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build()),
self.debug,
))
}
}

#[cfg(feature = "auth")]
impl HttpClientConfig {
fn bigquery_http_auth_config() -> google_cloud_auth::project::Config<'static> {
google_cloud_auth::project::Config::default().with_scopes(&http::bigquery_client::SCOPES)
}

///Creates new token provider for HTTP client
pub fn default_token_provider() -> impl Future<
Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
> + Send
+ 'static {
google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_http_auth_config())
}

///Creates new token provider for HTTP client with specified `credentials`
pub fn default_token_provider_with(
credentials: google_cloud_auth::credentials::CredentialsFile,
) -> impl Future<
Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
> + Send
+ 'static {
google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
HttpClientConfig::bigquery_http_auth_config(),
Box::new(credentials.clone()),
)
}
}

#[derive(Debug)]
pub struct ClientConfig {
http: HttpClientConfig,
environment: Environment,
streaming_read_config: ChannelConfig,
streaming_write_config: StreamingWriteConfig,
debug: bool,
}

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -129,13 +212,10 @@ impl TokenSourceProvider for EmptyTokenSourceProvider {
impl ClientConfig {
pub fn new_with_emulator(grpc_host: &str, http_addr: impl Into<Cow<'static, str>>) -> Self {
Self {
http: None,
bigquery_endpoint: http_addr.into(),
token_source_provider: Box::new(EmptyTokenSourceProvider {}),
http: HttpClientConfig::new_with_emulator(http_addr),
environment: Environment::Emulator(grpc_host.to_string()),
streaming_read_config: ChannelConfig::default(),
streaming_write_config: StreamingWriteConfig::default(),
debug: false,
}
}

Expand All @@ -144,33 +224,35 @@ impl ClientConfig {
grpc_token_source_provider: Box<dyn TokenSourceProvider>,
) -> Self {
Self {
http: None,
bigquery_endpoint: "https://bigquery.googleapis.com".into(),
token_source_provider: http_token_source_provider,
http: HttpClientConfig::new(http_token_source_provider),
environment: Environment::GoogleCloud(grpc_token_source_provider),
streaming_read_config: ChannelConfig::default(),
streaming_write_config: StreamingWriteConfig::default(),
debug: false,
}
}

pub fn with_debug(mut self, value: bool) -> Self {
self.debug = value;
self.http.debug = value;
self
}

pub fn with_streaming_read_config(mut self, value: ChannelConfig) -> Self {
self.streaming_read_config = value;
self
}

pub fn with_streaming_write_config(mut self, value: StreamingWriteConfig) -> Self {
self.streaming_write_config = value;
self
}

pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
self.http = Some(value);
self.http.client = Some(value);
self
}

pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
self.bigquery_endpoint = value.into();
self.http.bigquery_endpoint = value.into();
self
}
}
Expand All @@ -180,15 +262,12 @@ use crate::http::job::list::ListJobsRequest;

use crate::grpc::apiv1::bigquery_client::StreamingReadClient;
use crate::storage_write::stream::{buffered, committed, default, pending};
#[cfg(feature = "auth")]
pub use google_cloud_auth;
use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;

#[cfg(feature = "auth")]
impl ClientConfig {
pub async fn new_with_auth() -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
let ts_http =
google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_http_auth_config()).await?;
let ts_http = HttpClientConfig::default_token_provider().await?;
let ts_grpc =
google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_grpc_auth_config()).await?;
let project_id = ts_grpc.project_id.clone();
Expand All @@ -199,11 +278,7 @@ impl ClientConfig {
pub async fn new_with_credentials(
credentials: google_cloud_auth::credentials::CredentialsFile,
) -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
let ts_http = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
Self::bigquery_http_auth_config(),
Box::new(credentials.clone()),
)
.await?;
let ts_http = HttpClientConfig::default_token_provider_with(credentials.clone()).await?;
let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
Self::bigquery_grpc_auth_config(),
Box::new(credentials),
Expand All @@ -214,10 +289,6 @@ impl ClientConfig {
Ok((config, project_id))
}

fn bigquery_http_auth_config() -> google_cloud_auth::project::Config<'static> {
google_cloud_auth::project::Config::default().with_scopes(&http::bigquery_client::SCOPES)
}

fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
google_cloud_auth::project::Config::default()
.with_audience(crate::grpc::apiv1::conn_pool::AUDIENCE)
Expand Down Expand Up @@ -258,15 +329,7 @@ pub struct Client {
impl Client {
/// New client
pub async fn new(config: ClientConfig) -> Result<Self, google_cloud_gax::conn::Error> {
let ts = config.token_source_provider.token_source();
let client = Arc::new(BigqueryClient::new(
ts,
config.bigquery_endpoint.into_owned().as_str(),
config
.http
.unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build()),
config.debug,
));
let client = config.http.create_client();

Ok(Self {
dataset_client: BigqueryDatasetClient::new(client.clone()),
Expand Down

0 comments on commit 8345316

Please sign in to comment.