#!/usr/bin/env python3
"""svx_keywords.py

Python module and wrapper code for extracting survex keywords from a
source data file tree.

For usage see README.md.

Copyright (c) 2023 Patrick B Warren

Email: patrickbwarren@gmail.com

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or (at
your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

import re
import sys
from pathlib import Path


def svx_encoding(p, encodings=('utf-8', 'iso-8859-1')):
    '''Try to figure out the character encoding that works for a file.

    Each candidate in `encodings` is tried in order by decoding the
    whole of the file at path `p`; the first candidate that decodes
    without error is returned.

    Raises ValueError if `encodings` is empty, and UnicodeDecodeError
    if none of the candidates can decode the file.
    '''
    last_error = None
    for encoding in encodings:  # list of options to try
        try:
            with p.open('r', encoding=encoding) as fp:
                for _ in fp:  # decode everything; the text itself is not needed
                    pass
        except UnicodeDecodeError as err:
            last_error = err  # remember the failure and try the next candidate
        else:  # if we didn't fail, we found something that works
            return encoding
    if last_error is None:
        raise ValueError(f'No candidate encodings given for {p}')
    # UnicodeDecodeError takes five arguments (encoding, object, start,
    # end, reason), so re-use the details of the last failure and put
    # the explanation in the reason field.
    reason = f'Could not determine the character encoding for {p}'
    raise UnicodeDecodeError(last_error.encoding, last_error.object,
                             last_error.start, last_error.end, reason) from last_error


# In the following, hook can be a function which accepts the path p
# and the context, and returns a line of text (typically, a report).
# The returned value is recorded as a postscript either in the
# SvxReader class for the initial file open, or in the SvxRecord class
# for subsequent file openings.  The wrapper code below checks for
# such a postscript and prints it out at the appropriate time.  This
# means the file openings are reported _after_ the relevant *include
# statement.
def svx_open(p, hook=None, context=None):
    '''Open a survex file and reset the line counter.

    p is the path of the file, hook is an optional callable invoked as
    hook(p, context), and context is the list of survex contexts in
    force (treated as empty when omitted).

    Returns the tuple (fp, line_number, encoding, postscript) where fp
    is the open file, line_number is 0, encoding is the value returned
    by svx_encoding(p) and postscript is whatever the hook returned
    (or '' when there is no hook).

    Raises FileNotFoundError if p does not exist.
    '''
    if not p.exists():
        raise FileNotFoundError(p)
    encoding = svx_encoding(p)
    fp = p.open('r', encoding=encoding)
    try:
        postscript = hook(p, [] if context is None else context) if hook else ''
    except BaseException:
        fp.close()  # don't leak the file handle if the hook fails
        raise
    return fp, 0, encoding, postscript


def svx_readline(fp, line_number):
    '''Read a line from the survex file and increment the line counter.

    Returns the tuple (line, line_number + 1); the line is whatever
    fp.readline() returned, which is '' at the end of the file.
    '''
    return fp.readline(), line_number + 1


def extract_keyword_arguments(clean, keywords, keyword_char):
    '''Extract a keyword and arguments from a cleaned up line.

    clean is a line with comments and surrounding white space removed,
    keywords is the collection of (upper case) keywords to recognise
    and keyword_char is the character that introduces a keyword.

    Returns the tuple (keyword, uc_keyword, arguments), being the
    keyword as written (preserving case), the keyword in upper case,
    and the list of white space separated arguments.  If the line does
    not start with keyword_char, has nothing after keyword_char, or
    the keyword is not one of those wanted, ('', '', []) is returned.
    '''
    if not clean or clean[0] != keyword_char:  # detect keyword by presence of keyword character
        return '', '', []
    tokens = clean[1:].split()  # drop the keyword char and split on white space
    if not tokens:  # a bare keyword character, with nothing following
        return '', '', []
    uc_keyword = tokens[0].upper()
    if uc_keyword not in keywords:  # not one of the keywords being looked for
        return '', '', []
    return tokens[0], uc_keyword, tokens[1:]


class SvxRecord:
    '''Holds the result of reading one line of a survex file.

    Attributes are path (the file the line came from), encoding (the
    character encoding of that file, in upper case), line (the line
    number), context (the list of survex contexts), text (the text of
    the line) and postscript (initially empty).
    '''

    def __init__(self, p, encoding, line_number, context, line):
        '''Use this for storing results on a line per line basis'''
        self.path = p
        self.encoding = encoding.upper()
        self.line = line_number
        # Take a copy, so that the record is not altered if the caller
        # subsequently modifies the list that was passed in.
        self.context = list(context)
        self.text = line
        self.postscript = ''


# An iterator for iterating over files that can be called in context.
# Returns successive lines from the svx source tree, keeping track of
# begin and end statements.  A stack is used to keep track of the
# include files - items on the stack are tuples of file information.
# The initial stack entry acts as a sentinel to stop the iteration.
class SvxReader:
    '''Iterator over the lines of a survex source tree.

    Iterating yields one SvxRecord per line, following *include
    statements into the included files and tracking the survex context
    set by *begin and *end statements.  It can be used as a context
    manager, in which case the files that are still open are closed on
    exit, and a failed *include is reported by printing the line that
    contains it.
    '''

    def __init__(self, svx_file, open_hook=None, keyword_char='*', comment_char=';'):
        '''Instantiate with default properties and open the top level file.

        svx_file is the top level file (the .svx suffix is added if
        not already present), open_hook is passed to svx_open as the
        hook each time a file is opened, and keyword_char and
        comment_char are the characters that introduce keywords and
        comments respectively.
        '''
        self.keyword_char = keyword_char
        self.comment_char = comment_char
        self.open_hook = open_hook
        self.p = self._with_svx_suffix(Path(svx_file))  # add the suffix if not already present
        self.top_level = self.p
        self.context = []  # keep this as a list
        self.keywords = {'INCLUDE', 'BEGIN', 'END'}
        self.stack = [(None, None, 0, '')]  # initialise file stack with a sentinel
        self.fp, self.line_number, self.encoding, self.postscript = svx_open(self.p, hook=self.open_hook)
        self.files_visited = 1

    @staticmethod
    def _with_svx_suffix(path):
        '''Return path with a .svx suffix, adding one if not already present.'''
        if path.suffix.lower() == '.svx':
            return path.with_suffix('.svx')  # already present (normalised to lower case)
        # Append rather than use with_suffix(), which would _replace_
        # the last dotted part of a name such as 'cave.2019'.
        return path.with_name(path.name + '.svx')

    def __iter__(self):
        '''Return an iterator for a top level svx file'''
        return self

    def __next__(self):
        '''Return the next line as an SvxRecord, or stop the iteration.'''
        while self.fp:
            self.line, self.line_number = svx_readline(self.fp, self.line_number)  # read line and increment counter
            if self.line:
                break
            self.fp.close()  # we ran out of lines for the file being currently processed
            self.p, self.fp, self.line_number, self.encoding = self.stack.pop()  # back to the including file
        else:  # reached the sentinel, so there are no more files to read
            raise StopIteration
        self.line = self.line.strip()  # remove leading and trailing whitespace
        clean = self.line.partition(self.comment_char)[0].strip()  # then remove comments
        _, uc_keyword, arguments = extract_keyword_arguments(clean, self.keywords, self.keyword_char)
        if arguments:
            if uc_keyword == 'BEGIN':  # add the survex context (assume lower case)
                self.context.append(arguments[0].lower())
            elif uc_keyword == 'END' and self.context:  # remove the most recent survex context
                self.context.pop()  # (an unmatched *end is ignored rather than crashing)
        record = SvxRecord(self.p, self.encoding, self.line_number, self.context, self.line)  # before push
        if uc_keyword == 'INCLUDE':  # process an INCLUDE statement
            self.stack.append((self.p, self.fp, self.line_number, self.encoding))  # push onto stack
            filename = ' '.join(arguments).strip('"').replace('\\', '/')  # remove quotes, replace backslashes
            self.p = self._with_svx_suffix(Path(self.p.parent, filename))  # the new path
            self.fp, self.line_number, self.encoding, record.postscript = svx_open(
                self.p, hook=self.open_hook, context=self.context)
            self.files_visited += 1
        return record

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        '''Report a failed *include, then close any files still open.

        The exception (if any) is not suppressed.
        '''
        if type is not None and issubclass(type, FileNotFoundError) and self.stack:
            p, fp, line_number, _ = self.stack[-1]
            # If an *include fails, the including file has been pushed
            # onto the stack but is still the current file; this tells
            # a failed *include apart from any other missing file.
            if fp is not None and fp is self.fp:
                print(f'{p}:{line_number}: {getattr(self, "line", "").expandtabs()}')
        for handle in [self.fp] + [entry[1] for entry in self.stack]:
            if handle is not None:
                handle.close()
        self.fp = None  # so that any further iteration stops immediately


if __name__ == "__main__":

    # The following are used in colorized strings below and draws on
    # https://stackoverflow.com/questions/5947742/how-to-change-the-output-color-of-echo-in-linux

    NC = '\033[0m'
    RED = '\033[0;31m'
    GREEN = '\033[0;32m'
    YELLOW = '\033[0;33m'
    BLUE = '\033[0;34m'
    PURPLE = '\033[0;35m'
    CYAN = '\033[0;36m'

    import re
    import argparse

    keyword_char, comment_char = '*', ';'  # for the time being

    parser = argparse.ArgumentParser(description='Analyze a survex data source tree.')
    parser.add_argument('svx_file', help='top level survex file (.svx)')
    parser.add_argument('-d', '--directories', action='store_true',
                        help='absolute file paths instead of relative ones')
    parser.add_argument('-l', '--list-files', action='store_true',
                        help='trace (output) the files that are visited')
    parser.add_argument('-k', '--keywords', default=None,
                        help='a set of keywords (comma-separated, case insensitive) to use instead of default')
    parser.add_argument('-a', '--additional-keywords', default=None,
                        help='a set of keywords (--ditto--) to add to the default')
    parser.add_argument('-e', '--excluded-keywords', default=None,
                        help='a set of keywords (--ditto--) to exclude from the default')
    parser.add_argument('-t', '--totals', action='store_true', help='print totals for each keyword')
    parser.add_argument('-s', '--summarize', action='store_true', help='print a one-line summary')
    parser.add_argument('-g', '--grep', default=None, help='pattern to match (switch to grep mode)')
    parser.add_argument('-i', '--ignore-case', action='store_true', help='ignore case (when in grep mode)')
    parser.add_argument('-n', '--no-ignore-case', action='store_true', help='preserve case (when in keyword mode)')
    parser.add_argument('-x', '--context', action='store_true', help='include survex context in printed results')
    parser.add_argument('-y', '--omit-linen', action='store_true', help='omit line numbers in output')
    parser.add_argument('-c', '--color', action='store_true', help='colorize printed results')
    parser.add_argument('-q', '--quiet', action='store_true', help='only print errors (in case of -o only)')
    parser.add_argument('-o', '--output', help='(optional) output to spreadsheet (.ods, .xlsx)')
    args = parser.parse_args()

    def parse_keywords(spec):
        '''Turn a comma-separated list into a set of upper case keywords'''
        return {word.strip() for word in spec.upper().split(',') if word.strip()}

    def display_path(p):
        '''The path as it should appear in the output'''
        return str(p.absolute()) if args.directories else str(p)

    def format_result(record, text):
        '''Build an output line for a record, where text is the (possibly highlighted) line'''
        path = display_path(record.path)
        context = '.'.join(record.context) if args.context else ''
        if args.color:
            context = f'{BLUE}{context}{CYAN}' if args.context else ''
            prefix = f'{PURPLE}{path}{CYAN}:'
            if not args.omit_linen:
                prefix = f'{prefix}{GREEN}{record.line}{CYAN}:'
            return f'{prefix}{BLUE}{context}{CYAN}:{NC}{text}'
        if args.omit_linen:
            return f'{path}:{context}:{text}'
        return f'{path}:{record.line}:{context}:{text}'

    # The highlighting is applied to the text of the line only, before
    # the line is assembled, so that it cannot affect the path nor the
    # escape sequences that colorize the other fields.

    def highlight_match(text, matched):
        '''Highlight all occurrences of a matched string in the text'''
        return text.replace(matched, f'{RED}{matched}{NC}') if matched else text

    def highlight_keyword(text, keyword):
        '''Highlight the leading keyword character and keyword in the text'''
        start = text.find(keyword, 1)  # the text starts with the keyword character
        if start < 0:
            return text
        end = start + len(keyword)
        return f'{RED}{text[:end]}{NC}{text[end:]}'

    if args.list_files:

        def open_hook(p, context):
            '''hook for tracing which files are being visited'''
            path = display_path(p)
            context = '.'.join(context) if args.context else ''
            entered = ''  # ensure consistency
            if args.color:
                context = f'{BLUE}{context}{CYAN}' if context else ''
                if args.omit_linen:
                    return f'{PURPLE}{path}{CYAN}:{BLUE}{context}:{RED}{entered}{NC}'
                return f'{PURPLE}{path}{CYAN}:{GREEN}0{CYAN}:{BLUE}{context}:{RED}{entered}{NC}'
            if args.omit_linen:
                return f'{path}:{context}:{entered}'
            return f'{path}:0:{context}:{entered}'

    else:

        open_hook = None

    if args.grep:  # simple grep mode

        flags = re.IGNORECASE if args.ignore_case else 0
        pattern = re.compile(args.grep, flags=flags)
        no_matches = True

        with SvxReader(args.svx_file, open_hook=open_hook) as svx_reader:
            if svx_reader.postscript:  # catch the trace of the initial file open
                print(svx_reader.postscript)
            for record in svx_reader:
                match = pattern.search(record.text)
                if match:
                    no_matches = False
                    text = record.text.expandtabs()
                    if args.color:
                        text = highlight_match(text, match.group())
                    print(format_result(record, text))
                if record.postscript:
                    print(record.postscript)

        if no_matches:
            sys.exit(1)  # reproduce what grep returns if there are no matches

    else:  # keyword matching mode

        if args.keywords:
            keywords = parse_keywords(args.keywords)
        else:
            keywords = {'INCLUDE', 'BEGIN', 'END'}

        if args.additional_keywords:
            keywords = keywords.union(parse_keywords(args.additional_keywords))

        if args.excluded_keywords:
            keywords = keywords.difference(parse_keywords(args.excluded_keywords))

        count = dict.fromkeys(keywords, 0)
        records = []
        counting = args.totals or args.summarize or args.output  # in which case lines are not printed

        with SvxReader(args.svx_file, open_hook=open_hook) as svx_reader:
            if svx_reader.postscript:  # catch the trace of the initial file open
                print(svx_reader.postscript)
            for record in svx_reader:
                clean = record.text.partition(comment_char)[0].strip()
                keyword, uc_keyword, arguments = extract_keyword_arguments(clean, keywords, keyword_char)
                if keyword:
                    text = record.text.expandtabs()
                    if args.output:
                        records.append((display_path(record.path), record.encoding, record.line,
                                        '.'.join(record.context),
                                        keyword if args.no_ignore_case else uc_keyword,
                                        ' '.join(arguments), text))
                    if counting:
                        count[uc_keyword] += 1
                    else:
                        if args.color:
                            text = highlight_keyword(text, keyword)
                        print(format_result(record, text))
                if record.postscript:
                    print(record.postscript)

        top_level = display_path(svx_reader.top_level)
        files_visited = f'{svx_reader.files_visited} files visited'

        if args.totals:
            for keyword in sorted(count):  # sorted, so that the order is reproducible
                if args.color:
                    summary = f'{PURPLE}{top_level}{CYAN}:{RED}{keyword}{CYAN}:{NC} {count[keyword]} records found ({files_visited})'
                else:
                    summary = f'{top_level}:{keyword}: {count[keyword]} records found ({files_visited})'
                print(summary)

        if args.summarize or (args.output and not args.quiet):
            keyword_list = '|'.join(sorted(keywords))
            tot_count = sum(count.values())
            if args.color:
                summary = f'{PURPLE}{top_level}{CYAN}:{RED}{keyword_list}{CYAN}:{NC} {tot_count} records found ({files_visited})'
            else:
                summary = f'{top_level}:{keyword_list}: {tot_count} records found ({files_visited})'
            print(summary)

        if args.output:
            import pandas as pd
            schema = {'path': str, 'encoding': str, 'line': int, 'context': str,
                      'keyword': str, 'argument': str, 'full': str}
            df = pd.DataFrame(records, columns=schema.keys()).astype(schema)
            df.to_excel(args.output, index=False)
            if not args.quiet:
                print(f'Dataframe ({len(df.columns)} columns, {len(df)} rows) written to {args.output}')