# replicator.py -- P4DTI replicator.
# Gareth Rees, Ravenbrook Limited, 2000-08-09.
# $Id: //info.ravenbrook.com/project/p4dti/branch/2000-12-01/no-conflicts/code/replicator/replicator.py#2 $
#
# See "Perforce Defect Tracking Integration Architecture"
# for the architecture of the integration;
# "Replicator design" for the design of the
# replicator; and "Replicator classes in Python"
# for the class organization of the
# replicator.
#
# Copyright 2000 Ravenbrook Limited. This document is provided "as is",
# without any express or implied warranty. In no event will the authors
# be held liable for any damages arising from the use of this document.
# You may make and distribute copies and derivative works of this
# document provided that (1) you do not charge a fee for this document or
# for its distribution, and (2) you retain as they appear all copyright
# and licence notices and document history entries, and (3) you append
# descriptions of your modifications to the document history.
import logger
import re
import smtplib
import socket
import string
import sys
import time
import traceback
import types
# The defect_tracker_issue class is an abstract class representing an issue in
# the defect tracker. You can't use this class; you must subclass it and use
# the subclass.
class defect_tracker_issue:
    """Abstract interface to a single issue in the defect tracker.

    Only replicate_p() is implemented here.  The comments in the class body
    describe the other methods; this class does not define them.
    """

    # action().  Return the action field (replicate/wait/keep/discard).

    # add_filespec(filespec).  Associate a filespec with the issue.

    # add_fix(change, client, date, status, user).  Associate a fix with the
    # issue.

    # id().  Return an identifier for the given defect tracking issue, as a
    # string.  This identifier can be used to fetch the issue from the defect
    # tracker, by passing it to defect_tracker.issue.

    # filespecs().  Return a list of filespecs for the issue.  The elements of
    # the list belong to a subclass of the defect_tracker_filespec class.

    # fixes().  Return a list of fixes for the issue.  The elements of the list
    # belong to a subclass of the defect_tracker_fix class.

    # readable_name().  Return a human-readable name for the issue, as a
    # string.  This should be suitable for use as a Perforce jobname.

    def replicate_p(self):
        """Policy for issues whose replication is not yet specified.

        Return true if this replicator should replicate the issue, false if
        it should not.  The default policy replicates every issue, so this
        always returns 1.
        """
        return 1

    # rid().  Return the identifier of the replicator which is in charge of
    # replicating this issue, or the empty string if the issue is not being
    # replicated.

    # setup_for_replication(server_id).  Set up the issue for replication.  The
    # server_id argument is the Perforce server id that the replicator
    # replicates to.
    # NOTE(review): replicator.replicate_many() calls setup_for_replication()
    # with no argument -- confirm which signature subclasses implement.

    # update(user, changes).  Update the issue record in the defect tracking
    # system by applying changes, which is a dictionary mapping field name to
    # the new value for that field name.  User is the user who made the change
    # in Perforce.

    # update_action(action).  Update the issue in the defect tracking system so
    # that its action field is set to the action argument.
# The defect_tracker_filespec class is an abstract class representing an
# associated filespec record in the defect tracker. You can't use this class;
# you must subclass it and use the subclass.
class defect_tracker_filespec:
    """Abstract interface to a filespec record associated with an issue.

    This class defines no behaviour of its own.  The methods described for
    it are:

    delete().  Delete the filespec record.

    name().  Return the filespec's name.
    """
# The defect_tracker_fix class is an abstract class representing a fix record
# in the defect tracker. You can't use this class; you must subclass it and
# use the subclass.
class defect_tracker_fix:
    """Abstract interface to a fix record in the defect tracker.

    This class defines no behaviour of its own.  The methods described for
    it are:

    change().  Return the change number for the fix record.

    delete().  Delete the fix record.

    status().  Return the status for the fix record.

    update(change, client, date, status, user).  Update the fix record so
    that it corresponds to the Perforce fix.
    """
# The defect_tracker class is an abstract class representing the interface
# between the replicator and the defect tracker. You can't use this class; you
# must subclass it and use the subclass.
class defect_tracker:
config = { 'logger' : logger.file_logger() }
rid = None
sid = None
error = "Defect tracker error"
def __init__(self, rid, sid, config = {}):
assert isinstance(rid, types.StringType)
assert isinstance(sid, types.StringType)
assert isinstance(config, types.DictType)
if not re.match('^[A-Za-z_][A-Za-z_0-9]*$', rid):
raise self.error, \
("Replicator identifier must consist of letters, "
"numbers and underscores only: '%s' is not allowed." % rid)
if not re.match('^[A-Za-z_][A-Za-z_0-9]*$', sid):
raise self.error, \
("Perforce server identifier must consist of letters, "
"numbers and underscores only: '%s' is not allowed." % sid)
self.rid = rid
self.sid = sid
# Merge the supplied config with the default config (the former takes
# precedence).
for k in config.keys():
self.config[k] = config[k]
# all_issues(). Return a list of all defect tracking issues that are being
# replicated by this replicator, or which are not being replicated by any
# replicator. Each element of the list belongs to the defect_tracker_issue
# class (or a subclass).
# changed_entities(). Return a triple consisting of (a) a list of the
# issues that have changed in the defect tracker and which are either
# replicated by this replicator, or are new issues that are not yet
# replicated; (b) a list of the changelists that have changed; and (c) a
# marker. Each element of the first list belongs to the
# defect_tracker_issue class (or a subclass). The marker will be passed to
# the method mark_changes_done when that is called after all the issues
# have been replicated. For defect trackers other than Perforce, the
# second list should be empty.
# mark_changes_done(marker). Called when all the issues returned by
# changed_entities have been replicated. The argument is the second
# element of the pair returned by the call to changed_entities. (The idea
# behind this is that the defect tracker interface may have some way of
# recording that it has considered all these issues -- perhaps by recording
# the last key on a changes table. It is important not to record this
# until it is true, so that if the replicator crashes between getting the
# changed issues and replicating them then we'll consider the same set of
# changed issues the next time round, and hopefully this will give us a
# chance to either replicate them correctly or else discover that they are
# conflicting.)
# init(). Set up the defect tracking database for the integration. Set up
# issues for replication by this replicator according to the policy in the
# replicate_p method of the issue.
# issue(issue_id). Return the defect tracking issue whose identifier has
# the string form given by issue_id, or None if there is no such issue.
# log(format, arguments, priority). Write a message to the replicator's
# log.
def log(self, format, arguments = (), priority = None):
assert isinstance(format, types.StringType)
assert priority == None
format = "%s\t" + format
if isinstance(arguments, types.TupleType):
arguments = (self.rid,) + arguments
else:
arguments = (self.rid, arguments)
self.config['logger'].log("P4DTI-0", format, arguments, priority)
# replicate_changelist(change, client, date, description, status, user).
# Replicate the changelist to the defect tracking database. Return 1 iff
# the changelist was changed, 0 if it was already in the database and was
# unchanged.
# A translator subclass translates between corresponding fields in two defect
# trackers. For example, we could have a state translator that translates
# between a state field in TeamTrack and the status field in Perforce.
# The translator class itself is the null translator. It doesn't change
# values, but does check types.
class translator:
    """The null translator: returns values unchanged but checks types.

    In both methods dt0 and dt1 must be defect_tracker instances, and issue0
    and issue1 are the issues to which the translation applies, or None if
    the translation isn't being applied to issues.

    Note that some translators may only be applied to fields in issues.  For
    example, the TeamTrack to Perforce state translator can only work if it
    knows the project to which the TeamTrack issue belongs (the same logical
    state may be represented by several actual states in different
    projects).  So that translator insists on the issues argument being
    supplied.
    """

    def _check_translation_arguments(self, dt0, dt1, issue0):
        # Type checks shared by both translation directions.  issue1 is
        # deliberately not checked: jobs aren't defect tracker issues (yet?).
        assert isinstance(dt0, defect_tracker)
        assert isinstance(dt1, defect_tracker)
        assert issue0 == None or isinstance(issue0, defect_tracker_issue)

    def translate_0_to_1(self, value, dt0, dt1, issue0 = None, issue1 = None):
        """Translate a field value from defect tracker 0 to defect tracker 1."""
        self._check_translation_arguments(dt0, dt1, issue0)
        return value

    def translate_1_to_0(self, value, dt0, dt1, issue0 = None, issue1 = None):
        """Translate a field value from defect tracker 1 to defect tracker 0."""
        self._check_translation_arguments(dt0, dt1, issue0)
        return value
