#                Perforce Defect Tracking Integration Project
#                 <http://www.ravenbrook.com/project/p4dti/>
#
#                       TEST_P4DTI.PY -- TEST THE P4DTI
#
#                 Gareth Rees, Ravenbrook Limited, 2001-03-14
#
#
# 1. INTRODUCTION
#
# This test script tests the P4DTI.  The initial plan is described in
# [GDR 2000-12-31] and some of the design is given in [GDR 2001-03-14].
#
# This script contains a lot of tricky code (for example, using the hostname or
# operating system name to pick a module to import or a class to use; or
# overriding a method in another module in order to snoop on its behaviour).
# Please take care when editing it.
#
# It uses the PyUnit unit test framework [PyUnit].
#
# The intended readership is project developers.
#
# This document is not confidential.
#
#
# 1.1. Useful info
#
# The script creates a series of test Perforce servers (one for each test
# case).  When a test case starts, it kills all the running Perforce servers on
# the required port.  Each of these has its own directory, in the appropriate
# place for temporary files for your system (typically /tmp on Unix, C:\Temp on
# Windows).  The P4DTI log is diverted from its real place and sent to the file
# p4dti.log.
#
#
# 1.2. Regression tests in this script
#
# The section * means that the defect is tested throughout as a simple
# consequence of running the script; there is no particular test for it.
#
# Job    Section  Title
# ----------------------------------------------------------------------
# job000003  6.3  It is not possible from the result of "p4 -G job -o
#                 jobname" to tell whether a job has been returned, or
#                 an error message
# job000004  *    "p4 -G jobspec -o" doesn't work
# job000005  *    TeamTrack integration may fail mysteriously on startup
#                 in future releases of TeamTrack
# job000037  6    Consistency checker script is inadequate
# job000047  8.1  Historical bugs are replicated but uninteresting
# job000048  6    Consistency checking of the configuration is
#                 inadequate
# job000050  8.1  There's no way to re-start replication
# job000053  10   Implicit changes made by the DT don't get replicated
#                 back
# job000057  *    Special characters in single-select keywords make the
#                 replicator barf
# job000075  6    No automatic check of configuration
# job000107  9    You can't replicate an issue's project
# job000111  7    Replicator destroys the jobspec that exists before
#                 it's installed
# job000112  9    Can't easily replicate by project
# job000116  6    Bugzilla integration doesn't do enough checking
# job000140  6.3  Logical field name "code" not allowed in TeamTrack
# job000158  6.2  Obscure error if Perforce can't be reached or
#                 p4_client_executable is wrong
# job000168  6.2  Too easy to set dbms_port to a string
# job000169  11   Change numbers are links in TeamTrack even when no
#                 changelist URL has been specified
# job000170  6.2  Replicator may be unable to send e-mail if the default
#                 replicator_address is unchanged
# job000181  *    Assertion failure in translate_1_to_0
# job000182  10   Elapsed time fields aren't replicated properly
# job000190  3.1  Connection to TeamTrack hangs for several minutes
# job000199  4.5  Auxiliary scripts send e-mail
# job000202  6.2  Errors from Perforce not reported well
# job000219  7    Existing jobs in Perforce may become unusable when the
#                 jobspec is changed
# job000221  8.1  Refreshing Perforce jobs fails in Bugzilla integration
# job000233  10   When you submit a new issue to TeamTrack it overwrites
#                 the issue
# job000240  7    Perforce server can crash if P4DTI is started with
#                 existing jobs
# job000311  9    If you have two TeamTrack projects with the same name,
#                 the replicator stops working
# job000325  *    Can't read issues from TeamTrack 5.0
# job000338  *    TeamTrack 5.0 doesn't update P4DTI_ fields
# job000340  8.2  Consistency checker may miss some issues if you change
#                 the start date
# job000351  11   Bugzilla integration doesn't remove old configuration
#                 items
# job000354  *    MySQL bug stops replicator from seeing some Bugzilla
#                 changes
# job000355  8    Bugzilla integration ignores start_date parameter
# job000362  10.3 Can't update case from Perforce when using TeamTrack
#                 4.5 database upgraded to TeamTrack 5.0
# job000372  12.3 Check consistency stops with error if there are
#                 unreplicated jobs
# job000379  10   P4DTI corrupts date/time fields in TeamTrack if server
#                 time zone is not UTC
# job000381  10   P4DTI corrupts date/time fields in TeamTrack if
#                 daylight savings time is in effect


import os
import sys
p4dti_path = os.path.join(os.getcwd(), os.pardir, 'code', 'replicator')
if p4dti_path not in sys.path:
    sys.path.append(p4dti_path)
import cgi
import copy
import httplib
import logger
import message
import p4
import p4dti_unittest
import re
import socket
import string
import tempfile
import time
import types
import unittest
import urllib


# 2. CONFIGURATION
#
#
# 2.1. Limitations
#
# This script can only support one basic configuration on each host (see
# section 2.2).  Tests of other configurations have to be made by changing
# configuration parameters in the script.
#
# This script can only test one defect tracker on each host (you have to change
# dt_name in config_HOSTNAME.py to test another defect tracker).
#
# Could it use an environment variable to work out which configuration to use?
#
#
# 2.2. Using a test configuration
#
# I want to be able to test different configurations, changes to
# configurations, and to check whether erroneous configurations are spotted
# correctly.
#
# I expect to find a working configuration in config_HOSTNAME.py.
# This configuration should be copied when a test is started, changes
# made to the copy, and the copy installed in sys.modules['config'] so
# that the P4DTI will use the test configuration instead of the one in
# config.py.  Note that HOSTNAME is the first component of the FQDN
# (e.g. 'swan' for 'swan.ravenbrook.com').
#
# I also expect to find a Perforce license in the location given by
# the p4_license_file variable in the config_HOSTNAME.py file.

# Build the name of the per-host configuration module from the first
# component of this machine's hostname, lower-cased (e.g. 'config_swan').
config_file = 'config_' + string.lower(string.split(socket.gethostname(), '.')[0])
config_module = __import__(config_file)
# Keep a shallow copy of the pristine settings so that reset_configuration()
# can put them back after a test has changed the configuration.
original_configuration = copy.copy(config_module.__dict__)
# Install the per-host module under the name 'config' *before* importing it,
# so that "import config" binds to the test configuration.
sys.modules['config'] = config_module
import config

def reset_configuration():
    """Restore the 'config' module to the settings it had when this script
    was loaded.

    Every name in sys.modules['config'] that is not of the form __xxx__ is
    deleted (in case a test added something), and then every entry saved in
    original_configuration is copied back into the module.
    """
    namespace = sys.modules['config'].__dict__
    # Iterate over a snapshot of the keys: entries are deleted from the
    # dictionary inside the loop, which is only safe if the sequence being
    # iterated over is independent of the dictionary.
    for name in list(namespace.keys()):
        if not re.match('^__.*__$', name):
            del namespace[name]
    # Restore old config.
    for (name, value) in original_configuration.items():
        namespace[name] = value


# 3. DEFECT TRACKER AND OPERATING INTERFACES
#
# Many of the test cases are generic: they don't depend on a particular defect
# tracker or operating system.  But they need an interface to the defect
# tracker in order to restart it, and to the operating system in order to start
# a new Perforce server.
#
# Each interface class should be called DT_OS_interface (where DT is
# config.dt_name and OS is os.name) and must define these methods:
#
# restart_defect_tracker()
# restart_perforce()
#
# Note that there are no corresponding "stop" methods.  The tests leave
# Perforce and the defect tracker running so that failures can be investigated
# without all the evidence having disappeared.


