source: mergebot/trunk/mergebot/WorkQueue.py @ 16

Last change on this file since 16 was 16, checked in by retracile, 15 years ago

Mergebot: Codebase as released with permission from CommProve, plus cleanups to remove traces of that environment.

File size: 6.9 KB
Line 
1#!/usr/bin/env python
2import os
3import sys
4import time
5import traceback
6import trac.env
7
8from TrackerTools import Task, GetWorkDir
9from CreateDaemon import createDaemon
10
class WorkQueue(object):
    """File-backed FIFO queue of tasks for one named actor.

    Three files are kept under ``workdir``:

    * ``queue.<name>``         -- the pending tasks, one per line
    * ``queue.<name>.current`` -- the task that was dequeued last and has
      not been completed yet
    * ``queue.<name>.pid``     -- pid of the process draining the queue

    A task is a list of strings.  It is stored as one line with its fields
    joined by ``delim``, so fields must not contain ``delim`` or newlines.
    """
    # TODO: add locking to file accesses.
    delim = " "

    def __init__(self, workdir, name):
        """Create ``workdir`` if it is missing and compute the file names."""
        self.name = name
        if not os.path.exists(workdir):
            os.makedirs(workdir)
        self.queuefile = os.path.join(workdir, "queue.%s" % name)
        self.inprocessfile = os.path.join(workdir, "queue.%s.current" % name)
        self.pidfile = os.path.join(workdir, "queue.%s.pid" % name)

    def drivequeue(self):
        """Claim the queue for the calling process.

        Returns 1 if the pid file names a process that ``ps`` still reports
        as running.  Otherwise the pid file is (over)written with our own
        pid and 0 is returned.
        """
        if os.path.exists(self.pidfile):
            with open(self.pidfile) as handle:
                pid = handle.read().strip()
            # Only hand plain digits to the shell; an empty or corrupt pid
            # file is treated like a stale one.
            if pid.isdigit():
                pipe = os.popen("ps h --pid %s" % pid)
                try:
                    process = pipe.read()
                finally:
                    pipe.close()
                if process:
                    return 1
            # otherwise, just overwrite the existing one.
        with open(self.pidfile, "w") as handle:
            handle.write("%s\n" % os.getpid())
        return 0

    def releasequeue(self):
        """Give up ownership of the queue by deleting the pid file."""
        os.remove(self.pidfile)

    def readqueue(self):
        """Return the pending tasks as a list of field lists (oldest first)."""
        if not os.path.exists(self.queuefile):
            return []
        with open(self.queuefile) as handle:
            # rstrip (rather than slicing off the last character) keeps the
            # final field intact when the last line has no trailing newline.
            return [line.rstrip("\n").split(self.delim) for line in handle]

    def writequeue(self, tasks):
        """Replace the queue file contents with ``tasks`` (may be empty)."""
        lines = ["%s\n" % self.delim.join(task) for task in (tasks or [])]
        with open(self.queuefile, "w") as handle:
            handle.writelines(lines)

    def readcurrent(self):
        """Return the in-process task as a list of fields, or None."""
        if not os.path.exists(self.inprocessfile):
            return None
        with open(self.inprocessfile) as handle:
            line = handle.readline()
        if not line:
            return None
        # Strip the newline added by writecurrent() so the last field
        # round-trips unchanged.
        return line.rstrip("\n").split(self.delim)

    def writecurrent(self, task):
        """Record ``task`` as in-process; a false value empties the file."""
        if task:
            line = "%s\n" % (self.delim.join(task))
        else:
            line = ""
        with open(self.inprocessfile, "w") as handle:
            handle.write(line)

    def enqueue(self, task):
        """Append ``task`` to the queue.

        Returns 1 (and changes nothing) if an identical task is already
        queued, 0 once the task has been added.
        """
        queued = self.readqueue()
        if task in queued:
            return 1
        queued.append(task)
        self.writequeue(queued)
        return 0

    def dequeue(self):
        """Move the oldest queued task to "current" and return it.

        Returns None when the queue is empty.
        """
        # Check that we don't have something half-way dequeued.
        # The problem is that if we have something half-way dequeued, there was
        # a problem completing that task.
        # TODO: So instead of returning it, we need to ring alarm bells, but
        # dequeue the next task.
        queued = self.readqueue()
        if not queued:
            return None
        self.writequeue(queued[1:])
        self.writecurrent(queued[0])
        return queued[0]

    def completetask(self):
        """Mark the in-process task as finished by clearing the current file."""
        self.writecurrent(None)
