123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- #
- # Metrix++, Copyright 2009-2019, Metrix++ Project
- # Link: https://github.com/metrixplusplus/metrixplusplus
- #
- # This file is a part of Metrix++ Tool.
- #
- from metrixpp.mpp import api
- import re
- import os
- import sys
- import logging
- import time
- import binascii
- import fnmatch
- import multiprocessing.pool
- class Plugin(api.Plugin, api.Parent, api.IConfigurable, api.IRunable):
- def __init__(self):
- self.reader = DirectoryReader()
- self.include_rules = []
- self.exclude_rules = []
- self.exclude_dir_rules = []
- self.exclude_files = []
- self.parsers = []
- super(Plugin, self).__init__()
- def declare_configuration(self, parser):
- parser.add_option("--std.general.proctime", "--sgpt", action="store_true", default=False,
- help="If the option is set (True), the tool measures processing time per file [default: %default]")
- parser.add_option("--std.general.procerrors", "--sgpe", action="store_true", default=False,
- help="If the option is set (True), the tool counts number of processing/parsing errors per file [default: %default]")
- parser.add_option("--std.general.size", "--sgs", action="store_true", default=False,
- help="If the option is set (True), the tool collects file size metric (in bytes) [default: %default]")
- parser.add_option("--include-files", "--if", action='append',
- help="Adds a regular expression pattern to include files in processing (files have to match any rule to be included)")
- parser.add_option("--exclude-files", "--ef", action='append',
- help="Adds a regular expression pattern to exclude files or directories by name from processing")
- parser.add_option("--exclude-directories", "--ed", action='append',
- help="Adds a regular expression pattern to exclude directories by path from processing")
- parser.add_option("--non-recursively", "--nr", action="store_true", default=False,
- help="If the option is set (True), sub-directories are not processed [default: %default]")
- self.optparser = parser
- def configure(self, options):
- self.is_proctime_enabled = options.__dict__['std.general.proctime']
- self.is_procerrors_enabled = options.__dict__['std.general.procerrors']
- self.is_size_enabled = options.__dict__['std.general.size']
- # check if any include rule is given
- if options.__dict__['include_files']:
- try:
- for include_rule in options.__dict__['include_files']:
- self.add_include_rule(re.compile(include_rule))
- except Exception as e:
- self.optparser.error("option --include-files: " + str(e))
- else:
- self.add_include_rule(re.compile(r'.*'))
- # check if any exclude rule is given
- if options.__dict__['exclude_files']:
- try:
- for exclude_rule in options.__dict__['exclude_files']:
- self.add_exclude_rule(re.compile(exclude_rule))
- except Exception as e:
- self.optparser.error("option --exclude-files: " + str(e))
- else:
- self.add_exclude_rule(re.compile(r'^[.]'))
- # check if any exclude dir rule is given
- if options.__dict__['exclude_directories']:
- try:
- for exclude_dir_rule in options.__dict__['exclude_directories']:
- self.add_exclude_dir_rule(re.compile(exclude_dir_rule))
- except Exception as e:
- self.optparser.error("option --exclude-directories: " + str(e))
- self.non_recursively = options.__dict__['non_recursively']
- def initialize(self):
- fields = []
- if self.is_proctime_enabled == True:
- fields.append(self.Field('proctime', float))
- if self.is_procerrors_enabled == True:
- fields.append(self.Field('procerrors', int))
- if self.is_size_enabled == True:
- fields.append(self.Field('size', int))
- super(Plugin, self).initialize(namespace='std.general', support_regions=False, fields=fields)
- self.add_exclude_file(self.get_plugin('metrixpp.mpp.dbf').get_dbfile_path())
- self.add_exclude_file(self.get_plugin('metrixpp.mpp.dbf').get_dbfile_prev_path())
- def run(self, args):
- if len(args) == 0:
- return self.reader.run(self, "./")
- retcode = 0
- for directory in args:
- retcode += self.reader.run(self, directory)
- return retcode
- def register_parser(self, fnmatch_exp_list, parser):
- self.parsers.append((fnmatch_exp_list, parser))
- def get_parser(self, file_path):
- for parser in self.parsers:
- for fnmatch_exp in parser[0]:
- if fnmatch.fnmatch(file_path, fnmatch_exp):
- return parser[1]
- return None
- def add_include_rule(self, re_compiled_pattern):
- self.include_rules.append(re_compiled_pattern)
- def add_exclude_rule(self, re_compiled_pattern):
- self.exclude_rules.append(re_compiled_pattern)
- def add_exclude_dir_rule(self, re_compiled_pattern):
- self.exclude_dir_rules.append(re_compiled_pattern)
- def add_exclude_file(self, file_path):
- if file_path == None:
- return
- self.exclude_files.append(file_path)
- def is_file_excluded(self, file_name):
- # only apply the include rules to files - skip directories
- if os.path.isfile(file_name):
- for each in self.include_rules:
- if re.match(each, os.path.basename(file_name)) != None:
- break
- # file is excluded if no include rule matches
- else:
- logging.info("Excluding: " + file_name + " - not included by any rule")
- return True
- # check exclude dir rules for directories
- if os.path.isdir(file_name):
- for each in self.exclude_dir_rules:
- if re.match(each, file_name) != None:
- logging.info("Excluding: " + file_name + " - excluded by rule '" + each.pattern + "'")
- return True
- # check exclude rules for both, files and directories
- for each in self.exclude_rules:
- if re.match(each, os.path.basename(file_name)) != None:
- logging.info("Excluding: " + file_name + " - excluded by rule '" + each.pattern + "'")
- return True
- # finally check if a file is excluded directly
- for each in self.exclude_files:
- if os.path.basename(each) == os.path.basename(file_name):
- if os.stat(each) == os.stat(file_name):
- return True
- return False
- class DirectoryReader():
- def readtextfile(self,filename):
- """ Read a text file and try to detect the coding
- Since we examine program code text files we can assume the following:
- - There are no NUL characters, i.e. no 0x00 sequences of 1, 2 or 4
- byte, starting on 1, 2 or 4 byte boundaries (depending on
- 1, 2 or 4 byte coding)
- - There should at least one space (ASCII 0x20) char
- of the respective length (1,2 or 4 byte))
- - Program code consists of only ASCII chars, i.e. code < 128
- - Non ASCII chars should appear in string literals and comments only
- Though especially in the case of an 8 bit coding it does not matter
- which code page to use: Metric analysis is done on program code
- which is pure ASCII; string literals and comments are only recognized
- as such but not interpreted, though it doesn't matter if they contain
- non-ASCII chars whichever code page is used.
- Note the decoder's different behavior for the "utf_nn" identifiers:
- - .decode("utf_32") / .decode("utf_16"): preceding BOM is skipped
- - with suffix ".._be" or ".._le" respectively: preceding BOM is preserved
- but
- - .decode("utf_8"): preceding BOM is preserved
- - .decode("utf_8_sig"): preceding BOM is skipped
- """
- # Methods to check for various UTF variants without BOM:
- # Since UTF16/32 codings are recommended to use a BOM these methods
- # shouldn't be necessary but may be useful in certain cases.
- def checkforUTF32_BE(a):
- if ( (len(a) % 4) != 0 ): return False
- n = a.find(b'\x00\x00\x00\x20')
- return (n >= 0) and ((n % 4) == 0)
- def checkforUTF32_LE(a):
- if ( (len(a) % 4) != 0 ): return False
- n = a.find(b'\x20\x00\x00\x00')
- return (n >= 0) and ((n % 4) == 0)
- def checkforUTF16_BE(a):
- if ( (len(a) % 2) != 0 ): return False
- n = a.find(b'\x00\x20')
- return (n >= 0) and ((n % 2) == 0)
- def checkforUTF16_LE(a):
- if ( (len(a) % 2) != 0 ): return False
- n = a.find(b'\x20\x00')
- return (n >= 0) and ((n % 2) == 0)
- # Method to check for UTF8 without BOM:
- # "a" is the textfile represented as a simple byte array!
- # Find first char with code > 127:
- #
- # 1 nothing found: all bytes 0..127; in this case "a" only consists
- # of ASCII chars but this may also be treated as valid UTF8 coding
- #
- # 2 Code is a valid UTF8 leading byte: 176..271
- # then check subsequent bytes to be UTF8 extension bytes: 128..175
- # Does also do some additional plausibility checks:
- # If a valid UTF8 byte sequence is found
- # - the subsequent byte (after the UTF8 sequence) must be an ASCII
- # - or another UTF8 leading byte (in the latter case we assume that there
- # are following the appropriate number of UTF8 extension bytes..)
- # Note that these checks don't guarantee the text is really UTF8 encoded:
- # If a valid UTF8 sequence is found but in fact the text is some sort
- # of 8 bit OEM coding this may be coincidentally a sequence of 8 bit
- # OEM chars. This indeed seems very unlikely but may happen...
- # Even though the whole text would examined for UTF8 sequences: every
- # valid UTF8 sequence found may also be a sequence of OEM chars!
- #
- # 3 Code is not a valid UTF8 leading byte: 128..175 or 272..255
- # In this case coding is some sort of 8 bit OEM coding. Since we don't
- # know the OEM code page the file was written with, we assume "latin_1"
- # (is mostly the same as ANSI but "ansi" isn't available on Python 2)
- #
- # return suggested text coding: "ascii","utf_8" or "latin_1" (resp. default)
- def checkforUTF8(a,default="latin_1"):
- # Since "a" is a string array on Python 2 we use a special ORD function:
- # Convert c to its byte representation if it is a character
- # Works for Python 2+3
- def ORD(c): return ord(c) if (type(c) == str) else c
- L = len(a)
- n = 0
- while ( (n < L) and (ORD(a[n]) < 128) ): # (a[n] < ExtASCII) ):
- n = n+1
- if ( n >= L ): # all chars < 128: ASCII coding
- return "ascii" # but may also be treated as UTF8!
- w = a[n]
- # UTF8 two byte sequence: leading byte + 1 extension byte
- if ORD(w) in range(192,224):
- if ( (n+1 < L)
- and (ORD(a[n+1]) in range(128,192)) # valid UTF8 extension byte
- ):
- if ((n+2 == L) # w is last character
- or (ORD(a[n+2]) < 128) # or next byte is an ASCII char
- or (ORD(a[n+2]) in range(192,244)) # or next byte is an UTF8 leading byte
- ):
- return "utf_8"
- return default
- # UTF8 three byte sequence: leading byte + 2 extension bytes
- if ORD(w) in range(224,240):
- if ( (n+2 < L)
- and (ORD(a[n+1]) in range(128,192)) # 2 valid UTF8 extension bytes
- and (ORD(a[n+2]) in range(128,192))
- ):
- if ((n+3 == L) # w is last character
- or (ORD(a[n+3]) < 128) # or next byte is ASCII char
- or (ORD(a[n+3]) in range(192,244)) # or next byte is UTF8 leading byte
- ):
- return "utf_8"
- return default
- # UTF8 four byte sequence: leading byte + 3 extension bytes
- if ORD(w) in range(240,244):
- if ( (n+3 < L)
- and (ORD(a[n+1]) in range(128,192)) # 3 valid UTF8 extension bytes
- and (ORD(a[n+2]) in range(128,192))
- and (ORD(a[n+3]) in range(128,192))
- ):
- if ((n+4 == L) # w is last character
- or (ORD(a[n+4]) < 128) # or next byte is ASCII char
- or (ORD(a[n+4]) in range(192,244)) # or next byte is UTF8 leading byte
- ):
- return "utf_8"
- return default
- # no valid UTF8 byte sequence:
- return default
- # end of checkforUTF8 ------------------------------------------------
- # ----------------------------------------------------------------------
- # Subroutine readtextfile
- # open as binary and try to guess the encoding
- # attention:
- # - Phyton 3: "a" is a binary array
- # - Python 2: "a" is string array!
- # ----------------------------------------------------------------------
- f = open(filename, 'rb')
- a = f.read()
- f.close()
- # check for codings with BOM:
- # Consider the order: Check for UTF32 first!
- if (a.startswith(b'\xff\xfe\x00\x00')
- or a.startswith(b'\x00\x00\xfe\xff')):
- coding = "utf_32" # no suffix _be/_le --> decoder skips the BOM
- elif (a.startswith(b'\xff\xfe')
- or a.startswith(b'\xfe\xff')):
- coding = "utf_16" # no suffix _be/_le --> decoder skips the BOM
- elif a.startswith(b'\xef\xbb\xbf'):
- coding = "utf_8_sig"
- # elif: there are some other codings with BOM - feel free to add them here
- # check for UTF variants without BOM:
- # Consider the order: Check for UTF32 first!
- elif checkforUTF32_BE(a):
- coding = "utf_32_be"
- elif checkforUTF32_LE(a):
- coding = "utf_32_le"
- elif checkforUTF16_BE(a):
- coding = "utf_16_be"
- elif checkforUTF16_LE(a):
- coding = "utf_16_le"
- # So finally we only have to look for UTF8 without BOM:
- else:
- coding = checkforUTF8(a)
- # decode to text with found coding; since our guess may be wrong
- # we replace unknown chars to avoid errors. Cause we examine program code
- # files (i.e. true program code should only consist of ASCII chars) these
- # replacements only should affect string literals and comments and should
- # have no effect on metric analysis.
- text = a.decode(coding,'replace')
- # Finally replace possible line break variants with \n:
- # todo: replace with a regex
- text = text.replace("\r\n","\n")
- text = text.replace("\r","\n")
- return text
- # end of readtextfile --------------------------------------------------
- def run(self, plugin, directory):
- IS_TEST_MODE = False
- if 'METRIXPLUSPLUS_TEST_MODE' in list(os.environ.keys()):
- IS_TEST_MODE = True
- def run_per_file(plugin, fname, full_path):
- exit_code = 0
- norm_path = re.sub(r'''[\\]''', "/", full_path)
- if os.path.isabs(norm_path) == False and norm_path.startswith('./') == False:
- norm_path = './' + norm_path
- if plugin.is_file_excluded(norm_path) == False:
- if os.path.isdir(full_path):
- if plugin.non_recursively == False:
- exit_code += run_recursively(plugin, full_path)
- else:
- parser = plugin.get_parser(full_path)
- if parser == None:
- logging.info("Skipping: " + norm_path)
- else:
- logging.info("Processing: " + norm_path)
- ts = time.time()
- text = self.readtextfile(full_path)
- #text = self.readfile_org(full_path)
- checksum = binascii.crc32(text.encode('utf8')) & 0xffffffff # to match python 3
- db_loader = plugin.get_plugin('metrixpp.mpp.dbf').get_loader()
- (data, is_updated) = db_loader.create_file_data(norm_path, checksum, text)
- procerrors = parser.process(plugin, data, is_updated)
- if plugin.is_proctime_enabled == True:
- data.set_data('std.general', 'proctime',
- (time.time() - ts) if IS_TEST_MODE == False else 0.01)
- if plugin.is_procerrors_enabled == True and procerrors != None and procerrors != 0:
- data.set_data('std.general', 'procerrors', procerrors)
- if plugin.is_size_enabled == True:
- data.set_data('std.general', 'size', len(text))
- db_loader.save_file_data(data)
- #logging.debug("-" * 60)
- exit_code += procerrors
- return exit_code
- #thread_pool = multiprocessing.pool.ThreadPool()
- #def mp_worker(args):
- # run_per_file(args[0], args[1], args[2])
- def run_recursively(plugin, directory):
- exit_code = 0
- #thread_pool.map(mp_worker,
- # [(plugin, f, os.path.join(subdir, f))
- # for subdir, dirs, files in os.walk(directory) for f in files])
- for fname in sorted(os.listdir(directory)):
- full_path = os.path.join(directory, fname)
- exit_code += run_per_file(plugin, fname, full_path)
- return exit_code
- if os.path.exists(directory) == False:
- logging.error("Skipping (does not exist): " + directory)
- return 1
- if os.path.isdir(directory):
- total_errors = run_recursively(plugin, directory)
- else:
- total_errors = run_per_file(plugin, os.path.basename(directory), directory)
- total_errors = total_errors # used, warnings are per file if not zero
- return 0 # ignore errors, collection is successful anyway
|