# replicator.py -- P4DTI replicator.
# Gareth Rees, Ravenbrook Limited, 2000-08-09.
# $Id: //info.ravenbrook.com/project/p4dti/branch/2000-09-13/demo-debugging/code/replicator/replicator.py#2 $
#
# This module is intended to run on both Python 2.7 and Python 3: it avoids
# string exceptions, the "raise X, y" form, backtick repr, dict.has_key and
# the print statement.

import marshal
import re
import shlex
import socket
import subprocess
import tempfile
import time


# Error class for errors generated by the replicator.  It keeps the name
# `error' so that existing "except replicator.error" clauses still work.
class error(Exception):
    pass


# TODO: add message priority constants.


# _to_text(value).  Return byte strings decoded to text; return anything else
# unchanged.  On Python 2, where str is the byte string type, this is a no-op.
# NOTE(review): UTF-8 is an assumption about the server's encoding -- confirm.
def _to_text(value):
    if isinstance(value, bytes) and not isinstance(value, str):
        return value.decode('utf-8', 'replace')
    return value


# _to_bytes(value).  Return text strings encoded to bytes; return anything
# else (byte strings, numbers) unchanged.
# NOTE(review): UTF-8 is an assumption about the server's encoding -- confirm.
def _to_bytes(value):
    if not isinstance(value, bytes) and hasattr(value, 'encode'):
        return value.encode('utf-8')
    return value


# _convert_record(record, convert).  Return a copy of the dictionary `record'
# with `convert' applied to every key and value.  Non-dictionaries are
# returned unchanged.
def _convert_record(record, convert):
    if not isinstance(record, dict):
        return record
    return dict([(convert(k), convert(v)) for k, v in record.items()])


# The Defect_Tracker class is an abstract class representing the interface
# between the replicator and the defect tracker.  Don't use this class,
# subclass it and use the subclass.
#
# Subclasses are expected to provide the following methods, which the
# replicator calls but which are not implemented here:
#
# changed_issues().  Return a hash of issues that have changed in the defect
# tracker.  The hash maps issue id to issue.
#
# init().  Set up the defect tracking database for the integration.  Set up
# issues for replication by this replicator according to the policy in the
# `replicate_issue_p' method.
#
# issue(issue_id).  Return the defect tracking issue with the given
# identifier, or None if there is no such issue.
#
# issue_conflicting_p(issue).  Return true iff the given issue is marked as
# being in conflict with Perforce.
#
# issue_id(issue).  Return the identifier for the given defect tracking
# issue.
#
# issue_jobname(issue).  Return the Perforce jobname for the given defect
# tracking issue (if the issue specifies one), or generate and return an
# appropriate jobname for the issue (otherwise).
#
# transform_issue_to_job(issue, old_job).  Transform a defect tracking issue
# into a Perforce job.  The old_job argument is the old job description
# corresponding to the issue (or a fresh job description if there is no
# corresponding job).  Return the new job and a hash of changes (as a tuple).
#
# transform_job_to_issue(job, old_issue).  Transform a Perforce job into a
# defect tracking issue.  The old_issue argument is the old issue
# corresponding to the job (or a fresh issue description if there is no
# corresponding issue).  Return the new issue and a hash of changes (as a
# tuple).
#
# update_issue(issue).  Update the given issue in the defect tracking system.
# Update the issue argument in-place so that it matches the record in the
# defect tracking system.
#
# update_issue_status(issue, status).  Update the given issue in the defect
# tracking system so that it is recorded as being consistent with Perforce
# (if status == 'ok') or conflicting with Perforce (if status ==
# 'conflicting').

class Defect_Tracker:
    # Default configuration.  Never modified in place: each instance gets its
    # own merged copy in __init__.
    config = {}

    # The replicator this defect tracker belongs to; see set_replicator().
    replicator = None

    # __init__(config).  Merge the supplied config with the default config
    # (the former takes precedence).  The merge is made into a per-instance
    # copy so that one instance's settings cannot leak into the class
    # defaults or into other instances.
    def __init__(self, config=None):
        merged = self.config.copy()
        if config:
            merged.update(config)
        self.config = merged

    # log(message, priority).  Write a message to the replicator's log with
    # the given priority.  Requires set_replicator() to have been called.
    def log(self, message, priority=None):
        self.replicator.log(message, priority)

    # replicate_issue_p(issue).  A policy used during initialization of the
    # integration to set up replication for issues where replication is not
    # specified.  Return true if the issue should be replicated by this
    # replicator, false if it should not.  The default policy is to replicate
    # all issues.
    def replicate_issue_p(self, case):
        return 1

    # set_replicator(r).  Set the replicator object to which this defect
    # tracking object belongs.
    def set_replicator(self, r):
        self.replicator = r


