Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,15 +329,34 @@ The system is designed to be extensible for future enhancements like historical

## 📋 Detection Rules

**19 Cost Optimization Rules** including:
- COST-001: Old generation instances (t2, m3, c4, r3)
- COST-002: Over-provisioned large instances
**27 Cost Optimization Rules** including:
- COST-001: Old generation EC2 instances (t2, m3, c4, r3)
- COST-002: Over-provisioned large instances (8xlarge+)
- COST-003: Unencrypted EBS volumes
- COST-004: Expensive Provisioned IOPS (io1/io2)
- COST-005: Expensive NAT Gateways
- COST-006: Unassociated Elastic IPs
- COST-007: DynamoDB Provisioned billing mode
- COST-008: EC2 detailed monitoring enabled
- COST-009: Old generation storage (gp2 vs gp3)
- COST-010: Missing S3 lifecycle policies
- COST-011: Missing AWS budgets
- COST-012: Missing Spot instance usage
- COST-013: Expensive premium storage (Premium_LRS)
- COST-014: Unnecessary Route53 health checks
- COST-015: CloudWatch log groups without retention period
- COST-016: Oversized root EBS volumes
- COST-017: Missing Cost and Usage Report
- COST-018: High DynamoDB provisioned capacity
- COST-019: Load balancers on single-instance deployments
- COST-020: Old generation RDS instance classes (db.t2, db.m3, db.m4, db.r3, db.r4)
- COST-021: Lambda over-provisioned memory (≥3008 MB)
- COST-022: API Gateway REST API instead of HTTP API (3.5× cheaper)
- COST-023: SQS queues at maximum 14-day message retention
- COST-024: RDS Multi-AZ enabled in non-production environments
- COST-025: ECS task definitions without CPU/memory limits
- COST-026: Multiple NAT Gateways (potential redundancy)
- COST-027: Missing VPC Endpoints for S3/DynamoDB (NAT data-processing charges)
- Plus Checkov's 100+ security/compliance checks

## 🏅 Badge
Expand Down
181 changes: 174 additions & 7 deletions rules/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,101 @@ def check(self, content):
break
return matches

class CompoundInverseRule(Rule):
    """Directory-level rule: one pattern must be absent while every required pattern is present.

    The rule only carries its patterns; the per-file ``check`` hook never
    reports anything, so the evaluation has to happen wherever the combined
    content of a directory is available.
    """

    def __init__(self, id, name, severity, description, remediation, estimated_savings,
                 absent_pattern, required_patterns):
        """Store the rule metadata together with the presence/absence patterns.

        Args:
            id, name, severity, description, remediation, estimated_savings:
                Forwarded unchanged (positionally) to the base ``Rule``.
            absent_pattern: Regex that must NOT be found for the rule to fire.
            required_patterns: Regexes that must ALL be found for the rule to fire.
        """
        super().__init__(id, name, severity, description, remediation, estimated_savings)
        # Every entry here has to be present in the combined content.
        self.required_patterns = required_patterns
        self.absent_pattern = absent_pattern

    def check(self, content):
        """Return no per-file matches; this rule is only evaluated at directory level."""
        return []


class BlockAnalysisRule(Rule):
    """Base class for rules that analyse individual HCL resource blocks."""

    # Line prefixes that mark a whole-line HCL comment.
    _COMMENT_PREFIXES = ('#', '//')

    def _extract_blocks(self, content, resource_type):
        """Collect every ``resource "<type>" "<name>"`` block of the given type.

        A block starts on the line holding the resource header and extends
        until the ``{`` / ``}`` characters counted line by line balance out.
        Header lines that are commented out (``#`` or ``//``) are skipped, so
        disabled resources are not analysed as if they were live.

        Args:
            content: Full text of the file being scanned.
            resource_type: Regex fragment matching the resource type; it is
                interpolated into the header pattern as-is.

        Returns:
            A list of dicts with keys ``name`` (resource name), ``start_line``
            (1-based line of the header), ``content`` (the block's lines joined
            with newlines) and ``first_line`` (the stripped header line).
        """
        header = re.compile(
            rf'resource\s*["\']({resource_type})["\'\s]+["\']([^"\']+)["\']'
        )
        lines = content.splitlines()
        blocks = []
        i = 0
        while i < len(lines):
            line = lines[i]
            # A commented-out header must not open a block.
            if line.lstrip().startswith(self._COMMENT_PREFIXES):
                match = None
            else:
                match = header.search(line)
            if match is None:
                i += 1
                continue

            start_line = i
            block_lines = [line]
            # Braces are counted textually; braces inside strings or comments
            # are not distinguished from structural ones.
            brace_count = line.count('{') - line.count('}')
            i += 1
            while i < len(lines) and brace_count > 0:
                block_lines.append(lines[i])
                brace_count += lines[i].count('{') - lines[i].count('}')
                i += 1
            blocks.append({
                'name': match.group(2),
                'start_line': start_line + 1,
                'content': '\n'.join(block_lines),
                'first_line': lines[start_line].strip(),
            })
        return blocks


