# Perforce Defect Tracking Integration Project
#
#
# TEST_TEAMTRACK_API.PY -- TEST TEAMTRACK INTERFACE
#
# Gareth Rees, Ravenbrook Limited, 2001-04-19
#
#
# 1. INTRODUCTION
#
# This test script tests the Python interface to TeamTrack [GDR
# 2000-08-08].
#
# It uses the PyUnit unit test framework [PyUnit].
#
# The intended readership is project developers.
#
# This document is not confidential.
#
#
# 1.1. Regression tests in this script
#
# Job        Section  Title
# ----------------------------------------------------------------------
# job000277  2.4      Consistency checker and refresh script don't work with
#                     larger TeamTrack databases
# job000279  2.3      Consistency checking and refreshing take unacceptably
#                     long in the TeamTrack integration
# job000326  2.6      Can't update cases in a TeamTrack 5 database converted
#                     from TeamTrack 4.5.
# job000328  2.4      Database queries with plus or percent signs don't work
#                     in TeamTrack 5.
# job000335  2.6      Can't update cases in a TeamTrack 5.0 database

import sys
sys.path.append('../code/replicator')
import os
import socket
import string
import time
import unittest
try:
    import whrandom
except ImportError:
    # whrandom is no longer part of the standard library.  The random
    # module provides randrange(), the only function used here, so bind
    # it under the old name.
    import random as whrandom

# The configuration module is chosen by the unqualified, lower-cased
# host name: host "Foo.example.com" loads the module "config_foo".
config = __import__('config_' + socket.gethostname().split('.')[0].lower())


def tt_connect():
    "Return a TeamTrack connection made with the settings in config."
    import teamtrack
    return teamtrack.connect(config.teamtrack_user,
                             config.teamtrack_password,
                             config.teamtrack_server)


def check(name, expected, found):
    """Raise AssertionError unless expected == found.

    name is a description of the value being checked; it appears at the
    start of the failure message.  The exception is raised explicitly
    (not with an assert statement) so that the check still takes effect
    when Python runs with -O.
    """
    if not expected == found:
        raise AssertionError("%s: expected %s but found %s"
                             % (name, str(expected), str(found)))


def log(string):
    "Write string to standard error and flush it immediately."
    sys.stderr.write(string)
    sys.stderr.flush()


def add_records(s, n, sparse = None):
    """Add n cases to the case table of the server s.

    Each case is created with PROJECTID 9 and PRIORITY 1003 (so that
    delete_records, which deletes cases with TS_PRIORITY > 1000, finds
    it again).

    If sparse is true, then after each case is added the LASTID field
    of the case table's entry in the tables table is increased by a
    random amount in the range [0, 2 * sparse) -- presumably so that
    the next case gets an ID further on, leaving gaps in the table.
    """
    import teamtrack
    log("\n Adding %d cases " % n)
    if sparse:
        log("(sparsely at %d%%) " % sparse)
        # The database version decides how the tables record is found.
        # It is read once here rather than once per case; this assumes
        # it does not change while the test runs.
        dbver = s.read_record(teamtrack.table['SYSTEMINFO'], 1)['DBVER']
    for i in range(n):
        r = s.new_record(s.case_table_id())
        r['PROJECTID'] = 9
        r['PRIORITY'] = 1003
        r.add()
        if sparse:
            log("%d" % r['ID'])
        log(".")
        if sparse:
            if dbver < 50200:
                t = s.read_record(teamtrack.table['TABLES'],
                                  s.case_table_id())
            else:
                # NOTE(review): 76 is a hard-coded table id; presumably
                # the tables table in the newer schema -- confirm.
                t = s.query(76, 'TS_TABLEID=%d' % s.case_table_id())[0]
            t['LASTID'] = (t['LASTID']
                           + whrandom.randrange(0, sparse * 2))
            t.update()
    log("\n")


