# Perforce Defect Tracking Integration Project # # # TEST_TEAMTRACK_API.PY -- TEST TEAMTRACK INTERFACE # # Gareth Rees, Ravenbrook Limited, 2001-04-19 # # # 1. INTRODUCTION # # This test script tests the Python interface to TeamTrack [GDR # 2000-08-08]. # # It uses the PyUnit unit test framework [PyUnit]. # # The intended readership is project developers. # # This document is not confidential. import sys sys.path.append('../code/replicator') import os import socket import string import teamtrack import time import unittest import whrandom config = __import__('config_' + string.split(socket.gethostname(),'.')[0]) def tt_connect(): return teamtrack.connect(config.teamtrack_user, config.teamtrack_password, config.teamtrack_server) def check(name, expected, found): assert expected == found, ("%s: expected %s but found %s" % (name, str(expected), str(found))) def log(string): sys.stderr.write(string) sys.stderr.flush() def add_records(s, n, sparse = None): log("\n Adding %d cases " % n) if sparse: log("(sparsely at %d%%) " % sparse) for i in range(n): r = s.new_record(teamtrack.table['CASES']) r['PROJECTID'] = 9 r['PRIORITY'] = 1003 r.add() if sparse: log("%d" % r['ID']) log(".") if sparse: t = s.read_record(teamtrack.table['TABLES'], teamtrack.table['CASES']) t['LASTID'] = (t['LASTID'] + whrandom.randrange(0, sparse * 2)) t.update() log("\n") def delete_records(s): log("\n Deleting records ") for (table, query) in [('CASES', "TS_PRIORITY > 1000"), ('VCACTIONS', "TS_TYPE > 1000")]: table_id = teamtrack.table[table] c = cursor_three(s, table, query, 20) while 1: r = c.fetchone() if r: s.delete_record(table_id, r['ID']) log(".") else: break log("\n") # 2. TEST CASES # 2.1. Connect to the TeamTrack server class connect(unittest.TestCase): def runTest(self): "Connect to the TeamTrack server" server = tt_connect() # 2.2. Create and read records # # Create some records in the VCACTIONS table, verify that they are there # as expected and then delete them. class create(unittest.TestCase): def runTest(self): "Create and read records" code = 'test_create' s = tt_connect() vcactions = teamtrack.table['VCACTIONS'] n = 10 for i in range(n): r = s.new_record(vcactions) r['TYPE'] = 1001 r['INFO1'] = i r['CHAR1'] = str(i) r['CHAR2'] = code r.add() rl = s.query(vcactions, "TS_TYPE = 1001 AND TS_CHAR2 = '%s' " "ORDER BY TS_INFO1" % code) for r in rl: s.delete_record(vcactions, r['ID']) check("Number of results", n, len(rl)) for i in range(len(rl)): check("rl[%d]['CHAR2']" % i, code, rl[i]['CHAR2']) check("rl[%d]['INFO1']" % i, i, rl[i]['INFO1']) check("rl[%d]['CHAR1']" % i, str(i), rl[i]['CHAR1']) # 2.3. Stress # # Create many records in the VCACTIONS table. Read them all. Report on # the time taken. class stress(unittest.TestCase): def runTest(self): "Stress" s = tt_connect() project = 3 cases = teamtrack.table['CASES'] n = 50 add_records(s, n) log(" Reading records\n") read_start = time.time() rl = s.query(cases, "TS_PRIORITY > 1000") time_taken = time.time() - read_start log(" Deleting records ") for r in rl: s.delete_record(cases, r['ID']) log(".") log("\n") check("Number of results", n, len(rl)) log(" Took %.2f seconds to read %d records.\n" % (time_taken, n)) # 2.4. Cursors # # This test checks that different cursor implementations return the same # set of records as an ordinary query. It reports on the performance of # each cursor implementation. class cursor: def __init__(self, server, table_name, query, chunk_size = 20): self.last_id = -1 self.cache = [] self.chunk_size = chunk_size self.finished = 0 self.query = query self.server = server self.table_id = teamtrack.table[table_name] self.table_name = table_name # 2.4.1. Query "cursor". # # This cursor just issues a query and returns the results one by one. # This is provided for comparison with the other implementations (it # should run fastest if it succeeds). class cursor_zero(cursor): "Query" def __init__(self, server, table_name, query, chunk): cursor.__init__(self, server, table_name, query, chunk) self.cache = self.server.query(self.table_id, self.query) def fetchone(self): if self.cache: result = self.cache[0] self.cache = self.cache[1:] return result else: return None # 2.4.2. TeamShare's recommend cursor # # This cursor reads from zero to N records by issuing a series of # queries with WHERE clauses of the form # # TS_ID > -1 AND TS_ID <= 20 AND %s # TS_ID > 20 AND TS_ID <= 40 AND %s # TS_ID > 40 AND TS_ID <= 60 AND %s # # (here chunk_size == 20). This was recommended by Kelly Shaw in [Shaw # 2001-04-16]. # # The problem with this approach is that if a table is sparse then this # can take arbitrarily long to read a single record. However, the CASES # table is not sparse (people rarely delete records) and the CHANGES # table is never sparse (no mechanism for deleting entries). class cursor_one(cursor): "Zero to N cursor" def __init__(self, server, table_name, query, chunk): cursor.__init__(self, server, table_name, query, chunk) t = server.read_record(teamtrack.table['TABLES'], self.table_id) self.max_id = t['LASTID'] def fetchone(self): while not self.cache: if self.last_id >= self.max_id: break if self.query == '': where = '' else: where = ' AND ' + self.query end_id = self.last_id + self.chunk_size query = ("TS_ID > %d AND TS_ID <= %d%s" % (self.last_id, end_id, where)) self.cache = self.server.query(self.table_id, query) self.last_id = end_id if self.cache: result = self.cache[0] self.cache = self.cache[1:] return result return None # 2.4.3. One at a time cursor # # This cursor queries the database each time it is called, with a query # that returns exactly one record. It uses a series of WHERE clauses # like this: # # TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > -1 AND %s) # TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 5 AND %s) # TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 7 AND %s) # TS_ID = (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 10 AND %s) # # (assuming that the first three records in the table are 5, 7 and 10). class cursor_two(cursor): "One at a time cursor" def fetchone(self): if self.query == '': where = '' else: where = ' AND ' + self.query query = ("TS_ID=(SELECT MIN(TS_ID) FROM TS_%s WHERE TS_ID>%d%s)" % (self.table_name, self.last_id, where)) results = self.server.query(self.table_id, query) if results: assert(len(results) == 1) assert(results[-1]['ID'] > self.last_id) self.last_id = results[-1]['ID'] return results[0] else: return None # 2.4.4. One to N at a time cursor # # This query is a combination of ideas from the queries in sections # 2.4.1 and 2.4.2. It uses a subquery to make sure that it returns at # least one result (so that the performance does not get arbitrarily bad # when the table is sparse) but it also attempts to get several results # at once. It uses a series of WHERE clauses of the form: # # TS_ID BETWEEN # (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > -1 AND %s) AND # (SELECT MIN(TS_ID)+19 FROM TS_CASES WHERE TS_ID > -1 AND %s) # # TS_ID BETWEEN # (SELECT MIN(TS_ID) FROM TS_CASES WHERE TS_ID > 36 AND %s) AND # (SELECT MIN(TS_ID)+19 FROM TS_CASES WHERE TS_ID > 36 AND %s) # # So even if the first record in the table is greater than the chunk # size (here 20), we might still pick up several records, the last being # record 36, so that the next query can start at a suitable place. class cursor_three(cursor): "One to N cursor" def fetchone(self): if self.finished: return None if not self.cache: if self.query == '': where = '' else: where = ' AND ' + self.query query = ("TS_ID BETWEEN (SELECT MIN(TS_ID) FROM TS_%s " "WHERE TS_ID > %d%s) AND (SELECT MIN(TS_ID)+%d " "FROM TS_%s WHERE TS_ID > %d%s) ORDER BY TS_ID" % (self.table_name, self.last_id, where, self.chunk_size - 1, self.table_name, self.last_id, where)) self.cache = self.server.query(self.table_id, query) if self.cache: self.last_id = max(self.cache[-1]['ID'], self.last_id + self.chunk_size) else: self.finished = 1 return None result = self.cache[0] self.cache = self.cache[1:] return result # 2.4.5. Cursors test case class cursors(unittest.TestCase): def performance(self, server, query, cursor_class, chunk_size): start = time.time() c = cursor_class(server, 'CASES', query, chunk_size) records = [] while 1: r = c.fetchone() if r: records.append(r['ID']) else: break elapsed = time.time() - start return (elapsed, records) def runTest(self): "Cursors" s = tt_connect() n = 50 chunk_size = 18 add_records(s, n) log(" %d records, chunk size %d." % (n, chunk_size)) # Get records by cursor. cursors = [cursor_zero, cursor_one, cursor_two, cursor_three] query_ids = None query_elapsed = None for c in cursors: log("\n Trying %s ... " % c.__doc__) (elapsed, ids) = self.performance(s, "TS_PRIORITY>1000", c, chunk_size) ids.sort() if c == cursor_zero: query_elapsed = elapsed query_ids = ids # Check that we got the right number of results each time for i in range(len(cursors)): check("Number of results by %s" % c.__doc__, n, len(ids)) # Check that the returned records are the same. if query_ids: for i in range(len(ids)): check("ids[%d]['ID']" % i, query_ids[i], ids[i]) # Report on the performance. log("took %.2f seconds" % elapsed) if c != cursor_zero and query_elapsed: log(" (%d%% of query)." % (100*elapsed/query_elapsed)) # Delete the records again delete_records(s) # 2.5. Dummy test cases # # These aren't real test cases. They are really utilities for adding # and deleting records that can be run from a command line when running # other test cases. # 2.5.1. Add records class add(unittest.TestCase): def runTest(self): "Adding records" s = tt_connect() n = 200 add_records(s, n) # 2.5.2. Add records sparsely class add_sparsely(unittest.TestCase): def runTest(self): "Adding records randomly" s = tt_connect() n = 350 add_records(s, n, 10) # 2.5.3. Delete records class delete(unittest.TestCase): def runTest(self): "Deleting records" s = tt_connect() delete_records(s) # 3. RUNNING THE TESTS def tests(): suite = unittest.TestSuite() suite.addTest(connect()) suite.addTest(create()) suite.addTest(stress()) suite.addTest(cursors()) suite.addTest(delete()) return suite if __name__ == "__main__": unittest.main(defaultTest="tests") # A. REFERENCES # # [GDR 2000-08-08] "Python interface to TeamTrack: design"; Gareth Rees; # Ravenbrook Limited; 2000-08-08. # # [PyUnit] "PyUnit - a unit testing framework for Python"; Steve # Purcell; . # # [Shaw 2001-04-16] "Notes on memory usage in the API"; ; ; # ; # 2001-04-16. # # # B. DOCUMENT HISTORY # # 2001-04-19 GDR Created. # # 2001-05-17 GDR Added cursors test case. # # # C. COPYRIGHT AND LICENCE # # This file is copyright (c) 2001 Perforce Software, Inc. All rights # reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in # the documentation and/or other materials provided with the # distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS # OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR # TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH # DAMAGE. # # # $Id: //info.ravenbrook.com/project/p4dti/branch/2001-05-15/capacity/test/test_teamtrack.py#3 $