class RdsMultiAzNonProdRule(BlockAnalysisRule):
    """Flag RDS instances with multi_az=true whose resource name suggests a non-production environment."""

    # Environment markers must not be glued to other letters, so names such as
    # "devices", "latest" or "contest" are no longer mistaken for dev/test.
    # Digits, "_" and "-" still count as separators ("dev2", "app_dev", "qa-db").
    _NON_PROD = re.compile(
        r'(?<![a-z])'
        r'(?:development|develop|dev|staging|stage|testing|test|qa|non[-_]?prod)'
        r'(?![a-z])',
        re.IGNORECASE,
    )
    # multi_az = true on a line that is not a whole-line "#" or "//" comment.
    _MULTI_AZ_ENABLED = re.compile(
        r'^(?!\s*(?:#|//)).*multi_az\s*=\s*true', re.MULTILINE
    )

    def check(self, content):
        """Return one match per non-production-looking aws_db_instance with Multi-AZ enabled.

        Args:
            content: Full text of the file being scanned.

        Returns:
            A list of ``{'line', 'content'}`` dicts pointing at the header line
            of each offending resource block.
        """
        matches = []
        for block in self._extract_blocks(content, r'aws_db_instance'):
            if not self._NON_PROD.search(block['name']):
                continue
            if self._MULTI_AZ_ENABLED.search(block['content']):
                matches.append({'line': block['start_line'], 'content': block['first_line']})
        return matches


class EcsNoCpuMemoryRule(BlockAnalysisRule):
    """Flag ECS task definitions that are missing a cpu or a memory assignment."""

    # Both settings must be assigned; previously only "cpu" was checked, so a
    # task definition with cpu but no memory slipped through.
    _REQUIRED_SETTINGS = ('cpu', 'memory')

    def check(self, content):
        """Return one match per aws_ecs_task_definition lacking cpu and/or memory.

        A setting counts as present when some line of the block starts
        (after optional whitespace) with ``<setting> =``.

        Args:
            content: Full text of the file being scanned.

        Returns:
            A list of ``{'line', 'content'}`` dicts pointing at the header line
            of each offending resource block.
        """
        matches = []
        for block in self._extract_blocks(content, r'aws_ecs_task_definition'):
            body = block['content']
            missing = [
                setting
                for setting in self._REQUIRED_SETTINGS
                if not re.search(rf'^\s*{setting}\s*=', body, re.MULTILINE)
            ]
            if missing:
                matches.append({'line': block['start_line'], 'content': block['first_line']})
        return matches


class CwLogGroupNoRetentionRule(BlockAnalysisRule):
    """Flag CloudWatch log groups that do not set retention_in_days."""

    # An assignment only counts when its line is not a whole-line "#" or "//"
    # comment; a commented-out retention setting must not hide the finding.
    _RETENTION_SET = re.compile(
        r'^(?!\s*(?:#|//)).*retention_in_days\s*=', re.MULTILINE
    )

    def check(self, content):
        """Return one match per aws_cloudwatch_log_group without an active retention_in_days.

        Args:
            content: Full text of the file being scanned.

        Returns:
            A list of ``{'line', 'content'}`` dicts pointing at the header line
            of each offending resource block.
        """
        matches = []
        for block in self._extract_blocks(content, r'aws_cloudwatch_log_group'):
            if not self._RETENTION_SET.search(block['content']):
                matches.append({'line': block['start_line'], 'content': block['first_line']})
        return matches


