#             Perforce Defect Tracking Integration Project
#              <http://www.ravenbrook.com/project/p4dti/>
#
#                   TEST_P4DTI.PY -- TEST THE P4DTI
#
#             Gareth Rees, Ravenbrook Limited, 2001-03-14
#
#
# 1. INTRODUCTION
#
# This test script tests the P4DTI.  The initial plan is described in
# [GDR 2000-12-31] and some of the design is given in [GDR 2001-03-14].
#
# This script contains a lot of tricky code (for example, using the
# hostname or operating system name to pick a module to import or a
# class to use; or overriding a method in another module in order to
# snoop on its behaviour).  Please take care when editing it.
#
# It uses the PyUnit unit test framework [PyUnit].
#
# The intended readership is project developers.
#
# This document is not confidential.
#
# The test cases depend on features of the test databases.  It is
# important that the cases be kept in sync with the test database and
# with the design of those databases [GDR 2001-03-14].  When you add a
# test, be sure to change the design and refer back to the test.
#
#
# 1.1. Useful info
#
# The script creates a series of test Perforce servers (one for each
# test case).  When a test case starts, it kills all the running
# Perforce servers on the required port.  Each of these has its own
# directory, in the appropriate place for temporary files for your
# system (typically /tmp on Unix, C:\Temp on Windows).  The P4DTI log is
# diverted from its real place and sent to the file p4dti.log.
#
#
# 1.2. Regression tests in this script
#
# The section * means that the defect is tested throughout as a simple
# consequence of running the script; there is no particular test for it.
#
# Job    Section  Title
# ----------------------------------------------------------------------
# job000003  6.3  It is not possible from the result of "p4 -G job -o
#                 jobname" to tell whether a job has been returned, or
#                 an error message
# job000004  *    "p4 -G jobspec -o" doesn't work
# job000005  *    TeamTrack integration may fail mysteriously on startup
#                 in future releases of TeamTrack
# job000007  10.4 The fixes keyword can't be set on submit (except to
#                 "closed")
# job000013  10.3 Deleting fixes not replicated
# job000022  17   No migration from Perforce jobs to defect tracker
# job000036  15   New jobs in Perforce don't get replicated to the
#                 defect tracker
# job000037  6    Consistency checker script is inadequate
# job000046  16   The replicator process is hard to manage
# job000047  8.1  Historical bugs are replicated but uninteresting
# job000048  6    Consistency checking of the configuration is
#                 inadequate
# job000050  8.1  There's no way to re-start replication
# job000051  *    There's no Bugzilla integration
# job000053  10   Implicit changes made by the DT don't get replicated
#                 back
# job000057  *    Special characters in single-select keywords make the
#                 replicator barf
# job000075  6    No automatic check of configuration
# job000092  *    Long descriptions aren't replicated by Bugzilla
#                 integration
# job000107  9    You can't replicate an issue's project
# job000111  7    Replicator destroys the jobspec that exists before
#                 it's installed
# job000112  9    Can't easily replicate by project
# job000116  6    Bugzilla integration doesn't do enough checking
# job000118  10.1 Resolving a Bugzilla bug without setting the
#                 resolution always causes a conflict
# job000133  10.2 You can't close a job by fixing it
# job000140  6.3  Logical field name "code" not allowed in TeamTrack
# job000149  16   We don't use system logging facilities on Windows
# job000158  6.2  Obscure error if Perforce can't be reached or
#                 p4_client_executable is wrong
# job000168  6.2  Too easy to set dbms_port to a string
# job000169  11   Change numbers are links in TeamTrack even when no
#                 changelist URL has been specified
# job000170  6.2  Replicator may be unable to send e-mail if the default
#                 replicator_address is unchanged
# job000173  6.3  Wrong Perforce server version causes installation to
#                 fail mysteriously
# job000181  *    Assertion failure in translate_1_to_0
# job000182  10   Elapsed time fields aren't replicated properly
# job000184  *    Bugzilla integration doesn't work if your database is
#                 not called 'bugs'
# job000190  3.1  Connection to TeamTrack hangs for several minutes
# job000199  4.5  Auxiliary scripts send e-mail
# job000202  6.2  Errors from Perforce not reported well
# job000219  7    Existing jobs in Perforce may become unusable when the
#                 jobspec is changed
# job000221  8.1  Refreshing Perforce jobs fails in Bugzilla integration
# job000222  10.3 Deleting fix causes Bugzilla integration to crash
# job000225  10.3 If you "p4 fix" when there's no closed state, the
#                 replicator can't replicate
# job000233  10   When you submit a new issue to TeamTrack it overwrites
#                 the issue
# job000240  7    Perforce server can crash if P4DTI is started with
#                 existing jobs
# job000249  17   Refresh script fails if you change the closed_state
# job000262  14   Bugzilla: Can't use Perforce to confirm an unconfirmed
#                 bug
# job000271  17   The migrate script loses fixes
# job000311  9    If you have two TeamTrack projects with the same name,
#                 the replicator stops working
# job000325  *    Can't read issues from TeamTrack 5.0
# job000338  *    TeamTrack 5.0 doesn't update P4DTI_ fields
# job000340  8.2  Consistency checker may miss some issues if you change
#                 the start date
# job000351  11   Bugzilla integration doesn't remove old configuration
#                 items
# job000352  18   Bugzilla emailsuffix parameter not supported
# job000354  *    MySQL bug stops replicator from seeing some Bugzilla
#                 changes
# job000355  8    Bugzilla integration ignores start_date parameter
# job000358  *    P4DTI doesn't work with TeamTrack 5.0
# job000362  10.3 Can't update case from Perforce when using TeamTrack
#                 4.5 database upgraded to TeamTrack 5.0
# job000370  *    Journal fields in TeamTrack 5.0 can't be replicated
# job000372  12.3 Check consistency stops with error if there are
#                 unreplicated jobs
# job000379  10   P4DTI corrupts date/time fields in TeamTrack if server
#                 time zone is not UTC
# job000381  10   P4DTI corrupts date/time fields in TeamTrack if
#                 daylight savings time is in effect
# job000385  13   Renumbered changelist causes P4DTI error and deletes
#                 fix
# job000390  *    P4DTI doesn't support Bugzilla 2.14
# job000410  14   Can't confirm an unconfirmed Bugzilla bug from
#                 Perforce
# job000416  10   Fixed-point numbers treated as floating-point in
#                 TeamTrack integration
# job000422  17   Refreshing fails after migration if new workflow
#                 doesn't match old workflow
# job000426  17   Migration to TeamTrack fails unless issue ids are
#                 zero-filled to 5 digits
# job000427  12.8 Can't delete associated filespec in Bugzilla
#                 integration
# job000442  10   Can't replicate 'line' fields with hashes in them to
#                 Perforce
# job000445  19   Bugzilla replicator doesn't like spaces in enum
#                 fields


import os
import sys
if os.environ.has_key('P4DTI_PATH'):
    p4dti_path = os.environ['P4DTI_PATH']
else:
    p4dti_path = os.path.join(os.getcwd(), os.pardir, 'code',
                              'replicator')
if p4dti_path not in sys.path:
    sys.path.append(p4dti_path)
import cgi
import copy
import httplib
import imp
import logger
import message
import p4
import p4dti_unittest
import re
import socket
import string
import tempfile
import time
import types
import unittest
import urllib


# 2. CONFIGURATION
#
#
# 2.1. Limitations
#
# This script can only support one basic configuration on each host (see
# section 2.2).  Tests of other configurations have to be made by
# changing configuration parameters in the script.
# section 2.2).  Tests of other configurations have to be made by
# changing configuration parameters in the script.
#
# This script can only test one defect tracker on each host (you have
# to change dt_name in config_HOSTNAME.py to test another defect
# tracker).
#
# This script can only have one current invocation on each host.
#
# Could it use an environment variable to work out which configuration
# to use?
#
#
# 2.2. Using a test configuration
#
# I want to be able to test different configurations, changes to
# configurations, and to check whether erroneous configurations are
# spotted correctly.
#
# I expect to find a working configuration either (1) in the file
# specified by the environment variable P4DTI_CONFIG, if set, or (2) in
# config_HOSTNAME.py, where HOSTNAME is the first component of the FQDN,
# converted to lower case (e.g. 'swan' for 'swan.ravenbrook.com').
#
# This configuration file is an ordinary P4DTI configuration file,
# except that it must include two additional configuration parameters:
# p4_license_file specifies the location of a Perforce license file
# suitable for use on the machine running the tests;
# p4_server_executable specifies the location of a suitable Perforce
# server executable.
#
# The loaded configuration module is copied when a test is started and
# the copy is installed in sys.modules['config'].  This has two effects:
#
#   1. The P4DTI itself won't load the config module; see the
#      initialization code [RB 2000-12-08].
#
#   2. The unit tests can make changes to the copy of the configuration
#      (e.g., to specify incorrect values, as in section 6) without
#      affecting the original, which can be restored for the next test.

if os.environ.has_key('P4DTI_CONFIG'):
    config_file = os.environ['P4DTI_CONFIG']
else:
    hostname = string.lower(string.split(socket.gethostname(), '.')[0])
    config_file = 'config_' + hostname + '.py'
file = open(config_file)
try:
    imp.load_source('config', config_file, file)
finally:
    file.close()
original_configuration = copy.copy(sys.modules['config'].__dict__)
import config

log_filename = (config.log_file
                or time.strftime('p4dti.log.%Y%m%dT%H%M%S',
                                 time.gmtime(time.time())))
log_filename = os.path.abspath(log_filename)
sys.stdout.write("P4DTI test suite, logging to %s.\n" % log_filename)
sys.stdout.flush()

def reset_configuration():
    # Delete current config, just in case we added something.
    for k in sys.modules['config'].__dict__.keys():
        if not re.match('^__.*__$', k):
            del sys.modules['config'].__dict__[k]
    # Restore old config.
    for (k,v) in original_configuration.items():
        sys.modules['config'].__dict__[k] = v


# 3. DEFECT TRACKER AND OPERATING INTERFACES
#
# Many of the test cases are generic: they don't depend on a particular
# defect tracker or operating system.  But they need an interface to the
# defect tracker in order to restart it, and to the operating system in
# order to start a new Perforce server.
#
# Each interface class should be called DT_OS_interface (where DT is
# config.dt_name and OS is os.name) is and must define these methods:
#
# restart_defect_tracker()
# restart_perforce()
#
# Note that there are no corresponding "stop" methods.  The tests leave
# Perforce and the defect tracker running so that failures can be
# investigated without all the evidence having disappeared.


# 3.1. Interface to TeamTrack on Windows NT

