common.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. #
  2. # Metrix++, Copyright 2009-2019, Metrix++ Project
  3. # Link: https://github.com/metrixplusplus/metrixplusplus
  4. #
  5. # This file is a part of Metrix++ Tool.
  6. #
  7. import inspect
  8. import os.path
  9. import subprocess
  10. import logging
  11. import difflib
  12. import unittest
  13. import shutil
  14. import ast
  15. import sys
  16. class ToolRunner(object):
  17. def __init__(self,
  18. tool_name,
  19. opts_list = [],
  20. dirs_list = None,
  21. cwd='sources',
  22. prefix = "default",
  23. exit_code = 0,
  24. save_prev = False,
  25. use_prev = False,
  26. check_stderr = None,
  27. remove_exiting_dbfile = None,
  28. remove_exiting_dbfile_prev = False):
  29. self.message = ""
  30. # identify gold_file_location
  31. curframe = inspect.currentframe()
  32. calframe = inspect.getouterframes(curframe, 2)
  33. test_name = calframe[1][3]
  34. suite_name = os.path.splitext(os.path.basename(calframe[1][1]))[0]
  35. group_name = os.path.basename(os.path.dirname(calframe[1][1]))
  36. self.suite_location = os.path.join('tests', group_name, suite_name)
  37. self.test_location = os.path.join(self.suite_location, test_name + "_" + tool_name + "_" + str(prefix))
  38. db_file = os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], self.suite_location, test_name + ".db")
  39. self.dbfile = db_file
  40. if (remove_exiting_dbfile == True or (remove_exiting_dbfile == None and tool_name == 'collect')) and os.path.exists(db_file):
  41. os.unlink(db_file)
  42. db_file_prev = os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], self.suite_location, test_name + ".prev.db")
  43. self.dbfile_prev = db_file_prev
  44. if (remove_exiting_dbfile_prev == True or (remove_exiting_dbfile_prev == None and tool_name == 'collect')) and os.path.exists(db_file_prev):
  45. os.unlink(db_file_prev)
  46. self.cwd = cwd
  47. db_opts = ['--db-file=' + db_file]
  48. if use_prev == True:
  49. db_opts.append('--db-file-prev=' + db_file_prev)
  50. self.dbopts = db_opts
  51. self.dirs_list = []
  52. if dirs_list != None:
  53. for each in dirs_list:
  54. self.dirs_list.append(each)
  55. self.call_args = [sys.executable, os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], "metrix++.py"), tool_name] \
  56. + db_opts + opts_list + ['--'] + self.dirs_list
  57. self.cmd = " ".join(self.call_args)
  58. self.exit_code_expected = exit_code
  59. self.stderr_lines = check_stderr
  60. self.save_prev = save_prev
  61. def run(self):
  62. logging.debug(self.get_description())
  63. child = subprocess.Popen(self.call_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
  64. cwd=os.path.join(self.suite_location, self.cwd))
  65. (child_stdout, child_stderr) = child.communicate()
  66. self.exit_code = child.returncode
  67. gold_file_stdout = self.test_location + "_stdout.gold.txt"
  68. real_file_stdout = self.test_location + "_stdout.real.txt"
  69. diff_file_stdout = self.test_location + "_stdout.diff.html"
  70. gold_file_stderr = self.test_location + "_stderr.gold.txt"
  71. real_file_stderr = self.test_location + "_stderr.real.txt"
  72. diff_file_stderr = self.test_location + "_stderr.diff.html"
  73. # Regenerate gold files if it was requested
  74. if os.environ['METRIXPLUSPLUS_TEST_GENERATE_GOLDS'] == "True":
  75. f = open(gold_file_stdout, 'wb');
  76. f.write(child_stdout);
  77. f.close()
  78. if self.stderr_lines != None:
  79. f = open(gold_file_stderr, 'wb');
  80. f.write(child_stderr);
  81. f.close()
  82. # Match with gold
  83. self.is_stdout_matched = self.inetrnal_compare_with_gold(child_stdout, gold_file_stdout, real_file_stdout, diff_file_stdout)
  84. if self.stderr_lines != None:
  85. self.is_stderr_matched = self.inetrnal_compare_with_gold(child_stderr, gold_file_stderr, real_file_stderr, diff_file_stderr, self.stderr_lines)
  86. else:
  87. self.is_stderr_matched = None
  88. if self.is_stdout_matched == False:
  89. f = open(real_file_stderr, 'wb');
  90. f.write(child_stderr);
  91. f.close()
  92. else:
  93. if os.path.exists(real_file_stderr):
  94. os.unlink(real_file_stderr)
  95. if self.save_prev == True:
  96. shutil.copy2(self.dbfile, self.dbfile_prev)
  97. return self
  98. def inetrnal_compare_with_gold(self, text, gold_file, real_file, diff_file, lines = None):
  99. if os.path.exists(gold_file) == False:
  100. self.message += "\nGold file does not exist: " + gold_file
  101. return False
  102. f = open(gold_file, 'r');
  103. gold_text = f.read();
  104. f.close()
  105. # don't compare dictionaries as string - they are not in order... (test case failes sometimes)
  106. try:
  107. textDict = ast.literal_eval(str(text.decode('ascii')))
  108. goldDict = ast.literal_eval(str(gold_text))
  109. except:
  110. textDict = text
  111. goldDict = gold_text
  112. if isinstance(textDict, dict) and isinstance(goldDict, dict):
  113. result = (textDict == goldDict)
  114. else:
  115. gold_to_compare = gold_text
  116. text_to_compare = str(text.decode('ascii'))
  117. if lines != None:
  118. gold_to_compare = ""
  119. text_to_compare = ""
  120. gold_lines = gold_text.splitlines(True)
  121. text_lines = str(text.decode('ascii')).splitlines(True)
  122. for each in lines:
  123. gold_to_compare += "".join(gold_lines[each[0] : each[1]])
  124. text_to_compare += "".join(text_lines[each[0] : each[1]])
  125. gold_to_compare = gold_to_compare.replace('\n', ' ').replace('\r', '').replace(" ", "")
  126. text_to_compare = text_to_compare.replace('\n', ' ').replace('\r', '').replace(" ", "")
  127. result = (gold_to_compare == text_to_compare)
  128. if result == False:
  129. f = open(real_file, 'wb')
  130. f.write(text)
  131. f.close()
  132. diff_text = difflib.HtmlDiff().make_file(gold_text.splitlines(), text.decode('ascii').splitlines(), "Gold file", "Real output")
  133. f = open(diff_file, 'w')
  134. f.write(diff_text)
  135. f.close()
  136. else:
  137. if os.path.exists(real_file):
  138. os.unlink(real_file)
  139. if os.path.exists(diff_file):
  140. os.unlink(diff_file)
  141. return result
  142. def check_exit_code(self):
  143. return self.exit_code == self.exit_code_expected
  144. def check_stdout(self):
  145. return self.is_stdout_matched
  146. def check_stderr(self):
  147. if self.is_stderr_matched == None:
  148. return True
  149. return self.is_stderr_matched
  150. def check_all(self):
  151. result = self.check_exit_code() and self.check_stdout() and self.check_stderr()
  152. if result == False:
  153. self.message += "\nCheck for exit code: " + str(self.check_exit_code()) \
  154. + ", gold: " + str(self.exit_code_expected) + ", real: " + str(self.exit_code) + \
  155. "\nCheck for stdout: " + str(self.check_stdout()) + "\nCheck for stderr: " + str(self.check_stderr())
  156. return result
  157. def get_message(self):
  158. return self.message
  159. def get_cmd(self):
  160. return self.cmd
  161. def get_description(self):
  162. return self.get_message() + "\nProcess: " + self.get_cmd() + "\nCWD: " + os.path.join(self.suite_location, self.cwd)
  163. def get_dbfile(self):
  164. return self.dbfile
  165. def get_dbfile_prev(self):
  166. return self.dbfile_prev
  167. class TestCase(unittest.TestCase):
  168. def __init__(self, methodName='runTest'):
  169. unittest.TestCase.__init__(self, methodName=methodName)
  170. if 'METRIXPLUSPLUS_LOG_LEVEL' not in list(os.environ.keys()):
  171. # launch of individual unit test
  172. os.environ['METRIXPLUSPLUS_LOG_LEVEL'] = 'ERROR'
  173. os.environ['METRIXPLUSPLUS_INSTALL_DIR'] = os.path.dirname(os.path.dirname(__file__))
  174. os.environ['METRIXPLUSPLUS_TEST_MODE'] = str("True")
  175. if 'METRIXPLUSPLUS_TEST_GENERATE_GOLDS' not in list(os.environ.keys()):
  176. os.environ['METRIXPLUSPLUS_TEST_GENERATE_GOLDS'] = str("False")
  177. os.chdir(os.environ['METRIXPLUSPLUS_INSTALL_DIR'])
  178. def get_content_paths(self, cwd='sources'):
  179. curframe = inspect.currentframe()
  180. calframe = inspect.getouterframes(curframe, 2)
  181. test_name = calframe[1][3]
  182. suite_name = os.path.splitext(os.path.basename(calframe[1][1]))[0]
  183. group_name = os.path.basename(os.path.dirname(calframe[1][1]))
  184. class ContentPaths(object):
  185. def __init__(self, cwd, dbfile, dbfile_prev):
  186. self.cwd = cwd
  187. self.dbfile = dbfile
  188. self.dbfile_prev = dbfile_prev
  189. return ContentPaths(os.path.join('tests', group_name, suite_name, cwd),
  190. os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], 'tests', group_name, suite_name, test_name + ".db"),
  191. os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], 'tests', group_name, suite_name, test_name + ".prev.db"))
  192. def setUp(self):
  193. unittest.TestCase.setUp(self)
  194. logging.basicConfig(format="[TEST-LOG]: %(levelname)s:\t%(message)s", level=logging.WARN)
  195. log_level = os.environ['METRIXPLUSPLUS_LOG_LEVEL']
  196. if log_level == 'ERROR':
  197. log_level = logging.ERROR
  198. elif log_level == 'WARNING':
  199. log_level = logging.WARNING
  200. elif log_level == 'INFO':
  201. log_level = logging.INFO
  202. elif log_level == 'DEBUG':
  203. log_level = logging.DEBUG
  204. else:
  205. raise AssertionError("Unhandled choice of log level")
  206. logging.getLogger().setLevel(log_level)
  207. self.runners = []
  208. def assertExec(self, runner):
  209. # keep reference, so files are not removed during test case time
  210. self.runners.append(runner)
  211. self.assertTrue(runner.check_all(), runner.get_description())
  212. def run(self, result=None):
  213. self.current_result = result # remember result for use in tearDown
  214. unittest.TestCase.run(self, result)
  215. def tearDown(self):
  216. unittest.TestCase.tearDown(self)
  217. if self.current_result.wasSuccessful() == True:
  218. for each in self.runners:
  219. if each.check_all() == True:
  220. if os.path.exists(each.get_dbfile()):
  221. os.unlink(each.get_dbfile())
  222. if os.path.exists(each.get_dbfile_prev()):
  223. os.unlink(each.get_dbfile_prev())