# The replicator class is a generic replicator. Pass the constructor a defect
# tracking interface and (optionally) a configuration hash. The generic
# replicator assumes the Perforce server is on the same host as the replicator.
class replicator:
# Default configuration.  Uses that can be seen in this part of the file:
#   'administrator-address'  address mailed by conflict() and copied on other
#                            mail; mail() sends nothing unless this and
#                            'smtp-server' are both set.
#   'counter'                name passed to 'p4 logger -t'; __init__ sets it
#                            to 'P4DTI-<rid>' when it is not supplied.
#   'logger'                 object whose log() method receives the messages
#                            written by replicator.log().
#   'replicator-address'     From address used by mail().
#   'smtp-server'            passed to smtplib.SMTP by mail().
#   'user-translator',
#   'date-translator'        objects whose translate_1_to_0() method is called
#                            by replicate_changelist_p4_to_dt().
# The remaining keys ('job-status-field', 'job-owner-field', 'poll-period',
# 'replicated-fields') are not used in this part of the file.
config = {
'administrator-address': None,
'counter' : None,
'job-status-field' : 'Status',
'job-owner-field' : 'User',
'logger' : logger.file_logger(),
'poll-period' : 10,
'replicator-address' : 'p4dti',
'smtp-server' : None,
'user-translator': None,
'date-translator': None,
'replicated-fields': [],
}
# The defect tracker interface; set by __init__.
dt = None
# This is a placeholder; at the moment there is no separate dt_perforce
# class to use, but eventually there will be.
dt_p4 = None
# The replicator identifier; set by __init__.
rid = None
# The Perforce interface object (its run() method executes Perforce
# commands); set by __init__.
p4 = None
# Error object for fatal errors raised by the replicator.
error = 'P4DTI Replicator error'
# The action_table maps the (dt_action, p4_action) to the action that the
# replicator should take: 'normal' means apply the normal decision rules;
# 'conflict' means that the entities are already marked as conflicting; a
# conflict should be reported if entities have changed; 'p4' means that the
# issue should be overwritten with the job; 'dt' means that the job should
# be overwritten with the issue.
# Any pair that does not appear as a key is reported as an illegal
# combination by check_consistency() and replicate().
action_table = {
( 'replicate', 'replicate' ): 'normal',
( 'wait', 'wait' ): 'conflict',
( 'wait', 'keep' ): 'p4',
( 'wait', 'discard' ): 'dt',
( 'keep', 'wait' ): 'dt',
( 'keep', 'discard' ): 'dt',
( 'discard', 'wait' ): 'p4',
( 'discard', 'keep' ): 'p4',
}
def __init__(self, rid, dt, p4, config = {}):
assert isinstance(rid, types.StringType)
assert isinstance(dt, defect_tracker)
assert isinstance(config, types.DictType)
if not re.match('^[A-Za-z_][A-Za-z_0-9]*$', rid):
raise self.error, \
("Replicator identifier must consist of letters, "
"numbers and underscores only.")
self.dt = dt
# This is a placeholder. Eventually there will be a real defect
# tracker interface to Perforce.
self.dt_p4 = defect_tracker(rid,'dummy')
self.rid = rid
self.p4 = p4
# Replicator ids must match.
if self.rid != self.dt.rid:
raise self.error, \
("Replicator's RID '%s' doesn't match defect tracker's "
"RID ('%s').", self.rid, self.dt.rid)
# Merge the supplied config with the default config (the former takes
# precedence).
for k in config.keys():
self.config[k] = config[k]
# Make a counter name for this replicator.
if not self.config['counter']:
self.config['counter'] = 'P4DTI-%s' % self.rid
# changed_entities(). Return a 3-tuple consisting of (a) changed jobs
# (b) changed changelists and (c) the last log entry that was considered.
# The changed jobs are those that are due for replication by this
# replicator (that is, the P4DTI-rid field of the job matches the
# replicator id). The last log entry will be passed to
# mark_changes_done.
def changed_entities(self):
    """Return (changed jobs, changed changelists, last log sequence number).

    Reads the Perforce logger entries recorded against this replicator's
    counter.  The jobs are returned as a dictionary from jobname to job and
    include only jobs whose P4DTI-rid field equals this replicator's id.
    The last element is the sequence number of the final log entry seen, or
    None if there were no entries.
    """
    entries = self.p4.run('logger -t %s' % self.config['counter'])
    changed_jobs = {}
    changed_changelists = []
    last_sequence = None
    for entry in entries:
        last_sequence = int(entry['sequence'])
        kind = entry['key']
        if kind == 'job':
            jobname = entry['attr']
            if changed_jobs.has_key(jobname):
                # Already collected from an earlier log entry.
                continue
            job = self.job(jobname)
            # We ought to make sure not to return jobs that were last
            # updated by the replicator, by looking at the P4DTI-user field
            # in each job.  But this doesn't work yet: see job000014.
            if job.has_key('P4DTI-rid') and job['P4DTI-rid'] == self.rid:
                changed_jobs[jobname] = job
        elif kind == 'change':
            change_number = entry['attr']
            try:
                found = self.p4.run('change -o %s' % change_number)[0]
                changed_changelists.append(found)
            # NOTE(review): no name 'p4' is bound by the imports at the top
            # of this file, so evaluating 'p4.error' here looks like it
            # would raise NameError -- confirm where the Perforce interface
            # defines its error object.
            except p4.error:
                # The changelist might not exist, because it might have
                # been a pending changelist that's been renumbered.  So
                # don't replicate it.  Should it be deleted?  GDR
                # 2000-11-02.
                pass
    return changed_jobs, changed_changelists, last_sequence
# mark_changes_done(log_entry). Update the Perforce database to
# record the fact that the replicator has replicated all changes up to
# log_entry.
def mark_changes_done(self, log_entry):
    """Record that all changes up to log_entry have been replicated.

    log_entry is a log sequence number (an integer) or None.  Nothing is
    done when it is None or zero.
    """
    assert log_entry == None or isinstance(log_entry, types.IntType)
    if not log_entry:
        return
    # Update counter to last entry number in the log that we've replicated.
    # If this is the last entry in the log, it has the side-effect of
    # deleting the log (see "p4 help undoc").
    command = 'logger -t %s -c %d' % (self.config['counter'], log_entry)
    self.p4.run(command)
