1 import fcntl
2 import re
3 import os
4 import sys
5 import multiprocessing
6 import time
7 import Queue
8 import json
9 import mockremote
10 from callback import FrontendCallback
11 from bunch import Bunch
12 import errors
13 import ansible
14 import ansible.playbook
15 import ansible.errors
16 from ansible import callbacks
17 import requests
18 import subprocess
19 import string
20 import setproctitle
21 from IPy import IP
22
23 try:
24 import fedmsg
25 except ImportError:
26 pass
27
28
29
30
32 ''' playbook callbacks - quietly! '''
33
37
39 callbacks.call_callback_module('playbook_on_start')
40
42 callbacks.call_callback_module('playbook_on_notify', host, handler)
43
45 callbacks.call_callback_module('playbook_on_no_hosts_matched')
46
48 callbacks.call_callback_module('playbook_on_no_hosts_remaining')
49
51 callbacks.call_callback_module('playbook_on_task_start', name, is_conditional)
52
def on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None):
    """Refuse to prompt for a variable, but still notify callback modules.

    No prompt is shown: a warning is written to stderr, the
    'playbook_on_vars_prompt' event is handed to
    ``callbacks.call_callback_module`` with the arguments received here, and
    ``None`` is returned in place of a user-supplied value.

    Args:
        varname: name of the variable that would have been prompted for.
        private, prompt, encrypt, confirm, salt_size, salt: forwarded
            unchanged to the callback modules.

    Returns:
        None, always.
    """
    sys.stderr.write("***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****\n")
    # Forward the caller's salt; it used to be replaced by a hard-coded None,
    # so callback modules never saw the value that was actually passed in.
    callbacks.call_callback_module(
        'playbook_on_vars_prompt', varname, private=private, prompt=prompt,
        encrypt=encrypt, confirm=confirm, salt_size=salt_size, salt=salt)
    return None
58
60 callbacks.call_callback_module('playbook_on_setup')
61
63 callbacks.call_callback_module('playbook_on_import_for_host', host, imported_file)
64
66 callbacks.call_callback_module('playbook_on_not_import_for_host', host, missing_file)
67
69 callbacks.call_callback_module('playbook_on_play_start', pattern)
70
72 callbacks.call_callback_module('playbook_on_stats', stats)
73
74
77 self.logfile = logfile
78
80 if self.logfile:
81 now = time.strftime('%F %T')
82 try:
83 with open(self.logfile, 'a') as lf:
84 fcntl.flock(lf, fcntl.LOCK_EX)
85 lf.write(str(now) + ': ' + msg + '\n')
86 fcntl.flock(lf, fcntl.LOCK_UN)
87 except (IOError, OSError), e:
88 print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e))
89
90
91 -class Worker(multiprocessing.Process):
def __init__(self, opts, jobs, events, worker_num, ip=None, create=True, callback=None):
    """Initialise a build worker process and announce its creation.

    Args:
        opts: configuration object. It is passed to ``FrontendCallback``;
            ``opts.worker_logdir`` is read only when no ``callback`` is
            supplied.
        jobs: queue the worker takes job files from (stored as ``self.jobs``).
        events: queue that ``self.event`` puts status records on.
        worker_num: identifier used in the log file name and in event
            messages.
        ip: optional fixed builder address; when falsy the worker is
            announced as using a "dynamic ip".
        create: stored as ``self.create``.
        callback: optional logger object exposing ``log(msg)``. When falsy,
            a ``WorkerCallback`` writing to
            ``<worker_logdir>/worker-<worker_num>.log`` is created.
    """
    multiprocessing.Process.__init__(self, name="worker-builder")

    self.jobs = jobs
    self.events = events
    self.worker_num = worker_num
    self.ip = ip
    self.opts = opts
    self.kill_received = False
    self.callback = callback
    self.create = create
    self.frontend_callback = FrontendCallback(opts)

    # Always define the attribute so that instances built with an injected
    # callback do not lack ``logfile`` altogether.
    self.logfile = None
    if not self.callback:
        self.logfile = self.opts.worker_logdir + '/worker-%s.log' % self.worker_num
        self.callback = WorkerCallback(logfile=self.logfile)

    if ip:
        self.callback.log('creating worker: %s' % ip)
        self.event('worker.create', 'creating worker: {ip}', dict(ip=ip))
    else:
        self.callback.log('creating worker: dynamic ip')
        self.event('worker.create', 'creating worker: dynamic ip')
