6464
6565class AsyncComputationResult :
6666 """Represents the result of an asynchronous computation."""
67-
6867 def __init__ (
6968 self ,
7069 future : Future ,
@@ -81,19 +80,15 @@ def __init__(
8180 self ._display_id = str (uuid .uuid4 ())
8281 self ._output_widget = widgets .Output () if IS_IPYTHON else None
8382 self ._cancel_button = (
84- widgets .Button (description = 'Cancel' ) if IS_IPYTHON else None
85- )
83+ widgets .Button (description = 'Cancel' ) if IS_IPYTHON else None )
8684 self ._progress_bar = (
8785 widgets .FloatProgress (
8886 value = 0.0 ,
8987 min = 0.0 ,
9088 max = 1.0 ,
9189 description = 'Running:' ,
9290 bar_style = 'info' ,
93- )
94- if IS_IPYTHON
95- else None
96- )
91+ ) if IS_IPYTHON else None )
9792 self ._cancel_requested = False
9893
9994 if IS_IPYTHON :
@@ -143,8 +138,7 @@ def update_display(self, msg: str, progress: Optional[float] = None):
143138 self ._cancel_button .disabled = False
144139
145140 def set_pipeline_result (
146- self , pipeline_result : beam .runners .runner .PipelineResult
147- ):
141+ self , pipeline_result : beam .runners .runner .PipelineResult ):
148142 self ._pipeline_result = pipeline_result
149143 if self ._cancel_requested :
150144 self .cancel ()
@@ -181,8 +175,7 @@ def _on_done(self, future: Future):
181175 else :
182176 _LOGGER .warning (
183177 'Async computation finished but state is not DONE:'
184- f" { res .state if res else 'Unknown' } "
185- )
178+ f" { res .state if res else 'Unknown' } " )
186179
187180 def cancel (self ):
188181 if self ._future .done ():
@@ -200,8 +193,7 @@ def cancel(self):
200193 if PipelineState .is_terminal (current_state ):
201194 self .update_display (
202195 'Cannot cancel: Pipeline already in terminal state'
203- f' { current_state } .'
204- )
196+ f' { current_state } .' )
205197 return False
206198
207199 self ._pipeline_result .cancel ()
@@ -221,8 +213,7 @@ def __repr__(self):
221213 return (
222214 f'<AsyncComputationResult({ self ._display_id } ) for'
223215 f' { len (self ._pcolls )} PCollections, status:'
224- f" { 'done' if self .done () else 'running' } >"
225- )
216+ f" { 'done' if self .done () else 'running' } >" )
226217
227218
228219class ElementStream :
@@ -475,7 +466,7 @@ def _execute_pipeline_fragment(
475466 })
476467
477468 fragment = pf .PipelineFragment (
478- list (pcolls_to_compute ), merged_options , runner = runner )
469+ list (pcolls_to_compute ), merged_options , runner = runner )
479470
480471 if async_result :
481472 async_result .update_display ('Building pipeline fragment...' , 0.1 )
@@ -508,12 +499,10 @@ def _run_async_computation(
508499 self ._env .mark_pcollection_computing (pcolls_to_compute )
509500 _LOGGER .info (
510501 'Starting asynchronous computation for'
511- f' { len (pcolls_to_compute )} PCollections.'
512- )
502+ f' { len (pcolls_to_compute )} PCollections.' )
513503
514504 pipeline_result = self ._execute_pipeline_fragment (
515- pcolls_to_compute , async_result , runner , options
516- )
505+ pcolls_to_compute , async_result , runner , options )
517506
518507 # if pipeline_result.state == PipelineState.DONE:
519508 # self._env.mark_pcollection_computed(pcolls_to_compute)
@@ -668,18 +657,19 @@ def compute_async(
668657 self ._env .evict_computed_pcollections (self .user_pipeline )
669658
670659 computed_pcolls = {
671- pcoll for pcoll in pcolls if pcoll in self ._env .computed_pcollections
660+ pcoll
661+ for pcoll in pcolls if pcoll in self ._env .computed_pcollections
672662 }
673663 computing_pcolls = {
674- pcoll for pcoll in pcolls if self ._env .is_pcollection_computing (pcoll )
664+ pcoll
665+ for pcoll in pcolls if self ._env .is_pcollection_computing (pcoll )
675666 }
676667 pcolls_to_compute = pcolls - computed_pcolls - computing_pcolls
677668
678669 if not pcolls_to_compute :
679670 _LOGGER .info (
680671 'All requested PCollections are already computed or are being'
681- ' computed.'
682- )
672+ ' computed.' )
683673 return None
684674
685675 self ._watch (list (pcolls_to_compute ))
@@ -691,36 +681,30 @@ def compute_async(
691681 if wait_for_inputs :
692682 if not self ._wait_for_dependencies (pcolls_to_compute ):
693683 raise RuntimeError (
694- 'Dependency computation failed or was cancelled.'
695- )
684+ 'Dependency computation failed or was cancelled.' )
696685 pipeline_result = self ._execute_pipeline_fragment (
697- pcolls_to_compute , None , runner , options
698- )
686+ pcolls_to_compute , None , runner , options )
699687 if pipeline_result .state == PipelineState .DONE :
700688 self ._env .mark_pcollection_computed (pcolls_to_compute )
701689 else :
702690 _LOGGER .error (
703- f'Blocking computation failed. State: { pipeline_result .state } '
704- )
691+ f'Blocking computation failed. State: { pipeline_result .state } ' )
705692 raise RuntimeError (
706- f'Blocking computation failed. State: { pipeline_result .state } '
707- )
693+ f'Blocking computation failed. State: { pipeline_result .state } ' )
708694 finally :
709695 self ._env .unmark_pcollection_computing (pcolls_to_compute )
710696 return None
711697
712698 else : # Asynchronous
713699 future = Future ()
714700 async_result = AsyncComputationResult (
715- future , pcolls_to_compute , self .user_pipeline , self
716- )
701+ future , pcolls_to_compute , self .user_pipeline , self )
717702 self ._async_computations [async_result ._display_id ] = async_result
718703
719704 def task ():
720705 try :
721706 result = self ._run_async_computation (
722- pcolls_to_compute , async_result , wait_for_inputs , runner , options
723- )
707+ pcolls_to_compute , async_result , wait_for_inputs , runner , options )
724708 future .set_result (result )
725709 except Exception as e :
726710 if not future .cancelled ():
@@ -744,8 +728,8 @@ def _get_pcoll_id_map(self):
744728 return {v : k for k , v in pcoll_to_id .items ()}
745729
746730 def _get_all_dependencies (
747- self , pcolls : Set [ beam . pvalue . PCollection ]
748- ) -> Set [beam .pvalue .PCollection ]:
731+ self ,
732+ pcolls : Set [ beam . pvalue . PCollection ] ) -> Set [beam .pvalue .PCollection ]:
749733 """Gets all upstream PCollection dependencies for the given set of PCollections."""
750734 if not self ._pipeline_graph :
751735 return set ()
@@ -759,8 +743,7 @@ def _get_all_dependencies(
759743
760744 target_pcoll_ids = {
761745 pcoll_to_id .get (str (pcoll ))
762- for pcoll in pcolls
763- if str (pcoll ) in pcoll_to_id
746+ for pcoll in pcolls if str (pcoll ) in pcoll_to_id
764747 }
765748
766749 if not target_pcoll_ids :
@@ -824,16 +807,13 @@ def _wait_for_dependencies(
824807
825808 if async_result :
826809 async_result .update_display (
827- f'Waiting for { len (computing_deps )} dependencies to finish...'
828- )
810+ f'Waiting for { len (computing_deps )} dependencies to finish...' )
829811 _LOGGER .info (
830812 f'Waiting for { len (computing_deps )} dependencies:'
831- f' { computing_deps .keys ()} '
832- )
813+ f' { computing_deps .keys ()} ' )
833814
834815 futures_to_wait = list (
835- set (comp ._future for comp in computing_deps .values ())
836- )
816+ set (comp ._future for comp in computing_deps .values ()))
837817
838818 try :
839819 for i , future in enumerate (futures_to_wait ):
@@ -901,8 +881,7 @@ def record(
901881 if not self ._wait_for_dependencies (uncomputed_pcolls ):
902882 raise RuntimeError (
903883 'Cannot record because a dependency failed to compute'
904- ' asynchronously.'
905- )
884+ ' asynchronously.' )
906885
907886 self ._clear ()
908887
0 commit comments