Skip to content

Commit 10abf16

Browse files
committed
refactor: move simple Bluesky plan magics definitions to separate file
Signed-off-by: Sofia Donato Ferreira <[email protected]>
1 parent 5d2581d commit 10abf16

File tree

2 files changed

+163
-158
lines changed

2 files changed

+163
-158
lines changed

src/sophys/cli/core/magics/plan_magics.py

Lines changed: 0 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from enum import IntEnum
33
from typing import Annotated
44

5-
import functools
65
import importlib
76
import inspect
87

@@ -15,7 +14,6 @@
1514
from . import in_debug_mode, NamespaceKeys, get_from_namespace, add_to_namespace
1615

1716
try:
18-
from bluesky_queueserver_api.item import BPlan
1917
remote_control_available = True
2018
except ImportError:
2119
remote_control_available = False
@@ -266,162 +264,6 @@ def _usage(self):
266264
return None
267265

268266

269-
class PlanMV(PlanCLI):
270-
def create_parser(self):
271-
_a = super().create_parser()
272-
273-
_a.add_argument("args", nargs='+', type=str)
274-
275-
return _a
276-
277-
def _create_plan(self, parsed_namespace, local_ns):
278-
args, _, motors = self.parse_varargs(parsed_namespace.args, local_ns)
279-
280-
md = self.parse_md(*motors, ns=parsed_namespace)
281-
282-
if self._mode_of_operation == ModeOfOperation.Local:
283-
return functools.partial(self._plan, *args, md=md)
284-
if self._mode_of_operation == ModeOfOperation.Remote:
285-
return BPlan(self._plan.__name__, *args, md=md)
286-
if self._mode_of_operation == ModeOfOperation.Test:
287-
return (self._plan, args, md)
288-
289-
290-
class PlanReadMany(PlanCLI):
291-
def create_parser(self):
292-
_a = super().create_parser()
293-
294-
_a.add_argument("devices", nargs='+', type=str)
295-
296-
return _a
297-
298-
def _create_plan(self, parsed_namespace, local_ns):
299-
devices, _, names = self.parse_varargs(parsed_namespace.devices, local_ns)
300-
301-
md = self.parse_md(*names, ns=parsed_namespace)
302-
303-
if self._mode_of_operation == ModeOfOperation.Local:
304-
return functools.partial(self._plan, devices, md=md)
305-
if self._mode_of_operation == ModeOfOperation.Remote:
306-
return BPlan(self._plan.__name__, devices, md=md)
307-
if self._mode_of_operation == ModeOfOperation.Test:
308-
return (self._plan, devices, md)
309-
310-
311-
class PlanCount(PlanCLI):
312-
def create_parser(self):
313-
_a = super().create_parser()
314-
315-
_a.add_argument("-n", "--num", type=int, default=1)
316-
_a.add_argument("--delay", type=float, default=0.0)
317-
318-
return _a
319-
320-
def _create_plan(self, parsed_namespace, local_ns):
321-
detectors = self.get_real_devices_if_needed(parsed_namespace.detectors, local_ns)
322-
num = parsed_namespace.num
323-
delay = parsed_namespace.delay
324-
325-
md = self.parse_md(*parsed_namespace.detectors, ns=parsed_namespace)
326-
327-
if self._mode_of_operation == ModeOfOperation.Local:
328-
return functools.partial(self._plan, detectors, num=num, delay=delay, md=md)
329-
if self._mode_of_operation == ModeOfOperation.Remote:
330-
return BPlan(self._plan_name, detectors, num=num, delay=delay, md=md)
331-
if self._mode_of_operation == ModeOfOperation.Test:
332-
return (self._plan, detectors, num, delay, md)
333-
334-
335-
class PlanScan(PlanCLI):
336-
def create_parser(self):
337-
_a = super().create_parser()
338-
339-
_a.add_argument("-m", "--motors", nargs='+', type=str)
340-
_a.add_argument("-n", "--num", type=int, default=-1)
341-
342-
return _a
343-
344-
def _create_plan(self, parsed_namespace, local_ns):
345-
detectors = self.get_real_devices_if_needed(parsed_namespace.detectors, local_ns)
346-
_args = parsed_namespace.motors
347-
_num = parsed_namespace.num
348-
args, num, motors = self.parse_varargs(_args, local_ns, with_final_num=(_num == -1), default_num=_num)
349-
350-
md = self.parse_md(*parsed_namespace.detectors, *motors, ns=parsed_namespace)
351-
352-
if self._mode_of_operation == ModeOfOperation.Local:
353-
return functools.partial(self._plan, detectors, *args, num=num, md=md)
354-
if self._mode_of_operation == ModeOfOperation.Remote:
355-
return BPlan(self._plan_name, detectors, *args, num=num, md=md)
356-
if self._mode_of_operation == ModeOfOperation.Test:
357-
return (self._plan, detectors, args, num, md)
358-
359-
360-
class PlanGridScan(PlanCLI):
361-
def create_parser(self):
362-
_a = super().create_parser()
363-
364-
_a.add_argument("-m", "--motors", nargs='+', type=str)
365-
_a.add_argument("-s", "--snake_axes", action="store_true", default=False)
366-
367-
return _a
368-
369-
def _create_plan(self, parsed_namespace, local_ns):
370-
detectors = self.get_real_devices_if_needed(parsed_namespace.detectors, local_ns)
371-
_args = parsed_namespace.motors
372-
args, _, motors = self.parse_varargs(_args, local_ns)
373-
374-
md = self.parse_md(*parsed_namespace.detectors, *motors, ns=parsed_namespace)
375-
376-
snake = parsed_namespace.snake_axes
377-
378-
if self._mode_of_operation == ModeOfOperation.Local:
379-
return functools.partial(self._plan, detectors, *args, snake_axes=snake, md=md)
380-
if self._mode_of_operation == ModeOfOperation.Remote:
381-
return BPlan(self._plan_name, detectors, *args, snake_axes=snake, md=md)
382-
if self._mode_of_operation == ModeOfOperation.Test:
383-
return (self._plan, detectors, args, snake, md)
384-
385-
386-
class PlanAdaptiveScan(PlanCLI):
387-
def create_parser(self):
388-
_a = super().create_parser()
389-
390-
_a.add_argument("-t", "--target_field", type=str)
391-
_a.add_argument("-m", "--motor", type=str)
392-
_a.add_argument("-st", "--start", type=float)
393-
_a.add_argument("-sp", "--stop", type=float)
394-
_a.add_argument("-mins", "--min_step", type=float)
395-
_a.add_argument("-maxs", "--max_step", type=float)
396-
_a.add_argument("-td", "--target_delta", type=float)
397-
_a.add_argument("-b", "--backstep", type=bool)
398-
_a.add_argument("-th", "--threshold", type=float, default=0.8)
399-
400-
return _a
401-
402-
def _create_plan(self, parsed_namespace, local_ns):
403-
detectors = self.get_real_devices_if_needed(parsed_namespace.detectors, local_ns)
404-
motor = self.get_real_devices_if_needed(parsed_namespace.motor, local_ns)
405-
406-
md = self.parse_md(*parsed_namespace.detectors, parsed_namespace.motor, ns=parsed_namespace)
407-
408-
target_field = parsed_namespace.target_field
409-
start = parsed_namespace.start
410-
stop = parsed_namespace.stop
411-
min_step = parsed_namespace.min_step
412-
max_step = parsed_namespace.max_step
413-
target_delta = parsed_namespace.target_delta
414-
backstep = parsed_namespace.backstep
415-
threshold = parsed_namespace.threshold
416-
417-
if self._mode_of_operation == ModeOfOperation.Local:
418-
return functools.partial(self._plan, detectors, target_field=target_field, motor=motor, start=start, stop=stop, min_step=min_step, max_step=max_step, target_delta=target_delta, backstep=backstep, threshold=threshold, md=md)
419-
if self._mode_of_operation == ModeOfOperation.Remote:
420-
return BPlan(self._plan_name, detectors, target_field=target_field, motor=motor, start=start, stop=stop, min_step=min_step, max_step=max_step, target_delta=target_delta, backstep=backstep, threshold=threshold, md=md)
421-
if self._mode_of_operation == ModeOfOperation.Test:
422-
return (self._plan, detectors, target_field, motor, start, stop, min_step, max_step, target_delta, backstep, threshold, md)
423-
424-
425267
@magics_class
426268
class RealMagics(Magics):
427269
...
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import functools
2+
3+
from .plan_magics import PlanCLI, ModeOfOperation, remote_control_available
4+
5+
6+
if remote_control_available:
7+
from bluesky_queueserver_api.item import BPlan
8+
9+
10+
class PlanMV(PlanCLI):
11+
def create_parser(self):
12+
_a = super().create_parser()
13+
14+
_a.add_argument("args", nargs='+', type=str)
15+
16+
return _a
17+
18+
def _create_plan(self, parsed_namespace, local_ns):
19+
args, _, motors = self.parse_varargs(parsed_namespace.args, local_ns)
20+
21+
md = self.parse_md(*motors, ns=parsed_namespace)
22+
23+
if self._mode_of_operation == ModeOfOperation.Local:
24+
return functools.partial(self._plan, *args, md=md)
25+
if self._mode_of_operation == ModeOfOperation.Remote:
26+
return BPlan(self._plan.__name__, *args, md=md)
27+
if self._mode_of_operation == ModeOfOperation.Test:
28+
return (self._plan, args, md)
29+
30+
31+
class PlanReadMany(PlanCLI):
32+
def create_parser(self):
33+
_a = super().create_parser()
34+
35+
_a.add_argument("devices", nargs='+', type=str)
36+
37+
return _a
38+
39+
def _create_plan(self, parsed_namespace, local_ns):
40+
devices, _, names = self.parse_varargs(parsed_namespace.devices, local_ns)
41+
42+
md = self.parse_md(*names, ns=parsed_namespace)
43+
44+
if self._mode_of_operation == ModeOfOperation.Local:
45+
return functools.partial(self._plan, devices, md=md)
46+
if self._mode_of_operation == ModeOfOperation.Remote:
47+
return BPlan(self._plan.__name__, devices, md=md)
48+
if self._mode_of_operation == ModeOfOperation.Test:
49+
return (self._plan, devices, md)
50+
51+
52+
class PlanCount(PlanCLI):
53+
def create_parser(self):
54+
_a = super().create_parser()
55+
56+
_a.add_argument("-n", "--num", type=int, default=1)
57+
_a.add_argument("--delay", type=float, default=0.0)
58+
59+
return _a
60+
61+
def _create_plan(self, parsed_namespace, local_ns):
62+
detectors = self.get_real_devices_if_needed(parsed_namespace.detectors, local_ns)
63+
num = parsed_namespace.num
64+
delay = parsed_namespace.delay
65+
66+
md = self.parse_md(*parsed_namespace.detectors, ns=parsed_namespace)
67+
68+
if self._mode_of_operation == ModeOfOperation.Local:
69+
return functools.partial(self._plan, detectors, num=num, delay=delay, md=md)
70+
if self._mode_of_operation == ModeOfOperation.Remote:
71+
return BPlan(self._plan_name, detectors, num=num, delay=delay, md=md)
72+
if self._mode_of_operation == ModeOfOperation.Test:
73+
return (self._plan, detectors, num, delay, md)
74+
75+
76+
class PlanScan(PlanCLI):
77+
def create_parser(self):
78+
_a = super().create_parser()
79+
80+
_a.add_argument("-m", "--motors", nargs='+', type=str)
81+
_a.add_argument("-n", "--num", type=int, default=-1)
82+
83+
return _a
84+
85+
def _create_plan(self, parsed_namespace, local_ns):
86+
detectors = self.get_real_devices_if_needed(parsed_namespace.detectors, local_ns)
87+
_args = parsed_namespace.motors
88+
_num = parsed_namespace.num
89+
args, num, motors = self.parse_varargs(_args, local_ns, with_final_num=(_num == -1), default_num=_num)
90+
91+
md = self.parse_md(*parsed_namespace.detectors, *motors, ns=parsed_namespace)
92+
93+
if self._mode_of_operation == ModeOfOperation.Local:
94+
return functools.partial(self._plan, detectors, *args, num=num, md=md)
95+
if self._mode_of_operation == ModeOfOperation.Remote:
96+
return BPlan(self._plan_name, detectors, *args, num=num, md=md)
97+
if self._mode_of_operation == ModeOfOperation.Test:
98+
return (self._plan, detectors, args, num, md)
99+
100+
101+
class PlanGridScan(PlanCLI):
102+
def create_parser(self):
103+
_a = super().create_parser()
104+
105+
_a.add_argument("-m", "--motors", nargs='+', type=str)
106+
_a.add_argument("-s", "--snake_axes", action="store_true", default=False)
107+
108+
return _a
109+
110+
def _create_plan(self, parsed_namespace, local_ns):
111+
detectors = self.get_real_devices_if_needed(parsed_namespace.detectors, local_ns)
112+
_args = parsed_namespace.motors
113+
args, _, motors = self.parse_varargs(_args, local_ns)
114+
115+
md = self.parse_md(*parsed_namespace.detectors, *motors, ns=parsed_namespace)
116+
117+
snake = parsed_namespace.snake_axes
118+
119+
if self._mode_of_operation == ModeOfOperation.Local:
120+
return functools.partial(self._plan, detectors, *args, snake_axes=snake, md=md)
121+
if self._mode_of_operation == ModeOfOperation.Remote:
122+
return BPlan(self._plan_name, detectors, *args, snake_axes=snake, md=md)
123+
if self._mode_of_operation == ModeOfOperation.Test:
124+
return (self._plan, detectors, args, snake, md)
125+
126+
127+
class PlanAdaptiveScan(PlanCLI):
128+
def create_parser(self):
129+
_a = super().create_parser()
130+
131+
_a.add_argument("-t", "--target_field", type=str)
132+
_a.add_argument("-m", "--motor", type=str)
133+
_a.add_argument("-st", "--start", type=float)
134+
_a.add_argument("-sp", "--stop", type=float)
135+
_a.add_argument("-mins", "--min_step", type=float)
136+
_a.add_argument("-maxs", "--max_step", type=float)
137+
_a.add_argument("-td", "--target_delta", type=float)
138+
_a.add_argument("-b", "--backstep", type=bool)
139+
_a.add_argument("-th", "--threshold", type=float, default=0.8)
140+
141+
return _a
142+
143+
def _create_plan(self, parsed_namespace, local_ns):
144+
detectors = self.get_real_devices_if_needed(parsed_namespace.detectors, local_ns)
145+
motor = self.get_real_devices_if_needed(parsed_namespace.motor, local_ns)
146+
147+
md = self.parse_md(*parsed_namespace.detectors, parsed_namespace.motor, ns=parsed_namespace)
148+
149+
target_field = parsed_namespace.target_field
150+
start = parsed_namespace.start
151+
stop = parsed_namespace.stop
152+
min_step = parsed_namespace.min_step
153+
max_step = parsed_namespace.max_step
154+
target_delta = parsed_namespace.target_delta
155+
backstep = parsed_namespace.backstep
156+
threshold = parsed_namespace.threshold
157+
158+
if self._mode_of_operation == ModeOfOperation.Local:
159+
return functools.partial(self._plan, detectors, target_field=target_field, motor=motor, start=start, stop=stop, min_step=min_step, max_step=max_step, target_delta=target_delta, backstep=backstep, threshold=threshold, md=md)
160+
if self._mode_of_operation == ModeOfOperation.Remote:
161+
return BPlan(self._plan_name, detectors, target_field=target_field, motor=motor, start=start, stop=stop, min_step=min_step, max_step=max_step, target_delta=target_delta, backstep=backstep, threshold=threshold, md=md)
162+
if self._mode_of_operation == ModeOfOperation.Test:
163+
return (self._plan, detectors, target_field, motor, start, stop, min_step, max_step, target_delta, backstep, threshold, md)

0 commit comments

Comments
 (0)