# check_consistency(). Run a consistency check on the two databases,
# reporting any inconsistencies.
def check_consistency(self):
print "Checking consistency for replicator '%s'" % self.rid
inconsistencies = 0 # Number of inconsistencies found.
# Get hashes of issues (by id) and jobs (by jobname).
issues = {}
for issue in self.dt.all_issues():
issues[issue.id()] = issue
jobs = {}
for j in self.p4.run('jobs -e P4DTI-rid=%s' % self.rid):
jobs[j['Job']] = j
for id, issue in issues.items():
# Progress indication.
sys.stdout.write(".")
# Report if issue has no corresponding job.
if issue.rid() != self.rid:
if issue.replicate_p():
print "Issue '%s' should be replicated but is not." % id
inconsistencies = inconsistencies + 1
continue
jobname = issue.readable_name()
if not jobs.has_key(jobname):
print("Issue '%s' should be replicated to job '%s' but that "
"job either does not exists or is not replicated."
% (id, jobname))
inconsistencies = inconsistencies + 1
continue
# Get corresponding job.
job = jobs[jobname]
del jobs[jobname]
# Report if mapping is in error.
if job['P4DTI-issue-id'] != id:
print("Issue '%s' is replicated to job '%s' but that job "
"is replicated to issue '%s'."
% (id, jobname, job['P4DTI-issue-id']))
inconsistencies = inconsistencies + 1
# Report if actions aren't replicate/replicate.
issue_action = issue.action()
job_action = job['P4DTI-action']
if not self.action_table.has_key((issue_action, job_action)):
print("Issue '%s' has action '%s' and job '%s' has action "
"'%s': this combination is illegal."
% (id, issue_action, jobname, job_action))
inconsistencies = inconsistencies + 1
else:
action = self.action_table[(issue_action, job_action)]
if action != 'normal':
print("Issue '%s' has action '%s' and job '%s' has action "
"'%s'." % (id, issue_action, jobname, job_action))
inconsistencies = inconsistencies + 1
# Report if job and issue don't match.
changes = self.translate_issue_dt_to_p4(issue, job)
if changes:
print("Job '%s' would need the following set of changes in "
"order to match issue '%s': %s."
% (jobname, id, str(changes)))
inconsistencies = inconsistencies + 1
# Report if the sets of filespecs differ.
p4_filespecs = self.job_filespecs(job)
dt_filespecs = issue.filespecs()
for p4_filespec, dt_filespec in self.filespecs_differences(dt_filespecs, p4_filespecs):
if p4_filespec and not dt_filespec:
print("Job '%s' has associated filespec '%s' but there is "
"no corresponding filespec for issue '%s'."
% (jobname, p4_filespec, id))
inconsistencies = inconsistencies + 1
elif not p4_filespec and dt_filespec:
print("Issue '%s' has associated filespec '%s' but there "
"is no corresponding filespec for job '%s'."
% (id, dt_filespec.name(), jobname))
inconsistencies = inconsistencies + 1
else:
# Corresponding filespecs can't differ (since their only
# attribute is their name).
assert 0
# Report if the sets of fixes differ.
p4_fixes = self.job_fixes(job)
dt_fixes = issue.fixes()
for p4_fix, dt_fix in self.fixes_differences(dt_fixes, p4_fixes):
if p4_fix and not dt_fix:
print("Change %s fixes job '%s' but there is "
"no corresponding fix for issue '%s'."
% (p4_fix['Change'], jobname, id))
inconsistencies = inconsistencies + 1
elif not p4_fix and dt_fix:
print("Change %d fixes issue '%s' but there is "
"no corresponding fix for job '%s'."
% (dt_fix.change(), id, jobname))
inconsistencies = inconsistencies + 1
else:
print("Change %s fixes job '%s' with status '%s', but "
"change %d fixes issue '%s' with status '%s'."
% (p4_fix['Change'], jobname, p4_fix['Status'],
dt_fix.change(), id, dt_fix.status()))
inconsistencies = inconsistencies + 1
# There should be no remaining jobs, so any left are in error.
for job in jobs.values():
if issues[job['P4DTI-issue-id']]:
print("Job '%s' is marked as being replicated to issue '%s' "
"but that issue is being replicated to job '%s'."
% (job['Job'], job['P4DTI-issue-id'],
issues[job['P4DTI-issue-id']].readable_name()))
inconsistencies = inconsistencies + 1
else:
print("Job '%s' is marked as being replicated to issue '%s' "
"but that issue either doesn't exist or is not being "
"replicated by this replicator."
% (job['Job'], job['P4DTI-issue-id']))
inconsistencies = inconsistencies + 1
# Report on success/failure.
print
print "Consistency check completed."
print len(issues), "issues checked."
if inconsistencies == 0:
print "Looks all right to me."
elif inconsistencies == 1:
print "1 inconsistency found."
else:
print inconsistencies, "inconsistencies found."
# fixes_differences(dt_fixes, p4_fixes). Each argument is a list of fixes
# for the same job/issue. Return list of pairs (p4_fix, dt_fix) of
# corresponding fixes which differ. Elements of pairs are None where there
# is no corresponding fix.
def fixes_differences(self, dt_fixes, p4_fixes):
    """Return the pairs (p4_fix, dt_fix) of fixes that differ.

    dt_fixes is a list of defect_tracker_fix instances and p4_fixes a list
    of Perforce fix dictionaries, both for the same job/issue.  Fixes are
    matched by change number.  A matched pair is reported only when the
    statuses differ; an unmatched fix is reported with None in the other
    position.
    """
    assert isinstance(dt_fixes, types.ListType)
    assert isinstance(p4_fixes, types.ListType)
    # Index the Perforce fixes by (integer) change number.  Entries are
    # removed as they are matched, so what is left at the end is unpaired.
    unmatched = {}
    for fix in p4_fixes:
        assert isinstance(fix, types.DictType)
        unmatched[int(fix['Change'])] = fix
    differences = []
    for dt_fix in dt_fixes:
        assert isinstance(dt_fix, defect_tracker_fix)
        change = dt_fix.change()
        counterpart = unmatched.get(change)
        if counterpart is None:
            # No Perforce fix for this change.
            differences.append((None, dt_fix))
            continue
        del unmatched[change]
        if dt_fix.status() != counterpart['Status']:
            differences.append((counterpart, dt_fix))
    # Remaining p4 fixes are unpaired.
    for fix in unmatched.values():
        differences.append((fix, None))
    return differences