class TeamTrack_nt_interface:

    # The TeamTrack server.
    server = None

    # Temporary directory for files created by this test.
    tempdir = None


    # 3.1.1. Clean up TeamTrack
    #
    # I would prefer to create a new TeamTrack database each time this
    # test is run (so that I can check that extra fields are added
    # correctly to the CASES table, for example).  However, I don't know
    # of a way to do that; TeamTrack is controlled from the TeamTrack
    # Administrator and I have no idea how to automate it.
    #
    # So the next best thing to do is to clean out all P4DTI data from a
    # running TeamTrack server.  This means setting all P4DTI_ fields in
    # the CASES table to the empty string, and deleting all records from
    # the VCACTIONS table.
    #
    # This isn't ideal: each new test ends up working with the set of
    # cases that resulted from the previous test.

    def restart_defect_tracker(self):
        # Connect to the TeamTrack server.  This should take no more
        # than 5 seconds (regression test for job000190).
        import teamtrack
        self.server = teamtrack.connect(config.teamtrack_user,
                                        config.teamtrack_password,
                                        config.teamtrack_server)

        # Empty the P4DTI_* fields in the CASES table, if there are any.
        cases = self.server.query(self.server.case_table_id(), '')
        fields = ['P4DTI_JOBNAME', 'P4DTI_RID', 'P4DTI_SID']
        for c in cases:
            changed = 0
            for f in fields:
                if c.has_key(f) and c[f] != '':
                    c[f] = ''
                    changed = 1
            if changed:
                c.update()

        # Delete all records in the VCACTIONS table.
        vcactions = self.server.query(teamtrack.table['VCACTIONS'],'')
        for v in vcactions:
            self.server.delete_record(teamtrack.table['VCACTIONS'],
                                      v['ID'])


    # 3.1.2. Kill old Perforce servers
    #
    # Find and terminate all running Perforce servers, if any.  This
    # code is based on the script "killProcName.py" that comes with the
    # Python Win32 extensions.

    def kill_perforce(self):
        import win32pdhutil
        import win32api
        import win32con
	try:
            win32pdhutil.GetPerformanceAttributes('Process',
                                                  'ID Process', "p4d")
	except:
            pass
        pids = win32pdhutil.FindPerformanceAttributesByName("p4d")
        for pid in pids:
            h = win32api.OpenProcess(win32con.PROCESS_TERMINATE, 0, pid)
            win32api.TerminateProcess(h,0)
            win32api.CloseHandle(h)


    # 3.1.3. Start a new Perforce server
    #
    # Make a completely fresh Perforce server, with a new repository.

    def start_perforce(self):
        # Make a new repository directory.
        self.tempdir = tempfile.mktemp()
        os.mkdir(self.tempdir)

        # Copy the license.
        src = open(config.p4_license_file, 'r')
        dest = open(self.tempdir + '\\license', 'w')
        dest.writelines(src.readlines())
        src.close()
        dest.close()

        # Work out Perforce's port number and start a Perforce server.
        match = re.match(".*:([0-9]+)$", config.p4_port)
        if match:
            port = int(match.group(1))
        else:
            port = 1666
        import win32api
        win32api.WinExec("%s -p 0.0.0.0:%d -r %s"
                         % (config.p4_server_executable, port,
                            self.tempdir))


    # 3.1.4. Restart Perforce
    #
    # By killing the old server and starting a new one.

    def restart_perforce(self):
        self.kill_perforce()
        self.start_perforce()



# 3.2. Interface to Bugzilla on Posix

class Bugzilla_posix_interface:

    # The Bugzilla server.
    server = None

    # Temporary directory for files created by this test.
    tempdir = None

    def __init__(self):
	# The default temporary file prefix starts with an '@'.  But
	# this means that temporary files will look like revision
	# specifications to Perforce.
	tempfile.gettempprefix = lambda: '%d.' % os.getpid()


    # 3.2.1. Restore Bugzilla
    #
    # Wipe out the existing Bugzilla database, if any, and restore a
    # known working one from a MySQL dump file.

    def restart_defect_tracker(self, restore_from_dump = 1):
        db = config.dbms_database
        user = config.dbms_user
        version = getattr(config, 'bugzilla_version', '2.10')
        mysqldump = getattr(config, 'bugzilla_mysqldump',
                            'bugzilla-%s-mysqldump' % version)
        os.system("mysqladmin -u %s --force drop %s > /dev/null"
                  % (user, db))
        os.system("mysqladmin -u %s create %s > /dev/null"
                  % (user, db))
        if restore_from_dump:
	    os.system("mysql -u %s %s < %s > /dev/null"
		      % (user, db, mysqldump))


    # 3.2.2. Kill running Perforce servers
    #
    # If there are any Perforce servers running on the magic port,
    # use p4 admin to kill them.

    def kill_running_perforce_servers(self):
        os.system("p4 -p %s -u %s -P '%s' admin stop > /dev/null"
                  % (config.p4_port, config.p4_user,
                     config.p4_password))


    # 3.2.3. Start a new Perforce server
    #
    # Make a completely fresh Perforce server, with a new repository.

    def start_perforce(self):
        # Make a new repository directory.
        self.tempdir = tempfile.mktemp()
        os.mkdir(self.tempdir)

        # Copy the license.
        src = open(config.p4_license_file, 'r')
        dest = open(os.path.join(self.tempdir, 'license'), 'w')
        dest.writelines(src.readlines())
        src.close()
        dest.close()

        # Work out Perforce's port number and start a Perforce server.
        match = re.match(".*:([0-9]+)$", config.p4_port)
        if match:
            port = int(match.group(1))
        else:
            port = 1666
        os.system("%s -d -p 127.0.0.1:%d -r %s > /dev/null"
                  % (config.p4_server_executable, port, self.tempdir))


    # 3.2.4. Restart Perforce
    #
    # By killing the old server and starting a new one.

    def restart_perforce(self):
        self.kill_running_perforce_servers()
        self.start_perforce()



# 4. P4DTI TEST CASE BASE CLASS
#
# The p4dti_base class is a generic P4DTI test case.  It defines methods
# for setting up the integration and some utilities for recording and
# interrogating the output of the replicator.
#
# Other P4DTI test cases will inherit from p4dti_base.
#
# When a class implements several test cases, the methods that implement
# test cases (in the PyUnit sense) should have names starting "test_".
# When a class implements a single test case, the method should be
# called "runTest".

class p4dti_base(p4dti_unittest.TestCase):

    # Defect tracker interface (an instance of one of the classes in
    # section 3 above).
    dti = eval(config.dt_name + '_' + os.name + '_interface')()

    # Messages written to the replicator's log.
    log_messages = []

    # Mail messages sent by the replicator.  Each message is a triple
    # (recipients, subject, body); see the definition of mail() method
    # in the replicator class.
    mail_messages = []

    # The replicator.
    r = None


    # 4.1. Snoop on the logger
    #
    # I want to be able to test that the correct messages are appearing
    # in the log, so I override the log method in the
    # logger.multi_logger class so that it records the messages as well
    # as printing them to the log file.  Printing to a log file is
    # necessary so that the test engineer can analyze what happened when
    # a test case fails.
    #
    # A disadvantage of this implementation is that the logging code is
    # not tested.  Therefore there needs to be a separate set of logging
    # tests.

    def snoop_logger(self):
        self.log_file = open(log_filename, 'a')
        logger.multi_logger.log = self.log
        logger.file_logger.log = self.log
        logger.sys_logger.log = self.log

    def log(self, msg):
        date = time.strftime('%Y-%m-%d %H:%M:%S UTC',
                             time.gmtime(time.time()))
        self.log_file.write("%s  %s\n" % (date, msg))
        self.log_file.flush()
        self.log_messages.append(msg)

    # The clear_log method can be used to clear this record of the log
    # before carrying out a test.
    def clear_log(self):
        self.log_messages = []
        self.mail_messages = []


    # 4.2. Snoop on e-mail

    def snoop_mail(self):
        import sys
        sys.modules['smtplib'] = self

    def log_separator(self):
        self.log_file.write("-" * 72)
        self.log_file.write("\n")

    def SMTP(self, server):
        self.log_separator()
        return self

    def quit(self):
        self.log_separator()

    def sendmail(self, from_address, to, text):
        self.mail_messages.append((to, text))
        self.log_file.write(text)


    # 4.3. Check that the log is as expected
    #
    # These methods make various checks on the messages recorded in the
    # log.

    def log_message_ids(self):
        return map(lambda m: m.id, self.log_messages)

    def expected_only(self, expected):
        for m in self.log_messages:
            assert m.id in expected, \
                   ("Unexpected message %d (%s) found in log %s"
                    % (m.id, str(m), self.log_message_ids()))

    def expected_only_in_range(self, min, max, expected):
        for m in self.log_messages:
            if m.id >= min and m.id <= max:
                assert m.id in expected, \
                       ("Unexpected message %d (%s) found in log %s"
                        % (m.id, str(m), self.log_message_ids()))

    def expected_not(self, unexpected):
        for m in self.log_messages:
            assert m.id not in unexpected, \
                   ("Unexpected message %d (%s) found in log %s"
                    % (m.id, str(m), self.log_message_ids()))

    def expected(self, expected):
        found = {}
        for m in self.log_messages:
            if found.has_key(m.id):
                found[m.id] = found[m.id] + 1
            else:
                found[m.id] = 1
        for id in expected:
            assert found.has_key(id) and found[id] > 0, \
                   ("Expected message %d not found in log %s"
                    % (id, self.log_message_ids()))
            found[id] = found[id] - 1

    def expectation(self, expected, maybe=[]):
        self.expected(expected)
        self.expected_only_in_range(800, 1000, expected + maybe)


    # 4.4. Set up everything so that test cases can run
    #
    # Get a fresh configuration, a new defect tracker and a new Perforce
    # server.

    def setup_everything(self, config_changes = {}):
        reset_configuration()
        for (key, value) in config_changes.items():
            setattr(config, key, value)
        self.dti.restart_defect_tracker()
        self.dti.restart_perforce()
        self.snoop_logger()
        self.snoop_mail()

        # Get a temporary Perforce interface suitable for setting the
        # Perforce password.
        if config.p4_password:
            p4i = p4.p4(port = config.p4_port,
                        user = config.p4_user,
                        client_executable = config.p4_client_executable,
                        logger = self )
            p4i.run('passwd -O "" -P "%s"' % config.p4_password)

        # Get a permanent Perforce interface using the password.
        self.p4 = p4.p4(port = config.p4_port,
                        user = config.p4_user,
                        password = config.p4_password,
                        client_executable = config.p4_client_executable,
                        logger = self )


    # 4.5. Initialize the replicator
    #
    # Load the init module.  If it's already loaded, reload it: we must
    # reload it because we want the init module to run again and to pick
    # up on the new configuration.  (Some other modules have the same
    # properties.)

    def initialize_replicator(self):
        self.mail_messages = []
        for m in ['teamtrack', 'init']:
            if sys.modules.has_key(m):
                del sys.modules[m]
        import init
        self.r = init.r
        # Regression test for job000199.
        assert self.mail_messages == []


    # 4.6. Normal replicator startup
    #
    # This is a pseudo test case.  It checks that the replicator can
    # start up normally.  Other tests may depend on this having been
    # run; for example, the check_nothing test (5.1).  This test must be
    # run before any other test cases, and therefore can't be part of a
    # test suite (the tests of which may be run singly or in any order).
    # It should therefore be run from the setUp() method of a subclass
    # of this class (unless of course that subclass has a different idea
    # of what should happen at startup).

    def check_startup(self):
        self.initialize_replicator()

        # Expect to get a startup e-mail
        self.clear_log()
        self.r.prepare_to_run()
        self.expectation([800, 866], [867, 870, 910])

        # Expect to set up issues and replicate them, but no jobs or
        # conflicts.
        self.poll()
        self.expectation([803, 804, 812, 911, 912])

        # Expect to see no issues replicated, and no issues actually
        # changed.
        self.poll()
        self.expected_not([803, 804, 812, 824])


    # 4.7. Exceptional replicator behaviour
    #
    # This method calls the given function but expects to get the
    # exception given by 'error', with message id 'msgid'.

    def check_exception(self, function, error, msgid):
        try:
            function()
        except:
            err, msg = sys.exc_info()[0:2]
            assert err == error, \
                   ("Expected error %s but got %s." % (error, err))
            assert msg.id == msgid, \
                   ("Expected message %d but got %d (%s)"
                    % (msgid, msg.id, str(msg)))
        else:
            self.fail("Expected error %s but didn't get it." % error)

    # check_startup_error() is a special case of check_exception for the
    # initialize_replicator method.

    def check_startup_error(self, error, msgid):
        self.check_exception(self.initialize_replicator, error, msgid)


    # 4.8. Check consistency
    #
    # This method checks that the databases are consistent.

    def check_consistency(self):
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 885], [883, 884, 890])


    # 4.9. Check replication of a single issue

    def check_replication_dt_to_p4(self, first_time = 0):
        self.poll()
        if first_time:
            self.expected([803]) # set up for replication
        self.expectation([804, 812, 911, 912], [803])

        # Nothing should come back from Perforce.
        self.poll()
        self.expected_only_in_range(800, 1000, [805, 825, 911, 912])


    # 4.10. Initialize Perforce repository
    #
    # This method sets up Perforce clients and workspaces for a set of
    # users, and adds a file to the repository.  It sets up the
    # following members:
    #
    # p4i: Map from user to a Perforce interface for that user.
    # workspace: Map from user to the client workspace for that user.
    #
    # This is only needed for tests involving Perforce fixes: that's why
    # it's not called in setup_everything().

    def setup_perforce(self, users):
        # Create Perforce interfaces and workspaces for the dummy users.
        self.p4i = {}
        self.workspace = {}
        for user in users:
            self.workspace[user] = tempfile.mktemp()
            os.mkdir(self.workspace[user])
            self.p4i[user] = p4.p4(
                port = config.p4_port,
                user = user,
                client = user + '_' + socket.gethostname(),
                client_executable = config.p4_client_executable,
                logger = self, )
            # Make the Perforce user record.
            p4_user = self.p4i[user].run('user -o')[0]
            p4_user['Email'] = ('%s@ravenbrook.com' % user)
            self.p4i[user].run('user -i', p4_user)
            # Make a Perforce client.
            client = self.p4i[user].run('client -o')[0]
            client['Root'] = self.workspace[user]
            self.p4i[user].run('client -i', client)

        # Add a file to the repository so that we have something with
        # which to make changes and fixes.
        user = users[0]
        self.filename = os.path.join(self.workspace[user], 'test')
        open(self.filename, 'w')
        self.p4i[user].run('add -t ktext %s' % self.filename)
        change = self.p4i[user].run('change -o')[0]
        change['Description'] = 'Added test file'
        self.p4i[user].run('submit -i', change)


    # 4.11. Make changelist in Perforce
    #
    # This method makes a changelist in Perforce and edits a file in
    # that changelist.  The changelist is returned without being
    # submitted.

    def edit_file(self, user):
        change = self.p4i[user].run('change -o')[0]
        change['Description'] = 'Edited test file'
        result = self.p4i[user].run('change -i', change)[0]
        assert result.has_key('data')
        match = re.match('^Change ([0-9]+) created', result['data'])
        assert match
        changelist = match.group(1)
        self.p4i[user].run('edit -c %s %s'
                           % (changelist, self.filename))
        open(self.filename, 'a').write("Foo\n")
        return self.p4i[user].run('change -o %s' % changelist)[0]


    # 4.12 Variant tests
    #
    # This class has variant methods for each defect tracker (e.g.,
    # TeamTrack, Bugzilla) and calls the appropriate one.

    def run_variant(self):
	try:
	    test = getattr(self, config.dt_name)
	except AttributeError:
            assert 0, "No test variant for " + config.dt_name + "."
        test()


    # 4.13. Poll the databases, expecting no errors

    expected_notices = [607]
    def poll(self):
        self.clear_log()
        self.r.carefully_poll_databases()
        for m in self.log_messages:
            if (isinstance(m, message.message)
                and m.id not in self.expected_notices):
                assert m.priority >= message.INFO, \
                       ("Expected no errors, but message '%s' has "
                        "priority %d." % (str(m), m.priority))


