diff --git a/google/auth/environment_vars.py b/google/auth/environment_vars.py index ca041ca16..c7d706467 100644 --- a/google/auth/environment_vars.py +++ b/google/auth/environment_vars.py @@ -113,6 +113,18 @@ """Environment variable defining the location of Google API certificate config file.""" +CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE = ( + "CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE" +) +"""Environment variable controlling whether to use client certificate or not. +This variable is the fallback of GOOGLE_API_USE_CLIENT_CERTIFICATE.""" + +CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH = ( + "CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH" +) +"""Environment variable defining the location of Google API certificate config +file. This variable is the fallback of GOOGLE_API_CERTIFICATE_CONFIG.""" + GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES = ( "GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES" ) diff --git a/google/auth/iam.py b/google/auth/iam.py index 22684a9d1..b9ab0bacc 100644 --- a/google/auth/iam.py +++ b/google/auth/iam.py @@ -22,14 +22,13 @@ import base64 import http.client as http_client import json -import os from google.auth import _exponential_backoff from google.auth import _helpers from google.auth import credentials from google.auth import crypt from google.auth import exceptions -from google.auth.transport import mtls +from google.auth.transport import _mtls_helper IAM_RETRY_CODES = { http_client.INTERNAL_SERVER_ERROR, @@ -40,20 +39,9 @@ _IAM_SCOPE = ["https://www.googleapis.com/auth/iam"] -# 1. Determine if we should use mTLS. -# Note: We only support automatic mTLS on the default googleapis.com universe. -if hasattr(mtls, "should_use_client_cert"): - use_client_cert = mtls.should_use_client_cert() -else: # pragma: NO COVER - # if unsupported, fallback to reading from env var - use_client_cert = ( - os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false").lower() == "true" - ) - -# 2. Construct the template domain using the library's DEFAULT_UNIVERSE_DOMAIN constant. -# This ensures that the .replace() calls in the classes will work correctly. -if use_client_cert: - # We use the .mtls. prefix only for the default universe template +# Determine if we should use mTLS. +if hasattr(_mtls_helper, "check_use_client_cert") and _mtls_helper.check_use_client_cert(): + # Construct the template domain using the library's DEFAULT_UNIVERSE_DOMAIN constant. _IAM_DOMAIN = f"iamcredentials.mtls.{credentials.DEFAULT_UNIVERSE_DOMAIN}" else: _IAM_DOMAIN = f"iamcredentials.{credentials.DEFAULT_UNIVERSE_DOMAIN}" diff --git a/google/auth/transport/_mtls_helper.py b/google/auth/transport/_mtls_helper.py index 623b435ee..d5ccbdc4e 100644 --- a/google/auth/transport/_mtls_helper.py +++ b/google/auth/transport/_mtls_helper.py @@ -151,7 +151,14 @@ def _get_cert_config_path(certificate_config_path=None): if env_path is not None and env_path != "": certificate_config_path = env_path else: - certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH + env_path = environ.get( + environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH, + None, + ) + if env_path is not None and env_path != "": + certificate_config_path = env_path + else: + certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH certificate_config_path = path.expanduser(certificate_config_path) if not path.exists(certificate_config_path): @@ -452,13 +459,23 @@ def check_use_client_cert(): Returns: bool: Whether the client certificate should be used for mTLS connection. """ - use_client_cert = getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") + use_client_cert = getenv(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE) + if use_client_cert is None: + use_client_cert = getenv( + environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE + ) + # Check if the value of GOOGLE_API_USE_CLIENT_CERTIFICATE is set. if use_client_cert: return use_client_cert.lower() == "true" else: # Check if the value of GOOGLE_API_CERTIFICATE_CONFIG is set. - cert_path = getenv("GOOGLE_API_CERTIFICATE_CONFIG") + cert_path = getenv(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG) + if cert_path is None: + cert_path = getenv( + environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH + ) + if cert_path: try: with open(cert_path, "r") as f: diff --git a/tests/transport/test_mtls_env_vars.py b/tests/transport/test_mtls_env_vars.py new file mode 100644 index 000000000..3f62a3a51 --- /dev/null +++ b/tests/transport/test_mtls_env_vars.py @@ -0,0 +1,97 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from unittest import mock +import pytest +from google.auth.transport import _mtls_helper +from google.auth import environment_vars + +class TestEnvVarsPrecedence: + def test_use_client_cert_precedence(self): + # GOOGLE_API_USE_CLIENT_CERTIFICATE takes precedence + with mock.patch.dict(os.environ, { + environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true", + environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "false" + }): + assert _mtls_helper.check_use_client_cert() is True + + with mock.patch.dict(os.environ, { + environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "false", + environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true" + }): + assert _mtls_helper.check_use_client_cert() is False + + def test_use_client_cert_fallback(self): + # Fallback to CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE if GOOGLE_API_USE_CLIENT_CERTIFICATE is unset + with mock.patch.dict(os.environ, { + environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true" + }): + # Ensure GOOGLE_API_USE_CLIENT_CERTIFICATE is not set + if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ: + del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE] + assert _mtls_helper.check_use_client_cert() is True + + with mock.patch.dict(os.environ, { + environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "false" + }): + if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ: + del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE] + assert _mtls_helper.check_use_client_cert() is False + + def test_cert_config_path_precedence(self): + # GOOGLE_API_CERTIFICATE_CONFIG takes precedence + google_path = "/path/to/google/config" + cloudsdk_path = "/path/to/cloudsdk/config" + + with mock.patch.dict(os.environ, { + environment_vars.GOOGLE_API_CERTIFICATE_CONFIG: google_path, + environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path + }): + with mock.patch("os.path.exists", return_value=True): + assert _mtls_helper._get_cert_config_path() == google_path + + def test_cert_config_path_fallback(self): + # Fallback to CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH if GOOGLE_API_CERTIFICATE_CONFIG is unset + cloudsdk_path = "/path/to/cloudsdk/config" + + with mock.patch.dict(os.environ, { + environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path + }): + if environment_vars.GOOGLE_API_CERTIFICATE_CONFIG in os.environ: + del os.environ[environment_vars.GOOGLE_API_CERTIFICATE_CONFIG] + + with mock.patch("os.path.exists", return_value=True): + assert _mtls_helper._get_cert_config_path() == cloudsdk_path + + @mock.patch("builtins.open", autospec=True) + def test_check_use_client_cert_config_fallback(self, mock_file): + # Test fallback for config file when determining if client cert should be used + cloudsdk_path = "/path/to/cloudsdk/config" + + mock_file.side_effect = mock.mock_open( + read_data='{"cert_configs": {"workload": "exists"}}' + ) + + with mock.patch.dict(os.environ, { + environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path + }): + if environment_vars.GOOGLE_API_CERTIFICATE_CONFIG in os.environ: + del os.environ[environment_vars.GOOGLE_API_CERTIFICATE_CONFIG] + if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ: + del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE] + if environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE in os.environ: + del os.environ[environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE] + + assert _mtls_helper.check_use_client_cert() is True