# filespecs_differences(dt_filespecs, p4_filespecs). Each argument is a
# list of filespecs for the same job/issue. Return list of pairs
# (p4_filespec, dt_filespec) of filespecs which differ. Elements of pairs
# are None where there is no corresponding filespec (this is always the
# case since there is no associated information with a filespec; the
# function is like this for consistency with fixes_differences, and so that
# it is easy to extend if there is ever a way to associate information with
# a filespec, for example the nature of the association -- see requirement
# 55).
def filespecs_differences(self, dt_filespecs, p4_filespecs):
    """Return the pairs (p4_filespec, dt_filespec) of unmatched filespecs.

    dt_filespecs is a list of defect_tracker_filespec instances and
    p4_filespecs a list of filespec strings, both for the same job/issue.
    Filespecs are matched by name.  Since a filespec carries nothing but
    its name, every pair returned has None in one position.
    """
    assert isinstance(dt_filespecs, types.ListType)
    assert isinstance(p4_filespecs, types.ListType)
    # Index the Perforce filespecs by name; matched names are removed.
    unmatched = {}
    for name in p4_filespecs:
        assert isinstance(name, types.StringType)
        unmatched[name] = name
    differences = []
    for dt_filespec in dt_filespecs:
        assert isinstance(dt_filespec, defect_tracker_filespec)
        name = dt_filespec.name()
        if unmatched.has_key(name):
            del unmatched[name]
        else:
            # Present in the defect tracker only.
            differences.append((None, dt_filespec))
    # Whatever is left is present in Perforce only.
    for name in unmatched.values():
        differences.append((name, None))
    return differences
# init(). Set up Perforce and the defect tracking system so that
# replication can proceed.
def init(self):
# Check that the Perforce server version is supported by the
# integration.
server_version_re = re.compile('Server version: '
'[^/]+/[^/]+/[^/]+/([0-9]+)')
changelevel = 0
supported_changelevel = 18548
for x in self.p4.run('info'):
if x.has_key('code') and x['code'] == 'info' and x.has_key('data'):
match = server_version_re.match(x['data'])
if match:
changelevel = int(match.group(1))
if changelevel < supported_changelevel:
raise self.error, \
("Perforce server changelevel %d is not "
"supported by P4DTI. Server must be at "
"changelevel %d or above"
% (changelevel, supported_changelevel))
else:
break
if not changelevel:
raise self.error, "p4 info didn't report a recognisable version"
# Initialize the defect tracking system.
self.dt.init()
# Make a client for the replicator.
self.p4.run('client -i', self.p4.run('client -o'))
# TODO: Initialize Perforce by adding fields to jobspec if not present.
# For the moment I can't add fields to the jobspec, so I'll just check
# that they are there. I can't even check the presence of the
# P4DTI-user field this way, since it isn't given a value for a
# non-existent job like the one we're asking for here.
job = self.job('P4DTI-no-such-job')
for field in ['P4DTI-filespecs', 'P4DTI-issue-id', 'P4DTI-rid',
'P4DTI-action']:
if not job.has_key(field):
raise self.error, ("Field '%s' not found in Perforce jobspec"
% field)
# Has the logger been started? (We must be careful not to set the
# logger counter to 0 more than once; this will confuse Perforce
# according to Chris Seiwald's e-mail .
logger_started = 0
logger_re = re.compile('logger = ([0-9]+)$')
counters = self.p4.run('counters')
for c in counters:
if (c.has_key('counter') and c['counter'] == 'logger'):
logger_started = 1
# If not, start it.
if not logger_started:
self.p4.run('counter logger 0')
# job(jobname). Return the Perforce job with the given name if it exists,
# or an empty job specification (otherwise).
def job(self, jobname):
assert isinstance(jobname, types.StringType)
jobs = self.p4.run('job -o %s' % jobname)
if len(jobs) != 1 or not jobs[0].has_key('Job'):
raise self.error, ("Expected a job but found %s. "
"Perforce server down?" % str(jobs))
elif jobs[0]['Job'] != jobname:
raise self.error, ("Asked for job '%s' but got job '%s'."
% (jobname, jobs[0]['Job']))
else:
return jobs[0]
# job_filespecs(job). Return a list of filespecs for the given job. Each
# element of the list is a filespec, as a string.
def job_filespecs(self, job):
assert isinstance(job, types.DictType)
filespecs = string.split(job['P4DTI-filespecs'], '\n')
# Since Perforce text fields are terminated with a newline, the last
# item of the list should be empty. Remove it.
if filespecs:
if filespecs[-1] != '':
raise self.error, ("P4DTI-filespecs field '%s' does not end "
"in a newline." % job['P4DTI-filespecs'])
filespecs = filespecs[:-1]
return filespecs
# job_fixes(job). Return a list of fixes for the given job. Each element
# of the list is a dictionary with keys Change, Client, User, Job, and
# Status.
def job_fixes(self, job):
    """Return the result of 'p4 fixes -j' for the given job.

    job is a job dictionary; its 'Job' field names the job to query.
    """
    assert isinstance(job, types.DictType)
    command = 'fixes -j %s' % job['Job']
    return self.p4.run(command)
# log(format, arguments, priority). Write the message to the replicator's
# log.
def log(self, format, arguments = (), priority = None):
    """Write a message to the replicator's log.

    format is a format string and arguments the value (or tuple of values)
    to substitute into it.  The replicator id and a tab are prepended to
    every message.  priority must be None.
    """
    assert isinstance(format, types.StringType)
    assert priority == None
    # A single bare argument is treated as a one-element tuple.
    if not isinstance(arguments, types.TupleType):
        arguments = (arguments,)
    self.config['logger'].log("P4DTI-0", "%s\t" + format,
                              (self.rid,) + arguments, priority)
# mail(to, subject, body). Send e-mail to the given recipient with the
# given subject and body.
def mail(self, to, subject, body):
    """Send e-mail to the given recipient, copied to the administrator.

    to, subject and body are strings.  The message is sent from the
    configured 'replicator-address' through the configured 'smtp-server'.
    When 'to' is not the administrator, the administrator is added as a Cc
    recipient.  Nothing at all is done unless both 'administrator-address'
    and 'smtp-server' are configured.
    """
    administrator = self.config['administrator-address']
    smtp_server = self.config['smtp-server']
    # Mail is optional.  Check this before the type assertions, because
    # callers pass the (possibly unset) administrator address as 'to'.
    if not (administrator and smtp_server):
        return
    assert isinstance(to, types.StringType)
    assert isinstance(subject, types.StringType)
    assert isinstance(body, types.StringType)
    self.log("Mailing '%s' re: '%s'", (to, subject))
    if to == administrator:
        cc = ''
        recipients = [to]
    else:
        cc = administrator
        # The Cc header alone doesn't deliver anything: the address must
        # also be among the envelope recipients passed to sendmail.
        recipients = [to, cc]
    sender = self.config['replicator-address']
    message = ("From: %s\n"
               "To: %s\n"
               "Cc: %s\n"
               "Subject: %s\n\n"
               "%s"
               % (sender, to, cc, subject, body))
    smtp = smtplib.SMTP(smtp_server)
    try:
        smtp.sendmail(sender, recipients, message)
    finally:
        # Close the session even if sending failed.
        smtp.quit()