def delete_records(s):
    """Delete the test records from the server s.

    Deletes every case with TS_PRIORITY > 1000 and every VCACTIONS
    record with TS_TYPE > 1000, fetching them through cursor_three with
    a chunk size of 20.
    """
    import teamtrack
    log("\n Deleting records ")
    targets = [(s.case_table_id(), 'CASES', "TS_PRIORITY > 1000"),
               (teamtrack.table['VCACTIONS'], 'VCACTIONS',
                "TS_TYPE > 1000")]
    for (table_id, name, query) in targets:
        c = cursor_three(s, name, query, 20)
        r = c.fetchone()
        while r:
            s.delete_record(table_id, r['ID'])
            log(".")
            r = c.fetchone()
    log("\n")


# 2. TEST CASES

# 2.1. Connect to the TeamTrack server

class connect(unittest.TestCase):
    def runTest(self):
        "Connect to the TeamTrack server (test_teamtrack.connect)"
        connection_start = time.time()
        tt_connect()
        connection_time = time.time() - connection_start
        # Fail explicitly so that the limit is enforced under -O too.
        if not connection_time <= 10:
            self.fail("Connection to TeamTrack took %d seconds"
                      % connection_time)


# 2.2. Create and read records
#
# Create some records in the VCACTIONS table, verify that they are there
# as expected and then delete them.

class create(unittest.TestCase):
    def runTest(self):
        "Create and read records (test_teamtrack.create)"
        import teamtrack
        code = 'test_create'
        s = tt_connect()
        vcactions = teamtrack.table['VCACTIONS']
        n = 10
        for i in range(n):
            r = s.new_record(vcactions)
            r['TYPE'] = 1001
            r['INFO1'] = i
            r['CHAR1'] = str(i)
            r['CHAR2'] = code
            r.add()
        rl = s.query(vcactions, "TS_TYPE = 1001 AND TS_CHAR2 = '%s' "
                     "ORDER BY TS_INFO1" % code)
        # Delete the records before checking them, so that a failed
        # check does not leave them behind in the database.
        for r in rl:
            s.delete_record(vcactions, r['ID'])
        check("Number of results", n, len(rl))
        for i in range(len(rl)):
            check("rl[%d]['CHAR2']" % i, code, rl[i]['CHAR2'])
            check("rl[%d]['INFO1']" % i, i, rl[i]['INFO1'])
            check("rl[%d]['CHAR1']" % i, str(i), rl[i]['CHAR1'])


# 2.3. Stress
#
# Create many records in the case table.  Read them all.  Report on
# the time taken.

class stress(unittest.TestCase):
    def runTest(self):
        "Stress the API (test_teamtrack.stress)"
        s = tt_connect()
        cases = s.case_table_id()
        n = 50
        add_records(s, n)
        log(" Reading records\n")
        # Only the query itself is timed.
        read_start = time.time()
        rl = s.query(cases, "TS_PRIORITY > 1000")
        time_taken = time.time() - read_start
        log(" Deleting records ")
        # Delete before checking, so that a failed check does not leave
        # the records behind.
        for r in rl:
            s.delete_record(cases, r['ID'])
            log(".")
        log("\n")
        check("Number of results", n, len(rl))
        log(" Took %.2f seconds to read %d records.\n" % (time_taken, n))


# 2.4. Cursors
#
# This test checks that different cursor implementations return the same
# set of records as an ordinary query.  It reports on the performance of
# each cursor implementation.

class cursor:
    # Common state for the cursor implementations below.  Subclasses
    # supply fetchone(), which returns the next record, or None when
    # there are no more.
    #
    # server is the TeamTrack connection; table_name is 'CASES' or a key
    # of teamtrack.table; query is a WHERE clause ('' for none);
    # chunk_size is the number of IDs a chunked cursor covers per query.
    def __init__(self, server, table_name, query, chunk_size = 20):
        import teamtrack
        self.last_id = -1       # Highest ID covered so far.
        self.cache = []         # Records fetched but not yet returned.
        self.chunk_size = chunk_size
        self.finished = 0       # Set to 1 once the cursor is exhausted.
        self.query = query
        self.server = server
        if table_name == 'CASES':
            # The case table's id and name are asked of the server
            # rather than looked up in teamtrack.table.
            self.table_id = server.case_table_id()
            self.table_name = server.case_table_name()
        else:
            self.table_id = teamtrack.table[table_name]
            self.table_name = 'TS_' + table_name


