# jobDistributor.py
"""
License: This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at your
option) any later version. This program is distributed in the hope that it
will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.
"""
import time, math
from Queue import Queue
from HashCat import HashCat
from Host import Host
from Config import Config
from Encryption import Encryption
from Task import States
from threading import Thread
# Settings loaded once at import time. The classes below read the keys
# "hostJobs", "hostErrors", "hosts", "poll_timeout" and "maxHostWait" from it.
config = Config().getConfig()
# Shared Encryption helper; used below to decrypt the per-host passwords
# stored under config["hosts"].
crypto = Encryption()
class Job(object):
    """One command handed to one host through a HashCat worker.

    The worker is created in the constructor and started only when the host
    accepts one more process (``host.addProcess()``).  ``poll()`` is the
    single place where a finished worker is reaped and the host's process
    and error bookkeeping is updated.

    Exit codes treated specially by this class:
        0      -- success; the host's error counter is reset.
        -500   -- finished without being counted as a host error
                  (presumably "aborted" -- confirm against HashCat).
        -1000  -- set here when the host refused the process.
    """

    def __init__(self, host, command):
        """Create the worker for *command* and start it if *host* allows it.

        host    -- host object; getHostName(), addProcess(), delProcess(),
                   addError() and resetErrors() are called on it.
        command -- command object; passed to HashCat, and getCommand() is
                   used by __str__.
        """
        self.log = Config().getLogger('distributor.' + host.getHostName(), host)
        self.startTime = time.ctime()
        self.endTime = None
        self.__command = command
        self.__host = host
        self.HC = HashCat(self.__host, self.__command)
        self.__status = self.HC.get_result()
        # Remember whether the worker was really started: poll() must not
        # join it or release a host slot that was never taken.
        self.__started = bool(self.__host.addProcess())
        if self.__started:
            self.HC.start()
        else:
            self.log.debug("could not add process")
            self.__status.set_command_xcode(-1000)

    def __str__(self):
        """Return a short description: host, first 15 chars of the command, start time."""
        return '[Job on host %s: %-15s started %s]' % (
            self.__host.getHostName(),
            self.__command.getCommand()[:15],
            self.startTime,
        )

    def terminate(self):
        """Ask the worker to abort, then poll once so the job is reaped."""
        self.HC.abort(True)
        self.poll()

    def poll(self):
        """Return True while the worker is still running, False once it is done.

        When the worker has finished (or was aborted) it is joined, the host
        slot is released and the host's error counter is updated from the
        exit code.  A job whose worker was never started only records an
        error on the host.
        """
        alive = self.HC.isAlive()
        self.log.debug("Job status is: %s" % alive)

        if not self.__started:
            # The host refused the process in __init__.  There is no thread
            # to join (joining an unstarted threading.Thread raises
            # RuntimeError) and no process slot to give back.
            self.log.debug("return status -1000")
            self.endTime = time.ctime()
            self.__host.addError()
            return False

        if not alive or self.HC.isAborted():
            self.log.debug("Waiting for thread to finish...")
            self.HC.join(300.0)
            self.endTime = time.ctime()
            self.__host.delProcess()
            # Read the exit code only after the join so the worker has had
            # the chance to store its final value.
            xcode = self.__status.get_command_xcode()
            # NOTE(review): the source's indentation was lost; resetErrors()
            # is taken to belong to the "exit code is 0" case -- confirm.
            if xcode == 0:
                self.__host.resetErrors()
            elif xcode != -500:
                self.log.debug("exit code is not 0 and -500")
                self.__host.addError()
            return False

        if self.__status.get_command_xcode() == -1000:
            # Kept from the original: a still-running worker whose result
            # already carries -1000 is reported as finished with an error.
            self.log.debug("return status -1000")
            self.endTime = time.ctime()
            self.__host.addError()
            return False

        return True

    def getStatus(self):
        """Return the result object obtained from the HashCat worker."""
        return self.__status