# conflict(issue, job, message). Report that the given issue conflicts
# with the given job. The message argument is a string containing
# additional detail about the conflict. Mark the issue and job as
# conflicting if they are not already.
def conflict(self, issue, job, message):
# issue is a defect_tracker_issue, job is a job dictionary and message is a
# string giving detail about the conflict.  The method sets the action of
# both the issue and the job to 'wait', writes a log entry and mails the
# configured administrator address.
assert isinstance(issue, defect_tracker_issue)
assert isinstance(job, types.DictType)
assert isinstance(message, types.StringType)
try:
# We can't just update the issue in hand because that might have
# changed in the course of replicating some fields. So fetch it
# again. This is quite unsatisfactory: we should keep better track
# of the old and new versions of the issue. GDR 2000-10-27.
old_issue = self.dt.issue(issue.id())
old_issue.update_action('wait')
# The same problem applies to the job. GDR 2000-10-27.
old_job = self.job(job['Job'])
# update_job_action is not defined in this part of the file.
self.update_job_action(old_job, 'wait')
finally:
# The conflict is logged even if marking the issue or the job failed.
self.log("Issue '%s' conflicts with corresponding job '%s'. %s",
(issue.readable_name(), job['Job'], message))
subject = ("Issue '%s' conflicts with corresponding job '%s'."
% (issue.readable_name(), job['Job']))
# The issue and the job are included in the body in their string forms.
body = ("%s\n\n%s\n\nIssue:\n%s\n\nJob:\n%s"
% (subject, message, issue, job))
# NOTE(review): 'administrator-address' defaults to None, and mail()
# asserts that its first argument is a string -- confirm that this call is
# safe when no administrator address is configured.
self.mail(self.config['administrator-address'], subject, body)
# conflict_policy(issue, job). This method is called when both the issue
# and the corresponding job have changed since the last time they were
# consistent. Return 'p4' if the Perforce job is correct and should be
# replicated to the defect tracker. Return 'dt' if the defect tracking
# issue is correct and should be replicated to Perforce. Return 'none' if
# the replicator should take no further action. Any other result indicates
# that the replicator should treat the situation as a conflict and proceed
# accordingly.
#
# The default policy is to return 'dt'. This is because we're treating the
# Perforce jobs database as a scratch copy of the real data in the defect
# tracker. So when there's a conflict the defect tracker is correct. See
# job000102 for details.
def conflict_policy(self, issue, job):
    """Decide which side wins when both the issue and the job have changed.

    Return 'p4' if the job should be replicated to the defect tracker, 'dt'
    if the issue should be replicated to Perforce, 'none' to take no
    action; any other value means the situation is a conflict.  This
    default policy always answers 'dt'.
    """
    assert isinstance(issue, defect_tracker_issue)
    assert isinstance(job, types.DictType)
    # The defect tracker is treated as holding the real data.
    policy = 'dt'
    return policy
# poll(). Poll the DTS for changed issues. Poll Perforce for changed jobs
# and changelists. Replicate all of these entities.
def poll(self):
    """Fetch and replicate everything that has changed on either side.

    Gets the changed issues from the defect tracker and the changed jobs
    and changelists from Perforce, replicates them, and then tells both
    sides that the changes have been dealt with.
    """
    # Get the changed issues (ignore changed changelists if any since we
    # only replicate changelists from Perforce to the defect tracker).
    changed_issues, _, dt_marker = self.dt.changed_entities()
    issue_count = len(changed_issues)
    if issue_count > 1:
        self.log('%d issues have changed', issue_count)
    elif issue_count == 1:
        self.log('1 issue has changed')
    changed_jobs, changelists, p4_marker = self.changed_entities()
    job_count = len(changed_jobs)
    if job_count > 1:
        self.log('%d jobs have changed', job_count)
    elif job_count == 1:
        self.log('1 job has changed')
    # Replicate the issues and the jobs, then the affected changelists.
    self.replicate_many(changed_issues, changed_jobs)
    for changelist in changelists:
        self.replicate_changelist_p4_to_dt(changelist)
    # Only now that everything has been replicated, tell the defect tracker
    # and Perforce that we've finished with these changes.
    self.dt.mark_changes_done(dt_marker)
    self.mark_changes_done(p4_marker)
# replicate_all_dt_to_p4(). Go through all the issues in the defect
# tracker, set them up for replication if necessary, and replicate them to
# Perforce.
def replicate_all_dt_to_p4(self):
    """Replicate every issue returned by the defect tracker to Perforce."""
    # No jobs are passed, so replicate_many treats each issue as changed in
    # the defect tracker only.
    self.replicate_many(self.dt.all_issues(), {})
def replicate_changelist_p4_to_dt(self, changelist):
    """Replicate one Perforce changelist to the defect tracker.

    changelist is a dictionary with the fields Change, Client, Date,
    Description, Status and User.  The date and user are passed through the
    configured 'date-translator' and 'user-translator'.  A log entry is
    written if the defect tracker reports that the changelist was changed.
    """
    assert isinstance(changelist, types.DictType)
    # NOTE(review): both translators default to None in the class
    # configuration, so they have to be supplied -- confirm against callers.
    date_translator = self.config['date-translator']
    user_translator = self.config['user-translator']
    change = int(changelist['Change'])
    client = changelist['Client']
    date = date_translator.translate_1_to_0(changelist['Date'],
                                            self.dt, self.dt_p4)
    description = changelist['Description']
    status = changelist['Status']
    user = user_translator.translate_1_to_0(changelist['User'],
                                            self.dt, self.dt_p4)
    changed = self.dt.replicate_changelist(change, client, date,
                                           description, status, user)
    if changed:
        self.log("Replicated changelist %d", change)
