77import time
88import weakref
99from concurrent .futures import CancelledError , TimeoutError
10+ from itertools import islice
1011from unittest import mock
1112
1213import pytest
@@ -153,6 +154,9 @@ def test_map(executor):
153154 results = list (executor .map (lambda x : x + 1 , range (10 )))
154155 assert results == [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ]
155156
157+ results = list (executor .map (lambda x , y : x + y , range (10 ), range (9 )))
158+ assert results == [0 , 2 , 4 , 6 , 8 , 10 , 12 , 14 , 16 ]
159+
156160
157161def test_map_timeout (executor ):
158162 """Test that map with timeout raises TimeoutError and cancels futures"""
@@ -178,6 +182,56 @@ def func(x):
178182 assert set (results ) != {0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 }
179183
180184
185+ def test_map_error (executor ):
186+ """Test that map with an exception will raise, and remaining tasks are cancelled"""
187+ results = []
188+
189+ def func (x ):
190+ nonlocal results
191+ time .sleep (0.05 )
192+ if len (results ) == 5 :
193+ raise ValueError ("Test error" )
194+ results .append (x )
195+ return x
196+
197+ with pytest .raises (ValueError ):
198+ list (executor .map (func , range (15 )))
199+
200+ executor .shutdown (wait = True , cancel_futures = False )
201+ assert len (results ) <= 10 , "Final 5 at least should have been cancelled"
202+
203+
204+ @pytest .mark .parametrize ("cancel" , [True , False ])
205+ def test_map_shutdown (executor , cancel ):
206+ results = []
207+
208+ def func (x ):
209+ nonlocal results
210+ time .sleep (0.05 )
211+ results .append (x )
212+ return x
213+
214+ # Get the first few results.
215+ # Keep the iterator alive so that it isn't closed when its reference is dropped.
216+ m = executor .map (func , range (15 ))
217+ values = list (islice (m , 5 ))
218+ assert values == [0 , 1 , 2 , 3 , 4 ]
219+
220+ executor .shutdown (wait = True , cancel_futures = cancel )
221+ if cancel :
222+ assert len (results ) < 15 , "Some tasks should have been cancelled"
223+ else :
224+ assert len (results ) == 15 , "All tasks should have been completed"
225+
226+
227+ def test_map_start (executor ):
228+ """Test that map starts tasks immediately, before iterating"""
229+ e = threading .Event ()
230+ m = executor .map (lambda x : (e .set (), x ), range (1 ))
231+ e .wait (timeout = 0.1 )
232+ assert list (m ) == [(None , 0 )]
233+
234+
181235def test_closing (executor ):
182236 """Test that closing context manager works as expected"""
183237 # mock the shutdown method of the executor
0 commit comments