# 3.1. Interface to TeamTrack on Windows NT

class TeamTrack_nt_interface:

    # The TeamTrack server (set by restart_defect_tracker).
    server = None

    # Temporary directory for files created by this test (set by
    # start_perforce; used as the Perforce server root).
    tempdir = None


    # 3.1.1. Clean up TeamTrack
    #
    # I would prefer to create a new TeamTrack database each time this test is
    # run (so that I can check that extra fields are added correctly to the
    # CASES table, for example).  However, I don't know of a way to do that;
    # TeamTrack is controlled from the TeamTrack Administrator and I have no
    # idea how to automate it.
    #
    # So the next best thing to do is to clean out all P4DTI data from a
    # running TeamTrack server.  This means setting all P4DTI_ fields in the
    # CASES table to the empty string, and deleting all records from the
    # VCACTIONS table.
    #
    # This isn't ideal: each new test ends up working with the set of cases
    # that resulted from the previous test.

    def restart_defect_tracker(self):
        # Connect to the TeamTrack server.  This should take no more than 5
        # seconds (regression test for job000190).
        # NOTE(review): the elapsed time is not measured here, so a slow
        # connection is only noticed by the person running the test.
        import teamtrack
        self.server = teamtrack.connect(config.teamtrack_user,
                                        config.teamtrack_password,
                                        config.teamtrack_server)

        # Empty the P4DTI_* fields in the CASES table, if there are any.
        # A case is only written back when at least one field was changed.
        p4dti_fields = ['P4DTI_JOBNAME', 'P4DTI_RID', 'P4DTI_SID']
        for case in self.server.query(self.server.case_table_id(), ''):
            changed = 0
            for field in p4dti_fields:
                if case.has_key(field) and case[field] != '':
                    case[field] = ''
                    changed = 1
            if changed:
                case.update()

        # Delete all records in the VCACTIONS table.
        vcactions_table = teamtrack.table['VCACTIONS']
        for record in self.server.query(vcactions_table, ''):
            self.server.delete_record(vcactions_table, record['ID'])


    # 3.1.2. Kill old Perforce servers
    #
    # Find and terminate all running Perforce servers, if any.  This code is
    # based on the script "killProcName.py" that comes with the Python Win32
    # extensions.

    def kill_perforce(self):
        import win32pdhutil
        import win32api
        import win32con
        # Errors from this first query are deliberately ignored: only the
        # lookup below matters.  (Presumably, as in killProcName.py, the call
        # is there to refresh the performance data so that recently started
        # processes are found -- TODO confirm.)
        try:
            win32pdhutil.GetPerformanceAttributes('Process','ID Process',"p4d")
        except:
            pass
        for pid in win32pdhutil.FindPerformanceAttributesByName("p4d"):
            handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE, 0, pid)
            # Always release the process handle, even if the process could
            # not be terminated.
            try:
                win32api.TerminateProcess(handle, 0)
            finally:
                win32api.CloseHandle(handle)


    # 3.1.3. Start a new Perforce server
    #
    # Make a completely fresh Perforce server, with a new repository.

    def start_perforce(self):
        # Make a new repository directory.
        self.tempdir = tempfile.mktemp()
        os.mkdir(self.tempdir)

        # Copy the license into the new server root.  The files are closed
        # even if the copy fails part way through.
        src = open(config.p4_license_file, 'r')
        try:
            dest = open(os.path.join(self.tempdir, 'license'), 'w')
            try:
                dest.writelines(src.readlines())
            finally:
                dest.close()
        finally:
            src.close()

        # Work out Perforce's port number (the digits after the last colon
        # in config.p4_port, or 1666 if there are none) and start a Perforce
        # server on that port, rooted at the new directory.
        match = re.match(".*:([0-9]+)$", config.p4_port)
        if match:
            port = int(match.group(1))
        else:
            port = 1666
        import win32api
        win32api.WinExec("%s -p 0.0.0.0:%d -r %s"
                         % (config.p4_server_executable, port, self.tempdir))


    # 3.1.4. Restart Perforce
    #
    # By killing the old server and starting a new one.

    def restart_perforce(self):
        self.kill_perforce()
        self.start_perforce()



# 3.2. Interface to Bugzilla on POSIX

class Bugzilla_posix_interface:

    # The Bugzilla server.
    server = None

    # Temporary directory for files created by this test (set by
    # start_perforce; used as the Perforce server root).
    tempdir = None

    def __init__(self):
        # The default temporary file prefix starts with an '@'.  But
        # this means that temporary files will look like revision
        # specifications to Perforce.  Note that this replaces
        # tempfile.gettempprefix for the whole process.
        tempfile.gettempprefix = lambda: '%d.' % os.getpid()


    # 3.2.1. Restore Bugzilla
    #
    # Wipe out the existing Bugzilla database, if any, and restore a
    # known working one from a MySQL dump file.

    def restart_defect_tracker(self):
        # The exit statuses are not checked.
        os.system("mysqladmin -u bugs --force drop bugs > /dev/null")
        os.system("mysqladmin -u bugs create bugs > /dev/null")
        os.system("mysql -u bugs bugs < bugzilla.mysqldump > /dev/null")


    # 3.2.2. Kill running Perforce servers
    #
    # If there are any Perforce servers running on the magic port,
    # use p4 admin to kill them.

    def kill_running_perforce_servers(self):
        os.system("p4 -p %s -u %s admin stop > /dev/null"
                  % (config.p4_port, config.p4_user))

    # 3.2.3. Start a new Perforce server
    #
    # Make a completely fresh Perforce server, with a new repository.

    def start_perforce(self):
        # Make a new repository directory.
        self.tempdir = tempfile.mktemp()
        os.mkdir(self.tempdir)

        # Copy the license into the new server root.  The files are closed
        # even if the copy fails part way through.
        src = open(config.p4_license_file, 'r')
        try:
            dest = open(os.path.join(self.tempdir, 'license'), 'w')
            try:
                dest.writelines(src.readlines())
            finally:
                dest.close()
        finally:
            src.close()

        # Work out Perforce's port number (the digits after the last colon
        # in config.p4_port, or 1666 if there are none) and start a Perforce
        # server on that port, rooted at the new directory.
        match = re.match(".*:([0-9]+)$", config.p4_port)
        if match:
            port = int(match.group(1))
        else:
            port = 1666
        os.system("%s -d -p 127.0.0.1:%d -r %s > /dev/null"
                  % (config.p4_server_executable, port, self.tempdir))


    # 3.2.4. Restart Perforce
    #
    # By killing the old server and starting a new one.

    def restart_perforce(self):
        self.kill_running_perforce_servers()
        self.start_perforce()



# 4. P4DTI TEST CASE BASE CLASS
#
# The p4dti_base class is a generic P4DTI test case.  It defines methods for
# setting up the integration and some utilities for recording and interrogating
# the output of the replicator.
#
# Other P4DTI test cases will inherit from p4dti_base.
#
# When a class implements several test cases, the methods that implement test
# cases (in the PyUnit sense) should have names starting "test_".  When a class
# implements a single test case, the method should be called "runTest".