85
class MergeBotActor(object):
    """Runs one named action over a persistent WorkQueue.

    Tasks are queued with AddTask() and processed by Run(), which forks a
    daemonized child that calls ``action`` once per queued task.
    """
    def __init__(self, tracEnv, name, action):
        """Set up the actor and its queue.

        tracEnv -- Trac environment; its ``log`` and ``path`` are used here
        name    -- actor name, used for the queue files, log messages and
                   the "to<name>" status set by AddTask()
        action  -- callable invoked as ``action(tracEnv, *task)``; it must
                   return a ``(result, task_obj)`` pair
        """
        self.tracEnv = tracEnv
        self.name = name
        self.action = action
        workdir = GetWorkDir(tracEnv)
        self.queue = WorkQueue(workdir, name)

    def AddTask(self, task):
        """Queue ``task`` (a list of strings) unless it is already queued.

        For a newly queued task, the Task identified by ``task[0]`` gets its
        MergeBot status set to "to<name>" and is saved.
        """
        if self.queue.enqueue(task):
            # must be a dup
            return
        task_obj = Task(self.tracEnv, task[0])
        task_obj.SetMergeBotStatus("to%s"%self.name)
        task_obj.Save()

    def Run(self):
        """Fork a daemonized child that drains the queue.

        The parent returns right after the fork.  The child never returns:
        it exits with status 0 when the queue was emptied (or another
        process is already draining it) and 1 if draining failed.
        """
        self.tracEnv.log.debug("Starting %s action" % (self.name))
        # The child will need the absolute path to the environment.
        trac_env_path = os.path.abspath(self.tracEnv.path)
        pid = os.fork()
        if pid:
            # Parent
            self.tracEnv.log.debug("Running %s action in child %s" % (self.name, pid))
            return

        # Child.  We daemonize and do the work.
        createDaemon()  # This closes all open files, and sets the working directory to /
        # And because of that, we must open our own copy of the Trac environment.
        tracEnv = trac.env.open_environment(trac_env_path)
        if self.queue.drivequeue():
            # something is already working
            tracEnv.log.debug("The %s queue is already running." % (self.name, ))
            sys.exit(0)
        try:
            try:
                self._drainqueue(tracEnv)
            finally:
                # We own the pid file from here on, so drop it on failure
                # too; a stale pid file could otherwise block the queue if
                # the pid gets reused by an unrelated process.
                self.queue.releasequeue()
        except Exception as e:
            tracEnv.log.error("BUG!!!  Failed while running the %s queue" % (self.name, ))
            tracEnv.log.exception(e)
            sys.exit(1)

        tracEnv.log.debug("Child completed action %s, exiting" % (self.name, ))
        sys.exit(0)

    def _drainqueue(self, tracEnv):
        """Run the action on every queued task until the queue is empty.

        A failure of a single task is logged and does not stop the loop.
        """
        tracEnv.log.debug("Child running %s action" % (self.name, ))
        while 1:
            task = self.queue.dequeue()
            if not task:
                tracEnv.log.debug("Queue %s emptied." % (self.name, ))
                # we're done.
                break
            tracEnv.log.debug("%s working on %s..." % (self.name, task))
            # FIXME: Need to rethink the status update logic.
            args = [tracEnv] + task
            tracEnv.log.debug("Running %s action with arguments %s" % (self.name, repr(args)))
            try:
                result, task_obj = self.action(*args)
                if result:
                    tracEnv.log.debug("Action %s completed with result %s" % (self.name, result))
                    task_obj.SetMergeBotStatus(result)
                task_obj.Save()
            except Exception as e:
                tracEnv.log.error("BUG!!!  Task %r failed while running"
                    " the %s queue.  Continuing." % (args, self.name))
                tracEnv.log.exception(e)

            # Clear the "current" marker whether or not the task succeeded.
            self.queue.completetask()

    def GetStatus(self):
        """Returns a tuple (current, (queue)), where each element is a list of
        arguments"""
        return (self.queue.readcurrent(), self.queue.readqueue() )
153
def VersionToDir(version):
    """Map an entry of the Trac version list to its path under a component.

    "trunk" maps to "trunk", "#<n>" maps to "branches/ticket-<n>", and any
    other version maps to "branches/release-<version>".
    """
    if version == "trunk":
        return "trunk"
    if version.startswith("#"):
        # Everything after the leading "#" is the ticket number.
        branch = "ticket-%s" % (version[1:])
    else:
        branch = "release-%s" % (version)
    return "branches/" + branch
166
def testcase():
    """Manual smoke test for WorkQueue.

    Uses a "noop" queue under ./test (relative to the current directory),
    drains whatever is left in it, enqueues two tasks, processes one of
    them and prints the queue state before and after.
    """
    wc = WorkQueue(os.path.join(os.getcwd(), "test"), "noop")
    # Single-argument print(...) behaves the same as a Python 2 print
    # statement and also parses on Python 3.
    print("Initial state.")
    print(wc.readcurrent())
    print(wc.readqueue())

    # Flush leftovers from a previous run.
    while wc.dequeue():
        wc.completetask()

    print(wc.enqueue(["task1"]))
    print(wc.enqueue(["task2"]))

    # Process one task; the second one should remain queued.
    wc.dequeue()
    wc.completetask()

    print("Final state.")
    print(wc.readcurrent())
    print(wc.readqueue())
185
# Running this module as a script executes the manual WorkQueue smoke test.
if __name__ == "__main__":
    testcase()
188
189# vim:foldcolumn=4 foldmethod=indent
190# vim:tabstop=4 shiftwidth=4 softtabstop=4 expandtab
Note: See TracBrowser for help on using the repository browser.