# 2.4.1. Query "cursor".
#
# This cursor just issues a query and returns the results one by one.
# This is provided for comparison with the other implementations (it
# should run fastest if it succeeds).

class cursor_zero(cursor):
    "Query"
    # chunk defaults to 20, like chunk_size in the base class, so that
    # every cursor class can be constructed with the same arguments.
    # It is stored but not otherwise used by this cursor.
    def __init__(self, server, table_name, query, chunk = 20):
        cursor.__init__(self, server, table_name, query, chunk)
        # Fetch every matching record up front.
        self.cache = self.server.query(self.table_id, self.query)

    def fetchone(self):
        # Return the next cached record, or None when none are left.
        if not self.cache:
            return None
        result = self.cache[0]
        self.cache = self.cache[1:]
        return result


# 2.4.2. TeamShare's recommended cursor
#
# This cursor reads from zero to N records by issuing a series of
# queries with WHERE clauses of the form
#
#   TS_ID > -1 AND TS_ID <= 20 AND %s
#   TS_ID > 20 AND TS_ID <= 40 AND %s
#   TS_ID > 40 AND TS_ID <= 60 AND %s
#
# (here chunk_size == 20).  This was recommended by Kelly Shaw in [Shaw
# 2001-04-16].
#
# The problem with this approach is that if a table is sparse then this
# can take arbitrarily long to read a single record.  However, the CASES
# table is not sparse (people rarely delete records) and the CHANGES
# table is never sparse (no mechanism for deleting entries).

class cursor_one(cursor):
    "Zero to N cursor"
    # chunk defaults to 20, like chunk_size in the base class, so that
    # every cursor class can be constructed with the same arguments.
    def __init__(self, server, table_name, query, chunk = 20):
        import teamtrack
        cursor.__init__(self, server, table_name, query, chunk)
        # Find the LASTID recorded for the table, so that fetchone
        # knows when to stop.  Where that record lives depends on the
        # database version.
        dbver = server.read_record(teamtrack.table['SYSTEMINFO'], 1)['DBVER']
        if dbver < 50200:
            t = server.read_record(teamtrack.table['TABLES'],
                                   self.table_id)
        else:
            # NOTE(review): 76 is a hard-coded table id; presumably the
            # tables table in the newer schema -- confirm.
            t = server.query(76, 'TS_TABLEID=%d' % self.table_id)[0]
        self.max_id = t['LASTID']

    def fetchone(self):
        # Return the next matching record, or None when the ID range up
        # to max_id has been covered and the cache is empty.
        if self.query == '':
            where = ''
        else:
            where = ' AND ' + self.query
        # Query successive ranges of chunk_size IDs until one of them
        # yields records or the range reaches max_id.
        while not self.cache and self.last_id < self.max_id:
            end_id = self.last_id + self.chunk_size
            query = ("TS_ID > %d AND TS_ID <= %d%s"
                     % (self.last_id, end_id, where))
            self.cache = self.server.query(self.table_id, query)
            self.last_id = end_id
        if not self.cache:
            return None
        result = self.cache[0]
        self.cache = self.cache[1:]
        return result


# 2.4.3. One at a time cursor
#
# This cursor queries the database each time it is called, with a query
# that returns exactly one record.  It uses a series of WHERE clauses
# like this:
#
#   TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > -1 AND %s)
#   TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 5 AND %s)
#   TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 7 AND %s)
#   TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 10 AND %s)
#
# (assuming that the first three records in the table are 5, 7 and 10).
class cursor_two(cursor):
    "One at a time cursor"
    def fetchone(self):
        # Ask for the single record with the smallest ID above last_id
        # that also satisfies the cursor's query (if it has one).
        extra = ''
        if self.query != '':
            extra = ' AND ' + self.query
        sql = ("TS_ID=(SELECT MIN(TS_ID) FROM %s WHERE TS_ID>%d%s)"
               % (self.table_name, self.last_id, extra))
        found = self.server.query(self.table_id, sql)
        if not found:
            # No record above last_id: the cursor is exhausted.
            return None
        assert(len(found) == 1)
        assert(found[-1]['ID'] > self.last_id)
        # Remember where to resume on the next call.
        self.last_id = found[-1]['ID']
        return found[0]


