forked from sonamkshenoy/YACS
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmaster.py
More file actions
388 lines (251 loc) · 12.4 KB
/
master.py
File metadata and controls
388 lines (251 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
import codecs
import datetime
import json
import logging
import os
import random
import socket
import sys
import threading
import time
from queue import Queue
# logging.FileHandler opens its target file as soon as it is constructed and
# raises FileNotFoundError when the parent directory is missing, so make sure
# the log directory exists before logging is configured.
os.makedirs("logs", exist_ok=True)

# Send every record at INFO level or above to logs/master.log, prefixed with
# the time at which it was emitted.
logging.basicConfig(
    level=logging.INFO,
    format="(%(asctime)s) %(message)s",
    handlers=[
        logging.FileHandler("logs/master.log"),
    ],
)
# ---------------------------------------------------------------------------
# Static configuration (kept inline rather than imported from allConfigs)
# ---------------------------------------------------------------------------

# Top-level key of the config file under which the worker list is stored.
MAINKEYINCONFIG = "workers"

# Key of a worker update that carries the port number of the reporting worker.
PORTNUMBER = "portNumber"

# Master and workers all run on the same host.
MASTER_IP = WORKER_IP = "localhost"

# 5000: job requests arrive here and are scheduled onto workers.
# 5001: workers report finished tasks here, which in turn releases reduce tasks.
MASTER_SCHEDULING_PORT, MASTER_UPDATE_PORT = 5000, 5001

# ---------------------------------------------------------------------------
# Mutable state shared between the master's threads
# ---------------------------------------------------------------------------

# Scheduling algorithm; starts out as "R" (random) and is replaced by the
# command-line choice when the file is run as a script.
SCHEDULING_ALGO = "R"

# Incoming job requests, and the reduce-task lists of jobs whose map tasks
# have all finished.
queueOfRequests, queueOfReduceRequests = Queue(), Queue()

# Ports of all known workers.
allPorts = list()

# Jobs whose map or reduce tasks are still running.
tasksInProcess = dict()

# Round Robin only: index into allPorts of the worker to hand out next.
lastUsedWorkerPortIndex = 0

# Number of free slots per worker port.
numFreeSlotsInAllMachines = dict()

# Start time of every job that has not finished yet.
jobTotalTime = dict()
# THREAD 1: LISTENS TO REQUESTS (ACTS AS SERVER)
def listenRequest():
    """Accept job requests on the scheduling port and queue them.

    Runs forever. Binds a TCP server socket to
    (MASTER_IP, MASTER_SCHEDULING_PORT), serves one client connection at a
    time and puts the text of every complete JSON document received onto
    queueOfRequests.

    TCP delivers a byte stream, not messages: one recv() may return only part
    of a request or several requests glued together. Incoming data is
    therefore buffered per connection and only whole JSON documents are
    queued. Data that is still incomplete when the client closes the
    connection is discarded with a warning.

    A socket error on a client connection is logged and the listener moves on
    to the next connection instead of terminating the thread.
    """
    global queueOfRequests

    jsonDecoder = json.JSONDecoder()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as mastersocket:
        # "localhost" only accepts connections from the same system. To reach
        # the master from another machine, bind to a globally reachable IP.
        mastersocket.bind((MASTER_IP, MASTER_SCHEDULING_PORT))
        mastersocket.listen()

        while True:
            # address is the (host ip, port) pair of the connecting client
            reqGeneratorSocket, address = mastersocket.accept()

            # Incremental decoding keeps a multi-byte character that straddles
            # two recv() calls intact.
            utf8Decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
            pending = ""

            # The with-block closes the client socket on every exit path while
            # the listening socket stays open for new connections.
            with reqGeneratorSocket:
                try:
                    while True:
                        chunk = reqGeneratorSocket.recv(4096)
                        if not chunk:
                            # The client closed its side of the connection.
                            break
                        pending += utf8Decoder.decode(chunk)
                        requests, pending = _splitJobRequests(pending, jsonDecoder)
                        for request in requests:
                            queueOfRequests.put(request)
                except OSError:
                    logging.exception(
                        "[ERROR] Connection with request generator %s failed", address
                    )

            if pending.strip():
                logging.warning(
                    "[WARN] Discarding %d characters of incomplete request data from %s",
                    len(pending),
                    address,
                )


def _splitJobRequests(buffer, decoder):
    """Split every complete JSON document off the front of buffer.

    Returns (documents, remainder): documents is a list with the text of each
    complete JSON document in arrival order, remainder is the unparsed tail
    (an incomplete or malformed document) to be retried once more data has
    arrived.
    """
    documents = []
    while True:
        # raw_decode() does not skip leading whitespace itself.
        buffer = buffer.lstrip()
        if not buffer:
            break
        try:
            _, end = decoder.raw_decode(buffer)
        except json.JSONDecodeError:
            # Nothing parseable at the front (yet); wait for more data.
            break
        documents.append(buffer[:end])
        buffer = buffer[end:]
    return documents, buffer
# Returns port number of worker depending on scheduling algorithm
def getWorkerId():
    """Return the port of the worker chosen by the active scheduling algorithm.

    "R"  - a uniformly random entry of allPorts.
    "RR" - the entry of allPorts at lastUsedWorkerPortIndex; the index is then
           advanced by one, wrapping around at the end of the list.
    anything else (Least Loaded) - the worker with the most free slots
           according to numFreeSlotsInAllMachines. While no worker has a free
           slot, the call sleeps for one second and looks again.

    The random and round-robin choices do not look at free slots; only the
    least-loaded branch guarantees the returned worker had one.
    """
    global lastUsedWorkerPortIndex

    # Random selection of machine
    if SCHEDULING_ALGO == "R":
        return random.choice(allPorts)

    # Round Robin selection
    if SCHEDULING_ALGO == "RR":
        currentIndex = lastUsedWorkerPortIndex
        lastUsedWorkerPortIndex = (currentIndex + 1) % len(allPorts)
        return allPorts[currentIndex]

    # Least Loaded selection
    mostFree = 0
    mostFreeMachine = list(numFreeSlotsInAllMachines.keys())[0]
    while True:
        # Only a strictly larger count replaces the current best, so the first
        # machine encountered wins a tie.
        for candidate, freeSlots in numFreeSlotsInAllMachines.items():
            if freeSlots > mostFree:
                mostFree, mostFreeMachine = freeSlots, candidate
        if mostFree > 0:
            return mostFreeMachine
        # Every machine is full: wait a second before scanning again.
        time.sleep(1)
# THREAD 2 : SCHEDULES TASKS - both map and reduce (ACTS AS CLIENT)
# New thread since we don't want scheduling to block listening to events
def scheduleRequest(lock):
    """Dispatch queued map and reduce tasks to workers, forever.

    Each iteration handles at most one job from queueOfRequests (sending all
    of its map tasks) and at most one entry from queueOfReduceRequests
    (sending all reduce tasks of a job whose map tasks have finished).
    Alternating between the two queues keeps reduce tasks from starving
    behind a long backlog of new jobs.

    A job request is JSON text with the keys "job_id", "map_tasks" and
    "reduce_tasks". The first value of every task dict is recorded in
    tasksInProcess as the id under which the task is tracked. A request that
    cannot be parsed is logged and dropped instead of ending this thread.

    Args:
        lock: threading lock guarding updates to numFreeSlotsInAllMachines.

    Raises:
        OSError: if connecting or sending to the selected worker fails.
    """
    global tasksInProcess
    global queueOfReduceRequests
    global queueOfRequests

    # Pause used when both queues are empty so the loop does not spin at
    # full CPU while waiting for work.
    idlePause = 0.05

    while True:
        didWork = False

        # Map tasks of the next job (FIFO)
        if not queueOfRequests.empty():
            didWork = True
            rawRequest = queueOfRequests.get()
            try:
                newJobRequest = json.loads(rawRequest)
                mapTasks = newJobRequest["map_tasks"]
                reduceTasks = newJobRequest["reduce_tasks"]
                job_id = newJobRequest["job_id"]
                # Build the whole record first so the update thread never sees
                # a half-filled entry.
                jobRecord = {
                    "mapTasks": [list(task.values())[0] for task in mapTasks],
                    "reduceTasks": [list(task.values())[0] for task in reduceTasks],
                    "reduceTasksInfo": reduceTasks,
                }
            except (ValueError, KeyError, TypeError, IndexError, AttributeError):
                logging.exception(
                    "[ERROR] Discarding malformed job request: %r", rawRequest
                )
                continue

            # Mark all tasks of this job as "executing"
            tasksInProcess[job_id] = jobRecord
            logging.info("[START] Job-{0} Started Execution".format(job_id))
            # Keep track of the time at which the job began
            jobTotalTime[job_id] = datetime.datetime.now()

            for task in mapTasks:
                _sendTaskToWorker(task, lock)

            if not mapTasks:
                # Reduce tasks are normally released when the last map task
                # reports back; with no map tasks that never happens, so
                # release them right away.
                queueOfReduceRequests.put(reduceTasks)

        # Reduce tasks of a job whose map tasks have all completed
        if not queueOfReduceRequests.empty():
            didWork = True
            for task in queueOfReduceRequests.get():
                _sendTaskToWorker(task, lock)

        if not didWork:
            time.sleep(idlePause)


def _sendTaskToWorker(task, lock, retryDelay=0.05):
    """Send one task to a worker that has a free slot, waiting for one if needed.

    Keeps asking getWorkerId() for a worker until the selected one has a free
    slot (random and round-robin selection may return a full worker), sends
    the task as JSON and then decrements that worker's free-slot count.
    """
    message = json.dumps(task).encode()

    while True:
        if debug:
            print(numFreeSlotsInAllMachines)
        # Get machine to execute on according to the chosen scheduling algorithm
        selectedWorker = getWorkerId()
        if numFreeSlotsInAllMachines[selectedWorker] > 0:
            break
        # Selected worker is full. If every worker is full, back off briefly
        # instead of spinning; otherwise just pick again.
        if all(free <= 0 for free in list(numFreeSlotsInAllMachines.values())):
            time.sleep(retryDelay)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((WORKER_IP, selectedWorker))
        # sendall() keeps sending until the whole message is out; send() may
        # stop after transmitting only part of it.
        s.sendall(message)
    logging.info("[INFO] Allotting task-{0} to {1} [SUCCESS]".format(task, selectedWorker))

    # Number of available slots decreases by one since allotment succeeded
    with lock:
        numFreeSlotsInAllMachines[selectedWorker] -= 1
# THREAD 3 : LISTENS TO UPDATES AND HEARTBEATS FROM WORKERS
def listenToUpdatesFromWorker(lock):
    """Receive task-completion updates from workers and advance their jobs.

    Runs forever. Binds a TCP server socket to (MASTER_IP, MASTER_UPDATE_PORT)
    and serves one worker connection at a time. Every update is a JSON object
    carrying the worker's port under PORTNUMBER and the finished task's id
    under "taskid".

    Incoming data is buffered per connection and split into whole JSON
    documents, because one recv() may return a partial update or several
    updates at once. Incomplete data left over when the worker closes the
    connection is discarded with a warning. Socket errors and malformed
    updates are logged and skipped instead of ending this thread.

    Args:
        lock: threading lock guarding updates to numFreeSlotsInAllMachines.
    """
    global tasksInProcess
    global queueOfReduceRequests
    global numFreeSlotsInAllMachines

    jsonDecoder = json.JSONDecoder()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as mastersocket:
        mastersocket.bind((MASTER_IP, MASTER_UPDATE_PORT))
        mastersocket.listen()

        while True:
            workersocket, address = mastersocket.accept()

            # Incremental decoding keeps a multi-byte character that straddles
            # two recv() calls intact.
            utf8Decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
            pending = ""

            # The with-block closes the worker socket on every exit path.
            with workersocket:
                try:
                    while True:
                        chunk = workersocket.recv(4096)
                        if not chunk:
                            break
                        pending += utf8Decoder.decode(chunk)
                        updates, pending = _splitWorkerUpdates(pending, jsonDecoder)
                        for update in updates:
                            _recordTaskCompletion(update, lock)
                            if debug:
                                print(numFreeSlotsInAllMachines)
                except OSError:
                    logging.exception("[ERROR] Connection with worker %s failed", address)

            if pending.strip():
                logging.warning(
                    "[WARN] Discarding %d characters of incomplete update data from %s",
                    len(pending),
                    address,
                )


def _splitWorkerUpdates(buffer, decoder):
    """Parse every complete JSON document at the front of buffer.

    Returns (updates, remainder): updates holds the decoded documents in
    arrival order, remainder is the unparsed tail to retry once more data has
    arrived.
    """
    updates = []
    while True:
        # raw_decode() does not skip leading whitespace itself.
        buffer = buffer.lstrip()
        if not buffer:
            break
        try:
            update, end = decoder.raw_decode(buffer)
        except json.JSONDecodeError:
            break
        updates.append(update)
        buffer = buffer[end:]
    return updates, buffer


def _recordTaskCompletion(update, lock):
    """Apply one worker update: free the slot and advance the owning job."""
    try:
        workerPort = update[PORTNUMBER]
        task_id = update["taskid"]
        # Update number of free slots on that machine
        with lock:
            numFreeSlotsInAllMachines[workerPort] += 1
    except (KeyError, TypeError):
        logging.warning("[WARN] Ignoring malformed worker update: %r", update)
        return

    # Find the job the task belongs to and mark the task as no longer
    # executing. Map lists are searched before reduce lists. A snapshot is
    # iterated because the scheduler thread may add jobs concurrently.
    job_id = None
    taskType = None
    runningJobs = list(tasksInProcess.items())
    for candidateType, listKey in (("mapTask", "mapTasks"), ("reduceTask", "reduceTasks")):
        for executingJob, record in runningJobs:
            if task_id in record[listKey]:
                record[listKey].remove(task_id)
                job_id = executingJob
                taskType = candidateType
                break
        if taskType is not None:
            break

    # Compare the task type, not the job id: a job id of 0 is still a job.
    if taskType is None:
        logging.warning("[WARN] Update for unknown task %r ignored", task_id)
        return

    currentJob = tasksInProcess[job_id]

    if taskType == "mapTask":
        if currentJob["mapTasks"]:
            return
        if currentJob["reduceTasksInfo"]:
            # All map tasks are done: release the job's reduce tasks (they can
            # be executed in parallel).
            queueOfReduceRequests.put(currentJob["reduceTasksInfo"])
            return
        # No reduce tasks at all: the job is complete once its maps are.
    elif currentJob["reduceTasks"]:
        return

    # Every task of the job has finished: drop its record and log the duration.
    tasksInProcess.pop(job_id)
    starttime = jobTotalTime.pop(job_id)
    duration = datetime.datetime.now() - starttime
    logging.info("[FINISH] JOB {0} Finished execution. Total duration - {1:.3f}".format(job_id, duration.total_seconds()*1000))
# Default for the terminal-output flag read by the thread functions, so that
# they also work when this module is imported rather than run as a script.
debug = True

if __name__ == "__main__":
    # Script entry point: validate the command line, load the worker
    # configuration and start the three master threads.
    if len(sys.argv) < 3:
        print("Usage: python master.py <path to config file> <scheduling algorithm>", file=sys.stderr)
        sys.exit(-1)

    PATH_TO_CONFIG = sys.argv[1]
    SCHEDULING_ALGO = sys.argv[2]

    if SCHEDULING_ALGO not in ["R", "RR", "LL"]:
        print("Please enter R, RR or LL only.\nR for Random\nRR for Round Robin\nLL for Least Loaded\n")
        sys.exit(-1)

    # debug -> whether logs should also be printed to the terminal. The
    # optional third argument switches it off; it is normalised to a real
    # bool so "False", "false", "0", "no" and "" all disable it.
    if len(sys.argv) >= 4:
        debug = sys.argv[3].strip().lower() not in ("false", "0", "no", "")

    if debug:
        logging.getLogger().addHandler(logging.StreamHandler())

    # Get all workers and their port numbers from the config file
    with open(PATH_TO_CONFIG, "r") as f:
        configs = json.load(f)[MAINKEYINCONFIG]

    for config in configs:
        allPorts.append(config["port"])
        numFreeSlotsInAllMachines[config["port"]] = config["slots"]

    # Without workers every scheduling algorithm fails on its first call.
    if not allPorts:
        print("No workers found in config file.", file=sys.stderr)
        sys.exit(-1)

    if debug:
        print("\n----------REQUESTS BEGIN------------\n")

    # Lock that keeps the scheduler and the update listener from modifying
    # numFreeSlotsInAllMachines at the same time
    lock = threading.Lock()

    try:
        t1 = threading.Thread(target=listenRequest)
        t2 = threading.Thread(target=scheduleRequest, args=(lock,))
        t3 = threading.Thread(target=listenToUpdatesFromWorker, args=(lock,))

        t1.start()
        t2.start()
        t3.start()
        # The threads are not joined: they never finish on their own.
    except Exception:
        # Always record the failure in the log file; the StreamHandler added
        # above also shows it on the terminal when debug is on.
        logging.exception("Error in starting thread")