# Perforce Defect Tracking Integration Project
#
#
# TEST_TEAMTRACK_API.PY -- TEST TEAMTRACK INTERFACE
#
# Gareth Rees, Ravenbrook Limited, 2001-04-19
#
#
# 1. INTRODUCTION
#
# This test script tests the Python interface to TeamTrack [GDR
# 2000-08-08].
#
# It uses the PyUnit unit test framework [PyUnit].
#
# The intended readership is project developers.
#
# This document is not confidential.
#
#
# 1.1. Regression tests in this script
#
# Job Section Title
# ----------------------------------------------------------------------
# job000277 2.4 Consistency checker and refresh script don't work with
# larger TeamTrack databases
# job000279 2.3 Consistency checking and refreshing take unacceptably
# long in the TeamTrack integration
# job000326 2.6 Can't update cases in a TeamTrack 5 database converted
# from TeamTrack 4.5.
# job000328 2.4 Database queries with plus or percent signs don't work
# in TeamTrack 5.
# job000335 2.6 Can't update cases in a TeamTrack 5.0 database
import sys
sys.path.append('../code/replicator')
import os
import socket
import string
import time
import unittest
import whrandom
config = __import__('config_' + string.lower
(string.split(socket.gethostname(),'.')[0]))
def tt_connect():
import teamtrack
return teamtrack.connect(config.teamtrack_user,
config.teamtrack_password,
config.teamtrack_server)
def check(name, expected, found):
assert expected == found, ("%s: expected %s but found %s"
% (name, str(expected), str(found)))
def log(string):
sys.stderr.write(string)
sys.stderr.flush()
def add_records(s, n, sparse = None):
import teamtrack
log("\n Adding %d cases " % n)
if sparse:
log("(sparsely at %d%%) " % sparse)
for i in range(n):
r = s.new_record(s.case_table_id())
r['PROJECTID'] = 9
r['PRIORITY'] = 1003
r.add()
if sparse:
log("%d" % r['ID'])
log(".")
if sparse:
dbver = (s.read_record(teamtrack.table['SYSTEMINFO'], 1)
['DBVER'])
if dbver < 50200:
t = s.read_record(teamtrack.table['TABLES'],
s.case_table_id())
else:
t = s.query(76, 'TS_TABLEID=%d' % s.case_table_id())[0]
t['LASTID'] = (t['LASTID']
+ whrandom.randrange(0, sparse * 2))
t.update()
log("\n")
def delete_records(s):
import teamtrack
log("\n Deleting records ")
for (id, name, query) in [(s.case_table_id(), 'CASES',
"TS_PRIORITY > 1000"),
(teamtrack.table['VCACTIONS'],
'VCACTIONS', "TS_TYPE > 1000")]:
c = cursor_three(s, name, query, 20)
while 1:
r = c.fetchone()
if r:
s.delete_record(id, r['ID'])
log(".")
else:
break
log("\n")
# 2. TEST CASES
# 2.1. Connect to the TeamTrack server
class connect(unittest.TestCase):
def runTest(self):
"Connect to the TeamTrack server (test_teamtrack.connect)"
connection_start = time.time()
server = tt_connect()
connection_time = time.time() - connection_start
assert connection_time <= 10, \
("Connection to TeamTrack took %d seconds"
% connection_time)
# 2.2. Create and read records
#
# Create some records in the VCACTIONS table, verify that they are there
# as expected and then delete them.
class create(unittest.TestCase):
def runTest(self):
"Create and read records (test_teamtrack.create)"
import teamtrack
code = 'test_create'
s = tt_connect()
vcactions = teamtrack.table['VCACTIONS']
n = 10
for i in range(n):
r = s.new_record(vcactions)
r['TYPE'] = 1001
r['INFO1'] = i
r['CHAR1'] = str(i)
r['CHAR2'] = code
r.add()
rl = s.query(vcactions, "TS_TYPE = 1001 AND TS_CHAR2 = '%s' "
"ORDER BY TS_INFO1" % code)
for r in rl:
s.delete_record(vcactions, r['ID'])
check("Number of results", n, len(rl))
for i in range(len(rl)):
check("rl[%d]['CHAR2']" % i, code, rl[i]['CHAR2'])
check("rl[%d]['INFO1']" % i, i, rl[i]['INFO1'])
check("rl[%d]['CHAR1']" % i, str(i), rl[i]['CHAR1'])
# 2.3. Stress
#
# Create many records in the VCACTIONS table. Read them all. Report on
# the time taken.
class stress(unittest.TestCase):
def runTest(self):
"Stress the API (test_teamtrack.stress)"
s = tt_connect()
project = 3
cases = s.case_table_id()
n = 50
add_records(s, n)
log(" Reading records\n")
read_start = time.time()
rl = s.query(cases, "TS_PRIORITY > 1000")
time_taken = time.time() - read_start
log(" Deleting records ")
for r in rl:
s.delete_record(cases, r['ID'])
log(".")
log("\n")
check("Number of results", n, len(rl))
log(" Took %.2f seconds to read %d records.\n"
% (time_taken, n))
# 2.4. Cursors
#
# This test checks that different cursor implementations return the same
# set of records as an ordinary query. It reports on the performance of
# each cursor implementation.
class cursor:
def __init__(self, server, table_name, query, chunk_size = 20):
import teamtrack
self.last_id = -1
self.cache = []
self.chunk_size = chunk_size
self.finished = 0
self.query = query
self.server = server
if table_name == 'CASES':
self.table_id = server.case_table_id()
self.table_name = server.case_table_name()
else:
self.table_id = teamtrack.table[table_name]
self.table_name = 'TS_' + table_name
# 2.4.1. Query "cursor".
#
# This cursor just issues a query and returns the results one by one.
# This is provided for comparison with the other implementations (it
# should run fastest if it succeeds).
class cursor_zero(cursor):
"Query"
def __init__(self, server, table_name, query, chunk):
cursor.__init__(self, server, table_name, query, chunk)
self.cache = self.server.query(self.table_id, self.query)
def fetchone(self):
if self.cache:
result = self.cache[0]
self.cache = self.cache[1:]
return result
else:
return None
# 2.4.2. TeamShare's recommend cursor
#
# This cursor reads from zero to N records by issuing a series of
# queries with WHERE clauses of the form
#
# TS_ID > -1 AND TS_ID <= 20 AND %s
# TS_ID > 20 AND TS_ID <= 40 AND %s
# TS_ID > 40 AND TS_ID <= 60 AND %s
#
# (here chunk_size == 20). This was recommended by Kelly Shaw in [Shaw
# 2001-04-16].
#
# The problem with this approach is that if a table is sparse then this
# can take arbitrarily long to read a single record. However, the CASES
# table is not sparse (people rarely delete records) and the CHANGES
# table is never sparse (no mechanism for deleting entries).
class cursor_one(cursor):
"Zero to N cursor"
def __init__(self, server, table_name, query, chunk):
import teamtrack
cursor.__init__(self, server, table_name, query, chunk)
dbver = (server.read_record(teamtrack.table['SYSTEMINFO'], 1)
['DBVER'])
if dbver < 50200:
t = server.read_record(teamtrack.table['TABLES'], self.table_id)
else:
t = server.query(76, 'TS_TABLEID=%d' % self.table_id)[0]
self.max_id = t['LASTID']
def fetchone(self):
while not self.cache:
if self.last_id >= self.max_id:
break
if self.query == '':
where = ''
else:
where = ' AND ' + self.query
end_id = self.last_id + self.chunk_size
query = ("TS_ID > %d AND TS_ID <= %d%s"
% (self.last_id, end_id, where))
self.cache = self.server.query(self.table_id, query)
self.last_id = end_id
if self.cache:
result = self.cache[0]
self.cache = self.cache[1:]
return result
return None
# 2.4.3. One at a time cursor
#
# This cursor queries the database each time it is called, with a query
# that returns exactly one record. It uses a series of WHERE clauses
# like this:
#
# TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > -1 AND %s)
# TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 5 AND %s)
# TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 7 AND %s)
# TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 10 AND %s)
#
# (assuming that the first three records in the table are 5, 7 and 10).
class cursor_two(cursor):
"One at a time cursor"
def fetchone(self):
if self.query == '':
where = ''
else:
where = ' AND ' + self.query
query = ("TS_ID=(SELECT MIN(TS_ID) FROM %s WHERE TS_ID>%d%s)"
% (self.table_name, self.last_id, where))
results = self.server.query(self.table_id, query)
if results:
assert(len(results) == 1)
assert(results[-1]['ID'] > self.last_id)
self.last_id = results[-1]['ID']
return results[0]
else:
return None
# 2.4.4. One to N at a time cursor
#
# This query is a combination of ideas from the queries in sections
# 2.4.1 and 2.4.2. It uses a subquery to make sure that it returns at
# least one result (so that the performance does not get arbitrarily bad
# when the table is sparse) but it also attempts to get several results
# at once. It uses a series of WHERE clauses of the form:
#
# %s AND TS_ID BETWEEN
# (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > -1 AND %s) AND
# (SELECT MIN(TS_ID)+19 FROM TS_CASES WHERE TS_ID > -1 AND %s)
#
# %s AND TS_ID BETWEEN
# (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 36 AND %s) AND
# (SELECT MIN(TS_ID)+19 FROM TS_CASES WHERE TS_ID > 36 AND %s)
#
# So even if the first record in the table is greater than the chunk
# size (here 20), we might still pick up several records, the last being
# record 36, so that the next query can start at a suitable place.
class cursor_three(cursor):
"One to N cursor"
def fetchone(self):
if self.finished:
return None
if not self.cache:
if self.query == '':
where_1 = ''
where_2 = ''
else:
where_1 = '(' + self.query + ') AND '
where_2 = ' AND (' + self.query + ')'
query = ("%sTS_ID BETWEEN (SELECT MIN(TS_ID) FROM %s "
"WHERE TS_ID > %d%s) AND (SELECT MIN(TS_ID)+%d "
"FROM %s WHERE TS_ID > %d%s) ORDER BY TS_ID"
% (where_1, self.table_name, self.last_id, where_2,
self.chunk_size - 1, self.table_name,
self.last_id, where_2))
self.cache = self.server.query(self.table_id, query)
if self.cache:
self.last_id = max(self.cache[-1]['ID'],
self.last_id + self.chunk_size)
else:
self.finished = 1
return None
result = self.cache[0]
self.cache = self.cache[1:]
return result
# 2.4.5. Cursors test case
class cursors(unittest.TestCase):
def performance(self, server, query, cursor_class, chunk_size):
start = time.time()
c = cursor_class(server, 'CASES', query, chunk_size)
records = []
while 1:
r = c.fetchone()
if r:
records.append(r['ID'])
else:
break
elapsed = time.time() - start
return (elapsed, records)
def runTest(self):
"Cursors (test_teamtrack.cursors)"
s = tt_connect()
n = 50
chunk_size = 18
add_records(s, n)
log(" %d records, chunk size %d." % (n, chunk_size))
# Get records by cursor.
cursors = [cursor_zero, cursor_one, cursor_two, cursor_three]
query_ids = None
query_elapsed = None
for c in cursors:
log("\n Trying %s ... " % c.__doc__)
(elapsed, ids) = self.performance(s, "TS_PRIORITY>1000", c,
chunk_size)
ids.sort()
if c == cursor_zero:
query_elapsed = elapsed
query_ids = ids
# Check that we got the right number of results each time
for i in range(len(cursors)):
check("Number of results by %s" % c.__doc__,
n, len(ids))
# Check that the returned records are the same.
if query_ids:
for i in range(len(ids)):
check("ids[%d]['ID']" % i, query_ids[i], ids[i])
# Report on the performance.
log("took %.2f seconds" % elapsed)
if c != cursor_zero and query_elapsed:
log(" (%d%% of query)." % (100*elapsed/query_elapsed))
# Delete the records again
delete_records(s)
# 2.5. Dummy test cases
#
# These aren't real test cases. They are really utilities for adding
# and deleting records that can be run from a command line when running
# other test cases.
# 2.5.1. Add records
class add(unittest.TestCase):
def runTest(self):
"Adding records (test_teamtrack.add)"
s = tt_connect()
n = 200
add_records(s, n)
# 2.5.2. Add records sparsely
class add_sparsely(unittest.TestCase):
def runTest(self):
"Adding records randomly (test_teamtrack.add_sparsely)"
s = tt_connect()
n = 350
add_records(s, n, 10)
# 2.5.3. Delete records
class delete(unittest.TestCase):
def runTest(self):
"Deleting records (test_teamtrack.delete)"
s = tt_connect()
delete_records(s)
# 2.6. Updating cases
#
# This test case fetches all the cases from the CASES table and calls
# the update() method on each one (no changes are actually made).
#
# This is a regression test for job000326 and job000335.
class update(unittest.TestCase):
def runTest(self):
"Updating cases (test_teamtrack.update)"
s = tt_connect()
rr = s.query(s.case_table_id(), '')
for r in rr:
r.update()
# 2.7. Submitting an issue
#
# This test attempts to submit an issue to TeamTrack.
class submit(unittest.TestCase):
def runTest(self):
"Submitting a case (test_teamtrack.submit)"
import teamtrack
s = teamtrack.connect('gdr', '', config.teamtrack_server)
r = s.new_record(s.case_table_id())
r['TITLE'] = 'TeamTrack test issue'
r['DESCRIPTION'] = 'This is a test issue for the submit test.'
r['ISSUETYPE'] = 1
r['PROJECTID'] = 4
id = r.submit('gdr')
# 3. RUNNING THE TESTS
def tests():
suite = unittest.TestSuite()
if os.name == 'nt':
for t in [connect, create, cursors, delete, stress, submit,
update]:
suite.addTest(t())
return suite
if __name__ == "__main__":
unittest.main(defaultTest="tests")
# A. REFERENCES
#
# [GDR 2000-08-08] "Python interface to TeamTrack: design"; Gareth Rees;
# Ravenbrook Limited; 2000-08-08;
# .
#
# [PyUnit] "PyUnit - a unit testing framework for Python"; Steve
# Purcell; .
#
# [Shaw 2001-04-16] "Notes on memory usage in the API"; Kelly Shaw;
# TeamShare; 2001-04-16;
# .
#
#
# B. DOCUMENT HISTORY
#
# 2001-04-19 GDR Created.
#
# 2001-05-17 GDR Added cursors test case.
#
# 2001-06-07 GDR Added update_cases test case.
#
# 2001-06-26 GDR Use new server methods case_table_id() and
# case_table_name() to support both TeamTrack 4.5 and TeamTrack 5.0.
#
# 2001-07-02 GDR Added submit test case.
#
# 2001-07-03 GDR Added connection time test to connect test case.
#
#
# C. COPYRIGHT AND LICENCE
#
# This file is copyright (c) 2001 Perforce Software, Inc. All rights
# reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
#
#
# $Id: //info.ravenbrook.com/project/p4dti/branch/2001-11-22/bugzilla-parameters/test/test_teamtrack.py#5 $