# 5. TEST CASES: NORMAL OPERATION
#
# If nothing has changed, then nothing happens when the replicator
# polls.  The databases are consistent.

class normal(p4dti_base):
    def setUp(self):
        self.setup_everything()
        self.check_startup()

    def runTest(self):
	"Startup, replication to Perforce (test_p4dti.normal)"
        self.poll()
        self.expected_only_in_range(800, 1000, [911, 912])
        self.check_consistency()


# 6. TEST CASES: INCORRECT CONFIGURATIONS
#
# This is a regression test of job000037, job000075 and job000116.

class bogus(p4dti_base):
    def setUp(self):
        self.setup_everything()


    # 6.1. An incorrect parameter generates an error
    #
    # This is a utility function for carrying out a range of tests.  It
    # resets the configuration, sets the parameter named by 'param' to
    # value, then tries to start the replicator.  It expects to get an
    # exception, whose message should have the message id 'msgid'.

    def check_param(self, param, value, msgid):
        reset_configuration()
        setattr(config, param, value)
        try:
            self.initialize_replicator()
        except:
            err, msg = sys.exc_info()[0:2]
            if isinstance(msg, message.message):
                if msg.id != msgid:
                    self.addFailure("Set parameter %s to '%s': "
                                    "expected message %d but got %d "
                                    "(%s)" % (param, value, msgid,
                                              msg.id, str(msg)))
            else:
                self.addFailure("Set parameter %s to '%s': expected "
                                "message %d but got '%s: %s' instead."
                                % (param, value, msgid, err, msg))
        else:
            self.addFailure("Set parameter %s to '%s': expected "
                            "message %d but didn't get it."
                            % (param, value, msgid))


    # 6.2. Basic errors in parameters are caught quickly
    #
    # Basic errors in parameters (wrong type, wrong format) should be
    # caught quickly.
    #
    # This is a table of (parameter name, bogus value, message id of
    # expected error).

    bogus_parameters = [
        # Regression for job000170:
        ('administrator_address', 'invalid e-mail address', 202),
        ('changelist_url', -1, 207),
        ('changelist_url', "http://invalid/%d/%s", 210),
        ('changelist_url', "http://invalid/no/format/specifier", 210),
        ('changelist_url', "http://invalid/%d/%%/%%%", 210),
        ('closed_state', -1, 208),
        ('configure_name', -1, 207),
        ('job_url', 42, 207),
        ('job_url', "http://invalid/%d/%s", 211),
        ('job_url', "http://invalid/no/format/specifier", 211),
        ('job_url', "http://invalid/trailing/percent/%d/%%/%%%", 211),
        ('log_file', -1, 208),
        ('log_level', 'not an int', 204),
        ('migrate_p', 'not a function', 203),
        ('p4_client_executable', -1, 207),
        # Regression test for job000158:
        ('p4_client_executable', 'no such file', 705),
        ('p4_user', None, 207),
        ('p4_password', -1, 207),
        ('p4_password', 'incorrectpassword', 706),
        ('p4_port', None, 207),
        # Regression test for job000158, job000202:
        ('p4_port', '127.0.0.1:9999', 707),
        ('p4_server_description', -1, 207),
        ('poll_period', 'not an int', 204),
        ('prepare_issue', 'not a function', 203),
        ('replicator_address', 'invalid@e-mail@address', 202),
        ('replicate_p', 'not a function', 203),
        ('replicate_job_p', 'not a function', 203),
        ('replicated_fields', 'not a list', 205),
        ('replicated_fields', ['not', 'list', 'of', 'strings', 0], 206),
        ('rid', -1, 207),
        ('rid', '0abc', 209),
        ('rid', 'ab-c', 209),
        ('sid', -1, 207),
        ('sid', 'abcdefg+z', 209),
        ('smtp_server', -1, 207),
        ('start_date', '2001-02-03 24-00-00', 201),
        ('use_deleted_selections', 'neither 0 nor 1', 200),
        ('use_deleted_selections', -1, 200),
        ('use_deleted_selections', 2, 200),
        ('use_perforce_jobnames', 'neither 0 nor 1', 200),
        ('use_perforce_jobnames', -1, 200),
        ('use_perforce_jobnames', 2, 200),
        ('use_stdout_log', 'neither 0 nor 1', 200),
        ('use_stdout_log', -1, 200),
        ('use_stdout_log', 2, 200),
        ]

    def test_parameters(self):
        import check_config
        for (param, value, msgid) in self.bogus_parameters:
            self.check_param(param, value, msgid)


    # 6.3. Basic errors in DT parameters are caught quickly
    #
    # As 6.2, but picks a set of tests based on config.dt_name.

    bogus_TeamTrack_parameters = [
        ('closed_state', 'not a TeamTrack state', 401),
        ('replicated_fields', ['PRIORITY', 'not TeamTrack state'], 403),
        ('replicated_fields', ['OWNER'], 404),
        ('replicated_fields', ['TITLE'], 404),
        ('replicated_fields', ['STATE'], 404),
        ('replicated_fields', ['PRIORITY', 'PRIORITY'], 405),
        ('replicated_fields', ['MULTISELECT'], 406),
        # Regression for job000003, job000140:
        ('replicated_fields', ['CODE'], 407),
        ('teamtrack_password', -1, 207),
        ('teamtrack_server', None, 207),
        ('teamtrack_user', None, 207),
        ('teamtrack_version', -1, 207),
        ('teamtrack_version', '0.8', 1003),
        ('use_windows_event_log', -1, 200),
        ('use_windows_event_log', 2, 200),
        ('use_windows_event_log', 'neither 0 nor 1', 200),
        ]

    bogus_Bugzilla_parameters = [
        ('bugzilla_directory', 'not a directory', 303),
        ('bugzilla_directory', '/', 304),
        ('closed_state', 'not a Bugzilla state', 301),
        ('dbms_database', -1, 207),
        ('dbms_host', -1, 207),
        # Regression for job000168:
        ('dbms_port', '1234', 204),
        ('dbms_user', -1, 207),
        ('dbms_password', -1, 207),
        ('migrated_user_password', -1, 207),
        # By using fake Perforce client executables we can check that
        # unsupported client and server versions are detected.
        # Regression test for job000173.
        ('p4_client_executable', './fake_p4.py', 704),
        ('p4_client_executable', './fake_p4d_old_changelevel.py', 834),
        ('p4_client_executable', './fake_p4d_no_changelevel.py', 835),
        ('replicated_fields', ['bug_status'], 311),
        ('replicated_fields', ['assigned_to'], 311),
        ('replicated_fields', ['short_desc'], 311),
        ('replicated_fields', ['resolution'], 311),
        ('replicated_fields', ['longdesc', 'longdesc'], 312),
        ('replicated_fields', ['not a Bugzilla field'], 313),
        ]

    def test_dt_parameters(self):
        params = getattr(self, 'bogus_%s_parameters' % config.dt_name)
        for (param, value, msgid) in params:
            self.check_param(param, value, msgid)


    # 6.4. Parameter errors are caught by the defect tracker
    #
    # Like 6.2 and 6.3 this test sets a parameter to an incorrect value.
    # In this case the error is caught by the defect tracker, so a
    # message object isn't returned, but rather a string, which we must
    # test directly rather than by message id.

    # These three authentication failure messages are from the APIs for
    # builds 4509, 5034, and 50101 respectively.
    TeamTrack_auth_messages = [
        "SERVER_ERROR: Authentication Failed.  "
        "Invalid user id or password",
        "SOCKET_READ_ERROR: Socket error.\n"
        "One reason might be database needs up upgrading.\n"
        "Check event viewer for complete error message.\n",
        "SOCKET_READ_ERROR: Authentication Failed.  "
        "Invalid user id, password, or licensing."
        ]

    erroneous_TeamTrack_parameters = [
        ('dt_name',
         'not a defect tracker',
	 'exceptions.ImportError',
         'No module named configure_not a defect tracker'),
        ('teamtrack_server',
         'host.invalid',
         'TeamShare API error',
         'SOCKET_CONNECT_FAILED: Socket Connect failed.'),
        ('teamtrack_user',
         'invalid user',
         'TeamShare API error',
         TeamTrack_auth_messages),
        ('teamtrack_password',
         'invalid password',
         'TeamShare API error',
         TeamTrack_auth_messages),
        ]

    erroneous_Bugzilla_parameters = [
        ('dt_name',
         'not a defect tracker',
	 'exceptions.ImportError',
         'No module named configure_not a defect tracker'),
        ('dbms_host',
         'host.invalid',
         '_mysql.OperationalError',
         '(2005, "Unknown MySQL Server Host \'host.invalid\' (2)")'),
        ('dbms_database',
         'invalid',
         '_mysql.OperationalError',
         '(1049, "Unknown database \'invalid\'")'),
        # ('dbms_port',
	#  25,
        #  '_mysql.OperationalError',
	#  -1),
        ('dbms_password',
	 'not the Bugzilla password',
         '_mysql.OperationalError',
	 '(1045, "Access denied for user:'),
        ('dbms_user',
	 'not the Bugzilla user',
         '_mysql.OperationalError',
	 '(1045, "Access denied for user:'),
        ]

    def test_dt_errors(self):
        params = getattr(self, 'erroneous_%s_parameters'
                         % config.dt_name)
        for (param, value, error, message_texts) in params:
            reset_configuration()
            setattr(config, param, value)
            try:
                self.initialize_replicator()
            except:
                (err, msg, _) = sys.exc_info()
                if str(err) != error:
                    self.addFailure("Set parameter %s to '%s': "
                                    "expected error '%s' but got "
                                    "error '%s'."
                                    % (param, value, error, str(err)))
                if isinstance(message_texts, types.ListType):
                    texts = message_texts
                else:
                    texts = [message_texts]
                found = 0
                for text in texts:
                    if str(msg)[0:len(text)] == text:
                        found = 1
                        break
                if not found:
                    self.addFailure("Set parameter %s to '%s': "
                                    "expected error message in %s but "
                                    "got '%s'."
                                    % (param, value, texts, msg))
            else:
                self.addFailure("Set parameter %s to %s: expected "
                                "error '%s' but there was no error."
                                % (param, value, error))

    def runTest(self):
	"Illegal configuration parameters (test_p4dti.bogus)"
        self.test_parameters()
        self.test_dt_parameters()
        self.test_dt_errors()