118
def event(self, topic, template, content=None):
    """Multi-purpose logging method.

    Renders ``template`` with the fields in ``content`` and reports the
    resulting message to three destinations:
      - the worker log, through ``self.callback.log``;
      - the internal ``events`` queue, for communicating back to the
        dispatcher;
      - the fedmsg bus, only when ``self.opts.fedmsg_enabled`` is true.

    Args:
        topic: fedmsg topic the message is published under.
        template: ``str.format`` template describing the event.
        content: optional dict of fields substituted into ``template``.
            It is not modified; a private copy is extended with ``who`` and
            ``what`` before being published.

    Raises:
        KeyError: if ``template`` names a field missing from ``content``
            (standard ``str.format`` behaviour).

    Publishing failures are logged and never propagated.
    """
    # Work on a copy so the caller's dict is not polluted with who/what.
    fields = dict(content or {})
    what = template.format(**fields)

    if self.ip:
        who = 'worker-%s-%s' % (self.worker_num, self.ip)
    else:
        who = 'worker-%s' % (self.worker_num)

    self.callback.log("event: who: %s, what: %s" % (who, what))
    self.events.put({'when': time.time(), 'who': who, 'what': what})
    try:
        if self.opts.fedmsg_enabled:
            fields['who'] = who
            fields['what'] = what
            fedmsg.publish(modname="copr", topic=topic, msg=fields)
    except Exception as e:
        # Best effort: this also covers fedmsg being unavailable because its
        # optional import failed (NameError).
        self.callback.log('failed to publish message: %s' % e)