# 2.4.4. One to N at a time cursor
#
# This query is a combination of ideas from the queries in sections
# 2.4.2 and 2.4.3.  It uses a subquery to make sure that it returns at
# least one result (so that the performance does not get arbitrarily bad
# when the table is sparse) but it also attempts to get several results
# at once.  It uses a series of WHERE clauses of the form:
#
#   %s AND TS_ID BETWEEN
#     (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > -1 AND %s) AND
#     (SELECT MIN(TS_ID)+19 FROM TS_CASES WHERE TS_ID > -1 AND %s)
#
#   %s AND TS_ID BETWEEN
#     (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 36 AND %s) AND
#     (SELECT MIN(TS_ID)+19 FROM TS_CASES WHERE TS_ID > 36 AND %s)
#
# So even if the first record in the table is greater than the chunk
# size (here 20), we might still pick up several records, the last being
# record 36, so that the next query can start at a suitable place.
class cursor_three(cursor):
    "One to N cursor"
    def fetchone(self):
        # Return the next matching record, or None once a query has
        # come back empty.
        if self.finished:
            return None
        if not self.cache:
            if self.query == '':
                where_1 = ''
                where_2 = ''
            else:
                where_1 = '(' + self.query + ') AND '
                where_2 = ' AND (' + self.query + ')'
            # Select the records whose IDs lie between the smallest
            # matching ID above last_id and that ID plus
            # chunk_size - 1.
            query = ("%sTS_ID BETWEEN (SELECT MIN(TS_ID) FROM %s "
                     "WHERE TS_ID > %d%s) AND (SELECT MIN(TS_ID)+%d "
                     "FROM %s WHERE TS_ID > %d%s) ORDER BY TS_ID"
                     % (where_1, self.table_name, self.last_id, where_2,
                        self.chunk_size - 1, self.table_name,
                        self.last_id, where_2))
            self.cache = self.server.query(self.table_id, query)
            if not self.cache:
                self.finished = 1
                return None
            # Resume after the last record returned, or a whole chunk
            # further on, whichever is greater.
            self.last_id = max(self.cache[-1]['ID'],
                               self.last_id + self.chunk_size)
        result = self.cache[0]
        self.cache = self.cache[1:]
        return result


# 2.4.5. Cursors test case

class cursors(unittest.TestCase):
    def performance(self, server, query, cursor_class, chunk_size):
        # Read every case matching query through a new cursor_class
        # cursor.  Return (elapsed seconds, list of record IDs); the
        # time includes constructing the cursor.
        start = time.time()
        c = cursor_class(server, 'CASES', query, chunk_size)
        records = []
        r = c.fetchone()
        while r:
            records.append(r['ID'])
            r = c.fetchone()
        elapsed = time.time() - start
        return (elapsed, records)

    def runTest(self):
        "Cursors (test_teamtrack.cursors)"
        s = tt_connect()
        n = 50
        chunk_size = 18
        try:
            add_records(s, n)
            log(" %d records, chunk size %d." % (n, chunk_size))

            # Get records by cursor.  cursor_zero goes first: its
            # results and time are the baseline for the others.
            cursors = [cursor_zero, cursor_one, cursor_two, cursor_three]
            query_ids = None
            query_elapsed = None
            for c in cursors:
                log("\n Trying %s ... " % c.__doc__)
                (elapsed, ids) = self.performance(s, "TS_PRIORITY>1000",
                                                  c, chunk_size)
                ids.sort()
                if c == cursor_zero:
                    query_elapsed = elapsed
                    query_ids = ids

                # Check that we got the right number of results.
                check("Number of results by %s" % c.__doc__, n, len(ids))

                # Check that the returned records are the same.
                if query_ids:
                    for i in range(len(ids)):
                        check("ids[%d]['ID']" % i, query_ids[i], ids[i])

                # Report on the performance.
                log("took %.2f seconds" % elapsed)
                if c != cursor_zero and query_elapsed:
                    log(" (%d%% of query)."
                        % (100*elapsed/query_elapsed))
        finally:
            # Delete the records again, even if a check failed, so that
            # they do not upset the counts in later tests.
            delete_records(s)