# 7. TEST CASE: EXISTING JOB IN PERFORCE
#
# The replicator should refuse to start if there's a job in Perforce.
#
# This is a regression test for job000219 and job000240.

class existing(p4dti_base):
    def setUp(self):
        self.setup_everything()
        self.initialize_replicator()

    def runTest(self):
	"Startup with an existing job (test_p4dti.existing)"
        j = self.p4.run('job -o')[0]
        j['Description'] = 'Test job'
        self.p4.run('job -i', j)
        self.check_exception(self.r.poll, self.r.error, 914)


# 8. TEST CASE: MOVING THE START DATE
#
#
# 8.1. Moving the start date backwards in time
#
# When start_date is set to the current time, no issues should be
# replicated when the replicator starts.  Similarly, refreshing Perforce
# has no effect.  But you can set start_date back in time and refresh
# Perforce, this time with effect.
#
# This is a regression test for job000047, job000050, job000221.

class start_1(p4dti_base):
    def setUp(self):
        self.setup_everything()

    def runTest(self):
	"Moving the start_date backwards in time (test_p4dti.start_1)"
        config.start_date = time.strftime("%Y-%m-%d %H:%M:%S",
                                          time.localtime(time.time()))
        self.initialize_replicator()

        # When we poll, nothing should happen.
        self.clear_log()
        self.r.poll()
        self.expected_only_in_range(800, 1000, [911, 912])

        # Nor when we refresh.
        self.clear_log()
        self.r.refresh_perforce_jobs()
        self.expected_only_in_range(800, 1000, [911, 912])

        # The databases should report consistent.
        self.check_consistency()

        # Now set start date back in time and try refreshing again.
        config.start_date = "1971-01-01 00:00:00"
        self.initialize_replicator()
        self.clear_log()
        self.r.refresh_perforce_jobs()
        self.expectation([803, 804, 812])

        # The databases should still report consistent.
        self.check_consistency()


# 8.2. Moving the start date forwards in time
#
# Start up with an old start date as normal, then move the start date
# forwards in time.  The databases should still report consistent,
# because issues that were recorded as being replicated in the first
# poll should still be recorded as replicated, even though they haven't
# changed since the start date.
#
# This is a regression test for job000340.

class start_2(p4dti_base):
    def setUp(self):
        self.setup_everything()
        self.check_startup()

    def runTest(self):
	"Moving the start_date forwards in time (test_p4dti.start_2)"

        # Set start date forward in time and check the consistency.
        config.start_date = time.strftime("%Y-%m-%d %H:%M:%S",
                                          time.localtime(time.time()))
        self.initialize_replicator()

        # When we poll, nothing should happen.
        self.poll()
        self.expected([])

        # The databases should report consistent.
        self.check_consistency()


# 9. TEST CASE: REPLICATING BY PROJECT
#
# This is a regression test for job000107, job000112, job000311.

class project(p4dti_base):
    def setUp(self):
        self.setup_everything()

    def runTest(self):
        "Replicate by project (test_p4dti.project)"
        self.run_variant()

    def TeamTrack(self):
        config.replicated_fields = ['PROJECTID']
        config.replicate_p = (lambda self: self['PROJECTID'] in [4,5])
        self.initialize_replicator()
        self.clear_log()
        self.r.poll()
        self.expectation([803, 804, 911, 912], [812])
        # The database has two projects called 'P4DTI', so we should get
        # a warning.  This is a regression test for job000311.
        self.expected([607])
        jobs = self.p4.run('jobs')
        projects = ['Elaborate editor', 'Clever compiler']
        for j in jobs:
            assert j['Project'] in projects, \
                   ("Job %s has project %s; shouldn't be replicated."
                    % (j['Job'], j['Project']))
        self.check_consistency()

    def Bugzilla(self):
        config.replicated_fields = ['product']
        config.replicate_p = lambda self: self['product'] == 'product 1'
        self.check_startup()
        jobs = self.p4.run('jobs')
        for j in jobs:
            assert j['Product'] == 'product 1\n', \
                   ("Job %s has product %s; shouldn't be replicated."
                    % (j['Job'], j['Product']))
        self.check_consistency()


# 10. ISSUE LIFE CYCLE TEST CASES
#
# This test creates issues and takes them through various kinds of
# lifecycle, checking that they are replicated correctly at each step.