class MultipleNatGatewayRule(Rule):
    """Report every aws_nat_gateway declaration after the first one found in a file."""

    def check(self, content):
        """Return matches for the second and later NAT gateway declarations.

        Args:
            content: Full text of the file being scanned.

        Returns:
            A list of ``{'line', 'content'}`` dicts (1-based line number and
            stripped line text); empty when the file declares at most one
            NAT gateway.
        """
        declarations = []
        for line_number, text in enumerate(content.splitlines(), start=1):
            if re.search(r'resource\s*["\']aws_nat_gateway["\']', text):
                declarations.append({'line': line_number, 'content': text.strip()})
        # The first declaration is the baseline; only the extras are reported.
        return declarations[1:]


class UnassociatedEipRule(Rule):
def check(self, content):
matches = []
Expand Down Expand Up @@ -217,14 +312,13 @@ def check(self, content):
estimated_savings="$0.50/month per health check",
pattern=r'resource\s*["\']aws_route53_health_check["\']'
),
RegexRule(
CwLogGroupNoRetentionRule(
id="COST-015",
name="CloudWatch Logs Without Retention",
severity="Medium",
description="CloudWatch logs without retention policy. Logs are kept indefinitely, increasing storage costs.",
remediation="Set appropriate retention periods for log groups (e.g., 7, 14, 30 days).",
estimated_savings="$5-50+/month depending on log volume",
pattern=r'aws_cloudwatch_log_group[^}]*\n(?!.*retention_in_days)'
description="CloudWatch log group without retention_in_days. Logs are kept indefinitely by default, silently growing to hundreds $/month.",
remediation="Set appropriate retention periods for log groups (e.g., 7, 14, or 30 days).",
estimated_savings="$5-50+/month depending on log volume"
),
RegexRule(
id="COST-016",
Expand Down Expand Up @@ -263,13 +357,86 @@ def check(self, content):
estimated_savings="$15-25/month per load balancer",
pattern=r'resource\s*["\']aws_(lb|elb|alb)["\']'
),
RegexRule(
id="COST-020",
name="RDS Old Generation Instance",
severity="High",
description="Usage of old generation RDS instance classes (db.t2, db.m3, db.m4, db.r3, db.r4). Newer generations are cheaper and faster.",
remediation="Upgrade to current generation instance classes (e.g., db.t3, db.m5, db.r5, db.r6g).",
estimated_savings="$20-100+/month per instance",
pattern=r'instance_class\s*=\s*["\'](db\.(t2\.|m3\.|m4\.|r3\.|r4\.))'
),
RegexRule(
    id="COST-021",
    name="Lambda Over-Provisioned Memory",
    severity="Medium",
    description="Lambda function with memory >= 3008 MB (the old Lambda maximum, a common cargo-cult setting). Lambda pricing scales linearly with memory; over-provisioning directly inflates costs.",
    remediation="Profile the function with AWS Lambda Power Tuning and reduce memory to the minimum needed. Most functions run fine at 256–1024 MB.",
    estimated_savings="$10-200+/month per high-traffic function",
    # Matches every integer >= 3008: 3008-3009, 3010-3099, 3100-3999,
    # 4000-9999, and anything with five or more digits. The previous pattern
    # skipped 3009-3999 (e.g. 3072, 3584).
    pattern=r'memory_size\s*=\s*(300[89]|30[1-9]\d|3[1-9]\d{2}|[4-9]\d{3}|\d{5,})'
),
RegexRule(
id="COST-022",
name="API Gateway REST Instead of HTTP API",
severity="Medium",
description="aws_api_gateway_rest_api (REST API) costs ~3.5x more per million requests than aws_apigatewayv2_api (HTTP API). Most modern use cases are supported by the HTTP API.",
remediation="Migrate to aws_apigatewayv2_api (HTTP API v2) unless REST-specific features (usage plans, request validation, custom authorizers v1) are required.",
estimated_savings="$1-50+/month per API depending on traffic",
pattern=r'resource\s*["\']aws_api_gateway_rest_api["\']'
),
RegexRule(
id="COST-023",
name="SQS Max Message Retention",
severity="Low",
description="SQS queue configured with the maximum 14-day (1209600 s) message retention. On high-volume queues this inflates storage costs and may indicate unprocessed message buildup.",
remediation="Set retention to the minimum business requirement (e.g., 1–4 days for most queues) and alert on queue depth to catch processing failures early.",
estimated_savings="$5-20+/month on high-volume queues",
pattern=r'message_retention_seconds\s*=\s*1209600'
),
RdsMultiAzNonProdRule(
id="COST-024",
name="RDS Multi-AZ in Non-Production Environment",
severity="Medium",
description="RDS instance with multi_az=true in what appears to be a non-production environment (resource name contains dev/staging/test/qa). Multi-AZ doubles the instance cost.",
remediation="Disable multi_az for non-production databases. Reserve Multi-AZ deployments for production workloads where HA is required.",
estimated_savings="Halves the RDS instance cost ($50-500+/month)"
),
EcsNoCpuMemoryRule(
id="COST-025",
name="ECS Task Definition Without CPU/Memory Limits",
severity="Medium",
description="aws_ecs_task_definition without explicit cpu and memory limits. This leads to unpredictable cluster over-provisioning as the scheduler cannot bin-pack tasks efficiently.",
remediation="Set cpu and memory at the task level. Start with the minimum viable values and scale up based on CloudWatch Container Insights metrics.",
estimated_savings="Cluster right-sizing savings ($20-200+/month)"
),
MultipleNatGatewayRule(
id="COST-026",
name="Multiple NAT Gateways (Potential Redundancy)",
severity="Medium",
description="More than one aws_nat_gateway defined. In development or staging environments a single NAT Gateway is usually sufficient; multiple gateways add ~$32/month each plus data-processing fees.",
remediation="Verify that each additional NAT Gateway is needed for HA in production. For dev/staging environments consider consolidating to a single gateway.",
estimated_savings="$32+/month per unnecessary gateway"
),
CompoundInverseRule(
id="COST-027",
name="Missing VPC Endpoints for S3/DynamoDB",
severity="High",
description="NAT Gateway and S3/DynamoDB resources are present but no aws_vpc_endpoint is defined. All S3 and DynamoDB traffic is routed through the NAT Gateway, incurring per-GB data-processing charges ($0.045/GB).",
remediation="Add Gateway VPC Endpoints for S3 (com.amazonaws.<region>.s3) and DynamoDB (com.amazonaws.<region>.dynamodb). Gateway endpoints are free and eliminate NAT data-processing charges for these services.",
estimated_savings="$50-500+/month depending on data volume",
absent_pattern=r'resource\s*["\']aws_vpc_endpoint["\']',
required_patterns=[
r'resource\s*["\']aws_nat_gateway["\']',
r'resource\s*["\']aws_(s3_bucket|dynamodb_table)["\']',
]
),
]

