From b671cd017bbaa5509ec885cea534bc0a03acbc92 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 19 May 2026 12:11:05 +0200 Subject: [PATCH 1/8] Add timestamps to InfraScan CLI logs Added a logging function to print messages with timestamps, enhancing the visibility of the scanning process. Updated the main function to utilize this logging for various stages of execution. --- cli.py | 104 +++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 28 deletions(-) diff --git a/cli.py b/cli.py index d11576b..552b045 100755 --- a/cli.py +++ b/cli.py @@ -18,6 +18,18 @@ def init(*args, **kwargs): pass from scanner.parser import scan_directory from reporter.grading import ReportGenerator from reporter.html_generator import generate_standalone_html +from datetime import datetime + + +def log_with_timestamp(message: str) -> None: + """ + Print message prefixed with current timestamp. + + Example: + [2026-10-10 16:23:34] Starting scan... + """ + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{timestamp}] {message}", flush=True) __version__ = "1.0.6" @@ -282,27 +294,38 @@ def should_fail(args, report_dict, results): def main(): load_dotenv() args = setup_args() - + target_path = os.path.abspath(args.path) - + if not os.path.exists(target_path): print(f"Error: Path '{target_path}' does not exist.", file=sys.stderr) sys.exit(1) - + try: if args.format == 'text': - print(f"Analyzing {target_path} with '{args.scanner}' scanner...") - + log_with_timestamp( + f"Analyzing {target_path} with '{args.scanner}' scanner..." + ) + # Run Scanners + log_with_timestamp("Starting directory scan") + results, resource_count, recommendations = scan_directory( - target_path, + target_path, scanner_type=args.scanner, framework=args.framework, download_external_modules=args.download_external_modules, included_paths=args.include ) - + + log_with_timestamp( + f"Directory scan completed. Found {len(results)} findings " + f"in {resource_count} resources." + ) + # Generate Report + log_with_timestamp("Generating report") + report_generator = ReportGenerator() report = report_generator.generate_report( findings=results, @@ -310,16 +333,20 @@ def main(): scanner_type=args.scanner, extra_recommendations=recommendations ) - + + log_with_timestamp("Report generation completed") + report_dict = report.to_dict() report_dict['results'] = results report_dict['summary'] = { 'total': len(results), 'scanner_used': args.scanner } - + # Output Results to file/stdout if args.out: + log_with_timestamp(f"Saving report to {args.out}") + if args.format == 'json': with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) @@ -327,46 +354,62 @@ def main(): html_output = generate_standalone_html(report_dict) with open(args.out, 'w', encoding='utf-8') as f: f.write(html_output) - else: # text format - # Default behavior for text mode with --out is to save JSON results + else: with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) - # Handle console output + # Console output if args.format == 'json' and not args.out: print(json.dumps(report_dict, indent=2)) elif args.format == 'html' and not args.out: print(generate_standalone_html(report_dict)) else: - # If format is text OR if output is saved to record/html/json - # always show the text summary in the console print_text_report(report_dict, resource_count, args.scanner) if args.out: - print(f"{Fore.GREEN}[v] Full {args.format.upper()} report saved to: {Fore.WHITE}{args.out}") - - # Send Slack notification if configured + print( + f"{Fore.GREEN}[v] Full {args.format.upper()} " + f"report saved to: {Fore.WHITE}{args.out}" + ) + + # Slack notification webhook_url = os.getenv('SLACK_WEBHOOK_URL', '').strip() if webhook_url: + log_with_timestamp("Sending Slack notification") + overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) security = report_dict.get('security', {}) container = report_dict.get('container', {}) total_findings = len(results) - overall_grade = overall.get('letter', '?') if overall else '?' - overall_pct = overall.get('percentage', 0) if overall else 0 + overall_grade = overall.get('letter', '?') + overall_pct = overall.get('percentage', 0) + + grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] - grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] if cost and cost.get('max_score', 0) > 0: - grades_parts.append(f"Cost {cost.get('letter','?')} ({cost.get('percentage',0)}%)") + grades_parts.append( + f"Cost {cost.get('letter', '?')} " + f"({cost.get('percentage', 0)}%)" + ) + if security and security.get('max_score', 0) > 0: - grades_parts.append(f"Security {security.get('letter','?')} ({security.get('percentage',0)}%)") + grades_parts.append( + f"Security {security.get('letter', '?')} " + f"({security.get('percentage', 0)}%)" + ) + if container and container.get('max_score', 0) > 0: - grades_parts.append(f"Containers {container.get('letter','?')} ({container.get('percentage',0)}%)") + grades_parts.append( + f"Containers {container.get('letter', '?')} " + f"({container.get('percentage', 0)}%)" + ) + grades_summary = " | ".join(grades_parts) ctx = build_gh_actions_context() lines = ["🤖 InfraScan used in *GitHub Actions*"] + if ctx['repo']: lines.append(f"Repo: *{ctx['repo']}*") if ctx['branch']: @@ -375,25 +418,30 @@ def main(): lines.append(f"Workflow: _{ctx['workflow']}_") if ctx['actor']: lines.append(f"Triggered by: {ctx['actor']}") + lines.append(f"Grades: {grades_summary}") lines.append(f"Findings: {total_findings} | Scanner: {args.scanner}") + if ctx['run_url']: lines.append(f"<{ctx['run_url']}|View run>") send_slack_notification(" | ".join(lines)) # Determine Exit Code + log_with_timestamp("Evaluating fail conditions") + if should_fail(args, report_dict, results): sys.exit(1) - + + log_with_timestamp("InfraScan completed successfully") sys.exit(0) - + except Exception as e: + log_with_timestamp(f"ERROR: {e}") print(f"An error occurred during scanning: {e}", file=sys.stderr) + if logging.getLogger().isEnabledFor(logging.DEBUG): import traceback traceback.print_exc() - sys.exit(1) -if __name__ == "__main__": - main() + sys.exit(1) From 84ccf06d1db792e3dc3ff99b9df796858649dac8 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 19 May 2026 15:21:53 +0200 Subject: [PATCH 2/8] Address review feedback: reorganize log_with_timestamp, use logging module, restore main check --- cli.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/cli.py b/cli.py index 552b045..3362b92 100755 --- a/cli.py +++ b/cli.py @@ -18,18 +18,6 @@ def init(*args, **kwargs): pass from scanner.parser import scan_directory from reporter.grading import ReportGenerator from reporter.html_generator import generate_standalone_html -from datetime import datetime - - -def log_with_timestamp(message: str) -> None: - """ - Print message prefixed with current timestamp. - - Example: - [2026-10-10 16:23:34] Starting scan... - """ - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print(f"[{timestamp}] {message}", flush=True) __version__ = "1.0.6" @@ -291,6 +279,17 @@ def should_fail(args, report_dict, results): return False +def log_with_timestamp(message: str) -> None: + """ + Log message with current timestamp using the logging module. + + Example: + [2026-10-10 16:23:34] Starting scan... + """ + from datetime import datetime + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{timestamp}] {message}", flush=True) + def main(): load_dotenv() args = setup_args() @@ -445,3 +444,6 @@ def main(): traceback.print_exc() sys.exit(1) + +if __name__ == "__main__": + main() From 0743293528296cf821404df07c0115a27160f002 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 19 May 2026 15:45:22 +0200 Subject: [PATCH 3/8] Refactor logging to use logger with INFO level Updated logging configuration to use INFO level and added timestamps to log messages. Replaced print statements with logger calls for error handling and notifications. --- cli.py | 120 +++++++++++++++++++++++++++------------------------------ 1 file changed, 57 insertions(+), 63 deletions(-) diff --git a/cli.py b/cli.py index 3362b92..7b093a3 100755 --- a/cli.py +++ b/cli.py @@ -22,7 +22,13 @@ def init(*args, **kwargs): pass __version__ = "1.0.6" # Setup basic logging -logging.basicConfig(level=logging.ERROR, format='%(levelname)s: %(message)s') +logging.basicConfig( + level=logging.INFO, + format='[%(asctime)s] %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) + +logger = logging.getLogger(__name__) def send_slack_notification(message: str) -> None: """Send a Slack notification via webhook URL from environment variable.""" @@ -32,9 +38,9 @@ def send_slack_notification(message: str) -> None: try: response = requests.post(webhook_url, json={'text': message}, timeout=5) if response.status_code >= 400: - print(f"Slack notification failed: {response.status_code} - {response.text}", file=sys.stderr) + logger.error(f"Slack notification failed: {response.status_code} - {response.text}") except Exception as e: - print(f"Slack notification error: {e}", file=sys.stderr) + logger.error(f"Slack notification error: {e}") def build_gh_actions_context() -> dict: """Extract GitHub Actions context from environment variables.""" @@ -57,39 +63,39 @@ def setup_args(): parser = argparse.ArgumentParser( description="InfraScan CLI - Open Source IaC Cost & Security Scanner" ) - + parser.add_argument( "path", nargs="?", default="/scan", help="Path to the directory to scan (default: /scan when using Docker, or '.' for local use)" ) - + parser.add_argument( "--scanner", default="comprehensive", help="Scanner type(s) to run (default: comprehensive). Support multiple scanners separated by comma (e.g., 'regex,containers'). Options: regex, checkov, containers, comprehensive" ) - + parser.add_argument( "--format", choices=["text", "json", "html"], default="text", help="Output format (default: text)" ) - + parser.add_argument( "--out", help="File path to save JSON output explicitly (e.g., infrascan-report.json)" ) - + parser.add_argument( "--fail-on", choices=["any", "high_critical", "grade_a", "grade_b", "grade_c", "grade_d", "grade_f", "priority_critical", "priority_high", "priority_medium", "priority_low", "priority_info"], help="Exit with error code 1 if findings match criteria (any findings, high/critical findings, grade threshold, or priority threshold)" ) - + parser.add_argument( "--download-external-modules", action="store_true", @@ -109,39 +115,39 @@ def setup_args(): dest="include", help="Select specific files or directories to scan. Can be used multiple times." ) - + parser.add_argument( "--version", action="version", version=f"InfraScan v{__version__}", help="Show version information and exit" ) - + return parser.parse_args() def print_text_report(report_dict, resource_count, scanner_type): # Initialize colorama init(autoreset=True) - + overall = report_dict.get('overall', {}) findings_dict = report_dict.get('findings', {}) results = findings_dict.get('all', report_dict.get('results', [])) - + # Header print(f"\n{Fore.CYAN}{Style.BRIGHT}{'=' * 60}") print(f"{Fore.CYAN}{Style.BRIGHT} InfraScan Report - {scanner_type.upper()} SCAN") print(f"{Fore.CYAN}{Style.BRIGHT}{'=' * 60}") - + # Summary Info target_path = os.path.abspath(sys.argv[1] if len(sys.argv) > 1 and not sys.argv[1].startswith('--') else '.') print(f"{Style.BRIGHT}Path Scanned :{Style.RESET_ALL} {target_path}") print(f"{Style.BRIGHT}Resources Found :{Style.RESET_ALL} {resource_count}") print(f"{Style.BRIGHT}Total Findings :{Style.RESET_ALL} {len(results)}") - + # Grades Section print(f"\n{Style.BRIGHT}GRADING SUMMARY:") print(f"{'-' * 30}") - + def get_grade_color(letter): if letter == 'A': return Fore.GREEN if letter == 'B': return Fore.GREEN @@ -152,11 +158,11 @@ def get_grade_color(letter): def print_grade_line(name, grade): if not grade or (grade.get('max_score', 0) == 0 and grade.get('letter') != 'A'): return - + letter = grade.get('letter', '?') percentage = grade.get('percentage', 0) color = get_grade_color(letter) - + breakdown = grade.get('severity_breakdown', {}) counts = [ f"{Fore.RED}Crit:{breakdown.get('critical', 0)}{Style.RESET_ALL}", @@ -168,13 +174,13 @@ def print_grade_line(name, grade): print(f"{name:18}: {color}{Style.BRIGHT}{letter}{Style.RESET_ALL} ({percentage}%){br_str}") print_grade_line("Overall Health", overall) - + if scanner_type in ['regex', 'comprehensive']: print_grade_line("Cost Efficiency", report_dict.get('cost')) - + if scanner_type in ['checkov', 'comprehensive']: print_grade_line("IaC Security", report_dict.get('security')) - + if scanner_type in ['containers', 'comprehensive']: print_grade_line("Container Security", report_dict.get('container')) @@ -184,12 +190,12 @@ def print_grade_line(name, grade): print(f"\n{Fore.GREEN}{Style.BRIGHT}RECOMMENDATIONS:") for rec in recs: print(f" {Fore.GREEN}• {Style.BRIGHT}{rec}") - + # Findings Details if results: print(f"\n{Style.BRIGHT}FINDINGS DETAILS:") print(f"{'=' * 60}") - + # Categorize findings categories = [] if findings_dict.get('cost'): @@ -198,97 +204,86 @@ def print_grade_line(name, grade): categories.append(('IaC Security', findings_dict['security'])) if findings_dict.get('container'): categories.append(('Container Security', findings_dict['container'])) - + if not categories: categories = [('General Findings', results)] for cat_name, cat_findings in categories: if not cat_findings: continue - + print(f"\n{Style.BRIGHT}>>> {cat_name} ({len(cat_findings)})") - + # Limit display to 40 findings to avoid overwhelming CI logs display_limit = 40 for i, res in enumerate(cat_findings): if i >= display_limit: print(f"\n {Fore.YELLOW}... and {len(cat_findings) - display_limit} more findings (see full report for details)") break - + severity = res.get('severity', 'UNKNOWN').upper() sev_color = Fore.WHITE if severity == 'CRITICAL': sev_color = Fore.RED + Style.BRIGHT elif severity == 'HIGH': sev_color = Fore.RED elif severity == 'MEDIUM': sev_color = Fore.YELLOW elif severity == 'LOW': sev_color = Fore.CYAN - + rule_id = res.get('rule_id', 'N/A') file_path = res.get('file', 'Unknown') line_str = f":{res.get('line')}" if res.get('line') else "" - + print(f" {sev_color}[{severity}]{Style.RESET_ALL} {Style.BRIGHT}{rule_id}{Style.RESET_ALL}: {res.get('description', '')}") print(f" {Fore.WHITE}at {file_path}{line_str}{Style.RESET_ALL}") if res.get('resource'): print(f" {Fore.WHITE}resource: {res.get('resource')}{Style.RESET_ALL}") - + print(f"\n{Fore.CYAN}{Style.BRIGHT}{'=' * 60}\n") def should_fail(args, report_dict, results): if not args.fail_on: return False - + if args.fail_on == 'any' and len(results) > 0: print("\n[ERROR] Build failed: Findings detected and --fail-on=any specified.", file=sys.stderr) return True - + if args.fail_on == 'high_critical': critical_high_count = sum(1 for r in results if r.get('severity', '').lower() in ['critical', 'high']) if critical_high_count > 0: print(f"\n[ERROR] Build failed: {critical_high_count} high/critical findings detected and --fail-on=high_critical specified.", file=sys.stderr) return True - + if args.fail_on.startswith('grade_'): grade_order = ['A', 'B', 'C', 'D', 'F'] fail_grade = args.fail_on.split('_')[1].upper() overall_letter = report_dict.get('overall', {}).get('letter', 'A') - + try: fail_idx = grade_order.index(fail_grade) current_idx = grade_order.index(overall_letter) - + if current_idx >= fail_idx: print(f"\n[ERROR] Build failed: Overall grade is {overall_letter} and --fail-on={args.fail_on} specified (threshold: {fail_grade} or worse).", file=sys.stderr) return True except ValueError: pass # Should not happen due to argparse choices - + if args.fail_on.startswith('priority_'): severity_weights = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1, 'info': 0.5} fail_priority = args.fail_on.split('_')[1] threshold_weight = severity_weights.get(fail_priority, 0) - + findings_at_or_above = [ - r for r in results + r for r in results if severity_weights.get(r.get('severity', 'info').lower(), 0.5) >= threshold_weight ] - + if findings_at_or_above: print(f"\n[ERROR] Build failed: {len(findings_at_or_above)} findings with priority {fail_priority} or higher detected and --fail-on={args.fail_on} specified.", file=sys.stderr) return True - - return False -def log_with_timestamp(message: str) -> None: - """ - Log message with current timestamp using the logging module. - - Example: - [2026-10-10 16:23:34] Starting scan... - """ - from datetime import datetime - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print(f"[{timestamp}] {message}", flush=True) + return False def main(): load_dotenv() @@ -297,17 +292,17 @@ def main(): target_path = os.path.abspath(args.path) if not os.path.exists(target_path): - print(f"Error: Path '{target_path}' does not exist.", file=sys.stderr) + logger.error(f"Path '{target_path}' does not exist.") sys.exit(1) try: if args.format == 'text': - log_with_timestamp( + logger.info( f"Analyzing {target_path} with '{args.scanner}' scanner..." ) # Run Scanners - log_with_timestamp("Starting directory scan") + logger.info("Starting directory scan") results, resource_count, recommendations = scan_directory( target_path, @@ -317,13 +312,13 @@ def main(): included_paths=args.include ) - log_with_timestamp( + logger.info( f"Directory scan completed. Found {len(results)} findings " f"in {resource_count} resources." ) # Generate Report - log_with_timestamp("Generating report") + logger.info("Generating report") report_generator = ReportGenerator() report = report_generator.generate_report( @@ -333,7 +328,7 @@ def main(): extra_recommendations=recommendations ) - log_with_timestamp("Report generation completed") + logger.info("Report generation completed") report_dict = report.to_dict() report_dict['results'] = results @@ -344,7 +339,7 @@ def main(): # Output Results to file/stdout if args.out: - log_with_timestamp(f"Saving report to {args.out}") + logger.info(f"Saving report to {args.out}") if args.format == 'json': with open(args.out, 'w') as f: @@ -373,7 +368,7 @@ def main(): # Slack notification webhook_url = os.getenv('SLACK_WEBHOOK_URL', '').strip() if webhook_url: - log_with_timestamp("Sending Slack notification") + logger.info("Sending Slack notification") overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) @@ -427,17 +422,16 @@ def main(): send_slack_notification(" | ".join(lines)) # Determine Exit Code - log_with_timestamp("Evaluating fail conditions") + logger.info("Evaluating fail conditions") if should_fail(args, report_dict, results): sys.exit(1) - log_with_timestamp("InfraScan completed successfully") + logger.info("InfraScan completed successfully") sys.exit(0) except Exception as e: - log_with_timestamp(f"ERROR: {e}") - print(f"An error occurred during scanning: {e}", file=sys.stderr) + logger.error(f"ERROR: {e}") if logging.getLogger().isEnabledFor(logging.DEBUG): import traceback From ffd60651551fd409d3003b6b4c0b3e85f8f75904 Mon Sep 17 00:00:00 2001 From: Natan Date: Mon, 25 May 2026 08:37:34 +0200 Subject: [PATCH 4/8] Add timestamped logging and execution duration metrics Add timestamped logging and execution duration metrics to InfraScan CLI. This update improves log visibility during long-running scans by adding timestamps to log output and tracking execution time for key operations such as: * directory scanning * report generation * report saving * Slack notifications * total CLI execution The goal is to make GitHub Actions and CI logs easier to debug and help identify performance bottlenecks during large scans. --- cli.py | 56 ++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/cli.py b/cli.py index 7b093a3..f1e4c59 100755 --- a/cli.py +++ b/cli.py @@ -5,6 +5,7 @@ import json import logging import requests +import time try: from colorama import Fore, Style, init except ImportError: @@ -21,11 +22,13 @@ def init(*args, **kwargs): pass __version__ = "1.0.6" -# Setup basic logging +# Setup logging with timestamps logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' + datefmt='%Y-%m-%d %H:%M:%S', + stream=sys.stdout, + force=True ) logger = logging.getLogger(__name__) @@ -105,7 +108,7 @@ def setup_args(): parser.add_argument( "--framework", default="auto", - choices=["auto", "terraform", "kubernetes", "cloudformation", "helm"], + choices=["auto", "terraform", "kubernetes", "cloudformation", "helm", "all"], help="IaC framework type (default: auto-detect)" ) @@ -286,6 +289,8 @@ def should_fail(args, report_dict, results): return False def main(): + total_start = time.time() + load_dotenv() args = setup_args() @@ -303,7 +308,9 @@ def main(): # Run Scanners logger.info("Starting directory scan") - + + scan_start = time.time() + results, resource_count, recommendations = scan_directory( target_path, scanner_type=args.scanner, @@ -311,15 +318,19 @@ def main(): download_external_modules=args.download_external_modules, included_paths=args.include ) - + + scan_duration = time.time() - scan_start + logger.info( - f"Directory scan completed. Found {len(results)} findings " - f"in {resource_count} resources." + f"Directory scan completed in {scan_duration:.2f}s. " + f"Found {len(results)} findings in {resource_count} resources." ) # Generate Report logger.info("Generating report") + report_start = time.time() + report_generator = ReportGenerator() report = report_generator.generate_report( findings=results, @@ -328,7 +339,11 @@ def main(): extra_recommendations=recommendations ) - logger.info("Report generation completed") + report_duration = time.time() - report_start + + logger.info( + f"Report generation completed in {report_duration:.2f}s" + ) report_dict = report.to_dict() report_dict['results'] = results @@ -340,10 +355,18 @@ def main(): # Output Results to file/stdout if args.out: logger.info(f"Saving report to {args.out}") + + save_start = time.time() if args.format == 'json': with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) + + save_duration = time.time() - save_start + + logger.info( + f"Report saved in {save_duration:.2f}s" + ) elif args.format == 'html': html_output = generate_standalone_html(report_dict) with open(args.out, 'w', encoding='utf-8') as f: @@ -370,6 +393,8 @@ def main(): if webhook_url: logger.info("Sending Slack notification") + slack_start = time.time() + overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) security = report_dict.get('security', {}) @@ -420,6 +445,11 @@ def main(): lines.append(f"<{ctx['run_url']}|View run>") send_slack_notification(" | ".join(lines)) + slack_duration = time.time() - slack_start + + logger.info( + f"Slack notification sent in {slack_duration:.2f}s" + ) # Determine Exit Code logger.info("Evaluating fail conditions") @@ -427,11 +457,16 @@ def main(): if should_fail(args, report_dict, results): sys.exit(1) - logger.info("InfraScan completed successfully") + total_duration = time.time() - total_start + + logger.info( + f"InfraScan completed successfully in " + f"{total_duration:.2f}s" + ) sys.exit(0) except Exception as e: - logger.error(f"ERROR: {e}") + logger.exception(f"An error occurred during scanning: {e}") if logging.getLogger().isEnabledFor(logging.DEBUG): import traceback @@ -441,3 +476,4 @@ def main(): if __name__ == "__main__": main() + From be246d5fd32c81a7b129d94810828253f790f411 Mon Sep 17 00:00:00 2001 From: Natan Date: Mon, 25 May 2026 16:55:40 +0200 Subject: [PATCH 5/8] Improve logging consistency and execution timing - added execution timing logs for scan, report generation, Slack notifications, and total runtime - replaced time.time() with time.perf_counter() for accurate duration measurements - unified error handling with logger.error() instead of raw stderr prints - fixed potential NoneType crash in Slack notification grading logic - improved logging consistency across CLI execution flow --- cli.py | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/cli.py b/cli.py index f1e4c59..5b17063 100755 --- a/cli.py +++ b/cli.py @@ -248,13 +248,18 @@ def should_fail(args, report_dict, results): return False if args.fail_on == 'any' and len(results) > 0: - print("\n[ERROR] Build failed: Findings detected and --fail-on=any specified.", file=sys.stderr) + logger.error( + "Build failed: Findings detected and --fail-on=any specified." + ) return True if args.fail_on == 'high_critical': critical_high_count = sum(1 for r in results if r.get('severity', '').lower() in ['critical', 'high']) if critical_high_count > 0: - print(f"\n[ERROR] Build failed: {critical_high_count} high/critical findings detected and --fail-on=high_critical specified.", file=sys.stderr) + logger.error( + f"Build failed: {critical_high_count} high/critical findings " + f"detected and --fail-on=high_critical specified." + ) return True if args.fail_on.startswith('grade_'): @@ -267,7 +272,11 @@ def should_fail(args, report_dict, results): current_idx = grade_order.index(overall_letter) if current_idx >= fail_idx: - print(f"\n[ERROR] Build failed: Overall grade is {overall_letter} and --fail-on={args.fail_on} specified (threshold: {fail_grade} or worse).", file=sys.stderr) + logger.error( + f"Build failed: Overall grade is {overall_letter} " + f"and --fail-on={args.fail_on} specified " + f"(threshold: {fail_grade} or worse)." + ) return True except ValueError: pass # Should not happen due to argparse choices @@ -283,13 +292,17 @@ def should_fail(args, report_dict, results): ] if findings_at_or_above: - print(f"\n[ERROR] Build failed: {len(findings_at_or_above)} findings with priority {fail_priority} or higher detected and --fail-on={args.fail_on} specified.", file=sys.stderr) + logger.error( + f"Build failed: {len(findings_at_or_above)} findings " + f"with priority {fail_priority} or higher detected " + f"and --fail-on={args.fail_on} specified." + ) return True return False def main(): - total_start = time.time() + total_start = time.perf_counter() load_dotenv() args = setup_args() @@ -309,7 +322,7 @@ def main(): # Run Scanners logger.info("Starting directory scan") - scan_start = time.time() + scan_start = time.perf_counter() results, resource_count, recommendations = scan_directory( target_path, @@ -319,7 +332,7 @@ def main(): included_paths=args.include ) - scan_duration = time.time() - scan_start + scan_duration = time.perf_counter() - scan_start logger.info( f"Directory scan completed in {scan_duration:.2f}s. " @@ -329,7 +342,7 @@ def main(): # Generate Report logger.info("Generating report") - report_start = time.time() + report_start = time.perf_counter() report_generator = ReportGenerator() report = report_generator.generate_report( @@ -339,7 +352,7 @@ def main(): extra_recommendations=recommendations ) - report_duration = time.time() - report_start + report_duration = time.perf_counter() - report_start logger.info( f"Report generation completed in {report_duration:.2f}s" @@ -356,13 +369,13 @@ def main(): if args.out: logger.info(f"Saving report to {args.out}") - save_start = time.time() + save_start = time.perf_counter() if args.format == 'json': with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) - save_duration = time.time() - save_start + save_duration = time.perf_counter() - save_start logger.info( f"Report saved in {save_duration:.2f}s" @@ -393,7 +406,7 @@ def main(): if webhook_url: logger.info("Sending Slack notification") - slack_start = time.time() + slack_start = time.perf_counter() overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) @@ -401,8 +414,8 @@ def main(): container = report_dict.get('container', {}) total_findings = len(results) - overall_grade = overall.get('letter', '?') - overall_pct = overall.get('percentage', 0) + overall_grade = overall.get('letter', '?') if overall else '?' + overall_pct = overall.get('percentage', 0) if overall else 0 grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] @@ -445,7 +458,7 @@ def main(): lines.append(f"<{ctx['run_url']}|View run>") send_slack_notification(" | ".join(lines)) - slack_duration = time.time() - slack_start + slack_duration = time.perf_counter() - slack_start logger.info( f"Slack notification sent in {slack_duration:.2f}s" @@ -457,7 +470,7 @@ def main(): if should_fail(args, report_dict, results): sys.exit(1) - total_duration = time.time() - total_start + total_duration = time.perf_counter() - total_start logger.info( f"InfraScan completed successfully in " @@ -476,4 +489,3 @@ def main(): if __name__ == "__main__": main() - From e226ae4bdc4b28725d85fe03e1e3cd9ea462b434 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 16 Jun 2026 17:00:46 +0200 Subject: [PATCH 6/8] Replace print statements with structured logging --- scanner/docker_scout_scanner.py | 76 ++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 30 deletions(-) diff --git a/scanner/docker_scout_scanner.py b/scanner/docker_scout_scanner.py index 1a5f4e1..7d892de 100644 --- a/scanner/docker_scout_scanner.py +++ b/scanner/docker_scout_scanner.py @@ -15,6 +15,9 @@ import os import re import subprocess +from logging import getLogger + +logger = getLogger(__name__) from typing import List, Dict, Any, Tuple, Optional @@ -232,14 +235,14 @@ def check_image_exists(image: str) -> bool: def cleanup_image(image: str) -> None: """Remove a Docker image from local cache.""" try: - print(f" Removing image: {image}") + logger.info(f" Removing image: {image}") result = run_command(["docker", "rmi", "-f", image], timeout=30) if result.returncode == 0: - print(f" ✓ Removed {image}") + logger.info(f" ✓ Removed {image}") else: - print(f" Warning: Could not remove {image}: {result.stderr[:100]}") + logger.warning(f" Could not remove {image}: {result.stderr[:100]}") except Exception as e: - print(f" Warning: Failed to remove {image}: {e}") + logger.warning(f"Failed to remove {image}: {e}") # ============================================================================ @@ -281,11 +284,18 @@ def run_docker_scout_scan(directory_path: str, files: List[str] = None) -> Tuple compose_files = find_compose_files(directory_path) # Find Kubernetes files k8s_files = find_kubernetes_files(directory_path) - if not compose_files and not k8s_files: return findings, extra_recommendations, False - print(f"Found {len(compose_files)} Docker Compose file(s) and {len(k8s_files)} Kubernetes file(s) to scan") + if compose_files: + logger.info("[INFO] Found Docker Compose files:") + for file in compose_files: + logger.info(f" - {os.path.relpath(file, directory_path)}") + + if k8s_files: + logger.info("[INFO] Found Kubernetes files:") + for file in k8s_files: + logger.info(f" - {os.path.relpath(file, directory_path)}") # Collect ALL images from ALL files first all_images_map = {} # image -> source_file @@ -310,8 +320,14 @@ def run_docker_scout_scan(directory_path: str, files: List[str] = None) -> Tuple # Check if image exists locally before scanning image_existed_before = check_image_exists(image) - print(f"Scanning image: {image}") - + relative_file = os.path.relpath(compose_file, directory_path) + + logger.info( + f"[INFO] Scanning image '{image}' " + f"from file: {os.path.relpath(compose_file, directory_path)}" + ) + logger.info(f" Source file: {relative_file}") + try: image_findings, image_auth_failed = scan_image(image, compose_file, directory_path) findings.extend(image_findings) @@ -320,31 +336,31 @@ def run_docker_scout_scan(directory_path: str, files: List[str] = None) -> Tuple auth_failed = True if image_findings: - print(f" Found {len(image_findings)} vulnerabilities in {image}") + logger.info(f" Found {len(image_findings)} vulnerabilities in {image}") elif not image_auth_failed: - print(f" No vulnerabilities found or image unavailable: {image}") + logger.info(f" No vulnerabilities found or image unavailable: {image}") recommendation = get_image_recommendation(image) if recommendation and recommendation not in extra_recommendations: extra_recommendations.append(recommendation) - print(f" Added recommendation for Bitnami image: {image}") + logger.info(f" Added recommendation for Bitnami image: {image}") # Track for cleanup if image was pulled during scan and cleanup is enabled if cleanup_enabled and not image_existed_before and check_image_exists(image): images_to_cleanup.add(image) except Exception as e: - print(f"Warning: Failed to scan image {image}: {e}") + logger.warning(f"Failed to scan image {image}: {e}") continue # Cleanup images that were pulled during scan if cleanup_enabled and images_to_cleanup: - print(f"\nCleaning up {len(images_to_cleanup)} image(s) pulled during scan...") + logger.info(f"\nCleaning up {len(images_to_cleanup)} image(s) pulled during scan...") for image in images_to_cleanup: try: cleanup_image(image) except Exception as e: - print(f"Warning: Failed to cleanup image {image}: {e}") + logger.warning(f"Failed to cleanup image {image}: {e}") return findings, extra_recommendations, auth_failed @@ -391,27 +407,27 @@ def scan_image(image: str, compose_file: str, base_path: str) -> Tuple[List[Dict # 1. Detect Docker Hub login requirement specifically if result.returncode != 0 and ("Log in with your Docker ID" in result.stderr or "authentication required" in result.stderr.lower()): - print(f"\n[!] Docker Scout Error: Authentication required to access vulnerability database.") - print(f" To fix this, either:") - print(f" a) Set DOCKER_HUB_USERNAME and DOCKER_HUB_PASSWORD environment variables") - print(f" b) Use CONTAINER_SCANNER=grype to use the alternative scanner that doesn't require login") - print(f" Skipping Docker Scout scan for: {image}") + logger.error(f"\n[!] Docker Scout Error: Authentication required to access vulnerability database.") + logger.error(f" To fix this, either:") + logger.error(f" a) Set DOCKER_HUB_USERNAME and DOCKER_HUB_PASSWORD environment variables") + logger.error(f" b) Use CONTAINER_SCANNER=grype to use the alternative scanner that doesn't require login") + logger.error(f" Skipping Docker Scout scan for: {image}") return findings, True # 2. Check for other errors in output (missing image, pull failures, etc.) if result.stdout.strip().startswith('ERROR') or 'MANIFEST_UNKNOWN' in result.stdout: error_msg = result.stdout.split('\n')[0] if '\n' in result.stdout else result.stdout[:200] - print(f"Docker Scout error for image {image}: {error_msg}") + logger.info(f"Docker Scout error for image {image}: {error_msg}") return findings, False # 3. Handle non-zero exit code (with --exit-code, it means findings or real error) if result.returncode != 0 and not result.stdout.strip(): # If stdout is empty and return code is non-zero, it's likely a real failure - print(f"Docker Scout failed for image {image} (exit code {result.returncode})") + logger.warning(f"Docker Scout failed for image {image} (exit code {result.returncode})") if result.stderr: # Truncate stderr for cleaner output but keep the important part clean_stderr = result.stderr.strip().split('\n')[0] - print(f" Error: {clean_stderr}") + logger.error(f" Error: {clean_stderr}") return findings, False # 4. Parse successful output @@ -422,19 +438,19 @@ def scan_image(image: str, compose_file: str, base_path: str) -> Tuple[List[Dict except json.JSONDecodeError as e: # Fallback check for text output if "Analyzing image" in result.stdout or "Target" in result.stdout: - print(f" Docker Scout returned text instead of JSON for {image}. Trying fallback parser...") + logger.info(f" Docker Scout returned text instead of JSON for {image}. Trying fallback parser...") findings = parse_text_output(result.stdout, image, compose_file, base_path) else: - print(f" Failed to parse Docker Scout output for {image}: {e}") + logger.warning(f" Failed to parse Docker Scout output for {image}: {e}") if result.stderr and ("error" in result.stderr.lower() and "Available version" not in result.stderr): # Log real errors from stderr that aren't just update notifications - print(f" Docker Scout stderr: {result.stderr.strip()}") + logger.info(f" Docker Scout stderr: {result.stderr.strip()}") except subprocess.TimeoutExpired: - print(f"Timeout scanning image: {image}") + logger.info(f"Timeout scanning image: {image}") except Exception as e: - print(f"Error scanning image {image}: {e}") + logger.error(f"Error scanning image {image}: {e}") return findings, False @@ -492,7 +508,7 @@ def parse_sarif_format(sarif_data: Dict[str, Any], image: str, compose_file: str )) except Exception as e: - print(f"Error parsing SARIF format: {e}") + logger.error(f"Error parsing SARIF format: {e}") import traceback traceback.print_exc() @@ -574,7 +590,7 @@ def parse_docker_scout_output(scout_data: Dict[str, Any], image: str, compose_fi findings.append(finding) except Exception as e: - print(f"Error parsing Docker Scout output: {e}") + logger.error(f"Error parsing Docker Scout output: {e}") import traceback traceback.print_exc() @@ -656,7 +672,7 @@ def parse_text_output(text_output: str, image: str, compose_file: str, base_path findings = [] # This is a minimal fallback - just create a summary finding - print("Warning: Using fallback text parser. Install latest Docker Scout for JSON output.") + logger.warning("Using fallback text parser. Install latest Docker Scout for JSON output.") # Make file path relative file_path = os.path.relpath(compose_file, base_path) if compose_file and base_path else compose_file From 9cabfaf9ed596d8f3ae634f3ab6b70f9cb6c3cdb Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 16 Jun 2026 17:03:10 +0200 Subject: [PATCH 7/8] Add logger usage and remove print statements Updated logging to replace print statements for better traceability and maintainability. --- scanner/grype_scanner.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/scanner/grype_scanner.py b/scanner/grype_scanner.py index 40c46c9..774cfa2 100644 --- a/scanner/grype_scanner.py +++ b/scanner/grype_scanner.py @@ -8,6 +8,9 @@ import json import os import subprocess +from logging import getLogger + +logger = getLogger(__name__) from typing import List, Dict, Any from scanner.image_utils import ( @@ -83,12 +86,12 @@ def run_grype_scan(directory_path: str, files: List[str] = None) -> List[Dict[st # Extract images from compose files and scan them for image, compose_file in all_images_map.items(): - print(f"Scanning image with Grype: {image}") + logger.info(f"Scanning image with Grype: {image}") try: image_findings = scan_image(image, compose_file, directory_path) findings.extend(image_findings) except Exception as e: - print(f"Warning: Failed to scan image {image}: {e}") + logger.warning(f" Failed to scan image {image}: {e}") continue return findings @@ -128,15 +131,15 @@ def scan_image(image: str, compose_file: str, base_path: str) -> List[Dict[str, grype_data = json.loads(result.stdout) findings = parse_grype_output(grype_data, image, compose_file, base_path) except json.JSONDecodeError as e: - print(f"Failed to parse Grype JSON output: {e}") + logger.warning(f"Failed to parse Grype JSON output: {e}") if result.stderr and "error" in result.stderr.lower(): - print(f"Grype stderr: {result.stderr}") + logger.info(f"Grype stderr: {result.stderr}") except subprocess.TimeoutExpired: - print(f"Timeout scanning image: {image}") + logger.info(f"Timeout scanning image: {image}") except Exception as e: - print(f"Error scanning image {image}: {e}") + logger.error(f"Error scanning image {image}: {e}") return findings @@ -203,7 +206,7 @@ def parse_grype_output(grype_data: Dict[str, Any], image: str, compose_file: str findings.append(finding) except Exception as e: - print(f"Error parsing Grype output: {e}") + logger.error(f"Error parsing Grype output: {e}") import traceback traceback.print_exc() @@ -276,8 +279,8 @@ def normalize_grype_finding(vuln: Dict[str, Any], artifact: Dict[str, Any], imag 'rule_id': vuln_id, 'rule_name': f"Vulnerability in {package_name}", 'severity': normalized_severity, - 'description': f"{description[:200]}..." if len(description) > 200 else description, - 'full_description': description, # Store full description for tooltips + 'description': description, + 'full_description': description, 'remediation': f"Update {package_name} from {package_version} to {fix_version}" if fix_available == 'Yes' else f"Review {package_name}@{package_version} - no fix available", 'estimated_savings': f"Security risk mitigation ({severity})", 'line': 0, From dbc67d110da84ebf09a7db5b9d2a0276a8b20817 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 16 Jun 2026 17:05:46 +0200 Subject: [PATCH 8/8] Replace print statements with logger in parser --- scanner/parser.py | 89 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 71 insertions(+), 18 deletions(-) diff --git a/scanner/parser.py b/scanner/parser.py index dbd23b9..89aa72f 100644 --- a/scanner/parser.py +++ b/scanner/parser.py @@ -1,5 +1,8 @@ import os import re +from logging import getLogger + +logger = getLogger(__name__) from rules.definitions import check_rules from scanner.checkov_scanner import is_checkov_available, run_checkov_scan from scanner.docker_scout_scanner import is_docker_scout_available, run_docker_scout_scan @@ -27,10 +30,12 @@ def detect_framework(path: str = None, files: list = None) -> str: - 'kubernetes' - 'cloudformation' - 'helm' + - 'all' (fallback for Docker/secrets/actions/etc.) """ tf_files = 0 k8s_files = 0 cfn_files = 0 + helm_files = 0 scan_files = [] if files: @@ -44,6 +49,8 @@ def detect_framework(path: str = None, files: list = None) -> str: file = os.path.basename(full_path) if file.endswith('.tf'): tf_files += 1 + elif file == 'Chart.yaml' or file == 'Chart.yml': + helm_files += 1 elif file.endswith(('.yml', '.yaml')): # Check file content for better detection try: @@ -56,12 +63,16 @@ def detect_framework(path: str = None, files: list = None) -> str: except Exception: continue - if k8s_files > tf_files and k8s_files > cfn_files: + if k8s_files > tf_files and k8s_files > cfn_files and k8s_files > helm_files: return 'kubernetes' - if cfn_files > tf_files: + if cfn_files > tf_files and cfn_files > helm_files: return 'cloudformation' + if helm_files > tf_files: + return 'helm' + if tf_files > 0: + return 'terraform' - return 'terraform' + return 'all' def count_resources(path=None, framework='terraform', files=None): """ @@ -85,7 +96,7 @@ def count_resources(path=None, framework='terraform', files=None): for file in f_list: scan_files.append(os.path.join(root, file)) - if framework == 'terraform': + if framework in ('terraform', 'all'): for full_path in scan_files: if full_path.endswith('.tf'): try: @@ -97,7 +108,8 @@ def count_resources(path=None, framework='terraform', files=None): resource_count += len(matches) except Exception: continue - elif framework == 'kubernetes': + + if framework in ('kubernetes', 'all'): from scanner.image_utils import find_kubernetes_files if files: k8s_files = [f for f in files if f.endswith(('.yml', '.yaml'))] @@ -115,6 +127,26 @@ def count_resources(path=None, framework='terraform', files=None): resource_count += 1 except Exception: continue + + if framework in ('containers', 'all'): + from scanner.image_utils import find_compose_files + if files: + from scanner.image_utils import filter_container_files + compose_files, _ = filter_container_files(files) + else: + compose_files = find_compose_files(path) + + for compose_file in compose_files: + try: + import yaml + with open(compose_file, 'r', encoding='utf-8') as f: + compose_data = yaml.safe_load(f) + if compose_data and isinstance(compose_data, dict) and 'services' in compose_data: + services = compose_data['services'] + if isinstance(services, dict): + resource_count += len(services) + except Exception: + continue return resource_count @@ -143,7 +175,7 @@ def resolve_included_paths(base_path, included_paths): elif os.path.isdir(full_path): for root, dirs, files in os.walk(full_path): for file in files: - if file.endswith(valid_extensions) or file.startswith('docker-compose'): + if file.endswith(valid_extensions) or file.startswith('docker-compose') or file.startswith('compose'): resolved_files.append(os.path.join(root, file)) return list(set(resolved_files)) @@ -194,16 +226,21 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e if included_paths: resolved_files = resolve_included_paths(path, included_paths) if not resolved_files: - print(f"Warning: No valid files found in included paths: {included_paths}") + logger.warning(f"No valid files found in included paths: {included_paths}") return [], 0, [] # Auto-detect framework if needed if framework == 'auto' or not framework: framework = detect_framework(path, files=resolved_files) - print(f"Detected framework: {framework}") + logger.info(f"Detected framework: {framework}") # Count resources for reporting resource_count = count_resources(path, framework, files=resolved_files) + # Log discovered files + if resolved_files: + logger.info("Files passed to Checkov:") + for file in resolved_files: + logger.info(f" - {os.path.relpath(file, path)}") # Run cost-focused regex scanner if 'regex' in active_scanners: @@ -220,6 +257,7 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e # Scan all files and collect results for file_path in all_files: + logger.info(f"[INFO] Scanning Terraform file: {os.path.relpath(file_path, path)}") file_results = scan_file(file_path) if file_results: results.extend(file_results) @@ -232,6 +270,10 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e if 'checkov' in active_scanners: if is_checkov_available(): try: + if resolved_files: + logger.info("[INFO] Files passed to Checkov:") + for file in resolved_files: + logger.info(f" - {os.path.relpath(file, path)}") checkov_results = run_checkov_scan( path, framework, @@ -243,9 +285,9 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e result['scanner'] = 'checkov' results.extend(checkov_results) except Exception as e: - print(f"Warning: Checkov scan failed: {e}") + logger.warning(f"Checkov scan failed: {e}") else: - print("Warning: Checkov is not installed. Install with: pip install checkov") + logger.warning("Checkov is not installed. Install with: pip install checkov") # Run container security scanner (Docker Scout or Grype based on config) extra_recommendations = [] # Track extra recommendations from container scanner @@ -262,16 +304,16 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e result['scanner'] = 'grype' results.extend(grype_results) except Exception as e: - print(f"Warning: Grype scan failed: {e}") + logger.warning(f"Grype scan failed: {e}") else: - print("Warning: Grype is not installed. See https://github.com/anchore/grype for installation") + logger.warning("Grype is not installed. See https://github.com/anchore/grype for installation") else: # docker-scout (default) if is_docker_scout_available(): try: scout_results, scout_recommendations, auth_failed = run_docker_scout_scan(path, files=resolved_files) if auth_failed and is_grype_available() and not scout_results: - print("\n[i] Falling back to Grype scanner (no Docker Hub login detected)...") + logger.info("\n[i] Falling back to Grype scanner (no Docker Hub login detected)...") try: from scanner.grype_scanner import run_grype_scan grype_results = run_grype_scan(path, files=resolved_files) @@ -279,9 +321,9 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e for result in grype_results: result['scanner'] = 'grype' results.extend(grype_results) - print(f" Grype scan completed with {len(grype_results)} findings.") + logger.info(f" Grype scan completed with {len(grype_results)} findings.") except Exception as grype_e: - print(f" Grype fallback failed: {grype_e}") + logger.info(f" Grype fallback failed: {grype_e}") else: # Add scanner tag for result in scout_results: @@ -289,9 +331,20 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e results.extend(scout_results) extra_recommendations.extend(scout_recommendations) except Exception as e: - print(f"Warning: Docker Scout scan failed: {e}") + logger.warning(f"Docker Scout scan failed: {e}") + elif is_grype_available(): + logger.info("\n[i] Docker Scout is not installed, falling back to Grype scanner...") + try: + from scanner.grype_scanner import run_grype_scan + grype_results = run_grype_scan(path, files=resolved_files) + for result in grype_results: + result['scanner'] = 'grype' + results.extend(grype_results) + logger.info(f" Grype scan completed with {len(grype_results)} findings.") + except Exception as grype_e: + logger.info(f" Grype fallback failed: {grype_e}") else: - print("Warning: Docker Scout is not installed. See https://docs.docker.com/scout/ for installation") + logger.warning("Docker Scout is not installed. See https://docs.docker.com/scout/ for installation") # Add scanner tag to regex results and normalize paths for result in results: @@ -327,7 +380,7 @@ def scan_file(filepath): # In a more advanced version, we would parse HCL here findings = check_rules(filepath, content) except Exception as e: - print(f"Warning: Could not read file {filepath}: {e}") + logger.warning(f"Could not read file {filepath}: {e}") return findings