class p4dti_base(p4dti_unittest.TestCase):

    # Defect tracker interface (an instance of one of the classes in section 3
    # above).  The class name is built from config.dt_name and os.name; both
    # come from the local test configuration rather than from external input,
    # which is why eval() is tolerable here.
    dti = eval(config.dt_name + '_' + os.name + '_interface')()

    # Messages written to the replicator's log.  This class-level list is
    # only a default: snoop_logger() and clear_log() bind a fresh list on the
    # instance, so messages are never accumulated on the class.
    log_messages = []

    # Mail messages sent by the replicator.  Each message is a triple
    # (recipients, subject, body); see the definition of mail() method in
    # the replicator class.  As with log_messages, snoop_mail() and
    # clear_mail() bind a fresh list on the instance.
    mail_messages = []

    # The replicator.
    r = None


    # 4.1. Snoop on the logger
    #
    # I want to be able to test that the correct messages are appearing
    # in the log, so I override the log method in the
    # logger.multi_logger class so that it records the messages as well
    # as printing them to the log file.  Printing to a log file is
    # necessary so that the test engineer can analyze what happened when
    # a test case fails.
    #
    # A disadvantage of this implementation is that the logging code is
    # not tested.  Therefore there needs to be a separate set of logging
    # tests.

    def snoop_logger(self):
        # Start with a record that belongs to this test case alone, so that
        # log() never appends to the list shared by the whole class.
        self.log_messages = []
        self.log_file = open(config.log_file or 'p4dti.log', 'a')
        logger.multi_logger.log = self.log
        logger.file_logger.log = self.log
        logger.sys_logger.log = self.log

    # Write a timestamped copy of msg to the log file and record msg itself.
    def log(self, msg):
        date = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(time.time()))
        self.log_file.write("%s  %s\n" % (date, msg))
        self.log_file.flush()
        self.log_messages.append(msg)

    # The clear_log method can be used to clear this record of the log
    # before carrying out a test.
    def clear_log(self):
        self.log_messages = []


    # 4.2. Snoop on e-mail
    #
    # Replace the replicator's mail method so that messages are recorded in
    # mail_messages instead.

    def snoop_mail(self):
        import replicator
        # Per-instance record, for the same reason as in snoop_logger().
        self.mail_messages = []
        replicator.replicator.mail = self.mail

    def mail(self, to, subject, body):
        self.mail_messages.append((to, subject, body))

    def clear_mail(self):
        self.mail_messages = []


    # 4.3. Check that the log is as expected
    #
    # These methods make various checks on the messages recorded in the log.

    # Return a list of the ids of the recorded log messages, in order.
    def log_message_ids(self):
        ids = []
        for m in self.log_messages:
            ids.append(m.id)
        return ids

    # Failure text for a message that should not have been logged.
    def _unexpected_text(self, m):
        return ("Unexpected message %d (%s) found in log %s"
                % (m.id, str(m), self.log_message_ids()))

    # Every recorded message must have an id in 'expected'.
    def expected_only(self, expected):
        for m in self.log_messages:
            assert m.id in expected, self._unexpected_text(m)

    # Every recorded message whose id lies in [min, max] must have an id in
    # 'expected'.  Messages outside that range are ignored.
    def expected_only_in_range(self, min, max, expected):
        for m in self.log_messages:
            if m.id >= min and m.id <= max:
                assert m.id in expected, self._unexpected_text(m)

    # No recorded message may have an id in 'unexpected'.
    def expected_not(self, unexpected):
        for m in self.log_messages:
            assert m.id not in unexpected, self._unexpected_text(m)

    # Each id in 'expected' must have been logged.  An id listed several
    # times must have been logged at least that many times.
    def expected(self, expected):
        remaining = {}
        for m in self.log_messages:
            remaining[m.id] = remaining.get(m.id, 0) + 1
        for msgid in expected:
            assert remaining.get(msgid, 0) > 0, \
                   ("Expected message %d not found in log %s"
                    % (msgid, self.log_message_ids()))
            remaining[msgid] = remaining[msgid] - 1


    # 4.4. Set up everything so that test cases can run
    #
    # Get a fresh configuration, a new defect tracker and a new Perforce
    # server.

    def setup_everything(self):
        reset_configuration()
        self.dti.restart_defect_tracker()
        self.dti.restart_perforce()
        self.snoop_logger()
        self.snoop_mail()

        # Get a Perforce interface.  It logs through this test case.
        self.p4 = p4.p4( port = config.p4_port,
                         user = config.p4_user,
                         password = config.p4_password,
                         client_executable = config.p4_client_executable,
                         logger = self )


    # 4.5. Initialize the replicator
    #
    # Load the init module.  If it's already loaded, reload it: we must reload
    # it because we want the init module to run again and to pick up on the new
    # configuration.

    def initialize_replicator(self):
        self.clear_mail()
        if sys.modules.has_key('init'):
            reload(sys.modules['init'])
        import init
        self.r = init.r
        # Regression test for job000199: starting up must not send e-mail.
        assert self.mail_messages == []


    # 4.6. Normal replicator startup
    #
    # This is a pseudo test case.  It checks that the replicator can start up
    # normally.  Other tests may depend on this having been run; for example,
    # the check_nothing test (5.1).  This test must be run before any other
    # test cases, and therefore can't be part of a test suite (the tests of
    # which may be run singly or in any order).  It should therefore be run
    # from the setUp() method of a subclass of this class (unless of course
    # that subclass has a different idea of what should happen at startup).

    def check_startup(self):
        self.initialize_replicator()

        # Expect to set up issues and replicate them, but no jobs or conflicts.
        self.clear_log()
        self.r.poll()
        self.expected([803, 804, 812])
        self.expected_only_in_range(800, 1000, [803, 804, 812])
        self.expected_not([806, 811])

        # Expect to see no issues replicated, and no issues actually changed.
        self.clear_log()
        self.r.poll()
        self.expected_not([803, 804, 812, 824])


    # 4.7. Exceptional replicator startup
    #
    # This method starts the replicator but expects to get the exception given
    # by 'error', with message id 'msgid'.

    def check_startup_error(self, error, msgid):
        try:
            self.initialize_replicator()
        except:
            # A bare except is needed: the expected errors include string
            # exceptions, which have no common base class.
            (err, msg, _) = sys.exc_info()
            assert err == error, \
                   ("Expected error %s but got %s." % (error, err))
            assert msg.id == msgid, \
                   ("Expected message %d but got %d (%s)"
                    % (msgid, msg.id, str(msg)))
        else:
            self.fail("Expected error %s but didn't get it." % error)


    # 4.8. Check consistency
    #
    # This method checks that the databases are consistent.

    def check_consistency(self):
        self.clear_log()
        self.r.check_consistency()
        self.expected([871, 885])
        self.expected_only_in_range(800, 1000, [871, 883, 884, 885, 890])


    # 4.9. Check replication of a single issue

    def check_replication_dt_to_p4(self, first_time = 0):
        self.clear_log()
        self.r.poll()
        self.expected([804, 812])
        if first_time:
            self.expected([803]) # set up for replication
        self.expected_only_in_range(800, 1000, [803, 804, 812])

        # Nothing should come back from Perforce.
        self.clear_log()
        self.r.poll()
        self.expected_not([803, 804, 812, 824])
        self.expected_only_in_range(800, 1000, [805, 825])


    # 4.10. Initialize Perforce repository
    #
    # This method sets up Perforce clients and workspaces for a set of
    # users, and adds a file to the repository.  It sets up the
    # following members:
    #
    # p4i: Map from user to a Perforce interface for that user.
    # workspace: Map from user to the client workspace for that user.
    # filename: Path of the test file added to the repository.
    #
    # The test file is added by the user 'rb', so 'rb' must be one of the
    # users passed in.
    #
    # This is only needed for tests involving Perforce fixes: that's why
    # it's not called in setup_everything().

    def setup_perforce(self, users):
        # Create Perforce interfaces and workspaces for the dummy users.
        self.p4i = {}
        self.workspace = {}
        for user in users:
            self.workspace[user] = tempfile.mktemp()
            os.mkdir(self.workspace[user])
            self.p4i[user] = p4.p4(
                port = config.p4_port,
                user = user,
                client = user + '_' + socket.gethostname(),
                client_executable = config.p4_client_executable,
                logger = self, )
            # Point the user's client at the new workspace directory.
            client = self.p4i[user].run('client -o')[0]
            client['Root'] = self.workspace[user]
            self.p4i[user].run('client -i', client)

        # Add a file to the repository so that we have something with
        # which to make changes and fixes.
        user = 'rb'
        self.filename = os.path.join(self.workspace[user], 'test')
        # Create the (empty) file and close it straight away, rather than
        # leaving the close to the garbage collector.
        open(self.filename, 'w').close()
        self.p4i[user].run('add -t ktext %s' % self.filename)
        change = self.p4i[user].run('change -o')[0]
        change['Description'] = 'Added test file'
        self.p4i[user].run('submit -i', change)


    # 4.11. Make changelist in Perforce
    #
    # This method makes a changelist in Perforce and edits a file in
    # that changelist.  The changelist is returned without being
    # submitted.

    def edit_file(self, user):
        change = self.p4i[user].run('change -o')[0]
        change['Description'] = 'Edited test file'
        result = self.p4i[user].run('change -i', change)[0]
        assert result.has_key('data')
        # The new changelist number is taken from Perforce's reply.
        match = re.match('^Change ([0-9]+) created', result['data'])
        assert match
        changelist = match.group(1)
        self.p4i[user].run('edit -c %s %s' % (changelist, self.filename))
        # Append to the file, making sure it is closed before the changelist
        # is read back.
        test_file = open(self.filename, 'a')
        try:
            test_file.write("Foo\n")
        finally:
            test_file.close()
        return self.p4i[user].run('change -o %s' % changelist)[0]


    # 4.12. Variant tests
    #
    # This class has variant methods for each defect tracker (e.g., TeamTrack,
    # Bugzilla) and calls the appropriate one.

    def run_variant(self):
        try:
            test = getattr(self, config.dt_name)
        except AttributeError:
            # Use fail() rather than "assert 0": an assert statement is
            # removed when Python runs with -O, which would let execution
            # fall through to the call below with 'test' unbound.
            self.fail("No test variant for " + config.dt_name + ".")
        test()