class JobDistributor(Thread):
    """Thread that spreads the commands of one task over the configured hosts.

    ``run()`` queues every command produced by the task, hands each one to
    a host that reports itself usable, then keeps polling until no job is
    left running.  Bookkeeping lives in ``self.__processes``, a dict that
    maps a host name to the list of Job objects currently assigned to it;
    it is rebuilt on every ``__cleanup()`` pass.
    """

    def __init__(self, task):
        """Prepare a distributor for *task*; hosts and limits come from config."""
        Thread.__init__(self)
        self.log = Config().getLogger('distributor')
        self.__maxJobs = config["hostJobs"]
        self.__maxErrors = config["hostErrors"]
        self.totalJobs = 0
        self.instances = 0
        self.__task = task
        self.__status = States.Pending
        self.computer_list = self.__getHostfromConfig(self.__maxJobs, self.__maxErrors)
        self.__processes = {}
        # Bounded queue: put(..., block=False) raises when more than 100
        # commands are waiting.
        self.__jobQueue = Queue(100)

    def getTask(self):
        """Return the task this distributor works on."""
        return self.__task

    def terminate(self):
        """Mark distributor and task as aborted and terminate every job."""
        self.__status = States.Aborted
        self.__task.setStatus(States.Aborted)
        self.__stopAll()

    def __stopAll(self):
        """Call terminate() on every job currently tracked."""
        self.log.debug("Terminating all jobs, please standby ...")
        for jobs in self.__processes.values():
            for job in jobs:
                job.terminate()

    def run(self):
        """Thread body: queue the commands, distribute them, wait for the jobs.

        If the task is neither Completed nor Aborted once all jobs are gone,
        it is marked Failed.
        """
        commands = self.__task.createCommandList()
        self.log.debug("Adding commands to queue...")
        for command in commands:
            self.__jobQueue.put(command, block=False)
        self.__task.setStatus(States.Running)

        # Phase 1: hand out queued commands until the queue is empty or the
        # distributor reached a final state.
        while (not self.__jobQueue.empty()
               and self.__status not in (States.Completed, States.Aborted)):
            self.log.debug("JD.run - jobQ %s and status %s" % (self.__jobQueue.qsize(), self.__status))
            self.distribute(self.__jobQueue.get(block=False))

        # Phase 2: wait for the running jobs to drain.
        # NOTE(review): __cleanup() can put failed commands back on the
        # queue during this phase; they are not distributed again here.
        # Confirm whether that is intended.
        while len(self) != 0:
            self.log.debug("JD.run end - len is %s" % len(self))
            self.__cleanup()
            time.sleep(config["poll_timeout"])

        if self.__task.getStatus() not in (States.Completed, States.Aborted):
            self.__task.setStatus(States.Failed)

    def distribute(self, command):
        """Start *command* on the first usable host, waiting for one if needed.

        The wait between attempts starts at 0 seconds and grows by 10 while
        it is below config["maxHostWait"].

        Returns True when the job was submitted, False when the distributor
        became Completed or Aborted before it could be submitted.
        """
        if self.__status == States.Pending:
            self.__status = States.Running
        procNum = command.getID()
        finalStates = (States.Completed, States.Aborted)
        host = None
        sleepTime = 0
        maxSleepTime = config["maxHostWait"]
        self.log.info('Searching for host for process %i...' % (procNum))
        while host is None and self.__status not in finalStates:
            self.log.debug("Waiting %i seconds for available host!" % sleepTime)
            time.sleep(sleepTime)
            if sleepTime < maxSleepTime:
                sleepTime += 10
            host = self.__getHost()

        # __getHost() runs __cleanup(), which may have completed the task.
        if self.__status in finalStates:
            return False

        hostName = host.getHostName()
        self.log.info('Host %s chosen for proc %i.' % (hostName, procNum))
        self.__processes[hostName].append(Job(host, command))
        self.log.info('Submited to ' + hostName + ': ' + command.getCommand())
        self.totalJobs += 1
        return True

    def __getHost(self):
        """Return the first tracked host whose status is Available or Running.

        Runs __cleanup() first so the tracked host list is current.
        Returns None when no such host exists.
        """
        self.log.info("Finding available host...")
        self.__cleanup()
        for hostName in self.__processes:
            hostInfo = self.__getHostfromList(hostName)
            if hostInfo is None:
                # A tracked name with no matching Host object; skip it
                # instead of failing on None.getHostName().
                continue
            self.log.debug("Checking host %s" % hostInfo.getHostName())
            if hostInfo.getStatus() in (Host.States.Available, Host.States.Running):
                return hostInfo
        return None

    def __getHostfromConfig(self, maxProcess, maxError):
        """Build the list of Host objects described by config["hosts"].

        Each entry supplies "name", "user" and an encrypted "pass"; every
        host gets *maxProcess* and *maxError* as its limits.
        """
        pcList = []
        for entry in config["hosts"]:
            h = Host(entry["name"], entry["user"], crypto.decrypt(entry["pass"]))
            h.setMaxProcess(maxProcess)
            h.setMaxErrors(maxError)
            self.log.debug("Host %s extracted from config file." % h.getHostName())
            pcList.append(h)
        self.log.debug("Total number of hosts is: %i" % len(pcList))
        return pcList

    def __cleanup(self):
        """Poll every host, reap finished jobs and rebuild the process table.

        * alive host with status Running/Full: its jobs are polled; finished
          ones either complete the task ("Cracked"), go back on the queue
          (non-zero exit code) or are removed from the task.
        * alive host with any other status: tracked with an empty job list.
        * dead or errored host: its jobs are terminated and requeued, and
          the host is left out of the new table.

        Finally the task progress is refreshed.
        """
        processes = {}
        for host in self.computer_list:
            # Keep the short-circuit: getStatus() is only asked of hosts
            # whose checkHost() succeeded.
            if host.checkHost() and host.getStatus() != Host.States.Error:
                name = host.getHostName()
                self.log.debug("Host %s has status %s" % (name, host.getStatus()))
                if host.getStatus() in (Host.States.Running, Host.States.Full):
                    if name in self.__processes:
                        jobs = []
                        for job in self.__processes[name]:
                            if job.poll():
                                # still working
                                jobs.append(job)
                                continue
                            result = job.getStatus()
                            if result.get_status() == "Cracked":
                                self.log.debug("status is Cracked")
                                self.__status = States.Completed
                                self.__task.setStatus(States.Completed)
                                self.__task.setCode(result.get_crackCode())
                                self.__stopAll()
                                break
                            if result.get_command_xcode() != 0:
                                # failed: give the command another chance
                                self.log.debug("exit code is %s" % result.get_command_xcode())
                                self.__jobQueue.put(result.get_command(), block=False)
                                continue
                            if not self.__status == States.Aborted:
                                # finished cleanly: drop it from the task
                                self.log.debug("removing job %s from queue" % result.get_command().getID())
                                self.__task.delJobID(result.get_command().getID())
                        processes[name] = jobs
                    else:
                        self.log.error("Something is very wrong!!! There are hosts with assigned tasks that are not in the current jobs list.")
                else:
                    # host is idle, add empty proc list
                    processes[name] = []
            else:
                # host is in error state or unreachable
                name = host.getHostName()
                if name in self.__processes:
                    for job in self.__processes[name]:
                        job.terminate()
                        self.log.debug("Host %s is dead returning job %s in the queue!" % (name, job.getStatus().get_command()))
                        self.__jobQueue.put(job.getStatus().get_command(), block=False)
        self.__processes = processes
        self.__task.setProgress(self.__calcTaskProgress(self.__task.getJobCount(), self.__calculateProgress()))

    def __len__(self):
        """Return the number of jobs currently tracked over all hosts."""
        return sum(len(plist) for plist in self.__processes.values())

    def __repr__(self):
        """Return ``(number of hosts in Error state, distributor status)``.

        NOTE(review): this returns a tuple, so the builtin repr() raises
        TypeError on a distributor.  Left unchanged because callers
        presumably invoke ``__repr__()`` directly -- confirm before changing.
        """
        errors = 0
        for host in self.computer_list:
            if host.getStatus() == Host.States.Error:
                errors += 1
        return (errors, self.__status)

    def __calcTaskProgress(self, jCount, fraction):
        """Return ``100 - jCount + fraction`` as the task progress value."""
        answer = 100 - jCount + fraction
        self.log.debug("progress part is: %f" % answer)
        return answer

    def __calculateProgress(self):
        """Return the sum of the fractional parts of all tracked jobs' progress."""
        return sum(
            (math.modf(job.getStatus().get_progress())[0]
             for jobs in self.__processes.values()
             for job in jobs),
            0.00,
        )

    def resetErrorHost(self):
        """Reset the error counter of every host currently in Error state."""
        for host in self.computer_list:
            if host.getStatus() == Host.States.Error:
                host.resetErrors()

    def __getHostfromList(self, host):
        """Return the Host whose name equals *host*, or None if there is none."""
        for hostInfo in self.computer_list:
            if hostInfo.getHostName() == host:
                return hostInfo
        return None
# Script entry point: intentionally does nothing when the module is run directly.
if __name__ == '__main__':
    pass