source: mergebot/trunk/mergebot/mergebotdaemon.py

Last change on this file was changeset 66, checked in by retracile, 14 years ago

Mergebot: add documentation of the 'Queued' task state

File size: 14.1 KB
Line 
1#!/usr/bin/python
2"""Daemon that performs mergebot operations and manages the associated work.
3"""
4
5import sys
6import socket
7import select
8import os
9import time
10import trac
11import base64
12
13from threading import Thread
14
15from mergebot.BranchActor import BranchActor
16from mergebot.RebranchActor import RebranchActor
17from mergebot.CheckMergeActor import CheckMergeActor
18from mergebot.MergeActor import MergeActor
19
20from mergebot.daemonize import daemonize
21
22
class Task(object):
    """Represents a task in the queue with its dependencies.

    A task is one mergebot action ('merge', 'branch', 'rebranch' or
    'checkmerge') requested for a ticket.  It tracks which other tasks must
    finish before it may run (``blocked_by``) and its own progress through
    the queued/running/completed states.
    """
    # Counter used to hand out task ids; shared by all Task instances.
    task_id = 0
    # Maps a (lower-case) action name to the actor class that performs it.
    task_name_to_actor = {
        'merge': MergeActor,
        'branch': BranchActor,
        'rebranch': RebranchActor,
        'checkmerge': CheckMergeActor,
    }

    def __init__(self, master, ticket, action, component, version, summary,
                 requestor):
        """Create a task.

        master is the object providing work_dir, repo_url and repo_dir (see
        execute()).  action is matched case-insensitively against
        task_name_to_actor; an unknown action raises Exception.
        """
        self.id = Task.task_id = Task.task_id + 1
        self.master = master
        self.action = action.lower()
        # Validate the normalized name; checking the raw value would reject
        # e.g. 'Merge' even though it was just lower-cased to a valid action.
        if self.action not in Task.task_name_to_actor:
            raise Exception('Invalid task type %s' % action)
        self.ticket = ticket
        self.component = component
        self.version = version
        self.summary = summary
        self.requestor = requestor
        self.blocked_by = []
        self._queued = False
        self._running = False
        self._completed = False
        # Filled in by execute(); defined up front so readers of a task that
        # never ran do not hit an AttributeError.
        self.results = None
        self.result_comment = None
        self.success = False

    @staticmethod
    def _parent_ticket(version):
        """Return the ticket number encoded in a '#<number>' version string,
        or None if version does not refer to a ticket.
        """
        if not version.startswith('#'):
            return None
        try:
            return int(version.lstrip('#'))
        except ValueError:
            # Malformed reference such as '#' or '#abc': not a parent ticket.
            return None

    def find_blockers(self, tasks):
        """Adds existing tasks that block this task to this task's blocked_by
        list.

        Returns the number of tasks currently blocking this one.
        """
        my_parent = Task._parent_ticket(self.version)
        for task in tasks:
            # Tasks for different components are completely independent
            if task.component != self.component:
                continue
            is_blocked = False
            not_checkmerge = task.action != 'checkmerge'
            # if self.version is a ticket ID, then any action other than
            # 'checkmerge' on the parent ticket blocks this action
            if (my_parent is not None and task.ticket == my_parent
                    and not_checkmerge):
                is_blocked = True
            # Any action other than checkmerge on a child ticket of ours
            # blocks us
            their_parent = Task._parent_ticket(task.version)
            if (their_parent is not None and self.ticket == their_parent
                    and not_checkmerge):
                is_blocked = True
            # A merge operation targeting our version blocks us.  (This also
            # covers the (re)branch case: branching is blocked by tickets
            # that are merging _to_ self.version.)
            if task.action == 'merge' and task.version == self.version:
                is_blocked = True
            # If there is another queued operation for this same ticket,
            # that task blocks this one
            if self.ticket == task.ticket:
                is_blocked = True

            if is_blocked:
                self.blocked_by.append(task)
        return len(self.blocked_by)

    def other_task_completed(self, task):
        """Remove the given task from this task's blocked_by list.

        Returns the number of tasks still blocking this one.
        """
        if task in self.blocked_by:
            self.blocked_by.remove(task)
        return len(self.blocked_by)

    def queued(self):
        """Mark this task as queued to run.
        """
        self._queued = True

    def started(self):
        """Mark this task as running/started.
        """
        self._running = True

    def completed(self):
        """Mark this task as completed/zombie.
        """
        self._completed = True

    def get_state(self):
        """Return a single-character indicator of this task's current state.

        Tasks are:
            Pending if they are ready to run.
            Waiting if they are blocked by another task.
            Running if they are currently being done.
            Queued if they have been sent to a worker but not yet begun.
            Zombie if they have been completed.
        """
        # Order matters: the flags are only ever set, never cleared, so the
        # most advanced state has to be tested first.
        if self._completed:
            state = 'Z'#ombie
        elif self._running:
            state = 'R'#unning
        elif self._queued:
            state = 'Q'#ueued
        elif self.blocked_by:
            state = 'W'#aiting
        else:
            state = 'P'#ending
        return state

    def execute(self):
        """Performs the actions for this task.

        Instantiates the actor class for this task's action, runs it, and
        stores its outcome in self.results, self.result_comment and
        self.success.
        """
        work_dir = os.path.join(self.master.work_dir, 'worker-%s' % self.id)
        actor = Task.task_name_to_actor[self.action](work_dir,
            self.master.repo_url, self.master.repo_dir, self.ticket,
            self.component, self.version, self.summary, self.requestor)
        self.results, self.result_comment, self.success = actor.execute()

    def __str__(self):
        """Return the comma-separated fields of this task: id, state, ticket,
        action, component, version, requestor, base64-encoded summary.
        """
        summary = self.summary
        # b64encode needs a byte string; a text summary containing non-ASCII
        # characters would otherwise fail to encode.
        if not isinstance(summary, bytes):
            summary = summary.encode('utf-8')
        summary = base64.b64encode(summary).decode('ascii')
        return ','.join([str(e) for e in (self.id, self.get_state(),
            self.ticket, self.action, self.component, self.version,
            self.requestor, summary)])