# 2.5. Dummy test cases
#
# These aren't real test cases.  They are really utilities for adding
# and deleting records that can be run from a command line when running
# other test cases.

# 2.5.1. Add records

class add(unittest.TestCase):
    def runTest(self):
        "Adding records (test_teamtrack.add)"
        s = tt_connect()
        n = 200
        add_records(s, n)


# 2.5.2. Add records sparsely

class add_sparsely(unittest.TestCase):
    def runTest(self):
        "Adding records randomly (test_teamtrack.add_sparsely)"
        s = tt_connect()
        n = 350
        add_records(s, n, 10)


# 2.5.3. Delete records

class delete(unittest.TestCase):
    def runTest(self):
        "Deleting records (test_teamtrack.delete)"
        s = tt_connect()
        delete_records(s)


# 2.6. Updating cases
#
# This test case fetches all the cases from the CASES table and calls
# the update() method on each one (no changes are actually made).
#
# This is a regression test for job000326 and job000335.

class update(unittest.TestCase):
    def runTest(self):
        "Updating cases (test_teamtrack.update)"
        s = tt_connect()
        rr = s.query(s.case_table_id(), '')
        for r in rr:
            r.update()


# 2.7. Submitting an issue
#
# This test attempts to submit an issue to TeamTrack.

class submit(unittest.TestCase):
    def runTest(self):
        "Submitting a case (test_teamtrack.submit)"
        import teamtrack
        # NOTE(review): this connects as the hard-coded user 'gdr' with
        # an empty password, not with the user and password in config
        # -- confirm that this is intended.
        s = teamtrack.connect('gdr', '', config.teamtrack_server)
        r = s.new_record(s.case_table_id())
        r['TITLE'] = 'TeamTrack test issue'
        r['DESCRIPTION'] = 'This is a test issue for the submit test.'
        r['ISSUETYPE'] = 1
        r['PROJECTID'] = 4
        # The value returned by submit() is not used.
        r.submit('gdr')


# 3. RUNNING THE TESTS

def tests():
    # Build the suite for unittest.main.  The test cases are added only
    # when os.name is 'nt'; elsewhere the suite is returned empty.  The
    # utility cases add and add_sparsely are not part of the suite.
    suite = unittest.TestSuite()
    if os.name == 'nt':
        for t in [connect, create, cursors, delete, stress, submit,
                  update]:
            suite.addTest(t())
    return suite

if __name__ == "__main__":
    unittest.main(defaultTest="tests")


# A. REFERENCES
#
# [GDR 2000-08-08] "Python interface to TeamTrack: design"; Gareth Rees;
# Ravenbrook Limited; 2000-08-08;
# .
# # [PyUnit] "PyUnit - a unit testing framework for Python"; Steve # Purcell; . # # [Shaw 2001-04-16] "Notes on memory usage in the API"; Kelly Shaw; # TeamShare; 2001-04-16; # . # # # B. DOCUMENT HISTORY # # 2001-04-19 GDR Created. # # 2001-05-17 GDR Added cursors test case. # # 2001-06-07 GDR Added update_cases test case. # # 2001-06-26 GDR Use new server methods case_table_id() and # case_table_name() to support both TeamTrack 4.5 and TeamTrack 5.0. # # 2001-07-02 GDR Added submit test case. # # 2001-07-03 GDR Added connection time test to connect test case. # # # C. COPYRIGHT AND LICENCE # # This file is copyright (c) 2001 Perforce Software, Inc. All rights # reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in # the documentation and/or other materials provided with the # distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS # OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR # TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH # DAMAGE. 
# # # $Id: //info.ravenbrook.com/project/p4dti/branch/2001-11-22/bugzilla-parameters/test/test_teamtrack.py#5 $