diff options
Diffstat (limited to 'util/pbs')
-rwxr-xr-x | util/pbs/job.py | 139 | ||||
-rw-r--r-- | util/pbs/jobfile.py | 534 | ||||
-rwxr-xr-x | util/pbs/send.py | 164 |
3 files changed, 677 insertions, 160 deletions
diff --git a/util/pbs/job.py b/util/pbs/job.py index f370862de..e2636c111 100755 --- a/util/pbs/job.py +++ b/util/pbs/job.py @@ -29,10 +29,21 @@ # Steve Reinhardt # Ali Saidi -import os, os.path, shutil, signal, socket, sys, time +import os, os.path, shutil, signal, socket, sys from os import environ as env from os.path import join as joinpath, expanduser +def date(): + import time + return time.strftime('%a %b %e %H:%M:%S %Z %Y', time.localtime()) + +def cleandir(dir): + for root, dirs, files in os.walk(dir, False): + for name in files: + os.remove(joinpath(root, name)) + for name in dirs: + os.rmdir(joinpath(root, name)) + class rsync: def __init__(self): self.sudo = False @@ -61,25 +72,76 @@ class rsync: return os.spawnvp(os.P_WAIT, args[0], args) -def cleandir(dir): - for root, dirs, files in os.walk(dir, False): - for name in files: - os.remove(joinpath(root, name)) - for name in dirs: - os.rmdir(joinpath(root, name)) +class JobDir(object): + def __init__(self, dir): + self.dir = dir -def date(): - return time.strftime('%a %b %e %H:%M:%S %Z %Y', time.localtime()) + def file(self, filename): + return joinpath(self.dir, filename) + + def create(self): + if os.path.exists(self.dir): + if not os.path.isdir(self.dir): + sys.exit('%s is not a directory. Cannot build job' % self.dir) + else: + os.mkdir(self.dir) + + def exists(self): + return os.path.isdir(self.dir) + + def clean(self): + cleandir(self.dir) + + def hasfile(self, filename): + return os.path.isfile(self.file(filename)) + + def echofile(self, filename, string): + filename = self.file(filename) + try: + f = file(filename, 'w') + print >>f, string + f.flush() + f.close() + except IOError,e: + sys.exit(e) -def remfile(file): - if os.path.isfile(file): - os.unlink(file) + def rmfile(self, filename): + filename = self.file(filename) + if os.path.isfile(filename): + os.unlink(filename) -def readval(filename): - file = open(filename, 'r') - value = file.readline().strip() - file.close() - return value + def readval(self, filename): + filename = self.file(filename) + f = file(filename, 'r') + value = f.readline().strip() + f.close() + return value + + def setstatus(self, string): + filename = self.file('.status') + try: + f = file(filename, 'a') + print >>f, string + f.flush() + f.close() + except IOError,e: + sys.exit(e) + + def getstatus(self): + filename = self.file('.status') + try: + f = file(filename, 'r') + except IOError, e: + return 'none' + + # fast forward to the end + for line in f: pass + + # the first word on the last line is the status + return line.split(' ')[0] + + def __str__(self): + return self.dir if __name__ == '__main__': rootdir = env.setdefault('ROOTDIR', os.getcwd()) @@ -97,29 +159,27 @@ if __name__ == '__main__': workbase = "/tmp/" workdir = joinpath(workbase, '%s.%s' % (env['USER'], pbs_jobid)) - - def echofile(filename, string): - try: - f = file(joinpath(outdir, filename), 'w') - print >>f, string - f.flush() - f.close() - except IOError,e: - sys.exit(e) + host = socket.gethostname() os.umask(0022) - echofile('.start', date()) - echofile('.pbs_jobid', pbs_jobid) - echofile('.pbs_jobname', pbs_jobid) - echofile('.host', socket.gethostname()) + jobdir = JobDir(outdir) + + started = date() + jobdir.echofile('.running', started) + jobdir.rmfile('.queued') + jobdir.echofile('.pbs_jobid', pbs_jobid) + jobdir.echofile('.pbs_jobname', pbs_jobid) + jobdir.echofile('.host', host) + + jobdir.setstatus('running on %s on %s' % (host, started)) if os.path.isdir(workdir): cleandir(workdir) else: os.mkdir(workdir) - if os.path.isdir('/z/dist'): + if False and os.path.isdir('/z/dist'): sync = rsync() sync.delete = True sync.sudo = True @@ -130,13 +190,13 @@ if __name__ == '__main__': except OSError,e: sys.exit(e) - os.symlink(joinpath(outdir, 'output'), 'status.out') + os.symlink(jobdir.file('output'), 'status.out') args = [ joinpath(basedir, 'm5'), joinpath(basedir, 'run.py') ] if not len(args): sys.exit("no arguments") - print 'starting job... %s' % date() + print 'starting job... %s' % started print ' '.join(args) print sys.stdout.flush() @@ -145,7 +205,7 @@ if __name__ == '__main__': if not childpid: # Execute command sys.stdin.close() - fd = os.open(joinpath(outdir, "output"), + fd = os.open(jobdir.file("output"), os.O_WRONLY | os.O_CREAT | os.O_TRUNC) os.dup2(fd, sys.stdout.fileno()) os.dup2(fd, sys.stderr.fileno()) @@ -170,12 +230,15 @@ if __name__ == '__main__': thepid,ec = os.waitpid(childpid, 0) if ec: print 'Exit code ', ec - echofile('.failure', date()) + status = 'failure' else: - echofile('.success', date()) + status = 'success' done = 1 except OSError: pass - print '\njob complete... %s' % date() - echofile('.stop', date()) + complete = date() + print '\njob complete... %s' % complete + jobdir.echofile('.%s' % status, complete) + jobdir.rmfile('.running') + jobdir.setstatus('%s on %s' % (status, complete)) diff --git a/util/pbs/jobfile.py b/util/pbs/jobfile.py index 83eb81358..d36b5ee6d 100644 --- a/util/pbs/jobfile.py +++ b/util/pbs/jobfile.py @@ -26,67 +26,485 @@ # # Authors: Nathan Binkert -from os.path import expanduser, isfile, join as joinpath import sys -def crossproduct(options): - number = len(options) - indexes = [ 0 ] * number - maxes = [ len(opt) for opt in options ] - def next(): - for i in xrange(number - 1, -1, -1): - indexes[i] += 1 - if indexes[i] < maxes[i]: +class ternary(object): + def __new__(cls, *args): + if len(args) > 1: + raise TypeError, \ + '%s() takes at most 1 argument (%d given)' % \ + (cls.__name__, len(args)) + + if args: + if not isinstance(args[0], (bool, ternary)): + raise TypeError, \ + '%s() argument must be True, False, or Any' % \ + cls.__name__ + return args[0] + return super(ternary, cls).__new__(cls) + + def __bool__(self): + return True + + def __neg__(self): + return self + + def __eq__(self, other): + return True + + def __ne__(self, other): + return False + + def __str__(self): + return 'Any' + + def __repr__(self): + return 'Any' + +Any = ternary() + +class Flags(dict): + def __init__(self, *args, **kwargs): + super(Flags, self).__init__() + self.update(*args, **kwargs) + + def __getattr__(self, attr): + return self[attr] + + def __setattr__(self, attr, value): + self[attr] = value + + def __setitem__(self, item, value): + return super(Flags, self).__setitem__(item, ternary(value)) + + def __getitem__(self, item): + if item not in self: + return False + return super(Flags, self).__getitem__(item) + + def update(self, *args, **kwargs): + for arg in args: + if isinstance(arg, Flags): + super(Flags, self).update(arg) + elif isinstance(arg, dict): + for key,val in kwargs.iteritems(): + self[key] = val + else: + raise AttributeError, \ + 'flags not of type %s or %s, but %s' % \ + (Flags, dict, type(arg)) + + for key,val in kwargs.iteritems(): + self[key] = val + + def match(self, *args, **kwargs): + match = Flags(*args, **kwargs) + + for key,value in match.iteritems(): + if self[key] != value: return False - indexes[i] = 0 return True - done = False - while not done: - result = [] - for i in xrange(number): - result.append(options[i][indexes[i]]) - yield result - done = next() - -class JobFile(object): - def __init__(self, jfile): - self.data = {} - jfile = expanduser(jfile) - if not isfile(jfile): - for p in sys.path: - if isfile(joinpath(p, jfile)): - jfile = joinpath(p, jfile) - break - - execfile(jfile, self.data) - self.options = self.data['options'] - self.environment = self.data['environment'] - self.jobinfo = {} - self.jobs = [] - for job in crossproduct(self.options): - jobname = '.'.join([ id[0] for id in job ]) - self.jobs.append(jobname) - list = [] - for info in job: - for item in info[1:]: - list.append(item) - self.jobinfo[jobname] = list - - def env(self, jobname): - env = {} - for key,val in self.jobinfo[jobname]: - env[key] = val - - for key,val in self.environment: - env[key] = val - return env - - def printinfo(self, jobname): - print '%s:' % jobname - for key,val in self.jobinfo[jobname]: - print ' %s = %s' % (key, val) - - for key,val in self.environment: - print ' %s = %s' % (key, val) +def crossproduct(items): + if not isinstance(items, (list, tuple)): + raise AttributeError, 'crossproduct works only on sequences' + + if not items: + yield None + return + + current = items[0] + remainder = items[1:] + + if not hasattr(current, '__iter__'): + current = [ current ] + + for item in current: + for rem in crossproduct(remainder): + data = [ item ] + if rem: + data += rem + yield data + +def flatten(items): + if not isinstance(items, (list, tuple)): + yield items + return + + for item in items: + for flat in flatten(item): + yield flat + +class Data(object): + def __init__(self, name, desc, **kwargs): + self.name = name + self.desc = desc + self.system = None + self.flags = Flags() + self.env = {} + for k,v in kwargs.iteritems(): + setattr(self, k, v) + + def update(self, obj): + if not isinstance(obj, Data): + raise AttributeError, "can only update from Data object" + + self.env.update(obj.env) + self.flags.update(obj.flags) + if obj.system: + if self.system and self.system != obj.system: + raise AttributeError, \ + "conflicting values for system: '%s'/'%s'" % \ + (self.system, obj.system) + self.system = obj.system + + def printinfo(self): + if self.name: + print 'name: %s' % self.name + if self.desc: + print 'desc: %s' % self.desc + if self.system: + print 'system: %s' % self.system + + def printverbose(self): + print 'flags:' + keys = self.flags.keys() + keys.sort() + for key in keys: + print ' %s = %s' % (key, self.flags[key]) + print 'env:' + keys = self.env.keys() + keys.sort() + for key in keys: + print ' %s = %s' % (key, self.env[key]) + print + + def __str__(self): + return self.name + +class Job(Data): + def __init__(self, options): + super(Job, self).__init__('', '') + self.setoptions(options) + + self.checkpoint = False + opts = [] + for opt in options: + cpt = opt.group.checkpoint + if not cpt: + self.checkpoint = True + continue + if isinstance(cpt, Option): + opt = cpt.clone(suboptions=False) + else: + opt = opt.clone(suboptions=False) + + opts.append(opt) + + if not opts: + self.checkpoint = False + + if self.checkpoint: + self.checkpoint = Job(opts) + + def clone(self): + return Job(self.options) + + def __getattribute__(self, attr): + if attr == 'name': + names = [ ] + for opt in self.options: + if opt.name: + names.append(opt.name) + return ':'.join(names) + + if attr == 'desc': + descs = [ ] + for opt in self.options: + if opt.desc: + descs.append(opt.desc) + return ', '.join(descs) + + return super(Job, self).__getattribute__(attr) + + def setoptions(self, options): + config = options[0].config + for opt in options: + if opt.config != config: + raise AttributeError, \ + "All options are not from the same Configuration" + + self.config = config + self.groups = [ opt.group for opt in options ] + self.options = options + + self.update(self.config) + for group in self.groups: + self.update(group) + + for option in self.options: + self.update(option) + if option._suboption: + self.update(option._suboption) + + def printinfo(self): + super(Job, self).printinfo() + if self.checkpoint: + print 'checkpoint: %s' % self.checkpoint.name + print 'config: %s' % self.config.name + print 'groups: %s' % [ g.name for g in self.groups ] + print 'options: %s' % [ o.name for o in self.options ] + super(Job, self).printverbose() + +class SubOption(Data): + def __init__(self, name, desc, **kwargs): + super(SubOption, self).__init__(name, desc, **kwargs) + self.number = None + +class Option(Data): + def __init__(self, name, desc, **kwargs): + super(Option, self).__init__(name, desc, **kwargs) + self._suboptions = [] + self._suboption = None + self.number = None + + def __getattribute__(self, attr): + if attr == 'name': + name = self.__dict__[attr] + if self._suboption is not None: + name = '%s:%s' % (name, self._suboption.name) + return name + + if attr == 'desc': + desc = self.__dict__[attr] + if self._suboption is not None: + desc = '%s, %s' % (desc, self._suboption.desc) + return desc + + return super(Option, self).__getattribute__(attr) + + def suboption(self, name, desc, **kwargs): + subo = SubOption(name, desc, **kwargs) + subo.config = self.config + subo.group = self.group + subo.option = self + subo.number = len(self._suboptions) + self._suboptions.append(subo) + return subo + + def clone(self, suboptions=True): + option = Option(self.__dict__['name'], self.__dict__['desc']) + option.update(self) + option.group = self.group + option.config = self.config + option.number = self.number + if suboptions: + option._suboptions.extend(self._suboptions) + option._suboption = self._suboption + return option + + def subopts(self): + if not self._suboptions: + return [ self ] + + subopts = [] + for subo in self._suboptions: + option = self.clone() + option._suboption = subo + subopts.append(option) + + return subopts + + def printinfo(self): + super(Option, self).printinfo() + print 'config: %s' % self.config.name + super(Option, self).printverbose() + +class Group(Data): + def __init__(self, name, desc, **kwargs): + super(Group, self).__init__(name, desc, **kwargs) + self._options = [] + self.checkpoint = False + self.number = None + + def option(self, name, desc, **kwargs): + opt = Option(name, desc, **kwargs) + opt.config = self.config + opt.group = self + opt.number = len(self._options) + self._options.append(opt) + return opt + + def options(self): + return self._options + + def subopts(self): + subopts = [] + for opt in self._options: + for subo in opt.subopts(): + subopts.append(subo) + return subopts + + def printinfo(self): + super(Group, self).printinfo() + print 'config: %s' % self.config.name + print 'options: %s' % [ o.name for o in self._options ] + super(Group, self).printverbose() + +class Configuration(Data): + def __init__(self, name, desc, **kwargs): + super(Configuration, self).__init__(name, desc, **kwargs) + self._groups = [] + + def group(self, name, desc, **kwargs): + grp = Group(name, desc, **kwargs) + grp.config = self + grp.number = len(self._groups) + self._groups.append(grp) + return grp + + def groups(self, flags=Flags(), sign=True): + if not flags: + return self._groups + + return [ grp for grp in self._groups if sign ^ grp.flags.match(flags) ] + + def checkchildren(self, kids): + for kid in kids: + if kid.config != self: + raise AttributeError, "child from the wrong configuration" + + def sortgroups(self, groups): + groups = [ (grp.number, grp) for grp in groups ] + groups.sort() + return [ grp[1] for grp in groups ] + + def options(self, groups = None, checkpoint = False): + if groups is None: + groups = self._groups + self.checkchildren(groups) + groups = self.sortgroups(groups) + if checkpoint: + groups = [ grp for grp in groups if grp.checkpoint ] + optgroups = [ g.options() for g in groups ] + else: + optgroups = [ g.subopts() for g in groups ] + for options in crossproduct(optgroups): + for opt in options: + cpt = opt.group.checkpoint + if not isinstance(cpt, bool) and cpt != opt: + if checkpoint: + break + else: + yield options + else: + if checkpoint: + yield options + + def checkpoints(self, groups = None): + for options in self.options(groups, True): + yield Job(options) + + def jobs(self, groups = None): + for options in self.options(groups, False): + yield Job(options) + + def alljobs(self, groups = None): + for options in self.options(groups, True): + yield Job(options) + for options in self.options(groups, False): + yield Job(options) + + def find(self, jobname): + for job in self.alljobs(): + if job.name == jobname: + return job + else: + raise AttributeError, "job '%s' not found" % jobname + + def job(self, options): + self.checkchildren(options) + options = [ (opt.group.number, opt) for opt in options ] + options.sort() + options = [ opt[1] for opt in options ] + job = Job(options) + return job + + def printinfo(self): + super(Configuration, self).printinfo() + print 'groups: %s' % [ g.name for g in self._grouips ] + super(Configuration, self).printverbose() + +def JobFile(jobfile): + from os.path import expanduser, isfile, join as joinpath + filename = expanduser(jobfile) + + # Can't find filename in the current path, search sys.path + if not isfile(filename): + for path in sys.path: + testname = joinpath(path, filename) + if isfile(testname): + filename = testname + break + else: + raise AttributeError, \ + "Could not find file '%s'" % jobfile + + data = {} + execfile(filename, data) + if 'conf' not in data: + raise ImportError, 'cannot import name conf from %s' % jobfile + conf = data['conf'] + import jobfile + if not isinstance(conf, Configuration): + raise AttributeError, \ + 'conf in jobfile: %s (%s) is not type %s' % \ + (jobfile, type(conf), Configuration) + return conf + +if __name__ == '__main__': + from jobfile import * + import sys + + usage = 'Usage: %s [-b] [-c] [-v] <jobfile>' % sys.argv[0] + + try: + import getopt + opts, args = getopt.getopt(sys.argv[1:], '-bcv') + except getopt.GetoptError: + sys.exit(usage) + + if len(args) != 1: + raise AttributeError, usage + + both = False + checkpoint = False + verbose = False + for opt,arg in opts: + if opt == '-b': + both = True + checkpoint = True + if opt == '-c': + checkpoint = True + if opt == '-v': + verbose = True + + jobfile = args[0] + conf = JobFile(jobfile) + + if both: + gen = conf.alljobs() + elif checkpoint: + gen = conf.checkpoints() + else: + gen = conf.jobs() + + for job in gen: + if not verbose: + cpt = '' + if job.checkpoint: + cpt = job.checkpoint.name + print job.name, cpt + else: + job.printinfo() diff --git a/util/pbs/send.py b/util/pbs/send.py index ecb0be0ec..c66fb1c05 100755 --- a/util/pbs/send.py +++ b/util/pbs/send.py @@ -96,7 +96,7 @@ Usage: try: import getopt - opts, args = getopt.getopt(sys.argv[1:], '-cd:efhj:lq:v') + opts, args = getopt.getopt(sys.argv[1:], '-CRcd:efhj:lq:v') except getopt.GetoptError: sys.exit(usage) @@ -107,13 +107,18 @@ force = False listonly = False queue = '' verbose = False -rootdir = nfspath(os.getcwd()) -jfile = 'test.py' +jfile = 'Base/test.py' +docpts = False +doruns = True +runflag = False + for opt,arg in opts: + if opt == '-C': + docpts = True + if opt == '-R': + runflag = True if opt == '-c': clean = True - if opt == '-d': - rootdir = arg if opt == '-e': onlyecho = True if opt == '-f': @@ -130,95 +135,123 @@ for opt,arg in opts: if opt == '-v': verbose = True -basedir = joinpath(rootdir, 'Base') -linkdir = joinpath(rootdir, 'Link') +if docpts: + doruns = runflag for arg in args: exprs.append(re.compile(arg)) -if not listonly and not onlyecho and isdir(linkdir): +import jobfile, pbs +from job import JobDir, date + +conf = jobfile.JobFile(jfile) + +if not listonly and not onlyecho and isdir(conf.linkdir): if verbose: print 'Checking for outdated files in Link directory' - syncdir(linkdir, basedir) + syncdir(conf.linkdir, conf.basedir) -import job, jobfile, pbs +jobnames = {} +joblist = [] -test = jobfile.JobFile(joinpath(basedir, jfile)) +if docpts and doruns: + gen = conf.alljobs() +elif docpts: + gen = conf.checkpoints() +elif doruns: + gen = conf.jobs() -joblist = [] -for jobname in test.jobs: - if not exprs: - joblist.append(jobname) +for job in gen: + if job.name in jobnames: continue - for expr in exprs: - if expr.match(jobname): - joblist.append(jobname) - break + if exprs: + for expr in exprs: + if expr.match(job.name): + joblist.append(job) + break + else: + joblist.append(job) if listonly: if verbose: - for jobname in joblist: - test.printinfo(jobname) + for job in joblist: + job.printinfo() else: - for jobname in joblist: - print jobname + for job in joblist: + print job.name sys.exit(0) if not onlyecho: - jl = [] - for jobname in joblist: - jobdir = joinpath(rootdir, jobname) - if os.path.exists(jobname): + newlist = [] + for job in joblist: + jobdir = JobDir(joinpath(conf.rootdir, job.name)) + if jobdir.exists(): if not force: - if os.path.isfile(joinpath(jobdir, '.success')): + status = jobdir.getstatus() + if status == 'queued': + continue + + if status == 'running': continue - if os.path.isfile(joinpath(jobdir, '.start')) and \ - not os.path.isfile(joinpath(jobdir, '.stop')): + if status == 'success': continue if not clean: - sys.exit('job directory not clean!') + sys.exit('job directory %s not clean!' % jobdir) - job.cleandir(jobdir) - else: - os.mkdir(jobdir) - jl.append(jobname) - joblist = jl - -def setname(jobid, jobname): - # since pbs can handle jobnames of 15 characters or less, don't - # use the raj hack. - if len(jobname) <= 15: - return - - import socket - s = socket.socket() - # Connect to pbs.pool and send the jobid/jobname pair to port - # 24465 (Raj didn't realize that there are only 64k ports and - # setup inetd to point to port 90001) - s.connect(("pbs.pool", 24465)) - s.send("%s %s\n" % (jobid, jobname)) - s.close() - -for jobname in joblist: - jobdir = joinpath(rootdir, jobname) - - if not onlyecho and not os.path.isdir(jobdir): - sys.exit('%s is not a directory. Cannot build job' % jobdir) - - print 'Job name: %s' % jobname + jobdir.clean() + newlist.append(job) + joblist = newlist + +class NameHack(object): + def __init__(self, host='pbs.pool', port=24465): + self.host = host + self.port = port + self.socket = None + + def setname(self, jobid, jobname): + try: + jobid = int(jobid) + except ValueError: + jobid = int(jobid.strip().split('.')[0]) + + jobname = jobname.strip() + # since pbs can handle jobnames of 15 characters or less, + # don't use the raj hack. + if len(jobname) <= 15: + return + + if self.socket is None: + import socket + self.socket = socket.socket() + # Connect to pbs.pool and send the jobid/jobname pair to port + # 24465 (Raj didn't realize that there are only 64k ports and + # setup inetd to point to port 90001) + self.socket.connect((self.host, self.port)) + + self.socket.send("%s %s\n" % (jobid, jobname)) + +namehack = NameHack() + +for job in joblist: + jobdir = JobDir(joinpath(conf.rootdir, job.name)) + + if not onlyecho: + jobdir.create() + + print 'Job name: %s' % job.name print 'Job directory: %s' % jobdir qsub = pbs.qsub() qsub.pbshost = 'simpool.eecs.umich.edu' - qsub.stdout = joinpath(jobdir, 'jobout') - qsub.name = jobname[:15] + qsub.stdout = jobdir.file('jobout') + qsub.name = job.name[:15] qsub.join = True qsub.node_type = 'FAST' - qsub.env['ROOTDIR'] = rootdir - qsub.env['JOBNAME'] = jobname + qsub.env['ROOTDIR'] = conf.rootdir + qsub.env['JOBNAME'] = job.name if len(queue): qsub.queue = queue qsub.build(joinpath(progpath, 'job.py')) @@ -231,6 +264,9 @@ for jobname in joblist: if ec == 0: jobid = qsub.result print 'PBS Jobid: %s' % jobid - setname(jobid, jobname) + namehack.setname(jobid, job.name) + queued = date() + jobdir.echofile('.queued', queued) + jobdir.setstatus('queued on %s' % queued) else: print 'PBS Failed' |