This repository was archived by the owner on Oct 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathcli.py
More file actions
471 lines (398 loc) · 18.1 KB
/
cli.py
File metadata and controls
471 lines (398 loc) · 18.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
# A console-based UI for SimDem.
import difflib
import os
import pexpect
import pexpect.replwrap
import random
import re
import time
import sys
import colorama
import config
# Initialise colorama once at import time so that the colour codes written by
# the Ui class below are handled for the current terminal.
# NOTE(review): strip=None is presumably colorama's "decide automatically"
# setting for stripping ANSI sequences - confirm against the colorama docs.
colorama.init(strip=None)
# Primary and continuation prompt strings. Ui.get_shell installs them (as PS1
# and PS2 respectively) in the bash child process it spawns.
PEXPECT_PROMPT = u'[PEXPECT_PROMPT>'
PEXPECT_CONTINUATION_PROMPT = u'[PEXPECT_PROMPT+'
class Ui(object):
    """Console user interface for SimDem.

    Text is written through `display`, which colours it with colorama
    codes and records it in `execution_log`. Commands are executed in a
    single bash child process (created lazily by `get_shell`) that is
    driven through pexpect.

    `set_demo` must be called before most other methods, because they
    read their settings (output format, mode flags, environment, current
    command) from `self.demo`.
    """

    # Lazily created pexpect REPL wrapper around bash; see get_shell().
    _shell = None
    # The demo being presented; assigned through set_demo().
    demo = None
    # Everything passed to display(), including the colour codes.
    execution_log = ""

    # Pattern used by expand_vars() to locate a variable reference. It
    # starts with a lazy ".*?" and ends with a greedy ".*", so every match
    # consumes the remainder of its line: at most one variable name is
    # captured per line of the command.
    _VAR_PATTERN = re.compile(
        r'.*?(?<=\$)\(?{?(\w*)(?=[\W|\$|\s|\"]?)\)?(?!\$).*')

    def __init__(self):
        """Create a UI that is not yet attached to a demo."""
        pass

    def prompt(self):
        """Display the prompt for the user. This is intended to indicate that
        the user is expected to take an action at this point.
        """
        self.display(config.console_prompt, colorama.Fore.WHITE)

    def command(self, text):
        """Display a command, or a part of a command to be executed."""
        self.display(text, colorama.Fore.WHITE + colorama.Style.BRIGHT)

    def results(self, text):
        """Display the results of a command execution, then a new line."""
        self.display(text, colorama.Fore.GREEN + colorama.Style.BRIGHT, True)

    def heading(self, text):
        """Display a heading followed by an empty line."""
        self.display(text, colorama.Fore.CYAN + colorama.Style.BRIGHT, True)
        self.new_line()

    def description(self, text):
        """Display some descriptive text. Usually this is text from the demo
        document itself.
        """
        self.display(text, colorama.Fore.CYAN)

    def information(self, text, new_line=False):
        """Display some informative text. Usually this is content generated by
        SimDem. Do not print a new line unless new_line == True.
        """
        self.display(text, colorama.Fore.WHITE, new_line)

    def prep_step(self, step):
        """Display a preparation step item, i.e. the value of step["title"]."""
        self.display(step["title"], colorama.Fore.MAGENTA, True)

    def next_step(self, index, title):
        """Display a next step item with an index (the number to be entered
        to select it) and a title (to be displayed).
        """
        self.display(index, colorama.Fore.CYAN)
        self.display(title, colorama.Fore.CYAN, True)

    def instruction(self, text):
        """Display an instruction for the user."""
        self.display(text, colorama.Fore.MAGENTA, True)

    def warning(self, text):
        """Display a warning to the user."""
        self.display(text, colorama.Fore.RED + colorama.Style.BRIGHT, True)

    def new_para(self):
        """Start a new paragraph (two line breaks)."""
        self.new_line()
        self.new_line()

    def new_line(self):
        """Move to the next line."""
        self.display("", colorama.Fore.WHITE, True)

    def horizontal_rule(self):
        """Display a horizontal rule surrounded by blank lines."""
        self.display("\n\n============================================\n\n", colorama.Fore.WHITE)

    def clear(self):
        """Clear the screen ready for a new section of the script.

        In simulation mode the "clear" command is typed and run like any
        other demo command; otherwise it is run directly.
        """
        if self.demo.is_simulation:
            self.demo.current_command = "clear"
            self.simulate_command()
        else:
            self.run_command("clear")

    def display(self, text, color, new_line=False):
        """Display some text in a given color. Do not print a new line unless
        new_line is set to True.

        The colour code, the text and a colour reset are always appended to
        `execution_log`; they are only printed when the demo's
        `output_format` is "log".
        """
        reset = colorama.Style.RESET_ALL
        line_end = "\n" if new_line else ""

        self.execution_log += color
        self.execution_log += text
        self.execution_log += reset + line_end

        if self.demo.output_format == "log":
            print(color, end="")
            print(text, end="", flush=True)
            print(reset, end=line_end)

    def log(self, level, text):
        """Print a "LEVEL : text" line, but only when config.is_debug is set."""
        if config.is_debug:
            print(level.upper() + " : " + text)

    def request_input(self, text):
        """Display text that is intended to prompt the user for input and
        then wait for input. Returns the entered text in lower case.
        """
        print(colorama.Fore.MAGENTA + colorama.Style.BRIGHT, end="")
        print(text)
        print(colorama.Style.RESET_ALL, end="")
        return self.input_string().lower()

    def input_interactive_variable(self, name):
        """Ask the user for a value for the variable `name` and return it
        exactly as entered.
        """
        prompt_style = colorama.Fore.MAGENTA + colorama.Style.BRIGHT
        print(prompt_style, end="")
        print("\n\nEnter a value for ", end="")
        print(colorama.Fore.YELLOW + colorama.Style.BRIGHT, end="")
        print("$" + name, end="")
        print(prompt_style, end="")
        print(": ", end="")
        print(colorama.Fore.WHITE + colorama.Style.BRIGHT, end="")
        return self.input_string()

    def type_command(self):
        """Display the demo's current command on the screen.

        Line breaks are removed from the command first. If the demo is a
        simulation the command is emitted one character at a time with a
        short random delay, so that it looks like someone is typing it.
        """
        current_command, _, _ = self.demo.get_current_command()
        text = current_command.replace("\n", "")

        if self.demo.is_simulation:
            for char in text:
                time.sleep(random.uniform(0.02, config.TYPING_DELAY))
                self.command(char)
        else:
            self.command(text)

    def simulate_command(self, silent=False):
        """Type the current command on the screen, execute it and output
        the results.

        Outside learning mode (and always for the "clear" command) the
        command is typed, the user is asked for a value for every
        undefined variable it uses (a dummy value is used when testing),
        and the command is run. In learning mode the user has to type the
        command (or "a"/"auto") before it is run.

        If silent is True then the command and its results will not be
        output.

        Returns the output of the command.
        """
        self.log("debug", "Simulating command: '" + self.demo.current_command + "'")

        if not self.demo.is_learning or self.demo.current_command.strip() == "clear":
            if not silent:
                self.type_command()
            _, undefined_var_list, defined_var_list = self.demo.get_current_command()

            # Get values for unknown variables
            for var_name in undefined_var_list:
                if self.demo.is_testing:
                    var_value = "Dummy value for test"
                else:
                    var_value = self.input_interactive_variable(var_name)
                if not var_name.startswith("SIMDEM_"):
                    # Record the value in the demo environment and define it
                    # in the running shell as well.
                    self.demo.env.set(var_name, var_value)
                    self.run_command(var_name + '="' + var_value + '"')

            # Log values if in debug mode
            if config.is_debug:
                self.information("\n")
                for var_name in list(undefined_var_list) + list(defined_var_list):
                    # str() because a variable may have no value in the env.
                    self.log("debug", "$" + var_name + " = " + str(self.demo.env.get(var_name)))

            output = self.run_command(silent=silent)
            self.demo.last_command = self.demo.current_command
            self.demo.current_command = ""
        else:
            prompt_style = colorama.Fore.MAGENTA + colorama.Style.BRIGHT
            input_style = colorama.Fore.WHITE + colorama.Style.BRIGHT
            while True:
                expected = self.demo.current_command.strip()
                print(prompt_style, end="")
                print("\nType the command '", end="")
                print(input_style, end="")
                print(expected, end="")
                print(prompt_style, end="")
                print("'")
                print("\t- type 'auto' (or 'a') to automatically type the command")
                print(input_style, end="")
                print("\n$ ", end="", flush=True)
                typed_command = self.input_string()

                if typed_command.lower() in ("a", "auto") or typed_command == expected:
                    # Run the command through the non-learning branch, and
                    # make sure learning mode is switched back on even if
                    # the command fails.
                    self.demo.is_learning = False
                    try:
                        output = self.simulate_command(silent)
                    finally:
                        self.demo.is_learning = True
                    break

                print(colorama.Fore.RED, end="")
                print("You have a typo there")

        self.log("debug", "Output: '" + str(output) + "'")
        return output

    def input_string(self):
        """Get a string from the user."""
        return input()

    def get_shell(self):
        """Get or create the shell in which to run commands for the
        current demo.

        The shell is a bash child process started with the demo's
        environment and wrapped in a pexpect REPLWrapper. It is created on
        first use and reused afterwards.
        """
        if self._shell is None:
            child = pexpect.spawnu('/bin/bash', env=self.demo.env.get(), echo=False, timeout=None)
            # Splice an empty \[\] pair into the prompt strings.
            # NOTE(review): presumably this keeps the literal prompt text out
            # of the echoed prompt-change command, as pexpect's own bash
            # helper does - confirm against pexpect.replwrap.
            ps1 = PEXPECT_PROMPT[:5] + r'\[\]' + PEXPECT_PROMPT[5:]
            ps2 = PEXPECT_CONTINUATION_PROMPT[:5] + r'\[\]' + PEXPECT_CONTINUATION_PROMPT[5:]
            prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)
            self._shell = pexpect.replwrap.REPLWrapper(child, r'\$', prompt_change)
        return self._shell

    def run_command(self, command=None, silent=False):
        """Run self.demo.current_command unless command is passed in, in
        which case run the supplied command in the current demo
        environment. Return the output of the command.

        A small number of commands are intercepted and handled as
        special cases, see `run_special_command`.

        If silent is True the output is returned but not displayed.
        """
        if not command:
            command = self.demo.current_command
        command = command.strip()

        self.new_line()
        self.log("debug", "Execute command: '" + command + "'")

        start_time = time.time()
        response = self.run_special_command(command)
        # Only fall back to the shell when the command was NOT handled as a
        # special case. A handled command may legitimately produce empty
        # output; testing truthiness here would run the original command
        # a second time in that situation.
        if response is False or response is None:
            response = self.get_shell().run_command(command)
        end_time = time.time()

        if not silent:
            self.results(response)
            if self.demo.is_testing:
                self.information("--- %s seconds execution time ---" % (end_time - start_time), True)

        return response

    def run_special_command(self, command):
        """Test to see if the command is a special command that needs to be
        handled differently, these include:

        `xdg-open $URL` - intercepted and converted to a curl for headless CLI

        `az acs create ...` - if we have a service principal set in the
        environment variables 'SERVICE_PRINCIPAL_ID' and
        'SERVICE_PRINCIPAL_SECRET_KEY' then add them to the command
        (only for kubernetes clusters that do not name one already)

        Returns the response from the command if it was handled by this
        function, otherwise returns False.
        """
        orig_command = command

        browser_prefix = "xdg-open "
        if command.startswith(browser_prefix):
            self.warning("Since you are running in headless CLI mode it is not possible to execute xdg-open commands.")
            command = "curl -I " + command[len(browser_prefix):] + " --connect-timeout 90"
            self.warning("Converting to `" + command + "`")
            self.warning("Note that this may break tests.")

        if command.startswith('az acs create '):
            wants_principal = ("--orchestrator-type=kubernetes" in command
                               and "--service-principal" not in command)
            if wants_principal and os.getenv('SERVICE_PRINCIPAL_ID'):
                # The ${...} references are left for the shell to expand.
                command += " --service-principal ${SERVICE_PRINCIPAL_ID} --client-secret ${SERVICE_PRINCIPAL_SECRET_KEY}"

        if orig_command == command:
            return False

        self.log("INFO", "Running special command " + orig_command + " as " + command)
        return self.get_shell().run_command(command)

    def expand_vars(self, command):
        """Expand the variables in the supplied command by replacing them
        with the value they carry in the Environment. This is used by some
        special commands because the shell doesn't expand them (e.g.
        copying a $URL into a browser window using xdg-open).

        Only names found by `_VAR_PATTERN` are expanded (at most one per
        line of the command). Names that are empty or have no value in the
        environment are left untouched.
        """
        self.log("debug", "Expanding vars in " + command)
        for var in self._VAR_PATTERN.findall(command):
            if not var:
                # A "$" that is not followed by a name: nothing to expand.
                continue
            value = self.demo.env.get(var)
            if value is None:
                # Unknown variable: leave the reference in place.
                continue
            self.log("Debug", "Expanding variable " + var + " to value " + value)
            command = command.replace("$" + var, value)
        return command

    def get_help(self):
        """Return the lines of the interactive help text as a list."""
        return [
            "SimDem Help",
            "===========",
            "",
            "Pressing any key other than those listed below will result in the script progressing",
            "",
            "b           - break out of the script and accept a command from user input",
            "b -> CTRL-C - stop the script",
            "d           - (redisplay the description that precedes the current command then resume from this point)",
            "r           - repeat the previous command",
            "h           - displays this help message",
            "",
        ]

    def check_for_interactive_command(self):
        """Wait for a key to be pressed.

        Most keys result in the script progressing, but a few have special
        meaning (see `get_help`). After a special key has been handled the
        method waits for the next key. Nothing happens at all when the
        demo is automated.
        """
        while not self.demo.is_automated:
            if not self.demo.is_simulation:
                self.instruction("Press a command key to proceed (h for help)")
            key = self.get_instruction_key()

            if key == 'h':
                for line in self.get_help():
                    self.information(line, True)
            elif key == 'b':
                print("shell> ", end='')
                command = self.input_string()
                # An empty line must not reach run_command, which would
                # fall back to the demo's current command.
                if command != "":
                    self.run_command(command)
                self.prompt()
            elif key == 'd':
                print("")
                print(colorama.Fore.CYAN)
                print(self.demo.current_description)
                print(colorama.Style.RESET_ALL)
                self.prompt()
                print(self.demo.current_command, end="", flush=True)
            elif key == 'r':
                if self.demo.last_command == "":
                    # Nothing to repeat: treat it like any other key.
                    return
                self.demo.current_command = self.demo.last_command
                self.simulate_command()
                self.prompt()
            else:
                # Any other key lets the script progress.
                return

    def get_instruction_key(self):
        """Wait for a single keypress on stdin.

        This is a silly function to call if you need to do it a lot because
        it has to store stdin's current setup, setup stdin for reading
        single keystrokes then read the single keystroke then revert stdin
        back after reading the keystroke.

        Returns the character of the key that was pressed (zero on
        KeyboardInterrupt which can happen when a signal gets handled)

        This method is licensed under cc by-sa 3.0
        Thanks to mheyman http://stackoverflow.com/questions/983354/how-do-i-make-python-to-wait-for-a-pressed-key
        """
        import fcntl
        import os
        import sys
        import termios

        fd = sys.stdin.fileno()

        # save old state
        flags_save = fcntl.fcntl(fd, fcntl.F_GETFL)
        attrs_save = termios.tcgetattr(fd)

        # make raw - the way to do this comes from the termios(3) man page.
        attrs = list(attrs_save)  # copy the stored version to update
        # iflag
        attrs[0] &= ~(termios.IGNBRK | termios.BRKINT | termios.PARMRK
                      | termios.ISTRIP | termios.INLCR | termios.IGNCR
                      | termios.ICRNL | termios.IXON)
        # oflag
        attrs[1] &= ~termios.OPOST
        # cflag
        attrs[2] &= ~(termios.CSIZE | termios.PARENB)
        attrs[2] |= termios.CS8
        # lflag
        attrs[3] &= ~(termios.ECHONL | termios.ECHO | termios.ICANON
                      | termios.ISIG | termios.IEXTEN)

        # The terminal is switched inside the try block so that it is put
        # back to its saved state no matter where a failure happens.
        try:
            termios.tcsetattr(fd, termios.TCSANOW, attrs)
            # turn off non-blocking
            fcntl.fcntl(fd, fcntl.F_SETFL, flags_save & ~os.O_NONBLOCK)
            # read a single keystroke
            ret = sys.stdin.read(1)  # returns a single character
        except KeyboardInterrupt:
            ret = 0
        finally:
            # restore old state
            termios.tcsetattr(fd, termios.TCSAFLUSH, attrs_save)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags_save)
        return ret

    def test_results(self, results):
        """Display the test results for a single test.

        Nothing is displayed for a test that passed. For a failed test the
        similarity figures and the expected and actual results are printed.
        """
        if results["passed"]:
            return

        separator = "\n\n=============================\n\n"
        failure_style = colorama.Fore.RED + colorama.Style.BRIGHT
        reset = colorama.Style.RESET_ALL

        print(separator)
        print(failure_style)
        print("FAILED")
        print(reset)
        print("Similarity ratio:    " + str(results["similarity"]))
        print("Expected Similarity: " + str(results["required_similarity"]))
        print(separator)
        print("Expected results:")
        print(colorama.Fore.GREEN + colorama.Style.BRIGHT)
        print(results["expected_results"])
        print(reset)
        print("Actual results:")
        print(failure_style)
        print(results["results"])
        print(reset)
        print(separator)
        print(reset)

    def get_command(self, commands):
        """Ask the user which mode to run in until one of `commands` is
        entered, and return it. An empty answer means "tutorial".
        """
        while True:
            cmd = self.request_input("What mode do you want to run in? (default 'tutorial')")
            if cmd == "":
                cmd = "tutorial"
            if cmd in commands:
                return cmd

    def set_demo(self, demo):
        """Attach the demo that this UI presents."""
        self.demo = demo