# 5. TEST CASES: NORMAL OPERATION
#
# If nothing has changed, then nothing happens when the replicator polls.
# The databases are consistent.

class normal(p4dti_base):
    # Bring up a fresh defect tracker, Perforce server and replicator, and
    # check that the replicator starts normally.
    def setUp(self):
        self.setup_everything()
        self.check_startup()

    # After a normal startup a further poll must log nothing in the range
    # 800-1000, and the consistency check must pass.
    def runTest(self):
        "Startup, replication to Perforce, consistency (test_p4dti.normal)"
        self.clear_log()
        self.r.poll()
        nothing = []
        self.expected_only_in_range(800, 1000, nothing)
        self.check_consistency()


# 6. TEST CASES: INCORRECT CONFIGURATIONS
#
# This is a regression test of job000037, job000075 and job000116.

class bogus(p4dti_base):
    def setUp(self):
        self.setup_everything()


    # 6.1. An incorrect parameter generates an error
    #
    # Utility for carrying out a range of tests.  Starting from a freshly
    # reset configuration, it sets the parameter named by 'param' to 'value'
    # and tries to start the replicator.  The replicator must raise an
    # exception whose value is a message.message with id 'msgid'; anything
    # else is recorded as a failure (without stopping the test).

    def check_param(self, param, value, msgid):
        reset_configuration()
        config.__dict__[param] = value
        try:
            self.initialize_replicator()
        except:
            # Bare except: the errors under test include string exceptions.
            (err, msg, _) = sys.exc_info()
            if not isinstance(msg, message.message):
                self.addFailure("Set parameter %s to '%s': expected message "
                                "%d but got '%s: %s' instead."
                                % (param, value, msgid, err, msg))
            elif msg.id != msgid:
                self.addFailure("Set parameter %s to '%s': expected "
                                "message %d but got %d (%s)"
                                % (param, value, msgid, msg.id, str(msg)))
            return
        # No exception at all: the bad value went unnoticed.
        self.addFailure("Set parameter %s to '%s': expected message %d "
                        "but didn't get it." % (param, value, msgid))


    # 6.2. Basic errors in parameters are caught quickly
    #
    # Basic errors in parameters (wrong type, wrong format) should be caught
    # quickly.
    #
    # Each entry in this table is a triple: (parameter name, bogus value,
    # message id of expected error).

    bogus_parameters = [
        # Regression for job000170:
        ('administrator_address', 'invalid e-mail address', 202),
        ('changelist_url', -1, 207),
        ('changelist_url', "http://invalid/%d/%s", 210),
        ('changelist_url', "http://invalid/no/format/specifier", 210),
        ('changelist_url', "http://invalid/%d/%%/%%%", 210),
        ('job_url', 42, 207),
        ('job_url', "http://invalid/%d/%s", 211),
        ('job_url', "http://invalid/no/format/specifier", 211),
        ('job_url', "http://invalid/trailing/percent/%d/%%/%%%", 211),
        ('log_file', -1, 208),
        ('log_level', 'not an int', 204),
        ('p4_client_executable', -1, 207),
        # Regression test for job000158:
        ('p4_client_executable', 'no such file', 705),
        ('p4_user', None, 207),
        ('p4_password', -1, 207),
        ('p4_port', None, 207),
        # Regression test for job000158, job000202:
        ('p4_port', '127.0.0.1:9999', 707),
        ('p4_server_description', -1, 207),
        ('poll_period', 'not an int', 204),
        ('replicator_address', 'invalid@e-mail@address', 202),
        ('rid', -1, 207),
        ('rid', '0abc', 209),
        ('rid', 'ab-c', 209),
        ('sid', -1, 207),
        ('sid', 'abcdefg+z', 209),
        ('smtp_server', -1, 207),
        ('start_date', '2001-02-03 24-00-00', 201),
        ('replicate_p', 'not a function', 203),
        ('closed_state', -1, 208),
        ('replicated_fields', 'not a list', 205),
        ('replicated_fields', ['not', 'a', 'list', 'of', 'strings', 0], 206),
        ]

    # Run check_param over every entry of the generic table.
    def test_parameters(self):
        import check_config
        for (name, bad_value, expected_id) in self.bogus_parameters:
            self.check_param(name, bad_value, expected_id)


    # 6.3. Basic errors in DT parameters are caught quickly
    #
    # As 6.2, but the table is chosen by config.dt_name.

    bogus_TeamTrack_parameters = [
        ('teamtrack_server', None, 207),
        ('teamtrack_user', None, 207),
        ('teamtrack_password', -1, 207),
        ('closed_state', 'not a TeamTrack state', 401),
        ('replicated_fields', ['PRIORITY', 'not a TeamTrack state'], 403),
        ('replicated_fields', ['OWNER'], 404),
        ('replicated_fields', ['TITLE'], 404),
        ('replicated_fields', ['STATE'], 404),
        ('replicated_fields', ['PRIORITY', 'PRIORITY'], 405),
        ('replicated_fields', ['MULTISELECT'], 406),
        # Regression for job000003, job000140:
        ('replicated_fields', ['CODE'], 407),
        ]

    bogus_Bugzilla_parameters = [
        ('bugzilla_directory', 'not a directory', 303),
        ('bugzilla_directory', '/', 304),
        ('closed_state', 'not a Bugzilla state', 301),
        ('dbms_database', -1, 207),
        ('dbms_host', -1, 207),
        # Regression for job000168:
        ('dbms_port', '1234', 204),
        ('dbms_user', -1, 207),
        ('dbms_password', -1, 207),
        # By using fake Perforce client executables we can check that
        # unsupported client and server versions are detected.
        ('p4_client_executable', './fake_p4.py', 704),
        ('p4_client_executable', './fake_p4d.py', 834),
        ('replicated_fields', ['bug_status'], 311),
        ('replicated_fields', ['assigned_to'], 311),
        ('replicated_fields', ['short_desc'], 311),
        ('replicated_fields', ['resolution'], 311),
        ('replicated_fields', ['longdesc', 'longdesc'], 312),
        ('replicated_fields', ['not a Bugzilla field'], 313),
        ]

    # Run check_param over the table for the configured defect tracker.
    def test_dt_parameters(self):
        table_name = 'bogus_%s_parameters' % config.dt_name
        for (name, bad_value, expected_id) in getattr(self, table_name):
            self.check_param(name, bad_value, expected_id)


    # 6.4. Parameter errors are caught by the defect tracker
    #
    # Like 6.2 and 6.3 this test sets a parameter to an incorrect value.  In
    # this case the error is caught by the defect tracker, so a message object
    # isn't returned, but rather a string, which we must test directly rather
    # than by message id.
    #
    # Each table entry is (parameter name, bogus value, expected str() of the
    # exception, expected message text).  The last item is either one string
    # or a list of acceptable strings; the actual message only has to start
    # with (one of) them.

    # These three authentication failure messages are from the APIs for
    # builds 4509, 5034, and 50101 respectively.
    TeamTrack_auth_messages = [
        "SERVER_ERROR: Authentication Failed.  "
        "Invalid user id or password",
        "SOCKET_READ_ERROR: Socket error.\n"
        "One reason might be database needs up upgrading.\n"
        "Check event viewer for complete error message.\n",
        "SOCKET_READ_ERROR: Authentication Failed.  "
        "Invalid user id, password, or licensing."
        ]

    erroneous_TeamTrack_parameters = [
        ('dt_name', 'not a defect tracker',
         'exceptions.ImportError',
         'No module named configure_not a defect tracker'),
        ('teamtrack_server', 'host.invalid',
         'TeamShare API error',
         'SOCKET_CONNECT_FAILED: Socket Connect failed.'),
        ('teamtrack_user', 'invalid user',
         'TeamShare API error',
         TeamTrack_auth_messages),
        ('teamtrack_password', 'invalid password',
         'TeamShare API error',
         TeamTrack_auth_messages),
        ]

    erroneous_Bugzilla_parameters = [
        ('dt_name', 'not a defect tracker',
         'exceptions.ImportError',
         'No module named configure_not a defect tracker'),
        ('dbms_host', 'host.invalid',
         '_mysql.OperationalError',
         '(2005, "Unknown MySQL Server Host \'host.invalid\' (2)")'),
        ('dbms_database', 'invalid',
         '_mysql.OperationalError',
         '(1049, "Unknown database \'invalid\'")'),
        # ('dbms_port',
        #  25,
        #  '_mysql.OperationalError',
        #  -1),
        ('dbms_password', 'not the Bugzilla password',
         '_mysql.OperationalError',
         '(1045, "Access denied for user:'),
        ('dbms_user', 'not the Bugzilla user',
         '_mysql.OperationalError',
         '(1045, "Access denied for user:'),
        ]

    # Return 1 if 'actual' starts with any of the strings in 'prefixes',
    # otherwise 0.
    def _starts_with_one_of(self, actual, prefixes):
        for prefix in prefixes:
            if actual[0:len(prefix)] == prefix:
                return 1
        return 0

    def test_dt_errors(self):
        table_name = 'erroneous_%s_parameters' % config.dt_name
        for (param, value, error, message_texts) in getattr(self, table_name):
            reset_configuration()
            config.__dict__[param] = value
            try:
                self.initialize_replicator()
            except:
                # Bare except: the errors under test include string
                # exceptions.
                (err, msg, _) = sys.exc_info()
                actual_error = str(err)
                if actual_error != error:
                    self.addFailure("Set parameter %s to '%s': expected error "
                                    "'%s' but got error '%s'."
                                    % (param, value, error, actual_error))
                # Accept either a single text or a list of alternatives.
                if isinstance(message_texts, types.ListType):
                    texts = message_texts
                else:
                    texts = [message_texts]
                if not self._starts_with_one_of(str(msg), texts):
                    self.addFailure("Set parameter %s to '%s': expected error "
                                    "message in %s but got '%s'."
                                    % (param, value, texts, msg))
            else:
                self.addFailure("Set parameter %s to %s: expected error '%s' "
                                "but there was no error."
                                % (param, value, error))

    # Run the three groups of checks in order, as a single PyUnit test case.
    def runTest(self):
        "Illegal configuration parameters (test_p4dti.bogus)"
        self.test_parameters()
        self.test_dt_parameters()
        self.test_dt_errors()


