# dt-teamtrack.py -- defect tracking interface (TeamTrack). from replicator import Defect_Tracker, error import socket import teamtrack # The TeamTrack class implements a generic interface between the replicator and # TeamShare's "TeamTrack" defect tracker. Some configuration can be done by # passing a configuration hash to the constructor; for more advanced # configruation you should subclass this and replace some of the methods. The # configuration assumes that the teamTrack server is on the same host as the # replicator. class TeamTrack(Defect_Tracker): config = { 'server' : socket.gethostname(), 'user' : 'joe', 'password' : '', 'job-case-fields': [ ( 'Description', 'DESCRIPTION' ) ], } server = None def __init__(self, config = {}): Defect_Tracker.__init__(self, config) # Connect to the TeamTrack server. self.server = teamtrack.connect(self.config['user'], self.config['password'], self.config['server']) def changed_issues(self): query = "TS_P4DTI_RID = '%s' AND TS_LASTMODIFIEDDATE " \ "> TS_P4DTI_REPLICATED" % self.replicator.rid() issues = self.server.query(teamtrack.table['CASES'], query) changed_issues = {} for i in issues: changed_issues[i['id']] = i return changed_issues def init(self): # Fields to add to the TS_CASES table. new_cases_fields = [ { 'name': 'P4DTI_RID', 'type': teamtrack.field_type['TEXT'], 'length': 32, 'attributes': 1, # Text, not memo. 'description': "P4DTI replicator identifier" }, { 'name': 'P4DTI_SID', 'type': teamtrack.field_type['TEXT'], 'length': 32, 'attributes': 1, # Text, not memo. 'description': "P4DTI Perforce server identifier" }, { 'name': 'P4DTI_JOBNAME', 'type': teamtrack.field_type['TEXT'], # TODO: The width of the jobname field should be 1024, since # jobnames can be that long in Perforce. But the TeamShare API # doesn't allow me to create a text field with that width, and I # don't want to make it a memo field since I want to query it. # So for the moment it's only 255 characters wide. 'length': 255, 'attributes': 1, # Text, not memo. 'description': "P4DTI Perforce jobname" }, { 'name': 'P4DTI_REPLICATED', 'type': teamtrack.field_type['DATETIME'], 'length': 0, 'attributes': 0, # Int, not float. 'description': "P4DTI last replicated time" }, ] # Make a TS_CASES record so we can see if the new fields are already # present. case = self.server.new_record(teamtrack.table['CASES']) # Add each new field if not present. new_fields = {} for new_field in new_cases_fields: if not case.has_key(new_field['name']): self.log("Installing field '%s' in the TS_CASES table." % new_field['name']) f = self.server.new_record(teamtrack.table['FIELDS']) f['TABLEID'] = teamtrack.table['CASES'] f['NAME'] = new_field['name'] f['DBNAME'] = new_field['name'] f['FLDTYPE'] = new_field['type'] f['LEN'] = new_field['length'] f['ATTRIBUTES'] = new_field['attributes'] f['STATUS'] = 0 # Active, not deleted. f['PROPERTY'] = 1 # Not editable. f['DESCRIPTION'] = new_field['description'] f.add_field() new_fields[new_field['name']] = 1 if 1: # Previous installation was not up to date. We may need to fill in # some values in the new fields. if len(new_fields) != len(new_cases_fields): self.log("Partially installed the new fields in the " \ "TS_CASES table. Previous installation was in error.") else: self.log("Installed all new fields in the TS_CASES table.") # Set up cases for possible replication. cases = self.server.query(teamtrack.table['CASES'],'') for case in cases: if self.replicate_issue_p(case): fields = { 'P4DTI_RID': self.replicator.rid(), 'P4DTI_SID': self.replicator.sid(), 'P4DTI_JOBNAME': self.issue_jobname(case), 'P4DTI_REPLICATED': 0 } updated = 0 for f in fields.keys(): if 1: case[f] = fields[f] updated = 1 if updated: case.update() self.log("Case %d replicates to job '%s'" % (case['ID'], case['P4DTI_JOBNAME'])) def issue(self, issue_id): try: return self.server.read_record(teamtrack.table['CASES'], int(issue_id)) except teamtrack.tsapi_error: return None def issue_conflicting_p(self, issue): return issue['P4DTI_REPLICATED'] == -1 def issue_id(self, case): return case['id'] def issue_jobname(self, case): jobname = case['P4DTI_JOBNAME'] if jobname: return jobname else: return '%s_%s' % (self.replicator.rid(), str(case['id'])) def transform_issue_to_job(self, case, job): # Work out the set of proposed changes. changes = {} changes['Job'] = case['P4DTI_JOBNAME'] changes['P4DTI-rid'] = self.replicator.rid() changes['P4DTI-issue-id'] = str(case['id']) for job_field, case_field in self.config['job-case-fields']: changes[job_field] = case[case_field] # Copy the proposed changes over to the case if necessary. for key, value in changes.items(): if job[key] != value: job[key] = value else: del changes[key] # Return new job and the set of changes. return job, changes def transform_job_to_issue(self, job, case): # Work out the set of proposed changes. changes = {} changes['P4DTI_RID'] = self.replicator.rid() changes['P4DTI_SID'] = self.replicator.rid() changes['P4DTI_JOBNAME'] = job['Job'] for job_field, case_field in self.config['job-case-fields']: changes[case_field] = job[job_field] # Copy the proposed changes over to the case if necessary. for key, value in changes.items(): if case[key] != value: case[key] = value else: del changes[key] # Return new case and the set of changes. return case, changes def update_issue(self, case): case.update() def update_issue_status(self, case, status): if status == 'ok': replicated = case['LASTMODIFIEDDATE'] elif status == 'conflicting': replicated = -1 else: raise error, "Unknown status: '%s'" % status if case['P4DTI_REPLICATED'] != replicated: case['P4DTI_REPLICATED'] = replicated case.update()