class lifecycle(p4dti_base):
    user = 'rb'
    user1 = 'gdr'

    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user, self.user1])
        self.check_startup()

    # Submit a new issue to the defect tracker.  Run a replication
    # cycle; check that the issue gets replicated to perforce.  Return a
    # pair of the defect tracker issue and the Perforce job.
    def submit(self, user):
        id = getattr(self, config.dt_name + '_submit')(user)

        # It gets replicated to Perforce.  This is a regression test for
        # job000233.
        self.check_replication_dt_to_p4(first_time = 1)

        # Check that the job has been created in Perforce.
        issue = self.r.dt.issue(id)
        assert issue
        jobname = issue.corresponding_id()
        job = self.p4.run('job -o %s' % jobname)[0]

        # Defect-tracker specific checks.
        getattr(self, config.dt_name + '_submitted')(issue, job)
        return issue, job

    # Assign the issue to the user in the defect tracker.  Run a
    # replication cycle; check that the assignment gets replicated.
    # Return the updated defect trakcer issue and Perforce job.
    def assign(self, issue, job, user):
        getattr(self, config.dt_name + '_assign')(issue, job, user)

        # It gets replicated to Perforce.
        self.check_replication_dt_to_p4()

        # Defect-tracker specific checks.
        issue = self.r.dt.issue(issue.id())
        job = self.p4.run('job -o %s' % job['Job'])[0]
        getattr(self, config.dt_name + '_assigned')(issue, job)
        return issue, job

    # The job has been closed in Perforce.  Check that the closure is
    # replicated to the defect tracker and that the expected messages
    # appear.
    def close(self, issue, job, expected):
        self.poll()
        self.expectation(expected + [911, 912])
        issue1 = self.r.dt.issue(issue.id())
        job1 = self.p4.run('job -o %s' % job['Job'])[0]
        getattr(self, config.dt_name + '_closed')(issue, job,
                                                  issue1, job1)

    def Bugzilla_submit(self, user):
        fields = {
            'reporter': 'nb@ravenbrook.com',
            'product': 'product 1',
            'version': 'unspecified',
            'component': 'component 1.1',
            'rep_platform': 'All',
            'op_sys': 'All',
            'priority': 'P1',
            'bug_severity': 'critical',
            'assigned_to': '',
            'cc': '',
            'bug_file_loc': '',
            'short_desc': 'Life cycle # test bug',
            'comment': 'Life cycle # test long description.',
            'submit': '    Commit    ',
            'form_name': 'enter_bug',
            'Bugzilla_password': 'p4dtitest',
            'Bugzilla_login': 'nb@ravenbrook.com',
            }
        h = httplib.HTTP(config.bugzilla_server)
        #h.set_debuglevel(100)
        h.putrequest('POST', config.bugzilla_path + 'post_bug.cgi')
        def url_param(field):
            return (urllib.quote_plus(field[0]) + '='
                    + urllib.quote_plus(field[1]))
        content = (string.join(map(url_param, fields.items()), '&')
                   + '\r\n')
        h.putheader('Content-Length', str(len(content)))
        h.endheaders()
        h.send(content)
        h.getreply()
	bugid = None
	lines = h.getfile().readlines()
        for l in lines:
	    match = re.search('<H2>Bug ([0-9]+) posted</H2>', l)
	    if match:
		bugid = match.group(1)
	if bugid == None:
	    self.fail("Tried to submit a bug to Bugzilla, but got the "
                      "following in reply: %s." % string.join(lines))

        return bugid

    def Bugzilla_submitted(self, bug, job):
        for (field, expected) in [('Status', 'bugzilla_new'),
                                  ('Summary', 'Life cycle # test bug\n'),
                                  ('Priority', 'P1'),
                                  ('Severity', 'critical'),
                                  ]:
            assert job[field] == expected, \
                   ("Expected new job %s to have %s '%s', but found "
                    "'%s'." % (job['Job'], field, expected, job[field]))

    def Bugzilla_assign(self, bug, job, user):
        # Assign the issue in Bugzilla.  Ideally this should go through
        # the Bugzilla web interface, but that's horrible.  So cheat for
        # now by going through the very low-level Bugzilla interface.
        # (This is horrible too!)
        bz_user = self.r.config.user_translator.translate_1_to_0(
            user, self.r.dt, self.r.dt_p4)
        changes = {'bug_status': 'ASSIGNED',
                   'assigned_to': bz_user}
        bug_id = bug['bug_id']
        self.r.dt.bugzilla.update_row('bugs', changes,
                                      'bug_id = %d' % bug_id)
        for k, v in changes.items():
            activity = {
                'bug_id': bug_id,
                'who': bz_user,
                'bug_when': self.r.dt.bugzilla.now(),
                'fieldid': self.r.dt.bugzilla.fieldid(k),
                self.r.dt.bugzilla.activity_old_field(): str(bug[k]),
                self.r.dt.bugzilla.activity_new_field(): str(v),
                }
            self.r.dt.bugzilla.insert_row('bugs_activity', activity)

        # Wait two seconds so that the bug will be picked up in the next
        # poll, not the one after.
        time.sleep(2)

    def Bugzilla_assigned(self, bug, job):
        # The status should now be assigned.
        job = self.p4.run('job -o %s' % job['Job'])[0]
        assert job['Status'] == 'assigned', \
               ("Expected assigned job %(Job)s to have status "
                "'assigned' but it has state %(Status)s." % job)

    def Bugzilla_closed(self, bug, job, bug1, job1):
        pass

    def TeamTrack_submit(self, user):
        import teamtrack

        # We need to be logged in to TeamTrack as a user other than the
        # replicator in order to make a change that isn't ignored.
        s = teamtrack.connect(user, '', config.teamtrack_server)

        # Submit a new case.
        case = s.new_record(s.case_table_id())
        case['TITLE'] = 'Life cycle # test title'
        case['DESCRIPTION'] = 'Life cycle # test description.'
        case['ISSUETYPE'] = 1 # Bug report
        case['PROJECTID'] = 4 # Editor
        case['SEVERITY'] = 45
        # Regression test for job000182:
        case['ESTTIMETOFIX'] = 7199   # 1:59:59
        case['ACTTIMETOFIX'] = 445556 # 123:45:56
        # Regression test for job000379 and job000381 (note that date is
        # in the summer so DST is in effect):
        case['CLOSEDATE'] = 962532245 # 2000-07-02 03:04:05
        # Regression test for job000416.
        case['FIXED_POINT'] = 16.1 # Represented by 16.100000000000001
        # Regression test for job000442.
        case['SINGLE_LINE'] = 'Life cycle # test line'
        return str(case.submit(user))

    def TeamTrack_submitted(self, case, job):
        # Regression test for job000182.
        assert case['ESTTIMETOFIX'] == 7199
        assert case['ACTTIMETOFIX'] == 445556

        for (field, expected) in [('State', '_new'),
                                  ('Title', 'Life cycle # test title\n'),
                                  ('Description', 'Life cycle # test '
                                   'description.\n'),
                                  ('Est._Time_to_Fix', '1:59:59'),
                                  ('Actual_Time_to_Fix', '123:45:56'),
                                  ('Fixed_Point', '16.10'),
                                  ('Single_Line', 'Life cycle # test line\n'),
                                  ]:
            assert job[field] == expected, \
                   ("Expected new job %s to have %s '%s', but found "
                    "'%s'." % (job['Job'], field, expected, job[field]))

    def TeamTrack_assign(self, case, job, user):
        import teamtrack

        # We need to be logged in to TeamTrack as a user other than the
        # replicator in order to make a change that isn't ignored.
        s = teamtrack.connect(user, '', config.teamtrack_server)
        r = s.read_record(s.case_table_id(), case['ID'])

        # Find the assign transition.
        assign = s.query(teamtrack.table['TRANSITIONS'],
                         "TS_NAME = 'Assign'")[0]['ID']

        # Assign the issue in TeamTrack.
        r.transition(user, assign)

    def TeamTrack_assigned(self, case, job):
        # The state should now be assigned.
        job = self.p4.run('job -o %s' % job['Job'])[0]
        assert job['State'] == 'assigned', \
               ("Expected assigned job %(Job)s to have state "
                "'assigned' but it has state %(State)s." % job)

        return case, job

    def TeamTrack_closed(self, case, job, case1, job1):
        # The closed job gets replicated back to TeamTrack.  TeamTrack
        # will change the owner field and the replicator will replicate
        # that back.  This is a regression test for job000053.
        assert job1['Owner'] != job['Owner'], \
               ("Closed job %(Job)s but owner is still %(Owner)s."
                % job1)

    def runTest(self):
        "Issue life cycle (test_p4dti.lifecycle)"

        # 10.1. Simple issue lifecycle
        #
        # This is a simple issue cycle:
        #
        #   1. An issue is submitted to the defect tracker.
        #   2. It is replicated to Perforce.
        #   3. It gets assigned to a developer.
        #   4. It is closed in Perforce (by editing the job).
        #   5. The closure gets replicated back to the defect tracker.
        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)

        # Close the job in Perforce.  This is a regression test for
        # job000118.
        job[self.r.config.job_status_field] = 'closed'
        self.p4.run('job -i', job)
        self.close(issue, job, [805, 824, 826])
        self.poll()


        # 10.2. Issue lifecycle (fix in Perforce)
        #
        # This is an issue cycle in which the issue is associated with a
        # changelist in Perforce:
        #
        #   1. An issue is submitted to the defect tracker.
        #   2. It is replicated to Perforce.
        #   3. It gets assigned to a developer.
        #   4. It is closed in Perforce (by making a fix).
        #   5. Closure and fix get replicated back to the defect
        #      tracker.
        #
        # This is a regression test for job000133.
        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        self.p4.run('fix -c 1 %s' % job['Job'])
        self.close(issue, job, [802, 805, 819, 820, 824, 826])
        self.poll()


        # 10.3. Issue lifecycle (fix on submission in Perforce)
        #
        # This tests an issue lifecycle in which the issue is associated
        # with a pending changelist and closed on submission.
        #
        #   1. An issue is submitted to the defect tracker.
        #   2. It is replicated to Perforce.
        #   3. It gets assigned to a developer.
        #   4. The job description is edited in Perforce (this is a
        #      regression test for job000362).
        #   5. A fix is made with a pending changelist.
        #   6. The change is submitted.
        #   7. Job, fix get replicated back to the defect tracker.
        #
        # This is a regression test for job000225.
        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        job['Description'] = job['Description'] + '\nEdited.'
        self.p4i[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 824, 911, 912], [826])
        change = self.edit_file(self.user)
        self.p4i[self.user].run('fix -c %s %s'
                                % (change['Change'], job['Job']))
        self.p4i[self.user].run('submit -c %s' % change['Change'])
        self.close(issue, job, [802, 805, 819, 820, 824, 826])
        self.poll()

        # Delete fix in Perforce and check that the deletion is
        # replicated (regression test for job000013 and job000222).
        self.p4i[self.user].run('fix -d -c %s %s'
                                % (change['Change'], job['Job']))
        self.poll()
        self.expected([818])
        fixes = self.p4.run('fixes -j %s' % job['Job'])
        assert len(fixes) == 0, ("Expected no fixes for %s, but found "
                                 "%s." % (job['Job'], fixes))

        # 10.4. Issue lifecycle (fix to assigned on submission in
        # Perforce)
        #
        # As section 10.3, but the fix is to the job's current state
        # rather than "closed", so the job state doesn't change.
        # Regression test for job000007.
        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        change = self.edit_file(self.user)
        status = job[self.r.config.job_status_field]
        self.p4i[self.user].run('fix -s %s -c %s %s'
                                % (status, change['Change'],
                                   job['Job']))
        self.p4i[self.user].run('submit -c %s' % change['Change'])
        self.poll()
        self.expectation([802, 805, 819, 820, 825, 911, 912])

        # Change fix status (only -- note that we set the job status
        # back); check that it's replicated.
        self.p4i[self.user].run('fix -s closed -c %s %s'
                                % (change['Change'], job['Job']))
        self.p4i[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 819, 821, 825, 911, 912])


        # 10.5. Simultaneous edit
        #
        # Provoke a conflict by changing an issue simultaneously in both
        # systems.  Then try again, but with a user other than the job
        # owner.  (Check that mail goes to both.)
        #
        # Make, update and delete a fix in Perforce simultaneously with
        # the change.  Make sure the fixes are respectively deleted,
        # updated and restored.  Ditto for filespecs.
        change1 = self.edit_file(self.user) # make
        change2 = self.edit_file(self.user) # update
        change3 = self.edit_file(self.user) # delete
        self.poll()
        for u in [self.user, self.user1]:
            issue, job = self.submit(self.user)
            status = job[self.r.config.job_status_field]
            # Make the fixes that we are going to update and delete.

            self.p4i[self.user].run('fix -s %s -c %s %s'
                                    % (status, change2['Change'],
                                       job['Job']))
            self.p4i[self.user].run('fix -s %s -c %s %s'
                                    % (status, change3['Change'],
                                       job['Job']))
            job['P4DTI-filespecs'] = 'filespec_1\n'
            self.p4i[self.user].run('job -i', job)
            self.poll()
            # Now edit the job simultaneously in DT and Perforce.
            getattr(self, config.dt_name + '_assign')(issue, job,
                                                      self.user)
            job[self.r.config.job_status_field] = 'closed'
            job['P4DTI-filespecs'] = 'filespec_2\n'
            self.p4i[u].run('job -i', job)
            # Make, update and delete those fixes.
            self.p4i[u].run('fix -c %s %s'
                            % (change1['Change'], job['Job']))
            self.p4i[u].run('fix -c %s %s'
                            % (change2['Change'], job['Job']))
            self.p4i[u].run('fix -d -c %s %s'
                            % (change3['Change'], job['Job']))
            self.clear_log()
            self.r.carefully_poll_databases()
            self.expectation([800, 806, 811, 814, 815, 816, 817, 841,
                              860, 861, 853, 862, 812, 910, 911, 912])
            issue = self.r.dt.issue(issue.id())
            job = self.p4.run('job -o %s' % job['Job'])[0]
            getattr(self, config.dt_name + '_assigned')(issue, job)
            self.poll()
            fixes = self.p4.run('fixes -j %s' % job['Job'])
            assert job['P4DTI-filespecs'] == 'filespec_1\n'

            assert len(fixes) == 2
            assert fixes[0]['Change'] == change3['Change']
            assert fixes[0]['Status'] == status
            assert fixes[1]['Change'] == change2['Change']
            assert fixes[1]['Status'] == status
            for m in self.log_messages:
                if m.id == 800:
                    assert string.find(m.text,
                                       config.administrator_address)
                    assert string.find(m.text, self.user) != -1
                    assert string.find(m.text, u) != -1


        # 10.6. Illegal changes
        #
        # Regression test for job000429.
        tests = getattr(self, config.dt_name + '_illegal_changes')
        for field, value, expected in tests:
            issue, job = self.submit(self.user)
            issue, job = self.assign(issue, job, self.user)
            job[field] = value
            self.p4i[self.user].run('job -i', job)
            self.clear_log()
            self.r.carefully_poll_databases()
            self.expectation(expected + [800, 805, 811, 851, 860, 861,
                                         862, 812, 852, 853, 910, 911,
                                         912],
                             [824, 923])
            self.poll()

        # We're done; one last check for luck.
        self.check_consistency()

    Bugzilla_illegal_changes = [
        ('Product', 'no_such_product', [504]),
        ('Description', 'foo', [505]),
        ]

    TeamTrack_illegal_changes = [
        ('State', 'verified', [614]),
        ('Est._Time_to_Fix', 'foo', [619]),
        ('Actual_Time_to_Fix', 'foo', [619]),
        ('Additional_Notes', 'foo', [633]),
        ]


