Package backend :: Module dispatcher

Source Code for Module backend.dispatcher

import re
import os
import sys
import time
import fcntl
import Queue
import json
import subprocess
import multiprocessing

import ansible
import ansible.utils
from ansible import callbacks
from bunch import Bunch
from setproctitle import setproctitle
from IPy import IP

import errors
import mockremote
from callback import FrontendCallback

try:
    import fedmsg
except ImportError:
    pass  # fedmsg is optional


class SilentPlaybookCallbacks(callbacks.PlaybookCallbacks):

    """ playbook callbacks - quietly! """

    def __init__(self, verbose=False):
        super(SilentPlaybookCallbacks, self).__init__()
        self.verbose = verbose

    def on_start(self):
        callbacks.call_callback_module("playbook_on_start")

    def on_notify(self, host, handler):
        callbacks.call_callback_module("playbook_on_notify", host, handler)

    def on_no_hosts_matched(self):
        callbacks.call_callback_module("playbook_on_no_hosts_matched")

    def on_no_hosts_remaining(self):
        callbacks.call_callback_module("playbook_on_no_hosts_remaining")

    def on_task_start(self, name, is_conditional):
        callbacks.call_callback_module(
            "playbook_on_task_start", name, is_conditional)

    def on_vars_prompt(self, varname,
                       private=True, prompt=None, encrypt=None,
                       confirm=False, salt_size=None, salt=None):

        result = None
        sys.stderr.write(
            "***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****\n")

        callbacks.call_callback_module(
            "playbook_on_vars_prompt", varname, private=private,
            prompt=prompt, encrypt=encrypt, confirm=confirm,
            salt_size=salt_size, salt=None)

        return result

    def on_setup(self):
        callbacks.call_callback_module("playbook_on_setup")

    def on_import_for_host(self, host, imported_file):
        callbacks.call_callback_module(
            "playbook_on_import_for_host", host, imported_file)

    def on_not_import_for_host(self, host, missing_file):
        callbacks.call_callback_module(
            "playbook_on_not_import_for_host", host, missing_file)

    def on_play_start(self, pattern):
        callbacks.call_callback_module("playbook_on_play_start", pattern)

    def on_stats(self, stats):
        callbacks.call_callback_module("playbook_on_stats", stats)


class WorkerCallback(object):

    def __init__(self, logfile=None):
        self.logfile = logfile

    def log(self, msg):
        if self.logfile:
            now = time.strftime("%F %T")
            try:
                with open(self.logfile, 'a') as lf:
                    fcntl.flock(lf, fcntl.LOCK_EX)
                    lf.write(str(now) + ': ' + msg + '\n')
                    fcntl.flock(lf, fcntl.LOCK_UN)
            except (IOError, OSError), e:
                sys.stderr.write("Could not write to logfile {0} - {1}\n"
                                 .format(self.logfile, str(e)))
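
# For illustration (not part of the original module): with the "%F %T"
# strftime format used above, WorkerCallback.log() writes lines roughly like
# the following (timestamp and message are made-up examples):
#     2014-01-01 12:00:00: spawning instance begin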


class Worker(multiprocessing.Process):

    def __init__(self, opts, jobs, events, worker_num,
                 ip=None, create=True, callback=None, lock=None):

        # base class initialization
        multiprocessing.Process.__init__(self, name="worker-builder")

        # job management stuff
        self.jobs = jobs
        # event queue for communicating back to dispatcher
        self.events = events
        self.worker_num = worker_num
        self.ip = ip
        self.opts = opts
        self.kill_received = False
        self.callback = callback
        self.create = create
        self.lock = lock
        self.frontend_callback = FrontendCallback(opts)
        if not self.callback:
            self.logfile = os.path.join(
                self.opts.worker_logdir,
                "worker-{0}.log".format(self.worker_num))
            self.callback = WorkerCallback(logfile=self.logfile)

        if ip:
            self.callback.log("creating worker: {0}".format(ip))
            self.event("worker.create", "creating worker: {ip}", dict(ip=ip))
        else:
            self.callback.log("creating worker: dynamic ip")
            self.event("worker.create", "creating worker: dynamic ip")

    def event(self, topic, template, content=None):
        """ Multi-purpose logging method.

        Logs messages to three different destinations:
            - the log file
            - the internal "events" queue for communicating back to the
              dispatcher
            - the fedmsg bus; messages are posted asynchronously to a
              zmq.PUB socket

        """

        content = content or {}
        what = template.format(**content)

        if self.ip:
            who = "worker-{0}-{1}".format(self.worker_num, self.ip)
        else:
            who = "worker-{0}".format(self.worker_num)

        self.callback.log("event: who: {0}, what: {1}".format(who, what))
        self.events.put({"when": time.time(), "who": who, "what": what})
        try:
            content["who"] = who
            content["what"] = what
            if self.opts.fedmsg_enabled:
                fedmsg.publish(modname="copr", topic=topic, msg=content)
        # pylint: disable=W0703
        except Exception, e:
            # XXX - Maybe log traceback as well with traceback.format_exc()
            self.callback.log("failed to publish message: {0}".format(e))
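
    # For illustration (not part of the original module): a call like the one
    # made in __init__ above,
    #     self.event("worker.create", "creating worker: {ip}", dict(ip=ip))
    # puts roughly this entry on the events queue (values are made up):
    #     {"when": 1400000000.0, "who": "worker-3-10.0.0.5",
    #      "what": "creating worker: 10.0.0.5"}
    # and, when opts.fedmsg_enabled is set, publishes the content dict (plus
    # "who" and "what") on the fedmsg bus with modname "copr" and topic
    # "worker.create".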

    def spawn_instance(self, retry=0):
        """call the spawn playbook to startup/provision a building instance"""

        self.callback.log("spawning instance begin")
        start = time.time()

        # Does not work, do not know why. See:
        # https://groups.google.com/forum/#!topic/ansible-project/DNBD2oHv5k8
        #stats = callbacks.AggregateStats()
        #playbook_cb = SilentPlaybookCallbacks(verbose=False)
        #runner_cb = callbacks.DefaultRunnerCallbacks()
        # fixme - extra_vars to include ip as a var if we need to specify ips
        # also to include info for instance type to handle the memory
        # requirements of builds
        # play = ansible.playbook.PlayBook(
        #     stats=stats, playbook=self.opts.spawn_playbook,
        #     callbacks=playbook_cb, runner_callbacks=runner_cb,
        #     remote_user="root", transport="ssh")
        # play.run()
        try:
            result = subprocess.check_output(
                "ansible-playbook -c ssh {0}".format(self.opts.spawn_playbook),
                shell=True)

        except subprocess.CalledProcessError as e:
            result = e.output
            sys.stderr.write("{0}\n".format(result))
            self.callback.log("CalledProcessError: {0}".format(result))
            # well mostly we run out of space in OpenStack, wait some time and
            # try again
            if retry < 3:
                time.sleep(self.opts.sleeptime)
                return self.spawn_instance(retry + 1)
            else:
                # FIXME: retrying via recursion is clumsy, this should use a
                # decorator instead
                raise

        self.callback.log("Raw output from playbook: {0}".format(result))
        match = re.search(r'IP=([^\{\}"]+)', result, re.MULTILINE)

        if not match:
            return None

        ipaddr = match.group(1)

        self.callback.log("spawning instance end")
        self.callback.log("got instance ip: {0}".format(ipaddr))
        self.callback.log(
            "Instance spawn/provision took {0} sec".format(time.time() - start))

        if self.ip:
            return self.ip

        # for i in play.SETUP_CACHE:
        #     if i == "localhost":
        #         continue
        #     return i
        try:
            IP(ipaddr)
            return ipaddr
        except ValueError:
            # if we get here we're in trouble
            self.callback.log(
                "No IP back from spawn_instance - dumping cache output")
            self.callback.log(str(result))
            self.callback.log("Test spawn_instance playbook manually")
            return None
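
    # For illustration (not part of the original module): the re.search above
    # expects the playbook output to contain a line along the lines of
    #     IP=10.0.0.5
    # (address made up); everything after "IP=" up to the next brace or double
    # quote is captured and then validated with IPy's IP().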

    def terminate_instance(self, ip):
        """call the terminate playbook to destroy the building instance"""
        self.callback.log("terminate instance begin")

        #stats = callbacks.AggregateStats()
        #playbook_cb = SilentPlaybookCallbacks(verbose=False)
        #runner_cb = callbacks.DefaultRunnerCallbacks()
        # play = ansible.playbook.PlayBook(
        #     host_list=ip + ",", stats=stats,
        #     playbook=self.opts.terminate_playbook,
        #     callbacks=playbook_cb, runner_callbacks=runner_cb,
        #     remote_user="root", transport="ssh")
        # play.run()
        subprocess.check_output(
            "/usr/bin/ansible-playbook -c ssh -i '{0},' {1}".format(
                ip, self.opts.terminate_playbook),
            shell=True)

        self.callback.log("terminate instance end")

    def parse_job(self, jobfile):
        # read in the job's json, pull out what we need and
        # return a Bunch with that info
        try:
            with open(jobfile) as f:
                build = json.load(f)
        except ValueError:
            # empty file?
            return None
        jobdata = Bunch()
        jobdata.pkgs = build["pkgs"].split(" ")
        jobdata.repos = [r for r in build["repos"].split(" ") if r.strip()]
        jobdata.chroot = build["chroot"]
        jobdata.buildroot_pkgs = build["buildroot_pkgs"]
        jobdata.memory_reqs = build["memory_reqs"]
        if build["timeout"]:
            jobdata.timeout = build["timeout"]
        else:
            jobdata.timeout = self.opts.timeout
        jobdata.destdir = os.path.normpath(
            os.path.join(self.opts.destdir,
                         build["copr"]["owner"]["name"],
                         build["copr"]["name"]))

        jobdata.build_id = build["id"]
        jobdata.results = os.path.join(
            self.opts.results_baseurl,
            build["copr"]["owner"]["name"],
            build["copr"]["name"] + "/")

        jobdata.copr_id = build["copr"]["id"]
        jobdata.user_id = build["user_id"]
        jobdata.user_name = build["copr"]["owner"]["name"]
        jobdata.copr_name = build["copr"]["name"]
        return jobdata
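
    # For illustration (not part of the original module): judging from the
    # keys read above, a jobfile is expected to contain JSON roughly like
    # this (all values are made-up examples):
    #     {
    #         "id": 42,
    #         "pkgs": "http://example.com/foo-1.0-1.src.rpm",
    #         "repos": "",
    #         "chroot": "fedora-20-x86_64",
    #         "buildroot_pkgs": "",
    #         "memory_reqs": 2048,
    #         "timeout": 3600,
    #         "user_id": 1,
    #         "copr": {"id": 7, "name": "myproject",
    #                  "owner": {"name": "someuser"}}
    #     }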

    # maybe we move this to the callback?
    def post_to_frontend(self, data):
        """send data to frontend"""
        i = 10
        while i > 0:
            result = self.frontend_callback.post_to_frontend(data)
            if not result:
                self.callback.log(self.frontend_callback.msg)
                i -= 1
                time.sleep(5)
            else:
                i = 0
        return result

    # maybe we move this to the callback?
    def mark_started(self, job):

        build = {"id": job.build_id,
                 "started_on": job.started_on,
                 "results": job.results,
                 "chroot": job.chroot,
                 "status": 3,  # running
                 }
        data = {"builds": [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError(
                "Could not communicate to front end to submit status info")

    # maybe we move this to the callback?
    def return_results(self, job):

        self.callback.log(
            "{0} status {1}. Took {2} seconds".format(
                job.build_id, job.status, job.ended_on - job.started_on))

        build = {
            "id": job.build_id,
            "ended_on": job.ended_on,
            "status": job.status,
            "chroot": job.chroot,
        }

        data = {"builds": [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError(
                "Could not communicate to front end to submit results")

        os.unlink(job.jobfile)

    def run(self):
        """
        Worker should start up and check that it can function.

        For each job it takes from the jobs queue it will:
            - run opts.spawn_playbook to create the build instance
            - do the build (mockremote)
            - terminate the instance

        """

        setproctitle("worker {0}".format(self.worker_num))
        while not self.kill_received:
            try:
                jobfile = self.jobs.get()
            except Queue.Empty:
                break

            # parse the job json into our info
            job = self.parse_job(jobfile)

            if job is None:
                self.callback.log(
                    'jobfile {0} is mangled, please investigate'.format(
                        jobfile))

                time.sleep(self.opts.sleeptime)
                continue

            # FIXME
            # this is our best place to sanity check the job before starting
            # up any longer process

            job.jobfile = jobfile

            # spin up our build instance
            if self.create:
                try:
                    ip = self.spawn_instance()
                    if not ip:
                        raise errors.CoprWorkerError(
                            "No IP found from creating instance")

                except ansible.errors.AnsibleError, e:
                    self.callback.log(
                        "failure to setup instance: {0}".format(e))

                    raise

            try:
                # This assumes there are certs and a fedmsg config on disk
                try:
                    if self.opts.fedmsg_enabled:
                        fedmsg.init(
                            name="relay_inbound",
                            cert_prefix="copr",
                            active=True)

                except Exception, e:
                    self.callback.log(
                        "failed to initialize fedmsg: {0}".format(e))

                status = 1  # succeeded
                job.started_on = time.time()
                self.mark_started(job)

                template = "build start: user:{user} copr:{copr}" \
                    " build:{build} ip:{ip} pid:{pid}"

                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid)
                self.event("build.start", template, content)

                template = "chroot start: chroot:{chroot} user:{user}" \
                    " copr:{copr} build:{build} ip:{ip} pid:{pid}"

                content = dict(chroot=job.chroot, user=job.user_name,
                               copr=job.copr_name, build=job.build_id,
                               ip=ip, pid=self.pid)

                self.event("chroot.start", template, content)

                chroot_destdir = os.path.normpath(
                    job.destdir + '/' + job.chroot)

                # setup our target dir locally
                if not os.path.exists(chroot_destdir):
                    try:
                        os.makedirs(chroot_destdir)
                    except (OSError, IOError), e:
                        msg = "Could not make results dir" \
                              " for job: {0} - {1}".format(chroot_destdir,
                                                           str(e))

                        self.callback.log(msg)
                        status = 0  # fail

                if status == 1:  # succeeded
                    # FIXME
                    # need a plugin hook or some mechanism to check random
                    # info about the pkgs
                    # this should use ansible to download the pkg on the
                    # remote system and run a series of checks on the package
                    # before we start the build - most importantly license
                    # checks.

                    self.callback.log("Starting build: id={0} builder={1}"
                                      " timeout={2} destdir={3}"
                                      " chroot={4} repos={5}".format(
                                          job.build_id, ip,
                                          job.timeout, job.destdir,
                                          job.chroot, str(job.repos)))

                    self.callback.log("building pkgs: {0}".format(
                        ' '.join(job.pkgs)))

                    try:
                        chroot_repos = list(job.repos)
                        chroot_repos.append(job.results + '/' + job.chroot)
                        chrootlogfile = "{0}/build-{1}.log".format(
                            chroot_destdir, job.build_id)

                        macros = {
                            "copr_username": job.user_name,
                            "copr_projectname": job.copr_name,
                            "vendor": "Fedora Project COPR ({0}/{1})".format(
                                job.user_name, job.copr_name)
                        }

                        mr = mockremote.MockRemote(
                            builder=ip,
                            timeout=job.timeout,
                            destdir=job.destdir,
                            chroot=job.chroot,
                            cont=True,
                            recurse=True,
                            repos=chroot_repos,
                            macros=macros,
                            lock=self.lock,
                            buildroot_pkgs=job.buildroot_pkgs,
                            callback=mockremote.CliLogCallBack(
                                quiet=True, logfn=chrootlogfile))

                        mr.build_pkgs(job.pkgs)

                    except mockremote.MockRemoteError, e:
                        # record and break
                        self.callback.log("{0} - {1}".format(ip, e))
                        status = 0  # failure
                    else:
                        # we can't really trace back if we just fail normally
                        # check if any pkgs didn't build
                        if mr.failed:
                            status = 0  # failure

                    self.callback.log(
                        "Finished build: id={0} builder={1}"
                        " timeout={2} destdir={3}"
                        " chroot={4} repos={5}".format(
                            job.build_id, ip,
                            job.timeout, job.destdir,
                            job.chroot, str(job.repos)))

                job.ended_on = time.time()

                job.status = status
                self.return_results(job)
                self.callback.log("worker finished build: {0}".format(ip))
                template = "build end: user:{user} copr:{copr} build:{build}" \
                    " ip:{ip} pid:{pid} status:{status}"

                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid,
                               status=job.status)
                self.event("build.end", template, content)

            finally:
                # clean up the instance
                if self.create:
                    self.terminate_instance(ip)