# Perforce Defect Tracking Integration Project # # # TEST_P4DTI.PY -- TEST THE P4DTI # # Gareth Rees, Ravenbrook Limited, 2001-03-14 # # # 1. INTRODUCTION # # This test script tests the P4DTI. The initial plan is described in # [GDR 2000-12-31] and some of the design is given in [GDR 2001-03-14]. # # This script contains a lot of tricky code (for example, using the # hostname or operating system name to pick a module to import or a # class to use; or overriding a method in another module in order to # snoop on its behaviour). Please take care when editing it. # # It uses the PyUnit unit test framework [PyUnit]. # # The intended readership is project developers. # # This document is not confidential. # # The test cases depend on features of the test databases. It is # important that the cases be kept in sync with the test database and # with the design of those databases [GDR 2001-03-14]. When you add a # test, be sure to change the design and refer back to the test. # # # 1.1. Useful info # # The script creates a series of test Perforce servers (one for each # test case). When a test case starts, it kills all the running # Perforce servers on the required port. Each of these has its own # directory, in the appropriate place for temporary files for your # system (typically /tmp on Unix, C:\Temp on Windows). The P4DTI log is # diverted from its real place and sent to the file p4dti.log. # # # 1.2. Regression tests in this script # # The section * means that the defect is tested throughout as a simple # consequence of running the script; there is no particular test for it. # # Job Section Title # ---------------------------------------------------------------------- # job000003 6.3 It is not possible from the result of "p4 -G job -o # jobname" to tell whether a job has been returned, or # an error message # job000004 * "p4 -G jobspec -o" doesn't work # job000007 10.4 The fixes keyword can't be set on submit (except to # "closed") # job000013 10.3 Deleting fixes not replicated # job000016 22 Double replication causes many conflicts # job000022 17 No migration from Perforce jobs to defect tracker # job000036 15 New jobs in Perforce don't get replicated to the # defect tracker # job000037 6 Consistency checker script is inadequate # job000042 22 Rapid changes in the DT cause conflicts # job000046 16 The replicator process is hard to manage # job000047 8.1 Historical bugs are replicated but uninteresting # job000048 6 Consistency checking of the configuration is # inadequate # job000050 8.1 There's no way to re-start replication # job000051 * There's no Bugzilla integration # job000053 10 Implicit changes made by the DT don't get replicated # back # job000057 * Special characters in single-select keywords make the # replicator barf # job000075 6 No automatic check of configuration # job000086 21 Users can "fix" issues that they don't have permission # to change # job000092 * Long descriptions aren't replicated by Bugzilla # integration # job000107 9 You can't replicate an issue's project # job000111 7 Replicator destroys the jobspec that exists before # it's installed # job000112 9 Can't easily replicate by project # job000116 6 Bugzilla integration doesn't do enough checking # job000118 10.1 Resolving a Bugzilla bug without setting the # resolution always causes a conflict # job000133 10.2 You can't close a job by fixing it # job000149 16 We don't use system logging facilities on Windows # job000158 6.2 Obscure error if Perforce can't be reached or # p4_client_executable is wrong # job000168 6.2 Too easy to set dbms_port to a string # job000170 6.2 Replicator may be unable to send e-mail if the default # replicator_address is unchanged # job000173 6.3 Wrong Perforce server version causes installation to # fail mysteriously # job000181 * Assertion failure in translate_1_to_0 # job000182 10 Elapsed time fields aren't replicated properly # job000184 * Bugzilla integration doesn't work if your database is # not called 'bugs' # job000199 4.5 Auxiliary scripts send e-mail # job000202 6.2 Errors from Perforce not reported well # job000219 7 Existing jobs in Perforce may become unusable when the # jobspec is changed # job000221 8.1 Refreshing Perforce jobs fails in Bugzilla integration # job000222 10.3 Deleting fix causes Bugzilla integration to crash # job000225 10.3 If you "p4 fix" when there's no closed state, the # replicator can't replicate # job000240 7 Perforce server can crash if P4DTI is started with # existing jobs # job000249 17 Refresh script fails if you change the closed_state # job000262 14 Bugzilla: Can't use Perforce to confirm an unconfirmed # bug # job000271 17 The migrate script loses fixes # job000289 * Users with spaces produce confusing errors # job000340 8.2 Consistency checker may miss some issues if you change # the start date # job000351 11 Bugzilla integration doesn't remove old configuration # items # job000352 20 Bugzilla emailsuffix parameter not supported # job000354 * MySQL bug stops replicator from seeing some Bugzilla # changes # job000355 8 Bugzilla integration ignores start_date parameter # job000372 12.3 Check consistency stops with error if there are # unreplicated jobs # job000385 13 Renumbered changelist causes P4DTI error and deletes # fix # job000390 * P4DTI doesn't support Bugzilla 2.14 # job000410 14 Can't confirm an unconfirmed Bugzilla bug from # Perforce # job000422 17 Refreshing fails after migration if new workflow # doesn't match old workflow # job000427 12.8 Can't delete associated filespec in Bugzilla # integration # job000442 10 Can't replicate 'line' fields with hashes in them to # Perforce # job000445 19 Bugzilla replicator doesn't like spaces in enum # fields import os import sys # Add the nearby "code" directory to the module loading path # to get the P4DTI modules. If P4DTI_PATH is set then add # that instead, so that other versions of P4DTI can be tested. if os.environ.has_key('P4DTI_PATH'): p4dti_path = os.environ['P4DTI_PATH'] else: p4dti_path = os.path.join(os.getcwd(), os.pardir, 'code', 'replicator') if p4dti_path not in sys.path: sys.path.append(p4dti_path) import copy import sgmllib import imp import logger import message import p4 import p4dti_unittest import popen2 import re import socket import string import time import types import unittest import urllib import random import shutil import tempfile # http://www.python.org/doc/2.2p1/lib/module-tempfile.html # The default temporary file prefix starts with an '@'. But # that would mean that temporary files will look like revision # specifications to Perforce. So use a prefix that's acceptable # to Perforce. # [Why does that matter? RB 2002-10-29] tempfile.gettempprefix = lambda: '%d.' % os.getpid() # 2. CONFIGURATION # # # 2.1. Limitations # # This script can only support one basic configuration on each host (see # section 2.2). Tests of other configurations have to be made by # changing configuration parameters in the script. # section 2.2). Tests of other configurations have to be made by # changing configuration parameters in the script. # # This script can only test one defect tracker on each host (you have # to change dt_name in config_HOSTNAME.py to test another defect # tracker). # # This script can only have one current invocation on each host. # # Could it use an environment variable to work out which configuration # to use? # # # 2.2. Using a test configuration # # I want to be able to test different configurations, changes to # configurations, and to check whether erroneous configurations are # spotted correctly. # # I expect to find a working configuration either (1) in the file # specified by the environment variable P4DTI_CONFIG, if set, or (2) in # config_HOSTNAME.py, where HOSTNAME is the first component of the FQDN, # converted to lower case (e.g. 'swan' for 'swan.ravenbrook.com'). # # This configuration file is an ordinary P4DTI configuration file, # except that it must include two additional configuration parameters: # p4_license_file specifies the location of a Perforce license file # suitable for use on the machine running the tests; # p4_server_executable specifies the location of a suitable Perforce # server executable. # # The loaded configuration module is copied when a test is started and # the copy is installed in sys.modules['config']. This has two effects: # # 1. The P4DTI itself won't load the config module; see the # initialization code [RB 2000-12-08]. # # 2. The unit tests can make changes to the copy of the configuration # (e.g., to specify incorrect values, as in section 6) without # affecting the original, which can be restored for the next test. if os.environ.has_key('P4DTI_CONFIG'): config_filename = os.environ['P4DTI_CONFIG'] else: hostname = string.lower(string.split(socket.gethostname(), '.')[0]) config_filename = 'config_' + hostname + '.py' if not os.path.exists(config_filename): print "Could not find config file", config_filename print "Either create one, or set P4DTI_CONFIG to the name of one." config_file = open(config_filename) try: imp.load_source('config', config_filename, config_file) finally: config_file.close() original_configuration = copy.copy(sys.modules['config'].__dict__) import config # 2.3. Logging # # We need to log to a log file. The P4DTI log will get redirected to # this file, as will the output of various commands. log_filename = (config.log_file or time.strftime('p4dti.log.%Y%m%dT%H%M%S', time.gmtime(time.time()))) log_filename = os.path.abspath(log_filename) log_file = open(log_filename, "a") def log_message(msg): date = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(time.time())) log_file.write("%s %s\n" % (date, msg)) log_file.flush() sys.stdout.write("P4DTI test suite, logging to %s.\n" % log_filename) sys.stdout.flush() email_suffix = "@ravenbrook.com" def reset_configuration(): # Delete current config, just in case we added something. for k in sys.modules['config'].__dict__.keys(): if not re.match('^__.*__$', k): del sys.modules['config'].__dict__[k] # Restore old config. for k, v in original_configuration.items(): sys.modules['config'].__dict__[k] = v class p4dti_html_parser(sgmllib.SGMLParser): def attrs(self, attrs_list): attrs = { } for a, v in attrs_list: attrs[a] = v return attrs # 3. DEFECT TRACKER AND OPERATING SYSTEM INTERFACES # # Many of the test cases are generic: they don't depend on a particular # defect tracker or operating system. But they need an interface to the # defect tracker in order to restart it, and to the operating system in # order to start a new Perforce server. # # Each interface class should be called DT_OS_interface (where DT is # config.dt_name and OS is os.name) is and must define these methods: # # restart_defect_tracker() # restart_perforce() # # Note that there are no corresponding "stop" methods. The tests leave # Perforce and the defect tracker running so that failures can be # investigated without all the evidence having disappeared. # 3.1. Perforce mixin # # This class supplies the restart_perforce method for use by # defect tracker interfaces. It is suitable for use on both Posix # and Windows. It also provides "system", which is like os.system, # but captures output, checks for errors, and writes to the log. class Perforce_mixin: # Temporary directory for Perforce server and associated files. p4dir = None # 3.1.1. Stop Perforce server # # If there are any Perforce servers running on the magic port, # use p4 admin to stop them. def stop_perforce(self): # Stop with and without unicode flags, as we don't know # whether or not the previous Perforce server was set to # unicode mode. NB 2005-10-18. self.system('p4 -p "%s" -u "%s" -P "%s" -C utf8 admin stop' % (config.p4_port, config.p4_user, config.p4_password), ignore_failure = 1) self.system('p4 -p "%s" -u "%s" -P "%s" admin stop' % (config.p4_port, config.p4_user, config.p4_password), ignore_failure = 1) # 3.1.2. Start a new Perforce server # # Make a completely fresh Perforce server, with a new repository. def start_perforce(self, unicode=False): # Make a new repository directory. self.p4dir = tempfile.mktemp() os.mkdir(self.p4dir) log_message("Perforce repository directory %s." % self.p4dir) # Copy the license if config.p4_license_file: shutil.copyfile(config.p4_license_file, os.path.join(self.p4dir, 'license')) # Work out Perforce's port number and start a Perforce server. match = re.match(".*:([0-9]+)$", config.p4_port) if match: port = int(match.group(1)) else: port = 1666 if os.name == 'nt': # p4d on Windows doesn't detach, to we can't use "system" # to start it. RB 2002-10-28 import win32api if unicode: win32api.WinExec("%s -r %s -xi" % (config.p4_server_executable, self.p4dir)) win32api.WinExec("%s -p 0.0.0.0:%d -r %s" % (config.p4_server_executable, port, self.p4dir)) time.sleep(2) else: # For some reason, p4d doesn't detach properly when called # by os.popen4, so we have to use os.system here instead # of self.system. RB 2002-10-29 if unicode: os.system('%s -r "%s" -xi >> %s 2>&1' % (config.p4_server_executable, self.p4dir, log_filename)) os.system('%s -d -p 0.0.0.0:%d -r "%s" >> %s 2>&1' % (config.p4_server_executable, port, self.p4dir, log_filename)) # 3.1.3. Restart Perforce # # By killing the old server and starting a new one. def restart_perforce(self, unicode=False): self.stop_perforce() self.start_perforce(unicode) # 3.1.4. Run command and check results # # Calls an external program, raising an exception upon any error # and returning standard output and standard error from the program, # as well as writing them to the log. def system(self, command, ignore_failure = 0, input_lines = None): log_file.write('Executing %s\n' % repr(command)) (child_stdin, child_stdout) = os.popen4(command) if input_lines: child_stdin.writelines(input_lines) child_stdin.close() output = child_stdout.read() result = child_stdout.close() log_file.write(output) log_file.flush() if not ignore_failure and result: message = ('Command "%s" failed with result code %d.' % (command, result)) log_file.write(message + '\n') log_file.flush() raise message return output # 3.2. Interface to TeamTrack on Windows NT: removed 2003-05-23. # 3.3. Bugzilla interface mixin # # This is the interface to Bugzilla, suitable for use on both # Posix and Windows. # # This class installs and initializes a complete new Bugzilla # every time. # # The command lines used are carefully # designed to work with both /bin/sh and CMD.EXE, so be careful # when you edit it. As far as possible, Python's shutil tools # are used for cross-platform work. class Bugzilla_mixin: # The Bugzilla server. server = None # 3.3.1. Install and configure Bugzilla # # install_bugzilla() creates a new Bugzilla installation by copying # the Bugzilla source code from its location relative to the test # suite into a temporary directory and then carrying out the initial # configuration steps that the Bugzilla administrator would carry # out; see [Barnson 2001-08-29, 3.2.14-15]. def install_bugzilla(self): # Make a temporary directory. config.bugzilla_directory = tempfile.mktemp() log_message("Bugzilla directory %s." % config.bugzilla_directory) # Create Bugzilla database. self.empty_bugzilla_database() if config.bugzilla_mysqldump: self.system('mysql --user="%s" --password="%s" "%s" < "%s"' % (config.dbms_user, config.dbms_password, config.dbms_database, config.bugzilla_mysqldump)) log_message("Bugzilla database %s created from %s." % (config.dbms_database, config.bugzilla_mysqldump)) # Copy Bugzilla sources. bz_path = os.path.abspath(os.path.join( os.getcwd(), os.pardir, 'code', 'bugzilla-%s' % config.bugzilla_version)) shutil.copytree(bz_path, config.bugzilla_directory) log_message("Copied Bugzilla from %s to %s." % (bz_path, config.bugzilla_directory)) cwd = os.getcwd() try: os.chdir(config.bugzilla_directory) if os.name == 'nt': # Patch Bugzilla to make it suitable for running on # windows, using a pre-cooked patch derived from a # working Bugzilla on Windows installation. patch_path = os.path.abspath(os.path.join(cwd, 'bugzilla-%s-win32-patch' % config.bugzilla_version)) if os.path.exists(patch_path): self.system("patch -p1 < %s" % patch_path) log_message('patched Bugzilla for Windows.') if os.access("processmail", os.F_OK): # Copy processmail to processmail.pl, as this is expected # by the patched Bugzilla. os.rename("processmail", "processmail.pl") log_message('patched Bugzilla for Windows.') else: # Change any #!/usr/bonsaitools/bin/perl shebang lines in Bugzilla # to point instead to the path to perl. perl_path = string.strip(self.system("which perl")) self.system("perl -pi -e 's@#!/usr/bonsaitools/bin/perl@#!%s@' * 2>/dev/null" % perl_path) log_message('patched Bugzilla shebang lines to point to %s.' % perl_path) # Run checksetup.pl for the first time (no input required). self.system('perl checksetup.pl') log_message('ran checksetup.pl for the first time.') # Edit the newly-created localconfig so that db_name is # correct. Also, on Windows, edit the webservergroup # for some mysterious reason not explained in the Bugzilla # documentation. f = open('localconfig', 'a') f.write('$db_name = "%s";\n' % config.dbms_database) f.write('$db_user = "%s";\n' % config.dbms_user) f.write('$db_pass = "%s";\n' % config.dbms_password) if os.name == 'nt': webservergroup = '8' else: # Set webservergroup to some real group on this system # (it doesn't actually have to be the right group for # the web server, but that would be best). We do this # by looking down a list of likely groups until we see # a real group. webservergroups = ['apache', 'www', 'www-data', 'wheel'] import grp for webservergroup in webservergroups: try: grp.getgrnam(webservergroup) break except: pass f.write('$webservergroup = "%s";\n' % webservergroup) f.close() log_message('patched localconfig.') # Run checksetup.pl for the second time. This time, if # the database is empty, Bugzilla prompts us to enter some # configuration parameters. We supply values from the # test configuration, as follows. answers = {'SMTP_SERVER': config.smtp_server, 'ADMIN_EMAIL': config.bugzilla_admin_user, 'ADMIN_PASSWORD': config.bugzilla_admin_password, 'ADMIN_REALNAME': 'Bugzilla administrator', 'ADMIN_OK': 'Y' } f = open('checksetup-answers', 'w') for (key, value) in answers.items(): f.write("$answer{'%s'} = '%s';\n" % (key, value)) f.close() # The checksetup.pl script attempts to disable and enable # input echoing (when receiving a password) by calling stty. # These stty calls fail harmlessly here because # checksetup.pl's input is not a terminal. self.system('perl checksetup.pl checksetup-answers') finally: os.chdir(cwd) # 3.3.2. Empty the Bugzilla database # # Drops (deletes) and recreates the Bugzilla test database on # the MySQL server. def empty_bugzilla_database(self): db = config.dbms_database user = config.dbms_user pwd = config.dbms_password self.system('mysqladmin --user="%s" --password="%s" --force drop "%s"' % (user, pwd, db), ignore_failure = 1) self.system('mysqladmin --user="%s" --password="%s" create "%s"' % (user, pwd, db)) # 3.3.3. Restart Bugzilla # # Install Bugzilla. If the configuration parameter # bugzilla_mysqldump is not None, then drop the existing Bugzilla # database and replace it with one from the MySQL dump file. def restart_defect_tracker(self): self.install_bugzilla() # 3.3.4. Run a Bugzilla CGI script # # run_script(script, params) runs a Bugzilla CGI script as if # invoked by a web server as a result of an HTTP request. It # returns a string containing the output of the script. The script # argument is the name of the script (relative to # config.bugzilla_directory) and the params argument is a dictionary # mapping form parameter name to value. def run_script(self, script, params): data = urllib.urlencode(params) env_additions = { 'REQUEST_METHOD': 'POST', 'REMOTE_ADDR': '127.0.0.1', 'CONTENT_TYPE': 'application/x-www-form-urlencoded', 'CONTENT_LENGTH': str(len(data)), 'HTTP_USER_AGENT': 'Wombat', } cwd = os.getcwd() try: for k, v in env_additions.items(): os.environ[k] = v os.chdir(config.bugzilla_directory) if config.bugzilla_version >= '2.16': command_options = '-T' else: command_options = '' command = 'perl %s "%s"' % (command_options, script) log_file.write('Executing Bugzilla script %s\n' % repr(command)) child_out, child_in = popen2.popen2(command) child_in.write(data) log_file.write(' ... with input %s ...' % data) child_in.close() result = child_out.read() log_file.write(' ... and output %s\n' % result) return result finally: for e in env_additions.keys(): del os.environ[e] os.chdir(cwd) # 3.3.5. Set Bugzilla server parameters # # Sets specific Bugzilla server parameters. # # On Bugzilla versions prior to 2.22, it works by running the # "editparams.cgi" CGI script, parsing the output to get # a current set of parameters, and then constructing a request to # the "doeditparams.cgi" script. # # On Bugzilla versions from 2.22 onwards, that no longer works # because the parameters are divided into a number of separate # configuration panels. class editparams_parser(p4dti_html_parser): def __init__(self): self.params = { } self.panels = [] self.collecting_params = 0 self.textarea_name = None self.textarea_contents = None self.select_name = None self.select_value = None sgmllib.SGMLParser.__init__(self) def attrs(self, attrs_list): attrs = { } for a, v in attrs_list: attrs[a] = v return attrs def start_form(self, attrs_list): action = self.attrs(attrs_list).get('action') if (action == 'doeditparams.cgi' or (config.bugzilla_version >= '2.22' and action == 'editparams.cgi')): self.collecting_params = 1 def end_form(self): self.collecting_params = 0 def start_input(self, attrs_list): if not self.collecting_params: return attrs = self.attrs(attrs_list) if attrs.get('type') == 'radio' and attrs.get('checked'): self.params[attrs['name']] = attrs['value'] elif attrs.get('type', 'text') == 'text': value = re.sub('&', '&', attrs['value']) self.params[attrs['name']] = value def start_textarea(self, attrs_list): if not self.collecting_params: return attrs = self.attrs(attrs_list) self.textarea_name = attrs['name'] self.textarea_contents = [] def end_textarea(self): if not self.collecting_params: return value = string.join(self.textarea_contents, '') self.params[self.textarea_name] = value self.textarea_name = None self.textarea_contents = None def start_option(self, attrs_list): if not self.collecting_params: return attrs = self.attrs(attrs_list) if attrs.has_key('selected') and attrs.has_key('value'): self.select_value = attrs['value'] def start_select(self, attrs_list): if not self.collecting_params: return attrs = self.attrs(attrs_list) self.select_name = attrs['name'] def end_select(self): if not self.collecting_params: return self.params[self.select_name] = self.select_value self.select_name = None self.select_value = None def start_a(self, attrs_list): if config.bugzilla_version < '2.22': return attrs = self.attrs(attrs_list) if not attrs.has_key('href'): return m = re.match('editparams.cgi\?section=(.*)', attrs['href']) if m: self.panels.append(m.group(1)) def handle_data(self, data): if self.collecting_params and self.textarea_name: self.textarea_contents.append(data) def server_parameters(self): params = { 'Bugzilla_login': config.bugzilla_admin_user, 'Bugzilla_password': config.bugzilla_admin_password, } result = self.run_script('editparams.cgi', params) parser = self.editparams_parser() parser.feed(result) parser.close() if config.bugzilla_version < '2.22': return parser.params else: table = {'core': parser.params} panels = parser.panels for panel in panels: params = { 'Bugzilla_login': config.bugzilla_admin_user, 'Bugzilla_password': config.bugzilla_admin_password, 'section': panel, } result = self.run_script('editparams.cgi', params) parser = self.editparams_parser() parser.feed(result) parser.close() table[panel] = parser.params return table def edit_parameters(self, new_params): params = self.server_parameters() if config.bugzilla_version < '2.22': params.update({ 'Bugzilla_login': config.bugzilla_admin_user, 'Bugzilla_password': config.bugzilla_admin_password, }) params.update(new_params) self.run_script('doeditparams.cgi', params) else: log_file.write('Updating parameters...\n') for (panel, dict) in params.items(): updated = False for (k,v) in new_params.items(): if dict.has_key(k): log_file.write('Panel %s key %s value %s...\n' % (panel, k, v)) dict[k] = v updated = True if updated: log_file.write('updating...\n') dict.update({ 'Bugzilla_login': config.bugzilla_admin_user, 'Bugzilla_password': config.bugzilla_admin_password, 'action': 'save', 'section': panel, }) self.run_script('editparams.cgi', dict) # It takes a moment for the new table to become available to # other MySQL connections, so wait a bit. time.sleep(4) # 3.2.8. Run command and check results # # Calls an external program, raising an exception upon any error # and returning standard output and standard error from the program, # as well as writing them to the log. def system(self, command, ignore_failure = 0, input_lines = None): log_file.write('Executing %s\n' % repr(command)) (child_stdin, child_stdout) = os.popen4(command) if input_lines: child_stdin.writelines(input_lines) child_stdin.close() output = child_stdout.read() result = child_stdout.close() log_file.write(output) if not ignore_failure and result: message = ('Command "%s" failed with result code %d.' % (command, result)) log_file.write(message + '\n') raise message return output class Bugzilla_nt_interface(Bugzilla_mixin, Perforce_mixin): pass class Bugzilla_posix_interface(Bugzilla_mixin, Perforce_mixin): pass # 4. P4DTI TEST CASE BASE CLASS # # The p4dti_base class is a generic P4DTI test case. It defines methods # for setting up the integration and some utilities for recording and # interrogating the output of the replicator. # # Other P4DTI test cases will inherit from p4dti_base. # # When a class implements several test cases, the methods that implement # test cases (in the PyUnit sense) should have names starting "test_". # When a class implements a single test case, the method should be # called "runTest". class p4dti_base(p4dti_unittest.TestCase): # Defect tracker interface (an instance of one of the classes in # section 3 above). dti = eval(config.dt_name + '_' + os.name + '_interface')() # Messages written to the replicator's log. log_messages = [] # Mail messages sent by the replicator. Each message is a triple # (recipients, subject, body); see the definition of mail() method # in the replicator class. mail_messages = [] # The replicator. r = None # 4.1. Snoop on the logger # # I want to be able to test that the correct messages are appearing # in the log, so I override the log method in the # logger.multi_logger class so that it records the messages as well # as printing them to the log file. Printing to a log file is # necessary so that the test engineer can analyze what happened when # a test case fails. # # A disadvantage of this implementation is that the logging code is # not tested. Therefore there needs to be a separate set of logging # tests. def snoop_logger(self): logger.multi_logger.log = self.log logger.file_logger.log = self.log logger.sys_logger.log = self.log def log(self, msg): log_message(msg) self.log_messages.append(msg) # The clear_log method can be used to clear this record of the log # before carrying out a test. def clear_log(self): self.log_messages = [] self.mail_messages = [] # 4.2. Snoop on e-mail def snoop_mail(self): sys.modules['smtplib'] = self def log_separator(self): log_file.write("-" * 72) log_file.write("\n") def SMTP(self, server): self.log_separator() return self def quit(self): self.log_separator() def sendmail(self, from_address, to, text): self.mail_messages.append((to, text)) log_file.write(text) # 4.3. Check that the log is as expected # # These methods make various checks on the messages recorded in the # log. def log_message_ids(self): return map(lambda m: m.id, self.log_messages) def expected_only(self, expected): for m in self.log_messages: assert m.id in expected, \ ("Unexpected message %d (%s) found in log %s" % (m.id, str(m), self.log_message_ids())) def expected_only_in_range(self, min, max, expected): for m in self.log_messages: if m.id >= min and m.id <= max: assert m.id in expected, \ ("Unexpected message %d (%s) found in log %s" % (m.id, str(m), self.log_message_ids())) def expected_not(self, unexpected): for m in self.log_messages: assert m.id not in unexpected, \ ("Unexpected message %d (%s) found in log %s" % (m.id, str(m), self.log_message_ids())) def expected(self, expected): found = {} for m in self.log_messages: if found.has_key(m.id): found[m.id] = found[m.id] + 1 else: found[m.id] = 1 for id in expected: assert found.has_key(id) and found[id] > 0, \ ("Expected message %d not found in log %s" % (id, self.log_message_ids())) found[id] = found[id] - 1 def expectation(self, expected, maybe=[]): self.expected(expected) self.expected_only_in_range(800, 1000, expected + maybe) # 4.4. Set up everything so that test cases can run # # Get a fresh configuration, a new defect tracker and a new Perforce # server. def setup_everything(self, config_changes = {}, perforce_unicode = False): reset_configuration() for key, value in config_changes.items(): setattr(config, key, value) self.dti.restart_defect_tracker() self.dti.restart_perforce(unicode = perforce_unicode) self.snoop_logger() self.snoop_mail() # Get a temporary Perforce interface suitable for setting the # Perforce password. if config.p4_password: p4i = p4.p4(port = config.p4_port, user = config.p4_user, client_executable = config.p4_client_executable, logger = self ) p4i.run('user -o') # Problem with quoting strings in arguments to p4 on # Windows using os.popen (as p4i.run does). NB 2002-10-28. if os.name == 'nt': if ' ' in config.p4_password: raise "space in p4_password on Windows" p4i.run('passwd -P %s' % config.p4_password) else: p4i.run('passwd -O "" -P "%s"' % config.p4_password) # Get a permanent Perforce interface using the password. self.p4 = p4.p4(port = config.p4_port, user = config.p4_user, password = config.p4_password, client_executable = config.p4_client_executable, logger = self ) # Set the user's email address to the replicator_address user = self.p4.run('user -o')[0] user['Email'] = config.replicator_address self.p4.run('user -i', user) # 4.5. Initialize the replicator # # Load the init module. If it's already loaded, reload it: we must # reload it because we want the init module to run again and to pick # up on the new configuration. (Some other modules have the same # properties.) def initialize_replicator(self): self.mail_messages = [] for m in ['init']: if sys.modules.has_key(m): del sys.modules[m] import init self.r = init.r # Regression test for job000199. assert self.mail_messages == [] # 4.6. Normal replicator startup # # This is a pseudo test case. It checks that the replicator can # start up normally. Other tests may depend on this having been # run; for example, the check_nothing test (5.1). This test must be # run before any other test cases, and therefore can't be part of a # test suite (the tests of which may be run singly or in any order). # It should therefore be run from the setUp() method of a subclass # of this class (unless of course that subclass has a different idea # of what should happen at startup). def check_startup(self): self.initialize_replicator() # Expect to get a startup e-mail self.clear_log() self.r.prepare_to_run() self.expectation([800, 866], [867, 870, 910]) # Expect to set up issues and replicate them, but no jobs or # conflicts. self.poll() self.expectation([803, 804, 812, 911, 912]) # There should be no further activity if we replicate again. self.poll() self.expected_only_in_range(800, 1000, [911, 912]) # 4.7. Exceptional replicator behaviour # # This method calls the given function but expects to get the # exception given by 'error', with message id 'msgid'. def check_exception(self, function, error, msgid): try: function() except: err, msg = sys.exc_info()[0:2] assert err == error, \ ("Expected error %s but got %s." % (error, err)) assert msg.id == msgid, \ ("Expected message %d but got %d (%s)" % (msgid, msg.id, str(msg))) else: self.fail("Expected error %s but didn't get it." % error) # check_startup_error() is a special case of check_exception for the # initialize_replicator method. def check_startup_error(self, error, msgid): self.check_exception(self.initialize_replicator, error, msgid) # 4.8. Check consistency # # This method checks that the databases are consistent. def check_consistency(self): self.clear_log() self.r.check_consistency() self.expectation([871, 885], [883, 884, 890]) # 4.9. Check replication of a single issue def check_replication_dt_to_p4(self, first_time = 0): self.poll() if first_time: self.expected([803]) # set up for replication self.expectation([804, 812, 911, 912], [803]) # Nothing should come back from Perforce. self.poll() self.expected_only_in_range(800, 1000, [911, 912]) # 4.10. Initialize Perforce repository # # This method sets up Perforce clients and workspaces for a set of # users, and adds a file to the repository. It sets up the # following members: # # p4i: Map from user to a Perforce interface for that user. # workspace: Map from user to the client workspace for that user. # # This is only needed for tests involving Perforce fixes: that's why # it's not called in setup_everything(). def setup_perforce(self, users): # Create Perforce interfaces and workspaces for the dummy users. self.p4i = {} self.workspace = {} for user in users: self.workspace[user] = os.path.join(self.dti.p4dir, user) os.mkdir(self.workspace[user]) self.p4i[user] = p4.p4( port = config.p4_port, user = user, client = user + '_' + socket.gethostname(), client_executable = config.p4_client_executable, logger = self, ) # Make the Perforce user record. p4_user = self.p4i[user].run('user -o')[0] p4_user['Email'] = ('%s%s' % (user,email_suffix)) self.p4i[user].run('user -i', p4_user) # Make a Perforce client. client = self.p4i[user].run('client -o')[0] client['Root'] = self.workspace[user] self.p4i[user].run('client -i', client) # Add a file to the repository so that we have something with # which to make changes and fixes. user = users[0] filename = os.path.join(self.workspace[user], 'test') open(filename, 'w') self.p4i[user].run('add -t ktext %s' % filename) change = self.p4i[user].run('change -o')[0] change['Description'] = 'Added test file' self.p4i[user].run('submit -i', change) # 4.11. Make changelist in Perforce # # This method makes a changelist in Perforce and edits a file in # that changelist. The changelist is returned without being # submitted. def edit_file(self, user): change = self.p4i[user].run('change -o')[0] change['Description'] = 'Edited test file' result = self.p4i[user].run('change -i', change)[0] changelist = string.split(result['data'])[1] filename = os.path.join(self.workspace[user], 'test') # Perforce reports "files up to date" as an error, so ignore it. try: self.p4i[user].run('sync %s' % filename) except p4.error: pass self.p4i[user].run('edit -c %s %s' % (changelist, filename)) f = open(filename, 'a') f.write("Foo\n") f.close() return self.p4i[user].run('change -o %s' % changelist)[0] # 4.12 Variant tests # # This class has variant methods for each defect tracker (e.g., # Bugzilla) and calls the appropriate one. def run_variant(self): try: test = getattr(self, config.dt_name) except AttributeError: assert 0, "No test variant for " + config.dt_name + "." test() # 4.13. Poll the databases, expecting no errors expected_notices = [607] def poll(self): self.clear_log() self.r.carefully_poll_databases() for m in self.log_messages: if (isinstance(m, message.message) and m.id not in self.expected_notices): assert m.priority >= message.INFO, \ ("Expected no errors, but message '%s' has " "priority %d." % (str(m), m.priority)) # 4.14. Create a job # # Fill in fields. Return the name of the job. def create_job(self, p4i, job): for k, v in job.items(): if string.find(v, '') == 0: job[k] = 'foo' result = p4i.run('job -i', job) return string.split(result[0]['data'])[1] # 5. TEST CASES: NORMAL OPERATION # # If nothing has changed, then nothing happens when the replicator # polls. The databases are consistent. class normal(p4dti_base): unicode = False def setUp(self): self.setup_everything(perforce_unicode = self.unicode) self.check_startup() def runTest(self): "Startup, replication to Perforce (test_p4dti.normal)" self.check_consistency() # 6. TEST CASES: INCORRECT CONFIGURATIONS # # This is a regression test of job000037, job000075 and job000116. class bogus(p4dti_base): def setUp(self): self.setup_everything() # 6.1. An incorrect parameter generates an error # # This is a utility function for carrying out a range of tests. It # resets the configuration, sets the parameter named by 'param' to # value, then tries to start the replicator. It expects to get an # exception, whose message should have the message id 'msgid'. def check_param(self, param, value, msgid): reset_configuration() setattr(config, param, value) try: self.initialize_replicator() except: err, msg = sys.exc_info()[0:2] if isinstance(msg, message.message): if msg.id != msgid: self.addFailure("Set parameter %s to '%s': " "expected message %d but got %d " "(%s)" % (param, value, msgid, msg.id, str(msg))) else: self.addFailure("Set parameter %s to '%s': expected " "message %d but got '%s: %s' instead." % (param, value, msgid, err, msg)) else: self.addFailure("Set parameter %s to '%s': expected " "message %d but didn't get it." % (param, value, msgid)) # 6.2. Basic errors in parameters are caught quickly # # Basic errors in parameters (wrong type, wrong format) should be # caught quickly. # # This is a table of (parameter name, bogus value, message id of # expected error). bogus_basic_parameters = [ # Regression for job000170: ('administrator_address', 'invalid e-mail address', 202), ('changelist_url', -1, 207), ('changelist_url', "http://invalid/%d/%s", 210), ('changelist_url', "http://invalid/no/format/specifier", 210), ('changelist_url', "http://invalid/%d/%%/%%%", 210), ('closed_state', -1, 208), ('configure_name', -1, 207), ('job_url', 42, 207), ('job_url', "http://invalid/%d/%s", 211), ('job_url', "http://invalid/no/format/specifier", 211), ('job_url', "http://invalid/trailing/percent/%d/%%/%%%", 211), ('log_file', -1, 208), ('log_level', 'not an int', 204), ('migrate_p', 'not a function', 203), ('p4_client_executable', -1, 207), # Regression test for job000158: ('p4_client_executable', 'no such file', 705), ('p4_user', None, 207), ('p4_password', -1, 207), ('p4_password', 'incorrectpassword', 706), ('p4_port', None, 207), # Regression test for job000158, job000202: ('p4_port', '127.0.0.1:9999', 707), ('p4_server_description', -1, 207), ('poll_period', 'not an int', 204), ('prepare_issue', 'not a function', 203), ('replicator_address', 'invalid@e-mail@address', 202), ('replicate_p', 'not a function', 203), ('replicate_job_p', 'not a function', 203), ('replicated_fields', 'not a list', 205), ('replicated_fields', ['not', 'list', 'of', 'strings', 0], 206), ('replicated_fields', ['not', ('list', 'of'), 'strings', 0], 206), ('rid', -1, 207), ('rid', '0abc', 209), ('rid', 'ab-c', 209), ('sid', -1, 207), ('sid', 'abcdefg+z', 209), ('smtp_server', -1, 207), ('start_date', '2001-02-03 24-00-00', 201), ('use_deleted_selections', 'neither 0 nor 1', 200), ('use_deleted_selections', -1, 200), ('use_deleted_selections', 2, 200), ('use_perforce_jobnames', 'neither 0 nor 1', 200), ('use_perforce_jobnames', -1, 200), ('use_perforce_jobnames', 2, 200), ('use_stdout_log', 'neither 0 nor 1', 200), ('use_stdout_log', -1, 200), ('use_stdout_log', 2, 200), ] def test_basic_parameters(self): for param, value, msgid in self.bogus_basic_parameters: self.check_param(param, value, msgid) # 6.3. Basic errors in DT parameters are caught quickly # # As 6.2, but picks a set of tests based on config.dt_name. bogus_Bugzilla_parameters = [ ('bugzilla_directory', 'not a directory', 303), ('bugzilla_directory', '/', 304), ('closed_state', 'not a Bugzilla state', 301), ('dbms_database', -1, 207), ('dbms_host', -1, 207), # Regression for job000168: ('dbms_port', '1234', 204), ('dbms_user', -1, 207), ('dbms_password', -1, 207), ('migrated_user_password', -1, 207), ('replicated_fields', ['bug_status'], 311), ('replicated_fields', ['assigned_to'], 311), ('replicated_fields', ['short_desc'], 311), ('replicated_fields', ['resolution'], 311), ('replicated_fields', ['longdesc', 'longdesc'], 312), ('replicated_fields', ['not a Bugzilla field'], 307), ] def test_dt_parameters(self): dt_params = getattr(self, 'bogus_%s_parameters' % config.dt_name) for param, value, msgid in dt_params: self.check_param(param, value, msgid) # 6.4. Parameter errors are caught by the defect tracker # # Like 6.2 and 6.3 this test sets a parameter to an incorrect value. # In this case the error is caught by the defect tracker, so a # message object isn't returned, but rather a string, which we must # test directly rather than by message id. mysql_errors = [ '_mysql.OperationalError', '_mysql_exceptions.OperationalError', ] erroneous_Bugzilla_parameters = [ ('dt_name', 'not a defect tracker', 'exceptions.ImportError', 'No module named configure_not a defect tracker'), ('dbms_host', 'host.invalid', mysql_errors, '(2005, "Unknown MySQL Server Host'), ('dbms_database', 'invalid', mysql_errors, ['(1049, "Unknown database \'invalid\'")', '(1044, "Access denied for user:' ]), ('dbms_password', 'not the Bugzilla password', mysql_errors, ['(1045, "Access denied for user:', '(1044, "Access denied for user:']), ('dbms_user', 'not the Bugzilla user', mysql_errors, ['(1045, "Access denied for user:', '(1044, "Access denied for user:']), ] def test_dt_errors(self): params = getattr(self, 'erroneous_%s_parameters' % config.dt_name) for param, value, errors, message_texts in params: reset_configuration() setattr(config, param, value) try: self.initialize_replicator() except: (err, msg, _) = sys.exc_info() if not isinstance(errors, types.ListType): errors = [errors] if str(err) not in errors: self.addFailure("Set parameter %s to '%s': " "expected error in %s but got " "error '%s'." % (param, value, errors, str(err))) if isinstance(message_texts, types.ListType): texts = message_texts else: texts = [message_texts] found = 0 for text in texts: if str(msg)[0:len(text)] == text: found = 1 break if not found: self.addFailure("Set parameter %s to '%s': " "expected error message in %s but " "got '%s'." % (param, value, texts, msg)) else: self.addFailure("Set parameter %s to %s: expected " "error in %s but there was no error." % (param, value, errors)) # 6.5. OS-specific parameter tests # # As 6.3, but picks a set of tests based on os.name. bogus_nt_parameters = [ ('use_windows_event_log', -1, 200), ('use_windows_event_log', 2, 200), ('use_windows_event_log', 'neither 0 nor 1', 200), ] bogus_posix_parameters = [ # By using fake Perforce client executables we can check that # unsupported client and server versions are detected. # Regression test for job000173. ('p4_client_executable', './fake_p4.py', 704), ('p4_client_executable', './fake_p4d_old_changelevel.py', 834), ('p4_client_executable', './fake_p4d_no_changelevel.py', 835), ] def test_os_parameters(self): os_params = getattr(self, 'bogus_%s_parameters' % os.name) for param, value, msgid in os_params: self.check_param(param, value, msgid) def runTest(self): "Illegal configuration parameters (test_p4dti.bogus)" self.test_basic_parameters() self.test_dt_parameters() self.test_dt_errors() self.test_os_parameters() # 7. TEST CASE: EXISTING JOB IN PERFORCE # # The replicator should refuse to start if there's a job in Perforce. # # This is a regression test for job000219 and job000240. class existing(p4dti_base): def setUp(self): self.setup_everything() self.initialize_replicator() def runTest(self): "Startup with an existing job (test_p4dti.existing)" j = self.p4.run('job -o')[0] j['Description'] = 'Test job' self.p4.run('job -i -f', j) self.check_exception(self.r.poll, self.r.error, 914) # 8. TEST CASE: MOVING THE START DATE # # # 8.1. Moving the start date backwards in time # # When start_date is set to the current time, no issues should be # replicated when the replicator starts. Similarly, refreshing Perforce # has no effect. But you can set start_date back in time and refresh # Perforce, this time with effect. # # This is a regression test for job000047, job000050, job000221. class start_1(p4dti_base): def setUp(self): self.setup_everything() def runTest(self): "Moving the start_date backwards in time (test_p4dti.start_1)" reset_configuration() config.start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) self.initialize_replicator() # When we poll, nothing should happen. self.clear_log() self.r.poll() self.expected_only_in_range(800, 1000, [911, 912]) # Nor when we refresh. self.clear_log() self.r.refresh_perforce_jobs() self.expected_only_in_range(800, 1000, [911, 912]) # The databases should report consistent. self.check_consistency() # Now set start date back in time and try refreshing again. reset_configuration() config.start_date = "1971-01-01 00:00:00" self.initialize_replicator() self.clear_log() self.r.refresh_perforce_jobs() self.expectation([803, 804, 812]) # The databases should still report consistent. self.check_consistency() # 8.2. Moving the start date forwards in time # # Start up with an old start date as normal, then move the start date # forwards in time. The databases should still report consistent, # because issues that were recorded as being replicated in the first # poll should still be recorded as replicated, even though they haven't # changed since the start date. # # This is a regression test for job000340. class start_2(p4dti_base): def setUp(self): self.setup_everything() self.check_startup() def runTest(self): "Moving the start_date forwards in time (test_p4dti.start_2)" # Set start date forward in time and check the consistency. reset_configuration() config.start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) self.initialize_replicator() # When we poll, nothing should happen. self.poll() self.expected_only_in_range(800, 1000, [911, 912]) # The databases should report consistent. self.check_consistency() # 9. TEST CASE: REPLICATING BY PROJECT # # This is a regression test for job000107, job000112, job000311. class project(p4dti_base): def setUp(self): self.setup_everything() def runTest(self): "Replicate by project (test_p4dti.project)" self.run_variant() def Bugzilla(self): config.replicated_fields = ['product'] config.replicate_p = lambda self: self['product'] == 'product 1' self.check_startup() jobs = self.p4.run('jobs') for j in jobs: assert j['Product'] == 'product 1\n', \ ("Job %s has product %s; shouldn't be replicated." % (j['Job'], j['Product'])) self.check_consistency() # 10. ISSUE LIFE CYCLE TEST CASES # # This test creates issues and takes them through various kinds of # lifecycle, checking that they are replicated correctly at each step. class lifecycle(p4dti_base): user = 'rb' user1 = 'gdr' def setUp(self): self.setup_everything() self.setup_perforce([self.user, self.user1]) self.check_startup() # Submit a new issue to the defect tracker. Run a replication # cycle; check that the issue gets replicated to perforce. Return a # pair of the defect tracker issue and the Perforce job. def submit(self, user): id = getattr(self, config.dt_name + '_submit')(user) # It gets replicated to Perforce. This is a regression test for # job000233. self.check_replication_dt_to_p4(first_time = 1) # Check that the job has been created in Perforce. issue = self.r.dt.issue(id) assert issue jobname = issue.corresponding_id() job = self.p4.run('job -o %s' % jobname)[0] # Defect-tracker specific checks. getattr(self, config.dt_name + '_submitted')(issue, job) return issue, job # Assign the issue to the user in the defect tracker. Run a # replication cycle; check that the assignment gets replicated. # Return the updated defect trakcer issue and Perforce job. def assign(self, issue, job, user): getattr(self, config.dt_name + '_assign')(issue, job, user) # It gets replicated to Perforce. self.check_replication_dt_to_p4() # Defect-tracker specific checks. issue = self.r.dt.issue(issue.id()) job = self.p4.run('job -o %s' % job['Job'])[0] getattr(self, config.dt_name + '_assigned')(issue, job) return issue, job # The job has been closed in Perforce. Check that the closure is # replicated to the defect tracker and that the expected messages # appear. def close(self, issue, job, expected): self.poll() self.expectation(expected + [911, 912]) issue1 = self.r.dt.issue(issue.id()) job1 = self.p4.run('job -o %s' % job['Job'])[0] getattr(self, config.dt_name + '_closed')(issue, job, issue1, job1) def Bugzilla_submit(self, user): fields = { 'reporter': 'nb@ravenbrook.com', 'product': 'product 1', 'version': 'unspecified', 'component': 'component 1.1', 'rep_platform': 'All', 'op_sys': 'All', 'priority': 'P1', 'bug_severity': 'critical', 'assigned_to': 'gdr@ravenbrook.com', 'cc': '', 'bug_file_loc': '', 'short_desc': 'Life cycle # test bug', 'comment': 'Life cycle # test long description.', 'submit': ' Commit ', 'form_name': 'enter_bug', 'Bugzilla_login': config.bugzilla_admin_user, 'Bugzilla_password': config.bugzilla_admin_password, } result = self.dti.run_script('post_bug.cgi', fields) match = re.search('<[Hh]2>Bug ([0-9]+) (posted|has been added to the database)', result) if match: bugid = match.group(1) else: self.fail("Tried to submit a bug to Bugzilla, but got the " "following in reply: %s." % result) # Sleep for a second so that we can be sure to pick this # up on the next poll. time.sleep(1) return bugid def Bugzilla_submitted(self, bug, job): for field, expected in [ ('Status', 'bugzilla_new'), ('Summary', 'Life cycle # test bug\n'), ('Priority', 'P1'), ('Severity', 'critical'), ]: assert job[field] == expected, \ ("Expected new job %s to have %s '%s', but found " "'%s'." % (job['Job'], field, expected, job[field])) def Bugzilla_assign(self, bug, job, user): # Assign the issue in Bugzilla. Ideally this should go through # the Bugzilla web interface, but that's horrible. So cheat for # now by going through the very low-level Bugzilla interface. # (This is horrible too!) bz_user = self.r.config.user_translator.translate_1_to_0( user, self.r.dt, self.r.dt_p4) changes = {'bug_status': 'ASSIGNED', 'assigned_to': bz_user} bug_id = bug['bug_id'] self.r.dt.bugzilla.update_row('bugs', changes, 'bug_id = %d' % bug_id) for k, v in changes.items(): if k not in self.r.dt.bugzilla.fields_not_in_bugs_activity: activity = { 'bug_id': bug_id, 'who': bz_user, 'bug_when': self.r.dt.bugzilla.now(), 'fieldid': self.r.dt.bugzilla.fieldid(k), } if self.r.dt.bugzilla.user_fields.has_key(('bugs', k)): users = self.r.dt.bugzilla.users() activity['removed'] = users.get(bug[k])['login_name'] activity['added'] = users.get(v)['login_name'] else: activity['removed'] = str(bug[k]) activity['added'] = str(v) self.r.dt.bugzilla.insert_row('bugs_activity', activity) # Wait two seconds so that the bug will be picked up in the next # poll, not the one after. time.sleep(2) def Bugzilla_assigned(self, bug, job): # The status should now be assigned. job = self.p4.run('job -o %s' % job['Job'])[0] assert job['Status'] == 'assigned', \ ("Expected assigned job %(Job)s to have status " "'assigned' but it has state %(Status)s." % job) def Bugzilla_closed(self, bug, job, bug1, job1): pass def runTest(self): "Issue life cycle (test_p4dti.lifecycle)" # 10.1. Simple issue lifecycle # # This is a simple issue cycle: # # 1. An issue is submitted to the defect tracker. # 2. It is replicated to Perforce. # 3. It gets assigned to a developer. # 4. It is closed in Perforce (by editing the job). # 5. The closure gets replicated back to the defect tracker. issue, job = self.submit(self.user) issue, job = self.assign(issue, job, self.user) # Close the job in Perforce. This is a regression test for # job000118. job[self.r.config.job_status_field] = 'closed' job['User'] = self.user1 self.p4.run('job -i -f', job) self.close(issue, job, [805, 824, 826]) # 10.2. Issue lifecycle (fix in Perforce) # # This is an issue cycle in which the issue is associated with a # changelist in Perforce: # # 1. An issue is submitted to the defect tracker. # 2. It is replicated to Perforce. # 3. It gets assigned to a developer. # 4. It is closed in Perforce (by making a fix). # 5. Closure and fix get replicated back to the defect # tracker. # # This is a regression test for job000133. issue, job = self.submit(self.user) issue, job = self.assign(issue, job, self.user) self.p4.run('fix -c 1 %s' % job['Job']) self.close(issue, job, [802, 805, 819, 820, 824, 826]) # 10.3. Issue lifecycle (fix on submission in Perforce) # # This tests an issue lifecycle in which the issue is associated # with a pending changelist and closed on submission. # # 1. An issue is submitted to the defect tracker. # 2. It is replicated to Perforce. # 3. It gets assigned to a developer. # 4. The job description is edited in Perforce (this is a # regression test for job000362). # 5. A fix is made with a pending changelist. # 6. The change is submitted. # 7. Job, fix get replicated back to the defect tracker. # # This is a regression test for job000225. issue, job = self.submit(self.user) issue, job = self.assign(issue, job, self.user) job['Description'] = job['Description'] + '\nEdited.' self.p4i[self.user].run('job -i -f', job) self.poll() self.expectation([805, 824, 911, 912], [826]) change = self.edit_file(self.user) self.p4i[self.user].run('fix -c %s %s' % (change['Change'], job['Job'])) self.p4i[self.user].run('submit -c %s' % change['Change']) self.close(issue, job, [802, 805, 819, 820, 824, 826]) # Delete fix in Perforce and check that the deletion is # replicated (regression test for job000013 and job000222). self.p4i[self.user].run('fix -d -c %s %s' % (change['Change'], job['Job'])) self.poll() self.expected([818]) fixes = self.p4.run('fixes -j %s' % job['Job']) assert len(fixes) == 0, ("Expected no fixes for %s, but found " "%s." % (job['Job'], fixes)) # 10.4. Issue lifecycle (fix to assigned on submission in # Perforce) # # As section 10.3, but the fix is to the job's current state # rather than "closed", so the job state doesn't change. # Regression test for job000007. issue, job = self.submit(self.user) issue, job = self.assign(issue, job, self.user) change = self.edit_file(self.user) status = job[self.r.config.job_status_field] self.p4i[self.user].run('fix -s %s -c %s %s' % (status, change['Change'], job['Job'])) self.p4i[self.user].run('submit -c %s' % change['Change']) self.poll() self.expectation([802, 805, 819, 820, 825, 911, 912]) # Change fix status (only -- note that we set the job status # back); check that it's replicated. self.p4i[self.user].run('fix -s closed -c %s %s' % (change['Change'], job['Job'])) self.p4i[self.user].run('job -i -f', job) self.poll() self.expectation([805, 819, 821, 825, 911, 912]) # 10.5. Simultaneous edit # # Provoke a conflict by changing an issue simultaneously in both # systems. Then try again, but with a user other than the job # owner. (Check that mail goes to both.) # # Make, update and delete a fix in Perforce simultaneously with # the change. Make sure the fixes are respectively deleted, # updated and restored. Ditto for filespecs. change1 = self.edit_file(self.user) # make change2 = self.edit_file(self.user) # update change3 = self.edit_file(self.user) # delete self.poll() for u in [self.user, self.user1]: issue, job = self.submit(self.user) status = job[self.r.config.job_status_field] # Make the fixes that we are going to update and delete. self.p4i[self.user].run('fix -s %s -c %s %s' % (status, change2['Change'], job['Job'])) self.p4i[self.user].run('fix -s %s -c %s %s' % (status, change3['Change'], job['Job'])) job['P4DTI-filespecs'] = 'filespec_1\n' self.p4i[self.user].run('job -i -f', job) self.poll() # Now edit the job simultaneously in DT and Perforce. getattr(self, config.dt_name + '_assign')(issue, job, self.user) job[self.r.config.job_status_field] = 'closed' job['P4DTI-filespecs'] = 'filespec_2\n' self.p4i[u].run('job -i -f', job) # Make, update and delete those fixes. self.p4i[u].run('fix -c %s %s' % (change1['Change'], job['Job'])) self.p4i[u].run('fix -c %s %s' % (change2['Change'], job['Job'])) self.p4i[u].run('fix -d -c %s %s' % (change3['Change'], job['Job'])) self.clear_log() self.r.carefully_poll_databases() self.expectation([800, 806, 811, 814, 815, 816, 817, 841, 860, 861, 853, 862, 812, 910, 911, 912]) issue = self.r.dt.issue(issue.id()) job = self.p4.run('job -o %s' % job['Job'])[0] getattr(self, config.dt_name + '_assigned')(issue, job) self.poll() fixes = self.p4.run('fixes -j %s' % job['Job']) assert job['P4DTI-filespecs'] == 'filespec_1\n' assert len(fixes) == 2 assert fixes[0]['Change'] == change3['Change'] assert fixes[0]['Status'] == status assert fixes[1]['Change'] == change2['Change'] assert fixes[1]['Status'] == status for m in self.log_messages: if m.id == 800: assert string.find(m.text, config.administrator_address) assert string.find(m.text, self.user) != -1 assert string.find(m.text, u) != -1 # 10.6. Illegal changes # # Regression test for job000429. tests = getattr(self, config.dt_name + '_illegal_changes') for field, value, expected in tests: issue, job = self.submit(self.user) issue, job = self.assign(issue, job, self.user) job[field] = value self.p4i[self.user].run('job -i -f', job) self.clear_log() self.r.carefully_poll_databases() self.expectation(expected + [800, 805, 811, 851, 860, 861, 862, 812, 852, 853, 910, 911, 912], [824, 923]) self.poll() # We're done; one last check for luck. self.check_consistency() Bugzilla_illegal_changes = [ ('Product', 'no_such_product', [504]), ('Description', 'foo', [505]), ('Status', 'verified', [503]), ] # 11. P4DTI CONFIGURATION DATABASE # # This checks that parameters get added and removed from the # configuration database. This is a regression test for job000169 and # job000351. class configdb(p4dti_base): def setUp(self): self.setup_everything() def Bugzilla_config(self): return self.r.dt.bugzilla.get_config() def check(self, cf1): reset_configuration() for k, v in cf1.items(): setattr(config, k, v) self.initialize_replicator() cf2 = getattr(self, config.dt_name + '_config')() for k, v in cf1.items(): if v != cf2.get(k): self.addFailure("Set parameter %s to '%s', but found " "'%s' in the config database." % (k, v, cf2.get(k))) def runTest(self): "Replicator configuration database (test_p4dti.configdb)" cf1 = { 'p4_server_description': 'spong', 'changelist_url': 'http://spong/changelist?%d', 'job_url': 'http://spong/job?%s' } cf2 = { 'p4_server_description': 'spong', 'changelist_url': None, 'job_url': None } self.check(cf1) self.check(cf2) self.check(cf1) # 12. INCONSISTENCIES # # This test case checks that each kind of inconsistency that can be # reported by the consistency checking script is detected and reported. class inconsistencies(p4dti_base): user = 'rb' def setUp(self): self.setup_everything() self.setup_perforce([self.user]) self.check_startup() def runTest(self): "Consistency check failures (test_p4dti.inconsistencies)" # 12.1. Unreplicated fix in Perforce change = self.edit_file(self.user) job1 = self.p4.run('jobs')[0] self.p4i[self.user].run('fix -s %s -c %s %s' % (job1[config.job_status_field], change['Change'], job1['Job'])) self.clear_log() self.r.check_consistency() self.expectation([871, 878, 886, 890], [883, 884]) self.poll() # 12.2. Fix with wrong status in Perforce job1status = job1[config.job_status_field] assert job1status != 'assigned' self.p4i[self.user].run('fix -s assigned -c %s %s' % (change['Change'], job1['Job'])) self.clear_log() self.r.check_consistency() self.expectation([871, 880, 886, 890], [883, 884]) self.p4i[self.user].run('fix -s %s -c %s %s' % (job1status, change['Change'], job1['Job'])) # 12.3. Changed job in Perforce job1['Description'] = job1['Description'] + '...\n' self.p4i[self.user].run('job -i -f', job1) self.clear_log() self.r.check_consistency() self.expectation([871, 875, 886, 890], [883, 884]) self.poll() # 12.4. Unreplicated job in Perforce # # This is a regression test for job000372. job4 = self.p4i[self.user].run('job -o')[0] job4['P4DTI-rid'] = config.rid job4['P4DTI-issue-id'] = "999999" job4name = self.create_job(self.p4i[self.user], job4) self.clear_log() self.r.check_consistency() self.expectation([871, 882, 886, 890], [883, 884]) # 12.5. Unreplicated job pointing to replicated issue issueid = self.p4.run('jobs')[0]['P4DTI-issue-id'] job5 = self.p4i[self.user].run('job -o %s' % job4name)[0] job5['P4DTI-issue-id'] = issueid self.p4i[self.user].run('job -i -f', job5) self.clear_log() self.r.check_consistency() self.expectation([871, 881, 886, 890], [883, 884]) self.p4i[self.user].run('job -d %s' % job5['Job']) # 12.6. Missing job in Perforce job6 = self.p4.run('jobs')[0] self.p4i[self.user].run('job -d %s' % job6['Job']) self.clear_log() self.r.check_consistency() self.expectation([871, 873, 886, 890], [883, 884]) # 12.7. Job pointing to wrong issue # # We also get "(P4DTI-875X) Job '%s' would need the following # set of changes ..." because P4DTI-issue-id is wrong, and # "(P4DTI-8793) Change %d fixes issue '%s' but there is no # corresponding fix for job '%s'." because we added a fix and # replicated it in section 12.1 above but now it's missing in # Perforce. id = job6['P4DTI-issue-id'] job6['P4DTI-issue-id'] = '999999' self.p4i[self.user].run('job -i -f', job6) self.clear_log() self.r.check_consistency() self.expectation([871, 874, 875, 879, 887, 890], [883, 884]) job6['P4DTI-issue-id'] = id # Put things back to rights, and add a filespec in preparation # for section 12.8 below. job6['P4DTI-filespecs'] = 'filespec_1\n' self.p4i[self.user].run('job -i -f', job6) self.poll() self.check_consistency() # 12.8. Incorrect filespecs # # By changing the filespec we can provoke messages about # filespecs being missing in both sides. job6['P4DTI-filespecs'] = 'filespec_2\n' self.p4i[self.user].run('job -i -f', job6) self.clear_log() self.r.check_consistency() self.expectation([871, 876, 877, 887, 890], [883, 884]) # Polling should sort everything out. self.poll() self.check_consistency() # 13. RACE DURING REPLICATION OF FIXES # # This is a regression test for job000385. class race_385(p4dti_base): user = 'rb' change = None original_fixes_differences = None race_flag = None def setUp(self): self.setup_everything() self.setup_perforce([self.user]) self.check_startup() def race(self, dt_fixes, p4_fixes): fix_diffs = self.original_fixes_differences(dt_fixes, p4_fixes) # Submit the change (this is the race). But only once. if self.race_flag == 0: self.p4i[self.user].run('submit -c %s' % self.change['Change']) self.race_flag = 1 return fix_diffs def runTest(self): "Race during replication of fixes (test_p4dti.race_385)" # Create a pending change in Perforce and make a fix to that # change. self.change = self.edit_file(self.user) job = self.p4.run('jobs')[0] self.p4i[self.user].run('fix -s %s -c %s %s' % (job[config.job_status_field], self.change['Change'], job['Job'])) # Create a second pending change in Perforce (so that the first # change will get renumbered when submitted). self.edit_file(self.user) # We'll make sure that the change gets submitted after # fixes_differences gets called -- this is our last opportunity # to run the race before the possibly illegal call to p4 change # -o. self.original_fixes_differences = self.r.fixes_differences self.r.fixes_differences = self.race self.race_flag = 0 # Replicate. self.poll() # Restore the original fixes_differences. self.r.fixes_differences = self.original_fixes_differences # Check that the replication succeeded. self.expectation([802, 805, 819, 820], [825, 911, 912]) self.check_consistency() # 14. CAN CONFIRM AN UNCONFIRMED BUG # # This is a regression test for job000262 and job000410. Bugzilla only. class unconfirmed(p4dti_base): bug_id = 27 # this bug is UNCONFIRMED in our test database user = 'rb' def setUp(self): self.setup_everything() self.setup_perforce([self.user]) self.check_startup() def runTest(self): "Confirm an UNCONFIRMED bug (test_p4dti.unconfirmed)" job = self.p4.run('job -o bug%d' % self.bug_id)[0] assert job['Status'] == 'unconfirmed' job['Status'] = 'bugzilla_new' self.p4i[self.user].run('job -i -f', job) # Replicate. self.poll() # Check that the replication succeeded. self.expectation([805, 824], [911, 912]) self.check_consistency() # 15. REPLICATE NEW JOBS FROM PERFORCE # # This checks that a new job in Perforce can be submitted to the defect # tracker and successfully replicated thereafter. This is a regression # test for job000036. class new_p4_job(p4dti_base): user = 'rb' Bugzilla_job = { 'Summary': 'test summary', 'User': user, 'Description': 'test description', 'Priority': 'P3', 'Severity': 'normal', 'Product': 'product 1', } def Bugzilla_prepare_issue(self, issue, job): default_component = { 'product 1': 'component 1.1', 'product 2': 'component 2.1', 'unconfirmed': 'unconf1', 'group': 'group1', } if issue['product'] == '': issue['product'] = 'product 1' if issue['component'] == '': issue['component'] = default_component[issue['product']] if issue['version'] == '': issue['version'] = 'unspecified' def setUp(self): self.setup_everything() config.prepare_issue = getattr(self, config.dt_name + '_prepare_issue') config.replicate_job_p = lambda(job): 1 self.setup_perforce([self.user]) self.check_startup() def runTest(self): "Replicate a new job from Perforce (test_p4dti.new_p4_job)" # Add a job to Perforce. job = self.p4i[self.user].run('job -o')[0] for k, v in getattr(self, config.dt_name + '_job').items(): job[k] = v jobname = self.create_job(self.p4i[self.user], job) # Replicate it. self.poll() self.expected([892, 894]) self.expected_only_in_range(800, 914, [826, 892, 894, 911, 912]) # Check that the job is replicated. self.check_consistency() job = self.p4.run('job -o %s' % jobname)[0] assert job['P4DTI-rid'] == config.rid # 16. WINDOWS NT SERVICE AND EVENT LOG # # This is a regression test for job000046 and job000149. class nt_service(p4dti_base): # Service name in the Windows registry. _svc_name_ = 'p4dti_service' # If the service can't replicate its initial batch of issues in five # minutes, we'll consider it to have failed (this depends on the # test database not having too many issues). timeout = 300 # Number of polls before we expect consistency to be achieved. polls_for_consistency = 3 def setUp(self): self.setup_everything() self.initialize_replicator() # Handle on Service Manager. import win32service access_level = win32service.SC_MANAGER_ALL_ACCESS self.hscm = win32service.OpenSCManager(None, None, access_level) # Handle on Event Log. import win32evtlog self.hevt = win32evtlog.OpenEventLog(None, 'Application') # Establish configuration environment for service. We want the # current configuration with the following changes: NT Event # logging must be set; logging level is DEBUG; email from the # replicator is supressed. controls = (('P4DTI_CONFIG', os.path.abspath(config_filename)), ('P4DTI_EVTLOG', ''), ('P4DTI_LOGLEVEL', str(message.DEBUG)), ('P4DTI_ADMINADDR', ''), ) for key, value in controls: os.environ[key] = value # If the service is already there, remove it. if self.query_service(): self.remove_service() def tearDown(self): import win32service import win32evtlog win32service.CloseServiceHandle(self.hscm) win32evtlog.CloseEventLog(self.hevt) # Return None if service not currently installed. Otherwise # return currentState field as reported by Service Manager. def query_service(self): import win32service type_filter = win32service.SERVICE_WIN32 state_filter = win32service.SERVICE_STATE_ALL query = win32service.EnumServicesStatus # List all registered services services = query(self.hscm, type_filter, state_filter) # Is ours there? for svc_name, description, status in services: if svc_name == self._svc_name_: # Looks like it was. currentState = status[1] return currentState # Looks like it wasn't. return None def wait(self, function): timeout = self.timeout while timeout > 0: if function(): return 1 time.sleep(1) timeout = timeout - 1 return 0 def wait_for_status(self, status): query = self.query_service return self.wait(lambda query=query, status=status: query() == status) def read_event_log_first_time(self): import win32evtlog hevt = self.hevt oldest_record = win32evtlog.GetOldestEventLogRecord(hevt) record_count = win32evtlog.GetNumberOfEventLogRecords(hevt) newest_record = oldest_record + record_count - 1 # Random access into the Event Log. read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ + win32evtlog.EVENTLOG_SEEK_READ) # Read newest event and ignore it. This has the required # side-effect of setting the handle's reading position in the # log, ready for future reads. win32evtlog.ReadEventLog(hevt, read_flags, newest_record) def wait_for_event_log(self, msg): return self.wait(lambda self=self, msg=msg: self.match_event_log(msg)) # Read all the log entries that have been written since last # time. It's remotely possible that we might flush the message # we're looking for twice in one call to ReadEventLog, but this is # acceptable as it is harmless to pool again. def match_event_log(self, msg): import win32evtlog hevt = self.hevt # Sequential access from where we left off, last time we read # on this handle. read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ + win32evtlog.EVENTLOG_SEQUENTIAL_READ) import catalog # This is the string we're looking for. wanted = str(catalog.msg(msg)) # We have to make an undeterminable number of reads to exhaust # the log. Each read will return some undeterminable number of # records. (I can't tell from documentation whether zero is a # possible number in cases where there are records to be # read. This doesn't matter, as we're prepared to come back # several times.) records = win32evtlog.ReadEventLog(hevt, read_flags, 0) while records: for record in records: message = record.StringInserts[0] if message == wanted: return 1 records = win32evtlog.ReadEventLog(hevt, read_flags, 0) return 0 # Simplify hook into service.main() def main(self, *args): import service service.main([''] + list(args)) # The next four methods have to use service.py, as that's what # we're testing. def install_service(self): self.main() def remove_service(self): self.main('remove') def start_service(self): self.main('start') def halt_service(self): self.main('stop') def runTest(self): "Manage NT service (test_p4dti.nt_service)" import win32service # Install the service and ensure that it's there. self.install_service() assert self.query_service() # Start it running. self.read_event_log_first_time() self.start_service() assert self.wait_for_status(win32service.SERVICE_RUNNING) # Allow the replication to happen. for i in range(self.polls_for_consistency): # (message.DEBUG, "Poll finished.") assert self.wait_for_event_log(912) # Halt the service. self.halt_service() assert self.wait_for_status(win32service.SERVICE_STOPPED) # Remove service and ensure that it's gone away. self.remove_service() assert not self.query_service() # Check that replication succeeded. self.check_consistency() # 17. MIGRATION # # This tests that jobs in Perforce can be migrated from the default # Perforce jobspec to the defect tracker and replicated thereafter. # This is a regression test for job000022, job000249, job000422 and # job000426. class migrate(p4dti_base): n_jobs = 20 # include a non-existent user for migration to create. users = ['rb', 'nb', 'gdr', 'spong'] states = ['open', 'closed', 'suspended'] fixes = {} def setUp(self): self.setup_everything() for param in ["migrate_p", "translate_jobspec", "prepare_issue"]: setattr(config, param, getattr(self, config.dt_name + "_" + param)) self.setup_perforce(self.users) self.initialize_replicator() self.create_jobs() # delete the replicator user from the Bugzilla database, so # we can test that migration creates it. self.dti.system('mysql --user="%s" --password="%s" "%s" -e "delete from profiles where login_name = \'%s\'"' % (config.dbms_user, config.dbms_password, config.dbms_database, config.replicator_address)) def create_jobs(self): self.fixes = {} # Create some jobs in Perforce. for i in range(self.n_jobs): job = self.p4.run('job -o')[0] user = self.users[i % len(self.users)] status = self.states[i % len(self.states)] changes = { 'Description': ("First line of job %d\n" "Remainder of job %d\n" "Blah blah blah...\n" % (i, i)), 'User': user, 'Status': status, } self.r.update_job(job, changes) self.p4i[user].run('fix -s %s -c 1 %s' % (status, job['Job'])) self.fixes[job['Job']] = status def runTest(self): "Migration from Perforce jobs (test_p4dti.migrate)" self.clear_log() self.r.migrate_users() self.r.migrate() self.expected([892, 895]) self.expected_only_in_range(800, 914, [802, 819, 820, 892, 895]) self.r.refresh_perforce_jobs() self.check_consistency() # Check that no fixes have been lost, changed or added. # Regression test for job000271. for f in self.p4.run('fixes'): if not self.fixes.has_key(f['Job']): self.addFailure("Found unexpected fix for job '%s'." % f['Job']) else: if self.fixes[f['Job']] != f['Status']: self.addFailure("Expected fix for job '%s' to have " "status '%s', but found '%s'." % (f['Job'], self.fixes[f['Job']], f['Status'])) del self.fixes[f['Job']] for j in self.fixes.keys(): self.addFailure("Expected a fix for job '%s', but didn't " "find it." % j) def Bugzilla_migrate_p(self, job): return 1 def Bugzilla_prepare_issue(self, issue, job): issue["product"] = "product 1" issue["component"] = "component 1.1" issue["version"] = "unspecified" def Bugzilla_translate_jobspec(self, job): desc = job.get("Description", "") newline = string.find(desc, "\n") job["Summary"] = desc[:newline] job["Description"] = desc[newline+1:] job["User"] = job.get("User", "") if 'reporter' in config.replicated_fields: job["Reporter"] = job.get("User", "") if 'qa_contact' in config.replicated_fields: job["QA_Contact"] = "None" status_map = { "open": ("assigned", ""), "closed": ("closed", "FIXED"), "suspended": ("closed", "LATER"), } (status, resolution) = status_map[job.get("Status", "open")] job["Status"] = status job["Resolution"] = resolution job["Severity"] = "blocker" job["Priority"] = "P1" return job # 18. BUGZILLA PARAMETERS # # Check that editing Bugzilla parameters causes the # p4dti_bugzilla_parameters table to be created in the Bugzilla # database, and that the parameters we expect to be there really are. class bugzilla_params(p4dti_base): expected_parameters = [ 'emailregexp', 'emailregexpdesc', 'emailsuffix', 'p4dti', ] def runTest(self): "Bugzilla parameters (test_p4dti.bugzilla_params)" p4dti_param = random.randint(1,1000000) # Confirm that the replicator spots the absense of the # p4dti_bugzilla_parameters table; this doesn't work in # Bugzilla 2.22, when that table is created during checksetup. self.setup_everything({ 'bugzilla_mysqldump': None }) self.clear_log() self.initialize_replicator() if config.bugzilla_version < '2.22': self.expected([129]) # Follow the instructions in [RB 2000-08-10, 5.4.3]: create the # p4dti_bugzilla_parameters table by editing the parameters. We # take the opportunity to set the 'p4dti' parameter to a random # number which we will read back later to check that the table # is being updated correctly. self.dti.edit_parameters({ 'p4dti': str(p4dti_param), }) # Confirm that replicator finds the p4dti_bugzilla_parameters # table and the required parameters are present. self.clear_log() reset_configuration() self.initialize_replicator() self.expected_not([129, 130]) for p in self.expected_parameters: assert config.bugzilla.params.has_key(p) # Confirm that the p4dti_parameter matches. assert int(config.bugzilla.params['p4dti']) == p4dti_param # 19. ENUM KEYWORDS WITH SPACES # # This is a regression test for job000445. Bugzilla only. class enum_spaces(p4dti_base): def do_sql(self, command): self.dti.system('mysql --user="%s" --password="%s" -e "%s" "%s"' % (config.dbms_user, config.dbms_password, command, config.dbms_database)) severity_values = [('blocker', '1 blocker'), ('critical', '2 critical'), ('major', '3 major'), ('normal', '4 normal'), ('minor', '5 minor'), ('trivial', '6 trivial'), ('enhancement', '7 enhancement'), ] def change_schema(self): if config.bugzilla_version < '2.20': temp_enum = string.join(map ((lambda x: "'%s', '%s'" % (x[0],x[1])), self.severity_values), ', ') new_enum = string.join(map ((lambda x: "'%s'" % x[1]), self.severity_values), ', ') new_default = self.severity_values[0][1] self.do_sql("alter table bugs change bug_severity bug_severity enum(%s) not null default '%s'" % (temp_enum, new_default)) for (old, new) in self.severity_values: self.do_sql("update bugs set bug_severity = '%s', delta_ts = delta_ts where bug_severity = '%s'" % (new, old)) self.do_sql("alter table bugs change bug_severity bug_severity enum(%s) not null default '%s'" % (new_enum, new_default)) else: for (old_val, new_val) in self.severity_values: self.do_sql("update bug_severity set value='%s' where value='%s'" % (new_val, old_val)) self.do_sql("update bugs set bug_severity='%s' where bug_severity='%s'" %(new_val, old_val)) def runTest(self): "Bugzilla enums containing spaces (test_p4dti.enum_spaces)" self.setup_everything() self.change_schema() self.check_startup() self.check_consistency() # 20. BUGZILLA EMAILSUFFIX # # This checks that when the Bugzilla "emailsuffix" parameter is set, # that the P4DTI correctly translates users between Bugzilla and # Perforce. Bugzilla only. Regression test for job000352. class emailsuffix(p4dti_base): users = ['gdr', 'nb', 'ndl', 'rb'] def runTest(self): "Bugzilla 'emailsuffix' parameter (test_p4dti.emailsuffix)" self.setup_everything({ 'bugzilla_mysqldump': 'job000352-mysqldump', }) self.setup_perforce(self.users) self.dti.edit_parameters({ 'p4dti': 1, 'emailregexp': '^.*$', 'emailsuffix': email_suffix, }) self.check_startup() self.expected_not([129, 130, 516, 536, 867]) # 21. PERFORCE JOB/FIX CONSISTENCY # # Check that Perforce updates the 'P4DTI-user' field whenever someone # changes a job by fixing it. Regression test for job000086 and # job000276. class fix_update(p4dti_base): def check_modifier(self, jobname, expected, case): job = self.p4i['a'].run('job -o %s' % jobname)[0] for found, desc in ((job['P4DTI-user'], "job['P4DTI-user']"), (self.r.job_modifier(job), "job modifier")): if found != expected: self.addFailure("After %s, expected %s to be " "'%s', but found '%s'." % (case, desc, expected, found)) return job def runTest(self): "Perforce updates jobs when fix (test_p4dti.fix_update)" self.setup_everything() self.setup_perforce(['a', 'b', 'c', 'd', 'e', 'f']) self.initialize_replicator() self.r.prepare_to_run() # Check that the "P4DTI-user" field has the correct value after # a series of actions: # 21.1. Job creation job = self.p4i['a'].run('job -o')[0] jobname = self.create_job(self.p4i['a'], job) job = self.check_modifier(jobname, 'a', "job creation") status = job[config.job_status_field] # 21.2. Job editing job['Description'] = 'test job 2' self.p4i['b'].run('job -i', job) job = self.check_modifier(jobname, 'b', "job editing") # 21.3. Fixing (with change in status) self.p4i['c'].run('fix -c 1 %s' % jobname) job = self.check_modifier(jobname, 'c', "fixing (with change " "in status)") # 21.4. Fixing a submitted changelist (no change in status) change = self.edit_file('d') self.p4i['d'].run('submit -c %s' % change['Change']) self.p4i['d'].run('fix -c %s %s' % (change['Change'], jobname)) job = self.check_modifier(jobname, 'd', "fixing (without " "change in status)") # 21.5. Changing the status of an existing fix self.p4i['e'].run('fix -s %s -c 1 %s' % (status, jobname)) job = self.check_modifier(jobname, 'e', "changing fix status ") # 21.6. Submitting pending changelist change = self.edit_file('f') self.p4i['f'].run('fix -c %s %s' % (change['Change'], jobname)) self.p4i['f'].run('submit -c %s' % change['Change']) job = self.check_modifier(jobname, 'f', "submitting a pending " "changelist") # 22. FREQUENT EDITS # # Check that frequent edits (at least one per poll) in the defect # tracker don't cause conflicts. Regression test for job000016, # job000042. class frequent_edits(lifecycle): def runTest(self): "Frequent edits cause no conflicts (test_p4dti.frequent_edits)" # Submit an issue to the defect tracker and replicate it. id = getattr(self, config.dt_name + '_submit')(self.user) self.poll() self.expectation([803, 804, 812, 911, 912]) # Check that it was replicated correctly. issue = self.r.dt.issue(id) jobname = issue.corresponding_id() job = self.p4.run('job -o %s' % jobname)[0] getattr(self, config.dt_name + '_submitted')(issue, job) # Assign it in the defect tracker and replicate the assignment. getattr(self, config.dt_name + '_assign')(issue, job, self.user) self.poll() self.expectation([804, 812, 911, 912]) # 23. BUGZILLA PERFORCE SECTION # # Check that the Bugzilla patch is correctly producing a Perforce # section in the bug form. class perforce_section(lifecycle): def setUp(self): self.setup_everything() self.setup_perforce([self.user]) self.dti.edit_parameters({ 'p4dti': 1 }) self.check_startup() def runTest(self): "Perforce section in Bugzilla (test_p4dti.perforce_section)" # Submit a new job to Bugzilla and fix it in Perforce. issue, job = self.submit(self.user) issue, job = self.assign(issue, job, self.user) self.p4.run('fix -c 1 %s' % job['Job']) self.close(issue, job, [802, 805, 819, 820, 824, 826]) # Get the bug form. bug_form = self.dti.run_script("show_bug.cgi", { 'bugzilla_login': config.bugzilla_admin_user, 'bugzilla_password': config.bugzilla_admin_password, 'id': issue.id(), }) parser = self.bug_form_parser() parser.feed(bug_form) parser.close() parser.check() # 23.1. Parse Bugzilla bug form class bug_form_parser(p4dti_html_parser): def __init__(self): self.seen_p4dti = 0 self.in_p4dti = 0 sgmllib.SGMLParser.__init__(self) def check(self): assert self.seen_p4dti == 1 def start_div(self, attrs_list): attrs = self.attrs(attrs_list) if attrs.get('class') == 'p4dti': self.seen_p4dti = self.seen_p4dti + 1 self.in_p4dti = 1 def end_div(self): self.in_p4dti = 0 # 24. TEST CASES: NORMAL OPERATION WITH UNICODE PERFORCE SERVER class unicode(normal): unicode = True def runTest(self): "Replication with Unicode Perforce server (test_p4dti.unicode)" normal.runTest(self) # RUNNING THE TESTS def tests(): suite = unittest.TestSuite() tests = [start_1, bogus, configdb, existing, fix_update, frequent_edits, inconsistencies, lifecycle, migrate, new_p4_job, normal, project, race_385, start_2, unicode ] if os.name == 'nt': tests.extend([nt_service]) if config.dt_name == 'Bugzilla': tests.extend([bugzilla_params, emailsuffix, enum_spaces, perforce_section, unconfirmed]) for t in tests: suite.addTest(t()) return suite if __name__ == "__main__": unittest.main(defaultTest="tests") # A. REFERENCES # # [Barnson 2001-08-29] "Bugzilla Guide (revision 2.14.0)"; Matthew # Barnson; 2001-08-29; # . # # [GDR 2000-12-31] "Automated testing plan" (e-mail message); Gareth # Rees; Ravenbrook Limited; 2000-12-31; # . # # [PyUnit] "PyUnit - a unit testing framework for Python"; Steve # Purcell; . # # [RB 2000-08-10] "Perforce Defect Tracking Integration Administrator's # Guide"; Richard Brooksby; Ravenbrook Limited; 2000-08-10; # . # # [RB 2000-12-08] "init.py -- Initialize replicator and defect tracker"; # Richard Brooksby; Ravenbrook Limited; 2000-12-08; # . # # # B. DOCUMENT HISTORY # # 2001-03-14 GDR Created. # # 2001-03-15 GDR Added list of regression tests, discussion of # limitations, snoping on e-mail, p4dti_variant class, test cases for # existing jobs, recent start date and replicating by project. # # 2001-03-17 GDR Added test_dt_errors test case for errors caught by the # defect tracker or its database. # # 2001-05-01 GDR Use p4dti_unittest module so that we can report many # errors in the course of a single test case. Added regression test for # job000311. # # 2001-05-17 GDR Don't look for messages 844-847 (replicator doesn't # output them any more). Fixed bug in issue lifecycle test. # # 2001-05-18 GDR Extended the lifecycle test so that it tests # replication of fixes. # # 2001-06-07 GDR Added references to design. # # 2001-07-02 GDR Made bogus_config and lifecycle test cases portable # between tTrack 4.5 and tTrack 5.0. # # 2001-07-09 NB Added changelist_url test. # # 2001-07-17 GDR Added start_2 (regression test for job000340) and # configdb (regression test for job000169, job000351). # # 2001-07-25 GDR Added consistency checker tests (regression test for # job000372). # # 2001-09-24 GDR Use environment variable P4DTI_PATH (if set) to find # the P4DTI sources, so that testers can test installed copies of the # P4DTI as well as copies synced from the repository. Use environment # variable P4DTI_CONFIG (if set) to find the configuration. # # 2001-09-25 GDR Use Bugzilla database and user from the configuration; # see job000394. Restore the mysqldump for the Bugzilla version given # by the bugzilla_version configuration parameter (if specified). The # tests start_1 and start_2 were incorrectly using gmtime when making # timestamps for the Bugzilla database -- this could lead to problems # when testing in timezones not equal to UTC with recently-created # defects. Use localtime instead. # # 2001-10-03 GDR Added test case for job000385. # # 2001-10-23 GDR Use entry point for polling, so no need to call # start_logger. # # 2001-10-26 NB Added test case for job000410. # # 2001-11-01 NB Always write e-mail messages to the test log. Make # Perforce user records so that email address translation works. # # 2001-11-06 GDR Added test case for replicating new job from Perforce. # Log each run to a separate file. # # 2001-11-09 NDL Added test case for NT services and event log. # # 2001-11-13 GDR Method and variable names use underscore to separate # words where possible. Lifecycle test works with NT service changes. # # 2001-11-15 GDR Do not destructively modify config.replicated_fields # in new_p4_job.setUp. # # 2001-11-21 GDR Added test case for migration, and test cases for # incorrect values for the new configuration parameters. # # 2001-11-22 GDR Refresh Perforce jobs after migrating. # # 2001-11-26 GDR Test cases migrate and new_p4_job take account of new # migration messages. # # 2001-11-28 GDR Bugzilla migration test fills in Severity and Priority # fields when translating the jobspec. Adapted the lifecycle test so # that it runs in the Bugzilla integration. # # 2001-11-28 NDL Added test case for bugzilla parameters. # # 2001-12-03 GDR Added regression tests for job000007, job000013, # job000222, job000271. # # 2001-12-05 GDR Test Perforce passwords, all kinds of inconsistencies, # startup e-mail, success of polling, and Perforce servers that don't # report a recognizable changelevel. # # 2001-12-07 GDR Extended lifecycle test case to cover conflict, # overwriting, mail report, and illegal changes to jobs. # # 2001-12-08 GDR Don't make local variables with tracebacks in them # (creates circular references). # # 2001-12-08 GDR Be thorough about checking expected messages. # # 2001-12-11 GDR Test replication of filespecs and fixes back from # defect tracker. Check that conflict e-mail gets sent to job changer # if different from job owner. # # 2002-01-07 NB Test replication of bugzilla enum fields contanining # spaces (job000445). Also fix a number of config attribute accesses # to use getattr and setattr. # # 2002-01-23 GDR Each test case now installs a fresh instance of # Bugzilla. The lifecycle test case runs the post_bug CGI script # directly (not via the HTTP server). The bugzilla_params test case # gets the default Bugzilla parameters by running the editparams CGI # script. New test case emailsuffix (job000352). # # 2002-01-28 GDR Added regression tests for double-replication causing # conflicts (job000016, job000042), and permissions not being applied # correctly (job000086, job000276). # # 2002-01-30 GDR Allow error messages from MySQLdb 0.9.1 and MySQL # 3.23.47. # # 2002-02-01 NB Added test for Perforce section in Bugzilla bug form. # # 2002-03-06 NB Add test for forbidden transitions in Bugzilla. # # 2002-03-28 NB Don't expect message 896 when running p4_new_job test. # # 2002-04-02 NB Migration test needs to add 'Reporter' to a job in the # translate_jobspec function if 'reporter' is in replicated_fields. # # 2002-04-03 NB Added test for qa_contact field in migration test. # job000495. # # 2002-10-25 RB Modified Bugzilla_nt_interface to try to make it work # in Ravenbrook's test environment. # # 2002-10-28 RB Made Bugzilla_posix_interface and # Bugzilla_nt_interface as similar as possible in preparation for # refactoring into a single class. Merged them into a single # class. Worked around some bugs in Windows file redirection. # # 2002-10-29 NB Fixed some problems with the type of the result of # run_script. # # 2002-10-29 RB Lifted common Perforce functions into a Perforce mixin # class shared between defect tracker interfaces. # # 2002-10-30 NB Make this work under Python 2.0. # # 2003-05-23 NB Remove tTrack references. # # 2003-12-05 NB Modify tests for handling jobspec configurability. # # 2004-05-28 NB Bugzilla script invocation upgraded for 2.17.7 # (HTTP_USER_AGENT, Perl taint mode, single-selects in editparams, # changed output of post_bug.cgi). Also handle dumped databases in # old schemas (by using checksetup), and change enum_spaces test to # modify the usual test database rather than using a distinct one. # Lastly, changed lifecycle test so that it inserts correct rows in # bugs_activity even when a user value has changed (otherwise we get # complaints from the bugmail script). # # # C. COPYRIGHT AND LICENSE # # This file is copyright (c) 2001 Perforce Software, Inc. All rights # reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in # the documentation and/or other materials provided with the # distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS # OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR # TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH # DAMAGE. # # # $Id: //info.ravenbrook.com/project/p4dti/version/2.4/test/test_p4dti.py#2 $