# 7. TEST CASE: EXISTING JOB IN PERFORCE
#
# The replicator should refuse to start if there's a job in Perforce.
#
# This is a regression test for job000219 and job000240.

class existing(p4dti_base):
    # Test case: a job already exists in Perforce when the replicator is
    # started, so the replicator should refuse to start.

    def setUp(self):
        self.setup_everything()

    def runTest(self):
        "Startup with an existing job (test_p4dti.existing)"
        # Create a job directly in Perforce: take the form printed by
        # "p4 job -o", give it a description and submit it with
        # "p4 job -i".
        j = self.p4.run('job -o')[0]
        j['Description'] = 'Test job'
        self.p4.run('job -i', j)
        # Now try to start up.  check_startup_error is given the expected
        # error text and the number 1001 (presumably the identifier of
        # the expected message -- confirm against check_startup_error).
        self.check_startup_error("P4DTI Initialization error", 1001)


# 8. TEST CASE: MOVING THE START DATE
#
#
# 8.1. Moving the start date backwards in time
#
# When start_date is set to the current time, no issues should be replicated
# when the replicator starts.  Similarly, refreshing Perforce has no effect.
# But you can set start_date back in time and refresh Perforce, this time with
# effect.
#
# This is a regression test for job000047, job000050, job000221.