143
144
class Worker(object):
    """Thread to do the work for an operation; has a work area it is
    responsible for.

    Two pipes connect the worker thread to whoever drives it: a byte written
    to the inbox pipe wakes the thread to process self.task, and the thread
    writes a byte to the notifier pipe when that task is finished.  The read
    end of the notifier pipe is exposed through notifier() so it can be
    waited on (e.g. with select).
    """
    def __init__(self, num, work_dir):
        """Create the worker's pipes and start its (daemon) thread."""
        self.number = num
        self.work_dir = work_dir
        self.task = None
        self.inbox_read, self.inbox_write = os.pipe()
        self.notifier_read, self.notifier_write = os.pipe()
        self.thread = Thread(target=self.work)
        self.thread.daemon = True
        self.thread.start()

    def queue(self, task):
        """Hand the given task to this worker and wake its thread."""
        task.queued()
        self.task = task
        os.write(self.inbox_write, b'q')

    def _dequeue(self):
        """Block until a task has been queued (one byte in the inbox)."""
        os.read(self.inbox_read, 1)

    def _completed(self):
        """Mark the current task completed and signal the notifier pipe."""
        self.task.completed()
        os.write(self.notifier_write, b'd')

    def ack_complete(self):
        """Consume the completion notification and return the finished task.
        """
        os.read(self.notifier_read, 1)
        return self.task

    def notifier(self):
        """Return the file descriptor that becomes readable on completion."""
        return self.notifier_read

    def _log(self, event):
        """Append '<task> <event> <timestamp>' to this worker's log file."""
        log_filename = os.path.join(self.work_dir, 'worker.%s' % self.number)
        with open(log_filename, 'a') as log_file:
            log_file.write(str(self.task) + ' %s %s\n' % (event, time.time()))

    def work(self):
        """Thread body: process queued tasks forever, one at a time."""
        while True:
            # get a task -- blocking read on pipe
            self._dequeue()
            # do the task
            self._log('started')
            self.task.started()
            try:
                self.task.execute()
            except Exception as error:
                # An unhandled error here would kill this thread and leave
                # the task stuck without ever notifying the master.  Record
                # it as a failed task instead so completion is still
                # signalled.
                self.task.results = {}
                self.task.result_comment = \
                    'Mergebot failed with an unexpected error: %s' % (error,)
                self.task.success = False
                self._log('failed')
            else:
                self._log('completed')
            # notify master of completion -- write to pipe
            self._completed()