# The Replicator class is a generic replicator.  Pass the constructor a
# defect tracking interface and (optionally) a configuration hash.  The
# generic replicator assumes the Perforce server is on the same host as the
# replicator.

class Replicator:
    # Default configuration.  Never modified in place: each instance gets its
    # own merged copy in __init__.  'counter' and 'p4_user' default to None
    # and are derived from 'rid' in __init__ when not supplied.
    config = {
        'counter' : None,
        'p4_client' : 'p4dti-%s' % socket.gethostname(),
        'p4_client_executable': 'p4',
        'p4_password' : '',
        'p4_port' : '127.0.0.1:1667',
        'p4_user' : None,
        'poll_period' : 10,
        'rid' : 'R0',
        'sid' : 'p4_%s' % socket.gethostname(),
        }

    # The defect tracker object; set in __init__.
    dt = None

    # Legacy error label, kept so that existing references to
    # Replicator.error still resolve.  Nothing in this file raises it; the
    # methods below raise the module-level `error' class.
    error = 'P4DTI Replicator error'

    # __init__(dt, config).  Merge the supplied config with the default
    # config (the former takes precedence), fill in the derived defaults and
    # link the replicator to its defect tracker.
    def __init__(self, dt, config=None):
        # Merge into a per-instance copy so that the class defaults are not
        # changed as a side effect of constructing a replicator.
        merged = self.config.copy()
        if config:
            merged.update(config)
        self.config = merged

        # Make a counter name for this replicator.
        if not self.config['counter']:
            self.config['counter'] = 'P4DTI-%s' % self.rid()

        # Make a userid for the replicator.
        if not self.config['p4_user']:
            self.config['p4_user'] = 'P4DTI-%s' % self.rid()

        # Set up links between the replicator object and its defect tracker
        # object.  Note that this makes a circular reference between the two
        # objects.
        self.dt = dt
        self.dt.set_replicator(self)

    # changed_jobs().  Return a hash of changed jobs that are due for
    # replication by this replicator (that is, the P4DTI-rid field of the job
    # matches the replicator id), keyed by jobname.  We make sure not to
    # return jobs that were last updated by the replicator, by looking at the
    # P4DTI-user field in each job.
    def changed_jobs(self):
        # Get all entries from the log.
        entries = self.p4_run('logger -c 0')

        # "p4 logger" doesn't return fully structured output.  Instead, the
        # 'data' element of the returned structures is a string like "15 job
        # jobname".  The job_re regular expression matches and parses these
        # strings.
        job_re = re.compile('([0-9]+) job (.*)$')
        changed = {}
        count = 0                       # The last entry number in the log.
        for entry in entries:
            if entry.get('code') != 'info' or 'data' not in entry:
                continue
            match = job_re.match(entry['data'])
            if not match:
                continue
            count = int(match.group(1))
            jobname = match.group(2)
            if jobname in changed:
                continue
            job = self.job(jobname)
            if ('P4DTI-rid' in job
                and job['P4DTI-rid'] == self.rid()
                and 'P4DTI-user' in job
                and job['P4DTI-user'] != self.config['p4_user']):
                changed[jobname] = job

        # Update counter to last entry number in the log.  This has the
        # side-effect of deleting the log (see p4 help undoc).
        self.p4_run('logger -t %s -c %d' % (self.config['counter'], count))
        return changed

    # init().  Set up Perforce and the defect tracking system so that
    # replication can proceed.  Raises `error' if a required field is missing
    # from the Perforce jobspec.
    def init(self):
        # Initialize the defect tracking system.
        self.dt.init()

        # TODO: Initialize Perforce by adding fields to jobspec if not
        # present.  For the moment I can't add fields to the jobspec, so I'll
        # just check that they are there.  I can't even check the presence of
        # the P4DTI-user field this way.
        job = self.job('P4DTI-no-such-job')
        for field in ['P4DTI-issue-id', 'P4DTI-rid']:
            if field not in job:
                raise error("Field '%s' not found in Perforce jobspec"
                            % field)

        # Start the logger (see p4 help undoc).
        self.p4_run('counter logger 0')

    # job(jobname).  Return the Perforce job with the given name if it
    # exists, or an empty job specification (otherwise).  Raises `error' if
    # Perforce does not return exactly one job, or returns a job with a
    # different name.
    def job(self, jobname):
        jobs = self.p4_run('job -o %s' % jobname)
        if len(jobs) != 1 or 'Job' not in jobs[0]:
            raise error("expected a job but found %s" % repr(jobs))
        found = jobs[0]
        if found['Job'] != jobname:
            raise error("asked for job '%s' but got job '%s'"
                        % (jobname, found['Job']))
        return found

    # log(message, priority).  Do something appropriate with the message.
    # This implementation prints it, prefixed with the replicator id, and
    # ignores the priority.
    def log(self, message, priority=None):
        print("%s: %s" % (self.rid(), message))

    # p4_run(arguments, input).  Run the p4 client with the given arguments.
    # The arguments should be a Perforce command and its arguments, like
    # "jobs -o //foo/...".  Options should generally include -i and/or -o to
    # avoid forms being put up interactively.  If input is supplied, then it
    # should be a list of dictionaries.  These dictionaries are sent one by
    # one to the process.  The results are read into a list and returned.
    # Since the output of the process is read to EOF and the process is
    # waited for, there should be no process left hanging about.
    #
    # Raises `error' if Perforce returns a single result whose 'code' is
    # 'error'.
    def p4_run(self, arguments, input=None):
        # Build an argument vector rather than a shell command line, so that
        # configuration values and job names are never interpreted by a
        # shell.  The argument string is split using POSIX shell quoting
        # rules.
        command = [
            self.config['p4_client_executable'], '-G',
            '-p', self.config['p4_port'],
            '-u', self.config['p4_user'],
            '-P', self.config['p4_password'],
            '-c', self.config['p4_client'],
            ] + shlex.split(arguments)

        stdin_file = None
        results = []
        try:
            if input:
                # Spool every input dictionary to an anonymous temporary file
                # which becomes the client's standard input; the file is
                # removed when closed.  Marshal version 0 is requested
                # because later versions add string interning and references.
                # NOTE(review): presumably p4 -G only accepts the original
                # marshal format and byte-string fields -- confirm against
                # the Perforce documentation.
                stdin_file = tempfile.TemporaryFile()
                for record in input:
                    payload = _convert_record(record, _to_bytes)
                    stdin_file.write(marshal.dumps(payload, 0))
                stdin_file.flush()
                stdin_file.seek(0)

            process = subprocess.Popen(command, stdin=stdin_file,
                                       stdout=subprocess.PIPE)
            try:
                # The client writes a sequence of marshalled dictionaries;
                # marshal.load raises EOFError at the end of the stream.
                while 1:
                    try:
                        record = marshal.load(process.stdout)
                    except EOFError:
                        break
                    results.append(_convert_record(record, _to_text))
            finally:
                process.stdout.close()
                process.wait()
        finally:
            if stdin_file is not None:
                stdin_file.close()

        if (len(results) == 1 and 'code' in results[0]
            and results[0]['code'] == 'error'):
            raise error("Perforce error: '%s'" % results[0]['data'])
        return results

    # poll().  Poll the DTS for changed issues.  Replicate them.
    def poll(self):
        # Get the changed issues and jobs.
        changed_issues = self.dt.changed_issues()
        if changed_issues:
            self.log('%d issues have changed' % len(changed_issues))
        changed_jobs = self.changed_jobs()
        if changed_jobs:
            self.log('%d jobs have changed' % len(changed_jobs))

        # Make a list of pairs of (defect tracking issue, Perforce job).  Use
        # None where the corresponding object hasn't changed.  Jobs paired
        # with a changed issue are removed from changed_jobs so that only
        # unpaired jobs remain for the second loop.
        changed_pairs = []
        for issue in changed_issues.values():
            jobname = self.dt.issue_jobname(issue)
            changed_pairs.append((issue, changed_jobs.pop(jobname, None)))
        for job in changed_jobs.values():
            changed_pairs.append((None, job))

        # Now process each pair.
        for issue, job in changed_pairs:
            # Issue only changed?
            if issue is not None and job is None:
                jobname = self.dt.issue_jobname(issue)
                job = self.job(jobname)
                # The status field may be absent (init() does not check for
                # it), in which case the job is treated as not conflicting.
                if job.get('P4DTI-status') == 'conflicting':
                    self.log("conflict between issue '%s' and job '%s'"
                             % (str(self.dt.issue_id(issue)), job['Job']))
                else:
                    self.log("Replicating issue '%s' to job '%s'"
                             % (str(self.dt.issue_id(issue)), jobname))
                    self.replicate_issue_to_job(issue, job)

            # Job only changed?
            elif job is not None and issue is None:
                issue_id = job['P4DTI-issue-id']
                issue = self.dt.issue(issue_id)
                if issue is None:
                    raise error("No issue '%s'" % issue_id)
                if self.dt.issue_conflicting_p(issue):
                    self.log("conflict between issue '%s' and job '%s'"
                             % (str(self.dt.issue_id(issue)), job['Job']))
                else:
                    self.log("Replicating job '%s' to issue '%s'"
                             % (job['Job'], str(issue_id)))
                    self.replicate_job_to_issue(job, issue)

            # Both changed -- mark as being conflicting.  TODO: they might
            # not really be conflicting.  We ought to consult a policy here.
            else:
                self.dt.update_issue_status(issue, 'conflicting')
                self.update_job_status(job, 'conflicting')
                self.log("conflict between issue '%s' and job '%s'"
                         % (str(self.dt.issue_id(issue)), job['Job']))

    # replicate_issue_to_job(issue, old_job).  Replicate the given issue from
    # the defect tracker to Perforce, then record both sides as consistent.
    def replicate_issue_to_job(self, issue, old_job):
        # Transform the issue into a job.
        new_job, changes = self.dt.transform_issue_to_job(issue, old_job)
        # NOTE(review): the indentation of this method was lost in the copy
        # this was reconstructed from; the job is assumed to be written only
        # when there are changes -- confirm against the original.
        if changes:
            self.log("Changed fields: %s" % repr(changes))
            self.p4_run('job -i', [new_job])
        self.dt.update_issue_status(issue, 'ok')
        self.update_job_status(new_job, 'ok')

    # replicate_job_to_issue(job, old_issue).  Replicate the given job from
    # Perforce to the defect tracker, then record both sides as consistent.
    # TODO: I ought to change the specifications of these methods so that
    # they only deal with change descriptions.  Then everything would be much
    # neater.
    def replicate_job_to_issue(self, job, old_issue):
        # Transform the job into an issue.
        new_issue, changes = self.dt.transform_job_to_issue(job, old_issue)
        # NOTE(review): the indentation of this method was lost in the copy
        # this was reconstructed from; the issue is assumed to be written
        # only when there are changes -- confirm against the original.
        if changes:
            self.log("Changed fields: %s" % repr(changes))
            self.dt.update_issue(new_issue)
        self.dt.update_issue_status(new_issue, 'ok')
        self.update_job_status(job, 'ok')

    # rid().  Return the replicator id.
    def rid(self):
        return self.config['rid']

    # run() repeatedly polls the DTS, sleeping for the configured
    # 'poll_period' between polls.  It never returns.
    def run(self):
        while 1:
            self.log("Polling...")
            self.poll()
            time.sleep(self.config['poll_period'])

    # sid().  Return the Perforce server id.
    def sid(self):
        return self.config['sid']

    # update_job_status(job, status).  Set the P4DTI-status field of the job
    # to the given status and write the job back to Perforce, unless the job
    # already has that status.  A job with no status field is treated as
    # needing the update.  The job dictionary is updated in place.
    def update_job_status(self, job, status):
        if job.get('P4DTI-status') != status:
            job['P4DTI-status'] = status
            self.p4_run('job -i', [job])