# 11. P4DTI CONFIGURATION DATABASE
#
# This checks that parameters get added and removed from the
# configuration database.  This is a regression test for job000169 and
# job000351.

class configdb(p4dti_base):
    def setUp(self):
        self.setup_everything()

    def TeamTrack_config_item(self, item):
        import teamtrack
        query = ("TS_TYPE = 4 AND TS_CHAR1 = '%s' AND TS_CHAR2 = "
                 "'%s'" % (config.rid, item))
        rr = self.dti.server.query(teamtrack.table['VCACTIONS'], query)
        for r in rr:
            dict = eval(r['FILENAME'])
            if (dict.has_key('sid') and dict['sid'] == config.sid
                and dict.has_key('description')):
                return dict['description']
        return None

    TeamTrack_config_map = { 'p4_server_description': 'SERVER',
                             'job_url': 'JOB_URL',
                             'changelist_url': 'CHANGELIST_URL' }

    def TeamTrack_config(self):
        cf = {}
        for k, v in self.TeamTrack_config_map.items():
            cf[k] = self.TeamTrack_config_item(v)
        return cf

    def Bugzilla_config(self):
        return self.r.dt.bugzilla.get_config()

    def check(self, cf1):
        reset_configuration()
        for k, v in cf1.items():
            setattr(config, k, v)
        self.initialize_replicator()
        cf2 = getattr(self, config.dt_name + '_config')()
        for k, v in cf1.items():
            if v != cf2.get(k):
                self.addFailure("Set parameter %s to '%s', but found "
                                "'%s' in the config database."
                                % (k, v, cf2.get(k)))


    def runTest(self):
        "Replicator configuration database (test_p4dti.configdb)"
        cf1 = { 'p4_server_description': 'spong',
                'changelist_url': 'http://spong/changelist?%d',
                'job_url': 'http://spong/job?%s' }
        cf2 = { 'p4_server_description': 'spong',
                'changelist_url': None,
                'job_url': None }
        self.check(cf1)
        self.check(cf2)
        self.check(cf1)


# 12. INCONSISTENCIES
#
# This test case checks that each kind of inconsistency that can be
# reported by the consistency checking script is detected and reported.

class inconsistencies(p4dti_base):
    user = 'rb'

    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user])
        self.check_startup()

    def runTest(self):
        "Consistency check failures (test_p4dti.inconsistencies)"

        # 12.1. Unreplicated fix in Perforce
        change = self.edit_file(self.user)
        job1 = self.p4.run('jobs')[0]
        self.p4i[self.user].run('fix -s %s -c %s %s'
                                % (job1[config.job_status_field],
                                   change['Change'], job1['Job']))
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 878, 886, 890], [883, 884])
        self.poll()

        # 12.2. Fix with wrong status in Perforce
        job1status = job1[config.job_status_field]
        assert job1status != 'assigned'
        self.p4i[self.user].run('fix -s assigned -c %s %s'
                                % (change['Change'], job1['Job']))
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 880, 886, 890], [883, 884])
        self.p4i[self.user].run('fix -s %s -c %s %s'
                                % (job1status, change['Change'],
                                   job1['Job']))

        # 12.3. Changed job in Perforce
        job1['Description'] = job1['Description'] + '...\n'
        self.p4i[self.user].run('job -i', job1)
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 875, 886, 890], [883, 884])
        self.poll()

        # 12.4. Unreplicated job in Perforce
        #
        # This is a regression test for job000372.
        job4 = self.p4i[self.user].run('job -o')[0]
        for k, v in job4.items():
            if string.find(v, '<enter description here>') == 0:
                job4[k] = 'Test!'
	job4['P4DTI-rid'] = config.rid
        job4['P4DTI-issue-id'] = "999999"
        results = self.p4i[self.user].run('job -i', job4)
        job4name = string.split(results[0]['data'])[1]
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 882, 886, 890], [883, 884])

        # 12.5. Unreplicated job pointing to replicated issue
        issueid = self.p4.run('jobs')[0]['P4DTI-issue-id']
        job5 = self.p4i[self.user].run('job -o %s' % job4name)[0]
        job5['P4DTI-issue-id'] = issueid
        self.p4i[self.user].run('job -i', job5)
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 881, 886, 890], [883, 884])
        self.p4i[self.user].run('job -d %s' % job5['Job'])

        # 12.6. Missing job in Perforce
        job6 = self.p4.run('jobs')[0]
        self.p4i[self.user].run('job -d %s' % job6['Job'])
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 873, 886, 890], [883, 884])

        # 12.7. Job pointing to wrong issue
        #
        # We also get "(P4DTI-875X) Job '%s' would need the following
        # set of changes ..." because P4DTI-issue-id is wrong, and
        # "(P4DTI-8793) Change %d fixes issue '%s' but there is no
        # corresponding fix for job '%s'." because we added a fix and
        # replicated it in section 12.1 above but now it's missing in
        # Perforce.
        id = job6['P4DTI-issue-id']
        job6['P4DTI-issue-id'] = '999999'
        self.p4i[self.user].run('job -i', job6)
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 874, 875, 879, 887, 890], [883, 884])
        job6['P4DTI-issue-id'] = id

        # Put things back to rights, and add a filespec in preparation
        # for section 12.8 below.
        job6['P4DTI-filespecs'] = 'filespec_1\n'
        self.p4i[self.user].run('job -i', job6)
        self.poll()
        self.check_consistency()

        # 12.8. Incorrect filespecs
        #
        # By changing the filespec we can provoke messages about
        # filespecs being missing in both sides.
        job6['P4DTI-filespecs'] = 'filespec_2\n'
        self.p4i[self.user].run('job -i', job6)
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 876, 877, 887, 890], [883, 884])

        # Polling should sort everything out.
        self.poll()
        self.check_consistency()


# 13. RACE DURING REPLICATION OF FIXES
#
# This is a regression test for job000385.

class race_385(p4dti_base):
    user = 'rb'
    change = None
    original_fixes_differences = None
    race_flag = None

    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user])
        self.check_startup()

    def race(self, dt_fixes, p4_fixes):
        fix_diffs = self.original_fixes_differences(dt_fixes, p4_fixes)
        # Submit the change (this is the race).  But only once.
        if self.race_flag == 0:
            self.p4i[self.user].run('submit -c %s'
                                    % self.change['Change'])
            self.race_flag = 1
        return fix_diffs

    def runTest(self):
        "Race during replication of fixes (test_p4dti.race_385)"

        # Create a pending change in Perforce and make a fix to that
        # change.
        self.change = self.edit_file(self.user)
        job = self.p4.run('jobs')[0]
        self.p4i[self.user].run('fix -s %s -c %s %s'
                                % (job[config.job_status_field],
                                   self.change['Change'], job['Job']))

        # Create a second pending change in Perforce (so that the first
        # change will get renumbered when submitted).
        change_2 = self.edit_file(self.user)

        # We'll make sure that the change gets submitted after
        # fixes_differences gets called -- this is our last opportunity
        # to run the race before the possibly illegal call to p4 change
        # -o.
        self.original_fixes_differences = self.r.fixes_differences
        self.r.fixes_differences = self.race
        self.race_flag = 0

        # Replicate.
        self.poll()

        # Restore the original fixes_differences.
        self.r.fixes_differences = self.original_fixes_differences

        # Check that the replication succeeded.
        self.expectation([802, 805, 819, 820], [825, 911, 912])
        self.check_consistency()


# 14. CAN CONFIRM AN UNCONFIRMED BUG
#
# This is a regression test for job000262 and job000410.  Bugzilla only.

class unconfirmed(p4dti_base):
    bug_id = 27     # this bug is UNCONFIRMED in our test database
    user = 'rb'

    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user])
        self.check_startup()

    def runTest(self):
        "Confirm an UNCONFIRMED bug (test_p4dti.unconfirmed)"

        job = self.p4.run('job -o bug%d' % self.bug_id)[0]
        assert job['Status'] == 'unconfirmed'
        job['Status'] = 'bugzilla_new'
        self.p4i[self.user].run('job -i', job)

        # Replicate.
        self.poll()

        # Check that the replication succeeded.
        self.expectation([805, 824], [911, 912])
        self.check_consistency()


# 15. REPLICATE NEW JOBS FROM PERFORCE
#
# This checks that a new job in Perforce can be submitted to the defect
# tracker and successfully replicated thereafter.  This is a regression
# test for job000036.

class new_p4_job(p4dti_base):
    user = 'rb'

    TeamTrack_job = {
        'Title': 'test title',
        'Owner': user,
        'Description': 'test description',
        'Priority': 'medium',
        'Severity': 'medium',
        'Project': 'Elaborate editor',
        }

    Bugzilla_job = {
        'Summary': 'test summary',
        'Assigned_To': user,
        'Description': 'test description',
        'Priority': 'P3',
        'Severity': 'normal',
        'Product': 'product 1',
        }

    def TeamTrack_prepare_issue(_, issue, job):
        pass

    def Bugzilla_prepare_issue(_, issue, job):
        default_component = {
            'product 1': 'component 1.1',
            'product 2': 'component 2.1',
            'unconfirmed': 'unconf1',
            'group': 'group1',
            }
        if issue['product'] == '':
            issue['product'] = 'product 1'
        if issue['component'] == '':
            issue['component'] = default_component[issue['product']]
        if issue['version'] == '':
            issue['version'] = 'unspecified'

    def setUp(self):
        self.setup_everything()
        config.prepare_issue = getattr(self, config.dt_name
                                       + '_prepare_issue')
        config.replicate_job_p = lambda(job): 1
	if config.dt_name == 'TeamTrack':
            config.replicated_fields = (config.replicated_fields
                                        + ['PROJECTID'])
        self.setup_perforce([self.user])
        self.check_startup()

    def runTest(self):
        "Replicate a new job from Perforce (test_p4dti.new_p4_job)"

        # Add a job to Perforce.
	job = self.p4i[self.user].run('job -o')[0]
	for k, v in getattr(self, config.dt_name + '_job').items():
	    job[k] = v
        result = self.p4i[self.user].run('job -i', job)[0]['data']
        jobname = string.split(result)[1]

        # Replicate it.
        self.poll()
        self.expected([892, 894])
        self.expected_only_in_range(800, 914,
                                    [826, 892, 894, 896, 911, 912])

        # Check that the job is replicated.
        self.check_consistency()
        job = self.p4.run('job -o %s' % jobname)[0]
        assert job['P4DTI-rid'] == config.rid


# 16. WINDOWS NT SERVICE AND EVENT LOG
#
# This test works for TeamTrack only.  It is a regression test for
# job000046 and job000149.

