common.py 10 KB


  1. #
  2. # Metrix++, Copyright 2009-2013, Metrix++ Project
  3. # Link: http://metrixplusplus.sourceforge.net
  4. #
  5. # This file is a part of Metrix++ Tool.
  6. #
  7. # Metrix++ is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU General Public License as published by
  9. # the Free Software Foundation, version 3 of the License.
  10. #
  11. # Metrix++ is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with Metrix++. If not, see <http://www.gnu.org/licenses/>.
  18. #
  19. import inspect
  20. import os.path
  21. import subprocess
  22. import logging
  23. import difflib
  24. import unittest
  25. import shutil
  26. class ToolRunner(object):
  27. def __init__(self,
  28. tool_name,
  29. opts_list = [],
  30. dirs_list = None,
  31. cwd='sources',
  32. prefix = "default",
  33. exit_code = 0,
  34. save_prev = False,
  35. use_prev = False,
  36. check_stderr = None,
  37. remove_exiting_dbfile = None,
  38. remove_exiting_dbfile_prev = False):
  39. self.message = ""
  40. # identify gold_file_location
  41. curframe = inspect.currentframe()
  42. calframe = inspect.getouterframes(curframe, 2)
  43. test_name = calframe[1][3]
  44. suite_name = os.path.splitext(os.path.basename(calframe[1][1]))[0]
  45. group_name = os.path.basename(os.path.dirname(calframe[1][1]))
  46. self.suite_location = os.path.join('tests', group_name, suite_name)
  47. self.test_location = os.path.join(self.suite_location, test_name + "_" + tool_name + "_" + str(prefix))
  48. db_file = os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], self.suite_location, test_name + ".db")
  49. self.dbfile = db_file
  50. if (remove_exiting_dbfile == True or (remove_exiting_dbfile == None and tool_name == 'collect')) and os.path.exists(db_file):
  51. os.unlink(db_file)
  52. db_file_prev = os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], self.suite_location, test_name + ".prev.db")
  53. self.dbfile_prev = db_file_prev
  54. if (remove_exiting_dbfile_prev == True or (remove_exiting_dbfile_prev == None and tool_name == 'collect')) and os.path.exists(db_file_prev):
  55. os.unlink(db_file_prev)
  56. self.cwd = cwd
  57. db_opts = ['--general.db-file=' + db_file]
  58. if use_prev == True:
  59. db_opts.append('--general.db-file-prev=' + db_file_prev)
  60. self.dbopts = db_opts
  61. self.dirs_list = []
  62. if dirs_list != None:
  63. for each in dirs_list:
  64. self.dirs_list.append(each)
  65. self.call_args = ['python', os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], tool_name + ".py")] \
  66. + db_opts + opts_list + ['--'] + self.dirs_list
  67. self.cmd = " ".join(self.call_args)
  68. self.exit_code_expected = exit_code
  69. self.stderr_lines = check_stderr
  70. self.save_prev = save_prev
  71. def run(self):
  72. logging.debug(self.get_description())
  73. child = subprocess.Popen(self.call_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
  74. cwd=os.path.join(self.suite_location, self.cwd))
  75. (child_stdout, child_stderr) = child.communicate()
  76. self.exit_code = child.returncode
  77. gold_file_stdout = self.test_location + "_stdout.gold.txt"
  78. real_file_stdout = self.test_location + "_stdout.real.txt"
  79. diff_file_stdout = self.test_location + "_stdout.diff.html"
  80. gold_file_stderr = self.test_location + "_stderr.gold.txt"
  81. real_file_stderr = self.test_location + "_stderr.real.txt"
  82. diff_file_stderr = self.test_location + "_stderr.diff.html"
  83. # Regenerate gold files if it was requested
  84. if os.environ['METRIXPLUSPLUS_TEST_GENERATE_GOLDS'] == "True":
  85. f = open(gold_file_stdout, 'wb');
  86. f.write(child_stdout);
  87. f.close()
  88. if self.stderr_lines != None:
  89. f = open(gold_file_stderr, 'wb');
  90. f.write(child_stderr);
  91. f.close()
  92. # Match with gold
  93. self.is_stdout_matched = self.inetrnal_compare_with_gold(child_stdout, gold_file_stdout, real_file_stdout, diff_file_stdout)
  94. if self.stderr_lines != None:
  95. self.is_stderr_matched = self.inetrnal_compare_with_gold(child_stderr, gold_file_stderr, real_file_stderr, diff_file_stderr, self.stderr_lines)
  96. else:
  97. self.is_stderr_matched = None
  98. if self.is_stdout_matched == False:
  99. f = open(real_file_stderr, 'wb');
  100. f.write(child_stderr);
  101. f.close()
  102. else:
  103. if os.path.exists(real_file_stderr):
  104. os.unlink(real_file_stderr)
  105. if self.save_prev == True:
  106. shutil.copy2(self.dbfile, self.dbfile_prev)
  107. return self
  108. def inetrnal_compare_with_gold(self, text, gold_file, real_file, diff_file, lines = None):
  109. if os.path.exists(gold_file) == False:
  110. self.message += "\nGold file does not exist: " + gold_file
  111. return False
  112. f = open(gold_file, 'rb');
  113. gold_text = f.read();
  114. f.close()
  115. gold_to_compare = gold_text
  116. text_to_compare = text
  117. if lines != None:
  118. gold_to_compare = ""
  119. text_to_compare = ""
  120. gold_lines = gold_text.splitlines(True)
  121. text_lines = text.splitlines(True)
  122. for each in lines:
  123. gold_to_compare += "".join(gold_lines[each[0] : each[1]])
  124. text_to_compare += "".join(text_lines[each[0] : each[1]])
  125. result = (gold_to_compare == text_to_compare)
  126. if result == False:
  127. f = open(real_file, 'wb');
  128. f.write(text);
  129. f.close()
  130. diff_text = difflib.HtmlDiff().make_file(gold_to_compare.splitlines(), text_to_compare.splitlines(), "Gold file", "Real output")
  131. f = open(diff_file, 'w');
  132. f.write(diff_text);
  133. f.close()
  134. else:
  135. if os.path.exists(real_file):
  136. os.unlink(real_file)
  137. if os.path.exists(diff_file):
  138. os.unlink(diff_file)
  139. return result
  140. def check_exit_code(self):
  141. return self.exit_code == self.exit_code_expected
  142. def check_stdout(self):
  143. return self.is_stdout_matched
  144. def check_stderr(self):
  145. if self.is_stderr_matched == None:
  146. return True
  147. return self.is_stderr_matched
  148. def check_all(self):
  149. result = self.check_exit_code() and self.check_stdout() and self.check_stderr()
  150. if result == False:
  151. self.message += "\nCheck for exit code: " + str(self.check_exit_code()) \
  152. + ", gold: " + str(self.exit_code_expected) + ", real: " + str(self.exit_code) + \
  153. "\nCheck for stdout: " + str(self.check_stdout()) + "\nCheck for stderr: " + str(self.check_stderr())
  154. return result
  155. def get_message(self):
  156. return self.message
  157. def get_cmd(self):
  158. return self.cmd
  159. def get_description(self):
  160. return self.get_message() + "\nProcess: " + self.get_cmd() + "\nCWD: " + os.path.join(self.suite_location, self.cwd)
  161. def get_dbfile(self):
  162. return self.dbfile
  163. def get_dbfile_prev(self):
  164. return self.dbfile_prev
  165. class TestCase(unittest.TestCase):
  166. def get_content_paths(self, cwd='sources'):
  167. curframe = inspect.currentframe()
  168. calframe = inspect.getouterframes(curframe, 2)
  169. test_name = calframe[1][3]
  170. suite_name = os.path.splitext(os.path.basename(calframe[1][1]))[0]
  171. group_name = os.path.basename(os.path.dirname(calframe[1][1]))
  172. class ContentPaths(object):
  173. def __init__(self, cwd, dbfile, dbfile_prev):
  174. self.cwd = cwd
  175. self.dbfile = dbfile
  176. self.dbfile_prev = dbfile_prev
  177. return ContentPaths(os.path.join('tests', group_name, suite_name, cwd),
  178. os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], 'tests', group_name, suite_name, test_name + ".db"),
  179. os.path.join(os.environ['METRIXPLUSPLUS_INSTALL_DIR'], 'tests', group_name, suite_name, test_name + ".prev.db"))
  180. def setUp(self):
  181. unittest.TestCase.setUp(self)
  182. logging.basicConfig(format="[TEST-LOG]: %(levelname)s:\t%(message)s", level=logging.WARN)
  183. log_level = os.environ['METRIXPLUSPLUS_LOG_LEVEL']
  184. if log_level == 'ERROR':
  185. log_level = logging.ERROR
  186. elif log_level == 'WARNING':
  187. log_level = logging.WARNING
  188. elif log_level == 'INFO':
  189. log_level = logging.INFO
  190. elif log_level == 'DEBUG':
  191. log_level = logging.DEBUG
  192. else:
  193. raise AssertionError("Unhandled choice of log level")
  194. logging.getLogger().setLevel(log_level)
  195. self.runners = []
  196. def assertExec(self, runner):
  197. # keep reference, so files are not removed during test case time
  198. self.runners.append(runner)
  199. self.assertTrue(runner.check_all(), runner.get_description())
  200. def run(self, result=None):
  201. self.current_result = result # remember result for use in tearDown
  202. unittest.TestCase.run(self, result)
  203. def tearDown(self):
  204. unittest.TestCase.tearDown(self)
  205. if self.current_result.wasSuccessful() == True:
  206. for each in self.runners:
  207. if each.check_all() == True:
  208. if os.path.exists(each.get_dbfile()):
  209. os.unlink(each.get_dbfile())
  210. if os.path.exists(each.get_dbfile_prev()):
  211. os.unlink(each.get_dbfile_prev())