diff --git a/cli.py b/cli.py index d11576b..5b17063 100755 --- a/cli.py +++ b/cli.py @@ -5,6 +5,7 @@ import json import logging import requests +import time try: from colorama import Fore, Style, init except ImportError: @@ -21,8 +22,16 @@ def init(*args, **kwargs): pass __version__ = "1.0.6" -# Setup basic logging -logging.basicConfig(level=logging.ERROR, format='%(levelname)s: %(message)s') +# Setup logging with timestamps +logging.basicConfig( + level=logging.INFO, + format='[%(asctime)s] %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + stream=sys.stdout, + force=True +) + +logger = logging.getLogger(__name__) def send_slack_notification(message: str) -> None: """Send a Slack notification via webhook URL from environment variable.""" @@ -32,9 +41,9 @@ def send_slack_notification(message: str) -> None: try: response = requests.post(webhook_url, json={'text': message}, timeout=5) if response.status_code >= 400: - print(f"Slack notification failed: {response.status_code} - {response.text}", file=sys.stderr) + logger.error(f"Slack notification failed: {response.status_code} - {response.text}") except Exception as e: - print(f"Slack notification error: {e}", file=sys.stderr) + logger.error(f"Slack notification error: {e}") def build_gh_actions_context() -> dict: """Extract GitHub Actions context from environment variables.""" @@ -57,39 +66,39 @@ def setup_args(): parser = argparse.ArgumentParser( description="InfraScan CLI - Open Source IaC Cost & Security Scanner" ) - + parser.add_argument( "path", nargs="?", default="/scan", help="Path to the directory to scan (default: /scan when using Docker, or '.' for local use)" ) - + parser.add_argument( "--scanner", default="comprehensive", help="Scanner type(s) to run (default: comprehensive). Support multiple scanners separated by comma (e.g., 'regex,containers'). Options: regex, checkov, containers, comprehensive" ) - + parser.add_argument( "--format", choices=["text", "json", "html"], default="text", help="Output format (default: text)" ) - + parser.add_argument( "--out", help="File path to save JSON output explicitly (e.g., infrascan-report.json)" ) - + parser.add_argument( "--fail-on", choices=["any", "high_critical", "grade_a", "grade_b", "grade_c", "grade_d", "grade_f", "priority_critical", "priority_high", "priority_medium", "priority_low", "priority_info"], help="Exit with error code 1 if findings match criteria (any findings, high/critical findings, grade threshold, or priority threshold)" ) - + parser.add_argument( "--download-external-modules", action="store_true", @@ -99,7 +108,7 @@ def setup_args(): parser.add_argument( "--framework", default="auto", - choices=["auto", "terraform", "kubernetes", "cloudformation", "helm"], + choices=["auto", "terraform", "kubernetes", "cloudformation", "helm", "all"], help="IaC framework type (default: auto-detect)" ) @@ -109,39 +118,39 @@ def setup_args(): dest="include", help="Select specific files or directories to scan. Can be used multiple times." ) - + parser.add_argument( "--version", action="version", version=f"InfraScan v{__version__}", help="Show version information and exit" ) - + return parser.parse_args() def print_text_report(report_dict, resource_count, scanner_type): # Initialize colorama init(autoreset=True) - + overall = report_dict.get('overall', {}) findings_dict = report_dict.get('findings', {}) results = findings_dict.get('all', report_dict.get('results', [])) - + # Header print(f"\n{Fore.CYAN}{Style.BRIGHT}{'=' * 60}") print(f"{Fore.CYAN}{Style.BRIGHT} InfraScan Report - {scanner_type.upper()} SCAN") print(f"{Fore.CYAN}{Style.BRIGHT}{'=' * 60}") - + # Summary Info target_path = os.path.abspath(sys.argv[1] if len(sys.argv) > 1 and not sys.argv[1].startswith('--') else '.') print(f"{Style.BRIGHT}Path Scanned :{Style.RESET_ALL} {target_path}") print(f"{Style.BRIGHT}Resources Found :{Style.RESET_ALL} {resource_count}") print(f"{Style.BRIGHT}Total Findings :{Style.RESET_ALL} {len(results)}") - + # Grades Section print(f"\n{Style.BRIGHT}GRADING SUMMARY:") print(f"{'-' * 30}") - + def get_grade_color(letter): if letter == 'A': return Fore.GREEN if letter == 'B': return Fore.GREEN @@ -152,11 +161,11 @@ def get_grade_color(letter): def print_grade_line(name, grade): if not grade or (grade.get('max_score', 0) == 0 and grade.get('letter') != 'A'): return - + letter = grade.get('letter', '?') percentage = grade.get('percentage', 0) color = get_grade_color(letter) - + breakdown = grade.get('severity_breakdown', {}) counts = [ f"{Fore.RED}Crit:{breakdown.get('critical', 0)}{Style.RESET_ALL}", @@ -168,13 +177,13 @@ def print_grade_line(name, grade): print(f"{name:18}: {color}{Style.BRIGHT}{letter}{Style.RESET_ALL} ({percentage}%){br_str}") print_grade_line("Overall Health", overall) - + if scanner_type in ['regex', 'comprehensive']: print_grade_line("Cost Efficiency", report_dict.get('cost')) - + if scanner_type in ['checkov', 'comprehensive']: print_grade_line("IaC Security", report_dict.get('security')) - + if scanner_type in ['containers', 'comprehensive']: print_grade_line("Container Security", report_dict.get('container')) @@ -184,12 +193,12 @@ def print_grade_line(name, grade): print(f"\n{Fore.GREEN}{Style.BRIGHT}RECOMMENDATIONS:") for rec in recs: print(f" {Fore.GREEN}• {Style.BRIGHT}{rec}") - + # Findings Details if results: print(f"\n{Style.BRIGHT}FINDINGS DETAILS:") print(f"{'=' * 60}") - + # Categorize findings categories = [] if findings_dict.get('cost'): @@ -198,111 +207,143 @@ def print_grade_line(name, grade): categories.append(('IaC Security', findings_dict['security'])) if findings_dict.get('container'): categories.append(('Container Security', findings_dict['container'])) - + if not categories: categories = [('General Findings', results)] for cat_name, cat_findings in categories: if not cat_findings: continue - + print(f"\n{Style.BRIGHT}>>> {cat_name} ({len(cat_findings)})") - + # Limit display to 40 findings to avoid overwhelming CI logs display_limit = 40 for i, res in enumerate(cat_findings): if i >= display_limit: print(f"\n {Fore.YELLOW}... and {len(cat_findings) - display_limit} more findings (see full report for details)") break - + severity = res.get('severity', 'UNKNOWN').upper() sev_color = Fore.WHITE if severity == 'CRITICAL': sev_color = Fore.RED + Style.BRIGHT elif severity == 'HIGH': sev_color = Fore.RED elif severity == 'MEDIUM': sev_color = Fore.YELLOW elif severity == 'LOW': sev_color = Fore.CYAN - + rule_id = res.get('rule_id', 'N/A') file_path = res.get('file', 'Unknown') line_str = f":{res.get('line')}" if res.get('line') else "" - + print(f" {sev_color}[{severity}]{Style.RESET_ALL} {Style.BRIGHT}{rule_id}{Style.RESET_ALL}: {res.get('description', '')}") print(f" {Fore.WHITE}at {file_path}{line_str}{Style.RESET_ALL}") if res.get('resource'): print(f" {Fore.WHITE}resource: {res.get('resource')}{Style.RESET_ALL}") - + print(f"\n{Fore.CYAN}{Style.BRIGHT}{'=' * 60}\n") def should_fail(args, report_dict, results): if not args.fail_on: return False - + if args.fail_on == 'any' and len(results) > 0: - print("\n[ERROR] Build failed: Findings detected and --fail-on=any specified.", file=sys.stderr) + logger.error( + "Build failed: Findings detected and --fail-on=any specified." + ) return True - + if args.fail_on == 'high_critical': critical_high_count = sum(1 for r in results if r.get('severity', '').lower() in ['critical', 'high']) if critical_high_count > 0: - print(f"\n[ERROR] Build failed: {critical_high_count} high/critical findings detected and --fail-on=high_critical specified.", file=sys.stderr) + logger.error( + f"Build failed: {critical_high_count} high/critical findings " + f"detected and --fail-on=high_critical specified." + ) return True - + if args.fail_on.startswith('grade_'): grade_order = ['A', 'B', 'C', 'D', 'F'] fail_grade = args.fail_on.split('_')[1].upper() overall_letter = report_dict.get('overall', {}).get('letter', 'A') - + try: fail_idx = grade_order.index(fail_grade) current_idx = grade_order.index(overall_letter) - + if current_idx >= fail_idx: - print(f"\n[ERROR] Build failed: Overall grade is {overall_letter} and --fail-on={args.fail_on} specified (threshold: {fail_grade} or worse).", file=sys.stderr) + logger.error( + f"Build failed: Overall grade is {overall_letter} " + f"and --fail-on={args.fail_on} specified " + f"(threshold: {fail_grade} or worse)." + ) return True except ValueError: pass # Should not happen due to argparse choices - + if args.fail_on.startswith('priority_'): severity_weights = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1, 'info': 0.5} fail_priority = args.fail_on.split('_')[1] threshold_weight = severity_weights.get(fail_priority, 0) - + findings_at_or_above = [ - r for r in results + r for r in results if severity_weights.get(r.get('severity', 'info').lower(), 0.5) >= threshold_weight ] - + if findings_at_or_above: - print(f"\n[ERROR] Build failed: {len(findings_at_or_above)} findings with priority {fail_priority} or higher detected and --fail-on={args.fail_on} specified.", file=sys.stderr) + logger.error( + f"Build failed: {len(findings_at_or_above)} findings " + f"with priority {fail_priority} or higher detected " + f"and --fail-on={args.fail_on} specified." + ) return True - + return False def main(): + total_start = time.perf_counter() + load_dotenv() args = setup_args() - + target_path = os.path.abspath(args.path) - + if not os.path.exists(target_path): - print(f"Error: Path '{target_path}' does not exist.", file=sys.stderr) + logger.error(f"Path '{target_path}' does not exist.") sys.exit(1) - + try: if args.format == 'text': - print(f"Analyzing {target_path} with '{args.scanner}' scanner...") - + logger.info( + f"Analyzing {target_path} with '{args.scanner}' scanner..." + ) + # Run Scanners + logger.info("Starting directory scan") + + scan_start = time.perf_counter() + results, resource_count, recommendations = scan_directory( - target_path, + target_path, scanner_type=args.scanner, framework=args.framework, download_external_modules=args.download_external_modules, included_paths=args.include ) + scan_duration = time.perf_counter() - scan_start + + logger.info( + f"Directory scan completed in {scan_duration:.2f}s. " + f"Found {len(results)} findings in {resource_count} resources." + ) + # Generate Report + logger.info("Generating report") + + report_start = time.perf_counter() + report_generator = ReportGenerator() report = report_generator.generate_report( findings=results, @@ -310,43 +351,63 @@ def main(): scanner_type=args.scanner, extra_recommendations=recommendations ) - + + report_duration = time.perf_counter() - report_start + + logger.info( + f"Report generation completed in {report_duration:.2f}s" + ) + report_dict = report.to_dict() report_dict['results'] = results report_dict['summary'] = { 'total': len(results), 'scanner_used': args.scanner } - + # Output Results to file/stdout if args.out: + logger.info(f"Saving report to {args.out}") + + save_start = time.perf_counter() + if args.format == 'json': with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) + + save_duration = time.perf_counter() - save_start + + logger.info( + f"Report saved in {save_duration:.2f}s" + ) elif args.format == 'html': html_output = generate_standalone_html(report_dict) with open(args.out, 'w', encoding='utf-8') as f: f.write(html_output) - else: # text format - # Default behavior for text mode with --out is to save JSON results + else: with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) - # Handle console output + # Console output if args.format == 'json' and not args.out: print(json.dumps(report_dict, indent=2)) elif args.format == 'html' and not args.out: print(generate_standalone_html(report_dict)) else: - # If format is text OR if output is saved to record/html/json - # always show the text summary in the console print_text_report(report_dict, resource_count, args.scanner) if args.out: - print(f"{Fore.GREEN}[v] Full {args.format.upper()} report saved to: {Fore.WHITE}{args.out}") - - # Send Slack notification if configured + print( + f"{Fore.GREEN}[v] Full {args.format.upper()} " + f"report saved to: {Fore.WHITE}{args.out}" + ) + + # Slack notification webhook_url = os.getenv('SLACK_WEBHOOK_URL', '').strip() if webhook_url: + logger.info("Sending Slack notification") + + slack_start = time.perf_counter() + overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) security = report_dict.get('security', {}) @@ -356,17 +417,31 @@ def main(): overall_grade = overall.get('letter', '?') if overall else '?' overall_pct = overall.get('percentage', 0) if overall else 0 - grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] + grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] + if cost and cost.get('max_score', 0) > 0: - grades_parts.append(f"Cost {cost.get('letter','?')} ({cost.get('percentage',0)}%)") + grades_parts.append( + f"Cost {cost.get('letter', '?')} " + f"({cost.get('percentage', 0)}%)" + ) + if security and security.get('max_score', 0) > 0: - grades_parts.append(f"Security {security.get('letter','?')} ({security.get('percentage',0)}%)") + grades_parts.append( + f"Security {security.get('letter', '?')} " + f"({security.get('percentage', 0)}%)" + ) + if container and container.get('max_score', 0) > 0: - grades_parts.append(f"Containers {container.get('letter','?')} ({container.get('percentage',0)}%)") + grades_parts.append( + f"Containers {container.get('letter', '?')} " + f"({container.get('percentage', 0)}%)" + ) + grades_summary = " | ".join(grades_parts) ctx = build_gh_actions_context() lines = ["🤖 InfraScan used in *GitHub Actions*"] + if ctx['repo']: lines.append(f"Repo: *{ctx['repo']}*") if ctx['branch']: @@ -375,24 +450,41 @@ def main(): lines.append(f"Workflow: _{ctx['workflow']}_") if ctx['actor']: lines.append(f"Triggered by: {ctx['actor']}") + lines.append(f"Grades: {grades_summary}") lines.append(f"Findings: {total_findings} | Scanner: {args.scanner}") + if ctx['run_url']: lines.append(f"<{ctx['run_url']}|View run>") send_slack_notification(" | ".join(lines)) + slack_duration = time.perf_counter() - slack_start + + logger.info( + f"Slack notification sent in {slack_duration:.2f}s" + ) # Determine Exit Code + logger.info("Evaluating fail conditions") + if should_fail(args, report_dict, results): sys.exit(1) - + + total_duration = time.perf_counter() - total_start + + logger.info( + f"InfraScan completed successfully in " + f"{total_duration:.2f}s" + ) sys.exit(0) - + except Exception as e: - print(f"An error occurred during scanning: {e}", file=sys.stderr) + logger.exception(f"An error occurred during scanning: {e}") + if logging.getLogger().isEnabledFor(logging.DEBUG): import traceback traceback.print_exc() + sys.exit(1) if __name__ == "__main__": diff --git a/scanner/docker_scout_scanner.py b/scanner/docker_scout_scanner.py index 1a5f4e1..7d892de 100644 --- a/scanner/docker_scout_scanner.py +++ b/scanner/docker_scout_scanner.py @@ -15,6 +15,9 @@ import os import re import subprocess +from logging import getLogger + +logger = getLogger(__name__) from typing import List, Dict, Any, Tuple, Optional @@ -232,14 +235,14 @@ def check_image_exists(image: str) -> bool: def cleanup_image(image: str) -> None: """Remove a Docker image from local cache.""" try: - print(f" Removing image: {image}") + logger.info(f" Removing image: {image}") result = run_command(["docker", "rmi", "-f", image], timeout=30) if result.returncode == 0: - print(f" ✓ Removed {image}") + logger.info(f" ✓ Removed {image}") else: - print(f" Warning: Could not remove {image}: {result.stderr[:100]}") + logger.warning(f" Could not remove {image}: {result.stderr[:100]}") except Exception as e: - print(f" Warning: Failed to remove {image}: {e}") + logger.warning(f"Failed to remove {image}: {e}") # ============================================================================ @@ -281,11 +284,18 @@ def run_docker_scout_scan(directory_path: str, files: List[str] = None) -> Tuple compose_files = find_compose_files(directory_path) # Find Kubernetes files k8s_files = find_kubernetes_files(directory_path) - if not compose_files and not k8s_files: return findings, extra_recommendations, False - print(f"Found {len(compose_files)} Docker Compose file(s) and {len(k8s_files)} Kubernetes file(s) to scan") + if compose_files: + logger.info("[INFO] Found Docker Compose files:") + for file in compose_files: + logger.info(f" - {os.path.relpath(file, directory_path)}") + + if k8s_files: + logger.info("[INFO] Found Kubernetes files:") + for file in k8s_files: + logger.info(f" - {os.path.relpath(file, directory_path)}") # Collect ALL images from ALL files first all_images_map = {} # image -> source_file @@ -310,8 +320,14 @@ def run_docker_scout_scan(directory_path: str, files: List[str] = None) -> Tuple # Check if image exists locally before scanning image_existed_before = check_image_exists(image) - print(f"Scanning image: {image}") - + relative_file = os.path.relpath(compose_file, directory_path) + + logger.info( + f"[INFO] Scanning image '{image}' " + f"from file: {os.path.relpath(compose_file, directory_path)}" + ) + logger.info(f" Source file: {relative_file}") + try: image_findings, image_auth_failed = scan_image(image, compose_file, directory_path) findings.extend(image_findings) @@ -320,31 +336,31 @@ def run_docker_scout_scan(directory_path: str, files: List[str] = None) -> Tuple auth_failed = True if image_findings: - print(f" Found {len(image_findings)} vulnerabilities in {image}") + logger.info(f" Found {len(image_findings)} vulnerabilities in {image}") elif not image_auth_failed: - print(f" No vulnerabilities found or image unavailable: {image}") + logger.info(f" No vulnerabilities found or image unavailable: {image}") recommendation = get_image_recommendation(image) if recommendation and recommendation not in extra_recommendations: extra_recommendations.append(recommendation) - print(f" Added recommendation for Bitnami image: {image}") + logger.info(f" Added recommendation for Bitnami image: {image}") # Track for cleanup if image was pulled during scan and cleanup is enabled if cleanup_enabled and not image_existed_before and check_image_exists(image): images_to_cleanup.add(image) except Exception as e: - print(f"Warning: Failed to scan image {image}: {e}") + logger.warning(f"Failed to scan image {image}: {e}") continue # Cleanup images that were pulled during scan if cleanup_enabled and images_to_cleanup: - print(f"\nCleaning up {len(images_to_cleanup)} image(s) pulled during scan...") + logger.info(f"\nCleaning up {len(images_to_cleanup)} image(s) pulled during scan...") for image in images_to_cleanup: try: cleanup_image(image) except Exception as e: - print(f"Warning: Failed to cleanup image {image}: {e}") + logger.warning(f"Failed to cleanup image {image}: {e}") return findings, extra_recommendations, auth_failed @@ -391,27 +407,27 @@ def scan_image(image: str, compose_file: str, base_path: str) -> Tuple[List[Dict # 1. Detect Docker Hub login requirement specifically if result.returncode != 0 and ("Log in with your Docker ID" in result.stderr or "authentication required" in result.stderr.lower()): - print(f"\n[!] Docker Scout Error: Authentication required to access vulnerability database.") - print(f" To fix this, either:") - print(f" a) Set DOCKER_HUB_USERNAME and DOCKER_HUB_PASSWORD environment variables") - print(f" b) Use CONTAINER_SCANNER=grype to use the alternative scanner that doesn't require login") - print(f" Skipping Docker Scout scan for: {image}") + logger.error(f"\n[!] Docker Scout Error: Authentication required to access vulnerability database.") + logger.error(f" To fix this, either:") + logger.error(f" a) Set DOCKER_HUB_USERNAME and DOCKER_HUB_PASSWORD environment variables") + logger.error(f" b) Use CONTAINER_SCANNER=grype to use the alternative scanner that doesn't require login") + logger.error(f" Skipping Docker Scout scan for: {image}") return findings, True # 2. Check for other errors in output (missing image, pull failures, etc.) if result.stdout.strip().startswith('ERROR') or 'MANIFEST_UNKNOWN' in result.stdout: error_msg = result.stdout.split('\n')[0] if '\n' in result.stdout else result.stdout[:200] - print(f"Docker Scout error for image {image}: {error_msg}") + logger.info(f"Docker Scout error for image {image}: {error_msg}") return findings, False # 3. Handle non-zero exit code (with --exit-code, it means findings or real error) if result.returncode != 0 and not result.stdout.strip(): # If stdout is empty and return code is non-zero, it's likely a real failure - print(f"Docker Scout failed for image {image} (exit code {result.returncode})") + logger.warning(f"Docker Scout failed for image {image} (exit code {result.returncode})") if result.stderr: # Truncate stderr for cleaner output but keep the important part clean_stderr = result.stderr.strip().split('\n')[0] - print(f" Error: {clean_stderr}") + logger.error(f" Error: {clean_stderr}") return findings, False # 4. Parse successful output @@ -422,19 +438,19 @@ def scan_image(image: str, compose_file: str, base_path: str) -> Tuple[List[Dict except json.JSONDecodeError as e: # Fallback check for text output if "Analyzing image" in result.stdout or "Target" in result.stdout: - print(f" Docker Scout returned text instead of JSON for {image}. Trying fallback parser...") + logger.info(f" Docker Scout returned text instead of JSON for {image}. Trying fallback parser...") findings = parse_text_output(result.stdout, image, compose_file, base_path) else: - print(f" Failed to parse Docker Scout output for {image}: {e}") + logger.warning(f" Failed to parse Docker Scout output for {image}: {e}") if result.stderr and ("error" in result.stderr.lower() and "Available version" not in result.stderr): # Log real errors from stderr that aren't just update notifications - print(f" Docker Scout stderr: {result.stderr.strip()}") + logger.info(f" Docker Scout stderr: {result.stderr.strip()}") except subprocess.TimeoutExpired: - print(f"Timeout scanning image: {image}") + logger.info(f"Timeout scanning image: {image}") except Exception as e: - print(f"Error scanning image {image}: {e}") + logger.error(f"Error scanning image {image}: {e}") return findings, False @@ -492,7 +508,7 @@ def parse_sarif_format(sarif_data: Dict[str, Any], image: str, compose_file: str )) except Exception as e: - print(f"Error parsing SARIF format: {e}") + logger.error(f"Error parsing SARIF format: {e}") import traceback traceback.print_exc() @@ -574,7 +590,7 @@ def parse_docker_scout_output(scout_data: Dict[str, Any], image: str, compose_fi findings.append(finding) except Exception as e: - print(f"Error parsing Docker Scout output: {e}") + logger.error(f"Error parsing Docker Scout output: {e}") import traceback traceback.print_exc() @@ -656,7 +672,7 @@ def parse_text_output(text_output: str, image: str, compose_file: str, base_path findings = [] # This is a minimal fallback - just create a summary finding - print("Warning: Using fallback text parser. Install latest Docker Scout for JSON output.") + logger.warning("Using fallback text parser. Install latest Docker Scout for JSON output.") # Make file path relative file_path = os.path.relpath(compose_file, base_path) if compose_file and base_path else compose_file diff --git a/scanner/grype_scanner.py b/scanner/grype_scanner.py index 40c46c9..774cfa2 100644 --- a/scanner/grype_scanner.py +++ b/scanner/grype_scanner.py @@ -8,6 +8,9 @@ import json import os import subprocess +from logging import getLogger + +logger = getLogger(__name__) from typing import List, Dict, Any from scanner.image_utils import ( @@ -83,12 +86,12 @@ def run_grype_scan(directory_path: str, files: List[str] = None) -> List[Dict[st # Extract images from compose files and scan them for image, compose_file in all_images_map.items(): - print(f"Scanning image with Grype: {image}") + logger.info(f"Scanning image with Grype: {image}") try: image_findings = scan_image(image, compose_file, directory_path) findings.extend(image_findings) except Exception as e: - print(f"Warning: Failed to scan image {image}: {e}") + logger.warning(f" Failed to scan image {image}: {e}") continue return findings @@ -128,15 +131,15 @@ def scan_image(image: str, compose_file: str, base_path: str) -> List[Dict[str, grype_data = json.loads(result.stdout) findings = parse_grype_output(grype_data, image, compose_file, base_path) except json.JSONDecodeError as e: - print(f"Failed to parse Grype JSON output: {e}") + logger.warning(f"Failed to parse Grype JSON output: {e}") if result.stderr and "error" in result.stderr.lower(): - print(f"Grype stderr: {result.stderr}") + logger.info(f"Grype stderr: {result.stderr}") except subprocess.TimeoutExpired: - print(f"Timeout scanning image: {image}") + logger.info(f"Timeout scanning image: {image}") except Exception as e: - print(f"Error scanning image {image}: {e}") + logger.error(f"Error scanning image {image}: {e}") return findings @@ -203,7 +206,7 @@ def parse_grype_output(grype_data: Dict[str, Any], image: str, compose_file: str findings.append(finding) except Exception as e: - print(f"Error parsing Grype output: {e}") + logger.error(f"Error parsing Grype output: {e}") import traceback traceback.print_exc() @@ -276,8 +279,8 @@ def normalize_grype_finding(vuln: Dict[str, Any], artifact: Dict[str, Any], imag 'rule_id': vuln_id, 'rule_name': f"Vulnerability in {package_name}", 'severity': normalized_severity, - 'description': f"{description[:200]}..." if len(description) > 200 else description, - 'full_description': description, # Store full description for tooltips + 'description': description, + 'full_description': description, 'remediation': f"Update {package_name} from {package_version} to {fix_version}" if fix_available == 'Yes' else f"Review {package_name}@{package_version} - no fix available", 'estimated_savings': f"Security risk mitigation ({severity})", 'line': 0, diff --git a/scanner/parser.py b/scanner/parser.py index dbd23b9..89aa72f 100644 --- a/scanner/parser.py +++ b/scanner/parser.py @@ -1,5 +1,8 @@ import os import re +from logging import getLogger + +logger = getLogger(__name__) from rules.definitions import check_rules from scanner.checkov_scanner import is_checkov_available, run_checkov_scan from scanner.docker_scout_scanner import is_docker_scout_available, run_docker_scout_scan @@ -27,10 +30,12 @@ def detect_framework(path: str = None, files: list = None) -> str: - 'kubernetes' - 'cloudformation' - 'helm' + - 'all' (fallback for Docker/secrets/actions/etc.) """ tf_files = 0 k8s_files = 0 cfn_files = 0 + helm_files = 0 scan_files = [] if files: @@ -44,6 +49,8 @@ def detect_framework(path: str = None, files: list = None) -> str: file = os.path.basename(full_path) if file.endswith('.tf'): tf_files += 1 + elif file == 'Chart.yaml' or file == 'Chart.yml': + helm_files += 1 elif file.endswith(('.yml', '.yaml')): # Check file content for better detection try: @@ -56,12 +63,16 @@ def detect_framework(path: str = None, files: list = None) -> str: except Exception: continue - if k8s_files > tf_files and k8s_files > cfn_files: + if k8s_files > tf_files and k8s_files > cfn_files and k8s_files > helm_files: return 'kubernetes' - if cfn_files > tf_files: + if cfn_files > tf_files and cfn_files > helm_files: return 'cloudformation' + if helm_files > tf_files: + return 'helm' + if tf_files > 0: + return 'terraform' - return 'terraform' + return 'all' def count_resources(path=None, framework='terraform', files=None): """ @@ -85,7 +96,7 @@ def count_resources(path=None, framework='terraform', files=None): for file in f_list: scan_files.append(os.path.join(root, file)) - if framework == 'terraform': + if framework in ('terraform', 'all'): for full_path in scan_files: if full_path.endswith('.tf'): try: @@ -97,7 +108,8 @@ def count_resources(path=None, framework='terraform', files=None): resource_count += len(matches) except Exception: continue - elif framework == 'kubernetes': + + if framework in ('kubernetes', 'all'): from scanner.image_utils import find_kubernetes_files if files: k8s_files = [f for f in files if f.endswith(('.yml', '.yaml'))] @@ -115,6 +127,26 @@ def count_resources(path=None, framework='terraform', files=None): resource_count += 1 except Exception: continue + + if framework in ('containers', 'all'): + from scanner.image_utils import find_compose_files + if files: + from scanner.image_utils import filter_container_files + compose_files, _ = filter_container_files(files) + else: + compose_files = find_compose_files(path) + + for compose_file in compose_files: + try: + import yaml + with open(compose_file, 'r', encoding='utf-8') as f: + compose_data = yaml.safe_load(f) + if compose_data and isinstance(compose_data, dict) and 'services' in compose_data: + services = compose_data['services'] + if isinstance(services, dict): + resource_count += len(services) + except Exception: + continue return resource_count @@ -143,7 +175,7 @@ def resolve_included_paths(base_path, included_paths): elif os.path.isdir(full_path): for root, dirs, files in os.walk(full_path): for file in files: - if file.endswith(valid_extensions) or file.startswith('docker-compose'): + if file.endswith(valid_extensions) or file.startswith('docker-compose') or file.startswith('compose'): resolved_files.append(os.path.join(root, file)) return list(set(resolved_files)) @@ -194,16 +226,21 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e if included_paths: resolved_files = resolve_included_paths(path, included_paths) if not resolved_files: - print(f"Warning: No valid files found in included paths: {included_paths}") + logger.warning(f"No valid files found in included paths: {included_paths}") return [], 0, [] # Auto-detect framework if needed if framework == 'auto' or not framework: framework = detect_framework(path, files=resolved_files) - print(f"Detected framework: {framework}") + logger.info(f"Detected framework: {framework}") # Count resources for reporting resource_count = count_resources(path, framework, files=resolved_files) + # Log discovered files + if resolved_files: + logger.info("Files passed to Checkov:") + for file in resolved_files: + logger.info(f" - {os.path.relpath(file, path)}") # Run cost-focused regex scanner if 'regex' in active_scanners: @@ -220,6 +257,7 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e # Scan all files and collect results for file_path in all_files: + logger.info(f"[INFO] Scanning Terraform file: {os.path.relpath(file_path, path)}") file_results = scan_file(file_path) if file_results: results.extend(file_results) @@ -232,6 +270,10 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e if 'checkov' in active_scanners: if is_checkov_available(): try: + if resolved_files: + logger.info("[INFO] Files passed to Checkov:") + for file in resolved_files: + logger.info(f" - {os.path.relpath(file, path)}") checkov_results = run_checkov_scan( path, framework, @@ -243,9 +285,9 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e result['scanner'] = 'checkov' results.extend(checkov_results) except Exception as e: - print(f"Warning: Checkov scan failed: {e}") + logger.warning(f"Checkov scan failed: {e}") else: - print("Warning: Checkov is not installed. Install with: pip install checkov") + logger.warning("Checkov is not installed. Install with: pip install checkov") # Run container security scanner (Docker Scout or Grype based on config) extra_recommendations = [] # Track extra recommendations from container scanner @@ -262,16 +304,16 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e result['scanner'] = 'grype' results.extend(grype_results) except Exception as e: - print(f"Warning: Grype scan failed: {e}") + logger.warning(f"Grype scan failed: {e}") else: - print("Warning: Grype is not installed. See https://github.com/anchore/grype for installation") + logger.warning("Grype is not installed. See https://github.com/anchore/grype for installation") else: # docker-scout (default) if is_docker_scout_available(): try: scout_results, scout_recommendations, auth_failed = run_docker_scout_scan(path, files=resolved_files) if auth_failed and is_grype_available() and not scout_results: - print("\n[i] Falling back to Grype scanner (no Docker Hub login detected)...") + logger.info("\n[i] Falling back to Grype scanner (no Docker Hub login detected)...") try: from scanner.grype_scanner import run_grype_scan grype_results = run_grype_scan(path, files=resolved_files) @@ -279,9 +321,9 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e for result in grype_results: result['scanner'] = 'grype' results.extend(grype_results) - print(f" Grype scan completed with {len(grype_results)} findings.") + logger.info(f" Grype scan completed with {len(grype_results)} findings.") except Exception as grype_e: - print(f" Grype fallback failed: {grype_e}") + logger.info(f" Grype fallback failed: {grype_e}") else: # Add scanner tag for result in scout_results: @@ -289,9 +331,20 @@ def scan_directory(path, scanner_type='regex', framework='terraform', download_e results.extend(scout_results) extra_recommendations.extend(scout_recommendations) except Exception as e: - print(f"Warning: Docker Scout scan failed: {e}") + logger.warning(f"Docker Scout scan failed: {e}") + elif is_grype_available(): + logger.info("\n[i] Docker Scout is not installed, falling back to Grype scanner...") + try: + from scanner.grype_scanner import run_grype_scan + grype_results = run_grype_scan(path, files=resolved_files) + for result in grype_results: + result['scanner'] = 'grype' + results.extend(grype_results) + logger.info(f" Grype scan completed with {len(grype_results)} findings.") + except Exception as grype_e: + logger.info(f" Grype fallback failed: {grype_e}") else: - print("Warning: Docker Scout is not installed. See https://docs.docker.com/scout/ for installation") + logger.warning("Docker Scout is not installed. See https://docs.docker.com/scout/ for installation") # Add scanner tag to regex results and normalize paths for result in results: @@ -327,7 +380,7 @@ def scan_file(filepath): # In a more advanced version, we would parse HCL here findings = check_rules(filepath, content) except Exception as e: - print(f"Warning: Could not read file {filepath}: {e}") + logger.warning(f"Could not read file {filepath}: {e}") return findings