class nt_service(p4dti_base):

    # Service name in the Windows registry.
    _svc_name_ = 'p4dti_service'

    # If the service can't replicate its initial batch of issues in five
    # minutes, we'll consider it to have failed (this depends on the
    # test database not having too many issues).
    timeout = 300

    # Number of polls before we expect consistency to be achieved.
    polls_for_consistency = 3

    def setUp(self):
        reset_configuration()
        self.dti.restart_perforce()
        self.dti.restart_defect_tracker()
        self.snoop_logger()
        self.initialize_replicator()

        # Handle on Service Manager.
        import win32service
        access_level = win32service.SC_MANAGER_ALL_ACCESS
        self.hscm = win32service.OpenSCManager(None, None, access_level)

        # Handle on Event Log.
        import win32evtlog
        self.hevt = win32evtlog.OpenEventLog(None, 'Application')

        # Establish configuration environment for service. We want the
        # current configuration with the following changes: NT Event
        # logging must be set; logging level is DEBUG; email from the
        # replicator is supressed.
        controls = (('P4DTI_CONFIG',    os.path.abspath(config_file)),
                    ('P4DTI_EVTLOG',    ''),
                    ('P4DTI_LOGLEVEL',  str(message.DEBUG)),
                    ('P4DTI_ADMINADDR', ''),
                    )
        for key, value in controls:
            os.environ[key] = value

        # If the service is already there, remove it.
        if self.query_service():
            self.remove_service()

    def tearDown(self):
        import win32service
        import win32evtlog
        win32service.CloseServiceHandle(self.hscm)
        win32evtlog.CloseEventLog(self.hevt)

    # Return None if service not currently installed. Otherwise
    # return currentState field as reported by Service Manager.
    def query_service(self):
        import win32service
        type_filter = win32service.SERVICE_WIN32
        state_filter = win32service.SERVICE_STATE_ALL
        query = win32service.EnumServicesStatus

        # List all registered services
        services = query(self.hscm, type_filter, state_filter)

        # Is ours there?
        for svc_name, description, status in services:
            if svc_name == self._svc_name_:
                # Looks like it was.
                currentState = status[1]
                return currentState

        # Looks like it wasn't.
        return None

    def wait(self, function):
        timeout = self.timeout
        while timeout > 0:
            if function():
                return 1
            time.sleep(1)
            timeout = timeout - 1
        return 0

    def wait_for_status(self, status):
        query = self.query_service
        return self.wait(lambda query=query, status=status:
                         query() == status)

    def read_event_log_first_time(self):
        import win32evtlog
        hevt = self.hevt
        oldest_record = win32evtlog.GetOldestEventLogRecord(hevt)
        record_count = win32evtlog.GetNumberOfEventLogRecords(hevt)
        newest_record = oldest_record + record_count - 1

        # Random access into the Event Log.
        read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ +
                      win32evtlog.EVENTLOG_SEEK_READ)
        # Read newest event and ignore it. This has the required
        # side-effect of setting the handle's reading position in the
        # log, ready for future reads.
        win32evtlog.ReadEventLog(hevt, read_flags, newest_record)

    def wait_for_event_log(self, msg):
        return self.wait(lambda self=self, msg=msg:
                         self.match_event_log(msg))

    # Read all the log entries that have been written since last
    # time. It's remotely possible that we might flush the message
    # we're looking for twice in one call to ReadEventLog, but this is
    # acceptable as it is harmless to pool again.
    def match_event_log(self, msg):
        import win32evtlog
        hevt = self.hevt

        # Sequential access from where we left off, last time we read
        # on this handle.
        read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ +
                      win32evtlog.EVENTLOG_SEQUENTIAL_READ)

        import catalog
        # This is the string we're looking for.
        wanted = str(catalog.msg(msg))

        # We have to make an undeterminable number of reads to exhaust
        # the log. Each read will return some undeterminable number of
        # records. (I can't tell from documentation whether zero is a
        # possible number in cases where there are records to be
        # read. This doesn't matter, as we're prepared to come back
        # several times.)
        records = win32evtlog.ReadEventLog(hevt, read_flags, 0)
        while records:
            for record in records:
                message = record.StringInserts[0]
                if message == wanted:
                    return 1
            records = win32evtlog.ReadEventLog(hevt, read_flags, 0)
        return 0


    # Simplify hook into service.main()
    def main(self, *args):
        import service
        service.main([''] + list(args))

    # The next four methods have to use service.py, as that's what
    # we're testing.

    def install_service(self):
        self.main()

    def remove_service(self):
        self.main('remove')

    def start_service(self):
        self.main('start')

    def halt_service(self):
        self.main('stop')

    def runTest(self):
        "Manage NT service (test_p4dti.nt_service)"
        import win32service

        # Install the service and ensure that it's there.
        self.install_service()
        assert self.query_service()

        # Start it running.
        self.read_event_log_first_time()
        self.start_service()
        assert self.wait_for_status(win32service.SERVICE_RUNNING)

        # Allow the replication to happen.
        for i in range(self.polls_for_consistency):
            # (message.DEBUG, "Poll finished.")
            assert self.wait_for_event_log(912)

        # Halt the service.
        self.halt_service()
        assert self.wait_for_status(win32service.SERVICE_STOPPED)

        # Remove service and ensure that it's gone away.
        self.remove_service()
        assert not self.query_service()

        # Check that replication succeeded.
        self.check_consistency()


# 17. MIGRATION
#
# This tests that jobs in Perforce can be migrated from the default
# Perforce jobspec to the defect tracker and replicated thereafter.
# This is a regression test for job000022, job000249, job000422 and
# job000426.

class migrate(p4dti_base):
    n_jobs = 20
    users = ['rb', 'nb', 'gdr']
    states = ['open', 'closed', 'suspended']
    fixes = {}

    def setUp(self):
        self.setup_everything()
        for param in ["migrate_p", "translate_jobspec",
                      "prepare_issue"]:
            setattr(config, param,
                    getattr(self, config.dt_name + "_" + param))
        self.setup_perforce(self.users)
        self.initialize_replicator()
        self.create_jobs()

    def create_jobs(self):
        self.fixes = {}
        # Create some jobs in Perforce.
        for i in range(self.n_jobs):
            job = self.p4.run('job -o')[0]
            user = self.users[i % len(self.users)]
            status = self.states[i % len(self.states)]
            changes = {
                'Description': ("First line of job %d\n"
                                "Remainder of job %d\n"
                                "Blah blah blah...\n" % (i, i)),
                'User': user,
                'Status': status,
                }
            self.r.update_job(job, changes)
            self.p4i[user].run('fix -s %s -c 1 %s'
                               % (status, job['Job']))
            self.fixes[job['Job']] = status

    def runTest(self):
        "Migration from Perforce jobs (test_p4dti.migrate)"
        self.clear_log()
        self.r.migrate()
        self.expected([892, 895])
        self.expected_only_in_range(800, 914, [802, 819, 820, 892, 895])
        self.r.refresh_perforce_jobs()
        self.r.poll()
        self.check_consistency()

        # Check that no fixes have been lost, changed or added.
        # Regression test for job000271.
        for f in self.p4.run('fixes'):
            if not self.fixes.has_key(f['Job']):
                self.addFailure("Found unexpected fix for job '%s'."
                                % f['Job'])
            else:
                if self.fixes[f['Job']] != f['Status']:
                    self.addFailure("Expected fix for job '%s' to have "
                                    "status '%s', but found '%s'."
                                    % (f['Job'], self.fixes[f['Job']],
                                       f['Status']))
                del self.fixes[f['Job']]
        for j in self.fixes.keys():
            self.addFailure("Expected a fix for job '%s', but didn't "
                            "find it." % j)

    def TeamTrack_migrate_p(_, job):
        return 1

    def TeamTrack_prepare_issue(_, issue, job):
        if issue['ISSUETYPE'] == 0:
            issue['ISSUETYPE'] = 1
        if issue['PROJECTID'] == 0:
            issue['PROJECTID'] = 4

    def TeamTrack_translate_jobspec(_, job):
        desc = job.get("Description", "")
        newline = string.find(desc, "\n")
        job["Title"] = desc[:newline]
        job["Description"] = desc[newline+1:]
        job["Owner"] = job.get("User", "(None)")
        status = job.get("Status", "open")
        if status == "open":
            job["State"] = "assigned"
        else:
            job["State"] = "verified"
        return job

    def Bugzilla_migrate_p(_, job):
        return 1

    def Bugzilla_prepare_issue(_, issue, job):
        issue["product"] = "product 1"
        issue["component"] = "component 1.1"
        issue["version"] = "unspecified"

    def Bugzilla_translate_jobspec(_, job):
        desc = job.get("Description", "")
        newline = string.find(desc, "\n")
        job["Summary"] = desc[:newline]
        job["Description"] = desc[newline+1:]
        job["Assigned_To"] = job.get("User", "")
        status_map = {
            "open": ("assigned", ""),
            "closed": ("closed", "FIXED"),
            "suspended": ("closed", "LATER"),
            }
        (status, resolution) = status_map[job.get("Status", "open")]
        job["Status"] = status
        job["Resolution"] = resolution
        job["Severity"] = "blocker"
        job["Priority"] = "P1"
        return job


# 18. BUGZILLA PARAMETERS
#
# Regression test for job000352.  Bugzilla only.

