# Perforce Defect Tracking Integration Project
#
#
# TEST_P4DTI.PY -- TEST THE P4DTI
#
# Gareth Rees, Ravenbrook Limited, 2001-03-14
#
#
# 1. INTRODUCTION
#
# This test script tests the P4DTI. The initial plan is described in
# [GDR 2000-12-31] and some of the design is given in [GDR 2001-03-14].
#
# This script contains a lot of tricky code (for example, using the hostname or
# operating system name to pick a module to import or a class to use; or
# overriding a method in another module in order to snoop on its behaviour).
# Please take care when editing it.
#
# It uses the PyUnit unit test framework [PyUnit].
#
# The intended readership is project developers.
#
# This document is not confidential.
#
# The test cases depend on features of the test databases. It is
# important that the cases be kept in sync with the test database and
# with the design of those databases [GDR 2001-03-14]. When you add a
# test, be sure to change the design and refer back to the test.
#
#
# 1.1. Useful info
#
# The script creates a series of test Perforce servers (one for each test
# case). Each server has its own directory, in the appropriate place for
# temporary files on your system (typically /tmp on Unix, C:\Temp on Windows).
# When a test case starts, it kills any Perforce servers already running on
# the required port. The P4DTI log is diverted from its real place and sent to
# the file named by config.log_file or, if that is not set, to a timestamped
# file named p4dti.log.YYYYMMDDTHHMMSS (see section 2.2).
#
#
# 1.2. Regression tests in this script
#
# A section of "*" means that the defect is tested throughout as a simple
# consequence of running the script; there is no particular test for it.
#
# Job Section Title
# ----------------------------------------------------------------------
# job000003 6.3 It is not possible from the result of "p4 -G job -o
# jobname" to tell whether a job has been returned, or
# an error message
# job000004 * "p4 -G jobspec -o" doesn't work
# job000005 * TeamTrack integration may fail mysteriously on startup
# in future releases of TeamTrack
# job000007 10.4 The fixes keyword can't be set on submit (except to
# "closed")
# job000013 10.3 Deleting fixes not replicated
# job000022 17 No migration from Perforce jobs to defect tracker
# job000036 15 New jobs in Perforce don't get replicated to the
# defect tracker
# job000037 6 Consistency checker script is inadequate
# job000046 16 The replicator process is hard to manage
# job000047 8.1 Historical bugs are replicated but uninteresting
# job000048 6 Consistency checking of the configuration is
# inadequate
# job000050 8.1 There's no way to re-start replication
# job000051 * There's no Bugzilla integration
# job000053 10 Implicit changes made by the DT don't get replicated
# back
# job000057 * Special characters in single-select keywords make the
# replicator barf
# job000075 6 No automatic check of configuration
# job000092 * Long descriptions aren't replicated by Bugzilla
# integration
# job000107 9 You can't replicate an issue's project
# job000111 7 Replicator destroys the jobspec that exists before
# it's installed
# job000112 9 Can't easily replicate by project
# job000116 6 Bugzilla integration doesn't do enough checking
# job000118 10.1 Resolving a Bugzilla bug without setting the
# resolution always causes a conflict
# job000133 10.2 You can't close a job by fixing it
# job000140 6.3 Logical field name "code" not allowed in TeamTrack
# job000149 16 We don't use system logging facilities on Windows
# job000158 6.2 Obscure error if Perforce can't be reached or
# p4_client_executable is wrong
# job000168 6.2 Too easy to set dbms_port to a string
# job000169 11 Change numbers are links in TeamTrack even when no
# changelist URL has been specified
# job000170 6.2 Replicator may be unable to send e-mail if the default
# replicator_address is unchanged
# job000173 6.3 Wrong Perforce server version causes installation to
# fail mysteriously
# job000181 * Assertion failure in translate_1_to_0
# job000182 10 Elapsed time fields aren't replicated properly
# job000184 * Bugzilla integration doesn't work if your database is
# not called 'bugs'
# job000190 3.1 Connection to TeamTrack hangs for several minutes
# job000199 4.5 Auxiliary scripts send e-mail
# job000202 6.2 Errors from Perforce not reported well
# job000219 7 Existing jobs in Perforce may become unusable when the
# jobspec is changed
# job000221 8.1 Refreshing Perforce jobs fails in Bugzilla integration
# job000222 10.3 Deleting fix causes Bugzilla integration to crash
# job000225 10.3 If you "p4 fix" when there's no closed state, the
# replicator can't replicate
# job000233 10 When you submit a new issue to TeamTrack it overwrites
# the issue
# job000240 7 Perforce server can crash if P4DTI is started with
# existing jobs
# job000249 17 Refresh script fails if you change the closed_state
# job000262 14 Bugzilla: Can't use Perforce to confirm an unconfirmed
# bug
# job000271 17 The migrate script loses fixes
# job000311 9 If you have two TeamTrack projects with the same name,
# the replicator stops working
# job000325 * Can't read issues from TeamTrack 5.0
# job000338 * TeamTrack 5.0 doesn't update P4DTI_ fields
# job000340 8.2 Consistency checker may miss some issues if you change
# the start date
# job000351 11 Bugzilla integration doesn't remove old configuration
# items
# job000354 * MySQL bug stops replicator from seeing some Bugzilla
# changes
# job000355 8 Bugzilla integration ignores start_date parameter
# job000358 * P4DTI doesn't work with TeamTrack 5.0
# job000362 10.3 Can't update case from Perforce when using TeamTrack
# 4.5 database upgraded to TeamTrack 5.0
# job000370 * Journal fields in TeamTrack 5.0 can't be replicated
# job000372 12.3 Check consistency stops with error if there are
# unreplicated jobs
# job000379 10 P4DTI corrupts date/time fields in TeamTrack if server
# time zone is not UTC
# job000381 10 P4DTI corrupts date/time fields in TeamTrack if
# daylight savings time is in effect
# job000385 13 Renumbered changelist causes P4DTI error and deletes
# fix
# job000390 * P4DTI doesn't support Bugzilla 2.14
# job000410 14 Can't confirm an unconfirmed Bugzilla bug from
# Perforce
# job000416 10 Fixed-point numbers treated as floating-point in
# TeamTrack integration
# job000422 17 Refreshing fails after migration if new workflow
# doesn't match old workflow
import os
import sys
if os.environ.has_key('P4DTI_PATH'):
    p4dti_path = os.environ['P4DTI_PATH']
else:
    p4dti_path = os.path.join(os.getcwd(), os.pardir, 'code',
                              'replicator')
if p4dti_path not in sys.path:
    sys.path.append(p4dti_path)
import cgi
import copy
import httplib
import imp
import logger
import message
import p4
import p4dti_unittest
import re
import socket
import string
import tempfile
import time
import types
import unittest
import urllib
# 2. CONFIGURATION
#
#
# 2.1. Limitations
#
# This script can only support one basic configuration on each host (see
# section 2.2). Tests of other configurations have to be made by changing
# configuration parameters in the script.
#
# This script can only test one defect tracker on each host (you have to change
# dt_name in config_HOSTNAME.py to test another defect tracker).
#
# Could it use an environment variable to work out which configuration to use?
#
#
# 2.2. Using a test configuration
#
# I want to be able to test different configurations, changes to
# configurations, and to check whether erroneous configurations are
# spotted correctly.
#
# I expect to find a working configuration either (1) in the file
# specified by the environment variable P4DTI_CONFIG, if set, or (2) in
# config_HOSTNAME.py, where HOSTNAME is the first component of the FQDN,
# converted to lower case (e.g. 'swan' for 'swan.ravenbrook.com').
#
# This configuration file is an ordinary P4DTI configuration file,
# except that it must include two additional configuration parameters:
# p4_license_file specifies the location of a Perforce license file
# suitable for use on the machine running the tests;
# p4_server_executable specifies the location of a suitable Perforce
# server executable.
#
# The loaded configuration module is copied when a test is started and
# the copy is installed in sys.modules['config']. This has two effects:
#
# 1. The P4DTI itself won't load the config module; see the
# initialization code [RB 2000-12-08].
#
# 2. The unit tests can make changes to the copy of the configuration
# (e.g., to specify incorrect values, as in section 6) without
# affecting the original, which can be restored for the next test.
if os.environ.has_key('P4DTI_CONFIG'):
    config_file = os.environ['P4DTI_CONFIG']
else:
    hostname = string.lower(string.split(socket.gethostname(), '.')[0])
    config_file = 'config_' + hostname + '.py'
file = open(config_file)
try:
    imp.load_source('config', config_file, file)
finally:
    file.close()
original_configuration = copy.copy(sys.modules['config'].__dict__)
import config
log_filename = (config.log_file
or time.strftime('p4dti.log.%Y%m%dT%H%M%S',
time.gmtime(time.time())))
sys.stdout.write("P4DTI test suite, logging to %s.\n" % log_filename)
sys.stdout.flush()
def reset_configuration():
    # Delete current config, just in case we added something.
    for k in sys.modules['config'].__dict__.keys():
        if not re.match('^__.*__$', k):
            del sys.modules['config'].__dict__[k]
    # Restore old config.
    for (k,v) in original_configuration.items():
        sys.modules['config'].__dict__[k] = v
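# The snapshot-and-restore technique used for the configuration can be tried in
# isolation. The following is only a sketch: it targets a current Python 3
# interpreter (unlike the rest of this script), and the module demo_config, its
# parameter values, and the function reset_demo_configuration are hypothetical
# stand-ins, not part of the P4DTI.

```python
import copy
import re
import types

# Hypothetical stand-in for the loaded 'config' module.
demo_config = types.ModuleType('demo_config')
demo_config.p4_port = '127.0.0.1:1666'
demo_config.poll_period = 10

# Take a shallow snapshot of the module namespace before any test runs.
snapshot = copy.copy(demo_config.__dict__)

def reset_demo_configuration():
    # Remove every non-dunder name, including any that a test added.
    for key in list(demo_config.__dict__.keys()):
        if not re.match('^__.*__$', key):
            del demo_config.__dict__[key]
    # Put the snapshot back.
    for key, value in snapshot.items():
        demo_config.__dict__[key] = value

# A test damages one parameter and adds a stray one, then resets.
demo_config.poll_period = 'not an int'
demo_config.extra = 1
reset_demo_configuration()
```

# Because the snapshot is shallow, a test that mutates a list-valued parameter
# in place (rather than rebinding it) would also change the snapshot. Rebinding
# the parameter to a new value avoids that.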
# 3. DEFECT TRACKER AND OPERATING SYSTEM INTERFACES
#
# Many of the test cases are generic: they don't depend on a particular defect
# tracker or operating system. But they need an interface to the defect
# tracker in order to restart it, and to the operating system in order to start
# a new Perforce server.
#
# Each interface class should be called DT_OS_interface (where DT is
# config.dt_name and OS is os.name) and must define these methods:
#
# restart_defect_tracker()
# restart_perforce()
#
# Note that there are no corresponding "stop" methods. The tests leave
# Perforce and the defect tracker running so that failures can be investigated
# without all the evidence having disappeared.
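#
# Section 4 selects the interface class by building the name DT_OS_interface
# from config.dt_name and os.name. As an illustration only, the same naming
# convention can be resolved with a dictionary lookup, which gives a clear error
# when no interface exists for the combination. This sketch targets Python 3;
# Demo_posix_interface and find_interface are hypothetical names, not part of
# this script.

```python
class Demo_posix_interface:
    # Hypothetical interface defining the two required methods.
    def restart_defect_tracker(self):
        return 'restarted defect tracker'

    def restart_perforce(self):
        return 'restarted Perforce'

def find_interface(dt_name, os_name, namespace):
    # Build the conventional class name and look it up by name.
    class_name = '%s_%s_interface' % (dt_name, os_name)
    try:
        return namespace[class_name]
    except KeyError:
        raise LookupError('No test interface for %s on %s'
                          % (dt_name, os_name))

# Look the class up in this module's namespace and instantiate it.
interface = find_interface('Demo', 'posix', globals())()
```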
# 3.1. Interface to TeamTrack on Windows NT
class TeamTrack_nt_interface:
# The TeamTrack server.
server = None
# Temporary directory for files created by this test.
tempdir = None
# 3.1.1. Clean up TeamTrack
#
# I would prefer to create a new TeamTrack database each time this test is
# run (so that I can check that extra fields are added correctly to the
# CASES table, for example). However, I don't know of a way to do that;
# TeamTrack is controlled from the TeamTrack Administrator and I have no
# idea how to automate it.
#
# So the next best thing to do is to clean out all P4DTI data from a
# running TeamTrack server. This means setting all P4DTI_ fields in the
# CASES table to the empty string, and deleting all records from the
# VCACTIONS table.
#
# This isn't ideal: each new test ends up working with the set of cases
# that resulted from the previous test.
def restart_defect_tracker(self):
# Connect to the TeamTrack server. This should take no more than 5
# seconds (regression test for job000190).
import teamtrack
self.server = teamtrack.connect(config.teamtrack_user,
config.teamtrack_password,
config.teamtrack_server)
# Empty the P4DTI_* fields in the CASES table, if there are any.
cases = self.server.query(self.server.case_table_id(), '')
fields = ['P4DTI_JOBNAME', 'P4DTI_RID', 'P4DTI_SID']
for c in cases:
changed = 0
for f in fields:
if c.has_key(f) and c[f] != '':
c[f] = ''
changed = 1
if changed:
c.update()
# Delete all records in the VCACTIONS table.
vcactions = self.server.query(teamtrack.table['VCACTIONS'],'')
for v in vcactions:
self.server.delete_record(teamtrack.table['VCACTIONS'], v['ID'])
# 3.1.2. Kill old Perforce servers
#
# Find and terminate all running Perforce servers, if any. This code is
# based on the script "killProcName.py" that comes with the Python Win32
# extensions.
def kill_perforce(self):
import win32pdhutil
import win32api
import win32con
try:
win32pdhutil.GetPerformanceAttributes('Process','ID Process',"p4d")
except:
pass
pids = win32pdhutil.FindPerformanceAttributesByName("p4d")
for pid in pids:
h = win32api.OpenProcess(win32con.PROCESS_TERMINATE, 0, pid)
win32api.TerminateProcess(h,0)
win32api.CloseHandle(h)
# 3.1.3. Start a new Perforce server
#
# Make a completely fresh Perforce server, with a new repository.
def start_perforce(self):
# Make a new repository directory.
self.tempdir = tempfile.mktemp()
os.mkdir(self.tempdir)
# Copy the license.
src = open(config.p4_license_file, 'r')
dest = open(self.tempdir + '\\license', 'w')
dest.writelines(src.readlines())
src.close()
dest.close()
# Work out Perforce's port number and start a Perforce server.
match = re.match(".*:([0-9]+)$", config.p4_port)
if match:
port = int(match.group(1))
else:
port = 1666
import win32api
win32api.WinExec("%s -p 0.0.0.0:%d -r %s"
% (config.p4_server_executable, port, self.tempdir))
# 3.1.4. Restart Perforce
#
# By killing the old server and starting a new one.
def restart_perforce(self):
self.kill_perforce()
self.start_perforce()
# 3.2. Interface to Bugzilla on Posix
class Bugzilla_posix_interface:
# The Bugzilla server.
server = None
# Temporary directory for files created by this test.
tempdir = None
def __init__(self):
# The default temporary file prefix starts with an '@'. But
# this means that temporary files will look like revision
# specifications to Perforce.
tempfile.gettempprefix = lambda: '%d.' % os.getpid()
# 3.2.1. Restore Bugzilla
#
# Wipe out the existing Bugzilla database, if any, and restore a
# known working one from a MySQL dump file.
def restart_defect_tracker(self):
db = config.dbms_database
user = config.dbms_user
version = config.__dict__.get('bugzilla_version', '2.10')
os.system("mysqladmin -u %s --force drop %s > /dev/null"
% (user, db))
os.system("mysqladmin -u %s create %s > /dev/null"
% (user, db))
os.system("mysql -u %s %s < bugzilla-%s-mysqldump > /dev/null"
% (user, db, version))
# 3.2.2. Kill running Perforce servers
#
# If there are any Perforce servers running on the magic port,
# use p4 admin to kill them.
def kill_running_perforce_servers(self):
os.system("p4 -p %s -u %s admin stop > /dev/null"
% (config.p4_port, config.p4_user))
# 3.2.3. Start a new Perforce server
#
# Make a completely fresh Perforce server, with a new repository.
def start_perforce(self):
# Make a new repository directory.
self.tempdir = tempfile.mktemp()
os.mkdir(self.tempdir)
# Copy the license.
src = open(config.p4_license_file, 'r')
dest = open(os.path.join(self.tempdir, 'license'), 'w')
dest.writelines(src.readlines())
src.close()
dest.close()
# Work out Perforce's port number and start a Perforce server.
match = re.match(".*:([0-9]+)$", config.p4_port)
if match:
port = int(match.group(1))
else:
port = 1666
os.system("%s -d -p 127.0.0.1:%d -r %s > /dev/null"
% (config.p4_server_executable, port, self.tempdir))
# 3.2.4. Restart Perforce
#
# By killing the old server and starting a new one.
def restart_perforce(self):
self.kill_running_perforce_servers()
self.start_perforce()
# 4. P4DTI TEST CASE BASE CLASS
#
# The p4dti_base class is a generic P4DTI test case. It defines methods for
# setting up the integration and some utilities for recording and interrogating
# the output of the replicator.
#
# Other P4DTI test cases will inherit from p4dti_base.
#
# When a class implements several test cases, the methods that implement test
# cases (in the PyUnit sense) should have names starting "test_". When a class
# implements a single test case, the method should be called "runTest".
class p4dti_base(p4dti_unittest.TestCase):
# Defect tracker interface (an instance of one of the classes in section 3
# above).
dti = eval(config.dt_name + '_' + os.name + '_interface')()
# Messages written to the replicator's log.
log_messages = []
# Mail messages sent by the replicator. Each message is a pair
# (recipients, text), as recorded by the sendmail() method in section 4.2.
mail_messages = []
# The replicator.
r = None
# 4.1. Snoop on the logger
#
# I want to be able to test that the correct messages are appearing
# in the log, so I override the log method in the
# logger.multi_logger class so that it records the messages as well
# as printing them to the log file. Printing to a log file is
# necessary so that the test engineer can analyze what happened when
# a test case fails.
#
# A disadvantage of this implementation is that the logging code is
# not tested. Therefore there needs to be a separate set of logging
# tests.
def snoop_logger(self):
self.log_file = open(log_filename, 'a')
logger.multi_logger.log = self.log
logger.file_logger.log = self.log
logger.sys_logger.log = self.log
def log(self, msg):
date = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(time.time()))
self.log_file.write("%s %s\n" % (date, msg))
self.log_file.flush()
self.log_messages.append(msg)
# The clear_log method can be used to clear this record of the log
# before carrying out a test.
def clear_log(self):
self.log_messages = []
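# The snooping technique in this section (replacing a method on a class in
# another module with a bound method of the test object) can be sketched on its
# own. This is an illustration for Python 3 only; demo_logger and recorder are
# hypothetical stand-ins for the classes in the logger module and for the test
# case.

```python
import sys

class demo_logger:
    # Hypothetical stand-in for a logger class defined elsewhere.
    def log(self, msg):
        sys.stdout.write(msg + '\n')

class recorder:
    # Records every message instead of writing it out.
    def __init__(self):
        self.log_messages = []

    def log(self, msg):
        self.log_messages.append(msg)

    def clear_log(self):
        self.log_messages = []

snoop = recorder()
# Override the method on the class. Because snoop.log is already bound to
# the recorder, every demo_logger instance now reports to the recorder.
demo_logger.log = snoop.log
demo_logger().log('replicator started')
```

# After the override, snoop.log_messages holds the message and nothing is
# written to standard output.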
# 4.2. Snoop on e-mail
def snoop_mail(self):
import sys
sys.modules['smtplib'] = self
def log_separator(self):
self.log_file.write("-" * 72)
self.log_file.write("\n")
def SMTP(self, server):
self.log_separator()
return self
def quit(self):
self.log_separator()
def sendmail(self, from_address, to, text):
self.mail_messages.append((to, text))
self.log_file.write(text)
# 4.3. Check that the log is as expected
#
# These methods make various checks on the messages recorded in the log.
def log_message_ids(self):
return map(lambda m: m.id, self.log_messages)
def expected_only(self, expected):
for m in self.log_messages:
assert m.id in expected, \
("Unexpected message %d (%s) found in log %s"
% (m.id, str(m), self.log_message_ids()))
def expected_only_in_range(self, min, max, expected):
for m in self.log_messages:
if m.id >= min and m.id <= max:
assert m.id in expected, \
("Unexpected message %d (%s) found in log %s"
% (m.id, str(m), self.log_message_ids()))
def expected_not(self, unexpected):
for m in self.log_messages:
assert m.id not in unexpected, \
("Unexpected message %d (%s) found in log %s"
% (m.id, str(m), self.log_message_ids()))
def expected(self, expected):
found = {}
for m in self.log_messages:
if found.has_key(m.id):
found[m.id] = found[m.id] + 1
else:
found[m.id] = 1
for id in expected:
assert found.has_key(id) and found[id] > 0, \
("Expected message %d not found in log %s"
% (id, self.log_message_ids()))
found[id] = found[id] - 1
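# The expected() check above counts occurrences: an id listed twice must
# appear at least twice in the log, while extra messages in the log are
# ignored. A minimal sketch of that counting rule for Python 3 follows; the
# function missing_ids is hypothetical and works on plain lists of ids rather
# than on message objects.

```python
from collections import Counter

def missing_ids(logged_ids, expected_ids):
    # Count how often each id was logged.
    remaining = Counter(logged_ids)
    missing = []
    for message_id in expected_ids:
        if remaining[message_id] > 0:
            # Use up one logged occurrence for this expectation.
            remaining[message_id] -= 1
        else:
            missing.append(message_id)
    return missing
```

# An empty result corresponds to expected() passing without an assertion
# failure.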
# 4.4. Set up everything so that test cases can run
#
# Get a fresh configuration, a new defect tracker and a new Perforce
# server.
def setup_everything(self):
reset_configuration()
self.dti.restart_defect_tracker()
self.dti.restart_perforce()
self.snoop_logger()
self.snoop_mail()
# Get a Perforce interface.
self.p4 = p4.p4( port = config.p4_port,
user = config.p4_user,
password = config.p4_password,
client_executable = config.p4_client_executable,
logger = self )
# 4.5. Initialize the replicator
#
# Load the init module. If it's already loaded, reload it: we must reload
# it because we want the init module to run again and to pick up on the new
# configuration.
def initialize_replicator(self):
self.mail_messages = []
if sys.modules.has_key('init'):
reload(sys.modules['init'])
import init
self.r = init.r
# Regression test for job000199.
assert self.mail_messages == []
# 4.6. Normal replicator startup
#
# This is a pseudo test case. It checks that the replicator can start up
# normally. Other tests may depend on this having been run; for example,
# the check_nothing test (5.1). This test must be run before any other
# test cases, and therefore can't be part of a test suite (the tests of
# which may be run singly or in any order). It should therefore be run
# from the setUp() method of a subclass of this class (unless of course
# that subclass has a different idea of what should happen at startup).
def check_startup(self):
self.initialize_replicator()
# Expect to set up issues and replicate them, but no jobs or conflicts.
self.clear_log()
self.r.poll()
self.expected([803, 804, 812])
self.expected_only_in_range(800, 1000, [803, 804, 812, 911, 912])
self.expected_not([806, 811])
# Expect to see no issues replicated, and no issues actually changed.
self.clear_log()
self.r.poll()
self.expected_not([803, 804, 812, 824])
# 4.7. Exceptional replicator behaviour
#
# This method calls the given function but expects to get the
# exception given by 'error', with message id 'msgid'.
def check_exception(self, function, error, msgid):
try:
function()
except:
(err, msg, _) = sys.exc_info()
assert err == error, \
("Expected error %s but got %s." % (error, err))
assert msg.id == msgid, \
("Expected message %d but got %d (%s)"
% (msgid, msg.id, str(msg)))
else:
self.fail("Expected error %s but didn't get it." % error)
# check_startup_error() is a special case of check_exception for the
# initialize_replicator method.
def check_startup_error(self, error, msgid):
self.check_exception(self.initialize_replicator, error, msgid)
# 4.8. Check consistency
#
# This method checks that the databases are consistent.
def check_consistency(self):
self.clear_log()
self.r.check_consistency()
self.expected([871, 885])
self.expected_only_in_range(800, 1000, [871, 883, 884, 885, 890])
# 4.9. Check replication of a single issue
def check_replication_dt_to_p4(self, first_time = 0):
self.clear_log()
self.r.poll()
self.expected([804, 812])
if first_time:
self.expected([803]) # set up for replication
self.expected_only_in_range(800, 1000, [803, 804, 812, 911, 912])
# Nothing should come back from Perforce.
self.clear_log()
self.r.poll()
self.expected_not([803, 804, 812, 824])
self.expected_only_in_range(800, 1000, [805, 825, 911, 912])
# 4.10. Initialize Perforce repository
#
# This method sets up Perforce clients and workspaces for a set of
# users, and adds a file to the repository. It sets up the
# following members:
#
# p4i: Map from user to a Perforce interface for that user.
# workspace: Map from user to the client workspace for that user.
#
# This is only needed for tests involving Perforce fixes: that's why
# it's not called in setup_everything().
def setup_perforce(self, users):
# Create Perforce interfaces and workspaces for the dummy users.
self.p4i = {}
self.workspace = {}
for user in users:
self.workspace[user] = tempfile.mktemp()
os.mkdir(self.workspace[user])
self.p4i[user] = p4.p4(
port = config.p4_port,
user = user,
client = user + '_' + socket.gethostname(),
client_executable = config.p4_client_executable,
logger = self, )
# Make the Perforce user record.
p4_user = self.p4i[user].run('user -o')[0]
p4_user['Email'] = ('%s@ravenbrook.com' % user)
self.p4i[user].run('user -i', p4_user)
# Make a Perforce client.
client = self.p4i[user].run('client -o')[0]
client['Root'] = self.workspace[user]
self.p4i[user].run('client -i', client)
# Add a file to the repository so that we have something with
# which to make changes and fixes.
user = users[0]
self.filename = os.path.join(self.workspace[user], 'test')
open(self.filename, 'w')
self.p4i[user].run('add -t ktext %s' % self.filename)
change = self.p4i[user].run('change -o')[0]
change['Description'] = 'Added test file'
self.p4i[user].run('submit -i', change)
# 4.11. Make changelist in Perforce
#
# This method makes a changelist in Perforce and edits a file in
# that changelist. The changelist is returned without being
# submitted.
def edit_file(self, user):
change = self.p4i[user].run('change -o')[0]
change['Description'] = 'Edited test file'
result = self.p4i[user].run('change -i', change)[0]
assert result.has_key('data')
match = re.match('^Change ([0-9]+) created', result['data'])
assert match
changelist = match.group(1)
self.p4i[user].run('edit -c %s %s' % (changelist, self.filename))
open(self.filename, 'a').write("Foo\n")
return self.p4i[user].run('change -o %s' % changelist)[0]
# 4.12. Variant tests
#
# This class has variant methods for each defect tracker (e.g., TeamTrack,
# Bugzilla) and calls the appropriate one.
def run_variant(self):
try:
test = getattr(self, config.dt_name)
except AttributeError:
assert 0, "No test variant for " + config.dt_name + "."
test()
# 5. TEST CASES: NORMAL OPERATION
#
# If nothing has changed, then nothing happens when the replicator polls.
# The databases are consistent.
class normal(p4dti_base):
def setUp(self):
self.setup_everything()
self.check_startup()
def runTest(self):
"Startup, replication to Perforce, consistency (test_p4dti.normal)"
self.clear_log()
self.r.poll()
self.expected_only_in_range(800, 1000, [911, 912])
self.check_consistency()
# 6. TEST CASES: INCORRECT CONFIGURATIONS
#
# This is a regression test of job000037, job000075 and job000116.
class bogus(p4dti_base):
def setUp(self):
self.setup_everything()
# 6.1. An incorrect parameter generates an error
#
# This is a utility function for carrying out a range of tests. It resets
# the configuration, sets the parameter named by 'param' to value, then
# tries to start the replicator. It expects to get an exception, whose
# message should have the message id 'msgid'.
def check_param(self, param, value, msgid):
reset_configuration()
config.__dict__[param] = value
try:
self.initialize_replicator()
except:
(err, msg, _) = sys.exc_info()
if isinstance(msg, message.message):
if msg.id != msgid:
self.addFailure("Set parameter %s to '%s': expected "
"message %d but got %d (%s)"
% (param, value, msgid, msg.id, str(msg)))
else:
self.addFailure("Set parameter %s to '%s': expected message "
"%d but got '%s: %s' instead."
% (param, value, msgid, err, msg))
else:
self.addFailure("Set parameter %s to '%s': expected message %d "
"but didn't get it." % (param, value, msgid))
# 6.2. Basic errors in parameters are caught quickly
#
# Basic errors in parameters (wrong type, wrong format) should be caught
# quickly.
#
# This is a table of (parameter name, bogus value, message id of expected
# error).
bogus_parameters = [
# Regression for job000170:
('administrator_address', 'invalid e-mail address', 202),
('changelist_url', -1, 207),
('changelist_url', "http://invalid/%d/%s", 210),
('changelist_url', "http://invalid/no/format/specifier", 210),
('changelist_url', "http://invalid/%d/%%/%%%", 210),
('closed_state', -1, 208),
('configure_name', -1, 207),
('job_url', 42, 207),
('job_url', "http://invalid/%d/%s", 211),
('job_url', "http://invalid/no/format/specifier", 211),
('job_url', "http://invalid/trailing/percent/%d/%%/%%%", 211),
('log_file', -1, 208),
('log_level', 'not an int', 204),
('migrate_p', 'not a function', 203),
('p4_client_executable', -1, 207),
# Regression test for job000158:
('p4_client_executable', 'no such file', 705),
('p4_user', None, 207),
('p4_password', -1, 207),
('p4_port', None, 207),
# Regression test for job000158, job000202:
('p4_port', '127.0.0.1:9999', 707),
('p4_server_description', -1, 207),
('poll_period', 'not an int', 204),
('prepare_issue', 'not a function', 203),
('replicator_address', 'invalid@e-mail@address', 202),
('replicate_p', 'not a function', 203),
('replicate_job_p', 'not a function', 203),
('replicated_fields', 'not a list', 205),
('replicated_fields', ['not', 'a', 'list', 'of', 'strings', 0], 206),
('rid', -1, 207),
('rid', '0abc', 209),
('rid', 'ab-c', 209),
('sid', -1, 207),
('sid', 'abcdefg+z', 209),
('smtp_server', -1, 207),
('start_date', '2001-02-03 24-00-00', 201),
('use_deleted_selections', 'neither 0 nor 1', 200),
('use_deleted_selections', -1, 200),
('use_deleted_selections', 2, 200),
('use_perforce_jobnames', 'neither 0 nor 1', 200),
('use_perforce_jobnames', -1, 200),
('use_perforce_jobnames', 2, 200),
('use_stdout_log', 'neither 0 nor 1', 200),
('use_stdout_log', -1, 200),
('use_stdout_log', 2, 200),
]
def test_parameters(self):
import check_config
for (param, value, msgid) in self.bogus_parameters:
self.check_param(param, value, msgid)
# 6.3. Basic errors in DT parameters are caught quickly
#
# As 6.2, but picks a set of tests based on config.dt_name.
bogus_TeamTrack_parameters = [
('closed_state', 'not a TeamTrack state', 401),
('replicated_fields', ['PRIORITY', 'not a TeamTrack state'], 403),
('replicated_fields', ['OWNER'], 404),
('replicated_fields', ['TITLE'], 404),
('replicated_fields', ['STATE'], 404),
('replicated_fields', ['PRIORITY', 'PRIORITY'], 405),
('replicated_fields', ['MULTISELECT'], 406),
# Regression for job000003, job000140:
('replicated_fields', ['CODE'], 407),
('teamtrack_password', -1, 207),
('teamtrack_server', None, 207),
('teamtrack_user', None, 207),
('teamtrack_version', -1, 207),
('use_windows_event_log', -1, 200),
('use_windows_event_log', 2, 200),
('use_windows_event_log', 'neither 0 nor 1', 200),
]
bogus_Bugzilla_parameters = [
('bugzilla_directory', 'not a directory', 303),
('bugzilla_directory', '/', 304),
('closed_state', 'not a Bugzilla state', 301),
('dbms_database', -1, 207),
('dbms_host', -1, 207),
# Regression for job000168:
('dbms_port', '1234', 204),
('dbms_user', -1, 207),
('dbms_password', -1, 207),
('migrated_user_password', -1, 207),
# By using fake Perforce client executables we can check that
# unsupported client and server versions are detected.
# Regression test for job000173.
('p4_client_executable', './fake_p4.py', 704),
('p4_client_executable', './fake_p4d.py', 834),
('replicated_fields', ['bug_status'], 311),
('replicated_fields', ['assigned_to'], 311),
('replicated_fields', ['short_desc'], 311),
('replicated_fields', ['resolution'], 311),
('replicated_fields', ['longdesc', 'longdesc'], 312),
('replicated_fields', ['not a Bugzilla field'], 313),
]
def test_dt_parameters(self):
params = getattr(self, 'bogus_%s_parameters' % config.dt_name)
for (param, value, msgid) in params:
self.check_param(param, value, msgid)
# 6.4. Parameter errors are caught by the defect tracker
#
# Like 6.2 and 6.3, this test sets a parameter to an incorrect value. In
# this case the error is caught by the defect tracker, so the exception
# carries a string rather than a message object, and we must test the text
# directly rather than the message id.
# These three authentication failure messages are from the APIs for
# builds 4509, 5034, and 50101 respectively.
TeamTrack_auth_messages = [
"SERVER_ERROR: Authentication Failed. "
"Invalid user id or password",
"SOCKET_READ_ERROR: Socket error.\n"
"One reason might be database needs up upgrading.\n"
"Check event viewer for complete error message.\n",
"SOCKET_READ_ERROR: Authentication Failed. "
"Invalid user id, password, or licensing."
]
erroneous_TeamTrack_parameters = [
('dt_name',
'not a defect tracker',
'exceptions.ImportError',
'No module named configure_not a defect tracker'),
('teamtrack_server',
'host.invalid',
'TeamShare API error',
'SOCKET_CONNECT_FAILED: Socket Connect failed.'),
('teamtrack_user',
'invalid user',
'TeamShare API error',
TeamTrack_auth_messages),
('teamtrack_password',
'invalid password',
'TeamShare API error',
TeamTrack_auth_messages),
]
erroneous_Bugzilla_parameters = [
('dt_name',
'not a defect tracker',
'exceptions.ImportError',
'No module named configure_not a defect tracker'),
('dbms_host',
'host.invalid',
'_mysql.OperationalError',
'(2005, "Unknown MySQL Server Host \'host.invalid\' (2)")'),
('dbms_database',
'invalid',
'_mysql.OperationalError',
'(1049, "Unknown database \'invalid\'")'),
# ('dbms_port',
# 25,
# '_mysql.OperationalError',
# -1),
('dbms_password',
'not the Bugzilla password',
'_mysql.OperationalError',
'(1045, "Access denied for user:'),
('dbms_user',
'not the Bugzilla user',
'_mysql.OperationalError',
'(1045, "Access denied for user:'),
]
def test_dt_errors(self):
params = getattr(self, 'erroneous_%s_parameters' % config.dt_name)
for (param, value, error, message_texts) in params:
reset_configuration()
config.__dict__[param] = value
try:
self.initialize_replicator()
except:
(err, msg, _) = sys.exc_info()
if str(err) != error:
self.addFailure("Set parameter %s to '%s': expected error "
"'%s' but got error '%s'."
% (param, value, error, str(err)))
if isinstance(message_texts, types.ListType):
texts = message_texts
else:
texts = [message_texts]
found = 0
for text in texts:
if str(msg)[0:len(text)] == text:
found = 1
break
if not found:
self.addFailure("Set parameter %s to '%s': expected error "
"message in %s but got '%s'."
% (param, value, texts, msg))
else:
self.addFailure("Set parameter %s to '%s': expected error '%s' "
"but there was no error."
% (param, value, error))
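# The prefix test used by test_dt_errors can be read in isolation. The
# following is a minimal sketch, not part of the test suite: the helper
# name message_matches is hypothetical, and it assumes that the expected
# text is either a single string or a list of alternatives, as in the
# parameter tables above.

```python
def message_matches(msg, expected):
    """Return True if str(msg) starts with any of the expected texts.

    'expected' may be a single string or a list of strings.
    """
    if isinstance(expected, (list, tuple)):
        texts = expected
    else:
        texts = [expected]
    for text in texts:
        # Only the start of the message is compared, so trailing detail
        # (such as a user name) does not affect the match.
        if str(msg).startswith(text):
            return True
    return False
```

# Matching on a prefix lets one table entry cover messages whose tail
# varies from run to run.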
def runTest(self):
"Illegal configuration parameters (test_p4dti.bogus)"
self.test_parameters()
self.test_dt_parameters()
self.test_dt_errors()
# 7. TEST CASE: EXISTING JOB IN PERFORCE
#
# The replicator should refuse to start if there's a job in Perforce.
#
# This is a regression test for job000219 and job000240.
class existing(p4dti_base):
def setUp(self):
self.setup_everything()
self.initialize_replicator()
def runTest(self):
"Startup with an existing job (test_p4dti.existing)"
j = self.p4.run('job -o')[0]
j['Description'] = 'Test job'
self.p4.run('job -i', j)
self.check_exception(self.r.poll, self.r.error, 914)
# 8. TEST CASE: MOVING THE START DATE
#
#
# 8.1. Moving the start date backwards in time
#
# When start_date is set to the current time, no issues should be replicated
# when the replicator starts. Similarly, refreshing Perforce has no effect.
# But if you then set start_date back in time and refresh Perforce, the
# refresh does replicate issues.
#
# This is a regression test for job000047, job000050, job000221.
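# The start_date values in these tests are local-time timestamps of the
# form "YYYY-MM-DD HH:MM:SS". A minimal sketch of producing one follows;
# the helper name timestamp is hypothetical and is not part of the
# P4DTI.

```python
import time

def timestamp(seconds=None):
    """Format a time (seconds since the epoch) as a local-time string."""
    if seconds is None:
        seconds = time.time()
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(seconds))

now = timestamp()
```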
class start_1(p4dti_base):
def setUp(self):
self.setup_everything()
def runTest(self):
"Moving the start_date backwards in time (test_p4dti.start_1)"
config.start_date = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(time.time()))
self.initialize_replicator()
# When we poll, nothing should happen.
self.clear_log()
self.r.poll()
self.expected([])
# Nor when we refresh.
self.clear_log()
self.r.refresh_perforce_jobs()
self.expected([])
# The databases should report consistent.
self.check_consistency()
# Now set start date back in time and try refreshing again.
config.start_date = "1971-01-01 00:00:00"
self.initialize_replicator()
self.clear_log()
self.r.refresh_perforce_jobs()
self.expected([803, 804, 812])
self.expected_only_in_range(800, 1000,
[803, 804, 812, 911, 912])
self.expected_not([806, 811])
# The databases should still report consistent.
self.check_consistency()
# 8.2. Moving the start date forwards in time
#
# Start up with an old start date as normal, then move the start date
# forwards in time. The databases should still report consistent,
# because issues that were recorded as being replicated in the first
# poll should still be recorded as replicated, even though they haven't
# changed since the start date.
#
# This is a regression test for job000340.
class start_2(p4dti_base):
def setUp(self):
self.setup_everything()
self.check_startup()
def runTest(self):
"Moving the start_date forwards in time (test_p4dti.start_2)"
# Set start date forward in time and check the consistency.
config.start_date = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(time.time()))
self.initialize_replicator()
# When we poll, nothing should happen.
self.clear_log()
self.r.poll()
self.expected([])
# The databases should report consistent.
self.check_consistency()
# 9. TEST CASE: REPLICATING BY PROJECT
#
# This is a regression test for job000107, job000112, job000311.
class project(p4dti_base):
def setUp(self):
self.setup_everything()
def runTest(self):
"Replicate by project (test_p4dti.project)"
self.run_variant()
def TeamTrack(self):
config.replicated_fields = ['PROJECTID']
config.replicate_p = (lambda self: self['PROJECTID'] in [4,5])
self.initialize_replicator()
self.clear_log()
self.r.poll()
self.expected_only_in_range(800, 1000, [803, 804, 812, 911, 912])
self.expected_not([806, 811])
# The database has two projects called 'P4DTI', so we should get a
# warning. This is a regression test for job000311.
self.expected([607])
jobs = self.p4.run('jobs')
for j in jobs:
assert j['Project'] in ['Elaborate editor', 'Clever compiler'], \
("Job %s has project %s; shouldn't be replicated."
% (j['Job'], j['Project']))
self.check_consistency()
def Bugzilla(self):
config.replicated_fields = ['product']
config.replicate_p = (lambda self: self['product'] == 'product 1')
self.check_startup()
jobs = self.p4.run('jobs')
for j in jobs:
assert j['Product'] == 'product 1\n', \
("Job %s has product %s; shouldn't be replicated."
% (j['Job'], j['Product']))
self.check_consistency()
# 10. ISSUE LIFE CYCLE TEST CASES
#
# This test creates issues and takes them through various kinds of
# lifecycle, checking that they are replicated correctly at each step.
class lifecycle(p4dti_base):
user = 'rb'
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user])
self.check_startup()
# Submit a new issue to the defect tracker. Run a replication
# cycle; check that the issue gets replicated to Perforce. Return a
# pair of the defect tracker issue and the Perforce job.
def submit(self, user):
id = getattr(self, config.dt_name + '_submit')(user)
# It gets replicated to Perforce. This is a regression test for
# job000233.
self.check_replication_dt_to_p4(first_time = 1)
# Check that the job has been created in Perforce.
issue = self.r.dt.issue(id)
assert issue
jobname = issue.corresponding_id()
job = self.p4.run('job -o %s' % jobname)[0]
# Defect-tracker specific checks.
getattr(self, config.dt_name + '_submitted')(issue, job)
return issue, job
# Assign the issue to the user in the defect tracker. Run a
# replication cycle; check that the assignment gets replicated.
# Return the updated defect tracker issue and Perforce job.
def assign(self, issue, job, user):
getattr(self, config.dt_name + '_assign')(issue, job, user)
# It gets replicated to Perforce.
self.check_replication_dt_to_p4()
# Defect-tracker specific checks.
issue = self.r.dt.issue(issue.id())
job = self.p4.run('job -o %s' % job['Job'])[0]
getattr(self, config.dt_name + '_assigned')(issue, job)
return issue, job
# The job has been closed in Perforce. Check that the closure is
# replicated to the defect tracker and that the expected messages
# appear.
def close(self, issue, job, expected):
self.clear_log()
self.r.poll()
self.expected(expected)
self.expected_only_in_range(800, 1000, expected + [911, 912])
issue1 = self.r.dt.issue(issue.id())
job1 = self.p4.run('job -o %s' % job['Job'])[0]
getattr(self, config.dt_name + '_closed')(issue, job, issue1, job1)
def Bugzilla_submit(self, user):
fields = {
'reporter': 'nb@ravenbrook.com',
'product': 'product 1',
'version': 'unspecified',
'component': 'component 1.1',
'rep_platform': 'All',
'op_sys': 'All',
'priority': 'P1',
'bug_severity': 'critical',
'assigned_to': '',
'cc': '',
'bug_file_loc': '',
'short_desc': 'Life cycle test bug',
'comment': 'Life cycle test long description.',
'submit': ' Commit ',
'form_name': 'enter_bug',
'Bugzilla_password': 'p4dtitest',
'Bugzilla_login': 'nb@ravenbrook.com',
}
h = httplib.HTTP(config.bugzilla_server)
#h.set_debuglevel(100)
h.putrequest('POST', config.bugzilla_path + 'post_bug.cgi')
# Encode the form fields as a URL-encoded request body.
content = string.join(
map(lambda f: urllib.quote_plus(f[0]) + '=' + urllib.quote_plus(f[1]),
fields.items()),
'&') + '\r\n'
h.putheader('Content-Length', str(len(content)))
h.endheaders()
h.send(content)
h.getreply()
bugid = None
lines = h.getfile().readlines()
for l in lines:
match = re.search('Bug ([0-9]+) posted', l)
if match:
bugid = match.group(1)
if bugid is None:
self.fail("Tried to submit a bug to Bugzilla, but got the "
"following in reply: %s." % string.join(lines))
return bugid
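# The request body built in Bugzilla_submit is a URL-encoded form. As a
# minimal sketch, the same encoding is available from the standard
# library's urlencode function (urllib.urlencode in Python 2,
# urllib.parse.urlencode in Python 3). The helper name encode_form and
# the two sample fields are illustrative only, and the trailing '\r\n'
# that the test appends is omitted here.

```python
try:
    from urllib import urlencode          # Python 2
except ImportError:
    from urllib.parse import urlencode    # Python 3

def encode_form(fields):
    """Encode a list of (name, value) pairs as a form body."""
    return urlencode(fields)

# A list of pairs (rather than a dictionary) keeps the field order fixed.
body = encode_form([('short_desc', 'Life cycle test bug'),
                    ('priority', 'P1')])
```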
def Bugzilla_submitted(self, bug, job):
for (field, expected) in [('Status', 'bugzilla_new'),
('Summary', 'Life cycle test bug\n'),
('Priority', 'P1'),
('Severity', 'critical'),
]:
assert job[field] == expected, \
("Expected new job %s to have %s '%s', but found '%s'."
% (job['Job'], field, expected, job[field]))
def Bugzilla_assign(self, bug, job, user):
# Assign the issue in Bugzilla. Ideally this should go through
# the Bugzilla web interface, but that's horrible. So cheat for
# now by going through the very low-level Bugzilla interface.
# (This is horrible too!)
bz_user = self.r.config.user_translator.translate_1_to_0(
user, self.r.dt, self.r.dt_p4)
changes = {'bug_status': 'ASSIGNED',
'assigned_to': bz_user}
bug_id = bug['bug_id']
self.r.dt.bugzilla.update_row('bugs', changes,
'bug_id = %d' % bug_id)
for k, v in changes.items():
activity = {
'bug_id': bug_id,
'who': bz_user,
'bug_when': self.r.dt.bugzilla.now(),
'fieldid': self.r.dt.bugzilla.fieldid(k),
self.r.dt.bugzilla.activity_old_field(): str(bug[k]),
self.r.dt.bugzilla.activity_new_field(): str(v),
}
self.r.dt.bugzilla.insert_row('bugs_activity', activity)
# Wait two seconds so that the bug will be picked up in the next
# poll, not the one after.
time.sleep(2)
def Bugzilla_assigned(self, bug, job):
# The status should now be assigned.
job = self.p4.run('job -o %s' % job['Job'])[0]
assert job['Status'] == 'assigned', \
("Expected assigned job %(Job)s to have status "
"'assigned' but it has status %(Status)s." % job)
def Bugzilla_closed(self, bug, job, bug1, job1):
pass
def TeamTrack_submit(self, user):
import teamtrack
# We need to be logged in to TeamTrack as a user other than the
# replicator in order to make a change that isn't ignored.
s = teamtrack.connect(user, '', config.teamtrack_server)
# Submit a new case.
case = s.new_record(s.case_table_id())
case['TITLE'] = 'Issue lifecycle test'
case['DESCRIPTION'] = 'This is a test issue for the lifecycle test.'
case['ISSUETYPE'] = 1 # Bug report
case['PROJECTID'] = 4 # Editor
case['SEVERITY'] = 45
# Regression test for job000182:
case['ESTTIMETOFIX'] = 7199 # 1:59:59
case['ACTTIMETOFIX'] = 445556 # 123:45:56
# Regression test for job000379 and job000381 (note that date is
# in the summer so DST is in effect):
case['CLOSEDATE'] = 962532245 # 2000-07-02 03:04:05
# Regression test for job000416.
case['FIXED_POINT'] = 16.1 # Represented by 16.100000000000001
issueid = case.submit(user)
# Look up the submitted case in TeamTrack to find its record id.
case2 = s.query(s.case_table_id(),
"TS_ISSUETYPE=%d AND TS_ISSUEID='%05d'"
% (case['ISSUETYPE'], issueid))[0]
return str(case2['ID'])
def TeamTrack_submitted(self, case, job):
# Regression test for job000182.
assert case['ESTTIMETOFIX'] == 7199
assert case['ACTTIMETOFIX'] == 445556
for (field, expected) in [('State', '_new'),
('Title', 'Issue lifecycle test\n'),
('Est._Time_to_Fix', '1:59:59'),
('Actual_Time_to_Fix', '123:45:56'),
('Fixed_Point', '16.10'),
]:
assert job[field] == expected, \
("Expected new job %s to have %s '%s', but found '%s'."
% (job['Job'], field, expected, job[field]))
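# The elapsed-time fields above are submitted as a number of seconds
# and are expected in the job as hours:minutes:seconds. The arithmetic
# can be sketched as follows. The function name format_elapsed is
# hypothetical; this is not the replicator's own translator.

```python
def format_elapsed(seconds):
    """Format a non-negative number of seconds as H:MM:SS."""
    hours, remainder = divmod(seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    # Hours are not zero-padded and may exceed 24.
    return "%d:%02d:%02d" % (hours, minutes, secs)
```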
def TeamTrack_assign(self, case, job, user):
import teamtrack
# We need to be logged in to TeamTrack as a user other than the
# replicator in order to make a change that isn't ignored.
s = teamtrack.connect(user, '', config.teamtrack_server)
r = s.read_record(s.case_table_id(), case['ID'])
# Find the assign transition.
assign = s.query(teamtrack.table['TRANSITIONS'],
"TS_NAME = 'Assign'")[0]['ID']
# Assign the issue in TeamTrack.
r.transition(user, assign)
def TeamTrack_assigned(self, case, job):
# The state should now be assigned.
job = self.p4.run('job -o %s' % job['Job'])[0]
assert job['State'] == 'assigned', \
("Expected assigned job %(Job)s to have state "
"'assigned' but it has state %(State)s." % job)
return case, job
def TeamTrack_closed(self, case, job, case1, job1):
# The closed job gets replicated back to TeamTrack. TeamTrack
# will change the owner field and the replicator will replicate
# that back. This is a regression test for job000053.
assert job1['Owner'] != job['Owner'], \
("Closed job %(Job)s but owner is still %(Owner)s." % job1)
def runTest(self):
"Issue life cycle (test_p4dti.lifecycle)"
# 10.1. Simple issue lifecycle
#
# This is a simple issue cycle:
#
# 1. An issue is submitted to the defect tracker.
# 2. It is replicated to Perforce.
# 3. It gets assigned to a developer.
# 4. It is closed in Perforce (by editing the job).
# 5. The closure gets replicated back to the defect tracker.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
# Close the job in Perforce. This is a regression test for
# job000118.
job[self.r.config.job_status_field] = 'closed'
self.p4.run('job -i', job)
self.close(issue, job, [805, 824, 826])
self.r.poll()
# 10.2. Issue lifecycle (fix in Perforce)
#
# This is an issue cycle in which the issue is associated with a
# changelist in Perforce:
#
# 1. An issue is submitted to the defect tracker.
# 2. It is replicated to Perforce.
# 3. It gets assigned to a developer.
# 4. It is closed in Perforce (by making a fix).
# 5. Closure and fix get replicated back to the defect tracker.
#
# This is a regression test for job000133.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
self.p4.run('fix -c 1 %s' % job['Job'])
self.close(issue, job, [802, 805, 819, 820, 824, 826])
self.r.poll()
# 10.3. Issue lifecycle (fix on submission in Perforce)
#
# This tests an issue lifecycle in which the issue is associated
# with a pending changelist and closed on submission.
#
# 1. An issue is submitted to the defect tracker.
# 2. It is replicated to Perforce.
# 3. It gets assigned to a developer.
# 4. The job description is edited in Perforce (this is a
# regression test for job000362).
# 5. A fix is made with a pending changelist.
# 6. The change is submitted.
# 7. Job, fix get replicated back to the defect tracker.
#
# This is a regression test for job000225.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
self.r.poll()
job['Description'] = job['Description'] + '\nEdited.'
self.p4i[self.user].run('job -i', job)
self.clear_log()
self.r.poll()
self.expected([805, 824])
self.expected_only_in_range(800, 1000, [805, 824, 826, 911, 912])
change = self.edit_file(self.user)
self.p4i[self.user].run('fix -c %s %s'
% (change['Change'], job['Job']))
self.p4i[self.user].run('submit -c %s' % change['Change'])
self.close(issue, job, [802, 805, 819, 820, 824, 826])
self.r.poll()
# Delete fix in Perforce and check that the deletion is
# replicated (regression test for job000013 and job000222).
self.p4i[self.user].run('fix -d -c %s %s'
% (change['Change'], job['Job']))
self.clear_log()
self.r.poll()
self.expected([818])
fixes = self.p4.run('fixes -j %s' % job['Job'])
assert len(fixes) == 0, ("Expected no fixes for %s, but found "
"%s." % (job['Job'], fixes))
# 10.4. Issue lifecycle (fix to assigned on submission in
# Perforce)
#
# As section 10.3, but the fix is to the job's current state
# rather than "closed", so the job state doesn't change.
# Regression test for job000007.
issue, job = self.submit(self.user)
issue, job = self.assign(issue, job, self.user)
self.r.poll()
change = self.edit_file(self.user)
self.p4i[self.user].run('fix -s %s -c %s %s'
% (job[self.r.config.job_status_field],
change['Change'], job['Job']))
self.p4i[self.user].run('submit -c %s' % change['Change'])
self.clear_log()
self.r.poll()
expected = [802, 805, 819, 820, 825]
self.expected(expected)
self.expected_only_in_range(800, 1000, expected + [911, 912])
# We're done; one last check for luck.
self.check_consistency()
# 11. P4DTI CONFIGURATION DATABASE
#
# This checks that parameters get added and removed from the
# configuration database. This is a regression test for job000169 and
# job000351.
class configdb(p4dti_base):
def setUp(self):
self.setup_everything()
def TeamTrack_config_item(self, item):
import teamtrack
query = ("TS_TYPE = 4 AND TS_CHAR1 = '%s' AND TS_CHAR2 = "
"'%s'" % (config.rid, item))
rr = self.dti.server.query(teamtrack.table['VCACTIONS'], query)
for r in rr:
dict = eval(r['FILENAME'])
if (dict.has_key('sid') and dict['sid'] == config.sid
and dict.has_key('description')):
return dict['description']
return None
TeamTrack_config_map = { 'p4_server_description': 'SERVER',
'job_url': 'JOB_URL',
'changelist_url': 'CHANGELIST_URL' }
def TeamTrack_config(self):
cf = {}
for k, v in self.TeamTrack_config_map.items():
cf[k] = self.TeamTrack_config_item(v)
return cf
def Bugzilla_config(self):
return self.r.dt.bugzilla.get_config()
def check(self, cf1):
reset_configuration()
for k, v in cf1.items():
config.__dict__[k] = v
self.initialize_replicator()
cf2 = getattr(self, config.dt_name + '_config')()
for k, v in cf1.items():
if v != cf2.get(k):
self.addFailure("Set parameter %s to '%s', but found "
"'%s' in the config database."
% (k, v, cf2.get(k)))
def runTest(self):
"Replicator configuration database (test_p4dti.configdb)"
cf1 = { 'p4_server_description': 'spong',
'changelist_url': 'http://spong/changelist?%d',
'job_url': 'http://spong/job?%s' }
cf2 = { 'p4_server_description': 'spong',
'changelist_url': None,
'job_url': None }
self.check(cf1)
self.check(cf2)
self.check(cf1)
# 12. INCONSISTENCIES
#
# This test case checks that various inconsistencies are detected
# correctly.
class inconsistencies(p4dti_base):
user = 'rb'
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user])
self.check_startup()
def runTest(self):
"Consistency check failures (test_p4dti.inconsistencies)"
# 12.1. Unreplicated fix in Perforce
change = self.edit_file(self.user)
job = self.p4.run('jobs')[0]
self.p4i[self.user].run('fix -s %s -c %s %s'
% (job[config.job_status_field],
change['Change'], job['Job']))
self.clear_log()
self.r.check_consistency()
self.expected([871, 878, 886])
self.expected_only_in_range(800, 1000,
[871, 878, 883, 884, 886, 890])
self.r.poll()
# 12.2. Changed job in Perforce
job['Description'] = job['Description'] + '...\n'
self.p4i[self.user].run('job -i', job)
self.clear_log()
self.r.check_consistency()
self.expected([871, 875, 886])
self.expected_only_in_range(800, 1000,
[871, 875, 883, 884, 886, 890])
self.r.poll()
# 12.3. Unreplicated job in Perforce
#
# This is a regression test for job000372.
j = self.p4i[self.user].run('job -o')[0]
for k, v in j.items():
if string.find(v, '') == 0:
j[k] = 'Test!'
j['P4DTI-rid'] = config.rid
j['P4DTI-issue-id'] = "999999"
self.p4i[self.user].run('job -i', j)
self.clear_log()
self.r.check_consistency()
self.expected([871, 882, 886])
self.expected_only_in_range(800, 1000,
[871, 882, 883, 884, 886, 890])
# 13. RACE DURING REPLICATION OF FIXES
#
# This is a regression test for job000385.
class race_385(p4dti_base):
user = 'rb'
change = None
original_fixes_differences = None
race_flag = None
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user])
self.check_startup()
def race(self, dt_fixes, p4_fixes):
fix_diffs = self.original_fixes_differences(dt_fixes, p4_fixes)
# Submit the change (this is the race). But only once.
if self.race_flag == 0:
self.p4i[self.user].run('submit -c %s' % self.change['Change'])
self.race_flag = 1
return fix_diffs
def runTest(self):
"Race during replication of fixes (test_p4dti.race_385)"
# Create a pending change in Perforce and make a fix to that
# change.
self.change = self.edit_file(self.user)
job = self.p4.run('jobs')[0]
self.p4i[self.user].run('fix -s %s -c %s %s'
% (job[config.job_status_field],
self.change['Change'], job['Job']))
# Create a second pending change in Perforce (so that the first
# change will get renumbered when submitted).
change_2 = self.edit_file(self.user)
# We'll make sure that the change gets submitted after
# fixes_differences gets called -- this is our last opportunity
# to run the race before the possibly illegal call to p4 change
# -o.
self.original_fixes_differences = self.r.fixes_differences
self.r.fixes_differences = self.race
self.race_flag = 0
# Replicate.
self.clear_log()
self.r.poll()
# Restore the original fixes_differences.
self.r.fixes_differences = self.original_fixes_differences
# Check that the replication succeeded.
expected = [802, 805, 819, 820]
self.expected(expected)
self.expected_only_in_range(800, 1000,
expected + [825, 911, 912])
self.check_consistency()
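# The race above is provoked by replacing a method on the replicator
# with a wrapper that calls the original and then performs an extra
# action, once only. A minimal, self-contained sketch of that technique
# follows. The names Target, wrap_once and events are hypothetical and
# stand in for the replicator, the race method and the submitted
# change.

```python
class Target:
    def compute(self, x):
        return x * 2

def wrap_once(obj, name, side_effect):
    """Replace obj.<name> with a wrapper that calls the original and
    then runs side_effect on the first call only. Return the original
    bound method so that the caller can restore it."""
    original = getattr(obj, name)
    state = {'fired': False}
    def wrapper(*args, **kwargs):
        result = original(*args, **kwargs)
        if not state['fired']:
            state['fired'] = True
            side_effect()
        return result
    setattr(obj, name, wrapper)
    return original

events = []
t = Target()
original = wrap_once(t, 'compute', lambda: events.append('raced'))
results = [t.compute(1), t.compute(2)]
# Restore the original method, as the test case does after polling.
t.compute = original
```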
# 14. CAN CONFIRM AN UNCONFIRMED BUG
#
# This is a regression test for job000262 and job000410. Bugzilla only.
class unconfirmed(p4dti_base):
bug_id = 27 # this bug is UNCONFIRMED in our test database
user = 'rb'
def setUp(self):
self.setup_everything()
self.setup_perforce([self.user])
self.check_startup()
def runTest(self):
"Confirm an UNCONFIRMED bug (test_p4dti.unconfirmed)"
job = self.p4.run('job -o bug%d' % self.bug_id)[0]
assert job['Status'] == 'unconfirmed'
job['Status'] = 'bugzilla_new'
self.p4i[self.user].run('job -i', job)
# Replicate.
self.clear_log()
self.r.poll()
# Check that the replication succeeded.
expected = [805, 824]
self.expected(expected)
self.expected_only_in_range(800, 1000, expected + [911, 912])
self.check_consistency()
# 15. REPLICATE NEW JOBS FROM PERFORCE
#
# This checks that a new job in Perforce can be submitted to the defect
# tracker and successfully replicated thereafter. This is a regression
# test for job000036.
class new_p4_job(p4dti_base):
user = 'rb'
TeamTrack_job = {
'Title': 'test title',
'Owner': user,
'Description': 'test description',
'Priority': 'medium',
'Severity': 'medium',
'Project': 'Elaborate editor',
}
Bugzilla_job = {
'Summary': 'test summary',
'Assigned_To': user,
'Description': 'test description',
'Priority': 'P3',
'Severity': 'normal',
'Product': 'product 1',
}
def TeamTrack_prepare_issue(_, issue, job):
pass
def Bugzilla_prepare_issue(_, issue, job):
default_component = {
'product 1': 'component 1.1',
'product 2': 'component 2.1',
'unconfirmed': 'unconf1',
'group': 'group1',
}
if issue['product'] == '':
issue['product'] = 'product 1'
if issue['component'] == '':
issue['component'] = default_component[issue['product']]
if issue['version'] == '':
issue['version'] = 'unspecified'
def setUp(self):
self.setup_everything()
config.prepare_issue = getattr(self, config.dt_name
+ '_prepare_issue')
config.replicate_job_p = lambda job: 1
if config.dt_name == 'TeamTrack':
config.replicated_fields = (config.replicated_fields
+ ['PROJECTID'])
self.setup_perforce([self.user])
self.check_startup()
def runTest(self):
"Replicate a new job from Perforce (test_p4dti.new_p4_job)"
# Add a job to Perforce.
job = self.p4i[self.user].run('job -o')[0]
for k, v in getattr(self, config.dt_name + '_job').items():
job[k] = v
result = self.p4i[self.user].run('job -i', job)[0]['data']
jobname = string.split(result)[1]
# Replicate it.
self.clear_log()
self.r.poll()
self.expected([892, 894])
self.expected_only_in_range(800, 914,
[826, 892, 894, 896, 911, 912])
# Check that the job is replicated.
self.check_consistency()
job = self.p4.run('job -o %s' % jobname)[0]
assert job['P4DTI-rid'] == config.rid
# 16. TEST WINDOWS NT SERVICE AND EVENT LOG
#
# This test works for TeamTrack only. It is a regression test for
# job000046 and job000149.
class nt_service(p4dti_base):
# Service name in the Windows registry.
_svc_name_ = 'p4dti_service'
# If the service can't replicate its initial batch of issues in five
# minutes, we'll consider it to have failed (this depends on the
# test database not having too many issues).
timeout = 300
# Number of polls before we expect consistency to be achieved.
polls_for_consistency = 3
def setUp(self):
reset_configuration()
self.dti.restart_perforce()
self.dti.restart_defect_tracker()
self.snoop_logger()
self.initialize_replicator()
# Handle on Service Manager.
import win32service
access_level = win32service.SC_MANAGER_ALL_ACCESS
self.hscm = win32service.OpenSCManager(None, None, access_level)
# Handle on Event Log.
import win32evtlog
self.hevt = win32evtlog.OpenEventLog(None, 'Application')
# Establish configuration environment for service. We want the
# current configuration with the following changes: NT Event
# logging must be set; logging level is DEBUG; email from the
# replicator is suppressed.
controls = (('P4DTI_CONFIG', os.path.abspath(config_file)),
('P4DTI_EVTLOG', ''),
('P4DTI_LOGLEVEL', str(message.DEBUG)),
('P4DTI_ADMINADDR', ''),
)
for key, value in controls:
os.environ[key] = value
# If the service is already there, remove it.
if self.query_service():
self.remove_service()
def tearDown(self):
import win32service
import win32evtlog
win32service.CloseServiceHandle(self.hscm)
win32evtlog.CloseEventLog(self.hevt)
# Return None if service not currently installed. Otherwise
# return currentState field as reported by Service Manager.
def query_service(self):
import win32service
type_filter = win32service.SERVICE_WIN32
state_filter = win32service.SERVICE_STATE_ALL
query = win32service.EnumServicesStatus
# List all registered services
services = query(self.hscm, type_filter, state_filter)
# Is ours there?
for svc_name, description, status in services:
if svc_name == self._svc_name_:
# Looks like it was.
currentState = status[1]
return currentState
# Looks like it wasn't.
return None
def wait(self, function):
timeout = self.timeout
while timeout > 0:
if function():
return 1
time.sleep(1)
timeout = timeout - 1
return 0
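# The wait method is a poll-with-timeout loop. A minimal sketch of the
# same pattern as a standalone function follows. The name wait_until
# and its interval and sleep parameters are assumptions made so that
# the example can run without real delays.

```python
import time

def wait_until(predicate, timeout=5, interval=1, sleep=time.sleep):
    """Call predicate until it returns true or the timeout (in the same
    units as interval) is used up. Return True on success."""
    remaining = timeout
    while remaining > 0:
        if predicate():
            return True
        sleep(interval)
        remaining = remaining - interval
    return False

attempts = []
def ready():
    attempts.append(1)
    return len(attempts) >= 3

# Pass a no-op sleep so that the example finishes immediately.
ok = wait_until(ready, timeout=5, sleep=lambda s: None)
```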
def wait_for_status(self, status):
query = self.query_service
return self.wait(lambda query=query, status=status:
query() == status)
def read_event_log_first_time(self):
import win32evtlog
hevt = self.hevt
oldest_record = win32evtlog.GetOldestEventLogRecord(hevt)
record_count = win32evtlog.GetNumberOfEventLogRecords(hevt)
newest_record = oldest_record + record_count - 1
# Random access into the Event Log.
read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ +
win32evtlog.EVENTLOG_SEEK_READ)
# Read newest event and ignore it. This has the required
# side-effect of setting the handle's reading position in the
# log, ready for future reads.
win32evtlog.ReadEventLog(hevt, read_flags, newest_record)
def wait_for_event_log(self, msg):
return self.wait(lambda self=self, msg=msg:
self.match_event_log(msg))
# Read all the log entries that have been written since last
# time. It's remotely possible that we might flush the message
# we're looking for twice in one call to ReadEventLog, but this is
# acceptable as it is harmless to poll again.
def match_event_log(self, msg):
import win32evtlog
hevt = self.hevt
# Sequential access from where we left off, last time we read
# on this handle.
read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ +
win32evtlog.EVENTLOG_SEQUENTIAL_READ)
import catalog
# This is the string we're looking for.
wanted = str(catalog.msg(msg))
# We have to make an undeterminable number of reads to exhaust
# the log. Each read will return some undeterminable number of
# records. (I can't tell from documentation whether zero is a
# possible number in cases where there are records to be
# read. This doesn't matter, as we're prepared to come back
# several times.)
records = win32evtlog.ReadEventLog(hevt, read_flags, 0)
while records:
for record in records:
message = record.StringInserts[0]
if message == wanted:
return 1
records = win32evtlog.ReadEventLog(hevt, read_flags, 0)
return 0
# Simplify hook into service.main()
def main(self, *args):
import service
service.main([''] + list(args))
# The next four methods have to use service.py, as that's what
# we're testing.
def install_service(self):
self.main()
def remove_service(self):
self.main('remove')
def start_service(self):
self.main('start')
def halt_service(self):
self.main('stop')
def runTest(self):
"Manage NT service (test_p4dti.nt_service)"
import win32service
# Install the service and ensure that it's there.
self.install_service()
assert self.query_service()
# Start it running.
self.read_event_log_first_time()
self.start_service()
assert self.wait_for_status(win32service.SERVICE_RUNNING)
# Allow the replication to happen.
for i in range(self.polls_for_consistency):
# (message.DEBUG, "Poll finished.")
assert self.wait_for_event_log(912)
# Halt the service.
self.halt_service()
assert self.wait_for_status(win32service.SERVICE_STOPPED)
# Remove service and ensure that it's gone away.
self.remove_service()
assert not self.query_service()
# Check that replication succeeded.
self.check_consistency()
# 17. MIGRATION
#
# This tests that jobs in Perforce can be migrated from the default
# Perforce jobspec to the defect tracker and replicated thereafter.
# This is a regression test for job000022, job000249, and job000422.
class migrate(p4dti_base):
n_jobs = 20
users = ['rb', 'nb', 'gdr']
states = ['open', 'closed', 'suspended']
fixes = {}
def setUp(self):
self.setup_everything()
for param in ["migrate_p", "translate_jobspec",
"prepare_issue"]:
setattr(config, param,
getattr(self, config.dt_name + "_" + param))
self.setup_perforce(self.users)
self.initialize_replicator()
self.create_jobs()
def create_jobs(self):
self.fixes = {}
# Create some jobs in Perforce.
for i in range(self.n_jobs):
job = self.p4.run('job -o')[0]
user = self.users[i % len(self.users)]
status = self.states[i % len(self.states)]
changes = {
'Description': ("First line of job %d\n"
"Remainder of job %d\n"
"Blah blah blah...\n" % (i, i)),
'User': user,
'Status': status,
}
self.r.update_job(job, changes)
self.p4i[user].run('fix -s %s -c 1 %s'
% (status, job['Job']))
self.fixes[job['Job']] = status
def runTest(self):
"Migration from Perforce jobs (test_p4dti.migrate)"
self.clear_log()
self.r.migrate()
self.expected([892, 895])
self.expected_only_in_range(800, 914, [802, 819, 820, 892, 895])
self.r.refresh_perforce_jobs()
self.r.poll()
self.check_consistency()
# Check that no fixes have been lost, changed or added.
# Regression test for job000271.
for f in self.p4.run('fixes'):
if not self.fixes.has_key(f['Job']):
self.addFailure("Found unexpected fix for job '%s'."
% f['Job'])
else:
if self.fixes[f['Job']] != f['Status']:
self.addFailure("Expected fix for job '%s' to have "
"status '%s', but found '%s'."
% (f['Job'], self.fixes[f['Job']],
f['Status']))
del self.fixes[f['Job']]
for j in self.fixes.keys():
self.addFailure("Expected a fix for job '%s', but didn't "
"find it." % j)
def TeamTrack_migrate_p(_, job):
return 1
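# The fix check at the end of runTest above compares the fixes recorded
# in create_jobs with the fixes found in Perforce after migration. A
# minimal sketch of that comparison as a pure function follows. The
# name compare_fixes and its return shape are hypothetical.

```python
def compare_fixes(expected, found):
    """Compare expected fixes with the fixes actually found.

    expected: dictionary mapping job name to fix status.
    found: list of (job name, status) pairs.
    Return (unexpected, changed, missing).
    """
    remaining = dict(expected)
    unexpected = []
    changed = []
    for job, status in found:
        if job not in remaining:
            unexpected.append(job)
        else:
            if remaining[job] != status:
                changed.append((job, remaining[job], status))
            # Each expected fix may be matched once only.
            del remaining[job]
    missing = sorted(remaining.keys())
    return unexpected, changed, missing

report = compare_fixes(
    {'job1': 'open', 'job2': 'closed', 'job3': 'open'},
    [('job1', 'open'), ('job2', 'open'), ('job4', 'closed')])
```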
def TeamTrack_prepare_issue(_, issue, job):
if issue['ISSUETYPE'] == 0:
issue['ISSUETYPE'] = 1
if issue['PROJECTID'] == 0:
issue['PROJECTID'] = 4
def TeamTrack_translate_jobspec(_, job):
desc = job.get("Description", "")
newline = string.find(desc, "\n")
job["Title"] = desc[:newline]
job["Description"] = desc[newline+1:]
job["Owner"] = job.get("User", "(None)")
status = job.get("Status", "open")
if status == "open":
job["State"] = "assigned"
else:
job["State"] = "verified"
return job
def Bugzilla_migrate_p(_, job):
return 1
def Bugzilla_prepare_issue(_, issue, job):
issue["product"] = "product 1"
issue["component"] = "component 1.1"
issue["version"] = "unspecified"
def Bugzilla_translate_jobspec(_, job):
desc = job.get("Description", "")
newline = string.find(desc, "\n")
job["Summary"] = desc[:newline]
job["Description"] = desc[newline+1:]
job["Assigned_To"] = job.get("User", "")
status_map = {
"open": ("assigned", ""),
"closed": ("closed", "FIXED"),
"suspended": ("closed", "LATER"),
}
(status, resolution) = status_map[job.get("Status", "open")]
job["Status"] = status
job["Resolution"] = resolution
job["Severity"] = "blocker"
job["Priority"] = "P1"
return job
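# Both translate_jobspec functions above split the job description at
# its first newline. The slicing they use relies on the description
# containing a newline: string.find returns -1 when there is none, so
# desc[:newline] would drop the last character. The following sketch
# uses str.partition instead, which is a different technique from the
# one in the test, and handles both cases. The name split_description
# is hypothetical.

```python
def split_description(desc):
    """Split a description into (first line, remainder)."""
    title, _, rest = desc.partition("\n")
    return title, rest
```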
# RUNNING THE TESTS
def tests():
suite = unittest.TestSuite()
tests = [start_1, bogus, configdb, existing, inconsistencies,
lifecycle, migrate, new_p4_job, normal, project, race_385,
start_2]
if config.dt_name == 'TeamTrack':
tests.extend([nt_service])
elif config.dt_name == 'Bugzilla':
tests.extend([unconfirmed])
for t in tests:
suite.addTest(t())
return suite
if __name__ == "__main__":
unittest.main(defaultTest="tests")
# A. REFERENCES
#
# [GDR 2000-12-31] "Automated testing plan" (e-mail message); Gareth
# Rees; Ravenbrook Limited; 2000-12-31;
# .
#
# [GDR 2001-03-14] "Automatic test of TeamTrack integration" (e-mail
# message); Gareth Rees; Ravenbrook Limited; 2001-03-14;
# .
#
# [PyUnit] "PyUnit - a unit testing framework for Python"; Steve
# Purcell; .
#
# [RB 2000-12-08] "init.py -- Initialize replicator and defect tracker";
# Richard Brooksby; Ravenbrook Limited; 2000-12-08;
# .
#
#
# B. DOCUMENT HISTORY
#
# 2001-03-14 GDR Created.
#
# 2001-03-15 GDR Added list of regression tests, discussion of limitations,
# snooping on e-mail, p4dti_variant class, test cases for existing jobs, recent
# start date and replicating by project.
#
# 2001-03-17 GDR Added test_dt_errors test case for errors caught by the defect
# tracker or its database.
#
# 2001-05-01 GDR Use p4dti_unittest module so that we can report many errors in
# the course of a single test case. Added regression test for job000311.
#
# 2001-05-17 GDR Don't look for messages 844-847 (replicator doesn't
# output them any more). Fixed bug in issue lifecycle test.
#
# 2001-05-18 GDR Extended the lifecycle test so that it tests
# replication of fixes.
#
# 2001-06-07 GDR Added references to design.
#
# 2001-07-02 GDR Made bogus_config and lifecycle test cases portable
# between TeamTrack 4.5 and TeamTrack 5.0.
#
# 2001-07-03 GDR Moved TeamTrack connection time test to
# test_teamtrack.py where it belongs.
#
# 2001-07-09 NB Added changelist_url test.
#
# 2001-07-17 GDR Added start_2 (regression test for job000340) and
# configdb (regression test for job000169, job000351).
#
# 2001-07-25 GDR Added consistency checker tests (regression test for
# job000372).
#
# 2001-09-24 GDR Use environment variable P4DTI_PATH (if set) to find
# the P4DTI sources, so that testers can test installed copies of the
# P4DTI as well as copies synced from the repository. Use environment
# variable P4DTI_CONFIG (if set) to find the configuration.
#
# 2001-09-25 GDR Use Bugzilla database and user from the configuration;
# see job000394. Restore the mysqldump for the Bugzilla version given
# by the bugzilla_version configuration parameter (if specified). The
# tests start_1 and start_2 were incorrectly using gmtime when making
# timestamps for the Bugzilla database -- this could lead to problems
# when testing in timezones not equal to UTC with recently-created
# defects. Use localtime instead.
#
# 2001-10-03 GDR Added test case for job000385.
#
# 2001-10-23 GDR Use entry point for polling, so no need to call
# start_logger.
#
# 2001-10-26 NB Added test case for job000410.
#
# 2001-11-01 NB Always write e-mail messages to the test log. Make
# Perforce user records so that email address translation works.
#
# 2001-11-06 GDR Added test case for replicating new job from Perforce.
# Log each run to a separate file.
#
# 2001-11-09 NDL Added test case for NT services and event log.
#
# 2001-11-13 GDR Method and variable names use underscore to separate
# words where possible. Lifecycle test works with NT service changes.
#
# 2001-11-15 GDR Do not destructively modify config.replicated_fields
# in new_p4_job.setUp.
#
# 2001-11-21 GDR Added test case for migration, and test cases for
# incorrect values for the new configuration parameters.
#
# 2001-11-22 GDR Refresh Perforce jobs after migrating.
#
# 2001-11-26 GDR Test cases migrate and new_p4_job take account of new
# migration messages.
#
# 2001-11-28 GDR Bugzilla migration test fills in Severity and Priority
# fields when translating the jobspec. Adapted the lifecycle test so
# that it runs in the Bugzilla integration.
#
# 2001-12-03 GDR Added regression tests for job000007, job000013,
# job000222, job000271.
#
#
# C. COPYRIGHT AND LICENCE
#
# This file is copyright (c) 2001 Perforce Software, Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
# $Id: //info.ravenbrook.com/project/p4dti/version/1.3/test/test_p4dti.py#7 $