# replicate_many(issues, jobs). Replicate the issues and jobs. The issues
# argument is a list of issues (which must belong to a subclass of
# defect_tracker_issue; the jobs list is a hash from jobname to job).
#
# The reason why the arguments have different conventions (list vs hash) is
# that the algorithm for getting the changed jobs from the p4 logger output
# involves constructing a hash from jobname to job, and it seems silly to
# turn this hash back into a list only to immediately turn it back into a
# hash again.
def replicate_many(self, issues, jobs):
assert isinstance(issues, types.ListType)
assert isinstance(jobs, types.DictType)
# Make a list of triples of (defect tracking issue, Perforce job,
# changed). Changed is 'dt' if the defect tracking issue has changed
# but not the Perforce job; 'p4' if vice versa; 'both' if both have
# changed. TODO: mitigate effects of race conditions?
triples = []
# Go through issues making triples. Set up issues for replication if
# they are not already replicated. Delete corresponding jobs from the
# jobs dictionary.
for issue in issues:
assert isinstance(issue, defect_tracker_issue)
# Issue not set up for replication yet?
if not issue.rid():
# Should issue be replicated by this replicator?
if issue.replicate_p():
issue.setup_for_replication()
self.log("Set up issue '%s' to replicate to job '%s'",
(issue.id(), issue.readable_name()))
else:
# Don't replicate this issue at all.
continue
jobname = issue.readable_name()
if jobs.has_key(jobname):
job = jobs[jobname]
triples.append((issue, job, 'both'))
del jobs[jobname]
else:
job = self.job(jobname)
triples.append((issue, job, 'dt'))
# Now go through the remaining changed jobs.
for job in jobs.values():
assert isinstance(job, types.DictType)
issue = self.dt.issue(job['P4DTI-issue-id'])
if not issue:
raise self.error, "No issue '%s'" % job['P4DTI-issue-id']
triples.append((issue, job, 'p4'))
# Now process each triple.
for issue, job, changed in triples:
self.replicate(issue, job, changed)
# replicate(issue, job, changed). Replicate an issue to or from the
# corresponding job. The changed argument is 'dt' if the defect tracking
# issue has changed but not the Perforce job; 'p4' if vice versa; 'both' if
# both have changed.
#
# Basically this method is a series of conditions that end in one of the
# following cases:
#
# 1. Replicate the issue to the job or vice versa (the normal mode of
# operation).
#
# 2. Overwrite the job with the issue or vice versa (if the actions say to
# do so, or if they have both changed and the conflict policy says to do
# so). This is just like replication, except that the old version of the
# overwritten entity gets mailed to its owner as a record in case data was
# lost.
#
# 3. Report a conflict (if the actions say to do so, or if both have
# changed and the conflict policy says to do so.) This sends e-mail to the
# administrator.
#
# 4. Do nothing (if both have changed and the conflict policy says to do
# nothing).
#
# 5. Revert the job from the issue (if we tried to replicate the job to the
# issue but it failed, probably due to lack of privileges or invalid data).
def replicate(self, issue, job, changed):
    # Bring one issue/job pair back into step.
    #
    # issue   -- the defect tracker issue (a defect_tracker_issue).
    # job     -- the corresponding Perforce job, as a dictionary.
    # changed -- 'dt', 'p4' or 'both', saying which side has changed.
    #
    # Returns nothing.  Exceptions raised while replicating from Perforce
    # to the defect tracker in the 'normal' case are caught here and the
    # job is reverted from the issue; exceptions from the other branches
    # propagate to the caller.
    assert isinstance(issue, defect_tracker_issue)
    assert isinstance(job, types.DictType)
    assert changed in ['dt','p4','both']
    issuename = issue.readable_name()
    jobname = job['Job']
    # Figure out what to do with this issue and job.  Report a conflict?
    # Do nothing?  Overwrite issue with job?  Overwrite job with issue?
    # The action arguments may tell us what to do immediately.
    issue_action = issue.action()
    job_action = job['P4DTI-action']
    if not self.action_table.has_key((issue_action, job_action)):
        self.conflict(issue, job,
                      "Issue '%s' has action '%s' and job '%s' has "
                      "action '%s': this combination is illegal."
                      % (issuename, issue_action, jobname, job_action))
        return
    action = self.action_table[(issue_action, job_action)]
    # Action 'conflict' means that there was already a conflict between the
    # job and the issue, and now one or both have changed.  Report this,
    # since the administrator may need to know what's been happening to
    # these entities in order to resolve the conflict.
    if action == 'conflict':
        if changed == 'dt':
            self.conflict(issue, job, "Existing conflict; issue changed.")
        elif changed == 'p4':
            self.conflict(issue, job, "Existing conflict; job changed.")
        else:
            assert changed == 'both'
            self.conflict(issue, job, "Existing conflict; both changed.")
    # Action 'p4' means Perforce is correct: overwrite the defect tracker.
    elif action == 'p4':
        reason = ("The issue's action is '%s' and the job's action is '%s'"
                  % (issue_action, job_action))
        self.overwrite_issue_p4_to_dt(issue, job, reason)
    # Action 'dt' means the defect tracker is correct: overwrite Perforce.
    elif action == 'dt':
        reason = ("The issue's action is '%s' and the job's action is '%s'"
                  % (issue_action, job_action))
        self.overwrite_issue_dt_to_p4(issue, job, reason)
    # Action 'normal' means that the ordinary decision procedure should
    # apply, based on whether the issue, or the job, or both have changed
    # since the last time we replicated.
    else:
        assert action == 'normal'
        # Only the defect tracker issue has changed.
        if changed == 'dt':
            self.log("Replicating issue '%s' to job '%s'",
                     (issuename, jobname))
            self.replicate_issue_dt_to_p4(issue, job)
        # Only the Perforce job has changed.
        elif changed == 'p4':
            self.log("Replicating job '%s' to issue '%s'",
                     (jobname, issuename))
            try:
                self.replicate_issue_p4_to_dt(issue, job)
            except:
                # The revert must be called from inside this handler: it
                # reads the failure details from sys.exc_info().
                self.revert_issue_dt_to_p4(issue, job)
        # Both have changed.  Apply the conflict resolution policy.
        else:
            assert changed == 'both'
            self.log("Issue '%s' and job '%s' have both changed.  "
                     "Consulting conflict resolution policy.",
                     (issuename, jobname))
            action = self.conflict_policy(issue, job)
            if action == 'none':
                self.log("Conflict resolution policy decided: no action.")
            elif action == 'dt':
                reason = ("Issue and job both changed.  The conflict "
                          "resolution policy decided to overwrite the "
                          "job with the issue.")
                self.overwrite_issue_dt_to_p4(issue, job, reason)
            elif action == 'p4':
                reason = ("Issue and job both changed.  The conflict "
                          "resolution policy decided to overwrite the "
                          "issue with the job.")
                # Perforce wins, so the job must overwrite the issue (the
                # direction stated in the reason), not the other way round.
                self.overwrite_issue_p4_to_dt(issue, job, reason)
            else:
                # Any other policy answer is reported as a conflict.
                self.conflict(issue, job, "Both changed.")
# revert_issue_dt_to_p4(self, issue, job). This is called when an error
# has occurred in replicating from Perforce to the defect tracker. The
# most likely reason for this is a privilege failure (the user is not
# allowed to edit that issue in that way) or a failure to put valid values
# in the job fields. In this case, set the job back to a copy of the
# issue.
def revert_issue_dt_to_p4(self, issue, job):
assert isinstance(issue, defect_tracker_issue)
assert isinstance(job, types.DictType)
error_message = string.join(apply(traceback.format_exception,
sys.exc_info()), '')
issuename = issue.readable_name()
jobname = job['Job']
subject = ("Job '%s' could not be replicated to issue '%s'"
% (jobname, issuename))
self.log("%s: %s", (subject, error_message))
try:
# Get the issue again, since it might have been changed in memory
# in the course of the failed replication. Note new variable name
# so as not to overwrite the old issue. (Can we avoid all this
# nonsense by keeping better track of old and new issues?) GDR
# 2000-10-31.
issue_2 = self.dt.issue(issue.id())
if not issue_2:
raise self.error, ("Issue '%s' not found." % issue.id())
reason = ("%s, because the following error occurred:\n\n%s"
% (subject, error_message))
self.overwrite_issue_dt_to_p4(issue_2, job, reason)
except:
# Replicating back to Perforce failed. Report both errors to the
# administrator.
error_message_2 = string.join(apply(traceback.format_exception,
sys.exc_info()), '')
self.log("%s: %s", (subject, error_message_2))
body = ("%s, because the following error occurred:\n\n%s.\n\n"
"The replicator attempted to restore the job to a copy of "
"the issue, but this failed too, because the following "
"error occurred:\n\n%s\n\nThe replicator has now given up."
% (subject, error_message_1, error_message_2))
email = (self.user_email_addess(job[self.config['job-owner-field']])
or self.config['administrator-address'])
self.mail(email, subject, body)
# overwrite_issue_p4_to_dt(self, issue, job, reason). As
# replicate_issue_p4_to_dt, but e-mails an old copy of the issue to the
# owner of the job, or to the administrator if the owner has no known
# address. The reason argument is a string giving the reason for the
# overwriting. Return true if the replication was successful, otherwise
# throw an exception.
def overwrite_issue_p4_to_dt(self, issue, job, reason):
    # Overwrite the issue with the job and mail a record of the old issue.
    #
    # issue  -- the defect tracker issue to be overwritten.
    # job    -- the Perforce job (a dictionary) that wins.
    # reason -- a string explaining why the overwrite is happening.
    #
    # Returns 1 on success.  Any exception from the replication or from
    # sending the mail propagates to the caller.
    assert isinstance(issue, defect_tracker_issue)
    assert isinstance(job, types.DictType)
    assert isinstance(reason, types.StringType)
    issuename = issue.readable_name()
    jobname = job['Job']
    self.log("Overwrite issue '%s' with job '%s' (%s)",
             (issuename, jobname, reason))
    # Render the issue now, so that the mail shows the issue as it was
    # before the overwrite rather than whatever state it is left in.
    old_issue = "%s" % (issue,)
    self.replicate_issue_p4_to_dt(issue, job)
    subject = "Issue '%s' overwritten by job '%s'" % (issuename, jobname)
    body = ("%s.\n\n%s\n\n"
            "The issue looked like this before being overwritten:\n\n%s"
            % (subject, reason, old_issue))
    # Fall back to the administrator when the owner's address is unknown.
    email = (self.user_email_address(job[self.config['job-owner-field']])
             or self.config['administrator-address'])
    self.mail(email, subject, body)
    return 1