class bugzilla_params(p4dti_base):

    # In this test we will need to pick and use an administrator's
    # password for Bugzilla. Here it is.
    dbms_admin_passwd = 'p4dti'

    # The following files need to be removed each time we fake an
    # installation of Bugzilla. When we are done we ensure that they
    # are world-writable, so that other users aren't locked out of the
    # test suite.
    data_files = ('localconfig',
                  'data/params',
                  )

    # These are the parameters in which we are most interested, because
    # customers have been asking for them.
    regression_parameters = ('emailsuffix',
                             )

    # Temporary directory where we make a Bugzilla installation.
    tempdir = None

    # Name of Bugzilla under test: a string like 'bugzilla-2.14'.
    bugzilla_name = None

    def setUp(self):
        reset_configuration()
        self.tempdir = tempfile.mktemp()
        self.bugzilla_name = 'bugzilla-%s' % config.bugzilla_version
        config.bugzilla_directory = os.path.join(self.tempdir,
                                                 self.bugzilla_name)
        os.mkdir(self.tempdir)
        bz_path = os.path.abspath(os.path.join(
            os.getcwd(), os.pardir, 'code', self.bugzilla_name))
        os.system('cp -R %s %s' % (bz_path, self.tempdir))

    def runTest(self):
        "Bugzilla parameters (test_p4dti.bugzilla_params)"
        self.test_fresh_installation()
        self.test_previous_installation()

    def test_fresh_installation(self):
        self.walk_through_install(0)

    def test_previous_installation(self):
        self.walk_through_install(1)

    def walk_through_install(self, enable_p4dti_first):
        import random
        random_number = random.randint(1,1000000)

        # 1. We remove and reinstall Bugzilla, and perform other setup
        # tasks.
        self.refresh()
        self.install()

        # 2. Argument enable_p4dti_first should be set to simulate a
        # previously configured P4DTI installation; it should be unset
        # to simulate bugzilla_params being present the first time P4DTI
        # is installed on this Bugzilla.
        if enable_p4dti_first:
            self.enable()
            self.mysql('drop table p4dti_bugzilla_parameters;')

        # 3. We confirm that the replicator spots absense of this table.
        self.initialize_replicator()
        self.expected([129])

        # 4. We pretend to follow instructions in section 5.4.3
        # "Enabling the Bugzilla extensions" in the Administrator's
        # Guide. We take the opportunity to set p4dti_enabled to a
        # random number which we will read back from the replicator
        # later.
        self.enable(p4dti_enabled = random_number)

        # 5. We confirm that the replicator can now find the
        # emailsuffix parameter.
        self.clear_log()
        self.initialize_replicator()
        self.expected_not([129, 130])
        for p in self.regression_parameters + ('p4dti',):
            assert config.bugzilla.params.has_key(p)
        assert int(config.bugzilla.params['p4dti']) == random_number

    def refresh(self):
        # Note that we start with an empty database.
        self.dti.restart_defect_tracker(restore_from_dump = 0)
        self.dti.restart_perforce()
        self.snoop_logger()
        for file in self.data_files:
            f = os.path.join(config.bugzilla_directory, file)
            if os.access(f, os.F_OK):
                os.remove(f)

    def install(self):
        self.read_cgi_defaults()
        cwd = os.getcwd()
        try:
            os.chdir(config.bugzilla_directory)
            self.first_checksetup()
            self.fixup_localconfig()
            self.second_checksetup()
        finally:
            os.chdir(cwd)

    def enable(self, p4dti_enabled = 1):
        cwd = os.getcwd()
        try:
            os.chdir(config.bugzilla_directory)
            self.edit_params(p4dti_enabled)
        finally:
            os.chdir(cwd)

    def simple_pipe(self, command):
        pipe = os.popen(command, 'r')
        lines = pipe.readlines()
        pipe.close()
        return lines

    def mysql(self, command):
        db = config.dbms_database
        user = config.dbms_user
        return self.simple_pipe('mysql -u %s %s -e \'%s\'' %
                                (user, db, command))

    # To create these defaults files for future versions of Bugzilla,
    # turn on debugging at the end of CGI.pl and - after the final
    # "read STDIN" - add the line:
    #         print "$::buffer\n";
    # Then submit editparams.cgi and look at the output.
    def read_cgi_defaults(self):
        filename = os.path.join(os.getcwd(), 'default-parameters',
                                self.bugzilla_name)
        f = open(filename, 'r')
        self.cgi_defaults = f.readline()
        f.close()

    # Installation of Bugzilla consists of three stages: run the
    # checksetup script; edit the localconfig file self.bugzilla_name
    # checksetup; run checksetup again. This second pass through
    # checksetup interrogates the administrator for various personal
    # details, which we will have to supply.
    def first_checksetup(self):
        self.simple_pipe('./checksetup.pl')

    def fixup_localconfig(self):
        f = open('localconfig', 'r')
        lines = f.readlines()
        f.close()
        lines = map(lambda line:
                    re.sub('db_name = "bugs"',
                           'db_name = "%s"' % (config.dbms_database,),
                           line),
                    lines)
        f = open('localconfig', 'w')
        f.writelines(lines)
        f.close()

    # Calling this method results in two messages "stty: standard input:
    # Invalid argument" to stderr. These come from the perl script
    # checksetup.pl, which disables input echoing before prompting for a
    # password and reenables it afterwards; it does this by calls to
    # stty which in this circumstance fail (their stdins not being ttys,
    # but Bugzilla doesn't check that).
    def second_checksetup(self):
        name = self.dbms_admin_passwd + '\n'
        password = name
        # Construct list of responses we'd give if we were running
        # this interactively.
        replies = [
            # Enter the e-mail address of the administrator
            config.administrator_address + '\n',
            # You entered ...  Is this correct? [Y/n]
            '\n',
            # Enter the real name of the administrator
            name,
            # Enter a password for the administrator account
            password,
            # Please retype the password to verify
            password,
            ]
        opipe = os.popen('./checksetup.pl >> %s 2>&1' % log_filename,
                         'w')
        opipe.writelines(replies)
        opipe.close()

    # These are the parameter settings and login details we want to
    # supply when faking use of the editparams.cgi web page. For all
    # other parameters the default value will do.
    def param_settings(self, p4dti_enabled):
        import urllib
        dict = {'p4dti': str(p4dti_enabled),
                'Bugzilla_login': config.administrator_address,
                'Bugzilla_password': self.dbms_admin_passwd,
                }
        return urllib.urlencode(dict)

    # Assemble parameters, including the defaults, and use them to
    # fake an invocation of editparams.cgi over the web.
    def edit_params(self, p4dti_enabled):
        param_settings = self.param_settings(p4dti_enabled)
        query_string =  param_settings + '&' + self.cgi_defaults
        # editparams.cgi actually uses POST, but we can simplify
        # matters with a GET and as its CGI scripts are very general
        # Bugzilla won't mind.
        env_additions = {'REQUEST_METHOD': 'GET',
                        'QUERY_STRING': query_string,
                        'REMOTE_ADDR': '127.0.0.1',
                        }
        for e in env_additions.keys():
            os.environ[e] = env_additions[e]
        try:
            self.simple_pipe('./doeditparams.cgi')
        finally:
            for e in env_additions.keys():
                del os.environ[e]


# 19. ENUM KEYWORDS WITH SPACES
#
# This is a regression test for job000445.  Bugzilla only.

class enum_spaces(p4dti_base):
    users = ['rb', 'nb', 'gdr']

    def setUp(self):
        self.setup_everything({'bugzilla_mysqldump':
                               'job000445-mysqldump'})

    def runTest(self):
        "Replicate Bugzilla enums containing spaces (test_p4dti.enum_spaces)"

        # Replicate.
        self.check_startup()
        self.check_consistency()


# RUNNING THE TESTS

def tests():
    suite = unittest.TestSuite()
    tests = [start_1, bogus, configdb, existing, inconsistencies,
             lifecycle, migrate, new_p4_job, normal, project, race_385,
             start_2]
    if config.dt_name == 'TeamTrack':
        tests.extend([nt_service])
    elif config.dt_name == 'Bugzilla':
        tests.extend([bugzilla_params, enum_spaces, unconfirmed])
    for t in tests:
        suite.addTest(t())
    return suite

if __name__ == "__main__":
    unittest.main(defaultTest="tests")


# A. REFERENCES
#
# [GDR 2000-12-31] "Automated testing plan" (e-mail message); Gareth
# Rees; Ravenbrook Limited; 2000-12-31;
# <http://info.ravenbrook.com/mail/2000/12/31/14-42-26/0.txt>.
#
# [GDR 2001-03-14] "Automatic test of TeamTrack integration" (e-mail
# message); Gareth Rees; Ravenbrook Limited; 2001-03-14;
# <http://info.ravenbrook.com/mail/2001/03/14/22-44-45/0.txt>.
#
# [PyUnit] "PyUnit - a unit testing framework for Python"; Steve
# Purcell; <http://pyunit.sourceforge.net/>.
#
# [RB 2000-12-08] "init.py -- Initialize replicator and defect tracker";
# Richard Brooksby; Ravenbrook Limited; 2000-12-08;
# <http://www.ravenbrook.com/project/p4dti/master/code/replicator/init.py>.
#
#
# B. DOCUMENT HISTORY
#
# 2001-03-14 GDR Created.
#
# 2001-03-15 GDR Added list of regression tests, discussion of
# limitations, snoping on e-mail, p4dti_variant class, test cases for
# existing jobs, recent start date and replicating by project.
#
# 2001-03-17 GDR Added test_dt_errors test case for errors caught by the
# defect tracker or its database.
#
# 2001-05-01 GDR Use p4dti_unittest module so that we can report many
# errors in the course of a single test case.  Added regression test for
# job000311.
#
# 2001-05-17 GDR Don't look for messages 844-847 (replicator doesn't
# output them any more).  Fixed bug in issue lifecycle test.
#
# 2001-05-18 GDR Extended the lifecycle test so that it tests
# replication of fixes.
#
# 2001-06-07 GDR Added references to design.
#
# 2001-07-02 GDR Made bogus_config and lifecycle test cases portable
# between TeamTrack 4.5 and TeamTrack 5.0.
#
# 2001-07-03 GDR Moved TeamTrack connection time test to
# test_teamtrack.py where it belongs.
#
# 2001-07-09 NB Added changelist_url test.
#
# 2001-07-17 GDR Added start_2 (regression test for job000340) and
# configdb (regression test for job000169, job000351).
#
# 2001-07-25 GDR Added consistency checker tests (regression test for
# job000372).
#
# 2001-09-24 GDR Use environment variable P4DTI_PATH (if set) to find
# the P4DTI sources, so that testers can test installed copies of the
# P4DTI as well as copies synced from the repository.  Use environment
# variable P4DTI_CONFIG (if set) to find the configuration.
#
# 2001-09-25 GDR Use Bugzilla database and user from the configuration;
# see job000394.  Restore the mysqldump for the Bugzilla version given
# by the bugzilla_version configuration parameter (if specified).  The
# tests start_1 and start_2 were incorrectly using gmtime when making
# timestamps for the Bugzilla database -- this could lead to problems
# when testing in timezones not equal to UTC with recently-created
# defects.  Use localtime instead.
#
# 2001-10-03 GDR Added test case for job000385.
#
# 2001-10-23 GDR Use entry point for polling, so no need to call
# start_logger.
#
# 2001-10-26 NB Added test case for job000410.
#
# 2001-11-01 NB Always write e-mail messages to the test log.  Make
# Perforce user records so that email address translation works.
#
# 2001-11-06 GDR Added test case for replicating new job from Perforce.
# Log each run to a separate file.
#
# 2001-11-09 NDL Added test case for NT services and event log.
#
# 2001-11-13 GDR Method and variable names use underscore to separate
# words where possible.  Lifecycle test works with NT service changes.
#
# 2001-11-15 GDR Do not destructively modify config.replicated_fields
# in new_p4_job.setUp.
#
# 2001-11-21 GDR Added test case for migration, and test cases for
# incorrect values for the new configuration parameters.
#
# 2001-11-22 GDR Refresh Perforce jobs after migrating.
#
# 2001-11-26 GDR Test cases migrate and new_p4_job take account of new
# migration messages.
#
# 2001-11-28 GDR Bugzilla migration test fills in Severity and Priority
# fields when translating the jobspec.  Adapted the lifecycle test so
# that it runs in the Bugzilla integration.
#
# 2001-11-28 NDL Added test case for bugzilla parameters.
#
# 2001-12-03 GDR Added regression tests for job000007, job000013,
# job000222, job000271.
#
# 2001-12-05 GDR Test Perforce passwords, all kinds of inconsistencies,
# startup e-mail, success of polling, and Perforce servers that don't
# report a recognizable changelevel.
#
# 2001-12-07 GDR Extended lifecycle test case to cover conflict,
# overwriting, mail report, and illegal changes to jobs.
#
# 2001-12-08 GDR Don't make local variables with tracebacks in them
# (creates circular references).
#
# 2001-12-08 GDR Be thorough about checking expected messages.
#
# 2001-12-11 GDR Test replication of filespecs and fixes back from
# defect tracker.  Check that conflict e-mail gets sent to job changer
# if different from job owner.
#
# 2002-01-07 NB Test replication of bugzilla enum fields contanining
# spaces (job000445).  Also fix a number of config attribute accesses
# to use getattr and setattr.
#
#
# C. COPYRIGHT AND LICENCE
#
# This file is copyright (c) 2001 Perforce Software, Inc.  All rights
# reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1.  Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#
# 2.  Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in
#     the documentation and/or other materials provided with the
#     distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
#
#
# $Id: //info.ravenbrook.com/project/p4dti/branch/2001-11-22/bugzilla-parameters/test/test_p4dti.py#18 $
