2 changes: 1 addition & 1 deletion CHANGES.md
@@ -78,7 +78,7 @@

## Breaking Changes

-* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* (Python) Some Python dependencies have been split out into extras. To keep every dependency that earlier releases installed by default, install Beam with `pip install apache-beam[gcp,interactive,yaml,redis,hadoop,tfrecord]`, though most users will not need all of these extras ([#34554](https://github.com/apache/beam/issues/34554)).

## Deprecations

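For downstream code that imports the hdfs client directly, the entry above means a plain `pip install apache-beam` no longer guarantees the package is present. A minimal sketch (not part of this PR) of a defensive import that mirrors the guard added to `hadoopfilesystem.py` below:

```python
# Minimal sketch: check whether the optional hdfs client (now provided by the
# 'hadoop' extra) is available before relying on HDFS-backed I/O.
try:
    import hdfs  # installed only when apache-beam[hadoop] was requested
except ImportError:
    hdfs = None

if hdfs is None:
    raise RuntimeError(
        'HDFS support requires the hadoop extra: '
        'pip install "apache-beam[hadoop]"')
```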
11 changes: 9 additions & 2 deletions sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -26,8 +26,6 @@
import re
from typing import BinaryIO # pylint: disable=unused-import

-import hdfs
-
from apache_beam.io import filesystemio
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressedFile
@@ -37,6 +35,11 @@
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.options.pipeline_options import PipelineOptions

+try:
+  import hdfs
+except ImportError:
+  hdfs = None
+
__all__ = ['HadoopFileSystem']

_HDFS_PREFIX = 'hdfs:/'
@@ -108,6 +111,10 @@ def __init__(self, pipeline_options):
    See :class:`~apache_beam.options.pipeline_options.HadoopFileSystemOptions`.
    """
    super().__init__(pipeline_options)
+    if hdfs is None:
+      raise ImportError(
+          'Failed to import hdfs. You can ensure it is '
+          'installed by installing the apache-beam[hadoop] extra.')
    logging.getLogger('hdfs.client').setLevel(logging.WARN)
    if pipeline_options is None:
      raise ValueError('pipeline_options is not set')
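With the module-level import now optional, a missing dependency surfaces when `HadoopFileSystem` is constructed rather than when the module is imported. A hedged usage sketch; the option names come from `HadoopFileSystemOptions`, while the host, port, and user values are placeholders:

```python
# Sketch: constructing HadoopFileSystem without the 'hadoop' extra installed
# raises ImportError here; importing the module itself still succeeds.
from apache_beam.io.hadoopfilesystem import HadoopFileSystem
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    hdfs_host='namenode.example.com',  # placeholder host
    hdfs_port=9870,  # placeholder WebHDFS port
    hdfs_user='beam')  # placeholder user
fs = HadoopFileSystem(options)  # ImportError unless the hdfs package is present
```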
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -32,6 +32,11 @@
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.options.pipeline_options import PipelineOptions

+try:
+  import hdfs as actual_hdfs
+except ImportError:
+  actual_hdfs = None
+

class FakeFile(io.BytesIO):
"""File object for FakeHdfs"""
@@ -201,6 +206,7 @@ def checksum(self, path):


@parameterized_class(('full_urls', ), [(False, ), (True, )])
+@unittest.skipIf(actual_hdfs is None, "hadoop extra not installed")
class HadoopFileSystemTest(unittest.TestCase):
def setUp(self):
self._fake_hdfs = FakeHdfs()
@@ -607,6 +613,7 @@ def test_delete_error(self):
self.assertFalse(self.fs.exists(url2))


+@unittest.skipIf(actual_hdfs is None, "hadoop extra not installed")
class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase):
"""Tests pipeline_options, in the form of a
RuntimeValueProvider.runtime_options object."""
2 changes: 1 addition & 1 deletion sdks/python/setup.py
@@ -379,7 +379,6 @@ def get_portability_package_data():
# TODO(https://github.com/grpc/grpc/issues/37710): Unpin grpc
'grpcio>=1.33.1,<2,!=1.48.0,!=1.59.*,!=1.60.*,!=1.61.*,!=1.62.0,!=1.62.1,<1.66.0; python_version <= "3.12"', # pylint: disable=line-too-long
'grpcio>=1.67.0; python_version >= "3.13"',
-'hdfs>=2.1.0,<3.0.0',
'httplib2>=0.8,<0.23.0',
'jsonpickle>=3.0.0,<4.0.0',
# numpy can have breaking changes in minor versions.
@@ -564,6 +563,7 @@ def get_portability_package_data():
# `--update` / `-U` flag to replace the dask release brought in
# by distributed.
],
+'hadoop': ['hdfs>=2.1.0,<3.0.0'],
'yaml': [
'docstring-parser>=0.15,<1.0',
'jinja2>=3.0,<3.2',
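Because the hdfs client is now opt-in, projects that rely on Beam's HDFS support must request the new extra explicitly. A hypothetical downstream `setup.py` fragment; the project name is a placeholder and the version pin is left to the first release that ships the extra:

```python
# Hypothetical downstream setup.py: depend on Beam with the 'hadoop' extra so
# the hdfs client is still installed transitively.
from setuptools import setup

setup(
    name='my-beam-pipelines',  # placeholder project name
    install_requires=[
        'apache-beam[hadoop]',  # add a version bound for the release with this change
    ],
)
```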
7 changes: 4 additions & 3 deletions sdks/python/tox.ini
@@ -33,7 +33,7 @@ pip_pre = True
# allow apps that support color to use it.
passenv=TERM,CLOUDSDK_CONFIG,DOCKER_*,TESTCONTAINERS_*,TC_*,ALLOYDB_PASSWORD
# Set [] options for pip installation of apache-beam tarball.
-extras = test,dataframe,tfrecord,yaml
+extras = test,dataframe,hadoop,tfrecord,yaml
# Don't warn that these commands aren't installed.
allowlist_externals =
false
@@ -98,7 +98,7 @@ list_dependencies_command = {envbindir}/python.exe {envbindir}/pip.exe freeze

[testenv:py{310,311,312,313}-cloud]
; extras = test,gcp,interactive,dataframe,aws,azure
-extras = test,gcp,interactive,dataframe,aws,azure
+extras = test,hadoop,gcp,interactive,dataframe,aws,azure
commands =
python apache_beam/examples/complete/autocomplete_test.py
bash {toxinidir}/scripts/run_pytest.sh {envname} "{posargs}"
@@ -173,7 +173,7 @@ setenv =
TC_SLEEP_TIME = {env:TC_SLEEP_TIME:1}

# NOTE: we could add ml_test to increase the collected code coverage metrics, but it would make the suite slower.
-extras = test,gcp,interactive,dataframe,aws
+extras = test,hadoop,gcp,interactive,dataframe,aws
commands =
bash {toxinidir}/scripts/run_pytest.sh {envname} "{posargs}" "--cov-report=xml --cov=. --cov-append"

@@ -228,6 +228,7 @@ deps =
holdup==1.8.0
extras =
    gcp
+    hadoop
allowlist_externals =
bash
echo