# overwrite_issue_dt_to_p4(self, issue, job, reason). As
# replicate_issue_dt_to_p4, but e-mails a copy of the job to the owner of
# the job, if the owner's e-mail address can be found. The reason argument
# is a string giving the reason for the overwriting. Return true if the
# replication was successful, otherwise throw an exception.
def overwrite_issue_dt_to_p4(self, issue, job, reason):
    # Overwrite the job with the issue and mail a record of the old job.
    #
    # issue  -- the defect tracker issue that wins.
    # job    -- the Perforce job (a dictionary) to be overwritten.
    # reason -- a string explaining why the overwrite is happening.
    #
    # Returns 1 on success.  Any exception from the replication or from
    # sending the mail propagates to the caller.
    assert isinstance(issue, defect_tracker_issue)
    assert isinstance(job, types.DictType)
    assert isinstance(reason, types.StringType)
    issuename = issue.readable_name()
    jobname = job['Job']
    self.log("Overwrite job '%s' with issue '%s' (%s)",
             (jobname, issuename, reason))
    # update_job() modifies the job dictionary in place, so take a copy
    # first; otherwise the mail would show the job after the overwrite.
    old_job = job.copy()
    self.replicate_issue_dt_to_p4(issue, job)
    subject = "Job '%s' overwritten by issue '%s'" % (jobname, issuename)
    body = ("%s.\n\n%s\n\n"
            "The job looked like this before being overwritten:\n\n%s"
            % (subject, reason, old_job))
    # NOTE(review): the owner field is hard-coded as 'Owner' here, while
    # overwrite_issue_p4_to_dt reads it via config['job-owner-field'] --
    # confirm which is intended.
    email = self.user_email_address(job['Owner'])
    if email:
        self.mail(email, subject, body)
    return 1
# replicate_issue_dt_to_p4(issue, job). Replicate the given issue from
# the defect tracker to Perforce. Return true if the issue was replicated
# successfully. Otherwise throw an exception.
def replicate_issue_dt_to_p4(self, issue, job):
    # Copy the issue's fields, filespecs and fixes onto the Perforce job.
    #
    # issue -- the defect tracker issue to copy from.
    # job   -- the Perforce job (a dictionary) to update; it is modified
    #          in place by update_job().
    #
    # Returns 1 on success; exceptions from the defect tracker or from
    # Perforce propagate to the caller.
    assert isinstance(issue, defect_tracker_issue)
    assert isinstance(job, types.DictType)
    # Transform the issue into a job.  This has to be done first because
    # the job might be new, and we won't be able to replicate fixes or
    # filespecs until the job's been created (p4 fix won't accept
    # non-existent jobnames).  I suppose I could create a dummy job to act
    # as a placeholder here, but that's not easy at all -- you have to know
    # quite a lot about the jobspec to be able to create a job.
    changes = self.translate_issue_dt_to_p4(issue, job)
    if changes:
        self.log("-- Changed fields: %s", repr(changes))
        self.update_job(job, changes)
    else:
        self.log("-- No issue fields were replicated.")
    # Replicate filespecs.
    dt_filespecs = issue.filespecs()
    p4_filespecs = self.job_filespecs(job)
    if self.filespecs_differences(dt_filespecs, p4_filespecs):
        names = map(lambda f: f.name(), dt_filespecs)
        self.update_job(job, { 'P4DTI-filespecs': string.join(names,'\n') })
        self.log("-- Filespecs changed to '%s'", string.join(names))
    # Replicate fixes.
    p4_fixes = self.job_fixes(job)
    dt_fixes = issue.fixes()
    status_field = self.config['job-status-field']
    # job_status tracks the status last written by a replicated fix.
    job_status = job[status_field]
    for p4_fix, dt_fix in self.fixes_differences(dt_fixes, p4_fixes):
        if p4_fix and not dt_fix:
            self.p4.run('fix -d -c %s %s'
                        % (p4_fix['Change'], p4_fix['Job']))
            self.log("-- Deleted fix for change %s", p4_fix['Change'])
        elif not p4_fix and dt_fix:
            self.p4.run('fix -s %s -c %d %s'
                        % (dt_fix.status(), dt_fix.change(),
                           issue.readable_name()))
            job_status = dt_fix.status()
            self.log("-- Added fix for change %d with status %s",
                     (dt_fix.change(), dt_fix.status()))
        elif p4_fix['Status'] != dt_fix.status():
            self.p4.run('fix -s %s -c %d %s'
                        % (dt_fix.status(), dt_fix.change(),
                           issue.readable_name()))
            job_status = dt_fix.status()
            self.log("-- Fix for change %d updated to status %s",
                     (dt_fix.change(), dt_fix.status()))
        else:
            # This shouldn't happen, since fixes_differences returns only
            # a list of pairs which differ.
            assert 0
    # It might be the case that the job status has been changed by
    # replicating a fix from the defect tracker.  But this changed status
    # won't be right.  So restore the correct status if necessary.  Write
    # it to the configured status field, the same field it was read from,
    # rather than to a hard-coded 'Status' key.
    if job_status != job[status_field]:
        self.update_job(job, { status_field: job[status_field] })
    # Job and issue are up to date.
    issue.update_action('replicate')
    self.update_job_action(job, 'replicate')
    return 1