class start_1(p4dti_base):
    # Test case: start with start_date set to the current time, check
    # that polling and refreshing do nothing, then move start_date back
    # in time and check that refreshing Perforce now has an effect.

    def setUp(self):
        self.setup_everything()

    def runTest(self):
        "Moving the start_date backwards in time (test_p4dti.start_1)"
        # Start the replicator with start_date set to the current time,
        # formatted from time.gmtime.
        config.start_date = time.strftime("%Y-%m-%d %H:%M:%S",
                                          time.gmtime(time.time()))
        self.initialize_replicator()

        # When we poll, nothing should happen.
        self.clear_log()
        self.r.poll()
        self.expected([])

        # Nor when we refresh.
        self.clear_log()
        self.r.refresh_perforce_jobs()
        self.expected([])

        # The databases should report consistent.
        self.check_consistency()

        # Now set start date back in time and try refreshing again.  The
        # replicator is re-initialized so that it picks up the new
        # configuration.
        config.start_date = "1971-01-01 00:00:00"
        self.initialize_replicator()
        self.clear_log()
        self.r.refresh_perforce_jobs()
        self.expected([803, 804, 812])
        self.expected_only_in_range(800, 1000, [803, 804, 812])
        self.expected_not([806, 811])

        # The databases should still report consistent.
        self.check_consistency()


# 8.2. Moving the start date forwards in time
#
# Start up with an old start date as normal, then move the start date
# forwards in time.  The databases should still report consistent,
# because issues that were recorded as being replicated in the first
# poll should still be recorded as replicated, even though they haven't
# changed since the start date.
#
# This is a regression test for job000340.

class start_2(p4dti_base):
    # Test case: start up as normal in setUp, then move start_date
    # forwards to the current time and check that nothing is replicated
    # and that the databases are still consistent.

    def setUp(self):
        self.setup_everything()
        self.check_startup()

    def runTest(self):
        "Moving the start_date forwards in time (test_p4dti.start_2)"

        # Move the start date forwards to the current time (formatted
        # from time.gmtime) and restart the replicator so that it picks
        # up the new configuration.
        config.start_date = time.strftime("%Y-%m-%d %H:%M:%S",
                                          time.gmtime(time.time()))
        self.initialize_replicator()

        # When we poll, nothing should happen.
        self.clear_log()
        self.r.poll()
        self.expected([])

        # The databases should report consistent.
        self.check_consistency()


# 9. TEST CASE: REPLICATING BY PROJECT
#
# This is a regression test for job000107, job000112, job000311.

class project(p4dti_base):
    # Test case: restrict replication with config.replicate_p and check
    # that only the selected issues appear as jobs in Perforce.

    def setUp(self):
        self.setup_everything()

    def runTest(self):
        "Replicate by project (test_p4dti.project)"
        # NOTE(review): run_variant presumably calls the method below
        # that is named after the configured defect tracker -- confirm
        # against its definition.
        self.run_variant()

    def TeamTrack(self):
        # Replicate the PROJECTID field, and only the cases whose
        # PROJECTID is 4 or 5.  The predicate indexes its argument by
        # field name.
        config.replicated_fields = ['PROJECTID']
        config.replicate_p = (lambda self: self['PROJECTID'] in [4,5])
        self.initialize_replicator()
        self.clear_log()
        self.r.poll()
        replicated = [803, 804, 812]
        self.expected_only_in_range(800, 1000, replicated)
        self.expected_not([806, 811])
        # Regression test for job000311: there are two projects called
        # 'P4DTI' in the database, so a warning should be logged.
        self.expected([607])
        # Every job now in Perforce must belong to one of these projects.
        allowed = ['Elaborate editor', 'Clever compiler']
        for job in self.p4.run('jobs'):
            assert job['Project'] in allowed, \
                   ("Job %s has project %s; shouldn't be replicated."
                    % (job['Job'], job['Project']))
        self.check_consistency()

    def Bugzilla(self):
        # Replicate the product field, and only the bugs in 'product 1'.
        config.replicated_fields = ['product']
        config.replicate_p = (lambda self: self['product'] == 'product 1')
        self.check_startup()
        # Every job now in Perforce must be for that product.  (The job
        # field value carries a trailing newline.)
        for job in self.p4.run('jobs'):
            assert job['Product'] == 'product 1\n', \
                   ("Job %s has product %s; shouldn't be replicated."
                    % (job['Job'], job['Product']))
        self.check_consistency()


# 10. ISSUE LIFE CYCLE TEST CASES
#
# This test creates issues and checks that they are replicated
# correctly.

