summaryrefslogtreecommitdiff
path: root/util/batch
diff options
context:
space:
mode:
Diffstat (limited to 'util/batch')
-rw-r--r--util/batch/batch.py249
-rwxr-xr-xutil/batch/job.py246
-rw-r--r--util/batch/jobfile.py539
-rwxr-xr-xutil/batch/send.py306
4 files changed, 1340 insertions, 0 deletions
diff --git a/util/batch/batch.py b/util/batch/batch.py
new file mode 100644
index 000000000..91d354e97
--- /dev/null
+++ b/util/batch/batch.py
@@ -0,0 +1,249 @@
+# Copyright (c) 2006 The Regents of The University of Michigan
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met: redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer;
+# redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution;
+# neither the name of the copyright holders nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Authors: Kevin Lim
+
+import os, popen2, re, sys
+
+class MyPOpen(object):
+ def __init__(self, cmd, input = None, output = None, bufsize = -1):
+ self.status = -1
+
+ if input is None:
+ p2c_read, p2c_write = os.pipe()
+ self.tochild = os.fdopen(p2c_write, 'w', bufsize)
+ else:
+ p2c_write = None
+ if isinstance(input, file):
+ p2c_read = input.fileno()
+ elif isinstance(input, str):
+ input = file(input, 'r')
+ p2c_read = input.fileno()
+ elif isinstance(input, int):
+ p2c_read = input
+ else:
+ raise AttributeError
+
+ if output is None:
+ c2p_read, c2p_write = os.pipe()
+ self.fromchild = os.fdopen(c2p_read, 'r', bufsize)
+ else:
+ c2p_read = None
+ if isinstance(output, file):
+ c2p_write = output.fileno()
+ elif isinstance(output, str):
+ output = file(output, 'w')
+ c2p_write = output.fileno()
+ elif isinstance(output, int):
+ c2p_write = output
+ else:
+ raise AttributeError
+
+ self.pid = os.fork()
+ if self.pid == 0:
+ os.dup2(p2c_read, sys.stdin.fileno())
+ os.dup2(c2p_write, sys.stdout.fileno())
+ os.dup2(c2p_write, sys.stderr.fileno())
+ try:
+ os.execvp(cmd[0], cmd)
+ finally:
+ os._exit(1)
+
+ os.close(p2c_read)
+ os.close(c2p_write)
+
+ def poll(self):
+ if self.status < 0:
+ pid, status = os.waitpid(self.pid, os.WNOHANG)
+ if pid == self.pid:
+ self.status = status
+ return self.status
+
+ def wait(self):
+ if self.status < 0:
+ pid, status = os.waitpid(self.pid, 0)
+ if pid == self.pid:
+ self.status = status
+ return self.status
+
+
+class oarsub:
+ def __init__(self):
+ self.walltime = None
+ self.queue = None
+ self.properties = None
+
+ # OAR 2.0 parameters only!
+ self.name = None
+ self.afterok = None
+ self.notify = None
+ self.stderr = None
+ self.stdout = None
+
+
+ self.oarhost = None
+ self.oarsub = 'oarsub'
+
+ self.jobid = re.compile('IdJob = (\S+)')
+ #self.outfile = open("jobnames.dat", "a+")
+
+ def build(self, script, args = []):
+ self.cmd = [ self.oarsub ]
+
+ print "args:", args
+ print "script:", script
+ if self.properties:
+ self.cmd.append('-p"%s"' % self.properties )
+
+ if self.queue:
+ self.cmd.append('-q "%s"' % self.queue)
+
+ if self.walltime:
+ self.cmd.append('-l walltime=%s' % self.walltime)
+
+ if script[0] != "/":
+ self.script = os.getcwd()
+ else:
+ self.script = script
+
+ self.cmd.extend(args)
+ self.cmd.append(self.script)
+ #cmd = [ 'ssh', '-x', self.oarhost, '"cd %s; %s"' % (os.getcwd(), self.command) ]
+ self.command = ' '.join(self.cmd)
+
+ print "command: [%s]" % self.command
+
+ def do(self):
+ oar = MyPOpen(self.cmd)
+ self.result = oar.fromchild.read()
+ ec = oar.wait()
+
+ if ec != 0 and self.oarhost:
+ pstdin, pstdout = os.popen4(self.command)
+ self.result = pstdout.read()
+
+ jobid = self.jobid.match(self.result)
+ if jobid == None:
+ print "Couldn't get jobid from [%s]" % self.result
+ sys.exit(1)
+ else:
+ #self.outfile.write("%d %s\n" %(int(jobid.group(1)), self.name));
+ #self.outfile.flush()
+ self.result = jobid.group(1)
+
+ return 0
+
+class qsub:
+ def __init__(self):
+ self.afterok = None
+ self.hold = False
+ self.join = False
+ self.keep_stdout = False
+ self.keep_stderr = False
+ self.node_type = None
+ self.mail_abort = False
+ self.mail_begin = False
+ self.mail_end = False
+ self.name = None
+ self.stdout = None
+ self.priority = None
+ self.queue = None
+ self.pbshost = None
+ self.qsub = 'qsub'
+ self.env = {}
+
+ def build(self, script, args = []):
+ self.cmd = [ self.qsub ]
+
+ if self.env:
+ arg = '-v'
+ arg += ','.join([ '%s=%s' % i for i in self.env.iteritems() ])
+ self.cmd.append(arg)
+
+ if self.hold:
+ self.cmd.append('-h')
+
+ if self.stdout:
+ self.cmd.append('-olocalhost:' + self.stdout)
+
+ if self.keep_stdout and self.keep_stderr:
+ self.cmd.append('-koe')
+ elif self.keep_stdout:
+ self.cmd.append('-ko')
+ elif self.keep_stderr:
+ self.cmd.append('-ke')
+ else:
+ self.cmd.append('-kn')
+
+ if self.join:
+ self.cmd.append('-joe')
+
+ if self.node_type:
+ self.cmd.append('-lnodes=' + self.node_type)
+
+ if self.mail_abort or self.mail_begin or self.mail_end:
+ flags = ''
+ if self.mail_abort:
+ flags.append('a')
+ if self.mail_begin:
+ flags.append('b')
+ if self.mail_end:
+ flags.append('e')
+ if len(flags):
+ self.cmd.append('-m ' + flags)
+ else:
+ self.cmd.append('-mn')
+
+ if self.name:
+ self.cmd.append("-N%s" % self.name)
+
+ if self.priority:
+ self.cmd.append('-p' + self.priority)
+
+ if self.queue:
+ self.cmd.append('-q' + self.queue)
+
+ if self.afterok:
+ self.cmd.append('-Wdepend=afterok:%s' % self.afterok)
+
+ self.cmd.extend(args)
+ self.script = script
+ self.command = ' '.join(self.cmd + [ self.script ])
+
+ def do(self):
+ pbs = MyPOpen(self.cmd + [ self.script ])
+ self.result = pbs.fromchild.read()
+ ec = pbs.wait()
+
+ if ec != 0 and self.pbshost:
+ cmd = ' '.join(self.cmd + [ '-' ])
+ cmd = [ 'ssh', '-x', self.pbshost, cmd ]
+ self.command = ' '.join(cmd)
+ ssh = MyPOpen(cmd, input = self.script)
+ self.result = ssh.fromchild.read()
+ ec = ssh.wait()
+
+ return ec
diff --git a/util/batch/job.py b/util/batch/job.py
new file mode 100755
index 000000000..9d7ecca8c
--- /dev/null
+++ b/util/batch/job.py
@@ -0,0 +1,246 @@
+#!/usr/bin/env python
+# Copyright (c) 2006 The Regents of The University of Michigan
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met: redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer;
+# redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution;
+# neither the name of the copyright holders nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Authors: Kevin Lim
+
+import os, os.path, shutil, signal, socket, sys
+from os import environ as env
+from os.path import join as joinpath, expanduser
+
+def date():
+ import time
+ return time.strftime('%a %b %e %H:%M:%S %Z %Y', time.localtime())
+
+def cleandir(dir):
+ for root, dirs, files in os.walk(dir, False):
+ for name in files:
+ os.remove(joinpath(root, name))
+ for name in dirs:
+ os.rmdir(joinpath(root, name))
+
+class rsync:
+ def __init__(self):
+ self.sudo = False
+ self.rsync = 'rsync'
+ self.compress = False
+ self.archive = True
+ self.delete = False
+ self.options = ''
+
+ def do(self, src, dst):
+ args = []
+ if self.sudo:
+ args.append('sudo')
+
+ args.append(self.rsync)
+ if (self.archive):
+ args.append('-a')
+ if (self.compress):
+ args.append('-z')
+ if (self.delete):
+ args.append('--delete')
+ if len(self.options):
+ args.append(self.options)
+ args.append(src)
+ args.append(dst)
+
+ return os.spawnvp(os.P_WAIT, args[0], args)
+
+class JobDir(object):
+ def __init__(self, dir):
+ self.dir = dir
+
+ def file(self, filename):
+ return joinpath(self.dir, filename)
+
+ def create(self):
+ if os.path.exists(self.dir):
+ if not os.path.isdir(self.dir):
+ sys.exit('%s is not a directory. Cannot build job' % self.dir)
+ else:
+ os.mkdir(self.dir)
+
+ def exists(self):
+ return os.path.isdir(self.dir)
+
+ def clean(self):
+ cleandir(self.dir)
+
+ def hasfile(self, filename):
+ return os.path.isfile(self.file(filename))
+
+ def echofile(self, filename, string):
+ filename = self.file(filename)
+ try:
+ f = file(filename, 'w')
+ print >>f, string
+ f.flush()
+ f.close()
+ except IOError,e:
+ sys.exit(e)
+
+ def rmfile(self, filename):
+ filename = self.file(filename)
+ if os.path.isfile(filename):
+ os.unlink(filename)
+
+ def readval(self, filename):
+ filename = self.file(filename)
+ f = file(filename, 'r')
+ value = f.readline().strip()
+ f.close()
+ return value
+
+ def setstatus(self, string):
+ filename = self.file('.status')
+ try:
+ f = file(filename, 'a')
+ print >>f, string
+ f.flush()
+ f.close()
+ except IOError,e:
+ sys.exit(e)
+
+ def getstatus(self):
+ filename = self.file('.status')
+ try:
+ f = file(filename, 'r')
+ except IOError, e:
+ return 'none'
+
+ # fast forward to the end
+ for line in f: pass
+
+ # the first word on the last line is the status
+ return line.split(' ')[0]
+
+ def __str__(self):
+ return self.dir
+
+if __name__ == '__main__':
+ import platform
+ binaries = { 'i686' : 'm5.i386',
+ 'x86_64' : 'm5.amd64' }
+ binary = binaries[platform.machine()]
+
+ cwd = os.getcwd()
+ rootdir = env.setdefault('ROOTDIR', os.path.dirname(cwd))
+ oar_jobid = int(env['OAR_JOBID'])
+ oar_jobname = os.path.basename(cwd)
+ #pbs_jobname = env['PBS_JOBNAME']
+ basedir = joinpath(rootdir, 'Base')
+ jobname = env.setdefault('JOBNAME', oar_jobname)
+ jobfile = env.setdefault('JOBFILE', joinpath(rootdir, 'Test.py'))
+ outdir = env.setdefault('OUTPUT_DIR', cwd)
+ env['POOLJOB'] = 'True'
+
+ if os.path.isdir("/work"):
+ workbase = "/work"
+ else:
+ workbase = "/tmp/"
+
+ workdir = joinpath(workbase, '%s.%s' % (env['USER'], oar_jobid))
+ host = socket.gethostname()
+
+ os.umask(0022)
+
+ jobdir = JobDir(outdir)
+
+ started = date()
+ jobdir.echofile('.running', started)
+ jobdir.rmfile('.queued')
+ jobdir.echofile('.host', host)
+
+ jobdir.setstatus('running on %s on %s' % (host, started))
+
+ if os.path.isdir(workdir):
+ cleandir(workdir)
+ else:
+ os.mkdir(workdir)
+
+ if False and os.path.isdir('/z/dist'):
+ sync = rsync()
+ sync.delete = True
+ sync.sudo = True
+ sync.do('poolfs::dist/m5/', '/z/dist/m5/')
+
+ try:
+ os.chdir(workdir)
+ except OSError,e:
+ sys.exit(e)
+
+ os.symlink(jobdir.file('output'), 'status.out')
+
+ args = [ joinpath(basedir, binary), joinpath(basedir, 'run.py') ]
+ if not len(args):
+ sys.exit("no arguments")
+
+ print 'starting job... %s' % started
+ print ' '.join(args)
+ print
+ sys.stdout.flush()
+
+ childpid = os.fork()
+ if not childpid:
+ # Execute command
+ sys.stdin.close()
+ fd = os.open(jobdir.file("output"),
+ os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
+ os.dup2(fd, sys.stdout.fileno())
+ os.dup2(fd, sys.stderr.fileno())
+ os.execvp(args[0], args)
+
+ def handler(signum, frame):
+ if childpid != 0:
+ os.kill(childpid, signum)
+
+ signal.signal(signal.SIGHUP, handler)
+ signal.signal(signal.SIGINT, handler)
+ signal.signal(signal.SIGQUIT, handler)
+ signal.signal(signal.SIGTERM, handler)
+ signal.signal(signal.SIGCONT, handler)
+ signal.signal(signal.SIGUSR1, handler)
+ signal.signal(signal.SIGUSR2, handler)
+
+ done = 0
+ while not done:
+ try:
+ thepid,ec = os.waitpid(childpid, 0)
+ if ec:
+ print 'Exit code ', ec
+ status = 'failure'
+ else:
+ status = 'success'
+ done = 1
+ except OSError:
+ pass
+
+ complete = date()
+ print '\njob complete... %s' % complete
+ jobdir.echofile('.%s' % status, complete)
+ jobdir.rmfile('.running')
+ jobdir.setstatus('%s on %s' % (status, complete))
diff --git a/util/batch/jobfile.py b/util/batch/jobfile.py
new file mode 100644
index 000000000..b78d7f3e1
--- /dev/null
+++ b/util/batch/jobfile.py
@@ -0,0 +1,539 @@
+# Copyright (c) 2006 The Regents of The University of Michigan
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met: redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer;
+# redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution;
+# neither the name of the copyright holders nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Authors: Kevin Lim
+
+import sys
+
+class ternary(object):
+ def __new__(cls, *args):
+ if len(args) > 1:
+ raise TypeError, \
+ '%s() takes at most 1 argument (%d given)' % \
+ (cls.__name__, len(args))
+
+ if args:
+ if not isinstance(args[0], (bool, ternary)):
+ raise TypeError, \
+ '%s() argument must be True, False, or Any' % \
+ cls.__name__
+ return args[0]
+ return super(ternary, cls).__new__(cls)
+
+ def __bool__(self):
+ return True
+
+ def __neg__(self):
+ return self
+
+ def __eq__(self, other):
+ return True
+
+ def __ne__(self, other):
+ return False
+
+ def __str__(self):
+ return 'Any'
+
+ def __repr__(self):
+ return 'Any'
+
+Any = ternary()
+
+class Flags(dict):
+ def __init__(self, *args, **kwargs):
+ super(Flags, self).__init__()
+ self.update(*args, **kwargs)
+
+ def __getattr__(self, attr):
+ return self[attr]
+
+ def __setattr__(self, attr, value):
+ self[attr] = value
+
+ def __setitem__(self, item, value):
+ return super(Flags, self).__setitem__(item, ternary(value))
+
+ def __getitem__(self, item):
+ if item not in self:
+ return False
+ return super(Flags, self).__getitem__(item)
+
+ def update(self, *args, **kwargs):
+ for arg in args:
+ if isinstance(arg, Flags):
+ super(Flags, self).update(arg)
+ elif isinstance(arg, dict):
+ for key,val in kwargs.iteritems():
+ self[key] = val
+ else:
+ raise AttributeError, \
+ 'flags not of type %s or %s, but %s' % \
+ (Flags, dict, type(arg))
+
+ for key,val in kwargs.iteritems():
+ self[key] = val
+
+ def match(self, *args, **kwargs):
+ match = Flags(*args, **kwargs)
+
+ for key,value in match.iteritems():
+ if self[key] != value:
+ return False
+
+ return True
+
+def crossproduct(items):
+ if not isinstance(items, (list, tuple)):
+ raise AttributeError, 'crossproduct works only on sequences'
+
+ if not items:
+ yield None
+ return
+
+ current = items[0]
+ remainder = items[1:]
+
+ if not hasattr(current, '__iter__'):
+ current = [ current ]
+
+ for item in current:
+ for rem in crossproduct(remainder):
+ data = [ item ]
+ if rem:
+ data += rem
+ yield data
+
+def flatten(items):
+ if not isinstance(items, (list, tuple)):
+ yield items
+ return
+
+ for item in items:
+ for flat in flatten(item):
+ yield flat
+
+class Data(object):
+ def __init__(self, name, desc, **kwargs):
+ self.name = name
+ self.desc = desc
+ self.system = None
+ self.flags = Flags()
+ self.env = {}
+ for k,v in kwargs.iteritems():
+ setattr(self, k, v)
+
+ def update(self, obj):
+ if not isinstance(obj, Data):
+ raise AttributeError, "can only update from Data object"
+
+ self.env.update(obj.env)
+ self.flags.update(obj.flags)
+ if obj.system:
+ if self.system and self.system != obj.system:
+ raise AttributeError, \
+ "conflicting values for system: '%s'/'%s'" % \
+ (self.system, obj.system)
+ self.system = obj.system
+
+ def printinfo(self):
+ if self.name:
+ print 'name: %s' % self.name
+ if self.desc:
+ print 'desc: %s' % self.desc
+ if self.system:
+ print 'system: %s' % self.system
+
+ def printverbose(self):
+ print 'flags:'
+ keys = self.flags.keys()
+ keys.sort()
+ for key in keys:
+ print ' %s = %s' % (key, self.flags[key])
+ print 'env:'
+ keys = self.env.keys()
+ keys.sort()
+ for key in keys:
+ print ' %s = %s' % (key, self.env[key])
+ print
+
+ def __str__(self):
+ return self.name
+
+class Job(Data):
+ def __init__(self, options):
+ super(Job, self).__init__('', '')
+ self.setoptions(options)
+
+ self.checkpoint = False
+ opts = []
+ for opt in options:
+ cpt = opt.group.checkpoint
+ if not cpt:
+ self.checkpoint = True
+ continue
+ if isinstance(cpt, Option):
+ opt = cpt.clone(suboptions=False)
+ else:
+ opt = opt.clone(suboptions=False)
+
+ opts.append(opt)
+
+ if not opts:
+ self.checkpoint = False
+
+ if self.checkpoint:
+ self.checkpoint = Job(opts)
+
+ def clone(self):
+ return Job(self.options)
+
+ def __getattribute__(self, attr):
+ if attr == 'name':
+ names = [ ]
+ for opt in self.options:
+ if opt.name:
+ names.append(opt.name)
+ return ':'.join(names)
+
+ if attr == 'desc':
+ descs = [ ]
+ for opt in self.options:
+ if opt.desc:
+ descs.append(opt.desc)
+ return ', '.join(descs)
+
+ return super(Job, self).__getattribute__(attr)
+
+ def setoptions(self, options):
+ config = options[0].config
+ for opt in options:
+ if opt.config != config:
+ raise AttributeError, \
+ "All options are not from the same Configuration"
+
+ self.config = config
+ self.groups = [ opt.group for opt in options ]
+ self.options = options
+
+ self.update(self.config)
+ for group in self.groups:
+ self.update(group)
+
+ for option in self.options:
+ self.update(option)
+ if option._suboption:
+ self.update(option._suboption)
+
+ def printinfo(self):
+ super(Job, self).printinfo()
+ if self.checkpoint:
+ print 'checkpoint: %s' % self.checkpoint.name
+ print 'config: %s' % self.config.name
+ print 'groups: %s' % [ g.name for g in self.groups ]
+ print 'options: %s' % [ o.name for o in self.options ]
+ super(Job, self).printverbose()
+
+class SubOption(Data):
+ def __init__(self, name, desc, **kwargs):
+ super(SubOption, self).__init__(name, desc, **kwargs)
+ self.number = None
+
+class Option(Data):
+ def __init__(self, name, desc, **kwargs):
+ super(Option, self).__init__(name, desc, **kwargs)
+ self._suboptions = []
+ self._suboption = None
+ self.number = None
+
+ def __getattribute__(self, attr):
+ if attr == 'name':
+ name = self.__dict__[attr]
+ if self._suboption is not None:
+ name = '%s:%s' % (name, self._suboption.name)
+ return name
+
+ if attr == 'desc':
+ desc = [ self.__dict__[attr] ]
+ if self._suboption is not None and self._suboption.desc:
+ desc.append(self._suboption.desc)
+ return ', '.join(desc)
+
+
+ return super(Option, self).__getattribute__(attr)
+
+ def suboption(self, name, desc, **kwargs):
+ subo = SubOption(name, desc, **kwargs)
+ subo.config = self.config
+ subo.group = self.group
+ subo.option = self
+ subo.number = len(self._suboptions)
+ self._suboptions.append(subo)
+ return subo
+
+ def clone(self, suboptions=True):
+ option = Option(self.__dict__['name'], self.__dict__['desc'])
+ option.update(self)
+ option.group = self.group
+ option.config = self.config
+ option.number = self.number
+ if suboptions:
+ option._suboptions.extend(self._suboptions)
+ option._suboption = self._suboption
+ return option
+
+ def subopts(self):
+ if not self._suboptions:
+ return [ self ]
+
+ subopts = []
+ for subo in self._suboptions:
+ option = self.clone()
+ option._suboption = subo
+ subopts.append(option)
+
+ return subopts
+
+ def printinfo(self):
+ super(Option, self).printinfo()
+ print 'config: %s' % self.config.name
+ super(Option, self).printverbose()
+
+class Group(Data):
+ def __init__(self, name, desc, **kwargs):
+ super(Group, self).__init__(name, desc, **kwargs)
+ self._options = []
+ self.checkpoint = False
+ self.number = None
+
+ def option(self, name, desc, **kwargs):
+ opt = Option(name, desc, **kwargs)
+ opt.config = self.config
+ opt.group = self
+ opt.number = len(self._options)
+ self._options.append(opt)
+ return opt
+
+ def options(self):
+ return self._options
+
+ def subopts(self):
+ subopts = []
+ for opt in self._options:
+ for subo in opt.subopts():
+ subopts.append(subo)
+ return subopts
+
+ def printinfo(self):
+ super(Group, self).printinfo()
+ print 'config: %s' % self.config.name
+ print 'options: %s' % [ o.name for o in self._options ]
+ super(Group, self).printverbose()
+
+class Configuration(Data):
+ def __init__(self, name, desc, **kwargs):
+ super(Configuration, self).__init__(name, desc, **kwargs)
+ self._groups = []
+ self._posfilters = []
+ self._negfilters = []
+
+ def group(self, name, desc, **kwargs):
+ grp = Group(name, desc, **kwargs)
+ grp.config = self
+ grp.number = len(self._groups)
+ self._groups.append(grp)
+ return grp
+
+ def groups(self, flags=Flags(), sign=True):
+ if not flags:
+ return self._groups
+
+ return [ grp for grp in self._groups if sign ^ grp.flags.match(flags) ]
+
+ def checkchildren(self, kids):
+ for kid in kids:
+ if kid.config != self:
+ raise AttributeError, "child from the wrong configuration"
+
+ def sortgroups(self, groups):
+ groups = [ (grp.number, grp) for grp in groups ]
+ groups.sort()
+ return [ grp[1] for grp in groups ]
+
+ def options(self, groups = None, checkpoint = False):
+ if groups is None:
+ groups = self._groups
+ self.checkchildren(groups)
+ groups = self.sortgroups(groups)
+ if checkpoint:
+ groups = [ grp for grp in groups if grp.checkpoint ]
+ optgroups = [ g.options() for g in groups ]
+ else:
+ optgroups = [ g.subopts() for g in groups ]
+ for options in crossproduct(optgroups):
+ for opt in options:
+ cpt = opt.group.checkpoint
+ if not isinstance(cpt, bool) and cpt != opt:
+ if checkpoint:
+ break
+ else:
+ yield options
+ else:
+ if checkpoint:
+ yield options
+
+ def addfilter(self, filt, pos=True):
+ import re
+ filt = re.compile(filt)
+ if pos:
+ self._posfilters.append(filt)
+ else:
+ self._negfilters.append(filt)
+
+ def jobfilter(self, job):
+ for filt in self._negfilters:
+ if filt.match(job.name):
+ return False
+
+ if not self._posfilters:
+ return True
+
+ for filt in self._posfilters:
+ if filt.match(job.name):
+ return True
+
+ return False
+
+ def checkpoints(self, groups = None):
+ for options in self.options(groups, True):
+ job = Job(options)
+ if self.jobfilter(job):
+ yield job
+
+ def jobs(self, groups = None):
+ for options in self.options(groups, False):
+ job = Job(options)
+ if self.jobfilter(job):
+ yield job
+
+ def alljobs(self, groups = None):
+ for options in self.options(groups, True):
+ yield Job(options)
+ for options in self.options(groups, False):
+ yield Job(options)
+
+ def find(self, jobname):
+ for job in self.alljobs():
+ if job.name == jobname:
+ return job
+ else:
+ raise AttributeError, "job '%s' not found" % jobname
+
+ def job(self, options):
+ self.checkchildren(options)
+ options = [ (opt.group.number, opt) for opt in options ]
+ options.sort()
+ options = [ opt[1] for opt in options ]
+ job = Job(options)
+ return job
+
+ def printinfo(self):
+ super(Configuration, self).printinfo()
+ print 'groups: %s' % [ g.name for g in self._grouips ]
+ super(Configuration, self).printverbose()
+
+def JobFile(jobfile):
+ from os.path import expanduser, isfile, join as joinpath
+ filename = expanduser(jobfile)
+
+ # Can't find filename in the current path, search sys.path
+ if not isfile(filename):
+ for path in sys.path:
+ testname = joinpath(path, filename)
+ if isfile(testname):
+ filename = testname
+ break
+ else:
+ raise AttributeError, \
+ "Could not find file '%s'" % jobfile
+
+ data = {}
+ execfile(filename, data)
+ if 'conf' not in data:
+ raise ImportError, 'cannot import name conf from %s' % jobfile
+ conf = data['conf']
+ import jobfile
+ if not isinstance(conf, Configuration):
+ raise AttributeError, \
+ 'conf in jobfile: %s (%s) is not type %s' % \
+ (jobfile, type(conf), Configuration)
+ return conf
+
+if __name__ == '__main__':
+ from jobfile import *
+ import sys
+
+ usage = 'Usage: %s [-b] [-c] [-v] <jobfile>' % sys.argv[0]
+
+ try:
+ import getopt
+ opts, args = getopt.getopt(sys.argv[1:], '-bcv')
+ except getopt.GetoptError:
+ sys.exit(usage)
+
+ if len(args) != 1:
+ raise AttributeError, usage
+
+ both = False
+ checkpoint = False
+ verbose = False
+ for opt,arg in opts:
+ if opt == '-b':
+ both = True
+ checkpoint = True
+ if opt == '-c':
+ checkpoint = True
+ if opt == '-v':
+ verbose = True
+
+ jobfile = args[0]
+ conf = JobFile(jobfile)
+
+ if both:
+ gen = conf.alljobs()
+ elif checkpoint:
+ gen = conf.checkpoints()
+ else:
+ gen = conf.jobs()
+
+ for job in gen:
+ if not verbose:
+ cpt = ''
+ if job.checkpoint:
+ cpt = job.checkpoint.name
+ print job.name, cpt
+ else:
+ job.printinfo()
diff --git a/util/batch/send.py b/util/batch/send.py
new file mode 100755
index 000000000..e7bf1958f
--- /dev/null
+++ b/util/batch/send.py
@@ -0,0 +1,306 @@
+#!/usr/bin/env python
+# Copyright (c) 2006 The Regents of The University of Michigan
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met: redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer;
+# redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution;
+# neither the name of the copyright holders nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Authors: Kevin Lim
+
+import os, os.path, re, socket, sys
+from os import environ as env, listdir
+from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
+from filecmp import cmp as filecmp
+from shutil import copy
+
+def nfspath(dir):
+ if dir.startswith('/.automount/'):
+ dir = '/n/%s' % dir[12:]
+ elif not dir.startswith('/n/'):
+ dir = '/n/%s%s' % (socket.gethostname().split('.')[0], dir)
+ return dir
+
+def syncdir(srcdir, destdir):
+ srcdir = normpath(srcdir)
+ destdir = normpath(destdir)
+ if not isdir(destdir):
+ sys.exit('destination directory "%s" does not exist' % destdir)
+
+ for root, dirs, files in os.walk(srcdir):
+ root = normpath(root)
+ prefix = os.path.commonprefix([root, srcdir])
+ root = root[len(prefix):]
+ if root.startswith('/'):
+ root = root[1:]
+ for rem in [ d for d in dirs if d.startswith('.') or d == 'SCCS']:
+ dirs.remove(rem)
+
+ for entry in dirs:
+ newdir = joinpath(destdir, root, entry)
+ if not isdir(newdir):
+ os.mkdir(newdir)
+ print 'mkdir', newdir
+
+ for i,d in enumerate(dirs):
+ if islink(joinpath(srcdir, root, d)):
+ dirs[i] = joinpath(d, '.')
+
+ for entry in files:
+ dest = normpath(joinpath(destdir, root, entry))
+ src = normpath(joinpath(srcdir, root, entry))
+ if not isfile(dest) or not filecmp(src, dest):
+ print 'copy %s %s' % (dest, src)
+ copy(src, dest)
+
+progpath = nfspath(sys.path[0])
+progname = basename(sys.argv[0])
+usage = """\
+Usage:
+ %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp>
+ -c clean directory if job can be run
+ -C submit the checkpointing runs
+ -d Make jobs be dependent on the completion of the checkpoint runs
+ -e only echo pbs command info, don't actually send the job
+ -f force the job to run regardless of state
+ -q <queue> submit job to the named queue
+ -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
+ -v be verbose
+
+ %(progname)s [-j <jobfile>] -l [-v] <regexp>
+ -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
+ -l list job names, don't submit
+ -v be verbose (list job parameters)
+
+ %(progname)s -h
+ -h display this help
+""" % locals()
+
+try:
+ import getopt
+ opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
+except getopt.GetoptError:
+ sys.exit(usage)
+
+depend = False
+clean = False
+onlyecho = False
+exprs = []
+force = False
+listonly = False
+queue = ''
+verbose = False
+jfile = 'Test.py'
+docpts = False
+doruns = True
+runflag = False
+node_type = 'FAST'
+update = True
+
+for opt,arg in opts:
+ if opt == '-C':
+ docpts = True
+ if opt == '-c':
+ clean = True
+ if opt == '-d':
+ depend = True
+ if opt == '-e':
+ onlyecho = True
+ if opt == '-f':
+ force = True
+ if opt == '-h':
+ print usage
+ sys.exit(0)
+ if opt == '-j':
+ jfile = arg
+ if opt == '-l':
+ listonly = True
+ if opt == '-n':
+ update = False
+ if opt == '-q':
+ queue = arg
+ if opt == '-R':
+ runflag = True
+ if opt == '-t':
+ node_type = arg
+ if opt == '-v':
+ verbose = True
+
+if docpts:
+ doruns = runflag
+
+for arg in args:
+ exprs.append(re.compile(arg))
+
+import jobfile, batch
+from job import JobDir, date
+
+conf = jobfile.JobFile(jfile)
+
+if update and not listonly and not onlyecho and isdir(conf.linkdir):
+ if verbose:
+ print 'Checking for outdated files in Link directory'
+ if not isdir(conf.basedir):
+ os.mkdir(conf.basedir)
+ syncdir(conf.linkdir, conf.basedir)
+
+jobnames = {}
+joblist = []
+
+if docpts and doruns:
+ gen = conf.alljobs()
+elif docpts:
+ gen = conf.checkpoints()
+elif doruns:
+ gen = conf.jobs()
+
+for job in gen:
+ if job.name in jobnames:
+ continue
+
+ if exprs:
+ for expr in exprs:
+ if expr.match(job.name):
+ joblist.append(job)
+ break
+ else:
+ joblist.append(job)
+
+if listonly:
+ if verbose:
+ for job in joblist:
+ job.printinfo()
+ else:
+ for job in joblist:
+ print job.name
+ sys.exit(0)
+
+if not onlyecho:
+ newlist = []
+ for job in joblist:
+ jobdir = JobDir(joinpath(conf.rootdir, job.name))
+ if jobdir.exists():
+ if not force:
+ status = jobdir.getstatus()
+ if status == 'queued':
+ continue
+
+ if status == 'running':
+ continue
+
+ if status == 'success':
+ continue
+
+ if not clean:
+ sys.exit('job directory %s not clean!' % jobdir)
+
+ jobdir.clean()
+ newlist.append(job)
+ joblist = newlist
+
+class NameHack(object):
+ def __init__(self, host='pbs.pool', port=24465):
+ self.host = host
+ self.port = port
+ self.socket = None
+
+ def setname(self, jobid, jobname):
+ try:
+ jobid = int(jobid)
+ except ValueError:
+ jobid = int(jobid.strip().split('.')[0])
+
+ jobname = jobname.strip()
+ # since pbs can handle jobnames of 15 characters or less,
+ # don't use the raj hack.
+ if len(jobname) <= 15:
+ return
+
+ if self.socket is None:
+ import socket
+ self.socket = socket.socket()
+ # Connect to pbs.pool and send the jobid/jobname pair to port
+ # 24465 (Raj didn't realize that there are only 64k ports and
+ # setup inetd to point to port 90001)
+ self.socket.connect((self.host, self.port))
+
+ self.socket.send("%s %s\n" % (jobid, jobname))
+
+namehack = NameHack()
+
+rootdir = conf.rootdir
+script = joinpath(rootdir, 'Base', 'job.py')
+
+for job in joblist:
+ jobdir = JobDir(joinpath(rootdir, job.name))
+ if depend:
+ cptdir = JobDir(joinpath(rootdir, job.checkpoint.name))
+ path = str(cptdir)
+ if not isdir(path) or not isfile(joinpath(path, '.success')):
+ continue
+
+ cptjob = cptdir.readval('.batch_jobid')
+
+ if not onlyecho:
+ jobdir.create()
+ os.chdir(str(jobdir))
+ os.environ['PWD'] = str(jobdir)
+
+ print 'Job name: %s' % job.name
+ print 'Job directory: %s' % jobdir
+
+
+ qsub = batch.oarsub()
+ qsub.oarhost = 'poolfs.eecs.umich.edu'
+ #qsub.stdout = jobdir.file('jobout')
+ qsub.name = job.name
+ qsub.walltime = '50'
+ #qsub.join = True
+ #qsub.node_type = node_type
+ #qsub.env['ROOTDIR'] = conf.rootdir
+ #qsub.env['JOBNAME'] = job.name
+ #if depend:
+ # qsub.afterok = cptjob
+ #if queue:
+ # qsub.queue = queue
+ qsub.properties = "64bit = 'Yes' or 64bit = 'No'"
+ qsub.build(script)
+
+ if verbose:
+ print 'cwd: %s' % qsub.command
+ print 'PBS Command: %s' % qsub.command
+
+ if not onlyecho:
+ ec = qsub.do()
+ if ec == 0:
+ jobid = qsub.result
+ print 'OAR Jobid: %s' % jobid
+ #namehack.setname(jobid, job.name)
+ queued = date()
+ jobdir.echofile('.batch_jobid', jobid)
+ jobdir.echofile('.batch_jobname', job.name)
+ jobdir.echofile('.queued', queued)
+ jobdir.setstatus('queued on %s' % queued)
+ else:
+ print 'OAR Failed'
+ print
+ print