# replicate_issue_p4_to_dt(issue, job). Replicate the given job from
# Perforce to the defect tracker. Return true if the job was replicated
# successfully. Otherwise, throw an exception.
def replicate_issue_p4_to_dt(self, issue, job):
assert isinstance(issue, defect_tracker_issue)
assert isinstance(job, types.DictType)
# Replicate fixes.
p4_fixes = self.job_fixes(job)
dt_fixes = issue.fixes()
fix_diffs = self.fixes_differences(dt_fixes, p4_fixes)
for p4_fix, dt_fix in fix_diffs:
self.log("p4_fix = %s", (p4_fix,))
if dt_fix and not p4_fix:
dt_fix.delete()
self.log("-- Deleted fix for change %d", dt_fix.change())
elif not dt_fix:
change, client, date, status, user = self.translate_fix_p4_to_dt(p4_fix)
issue.add_fix(change, client, date, status, user)
self.log("-- Added fix for change %s with status %s",
(p4_fix['Change'], p4_fix['Status']))
elif dt_fix.status() != p4_fix['Status']:
change, client, date, status, user = self.translate_fix_p4_to_dt(p4_fix)
dt_fix.update(change, client, date, status, user)
self.log("-- Fix for change %s updated to status %s",
(p4_fix['Change'], p4_fix['Status']))
else:
# This should't happen, since fixes_differences returns only a
# list of pairs which differ.
assert 0
# Replicate filespecs.
p4_filespecs = self.job_filespecs(job)
dt_filespecs = issue.filespecs()
filespec_diffs = self.filespecs_differences(dt_filespecs, p4_filespecs)
for p4_filespec, dt_filespec in filespec_diffs:
if dt_filespec and not p4_filespec:
dt_filespec.delete()
self.log("-- Deleted filespec %s", dt_filespec.name())
elif not dt_filespec:
issue.add_filespec(p4_filespec)
self.log("-- Added filespec %s", p4_filespec)
else:
# This should't happen, since filespecs_differences returns
# only a list of pairs which differ.
assert 0
# Transform the job into an issue and update the issue.
changes = self.translate_issue_p4_to_dt(issue, job)
if changes:
self.log("-- Changed fields: %s", repr(changes))
user = self.config['user-translator'].translate_1_to_0(job['P4DTI-user'], self.dt, self.dt_p4)
issue.update(user, changes)
else:
self.log("-- No job fields were replicated.")
# The issue may have changed as a consequence of updating it. For
# example, in TeamTrack the issue's owner changes when an issue goes
# through a transition. So we fetch the issue again, check for changes
# and replicate them back to the job if we find them. See job000053.
new_issue = self.dt.issue(issue.id())
new_changes = self.translate_issue_dt_to_p4(new_issue, job)
if new_changes:
self.log("-- Defect tracker made changes as a result of "
"the update: %s", repr(new_changes))
self.update_job(job, new_changes)
# Job and issue are up to date.
new_issue.update_action('replicate')
self.update_job_action(job, 'replicate')
return 1
def replicate_changelists(self):
    # Ask Perforce for its changelists, fetch the full record of each one
    # and hand it to replicate_changelist_p4_to_dt.  Returns nothing.
    self.log("Checking changelists to see if they need replicating...")
    summaries = self.p4.run('changes')
    self.log("-- %d changelists to check", len(summaries))
    for summary in summaries:
        command = 'change -o %s' % summary['change']
        records = self.p4.run(command)
        # Only the first record of the reply is used.
        self.replicate_changelist_p4_to_dt(records[0])
# run() repeatedly polls the DTS.
def run(self):
    # Poll forever, sleeping for config['poll-period'] between polls.
    # Never returns normally.
    while 1:
        self.log("Polling...")
        try:
            self.poll()
        except (AssertionError, KeyboardInterrupt):
            # Assertion failures indicate severe bugs in the replicator:
            # carrying on might corrupt data, and the failure might go
            # unreported.  KeyboardInterrupt lets people stop the
            # replicator with Control-C.  Both are passed on untouched.
            raise
        except:
            # Anything else is logged and mailed to the administrator, and
            # polling then carries on.
            trace_lines = apply(traceback.format_exception, sys.exc_info())
            trace_text = string.join(trace_lines, '')
            subject = "The replicator failed to poll successfully"
            self.log("%s: %s", (subject, trace_text))
            body = ("%s, because the following error occurred:\n\n%s"
                    % (subject, trace_text))
            self.mail(self.config['administrator-address'], subject, body)
        time.sleep(self.config['poll-period'])
# translate_fix_p4_to_dt(p4_fix).
def translate_fix_p4_to_dt(self, p4_fix):
    # Convert a Perforce fix record (a dictionary) into the tuple
    # (change, client, date, status, user).  The change number becomes an
    # integer; the date and user go through the configured translators;
    # client and status are passed through unchanged.
    assert isinstance(p4_fix, types.DictType)
    return (int(p4_fix['Change']),
            p4_fix['Client'],
            self.config['date-translator'].translate_1_to_0(
                p4_fix['Date'], self.dt, self.dt_p4),
            p4_fix['Status'],
            self.config['user-translator'].translate_1_to_0(
                p4_fix['User'], self.dt, self.dt_p4))
# translate_issue_dt_to_p4(issue, job). Return changes as a dictionary but
# don't apply them yet.
def translate_issue_dt_to_p4(self, issue, job):
    # Work out which job fields would have to change for the job to match
    # the issue.  Returns a dictionary mapping job field name to new
    # value; the job itself is not touched.
    assert isinstance(issue, defect_tracker_issue)
    assert isinstance(job, types.DictType)
    # The P4DTI bookkeeping fields and the values they ought to hold.
    bookkeeping = [('P4DTI-rid', self.rid),
                   ('P4DTI-issue-id', issue.id()),
                   ('P4DTI-action', 'replicate')]
    updates = {}
    for field, wanted in bookkeeping:
        if job[field] != wanted:
            updates[field] = wanted
    # The replicated fields: translate each issue value and record it when
    # the job lacks the field or holds a different value.
    for dt_name, p4_name, translator in self.config['replicated-fields']:
        translated = translator.translate_0_to_1(issue[dt_name], self.dt, self.dt_p4, issue, job)
        if not job.has_key(p4_name):
            updates[p4_name] = translated
        elif translated != job[p4_name]:
            updates[p4_name] = translated
    return updates
# translate_issue_p4_to_dt(issue, job). Return changes as a dictionary but
# don't apply them yet.
def translate_issue_p4_to_dt(self, issue, job):
    # Work out which issue fields would have to change for the issue to
    # match the job.  Returns a dictionary mapping issue field name to new
    # value; the issue itself is not touched.
    assert isinstance(issue, defect_tracker_issue)
    assert isinstance(job, types.DictType)
    updates = {}
    for dt_name, p4_name, translator in self.config['replicated-fields']:
        # A field missing from the job is translated as None.
        translated = translator.translate_1_to_0(job.get(p4_name), self.dt, self.dt_p4, issue, job)
        if translated != issue[dt_name]:
            updates[dt_name] = translated
    return updates
# update_job(job, changes).
def update_job(self, job, changes = {}):
    # Merge the changes into the job dictionary (in place) and submit the
    # whole job to Perforce with 'job -i'.  Returns nothing.
    assert isinstance(job, types.DictType)
    assert isinstance(changes, types.DictType)
    job.update(changes)
    self.p4.run('job -i', [job])
# update_job_action(job, action).
def update_job_action(self, job, action):
    # Set the job's P4DTI-action field to the given action, writing to
    # Perforce only when something would actually change.
    assert isinstance(job, types.DictType)
    assert action in ['keep', 'discard', 'replicate', 'wait']
    # Only update the job action if it's being replicated by this
    # replicator.  See job000020.
    if job['P4DTI-rid'] == self.rid:
        if job['P4DTI-action'] != action:
            self.update_job(job, { 'P4DTI-action': action })
# Return the e-mail address of a Perforce user, or None if the address
# can't be found.
def user_email_address(self, user):
    # Look up the Email field of the named Perforce user with 'user -o'.
    # Any failure at all during the lookup is swallowed and reported as
    # None.
    assert isinstance(user, types.StringType)
    try:
        records = self.p4.run('user -o %s' % user)
        return records[0]['Email']
    except:
        return None