def check_rules(filepath, content):
"""Check only RegexRule rules (not InverseRegexRules) against a single file."""
"""Check per-file rules (RegexRule and BlockAnalysisRule subclasses) against a single file."""
findings = []
for rule in RULES:
if isinstance(rule, InverseRegexRule):
if isinstance(rule, (InverseRegexRule, CompoundInverseRule)):
continue

matches = rule.check(content)
Expand Down
36 changes: 34 additions & 2 deletions scanner/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ def scan_directory_level(directory, file_paths, rules):
Returns:
List of findings
"""
from rules.definitions import InverseRegexRule
from rules.definitions import InverseRegexRule, CompoundInverseRule
findings = []

# Read all files into a dictionary to keep track of content per file
Expand Down Expand Up @@ -589,5 +589,37 @@ def scan_directory_level(directory, file_paths, rules):
"match_content": line.strip()
})
break

elif isinstance(rule, CompoundInverseRule):
# All required_patterns must be present AND absent_pattern must be missing.
absent_found = bool(re.search(rule.absent_pattern, all_content, re.MULTILINE | re.DOTALL))
if absent_found:
continue
all_required = all(
re.search(p, all_content, re.MULTILINE | re.DOTALL)
for p in rule.required_patterns
)
if not all_required:
continue
# Conditions met — attach the finding to the first file matching any required pattern.
for filepath, content in file_contents.items():
for p in rule.required_patterns:
resource_match = re.search(p, content, re.MULTILINE | re.DOTALL)
if resource_match:
for i, line in enumerate(content.splitlines()):
if re.search(p, line):
findings.append({
"file": filepath,
"rule_id": rule.id,
"rule_name": rule.name,
"severity": rule.severity,
"description": rule.description,
"remediation": rule.remediation,
"estimated_savings": rule.estimated_savings,
"line": i + 1,
"match_content": line.strip()
})
break
break
break

return findings
Loading