150
152 """call the spawn playbook to startup/provision a building instance"""
153
154
155 self.callback.log('spawning instance begin')
156 start = time.time()
157
158
159
160
161
162
163
164
165
166
167
168
169 try:
170 result = subprocess.check_output("ansible-playbook -c ssh %s" % self.opts.spawn_playbook,
171 shell=True)
172 except subprocess.CalledProcessError, e:
173 result = e.output
174 sys.stderr.write("%s\n" % result)
175 self.callback.log("CalledProcessError: %s" % result)
176
177 if retry < 3:
178 time.sleep(self.opts.sleeptime)
179 self.spawn_instance(retry+1)
180 else:
181 raise subprocess.CalledProcessError, None, sys.exc_info()[2]
182 self.callback.log('Raw output from playbook: %s' % result)
183 match = re.search(r'IP=([^\{\}"]+)', result, re.MULTILINE)
184
185 if not match:
186 return None
187
188 ipaddr = match.group(1)
189
190 self.callback.log('spawning instance end')
191 self.callback.log('got instance ip: %s' % ipaddr)
192 self.callback.log('Instance spawn/provision took %s sec' % (time.time() - start))
193
194 if self.ip:
195 return self.ip
196
197
198
199
200
201 try:
202 IP(ipaddr)
203 return ipaddr
204 except ValueError:
205
206 self.callback.log('No IP back from spawn_instance - dumping cache output')
207 self.callback.log(str(result))
208 self.callback.log('Test spawn_instance playbook manually')
209 return None
210
212 """call the terminate playbook to destroy the building instance"""
213 self.callback.log('terminate instance begin')
214
215
216
217
218
219
220
221
222 subprocess.check_output("/usr/bin/ansible-playbook -c ssh -i '%s,' %s " % (ip, self.opts.terminate_playbook), shell=True)
223 self.callback.log('terminate instance end')
224
226
227
228 try:
229 build = json.load(open(jobfile))
230 except ValueError:
231
232 return None
233 jobdata = Bunch()
234 jobdata.pkgs = build['pkgs'].split(' ')
235 jobdata.repos = [r for r in build['repos'].split(' ') if r.strip() ]
236 jobdata.chroot = build['chroot']
237 jobdata.buildroot_pkgs = build['buildroot_pkgs']
238 jobdata.memory_reqs = build['memory_reqs']
239 if build['timeout']:
240 jobdata.timeout = build['timeout']
241 else:
242 jobdata.timeout = self.opts.timeout
243 jobdata.destdir = os.path.normpath(self.opts.destdir + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'])
244 jobdata.build_id = build['id']
245 jobdata.results = self.opts.results_baseurl + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/'
246 jobdata.copr_id = build['copr']['id']
247 jobdata.user_id = build['user_id']
248 jobdata.user_name = build['copr']['owner']['name']
249 jobdata.copr_name = build['copr']['name']
250 return jobdata
251
252
def post_to_frontend(self, data, attempts=10, delay=5):
    """Send data to the frontend, retrying on failure.

    Calls ``self.frontend_callback.post_to_frontend(data)`` until it returns
    a truthy value or ``attempts`` tries have been made. After each failed
    try ``self.frontend_callback.msg`` is written to the worker log.

    Args:
        data: payload handed unchanged to the frontend callback.
        attempts: maximum number of tries (default 10, as before).
        delay: seconds to wait between two tries (default 5, as before).

    Returns:
        The value returned by the last call to the frontend callback
        (truthy on success, falsy when every try failed), or ``None`` when
        ``attempts`` is less than 1.
    """
    result = None
    for attempt in range(attempts):
        result = self.frontend_callback.post_to_frontend(data)
        if result:
            break
        self.callback.log(self.frontend_callback.msg)
        # Only wait when another try will follow; sleeping after the final
        # failure just delays the caller.
        if attempt < attempts - 1:
            time.sleep(delay)
    return result
265
266
280
281
296
298 """ Worker should startup and check if it can function
299 for each job it takes from the jobs queue
300 run opts.setup_playbook to create the instance
301 do the build (mockremote)
302 terminate the instance
303 """
304
305 setproctitle.setproctitle("worker %s" % self.worker_num)
306 while not self.kill_received:
307 try:
308 jobfile = self.jobs.get()
309 except Queue.Empty:
310 break
311
312
313 job = self.parse_job(jobfile)
314
315 if job is None:
316 self.callback.log('jobfile %s is mangled, please investigate' % jobfile)
317 time.sleep(self.opts.sleeptime)
318 continue
319
320
321
322
323 job.jobfile = jobfile
324
325
326 if self.create:
327 try:
328 ip = self.spawn_instance()
329 if not ip:
330 raise errors.CoprWorkerError, "No IP found from creating instance"
331
332 except ansible.errors.AnsibleError, e:
333 self.callback.log('failure to setup instance: %s' % e)
334 raise
335
336 try:
337
338 try:
339 if self.opts.fedmsg_enabled:
340 fedmsg.init(name="relay_inbound", cert_prefix="copr", active=True)
341 except Exception, e:
342 self.callback.log('failed to initialize fedmsg: %s' % e)
343
344 status = 1
345 job.started_on = time.time()
346 self.mark_started(job)
347
348 template = 'build start: user:{user} copr:{copr} build:{build} ip:{ip} pid:{pid}'
349 content = dict(user=job.user_name, copr=job.copr_name,
350 build=job.build_id, ip=ip, pid=self.pid)
351 self.event('build.start', template, content)
352
353 template = 'chroot start: chroot:{chroot} user:{user} copr:{copr} build:{build} ip:{ip} pid:{pid}'
354 content = dict(chroot=job.chroot, user=job.user_name,
355 copr=job.copr_name, build=job.build_id,
356 ip=ip, pid=self.pid)
357 self.event('chroot.start', template, content)
358
359 chroot_destdir = os.path.normpath(job.destdir + '/' + job.chroot)
360
361 if not os.path.exists(chroot_destdir):
362 try:
363 os.makedirs(chroot_destdir)
364 except (OSError, IOError), e:
365 msg = "Could not make results dir for job: %s - %s" % (chroot_destdir, str(e))
366 self.callback.log(msg)
367 status = 0
368
369 if status == 1:
370
371
372
373
374
375
376
377 self.callback.log('Starting build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (job.build_id,ip, job.timeout, job.destdir, job.chroot, str(job.repos)))
378 self.callback.log('building pkgs: %s' % ' '.join(job.pkgs))
379 try:
380 chroot_repos = list(job.repos)
381 chroot_repos.append(job.results + '/' + job.chroot)
382 chrootlogfile = chroot_destdir + '/build-%s.log' % job.build_id
383 macros = {'copr_username': job.user_name,
384 'copr_projectname': job.copr_name,
385 'vendor': "Fedora Project COPR (%s/%s)" % (job.user_name, job.copr_name)}
386 mr = mockremote.MockRemote(builder=ip, timeout=job.timeout,
387 destdir=job.destdir, chroot=job.chroot, cont=True, recurse=True,
388 repos=chroot_repos, macros=macros, buildroot_pkgs=job.buildroot_pkgs,
389 callback=mockremote.CliLogCallBack(quiet=True,logfn=chrootlogfile))
390 mr.build_pkgs(job.pkgs)
391 except mockremote.MockRemoteError, e:
392
393 self.callback.log('%s - %s' % (ip, e))
394 status = 0
395 else:
396
397
398 if mr.failed:
399 status = 0
400 self.callback.log('Finished build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (job.build_id, ip, job.timeout, job.destdir, job.chroot, str(job.repos)))
401 job.ended_on = time.time()
402
403 job.status = status
404 self.return_results(job)
405 self.callback.log('worker finished build: %s' % ip)
406 template = 'build end: user:{user} copr:{copr} build:{build} ip:{ip} pid:{pid} status:{status}'
407 content = dict(user=job.user_name, copr=job.copr_name,
408 build=job.build_id, ip=ip, pid=self.pid,
409 status=job.status)
410 self.event('build.end', template, content)
411 finally:
412
413 if self.create:
414 self.terminate_instance(ip)
415