From 6863328fec14b38f0862c7ea3c0e491fcab2c76f Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Sun, 17 May 2026 03:57:32 +0000 Subject: [PATCH] feat: add findGrouped() to Adapter + ClickHouse implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new method `findGrouped(array $queries, string $groupBy, string $interval): array` to the audit Adapter abstraction and implement it in the ClickHouse adapter. It produces time-bucketed grouped counts for charts of the form "events per , grouped by ". The method validates `$groupBy` against {event, userType, resourceType} and `$interval` against {hour, day, week, month}, then reuses the existing `parseQueries()` filter/param pipeline to apply both the query filters and `getTenantFilter()` for tenant-safe scoping. `Query::limit()` / `Query::offset()` are applied to the group set (default 25, clamped to 100), with top-N selection by `SUM(count)` desc and ties broken by groupValue asc. A single SQL round-trip drives the aggregation via CTEs: top-N groups, time bounds, a zero-filled bucket axis built with `arrayJoin(range(...))` + `date_add(, i, bucket_from)`, and the per-bucket aggregate. A final `CROSS JOIN` + `LEFT JOIN` produces the dense (groupValue, bucket, count) result set, with buckets aligned via `toStartOfInterval(time, INTERVAL 1 , 'UTC')` and formatted as ISO-8601 UTC strings to match how `find()` returns `time`. The Database adapter throws explicitly — grouped time-bucketed aggregations are an analytical-store feature. Schema: a `_key_resource_type` bloom-filter index has been added to the ClickHouse `getIndexes()` definition so new installs get it. Existing installs need a one-off migration to add it; left out of this PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Audit/Adapter.php | 30 +++ src/Audit/Adapter/ClickHouse.php | 179 +++++++++++++++++ src/Audit/Adapter/Database.php | 18 ++ src/Audit/Audit.php | 15 ++ tests/Audit/Adapter/ClickHouseTest.php | 259 +++++++++++++++++++++++++ tests/Audit/Adapter/DatabaseTest.php | 8 + 6 files changed, 509 insertions(+) diff --git a/src/Audit/Adapter.php b/src/Audit/Adapter.php index a6d5a53..d2cae24 100644 --- a/src/Audit/Adapter.php +++ b/src/Audit/Adapter.php @@ -241,4 +241,34 @@ abstract public function find(array $queries = []): array; * @throws \Exception */ abstract public function count(array $queries = [], ?int $max = null): int; + + /** + * Find logs aggregated into (groupValue, bucket, count) rows. + * + * Produces time-bucketed grouped counts for charts of the form + * "events per , grouped by ". The result is a flat + * array of associative rows ordered by groupValue asc, bucket asc; + * cloud-side consumers reshape into nested groups. + * + * Adapters MUST: + * - reject `$groupBy` values outside {`event`, `userType`, `resourceType`} + * - reject `$interval` values outside {`hour`, `day`, `week`, `month`} + * - apply the same filters/tenant scoping as `find()` to `$queries` + * - apply `Query::limit()` / `Query::offset()` to the group set (top-N + * by `SUM(count)` desc, ties by groupValue asc), default limit 25, + * hard max 100 + * - zero-fill every bucket between the smallest and largest bucket + * implied by the time filter for every returned group, with UTC-aligned + * boundaries + * - return the bucket as an ISO-8601 UTC string (matching `find()`'s + * formatting of `time`) + * + * @param array<\Utopia\Audit\Query> $queries Filter/limit/offset queries; callers should include a time BETWEEN filter + * @param string $groupBy One of `event`, `userType`, `resourceType` + * @param string $interval One of `hour`, `day`, `week`, `month` + * @return array + * + * @throws \Exception + */ + abstract public function findGrouped(array $queries, string $groupBy, string $interval): array; } diff --git a/src/Audit/Adapter/ClickHouse.php b/src/Audit/Adapter/ClickHouse.php index 86b6f02..1647220 100644 --- a/src/Audit/Adapter/ClickHouse.php +++ b/src/Audit/Adapter/ClickHouse.php @@ -529,6 +529,13 @@ public function getIndexes(): array 'lengths' => [], 'orders' => [], ], + [ + '$id' => '_key_resource_type', + 'type' => Database::INDEX_KEY, + 'attributes' => ['resourceType'], + 'lengths' => [], + 'orders' => [], + ], ]; } @@ -1079,6 +1086,178 @@ public function count(array $queries = [], ?int $max = null): int return $trimmed !== '' ? (int) $trimmed : 0; } + /** + * Allowed groupBy columns for findGrouped(). + * + * @var list + */ + private const FIND_GROUPED_ATTRIBUTES = ['event', 'userType', 'resourceType']; + + /** + * Allowed interval values for findGrouped() mapped to ClickHouse date units. + * + * @var array + */ + private const FIND_GROUPED_INTERVALS = [ + 'hour' => 'hour', + 'day' => 'day', + 'week' => 'week', + 'month' => 'month', + ]; + + private const FIND_GROUPED_DEFAULT_LIMIT = 25; + + private const FIND_GROUPED_MAX_LIMIT = 100; + + /** + * Find logs aggregated into (groupValue, bucket, count) rows. + * + * Top-N groups are selected by `SUM(count)` desc (ties by groupValue asc), + * then paginated via the supplied `Query::limit()` / `Query::offset()`. + * Buckets are aligned to UTC via `toStartOfInterval(time, INTERVAL 1 , 'UTC')` + * and zero-filled across the time range implied by the filter set. + * + * @param array $queries + * @param string $groupBy + * @param string $interval + * @return array + * @throws Exception + */ + public function findGrouped(array $queries, string $groupBy, string $interval): array + { + if (!in_array($groupBy, self::FIND_GROUPED_ATTRIBUTES, true)) { + $allowed = implode(', ', self::FIND_GROUPED_ATTRIBUTES); + throw new Exception("Invalid groupBy '{$groupBy}'. Allowed: {$allowed}"); + } + + if (!array_key_exists($interval, self::FIND_GROUPED_INTERVALS)) { + $allowed = implode(', ', array_keys(self::FIND_GROUPED_INTERVALS)); + throw new Exception("Invalid interval '{$interval}'. Allowed: {$allowed}"); + } + + $unit = self::FIND_GROUPED_INTERVALS[$interval]; + + $this->validateAttributeName($groupBy); + $escapedGroup = $this->escapeIdentifier($groupBy); + $escapedTime = $this->escapeIdentifier('time'); + + $tableName = $this->getTableName(); + $escapedTable = $this->escapeIdentifier($this->database) . '.' . $this->escapeIdentifier($tableName); + + $parsed = $this->parseQueries($queries); + + $filters = $parsed['filters']; + $tenantFilter = $this->getTenantFilter(); + if ($tenantFilter !== '') { + $filters[] = ltrim($tenantFilter, ' AND'); + } + + $whereClause = empty($filters) ? '' : ' WHERE ' . implode(' AND ', $filters); + + $params = $parsed['params']; + unset($params['limit'], $params['offset']); + + $limit = self::FIND_GROUPED_DEFAULT_LIMIT; + if (isset($parsed['limit'])) { + $limit = $parsed['limit']; + if ($limit > self::FIND_GROUPED_MAX_LIMIT) { + $limit = self::FIND_GROUPED_MAX_LIMIT; + } + if ($limit < 1) { + $limit = 1; + } + } + + $offset = 0; + if (isset($parsed['offset'])) { + $offset = max(0, $parsed['offset']); + } + + $params['group_limit'] = $limit; + $params['group_offset'] = $offset; + + $sql = " + WITH + top_groups AS ( + SELECT value FROM ( + SELECT {$escapedGroup} AS value, COUNT(*) AS sum_count + FROM {$escapedTable}{$whereClause} + GROUP BY value + ORDER BY sum_count DESC, value ASC + LIMIT {group_limit:UInt64} OFFSET {group_offset:UInt64} + ) + ), + bounds AS ( + SELECT + toStartOfInterval(MIN({$escapedTime}), INTERVAL 1 {$unit}, 'UTC') AS bucket_from, + toStartOfInterval(MAX({$escapedTime}), INTERVAL 1 {$unit}, 'UTC') AS bucket_to + FROM {$escapedTable}{$whereClause} + ), + buckets AS ( + SELECT arrayJoin( + arrayMap( + i -> date_add({$unit}, i, bucket_from), + range(0, toUInt64(dateDiff('{$unit}', bucket_from, bucket_to) + 1)) + ) + ) AS bucket + FROM bounds + WHERE bucket_from IS NOT NULL AND bucket_to IS NOT NULL + ), + agg AS ( + SELECT + {$escapedGroup} AS value, + toStartOfInterval({$escapedTime}, INTERVAL 1 {$unit}, 'UTC') AS bucket, + COUNT(*) AS count + FROM {$escapedTable}{$whereClause} + GROUP BY value, bucket + ) + SELECT + g.value AS value, + formatDateTime(b.bucket, '%Y-%m-%dT%H:%i:%S.000+00:00') AS bucket, + toUInt64(coalesce(a.count, 0)) AS count + FROM top_groups g + CROSS JOIN buckets b + LEFT JOIN agg a ON a.value = g.value AND a.bucket = b.bucket + WHERE g.value IN (SELECT value FROM top_groups) + ORDER BY value ASC, bucket ASC + FORMAT JSON + "; + + $result = $this->query($sql, $params); + $decoded = json_decode($result, true); + + if (!is_array($decoded) || !isset($decoded['data']) || !is_array($decoded['data'])) { + return []; + } + + $rows = []; + /** @var array> $data */ + $data = $decoded['data']; + foreach ($data as $row) { + if (!is_array($row)) { + continue; + } + $value = $row['value'] ?? ''; + $bucket = $row['bucket'] ?? ''; + $count = $row['count'] ?? 0; + + if (!is_string($value)) { + $value = is_scalar($value) ? (string) $value : ''; + } + if (!is_string($bucket)) { + $bucket = is_scalar($bucket) ? (string) $bucket : ''; + } + + $rows[] = [ + 'value' => $value, + 'bucket' => $bucket, + 'count' => is_numeric($count) ? (int) $count : 0, + ]; + } + + return $rows; + } + /** * Parse Query objects into SQL components. * diff --git a/src/Audit/Adapter/Database.php b/src/Audit/Adapter/Database.php index cda3383..dda152e 100644 --- a/src/Audit/Adapter/Database.php +++ b/src/Audit/Adapter/Database.php @@ -542,4 +542,22 @@ public function count(array $queries = [], ?int $max = null): int ); }); } + + /** + * Find logs aggregated into (groupValue, bucket, count) rows. + * + * Not implemented for the Database adapter — grouped time-bucketed + * aggregations are designed for analytical backends (ClickHouse). Callers + * relying on this method must use the ClickHouse adapter. + * + * @param array<\Utopia\Audit\Query> $queries + * @param string $groupBy + * @param string $interval + * @return array + * @throws \Exception + */ + public function findGrouped(array $queries, string $groupBy, string $interval): array + { + throw new Exception('findGrouped is not supported by the Database adapter'); + } } diff --git a/src/Audit/Audit.php b/src/Audit/Audit.php index 0e1deb4..beaf272 100644 --- a/src/Audit/Audit.php +++ b/src/Audit/Audit.php @@ -289,4 +289,19 @@ public function count(array $queries = [], ?int $max = null): int { return $this->adapter->count($queries, $max); } + + /** + * Find logs aggregated into (groupValue, bucket, count) rows. + * + * @param array $queries Filter/limit/offset queries; callers should include a time BETWEEN filter + * @param string $groupBy One of `event`, `userType`, `resourceType` + * @param string $interval One of `hour`, `day`, `week`, `month` + * @return array + * + * @throws \Exception + */ + public function findGrouped(array $queries, string $groupBy, string $interval): array + { + return $this->adapter->findGrouped($queries, $groupBy, $interval); + } } diff --git a/tests/Audit/Adapter/ClickHouseTest.php b/tests/Audit/Adapter/ClickHouseTest.php index b50da71..15a0760 100644 --- a/tests/Audit/Adapter/ClickHouseTest.php +++ b/tests/Audit/Adapter/ClickHouseTest.php @@ -911,4 +911,263 @@ public function testOrderRandomRejectedWithColumnOrder(): void Query::orderDesc('time'), ]); } + + public function testFindGroupedRejectsInvalidGroupBy(): void + { + $this->expectException(\Exception::class); + $this->expectExceptionMessage("Invalid groupBy 'userId'"); + + $this->audit->findGrouped([], 'userId', 'hour'); + } + + public function testFindGroupedRejectsInvalidInterval(): void + { + $this->expectException(\Exception::class); + $this->expectExceptionMessage("Invalid interval 'minute'"); + + $this->audit->findGrouped([], 'event', 'minute'); + } + + public function testFindGroupedByEventReturnsBucketedCounts(): void + { + $from = (new \DateTime())->modify('-1 hour'); + $to = (new \DateTime())->modify('+1 hour'); + + $rows = $this->audit->findGrouped( + [ + Query::between( + 'time', + \Utopia\Database\DateTime::format($from), + \Utopia\Database\DateTime::format($to), + ), + ], + 'event', + 'hour' + ); + + $this->assertNotEmpty($rows); + + $events = []; + foreach ($rows as $row) { + $this->assertArrayHasKey('value', $row); + $this->assertArrayHasKey('bucket', $row); + $this->assertArrayHasKey('count', $row); + $this->assertIsString($row['value']); + $this->assertIsString($row['bucket']); + $this->assertIsInt($row['count']); + // Bucket is ISO-8601 UTC + $this->assertMatchesRegularExpression( + '/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/', + $row['bucket'] + ); + $events[$row['value']] = ($events[$row['value']] ?? 0) + $row['count']; + } + + // Fixture has 4 rows: 2 update, 1 delete, 1 insert + $this->assertEquals(2, $events['update'] ?? null); + $this->assertEquals(1, $events['delete'] ?? null); + $this->assertEquals(1, $events['insert'] ?? null); + } + + public function testFindGroupedOrdersByValueAscThenBucketAsc(): void + { + $from = (new \DateTime())->modify('-1 hour'); + $to = (new \DateTime())->modify('+1 hour'); + + $rows = $this->audit->findGrouped( + [ + Query::between( + 'time', + \Utopia\Database\DateTime::format($from), + \Utopia\Database\DateTime::format($to), + ), + ], + 'event', + 'hour' + ); + + $this->assertNotEmpty($rows); + + $previousValue = null; + $previousBucketForValue = null; + foreach ($rows as $row) { + if ($previousValue === null || $row['value'] !== $previousValue) { + if ($previousValue !== null) { + $this->assertGreaterThan($previousValue, $row['value']); + } + $previousValue = $row['value']; + $previousBucketForValue = $row['bucket']; + continue; + } + $this->assertGreaterThanOrEqual($previousBucketForValue, $row['bucket']); + $previousBucketForValue = $row['bucket']; + } + } + + public function testFindGroupedZeroFillsBucketRange(): void + { + $from = (new \DateTime())->modify('-3 hour'); + $to = (new \DateTime())->modify('+1 hour'); + + $rows = $this->audit->findGrouped( + [ + Query::between( + 'time', + \Utopia\Database\DateTime::format($from), + \Utopia\Database\DateTime::format($to), + ), + ], + 'event', + 'hour' + ); + + $this->assertNotEmpty($rows); + + // Group counts per value + $bucketsPerValue = []; + foreach ($rows as $row) { + $bucketsPerValue[$row['value']] = ($bucketsPerValue[$row['value']] ?? 0) + 1; + } + + // Fixture rows are within a single hour; bounds = [bucket_min, bucket_max] + // inclusive. With all 4 fixture rows in one hour, every group should have + // exactly one bucket (the min and max bucket are the same). + foreach ($bucketsPerValue as $value => $count) { + $this->assertGreaterThanOrEqual(1, $count, "Group '{$value}' should have at least one bucket"); + // Each group has the same number of buckets (zero-fill is consistent) + $this->assertEquals(reset($bucketsPerValue), $count, 'All groups should share the same bucket count'); + } + } + + public function testFindGroupedRespectsLimit(): void + { + $from = (new \DateTime())->modify('-1 hour'); + $to = (new \DateTime())->modify('+1 hour'); + + $rows = $this->audit->findGrouped( + [ + Query::between( + 'time', + \Utopia\Database\DateTime::format($from), + \Utopia\Database\DateTime::format($to), + ), + Query::limit(1), + ], + 'event', + 'hour' + ); + + $this->assertNotEmpty($rows); + $values = array_unique(array_map(fn (array $r) => $r['value'], $rows)); + $this->assertCount(1, $values); + // Top-N by SUM(count) desc: 'update' has 2 rows, others have 1 → wins + $this->assertEquals('update', $rows[0]['value']); + } + + public function testFindGroupedClampsLimitToMax(): void + { + $from = (new \DateTime())->modify('-1 hour'); + $to = (new \DateTime())->modify('+1 hour'); + + $rows = $this->audit->findGrouped( + [ + Query::between( + 'time', + \Utopia\Database\DateTime::format($from), + \Utopia\Database\DateTime::format($to), + ), + Query::limit(500), + ], + 'event', + 'hour' + ); + + // Hard max is 100; fixture only has 3 distinct events so result is small + $this->assertNotEmpty($rows); + $values = array_unique(array_map(fn (array $r) => $r['value'], $rows)); + $this->assertLessThanOrEqual(100, count($values)); + } + + public function testFindGroupedRespectsOffset(): void + { + $from = (new \DateTime())->modify('-1 hour'); + $to = (new \DateTime())->modify('+1 hour'); + + $page1 = $this->audit->findGrouped( + [ + Query::between( + 'time', + \Utopia\Database\DateTime::format($from), + \Utopia\Database\DateTime::format($to), + ), + Query::limit(1), + ], + 'event', + 'hour' + ); + $page2 = $this->audit->findGrouped( + [ + Query::between( + 'time', + \Utopia\Database\DateTime::format($from), + \Utopia\Database\DateTime::format($to), + ), + Query::limit(1), + Query::offset(1), + ], + 'event', + 'hour' + ); + + $this->assertNotEmpty($page1); + $this->assertNotEmpty($page2); + $this->assertNotEquals($page1[0]['value'], $page2[0]['value']); + } + + public function testFindGroupedByUserTypeReturnsRows(): void + { + $from = (new \DateTime())->modify('-1 hour'); + $to = (new \DateTime())->modify('+1 hour'); + + $rows = $this->audit->findGrouped( + [ + Query::between( + 'time', + \Utopia\Database\DateTime::format($from), + \Utopia\Database\DateTime::format($to), + ), + ], + 'userType', + 'day' + ); + + $this->assertNotEmpty($rows); + // Fixture sets userType=member for every row + $total = 0; + foreach ($rows as $row) { + $this->assertEquals('member', $row['value']); + $total += $row['count']; + } + $this->assertEquals(4, $total); + } + + public function testFindGroupedReturnsEmptyForEmptyRange(): void + { + $past1 = (new \DateTime())->modify('-3 hour'); + $past2 = (new \DateTime())->modify('-2 hour'); + + $rows = $this->audit->findGrouped( + [ + Query::between( + 'time', + \Utopia\Database\DateTime::format($past1), + \Utopia\Database\DateTime::format($past2), + ), + ], + 'event', + 'hour' + ); + + $this->assertEquals([], $rows); + } } diff --git a/tests/Audit/Adapter/DatabaseTest.php b/tests/Audit/Adapter/DatabaseTest.php index a026208..8abb0e7 100644 --- a/tests/Audit/Adapter/DatabaseTest.php +++ b/tests/Audit/Adapter/DatabaseTest.php @@ -39,4 +39,12 @@ protected function initializeAudit(): void $this->audit->setup(); } } + + public function testFindGroupedNotSupported(): void + { + $this->expectException(\Exception::class); + $this->expectExceptionMessage('findGrouped is not supported by the Database adapter'); + + $this->audit->findGrouped([], 'event', 'hour'); + } }