class lifecycle(p4dti_base):
    # Test case: issues are created in the defect tracker and taken
    # through their life cycle, checking replication at each step.

    def setUp(self):
        self.setup_everything()
        self.setup_perforce(['rb'])
        self.check_startup()

    def runTest(self):
        "Issue life cycle (test_p4dti.lifecycle)"
        # NOTE(review): run_variant presumably calls the Bugzilla or
        # TeamTrack method below, according to the configured defect
        # tracker -- confirm against its definition.
        self.run_variant()

    def Bugzilla_submit(self):
        # Submit a new bug to Bugzilla by posting the enter_bug form over
        # HTTP, check that it gets replicated, and return the pair
        # (bug, job): the Bugzilla issue and the Perforce job that
        # corresponds to it.
        #
        # Submit a new bug.
        fields = {
            'reporter': 'nb@ravenbrook.com',
            'product': 'product 1',
            'version': 'unspecified',
            'component': 'component 1.1',
            'rep_platform': 'All',
            'op_sys': 'All',
            'priority': 'P1',
            'severity': 'critical',
            'assigned_to': '',
            'cc': '',
            'bug_file_loc': '',
            'short_desc': 'Test bug',
            'comment': 'Test long description.',
            'submit': '    Commit    ',
            'form_name': 'enter_bug',
            'Bugzilla_password': 'p4dtitest',
            'Bugzilla_login': 'nb@ravenbrook.com',
            }
        h = httplib.HTTP('swan.ravenbrook.com')
        h.set_debuglevel(100)
        h.putrequest('POST', '/bugzilla/post_bug.cgi')
        # The request body is the form fields, each URL-encoded as
        # name=value, joined with '&' and terminated with CRLF.
        content = string.join(map(lambda f: (urllib.quote_plus(f[0]) + '='
                                             + urllib.quote_plus(f[1])),
                                  fields.items()),
                              '&') + '\r\n'
        h.putheader('Content-Length', str(len(content)))
        h.endheaders()
        h.send(content)
        h.getreply()
        # Look through the reply for the line announcing the new bug; its
        # number is the bug id.  If there is no such line the submission
        # is taken to have failed.
        bugid = None
        lines = h.getfile().readlines()
        for l in lines:
            match = re.search('<H2>Bug ([0-9]+) posted</H2>', l)
            if match:
                bugid = match.group(1)
        if bugid is None:
            self.fail("Tried to submit a bug to Bugzilla, but got the "
                      "following in reply: %s." % string.join(lines))

        # It gets replicated to Perforce.  This is a regression test for
        # job000233.
        self.check_replication_dt_to_p4(first_time = 1)

        # Check the job in Perforce.
        bug = self.r.dt.issue(bugid)
        assert bug
        jobname = bug.corresponding_id()
        job = self.p4.run('job -o %s' % jobname)[0]

        return bug, job

    def TeamTrack_submit(self, s, user):
        # Submit a new case to TeamTrack through the connection s as the
        # given user, check that it gets replicated, and return the pair
        # (case, job): the case as read back from TeamTrack and the
        # corresponding Perforce job.
        import teamtrack

        # Submit a new case.
        case = s.new_record(s.case_table_id())
        case['TITLE'] = 'Issue lifecycle test'
        case['DESCRIPTION'] = 'This is a test issue for the lifecycle test.'
        case['ISSUETYPE'] = 1 # Bug report
        case['PROJECTID'] = 4 # Editor
        case['SEVERITY'] = 45
        # Regression test for job000182:
        case['ESTTIMETOFIX'] = 7199   # 1:59:59
        case['ACTTIMETOFIX'] = 445556 # 123:45:56
        # Regression test for job000379 and job000381 (note that date is
        # in the summer so DST is in effect):
        case['CLOSEDATE'] = 962532245 # 2000-07-02 03:04:05
        issueid = case.submit(user)

        # It gets replicated to Perforce.  This is a regression test for
        # job000233.
        self.check_replication_dt_to_p4(first_time = 1)

        # Check the job in Perforce.  The case is read back by issue type
        # and issue id so that its P4DTI_JOBNAME field can be used to
        # find the job.
        case2 = s.query(s.case_table_id(),
                        "TS_ISSUETYPE=%d AND TS_ISSUEID='%05d'"
                        % (case['ISSUETYPE'], issueid))[0]
        job = self.p4.run('job -o %s' % case2['P4DTI_JOBNAME'])[0]
        for (field, expected) in [('State', '_new'),
                                  ('Title', 'Issue lifecycle test'),
                                  ('Est._Time_to_Fix', '1:59:59'),
                                  ('Actual_Time_to_Fix', '123:45:56')]:
            assert job[field] == expected, \
                   ("Expected new job %s to have %s '%s', but found '%s'."
                    % (job['Job'], field, expected, job[field]))

        return case2, job

    def TeamTrack_assign(self, s, case, job, user):
        # Apply the 'Assign' transition to the case as the given user,
        # check that the change is replicated, and return the pair
        # (case, job) with the job re-read from Perforce.
        import teamtrack
        # Find the assign transition.
        assign = s.query(teamtrack.table['TRANSITIONS'],
                         "TS_NAME = 'Assign'")[0]['ID']

        # Assign the issue in TeamTrack.
        case.transition(user, assign)

        # It gets replicated to Perforce.
        self.check_replication_dt_to_p4()

        # The state should now be assigned.
        job = self.p4.run('job -o %s' % job['Job'])[0]
        assert job['State'] == 'assigned', \
               ("Expected assigned job %(Job)s to have state "
                "'assigned' but it has state %(State)s." % job)

        return case, job

    def TeamTrack_close(self, job, expected):
        # Poll after a job has been closed in Perforce and check the
        # result.  job is the job as it was before the poll; expected is
        # the list handed to self.expected and
        # self.expected_only_in_range (presumably log message numbers --
        # confirm against those helpers).
        #
        # The closed job gets replicated back to TeamTrack.  TeamTrack
        # will change the owner field and the replicator will replicate
        # that back.  This is a regression test for job000053.
        self.clear_log()
        self.r.poll()
        self.expected(expected)
        self.expected_only_in_range(800, 1000, expected)
        job2 = self.p4.run('job -o %s' % job['Job'])[0]
        assert job2['Owner'] != job['Owner'], \
               ("Closed job %(Job)s but owner is still %(Owner)s." % job2)

    def Bugzilla(self):
        # Only the submission step exists for Bugzilla; the rest of the
        # life cycle is not implemented, so the test fails explicitly.
        bug, job = self.Bugzilla_submit()
        self.fail('unimplemented')

    def TeamTrack(self):
        # We need to be logged in to TeamTrack as a user other than the
        # replicator in order to make a change that isn't ignored.
        import teamtrack
        user = 'rb'
        s = teamtrack.connect(user,'', config.teamtrack_server)

        # 10.3.1. Simple issue lifecycle
        #
        # This is a simple issue cycle:
        #
        #   1. An issue is submitted to the defect tracker.
        #   2. It is replicated to Perforce.
        #   3. It gets assigned to a developer.
        #   4. It is closed in Perforce (by editing the job).
        #   5. The closure gets replicated back to the defect tracker.
        case, job = self.TeamTrack_submit(s, user)
        case, job = self.TeamTrack_assign(s, case, job, user)

        # Close the job in Perforce.
        job['State'] = 'closed'
        self.p4.run('job -i', job)
        self.TeamTrack_close(job, [805, 824, 826])
        self.r.poll()

        # Regression test for job000182.  The case is re-read from
        # TeamTrack and its time fields must still hold the values that
        # were submitted.
        case = s.read_record(s.case_table_id(), case['ID'])
        assert case['ESTTIMETOFIX'] == 7199
        assert case['ACTTIMETOFIX'] == 445556


        # 10.3.2. Issue lifecycle (fix in Perforce).
        #
        # This is an issue cycle in which the issue is associated with a
        # changelist in Perforce:
        #
        #   1. An issue is submitted to the defect tracker.
        #   2. It is replicated to Perforce.
        #   3. It gets assigned to a developer.
        #   4. It is closed in Perforce (by making a fix).
        #   5. Closure and fix get replicated back to the defect tracker.
        case, job = self.TeamTrack_submit(s, user)
        case, job = self.TeamTrack_assign(s, case, job, user)
        self.p4.run('fix -c 1 %s' % job['Job'])
        self.TeamTrack_close(job, [802, 805, 819, 820, 824, 826])
        self.r.poll()


        # 10.3.3. Issue lifecycle (fix on submission in Perforce).
        #
        # This tests an issue lifecycle in which the issue is associated
        # with a pending changelist and closed on submission.
        #
        #   1. An issue is submitted to the defect tracker.
        #   2. It is replicated to Perforce.
        #   3. It gets assigned to a developer.
        #   4. The job description is edited in Perforce (this is a
        #      regression test for job000362).
        #   5. A fix is made with a pending changelist.
        #   6. The change is submitted.
        #   7. Job, fix get replicated back to the defect tracker.
        case, job = self.TeamTrack_submit(s, user)
        case, job = self.TeamTrack_assign(s, case, job, user)
        self.r.poll()
        job['Description'] = job['Description'] + '\nEdited.'
        self.p4i[user].run('job -i', job)
        self.clear_log()
        self.r.poll()
        self.expected([805, 824])
        self.expected_only_in_range(800, 1000, [805, 824])
        change = self.edit_file(user)
        self.p4i[user].run('fix -c %s %s'
                           % (change['Change'], job['Job']))
        self.p4i[user].run('submit -c %s' % change['Change'])
        self.TeamTrack_close(job, [802, 805, 819, 820, 824, 826])
        self.r.poll()

        # We're done; one last check for luck.
        self.check_consistency()


# 11. P4DTI CONFIGURATION DATABASE
#
# This checks that parameters get added and removed from the
# configuration database.  This is a regression test for job000169 and
# job000351.