190
191
class Mergebot(object):
    """The mergebot daemon: accepts commands over a TCP socket, keeps the
    list of outstanding tasks, and hands runnable tasks to worker threads.
    """
    def __init__(self, trac_dir):
        """Open the Trac environment at trac_dir and read the mergebot
        settings from its configuration.

        Relative work_dir / repository_dir settings are resolved against the
        Trac environment directory.
        """
        self.trac_dir = os.path.abspath(trac_dir)
        self.trac_env = trac.env.open_environment(self.trac_dir)
        config = self.trac_env.config
        self.listen_on = (config.get('mergebot', 'listen.ip'),
                          config.getint('mergebot', 'listen.port'))
        self.work_dir = config.get('mergebot', 'work_dir')
        if not os.path.isabs(self.work_dir):
            self.work_dir = os.path.join(self.trac_dir, self.work_dir)
        self.repo_url = config.get('mergebot', 'repository_url')
        repo_dir = config.get('trac', 'repository_dir')
        if not os.path.isabs(repo_dir):
            repo_dir = os.path.join(self.trac_dir, repo_dir)
        self.repo_dir = repo_dir

        self.listening = None
        self.worker_count = config.getint('mergebot', 'worker_count')
        self.task_list = []

    def run(self):
        """Run forever, handling requests.

        Each pass waits for activity on the listening socket, the client
        sockets, or the notifier pipe of an active worker, handles it, then
        assigns pending tasks to idle workers.
        """
        self.listening = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listening.bind(self.listen_on)
        self.listening.listen(10)
        open_sockets = []
        workers = [Worker(i, work_dir=self.work_dir)
                   for i in range(self.worker_count)]
        active = []
        try:
            while True:
                fds = [self.listening] + open_sockets + \
                    [w.notifier() for w in active]
                readable, _writeable, _other = select.select(fds, [], [])
                for s in readable:
                    if s is self.listening:
                        new_socket, _address = self.listening.accept()
                        open_sockets.append(new_socket)
                    elif s in open_sockets:
                        self._serve_client(s, open_sockets)
                    else:
                        # Must be an active worker telling us it is done
                        worker = [w for w in active if w.notifier() == s][0]
                        task = worker.ack_complete()
                        active.remove(worker)
                        workers.insert(0, worker)
                        # On failure, cancel all tasks that depend on this
                        # one.
                        if not task.success:
                            self._remove_dependant_tasks(task)
                        self._remove_task(task)
                        self._update_ticket(task)

                # Assign a task to a worker
                pending = [t for t in self.task_list if t.get_state() == 'P']
                # Iterate over a copy: workers is modified inside the loop.
                for w, t in zip(list(workers), pending):
                    w.queue(t)
                    workers.remove(w)
                    active.append(w)

        except KeyboardInterrupt:
            print('Exiting due to keyboard interrupt.')
        except Exception as e:
            print('Exiting due to:  %s' % (e,))
            raise
        finally:
            # Release the port on every exit path, including the re-raise.
            self.listening.close()

    def _serve_client(self, client, open_sockets):
        """Read pending data from one client socket and answer each command
        line in it.  An empty read, a QUIT command, or a socket error
        disconnects the client and removes it from open_sockets.
        """
        try:
            data = client.recv(4096)
        except socket.error:
            # The other end went away; treat it like end-of-stream.
            data = ''
        for line in data.rstrip('\n').split('\n'):
            disconnect = line == '' or line[:4].upper() == 'QUIT'
            if not disconnect:
                response = self.handle_command(line)
                if response:
                    try:
                        client.sendall(response)
                    except socket.error:
                        disconnect = True
            if disconnect:
                open_sockets.remove(client)
                client.close()
                # Nothing further may be done with a closed socket.
                break

    def handle_command(self, command):
        """Takes input from clients, and calls the appropriate sub-command.

        Returns the newline-terminated response to send back.
        """
        parts = command.strip().split()
        if not parts:
            return '\n'
        command = parts[0].upper()
        args = parts[1:]

        if command == 'LIST':
            response = self.command_list()
        elif command == 'ADD':
            response = self.command_add(args)
        elif command == 'CANCEL':
            response = self.command_cancel(args)
        else:
            response = 'unrecognized command "%s"' % command
        return response + '\n'

    def command_list(self):
        """Returns a listing of all active tasks, one per line, followed by
        'OK'.
        """
        listing = [str(task) for task in self.task_list]
        return '\n'.join(listing) + '\nOK'

    def command_add(self, args):
        """Create a new task from
        <ticket> <action> <component> <version> <requestor>
        and add it to the task list.  Returns 'OK' or an 'Error: ...' string.
        """
        try:
            ticket, action, component, version, requestor = args
        except ValueError:
            return 'Error: wrong number of args: add <ticket> <action> ' \
                '<component> <version> <requestor>\nGot: %r' % args
        try:
            ticket = int(ticket.strip('#'))
        except ValueError:
            return 'Error: invalid ticket number "%s"' % ticket

        try:
            trac_ticket = trac.ticket.Ticket(self.trac_env, ticket)
            summary = trac_ticket['summary']
            new_task = Task(self, ticket, action, component, version,
                            summary, requestor)
            new_task.find_blockers(self.task_list)
        except Exception as e:
            # A bad request (unknown ticket, invalid action, ...) must be
            # reported to the client, not allowed to take down the daemon.
            self.trac_env.log.error("Could not add task %r: %s" % (args, e))
            return 'Error: %s' % (e,)
        self.task_list.append(new_task)
        # The main loop assigns pending tasks to workers after each command.
        return 'OK'

    def command_cancel(self, args):
        """Cancel the waiting or pending task whose id is args[0], together
        with every task that depends on it.
        """
        try:
            tasknum = int(args[0])
        except (ValueError, IndexError):
            return 'Error'
        found = [t for t in self.task_list if t.id == tasknum]
        if len(found) != 1:
            return 'Error: Not found'
        dead = found[0]
        dead_state = dead.get_state()
        if dead_state not in ['W', 'P']:
            return 'Error: Cannot kill task (state %s)' % dead_state
        self._remove_dependant_tasks(dead)
        self._remove_task(dead)
        return 'OK'

    def _remove_task(self, dead):
        """Removes the given task from the queue.
        Removes task as a dependency from any other tasks in the queue.
        Assumes the task is in the task_list.
        """
        try:
            self.task_list.remove(dead)
        except ValueError:
            self.trac_env.log.error(
                "Task %s not in task_list when asked to remove" % dead)
        for t in self.task_list:
            if dead in t.blocked_by:
                t.blocked_by.remove(dead)

    def _remove_dependant_tasks(self, task):
        """Removes every task that is (directly or transitively) blocked by
        the given task.  The given task itself is left in the list.
        """
        # Snapshot the dependants first: removing from task_list while
        # iterating over it skips entries, which would leave some dependants
        # queued and later unblock them.
        dependants = [t for t in self.task_list if task in t.blocked_by]
        for dependant in dependants:
            if dependant not in self.task_list:
                # Already removed while handling an earlier dependant.
                continue
            self._remove_dependant_tasks(dependant)
            self._remove_task(dependant)

    def _update_ticket(self, task):
        """Apply the task's result fields and comment to its Trac ticket."""
        ticket = trac.ticket.Ticket(self.trac_env, task.ticket)
        if task.results or task.result_comment:
            # results may be empty/None when only a comment was produced.
            for key, value in (task.results or {}).items():
                ticket[key] = value
            try:
                ticket.save_changes('mergebot', task.result_comment)
            except Exception:
                # attempt to avoid "pysqlite2.dbapi2.IntegrityError: columns
                # ticket, time, field are not unique" by retrying once later
                time.sleep(1)
                ticket.save_changes('mergebot', task.result_comment)
373
374
def main(args):
    """Start the mergebot daemon.

    args is the command line without the program name:
    [-f] <trac-environment-dir>.  With -f the daemon stays in the
    foreground; otherwise the process is daemonized first.  Exits with a
    usage message if the Trac environment directory is missing.
    """
    foreground = False
    if args and args[0] == '-f':
        foreground = True
        args = args[1:]
    if not args:
        # Previously this fell through to an IndexError traceback.
        sys.exit('Usage: %s [-f] <trac-environment-dir>' % sys.argv[0])
    trac_dir = args[0]
    if not foreground:
        daemonize()
    bot = Mergebot(trac_dir)
    bot.run()
385
def run():
    """Entry point: invoke main() with the process's command-line arguments
    (program name stripped).
    """
    cli_args = sys.argv[1:]
    main(cli_args)
388
# Start the daemon only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    run()
Note: See TracBrowser for help on using the repository browser.