# Perforce Defect Tracking Integration Project # # # TEST_P4DTI.PY -- TEST THE P4DTI # # Gareth Rees, Ravenbrook Limited, 2001-03-14 # # # 1. INTRODUCTION # # This test script tests the P4DTI. The initial plan is described in # [GDR 2000-12-31] and some of the design is given in [GDR 2001-03-14]. # # This script contains a lot of tricky code (for example, using the hostname or # operating system name to pick a module to import or a class to use; or # overriding a method in another module in order to snoop on its behaviour). # Please take care when editing it. # # It uses the PyUnit unit test framework [PyUnit]. # # The intended readership is project developers. # # This document is not confidential. # # The test cases depend on features of the test databases. It is # important that the cases be kept in sync with the test database and # with the design of those databases [GDR 2001-03-14]. When you add a # test, be sure to change the design and refer back to the test. # # # 1.1. Useful info # # The script creates a series of test Perforce servers (one for each test # case). When a test case starts, it kills all the running Perforce servers on # the required port. Each of these has its own directory, in the appropriate # place for temporary files for your system (typically /tmp on Unix, C:\Temp on # Windows). The P4DTI log is diverted from its real place and sent to the file # p4dti.log. # # # 1.2. Regression tests in this script # # The section * means that the defect is tested throughout as a simple # consequence of running the script; there is no particular test for it. # # Job Section Title # ---------------------------------------------------------------------- # job000003 6.3 It is not possible from the result of "p4 -G job -o # jobname" to tell whether a job has been returned, or # an error message # job000004 * "p4 -G jobspec -o" doesn't work # job000005 * TeamTrack integration may fail mysteriously on startup # in future releases of TeamTrack # job000037 6 Consistency checker script is inadequate # job000047 8.1 Historical bugs are replicated but uninteresting # job000048 6 Consistency checking of the configuration is # inadequate # job000050 8.1 There's no way to re-start replication # job000053 10 Implicit changes made by the DT don't get replicated # back # job000057 * Special characters in single-select keywords make the # replicator barf # job000075 6 No automatic check of configuration # job000107 9 You can't replicate an issue's project # job000111 7 Replicator destroys the jobspec that exists before # it's installed # job000112 9 Can't easily replicate by project # job000116 6 Bugzilla integration doesn't do enough checking # job000140 6.3 Logical field name "code" not allowed in TeamTrack # job000158 6.2 Obscure error if Perforce can't be reached or # p4_client_executable is wrong # job000168 6.2 Too easy to set dbms_port to a string # job000169 11 Change numbers are links in TeamTrack even when no # changelist URL has been specified # job000170 6.2 Replicator may be unable to send e-mail if the default # replicator_address is unchanged # job000181 * Assertion failure in translate_1_to_0 # job000182 10 Elapsed time fields aren't replicated properly # job000190 3.1 Connection to TeamTrack hangs for several minutes # job000199 4.5 Auxiliary scripts send e-mail # job000202 6.2 Errors from Perforce not reported well # job000219 7 Existing jobs in Perforce may become unusable when the # jobspec is changed # job000221 8.1 Refreshing Perforce jobs fails in Bugzilla integration # job000233 10 When you submit a new issue to TeamTrack it overwrites # the issue # job000240 7 Perforce server can crash if P4DTI is started with # existing jobs # job000311 9 If you have two TeamTrack projects with the same name, # the replicator stops working # job000325 * Can't read issues from TeamTrack 5.0 # job000338 * TeamTrack 5.0 doesn't update P4DTI_ fields # job000340 8.2 Consistency checker may miss some issues if you change # the start date # job000351 11 Bugzilla integration doesn't remove old configuration # items # job000354 * MySQL bug stops replicator from seeing some Bugzilla # changes # job000355 8 Bugzilla integration ignores start_date parameter # job000362 10.3 Can't update case from Perforce when using TeamTrack # 4.5 database upgraded to TeamTrack 5.0 # job000372 12.3 Check consistency stops with error if there are # unreplicated jobs # job000379 10 P4DTI corrupts date/time fields in TeamTrack if server # time zone is not UTC # job000381 10 P4DTI corrupts date/time fields in TeamTrack if # daylight savings time is in effect # job000385 13 Renumbered changelist causes P4DTI error and deletes fix import os import sys if os.environ.has_key('P4DTI_PATH'): p4dti_path = os.environ['P4DTI_PATH'] else: p4dti_path = os.path.join(os.getcwd(), os.pardir, 'code', 'replicator') if p4dti_path not in sys.path: sys.path.append(p4dti_path) import cgi import copy import httplib import imp import logger import message import p4 import p4dti_unittest import re import socket import string import tempfile import time import types import unittest import urllib # 2. CONFIGURATION # # # 2.1. Limitations # # This script can only support one basic configuration on each host (see # section 2.2). Tests of other configurations have to be made by changing # configuration parameters in the script. # # This script can only test one defect tracker on each host (you have to change # dt_name in config_HOSTNAME.py to test another defect tracker). # # Could it use an environment variable to work out which configuration to use? # # # 2.2. Using a test configuration # # I want to be able to test different configurations, changes to # configurations, and to check whether erroneous configurations are # spotted correctly. # # I expect to find a working configuration either (1) in the file # specified by the environment variable P4DTI_CONFIG, if set, or (2) in # config_HOSTNAME.py, where HOSTNAME is the first component of the FQDN, # converted to lower case (e.g. 'swan' for 'swan.ravenbrook.com'). # # This configuration file is an ordinary P4DTI configuration file, # except that it must include two additional configuration parameters: # p4_license_file specifies the location of a Perforce license file # suitable for use on the machine running the tests; # p4_server_executable specifies the location of a suitable Perforce # server executable. # # The loaded configuration module is copied when a test is started and # the copy is installed in sys.modules['config']. This has two effects: # # 1. The P4DTI itself won't load the config module; see the # initialization code [RB 2000-12-08]. # # 2. The unit tests can make changes to the copy of the configuration # (e.g., to specify incorrect values, as in section 6) without # affecting the original, which can be restored for the next test. if os.environ.has_key('P4DTI_CONFIG'): config_file = os.environ['P4DTI_CONFIG'] else: hostname = string.lower(string.split(socket.gethostname(), '.')[0]) config_file = 'config_' + hostname + '.py' file = open(config_file) try: imp.load_source('config', config_file, file) finally: file.close() original_configuration = copy.copy(sys.modules['config'].__dict__) import config def reset_configuration(): # Delete current config, just in case we added something. for k in sys.modules['config'].__dict__.keys(): if not re.match('^__.*__$', k): del sys.modules['config'].__dict__[k] # Restore old config. for (k,v) in original_configuration.items(): sys.modules['config'].__dict__[k] = v # 3. DEFECT TRACKER AND OPERATING INTERFACES # # Many of the test cases are generic: they don't depend on a particular defect # tracker or operating system. But they need an interface to the defect # tracker in order to restart it, and to the operating system in order to start # a new Perforce server. # # Each interface class should be called DT_OS_interface (where DT is # config.dt_name and OS is os.name) is and must define these methods: # # restart_defect_tracker() # restart_perforce() # # Note that there are no corresponding "stop" methods. The tests leave # Perforce and the defect tracker running so that failures can be investigated # without all the evidence having disappeared. # 3.1. Interface to TeamTrack on Windows NT class TeamTrack_nt_interface: # The TeamTrack server. server = None # Temporary directory for files created by this test. tempdir = None # 3.1.1. Clean up TeamTrack # # I would prefer to create a new TeamTrack database each time this test is # run (so that I can check that extra fields are added correctly to the # CASES table, for example). However, I don't know of a way to do that; # TeamTrack is controlled from the TeamTrack Administrator and I have no # idea how to automate it. # # So the next best thing to do is to clean out all P4DTI data from a # running TeamTrack server. This means setting all P4DTI_ fields in the # CASES table to the empty string, and deleting all records from the # VCACTIONS table. # # This isn't ideal: each new test ends up working with the set of cases # that resulted from the previous test. def restart_defect_tracker(self): # Connect to the TeamTrack server. This should take no more than 5 # seconds (regression test for job000190). import teamtrack self.server = teamtrack.connect(config.teamtrack_user, config.teamtrack_password, config.teamtrack_server) # Empty the P4DTI_* fields in the CASES table, if there are any. cases = self.server.query(self.server.case_table_id(), '') fields = ['P4DTI_JOBNAME', 'P4DTI_RID', 'P4DTI_SID'] for c in cases: changed = 0 for f in fields: if c.has_key(f) and c[f] != '': c[f] = '' changed = 1 if changed: c.update() # Delete all records in the VCACTIONS table. vcactions = self.server.query(teamtrack.table['VCACTIONS'],'') for v in vcactions: self.server.delete_record(teamtrack.table['VCACTIONS'], v['ID']) # 3.1.2. Kill old Perforce servers # # Find and terminate all running Perforce servers, if any. This code is # based on the script "killProcName.py" that comes with the Python Win32 # extensions. def kill_perforce(self): import win32pdhutil import win32api import win32con try: win32pdhutil.GetPerformanceAttributes('Process','ID Process',"p4d") except: pass pids = win32pdhutil.FindPerformanceAttributesByName("p4d") for pid in pids: h = win32api.OpenProcess(win32con.PROCESS_TERMINATE, 0, pid) win32api.TerminateProcess(h,0) win32api.CloseHandle(h) # 3.1.3. Start a new Perforce server # # Make a completely fresh Perforce server, with a new repository. def start_perforce(self): # Make a new repository directory. self.tempdir = tempfile.mktemp() os.mkdir(self.tempdir) # Copy the license. src = open(config.p4_license_file, 'r') dest = open(self.tempdir + '\\license', 'w') dest.writelines(src.readlines()) src.close() dest.close() # Work out Perforce's port number and start a Perforce server. match = re.match(".*:([0-9]+)$", config.p4_port) if match: port = int(match.group(1)) else: port = 1666 import win32api win32api.WinExec("%s -p 0.0.0.0:%d -r %s" % (config.p4_server_executable, port, self.tempdir)) # 3.1.4. Restart Perforce # # By killing the old server and starting a new one. def restart_perforce(self): self.kill_perforce() self.start_perforce() # 3.2. Interface to TeamTrack on Windows NT class Bugzilla_posix_interface: # The Bugzilla server. server = None # Temporary directory for files created by this test. tempdir = None def __init__(self): # The default temporary file prefix starts with an '@'. But # this means that temporary files will look like revision # specifications to Perforce. tempfile.gettempprefix = lambda: '%d.' % os.getpid() # 3.2.1. Restore Bugzilla # # Wipe out the existing Bugzilla database, if any, and restore a # known working one from a MySQL dump file. def restart_defect_tracker(self): db = config.dbms_database user = config.dbms_user version = config.__dict__.get('bugzilla_version', '2.10') os.system("mysqladmin -u %s --force drop %s > /dev/null" % (user, db)) os.system("mysqladmin -u %s create %s > /dev/null" % (user, db)) os.system("mysql -u %s %s < bugzilla-%s-mysqldump > /dev/null" % (user, db, version)) # 3.2.2. Kill running Perforce servers # # If there are any Perforce servers running on the magic port, # use p4 admin to kill them. def kill_running_perforce_servers(self): os.system("p4 -p %s -u %s admin stop > /dev/null" % (config.p4_port, config.p4_user)) # 3.2.3. Start a new Perforce server # # Make a completely fresh Perforce server, with a new repository. def start_perforce(self): # Make a new repository directory. self.tempdir = tempfile.mktemp() os.mkdir(self.tempdir) # Copy the license. src = open(config.p4_license_file, 'r') dest = open(os.path.join(self.tempdir, 'license'), 'w') dest.writelines(src.readlines()) src.close() dest.close() # Work out Perforce's port number and start a Perforce server. match = re.match(".*:([0-9]+)$", config.p4_port) if match: port = int(match.group(1)) else: port = 1666 os.system("%s -d -p 127.0.0.1:%d -r %s > /dev/null" % (config.p4_server_executable, port, self.tempdir)) # 3.2.4. Restart Perforce # # By killing the old server and starting a new one. def restart_perforce(self): self.kill_running_perforce_servers() self.start_perforce() # 4. P4DTI TEST CASE BASE CLASS # # The p4dti_base class is a generic P4DTI test case. It defines methods for # setting up the integration and some utilities for recording and interrogating # the output of the replicator. # # Other P4DTI test cases will inherit from p4dti_base. # # When a class implements several test cases, the methods that implement test # cases (in the PyUnit sense) should have names starting "test_". When a class # implements a single test case, the method should be called "runTest". class p4dti_base(p4dti_unittest.TestCase): # Defect tracker interface (an instance of one of the classes in section 3 # above). dti = eval(config.dt_name + '_' + os.name + '_interface')() # Messages written to the replicator's log. log_messages = [] # Mail messages sent by the replicator. Each message is a triple # (recipients, subject, body); see the definition of mail() method in # the replicator class. mail_messages = [] # The replicator. r = None # 4.1. Snoop on the logger # # I want to be able to test that the correct messages are appearing # in the log, so I override the log method in the # logger.multi_logger class so that it records the messages as well # as printing them to the log file. Printing to a log file is # necessary so that the test engineer can analyze what happened when # a test case fails. # # A disadvantage of this implementation is that the logging code is # not tested. Therefore there needs to be a separate set of logging # tests. def snoop_logger(self): self.log_file = open(config.log_file or 'p4dti.log', 'a') logger.multi_logger.log = self.log logger.file_logger.log = self.log logger.sys_logger.log = self.log def log(self, msg): date = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(time.time())) self.log_file.write("%s %s\n" % (date, msg)) self.log_file.flush() self.log_messages.append(msg) # The clear_log method can be used to clear this record of the log # before carrying out a test. def clear_log(self): self.log_messages = [] # 4.2. Snoop on e-mail def snoop_mail(self): import replicator replicator.replicator.mail = self.mail def mail(self, to, subject, body): self.mail_messages.append((to, subject, body)) def clear_mail(self): self.mail_messages = [] def print_mail(self, stream): for (to, subject, body) in self.mail_messages: stream.write("To: %s\n" % str(to)) stream.write("Subject: %s\n\n" % str(subject)) for msg in body: stream.write("%s\n\n" % str(msg)) # 4.3. Check that the log is as expected # # These methods make various checks on the messages recorded in the log. def log_message_ids(self): return map(lambda m: m.id, self.log_messages) def expected_only(self, expected): for m in self.log_messages: assert m.id in expected, \ ("Unexpected message %d (%s) found in log %s" % (m.id, str(m), self.log_message_ids())) def expected_only_in_range(self, min, max, expected): for m in self.log_messages: if m.id >= min and m.id <= max: assert m.id in expected, \ ("Unexpected message %d (%s) found in log %s" % (m.id, str(m), self.log_message_ids())) def expected_not(self, unexpected): for m in self.log_messages: assert m.id not in unexpected, \ ("Unexpected message %d (%s) found in log %s" % (m.id, str(m), self.log_message_ids())) def expected(self, expected): found = {} for m in self.log_messages: if found.has_key(m.id): found[m.id] = found[m.id] + 1 else: found[m.id] = 1 for id in expected: assert found.has_key(id) and found[id] > 0, \ ("Expected message %d not found in log %s" % (id, self.log_message_ids())) found[id] = found[id] - 1 # 4.4. Set up everything so that test cases can run # # Get a fresh configuration, a new defect tracker and a new Perforce # server. def setup_everything(self): reset_configuration() self.dti.restart_defect_tracker() self.dti.restart_perforce() self.snoop_logger() self.snoop_mail() # Get a Perforce interface. self.p4 = p4.p4( port = config.p4_port, user = config.p4_user, password = config.p4_password, client_executable = config.p4_client_executable, logger = self ) # 4.5. Initialize the replicator # # Load the init module. If it's already loaded, reload it: we must reload # it because we want the init module to run again and to pick up on the new # configuration. def initialize_replicator(self): self.clear_mail() if sys.modules.has_key('init'): reload(sys.modules['init']) import init self.r = init.r # Regression test for job000199. assert self.mail_messages == [] # 4.6. Normal replicator startup # # This is a pseudo test case. It checks that the replicator can start up # normally. Other tests may depend on this having been run; for example, # the check_nothing test (5.1). This test must be run before any other # test cases, and therefore can't be part of a test suite (the tests of # which may be run singly or in any order). It should therefore be run # from the setUp() method of a subclass of this class (unless of course # that subclass has a different idea of what should happen at startup). def check_startup(self): self.initialize_replicator() # Expect to set up issues and replicate them, but no jobs or conflicts. self.clear_log() self.r.poll() self.expected([803, 804, 812]) self.expected_only_in_range(800, 1000, [803, 804, 812]) self.expected_not([806, 811]) # Expect to see no issues replicated, and no issues actually changed. self.clear_log() self.r.poll() self.expected_not([803, 804, 812, 824]) # 4.7. Exceptional replicator startup # # This method starts the replicator but expects to get the exception given # by 'error', with message id 'msgid'. def check_startup_error(self, error, msgid): try: self.initialize_replicator() except: (err, msg, _) = sys.exc_info() assert err == error, \ ("Expected error %s but got %s." % (error, err)) assert msg.id == msgid, \ ("Expected message %d but got %d (%s)" % (msgid, msg.id, str(msg))) else: self.fail("Expected error %s but didn't get it." % error) # 4.8. Check consistency # # This method checks that the databases are consistent. def check_consistency(self): self.clear_log() self.r.check_consistency() self.expected([871, 885]) self.expected_only_in_range(800, 1000, [871, 883, 884, 885, 890]) # 4.9. Check replication of a single issue def check_replication_dt_to_p4(self, first_time = 0): self.clear_log() self.r.poll() self.expected([804, 812]) if first_time: self.expected([803]) # set up for replication self.expected_only_in_range(800, 1000, [803, 804, 812]) # Nothing should come back from Perforce. self.clear_log() self.r.poll() self.expected_not([803, 804, 812, 824]) self.expected_only_in_range(800, 1000, [805, 825]) # 4.10. Initialize Perforce repository # # This method sets up Perforce clients and workspaces for a set of # users, and adds a file to the repository. It sets up the # following members: # # p4i: Map from user to a Perforce interface for that user. # workspace: Map from user to the client workspace for that user. # # This is only needed for tests involving Perforce fixes: that's why # it's not called in setup_everything(). def setup_perforce(self, users): # Create Perforce interfaces and workspaces for the dummy users. self.p4i = {} self.workspace = {} for user in users: self.workspace[user] = tempfile.mktemp() os.mkdir(self.workspace[user]) self.p4i[user] = p4.p4( port = config.p4_port, user = user, client = user + '_' + socket.gethostname(), client_executable = config.p4_client_executable, logger = self, ) client = self.p4i[user].run('client -o')[0] client['Root'] = self.workspace[user] self.p4i[user].run('client -i', client) # Add a file to the repository so that we have something with # which to make changes and fixes. user = 'rb' self.filename = os.path.join(self.workspace[user], 'test') open(self.filename, 'w') self.p4i[user].run('add -t ktext %s' % self.filename) change = self.p4i[user].run('change -o')[0] change['Description'] = 'Added test file' self.p4i[user].run('submit -i', change) # 4.11. Make changelist in Perforce # # This method makes a changelist in Perforce and edits a file in # that changelist. The changelist is returned without being # submitted. def edit_file(self, user): change = self.p4i[user].run('change -o')[0] change['Description'] = 'Edited test file' result = self.p4i[user].run('change -i', change)[0] assert result.has_key('data') match = re.match('^Change ([0-9]+) created', result['data']) assert match changelist = match.group(1) self.p4i[user].run('edit -c %s %s' % (changelist, self.filename)) open(self.filename, 'a').write("Foo\n") return self.p4i[user].run('change -o %s' % changelist)[0] # 4.12 Variant tests # # This class has variant methods for each defect tracker (e.g., TeamTrack, # Bugzilla) and calls the appropriate one. def run_variant(self): try: test = getattr(self, config.dt_name) except AttributeError: assert 0, "No test variant for " + config.dt_name + "." test() # 5. TEST CASES: NORMAL OPERATION # # If nothing has changed, then nothing happens when the replicator polls. # The databases are consistent. class normal(p4dti_base): def setUp(self): self.setup_everything() self.check_startup() def runTest(self): "Startup, replication to Perforce, consistency (test_p4dti.normal)" self.clear_log() self.r.poll() self.expected_only_in_range(800, 1000, []) self.check_consistency() # 6. TEST CASES: INCORRECT CONFIGURATIONS # # This is a regression test of job000037, job000075 and job000116. class bogus(p4dti_base): def setUp(self): self.setup_everything() # 6.1. An incorrect parameter generates an error # # This is a utility function for carrying out a range of tests. It resets # the configuration, sets the parameter named by 'param' to value, then # tries to start the replicator. It expects to get an exception, whose # message should have the message id 'msgid'. def check_param(self, param, value, msgid): reset_configuration() config.__dict__[param] = value try: self.initialize_replicator() except: (err, msg, _) = sys.exc_info() if isinstance(msg, message.message): if msg.id != msgid: self.addFailure("Set parameter %s to '%s': expected " "message %d but got %d (%s)" % (param, value, msgid, msg.id, str(msg))) else: self.addFailure("Set parameter %s to '%s': expected message " "%d but got '%s: %s' instead." % (param, value, msgid, err, msg)) else: self.addFailure("Set parameter %s to '%s': expected message %d " "but didn't get it." % (param, value, msgid)) # 6.2. Basic errors in parameters are caught quickly # # Basic errors in parameters (wrong type, wrong format) should be caught # quickly. # # This is a table of (parameter name, bogus value, message id of expected # error). bogus_parameters = [ # Regression for job000170: ('administrator_address', 'invalid e-mail address', 202), ('changelist_url', -1, 207), ('changelist_url', "http://invalid/%d/%s", 210), ('changelist_url', "http://invalid/no/format/specifier", 210), ('changelist_url', "http://invalid/%d/%%/%%%", 210), ('job_url', 42, 207), ('job_url', "http://invalid/%d/%s", 211), ('job_url', "http://invalid/no/format/specifier", 211), ('job_url', "http://invalid/trailing/percent/%d/%%/%%%", 211), ('log_file', -1, 208), ('log_level', 'not an int', 204), ('p4_client_executable', -1, 207), # Regression test for job000158: ('p4_client_executable', 'no such file', 705), ('p4_user', None, 207), ('p4_password', -1, 207), ('p4_port', None, 207), # Regression test for job000158, job000202: ('p4_port', '127.0.0.1:9999', 707), ('p4_server_description', -1, 207), ('poll_period', 'not an int', 204), ('replicator_address', 'invalid@e-mail@address', 202), ('rid', -1, 207), ('rid', '0abc', 209), ('rid', 'ab-c', 209), ('sid', -1, 207), ('sid', 'abcdefg+z', 209), ('smtp_server', -1, 207), ('start_date', '2001-02-03 24-00-00', 201), ('replicate_p', 'not a function', 203), ('closed_state', -1, 208), ('replicated_fields', 'not a list', 205), ('replicated_fields', ['not', 'a', 'list', 'of', 'strings', 0], 206), ] def test_parameters(self): import check_config for (param, value, msgid) in self.bogus_parameters: self.check_param(param, value, msgid) # 6.3. Basic errors in DT parameters are caught quickly # # As 6.2, but picks a set of tests based on config.dt_name. bogus_TeamTrack_parameters = [ ('teamtrack_server', None, 207), ('teamtrack_user', None, 207), ('teamtrack_password', -1, 207), ('closed_state', 'not a TeamTrack state', 401), ('replicated_fields', ['PRIORITY', 'not a TeamTrack state'], 403), ('replicated_fields', ['OWNER'], 404), ('replicated_fields', ['TITLE'], 404), ('replicated_fields', ['STATE'], 404), ('replicated_fields', ['PRIORITY', 'PRIORITY'], 405), ('replicated_fields', ['MULTISELECT'], 406), # Regression for job000003, job000140: ('replicated_fields', ['CODE'], 407), ] bogus_Bugzilla_parameters = [ ('bugzilla_directory', 'not a directory', 303), ('bugzilla_directory', '/', 304), ('closed_state', 'not a Bugzilla state', 301), ('dbms_database', -1, 207), ('dbms_host', -1, 207), # Regression for job000168: ('dbms_port', '1234', 204), ('dbms_user', -1, 207), ('dbms_password', -1, 207), # By using fake Perforce client executables we can check that # unsupported client and server versions are detected. ('p4_client_executable', './fake_p4.py', 704), ('p4_client_executable', './fake_p4d.py', 834), ('replicated_fields', ['bug_status'], 311), ('replicated_fields', ['assigned_to'], 311), ('replicated_fields', ['short_desc'], 311), ('replicated_fields', ['resolution'], 311), ('replicated_fields', ['longdesc', 'longdesc'], 312), ('replicated_fields', ['not a Bugzilla field'], 313), ] def test_dt_parameters(self): params = getattr(self, 'bogus_%s_parameters' % config.dt_name) for (param, value, msgid) in params: self.check_param(param, value, msgid) # 6.4. Parameter errors are caught by the defect tracker # # Like 6.2 and 6.3 this test sets a parameter to an incorrect value. In # this case the error is caught by the defect tracker, so a message object # isn't returned, but rather a string, which we must test directly rather # than by message id. # These three authentication failure messages are from the APIs for # builds 4509, 5034, and 50101 respectively. TeamTrack_auth_messages = [ "SERVER_ERROR: Authentication Failed. " "Invalid user id or password", "SOCKET_READ_ERROR: Socket error.\n" "One reason might be database needs up upgrading.\n" "Check event viewer for complete error message.\n", "SOCKET_READ_ERROR: Authentication Failed. " "Invalid user id, password, or licensing." ] erroneous_TeamTrack_parameters = [ ('dt_name', 'not a defect tracker', 'exceptions.ImportError', 'No module named configure_not a defect tracker'), ('teamtrack_server', 'host.invalid', 'TeamShare API error', 'SOCKET_CONNECT_FAILED: Socket Connect failed.'), ('teamtrack_user', 'invalid user', 'TeamShare API error', TeamTrack_auth_messages), ('teamtrack_password', 'invalid password', 'TeamShare API error', TeamTrack_auth_messages), ] erroneous_Bugzilla_parameters = [ ('dt_name', 'not a defect tracker', 'exceptions.ImportError', 'No module named configure_not a defect tracker'), ('dbms_host', 'host.invalid', '_mysql.OperationalError', '(2005, "Unknown MySQL Server Host \'host.invalid\' (2)")'), ('dbms_database', 'invalid', '_mysql.OperationalError', '(1049, "Unknown database \'invalid\'")'), # ('dbms_port', # 25, # '_mysql.OperationalError', # -1), ('dbms_password', 'not the Bugzilla password', '_mysql.OperationalError', '(1045, "Access denied for user:'), ('dbms_user', 'not the Bugzilla user', '_mysql.OperationalError', '(1045, "Access denied for user:'), ] def test_dt_errors(self): params = getattr(self, 'erroneous_%s_parameters' % config.dt_name) for (param, value, error, message_texts) in params: reset_configuration() config.__dict__[param] = value try: self.initialize_replicator() except: (err, msg, _) = sys.exc_info() if str(err) != error: self.addFailure("Set parameter %s to '%s': expected error " "'%s' but got error '%s'." % (param, value, error, str(err))) if isinstance(message_texts, types.ListType): texts = message_texts else: texts = [message_texts] found = 0 for text in texts: if str(msg)[0:len(text)] == text: found = 1 break if not found: self.addFailure("Set parameter %s to '%s': expected error " "message in %s but got '%s'." % (param, value, texts, msg)) else: self.addFailure("Set parameter %s to %s: expected error '%s' " "but there was no error." % (param, value, error)) def runTest(self): "Illegal configuration parameters (test_p4dti.bogus)" self.test_parameters() self.test_dt_parameters() self.test_dt_errors() # 7. TEST CASE: EXISTING JOB IN PERFORCE # # The replicator should refuse to start if there's a job in Perforce. # # This is a regression test for job000219 and job000240. class existing(p4dti_base): def setUp(self): self.setup_everything() def runTest(self): "Startup with an existing job (test_p4dti.existing)" j = self.p4.run('job -o')[0] j['Description'] = 'Test job' self.p4.run('job -i', j) self.check_startup_error("P4DTI Initialization error", 1001) # 8. TEST CASE: MOVING THE START DATE # # # 8.1. Moving the start date backwards in time # # When start_date is set to the current time, no issues should be replicated # when the replicator starts. Similarly, refreshing Perforce has no effect. # But you can set start_date back in time and refresh Perforce, this time with # effect. # # This is a regression test for job000047, job000050, job000221. class start_1(p4dti_base): def setUp(self): self.setup_everything() def runTest(self): "Moving the start_date backwards in time (test_p4dti.start_1)" config.start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) self.initialize_replicator() # When we poll, nothing should happen. self.clear_log() self.r.poll() self.expected([]) # Nor when we refresh. self.clear_log() self.r.refresh_perforce_jobs() self.expected([]) # The databases should report consistent. self.check_consistency() # Now set start date back in time and try refreshing again. config.start_date = "1971-01-01 00:00:00" self.initialize_replicator() self.clear_log() self.r.refresh_perforce_jobs() self.expected([803, 804, 812]) self.expected_only_in_range(800, 1000, [803, 804, 812]) self.expected_not([806, 811]) # The databases should still report consistent. self.check_consistency() # 8.2. Moving the start date forwards in time # # Start up with an old start date as normal, then move the start date # forwards in time. The databases should still report consistent, # because issues that were recorded as being replicated in the first # poll should still be recorded as replicated, even though they haven't # changed since the start date. # # This is a regression test for job000340. class start_2(p4dti_base): def setUp(self): self.setup_everything() self.check_startup() def runTest(self): "Moving the start_date forwards in time (test_p4dti.start_2)" # Set start date forward in time and check the consistency. config.start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) self.initialize_replicator() # When we poll, nothing should happen. self.clear_log() self.r.poll() self.expected([]) # The databases should report consistent. self.check_consistency() # 9. TEST CASE: REPLICATING BY PROJECT # # This is a regression test for job000107, job000112, job000311. class project(p4dti_base): def setUp(self): self.setup_everything() def runTest(self): "Replicate by project (test_p4dti.project)" self.run_variant() def TeamTrack(self): config.replicated_fields = ['PROJECTID'] config.replicate_p = (lambda self: self['PROJECTID'] in [4,5]) self.initialize_replicator() self.clear_log() self.r.poll() self.expected_only_in_range(800, 1000, [803, 804, 812]) self.expected_not([806, 811]) # The database has two projects called 'P4DTI', so we should get a # warning. This is a regression test for job000311. self.expected([607]) jobs = self.p4.run('jobs') for j in jobs: assert j['Project'] in ['Elaborate editor', 'Clever compiler'], \ ("Job %s has project %s; shouldn't be replicated." % (j['Job'], j['Project'])) self.check_consistency() def Bugzilla(self): config.replicated_fields = ['product'] config.replicate_p = (lambda self: self['product'] == 'product 1') self.check_startup() jobs = self.p4.run('jobs') for j in jobs: assert j['Product'] == 'product 1\n', \ ("Job %s has product %s; shouldn't be replicated." % (j['Job'], j['Product'])) self.check_consistency() # 10. ISSUE LIFE CYCLE TEST CASES # # This test creates issues and checks that they are replicated # correctly. class lifecycle(p4dti_base): def setUp(self): self.setup_everything() self.setup_perforce(['rb']) self.check_startup() def runTest(self): "Issue life cycle (test_p4dti.lifecycle)" self.run_variant() def Bugzilla_submit(self): # Submit a new bug. fields = { 'reporter': 'nb@ravenbrook.com', 'product': 'product 1', 'version': 'unspecified', 'component': 'component 1.1', 'rep_platform': 'All', 'op_sys': 'All', 'priority': 'P1', 'severity': 'critical', 'assigned_to': '', 'cc': '', 'bug_file_loc': '', 'short_desc': 'Test bug', 'comment': 'Test long description.', 'submit': ' Commit ', 'form_name': 'enter_bug', 'Bugzilla_password': 'p4dtitest', 'Bugzilla_login': 'nb@ravenbrook.com', } h = httplib.HTTP('swan.ravenbrook.com') h.set_debuglevel(100) h.putrequest('POST', '/bugzilla/post_bug.cgi') content = string.join(map(lambda f: urllib.quote_plus(f[0]) + '=' + urllib.quote_plus(f[1]), fields.items()), '&') + '\r\n' h.putheader('Content-Length', str(len(content))) h.endheaders() h.send(content) h.getreply() bugid = None lines = h.getfile().readlines() for l in lines: match = re.search('

