|
1 | 1 | import logging |
2 | | -from datetime import date, timedelta |
| 2 | +from datetime import date, datetime, time, timedelta |
3 | 3 |
|
4 | 4 | from auditlog.models import LogEntry |
5 | 5 | from celery.utils.log import get_task_logger |
@@ -93,24 +93,57 @@ def cleanup_alerts(*args, **kwargs): |
93 | 93 | logger.info("total number of alerts deleted: %s", total_deleted_count) |
94 | 94 |
|
95 | 95 |
|
96 | | -@app.task(bind=True) |
97 | | -def flush_auditlog(*args, **kwargs): |
98 | | - retention_period = settings.AUDITLOG_FLUSH_RETENTION_PERIOD |
def run_flush_auditlog(retention_period: int | None = None,
                       batch_size: int | None = None,
                       max_batches: int | None = None) -> tuple[int, int, bool]:
    """
    Delete audit log (LogEntry) rows older than the retention period, in batches.

    Each parameter defaults to its corresponding Django setting:
      retention_period -- months to keep (AUDITLOG_FLUSH_RETENTION_PERIOD; < 0 disables flushing)
      batch_size       -- rows deleted per batch (AUDITLOG_FLUSH_BATCH_SIZE, default 1000)
      max_batches      -- upper bound on batches per run (AUDITLOG_FLUSH_MAX_BATCHES, default 100)

    Returns (deleted_total, batches_done, reached_limit). reached_limit is True only
    when the batch budget was exhausted AND matching rows remain for a later run.
    """
    if retention_period is None:
        retention_period = getattr(settings, "AUDITLOG_FLUSH_RETENTION_PERIOD", -1)
    if retention_period < 0:
        logger.info("Flushing auditlog is disabled")
        return 0, 0, False

    logger.info("Running Cleanup Task for Logentries with %d Months retention", retention_period)
    retention_day = date.today() - relativedelta(months=retention_period)
    # Compare against a timestamp (start of the cutoff day) rather than
    # timestamp__date so the database can use the index on `timestamp` directly
    # instead of casting every row to a date.
    # NOTE(review): assumes `timezone` is django.utils.timezone — confirm the import.
    cutoff_dt = datetime.combine(retention_day, time.min, tzinfo=timezone.get_current_timezone())

    if batch_size is None:
        batch_size = getattr(settings, "AUDITLOG_FLUSH_BATCH_SIZE", 1000)
    if max_batches is None:
        max_batches = getattr(settings, "AUDITLOG_FLUSH_MAX_BATCHES", 100)
    # Guard against nonsensical configuration: batch_size <= 0 would select nothing
    # (misreporting "no outdated entries"), max_batches <= 0 would never loop.
    batch_size = max(1, int(batch_size))
    max_batches = max(1, int(max_batches))

    # Delete in bounded batches to avoid long-running transactions and table locks.
    deleted_total = 0
    batches_done = 0
    while batches_done < max_batches:
        # Stable pk ordering so successive runs walk the table front-to-back.
        pks = list(
            LogEntry.objects.filter(timestamp__lt=cutoff_dt)
            .order_by("pk")
            .values_list("pk", flat=True)[:batch_size]
        )
        if not pks:
            if batches_done == 0:
                logger.info("No outdated Logentries found")
            break
        qs = LogEntry.objects.filter(pk__in=pks)
        # NOTE: _raw_delete is private Django API; it bypasses signals/cascades for speed.
        deleted_count = qs._raw_delete(qs.db)
        deleted_total += int(deleted_count)
        batches_done += 1
        logger.info("Deleted batch %s (size ~%s), total deleted: %s", batches_done, batch_size, deleted_total)

    # Only report "reached limit" when outdated rows actually remain; the plain
    # `batches_done >= max_batches` check misreported a limit when the final
    # batch happened to drain the table exactly.
    reached_limit = (
        batches_done >= max_batches
        and LogEntry.objects.filter(timestamp__lt=cutoff_dt).exists()
    )
    if reached_limit:
        logger.info("Reached max batches limit (%s). Remaining audit log entries will be deleted in the next run.", max_batches)
    else:
        logger.info("Total number of audit log entries deleted: %s", deleted_total)

    return deleted_total, batches_done, reached_limit
| 142 | + |
| 143 | + |
@app.task(bind=True)
def flush_auditlog(*args, **kwargs):
    """
    Celery task wrapper around run_flush_auditlog().

    Returns the (deleted_total, batches_done, reached_limit) tuple so the
    outcome is recorded in the Celery result backend instead of being discarded.
    """
    return run_flush_auditlog()
114 | 147 |
|
115 | 148 |
|
116 | 149 | @app.task(bind=True) |
|
0 commit comments