class configdb(p4dti_base):
    # Test case: start the replicator with various settings and check
    # that the defect tracker's configuration database records them.

    def setUp(self):
        self.setup_everything()

    def TeamTrack_config_item(self, item):
        # Return the description recorded in TeamTrack's VCACTIONS table
        # for the configuration item named item, or None if there is no
        # matching record for this replicator (config.rid) and Perforce
        # server (config.sid).
        import teamtrack
        where = ("TS_TYPE = 4 AND TS_CHAR1 = '%s' AND TS_CHAR2 = "
                 "'%s'" % (config.rid, item))
        records = self.dti.server.query(teamtrack.table['VCACTIONS'], where)
        for record in records:
            # The FILENAME field holds the text of a dictionary.
            # NOTE(review): this evals text read from the database; that
            # is only acceptable because this is a test database.
            entry = eval(record['FILENAME'])
            if entry.has_key('sid') and entry['sid'] == config.sid:
                if entry.has_key('description'):
                    return entry['description']
        return None

    # Maps each configuration parameter to the name of the item that
    # holds it in the TeamTrack configuration database.
    TeamTrack_config_map = { 'p4_server_description': 'SERVER',
                             'job_url': 'JOB_URL',
                             'changelist_url': 'CHANGELIST_URL' }

    def TeamTrack_config(self):
        # Return a dictionary mapping each parameter in
        # TeamTrack_config_map to the value found in TeamTrack.
        found = {}
        for (param, item) in self.TeamTrack_config_map.items():
            found[param] = self.TeamTrack_config_item(item)
        return found

    def Bugzilla_config(self):
        # Return the configuration as reported by the Bugzilla interface.
        return self.r.dt.bugzilla.get_config()

    def check(self, cf1):
        # Apply the settings in cf1 to a freshly reset configuration,
        # start the replicator, then read the configuration back through
        # the <dt_name>_config method and record a failure for every
        # parameter that doesn't match.
        reset_configuration()
        for param in cf1.keys():
            config.__dict__[param] = cf1[param]
        self.initialize_replicator()
        reader = getattr(self, config.dt_name + '_config')
        stored = reader()
        for (param, wanted) in cf1.items():
            actual = stored.get(param)
            if wanted != actual:
                self.addFailure("Set parameter %s to '%s', but found "
                                "'%s' in the config database."
                                % (param, wanted, actual))


    def runTest(self):
        "Replicator configuration database (test_p4dti.configdb)"
        with_urls = { 'p4_server_description': 'spong',
                      'changelist_url': 'http://spong/changelist?%d',
                      'job_url': 'http://spong/job?%s' }
        without_urls = { 'p4_server_description': 'spong',
                         'changelist_url': None,
                         'job_url': None }
        # Set the URLs, clear them, then set them again, so that adding
        # and removing parameters are both exercised.
        for settings in (with_urls, without_urls, with_urls):
            self.check(settings)


# 12. INCONSISTENCIES
#
# This test case checks that various inconsistencies are detected
# correctly.

class inconsistencies(p4dti_base):
    # Test case: make changes in Perforce without letting the replicator
    # poll, run the consistency check, and look at what it logs.

    def setUp(self):
        self.setup_everything()
        self.setup_perforce(['rb'])
        self.check_startup()

    def runTest(self):
        "Consistency check failures (test_p4dti.inconsistencies)"
        user = 'rb'

        # 12.1. Unreplicated fix in Perforce
        #
        # Fix the first job against a new change, keeping the job's
        # current status, and check consistency before polling.
        change = self.edit_file(user)
        job = self.p4.run('jobs')[0]
        self.p4i[user].run('fix -s %s -c %s %s'
                           % (job[config.job_status_field],
                              change['Change'], job['Job']))
        self.clear_log()
        self.r.check_consistency()
        self.expected([871, 878, 886])
        self.expected_only_in_range(800, 1000,
                                    [871, 878, 883, 884, 886, 890])
        # Poll before introducing the next inconsistency.
        self.r.poll()

        # 12.2. Changed job in Perforce
        job['Description'] = job['Description'] + '...\n'
        self.p4i[user].run('job -i', job)
        self.clear_log()
        self.r.check_consistency()
        self.expected([871, 875, 886])
        self.expected_only_in_range(800, 1000,
                                    [871, 875, 883, 884, 886, 890])
        self.r.poll()

        # 12.3. Unreplicated job in Perforce
        #
        # This is a regression test for job000372.
        #
        # Take the form printed by "p4 job -o", replace every field that
        # starts with the placeholder text, and mark the job as belonging
        # to this replicator with an issue id of "999999".
        j = self.p4i[user].run('job -o')[0]
        for k, v in j.items():
            if string.find(v, '<enter description here>') == 0:
                j[k] = 'Test!'
        j['P4DTI-rid'] = config.rid
        j['P4DTI-issue-id'] = "999999"
        self.p4i[user].run('job -i', j)
        self.clear_log()
        self.r.check_consistency()
        self.expected([871, 882, 886])
        self.expected_only_in_range(800, 1000,
                                    [871, 882, 883, 884, 886, 890])



# RUNNING THE TESTS

def tests():
    # Build and return the suite containing one instance of each test
    # case class.
    suite = unittest.TestSuite()
    for t in [bogus, configdb, existing, inconsistencies, normal,
              project, start_1, start_2]:
        suite.addTest(t())
    # The lifecycle test case is only added when os.name is 'nt'
    # (presumably because only its TeamTrack variant is implemented; the
    # Bugzilla variant fails as 'unimplemented').
    if os.name == 'nt':
        suite.addTest(lifecycle())
    return suite

# When this file is run as a script, hand control to the unit test
# framework.  defaultTest names the tests() function above, which is used
# when no test names are given on the command line.
if __name__ == "__main__":
    unittest.main(defaultTest="tests")


# A. REFERENCES
#
# [GDR 2000-12-31] "Automated testing plan" (e-mail message); Gareth
# Rees; Ravenbrook Limited;
# <http://info.ravenbrook.com/2000/12/31/14-42-26/0.txt>; 2000-12-31.
#
# [GDR 2001-03-14] "Automatic test of TeamTrack integration" (e-mail
# message); Gareth Rees; Ravenbrook Limited;
# <http://info.ravenbrook.com/2001/03/14/22-44-45/0.txt>; 2001-03-14.
#
# [PyUnit] "PyUnit - a unit testing framework for Python"; Steve Purcell;
# <http://pyunit.sourceforge.net/>.
#
#
# B. DOCUMENT HISTORY
#
# 2001-03-14 GDR Created.
#
# 2001-03-15 GDR Added list of regression tests, discussion of limitations,
# snooping on e-mail, p4dti_variant class, test cases for existing jobs, recent
# start date and replicating by project.
#
# 2001-03-17 GDR Added test_dt_errors test case for errors caught by the defect
# tracker or its database.
#
# 2001-05-01 GDR Use p4dti_unittest module so that we can report many errors in
# the course of a single test case.  Added regression test for job000311.
#
# 2001-05-17 GDR Don't look for messages 844-847 (replicator doesn't
# output them any more).  Fixed bug in issue lifecycle test.
#
# 2001-05-18 GDR Extended the lifecycle test so that it tests
# replication of fixes.
#
# 2001-06-07 GDR Added references to design.
#
# 2001-07-02 GDR Made bogus_config and lifecycle test cases portable
# between TeamTrack 4.5 and TeamTrack 5.0.
#
# 2001-07-03 GDR Moved TeamTrack connection time test to
# test_teamtrack.py where it belongs.
#
# 2001-07-09 NB Added changelist_url test.
#
# 2001-07-17 GDR Added start_2 (regression test for job000340) and
# configdb (regression test for job000169, job000351).
#
# 2001-07-25 GDR Added consistency checker tests (regression test for
# job000372).
#
#
# C. COPYRIGHT AND LICENCE
#
# This file is copyright (c) 2001 Perforce Software, Inc.  All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1.  Redistributions of source code must retain the above copyright notice,
#     this list of conditions and the following disclaimer.
#
# 2.  Redistributions in binary form must reproduce the above copyright notice,
#     this list of conditions and the following disclaimer in the documentation
#     and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
# $Id: //info.ravenbrook.com/project/p4dti/version/1.1/test/test_p4dti.py#35 $