Bug ([0-9]+) posted

', l) if match: bugid = match.group(1) if bugid == None: self.fail("Tried to submit a bug to Bugzilla, but got the following in reply: %s." % string.join(lines)) # It gets replicated to Perforce. This is a regression test for # job000233. self.check_replication_dt_to_p4(first_time = 1) # Check the job in Perforce. bug = self.r.dt.issue(bugid) assert bug jobname = bug.corresponding_id() job = self.p4.run('job -o %s' % jobname)[0] return bug, job def TeamTrack_submit(self, s, user): import teamtrack # Submit a new case. case = s.new_record(s.case_table_id()) case['TITLE'] = 'Issue lifecycle test' case['DESCRIPTION'] = 'This is a test issue for the lifecycle test.' case['ISSUETYPE'] = 1 # Bug report case['PROJECTID'] = 4 # Editor case['SEVERITY'] = 45 # Regression test for job000182: case['ESTTIMETOFIX'] = 7199 # 1:59:59 case['ACTTIMETOFIX'] = 445556 # 123:45:56 # Regression test for job000379 and job000381 (note that date is # in the summer so DST is in effect): case['CLOSEDATE'] = 962532245 # 2000-07-02 03:04:05 issueid = case.submit(user) # It gets replicated to Perforce. This is a regression test for # job000233. self.check_replication_dt_to_p4(first_time = 1) # Check the job in Perforce. case2 = s.query(s.case_table_id(), "TS_ISSUETYPE=%d AND TS_ISSUEID='%05d'" % (case['ISSUETYPE'], issueid))[0] job = self.p4.run('job -o %s' % case2['P4DTI_JOBNAME'])[0] for (field, expected) in [('State', '_new'), ('Title', 'Issue lifecycle test'), ('Est._Time_to_Fix', '1:59:59'), ('Actual_Time_to_Fix', '123:45:56')]: assert job[field] == expected, \ ("Expected new job %s to have %s '%s', but found '%s'." % (job['Job'], field, expected, job[field])) return case2, job def TeamTrack_assign(self, s, case, job, user): import teamtrack # Find the assign transition. assign = s.query(teamtrack.table['TRANSITIONS'], "TS_NAME = 'Assign'")[0]['ID'] # Assign the issue in TeamTrack. case.transition(user, assign) # It gets replicated to Perforce. self.check_replication_dt_to_p4() # The state should now be assigned. job = self.p4.run('job -o %s' % job['Job'])[0] assert job['State'] == 'assigned', \ ("Expected assigned job %(Job)s to have state " "'assigned' but it has state %(State)s." % job) return case, job def TeamTrack_close(self, job, expected): # The closed job gets replicated back to TeamTrack. TeamTrack # will change the owner field and the replicator will replicate # that back. This is a regression test for job000053. self.clear_log() self.r.poll() self.expected(expected) self.expected_only_in_range(800, 1000, expected) job2 = self.p4.run('job -o %s' % job['Job'])[0] assert job2['Owner'] != job['Owner'], \ ("Closed job %(Job)s but owner is still %(Owner)s." % job2) def Bugzilla(self): bug, job = self.Bugzilla_submit() self.fail('unimplemented') def TeamTrack(self): # We need to be logged in to TeamTrack as a user other than the # replicator in order to make a change that isn't ignored. import teamtrack user = 'rb' s = teamtrack.connect(user,'', config.teamtrack_server) # 10.3.1. Simple issue lifecycle # # This is a simple issue cycle: # # 1. An issue is submitted to the defect tracker. # 2. It is replicated to Perforce. # 3. It gets assigned to a developer. # 4. It is closed in Perforce (by editing the job). # 5. The closure gets replicated back to the defect tracker. case, job = self.TeamTrack_submit(s, user) case, job = self.TeamTrack_assign(s, case, job, user) # Close the job in Perforce. job['State'] = 'closed' self.p4.run('job -i', job) self.TeamTrack_close(job, [805, 824, 826]) self.r.poll() # Regression test for job000182. case = s.read_record(s.case_table_id(), case['ID']) assert case['ESTTIMETOFIX'] == 7199 assert case['ACTTIMETOFIX'] == 445556 # 10.3.2. Issue lifecycle (fix in Perforce). # # This is an issue cycle in which the issue is associated with a # changelist in Perforce: # # 1. An issue is submitted to the defect tracker. # 2. It is replicated to Perforce. # 3. It gets assigned to a developer. # 4. It is closed in Perforce (by making a fix). # 5. Closure and fix get replicated back to the defect tracker. case, job = self.TeamTrack_submit(s, user) case, job = self.TeamTrack_assign(s, case, job, user) self.p4.run('fix -c 1 %s' % job['Job']) self.TeamTrack_close(job, [802, 805, 819, 820, 824, 826]) self.r.poll() # 10.3.3. Issue lifecycle (fix on submission in Perforce). # # This tests an issue lifecycle in which the issue is associated # with a pending changelist and closed on submission. # # 1. An issue is submitted to the defect tracker. # 2. It is replicated to Perforce. # 3. It gets assigned to a developer. # 4. The job description is edited in Perforce (this is a # regression test for job000362). # 5. A fix is made with a pending changelist. # 6. The change is submitted. # 7. Job, fix get replicated back to the defect tracker. case, job = self.TeamTrack_submit(s, user) case, job = self.TeamTrack_assign(s, case, job, user) self.r.poll() job['Description'] = job['Description'] + '\nEdited.' self.p4i[user].run('job -i', job) self.clear_log() self.r.poll() self.expected([805, 824]) self.expected_only_in_range(800, 1000, [805, 824]) change = self.edit_file(user) self.p4i[user].run('fix -c %s %s' % (change['Change'], job['Job'])) self.p4i[user].run('submit -c %s' % change['Change']) self.TeamTrack_close(job, [802, 805, 819, 820, 824, 826]) self.r.poll() # We're done; one last check for luck. self.check_consistency() # 11. P4DTI CONFIGURATION DATABASE # # This checks that parameters get added and removed from the # configuration database. This is a regression test for job000169 and # job000351. class configdb(p4dti_base): def setUp(self): self.setup_everything() def TeamTrack_config_item(self, item): import teamtrack query = ("TS_TYPE = 4 AND TS_CHAR1 = '%s' AND TS_CHAR2 = " "'%s'" % (config.rid, item)) rr = self.dti.server.query(teamtrack.table['VCACTIONS'], query) for r in rr: dict = eval(r['FILENAME']) if (dict.has_key('sid') and dict['sid'] == config.sid and dict.has_key('description')): return dict['description'] return None TeamTrack_config_map = { 'p4_server_description': 'SERVER', 'job_url': 'JOB_URL', 'changelist_url': 'CHANGELIST_URL' } def TeamTrack_config(self): cf = {} for k, v in self.TeamTrack_config_map.items(): cf[k] = self.TeamTrack_config_item(v) return cf def Bugzilla_config(self): return self.r.dt.bugzilla.get_config() def check(self, cf1): reset_configuration() for k, v in cf1.items(): config.__dict__[k] = v self.initialize_replicator() cf2 = getattr(self, config.dt_name + '_config')() for k, v in cf1.items(): if v != cf2.get(k): self.addFailure("Set parameter %s to '%s', but found " "'%s' in the config database." % (k, v, cf2.get(k))) def runTest(self): "Replicator configuration database (test_p4dti.configdb)" cf1 = { 'p4_server_description': 'spong', 'changelist_url': 'http://spong/changelist?%d', 'job_url': 'http://spong/job?%s' } cf2 = { 'p4_server_description': 'spong', 'changelist_url': None, 'job_url': None } self.check(cf1) self.check(cf2) self.check(cf1) # 12. INCONSISTENCIES # # This test case checks that various inconsistencies are detected # correctly. class inconsistencies(p4dti_base): def setUp(self): self.setup_everything() self.setup_perforce(['rb']) self.check_startup() def runTest(self): "Consistency check failures (test_p4dti.inconsistencies)" user = 'rb' # 12.1. Unreplicated fix in Perforce change = self.edit_file(user) job = self.p4.run('jobs')[0] self.p4i[user].run('fix -s %s -c %s %s' % (job[config.job_status_field], change['Change'], job['Job'])) self.clear_log() self.r.check_consistency() self.expected([871, 878, 886]) self.expected_only_in_range(800, 1000, [871, 878, 883, 884, 886, 890]) self.r.poll() # 12.2. Changed job in Perforce job['Description'] = job['Description'] + '...\n' self.p4i[user].run('job -i', job) self.clear_log() self.r.check_consistency() self.expected([871, 875, 886]) self.expected_only_in_range(800, 1000, [871, 875, 883, 884, 886, 890]) self.r.poll() # 12.3. Unreplicated job in Perforce # # This is a regression test for job000372. j = self.p4i[user].run('job -o')[0] for k, v in j.items(): if string.find(v, '') == 0: j[k] = 'Test!' j['P4DTI-rid'] = config.rid j['P4DTI-issue-id'] = "999999" self.p4i[user].run('job -i', j) self.clear_log() self.r.check_consistency() self.expected([871, 882, 886]) self.expected_only_in_range(800, 1000, [871, 882, 883, 884, 886, 890]) # 13. RACE DURING REPLICATION OF FIXES # # This is a regression test for job000385. class race_385(p4dti_base): user = 'rb' change = None original_fixes_differences = None race_flag = None def setUp(self): self.setup_everything() self.setup_perforce([self.user]) self.check_startup() def race(self, dt_fixes, p4_fixes): fix_diffs = self.original_fixes_differences(dt_fixes, p4_fixes) # Submit the change (this is the race). But only once. if self.race_flag == 0: self.p4i[self.user].run('submit -c %s' % self.change['Change']) self.race_flag = 1 return fix_diffs def runTest(self): "Race during replication of fixes (test_p4dti.race_385)" # Create a pending change in Perforce and make a fix to that # change. self.change = self.edit_file(self.user) job = self.p4.run('jobs')[0] self.p4i[self.user].run('fix -s %s -c %s %s' % (job[config.job_status_field], self.change['Change'], job['Job'])) # Create a second pending change in Perforce (so that the first # change will get renumbered when submitted). change_2 = self.edit_file(self.user) # We'll make sure that the change gets submitted after # fixes_differences gets called -- this is our last opportunity # to run the race before the possibly illegal call to p4 change # -o. self.original_fixes_differences = self.r.fixes_differences self.r.fixes_differences = self.race self.race_flag = 0 # Replicate. self.clear_log() self.r.poll() # Restore the original fixes_differences. self.r.fixes_differences = self.original_fixes_differences mm = open('p4dti.mail', 'a') self.print_mail(mm) mm.close() # Check that the replication succeeded. expected = [802, 805, 819, 820] self.expected(expected) self.expected_only_in_range(800, 1000, expected + [825]) self.check_consistency() # RUNNING THE TESTS def tests(): suite = unittest.TestSuite() for t in [bogus, configdb, existing, inconsistencies, normal, project, race_385, start_1, start_2]: suite.addTest(t()) if os.name == 'nt': suite.addTest(lifecycle()) return suite if __name__ == "__main__": unittest.main(defaultTest="tests") # A. REFERENCES # # [GDR 2000-12-31] "Automated testing plan" (e-mail message); Gareth # Rees; Ravenbrook Limited; # ; 2000-12-31. # # [GDR 2001-03-14] "Automatic test of TeamTrack integration" (e-mail # message); Gareth Rees; Ravenbrook Limited; # ; 2001-03-14. # # [PyUnit] "PyUnit - a unit testing framework for Python"; Steve Purcell; # . # # [RB 2000-12-08] "init.py -- Initialize replicator and defect tracker"; # Richard Brooksby; Ravenbrook Limited; 2000-12-08. # # # B. DOCUMENT HISTORY # # 2001-03-14 GDR Created. # # 2001-03-15 GDR Added list of regression tests, discussion of limitations, # snoping on e-mail, p4dti_variant class, test cases for existing jobs, recent # start date and replicating by project. # # 2001-03-17 GDR Added test_dt_errors test case for errors caught by the defect # tracker or its database. # # 2001-05-01 GDR Use p4dti_unittest module so that we can report many errors in # the course of a single test case. Added regression test for job000311. # # 2001-05-17 GDR Don't look for messages 844-847 (replicator doesn't # output them any more). Fixed bug in issue lifecycle test. # # 2001-05-18 GDR Extended the lifecycle test so that it tests # replication of fixes. # # 2001-06-07 GDR Added references to design. # # 2001-07-02 GDR Made bogus_config and lifecycle test cases portable # between TeamTrack 4.5 and TeamTrack 5.0. # # 2001-07-03 GDR Moved TeamTrack connection time test to # test_teamtrack.py where it belongs. # # 2001-07-09 NB Added changelist_url test. # # 2001-07-17 GDR Added start_2 (regression test for job000340) and # configdb (regression test for job000169, job000351). # # 2001-07-25 GDR Added consistency checker tests (regression test for # job000372). # # 2001-09-24 GDR Use environment variable P4DTI_PATH (if set) to find # the P4DTI sources, so that testers can test installed copies of the # P4DTI as well as copies synced from the repository. Use environment # variable P4DTI_CONFIG (if set) to find the configuration. # # 2001-09-25 GDR Use Bugzilla database and user from the configuration; # see job000394. Restore the mysqldump for the Bugzilla version given # by the bugzilla_version configuration parameter (if specified). The # tests start_1 and start_2 were incorrectly using gmtime when making # timestamps for the Bugzilla database -- this could lead to problems # when testing in timezones not equal to UTC with recently-created # defects. Use localtime instead. # # 2001-10-03 GDR Added test case for job000385. # # # C. COPYRIGHT AND LICENCE # # This file is copyright (c) 2001 Perforce Software, Inc. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # # # $Id: //info.ravenbrook.com/project/p4dti/version/1.2/test/test_p4dti.py#2 $