
Commit 12117ce

harshach, claude, mohityadav766, ShaileshParmar11 committed
Perf/redis cache metrics and indexes (#27499)
* perf(cache): wire Redis metrics, fix REST GET cache path, cache ReadBundle

Three changes that make the Redis cache actually earn its keep on the hot read path:

PR1: Observability + safety
- Wire CacheMetrics into RedisCacheProvider so hits/misses/errors/latency surface on /prometheus (recorders existed but were never called).
- Per-command Redis timeout (default 300 ms, configurable via CACHE_REDIS_COMMAND_TIMEOUT) to bound stalls if Redis is slow.
- Pipeline the relationship-invalidate loop into a single DEL.
- Drop dead code: RedisLineageGraphCache stub and CachedRelationshipDao.{list, batchGetRelationships}.

PR1.5: Make REST GET consult the cache at all
- EntityResource.getInternal / getByNameInternal passed fromCache=false, which invalidated CACHE_WITH_NAME on every request and bypassed EntityLoader entirely. Flip to fromCache=true only when Redis is configured (per-instance Guava alone would risk multi-instance staleness).
- Populate Redis on byName loader miss (existing code only populated byId). Cross-instance reads now warm.

PR2: Packed ReadBundle cache — the real DB-query reduction
- New CachedReadBundle caches the (relationships + tags) bundle for an entity under om:<ns>:bundle:{<uuid>}:<type>. Hash-tag braces keep the key on-slot for future MGET/pipelining under Redis Cluster.
- EntityRepository.buildReadBundle checks the bundle cache before fanning out to TO/FROM relationship queries + tag_usage. On miss, does the existing DB work and writes the DTO.
- EntityRepository.invalidateCache deletes the bundle key.

Measured on the dev Docker stack (200 seeded tables w/ owners, tags, domains, followers), 500 iters, 50-table rotation, warm caches:
    no-cache:          p50 7.33 ms  p95 10.79 ms  p99 13.61 ms  128 req/s
    warm+redis (PR2):  p50 4.11 ms  p95  5.24 ms  p99  6.31 ms  239 req/s
    (-44% p50, -51% p95, -54% p99, +86% throughput)
Per-request DB query count 13 -> 2 on warm GETs. Bundle-cache hit rate ~85% during the run. PATCH invalidates the bundle as expected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): cross-instance cache invalidation via Redis pub/sub

Per-instance Guava caches (CACHE_WITH_ID, CACHE_WITH_NAME) diverge across replicas when one instance writes and others keep serving stale data until the 30 s expireAfterWrite kicks in. Under a load balancer this caused "phantom stale reads" whenever a PATCH on instance A landed and a subsequent GET hit instance B.

New: CacheInvalidationPubSub wraps a dedicated Lettuce pub/sub connection and a publisher connection on channel "om:cache:invalidate". Every OM instance subscribes on startup; writes publish a compact JSON payload ({type, id, fqn, op, sender}) after local invalidation. Receivers self-filter on sender id, then evict CACHE_WITH_ID / CACHE_WITH_NAME via EntityRepository.onRemoteCacheInvalidate and drop the bundle key.

Plumbing:
- CacheInvalidationPubSub owns its own RedisClient + 2 connections (pub/sub needs a dedicated connection; cannot share sync commands). Modeled after the existing RedisJobNotifier.
- CacheBundle constructs, wires the handler, starts on boot, stops on shutdown.
- EntityRepository.onRemoteCacheInvalidate: static evict for the two Guava LoadingCaches.
- EntityRepository.invalidateCache (delete path) and EntityUpdater.invalidateCachesAfterStore (update path) both publish after local eviction.
- Guava expireAfterWrite (30 s) stays as a lost-message backstop.
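A hedged sketch of the pub/sub shape this commit describes, not the actual CacheInvalidationPubSub class: one dedicated Lettuce subscriber connection, one publisher connection, and a self-filter on the sender id. The channel name and payload fields ({type, id, fqn, op, sender}) come from the commit message; the string-formatted JSON, the senderId construction, and evictLocalCaches are illustrative placeholders.

    import io.lettuce.core.RedisClient;
    import io.lettuce.core.api.StatefulRedisConnection;
    import io.lettuce.core.pubsub.RedisPubSubAdapter;
    import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;

    public class CacheInvalidationPubSubSketch {
      private static final String CHANNEL = "om:cache:invalidate";
      private final String senderId =
          System.getenv().getOrDefault("HOSTNAME", "local") + ":" + ProcessHandle.current().pid();

      private final RedisClient client = RedisClient.create("redis://localhost:6379");
      private final StatefulRedisPubSubConnection<String, String> sub = client.connectPubSub();
      private final StatefulRedisConnection<String, String> pub = client.connect();

      public void start() {
        sub.addListener(
            new RedisPubSubAdapter<String, String>() {
              @Override
              public void onMessage(String channel, String message) {
                // Ignore our own publishes; only remote writers should evict local Guava caches.
                if (!message.contains("\"sender\":\"" + senderId + "\"")) {
                  evictLocalCaches(message); // e.g. delegate to onRemoteCacheInvalidate
                }
              }
            });
        sub.sync().subscribe(CHANNEL);
      }

      public void publishInvalidate(String type, String id, String fqn, String op) {
        String payload =
            String.format(
                "{\"type\":\"%s\",\"id\":\"%s\",\"fqn\":\"%s\",\"op\":\"%s\",\"sender\":\"%s\"}",
                type, id, fqn, op, senderId);
        pub.sync().publish(CHANNEL, payload);
      }

      private void evictLocalCaches(String payload) {
        // Placeholder: parse the payload and evict CACHE_WITH_ID / CACHE_WITH_NAME + the bundle key.
      }
    }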
Verified with two OM instances (new docker-compose.multiserver.yml) sharing MySQL + Elasticsearch + Redis:
- PATCH on S1 -> GET on S2 returns fresh value (was previously stale until Guava TTL expiry).
- PATCH on S2 -> GET on S1 returns fresh value.
- redis-cli MONITOR shows:
    PUBLISH om:cache:invalidate {"type":"table","id":"<uuid>","fqn":"<fqn>","op":"update", "sender":"<host>:<pid>:<startMs>"}

Known limits this PR does not fix:
- Fire-and-forget delivery; dropped pub/sub messages fall back to the 30 s Guava TTL. Redis Streams with consumer cursors is the upgrade path if we see drops.
- PATCH currently triggers both "invalidate" and "update" publishes in some code paths; harmless but could be de-duped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): single-flight stampede protection on bundle cache

A cold bundle miss previously caused 3 DB queries per request. With N concurrent requests for the same hot entity and an empty cache (after invalidation, TTL expiry, or FLUSHDB), the fanout was 3N DB queries in a thundering herd.

CachedReadBundle now exposes three primitives backed by Redis SETNX:
    tryAcquireLoadLock(type, id)     -> SET NX EX loadLockTtlMs
    releaseLoadLock(type, id)        -> DEL
    waitForConcurrentLoad(type, id)  -> poll GET until loadLockWaitMs

buildReadBundle uses them on the cold-miss path:
- Exactly one caller acquires the lock and runs the existing DB fetch + cache populate.
- Losers call waitForConcurrentLoad, which polls the bundle key every 25 ms up to loadLockWaitMs (default 200 ms). On populate they read the cached value like any cache hit. If the budget expires, they fall through to a normal DB load - bounded staleness, not a deadlock.
- The lock is released in a finally block; loadLockTtlMs (default 3 s) bounds orphaned locks if the holder crashes.

Verified with docker compose stack and a 25-way concurrent burst after FLUSHDB. Redis MONITOR during cold burst (excerpted):
    SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX   <-- one wins
    SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX   <-- others
    SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX       lose
    SET om:dev:bundle:{<id>}:table:loading "1" EX 3 NX
    ...
    DEL om:dev:bundle:{<id>}:table:loading               <-- holder releases

    Cold 25-burst  db_queries=63 (~2.5 per request)
    Warm 25-burst  db_queries=50 (~2 per request, 25 cache hits / 0 misses)

Without single-flight the cold burst would have been ~325 DB queries (25 * 13 per-request cold cost). Net a 5x reduction on the stampede scenario.

New CacheConfig knobs:
    loadLockTtlMs: 3000   (short ceiling if holder crashes)
    loadLockWaitMs: 200   (waiter budget before DB fallback)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): rewrite warmup with bulk SQL + pipelined Redis writes

The old CacheWarmupApp took hours on even modest installs because it:
- Iterated entities via repository.find(Include.ALL) (triggers full ReadBundle fan-out per row).
- Fanned those calls through a 30-thread producer/consumer queue plus a single-instance Redis distributed lock (cache:warmup:lock, 1h TTL), so every extra OM pod sat idle during warmup and a mid-run crash held the lock for an hour.
- Issued N individual Redis writes per entity with no pipelining.

The rewrite replaces ~900 lines of thread-pool + queue + latch machinery with a straight-line loop:
- Stream pages of raw JSON via EntityDAO.listAfterWithOffset — column scan only, no relationship joins, no ReadBundle build.
- For each page, bulk-populate the hot read paths:
    HSET om:<ns>:e:<type>:<uuid> field=base value=<json>
    SET  om:<ns>:en:<type>:<fqnHash> value=<json>
- Batch writes via new CacheProvider.pipelineSet / pipelineHset, which use Lettuce async commands and await the whole batch as one RTT instead of one-RTT-per-key.
- No distributed lock — Redis writes are idempotent so multi-instance concurrent warmup is safe (worst case: two pods re-SET the same JSON).

Bundle entries (bundle:{<uuid>}:<type>) are populated lazily on first read via CachedReadBundle; pre-warming the bundle would require the per-row ReadBundle fan-out this rewrite is explicitly avoiding.

Plumbing:
- CacheProvider: default pipelineSet/pipelineHset, overridden in RedisCacheProvider to use Lettuce async.
- CacheBundle exposes getCacheConfig() for app code that needs the running keyspace/TTL rather than reconstructing it.

Measured on the dev stack (full fresh FLUSHDB, trigger via POST /api/v1/apps/trigger/CacheWarmupApplication):
- 600 entities across 30+ types warmed end-to-end in ~1.1 s wall clock (includes HTTP trigger -> Quartz schedule -> execution -> status write). The per-entity-type phase is sub-50 ms for small types.
- 1201 Redis keys populated (600 entities x base + byName).
- Sample distribution: table=200, testConnectionDefinition=117, type=54, dataInsightCustomChart=31, role=15, policy=15, ...

Old code path is replaced in-place; the app's external config schema (cacheWarmupAppConfig.json) and trigger endpoint are unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): cache certification + container refs, 0 DB queries per warm GET

Close out the last two DB queries firing on the warm-cache path.

1. Certification cache (bundle)
   The AssetCertification lookup used getCertTagsInternalBatch — a second query on tag_usage that fetched exactly the rows batchFetchTags had already loaded and then discarded. Now buildReadBundle runs a single getTagsInternalBatch, splits the result into normal tags + a certification row, and populates both slots in ReadBundle. Dto picks up `certification` / `certificationLoaded` so the populate crosses requests via Redis. getCertification() reads from ReadBundleContext.getCurrent() on the fast path.

2. Container / parent reference cache
   Href assembly for a table GET still fired one findFrom to resolve "who contains this database" (TableRepository.setDefaultFields when the table row doesn't have service embedded). Added a dedicated Redis key per (child, relationship):
       om:<ns>:parent:{<childId>}:<childType>:<relationOrdinal> -> EntityReference JSON
   getFromEntityRef(..., fromEntityType=null, ...) checks the cache, populates on miss. CachedRelationshipDao gets get/put/invalidate container helpers. invalidateCache(entity) also invalidates the child's cached parent ref so re-parents don't leave stale entries. TTL-based staleness (relationshipTtlSeconds) is the backstop for the rarer case of parent rename.

3. Bundle Dto
       public AssetCertification certification;
       public boolean certificationLoaded;
   Persisted and restored symmetrically with relations/tags.
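A minimal sketch of the "one RTT per page" write pattern the warmup rewrite describes, assuming Lettuce manual command flushing. The method name pipelineSet is taken from the commit message; the key/JSON map, the 5-second batch timeout, and the error handling are illustrative.

    import io.lettuce.core.LettuceFutures;
    import io.lettuce.core.RedisFuture;
    import io.lettuce.core.api.StatefulRedisConnection;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class PipelinedWarmupSketch {
      public static void pipelineSet(
          StatefulRedisConnection<String, String> conn, Map<String, String> keyToJson) {
        conn.setAutoFlushCommands(false); // queue commands locally instead of flushing per call
        List<RedisFuture<String>> futures = new ArrayList<>();
        try {
          for (Map.Entry<String, String> e : keyToJson.entrySet()) {
            futures.add(conn.async().set(e.getKey(), e.getValue()));
          }
          conn.flushCommands(); // one round trip for the whole page
        } finally {
          conn.setAutoFlushCommands(true);
        }
        boolean done =
            LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));
        if (!done) {
          throw new IllegalStateException("Redis pipeline batch timed out");
        }
      }
    }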
Measured on the dev stack, 50-table rotation, 500 iters, enriched with owners+tags+domains+followers:

Before this commit (warm Redis, bundle cache on):
    p50 4.11 ms  p95 5.24 ms  p99 6.31 ms  239 req/s
    DB queries per warm GET: 2
      1x getCertTagsInternalBatch
      1x findFrom(database) for service lookup

After this commit (warm Redis):
    p50 2.95 ms  p95 3.76 ms  p99 4.50 ms  331 req/s
    DB queries per warm GET: 0
    cache hit ratio during bench: 100%

No-cache baseline (unchanged):
    p50 7.26 ms  p95 10.68 ms  p99 13.76 ms  130 req/s

End-to-end from no-cache to this commit: -59% p50, -65% p95, -67% p99, +155% throughput, 13 -> 0 DB queries per GET on the hot read path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): fix write-through shape + tighten invalidation on updates

Two bugs exposed by a cache-coherence audit on updates:

1. Write-through cached an over-specified JSON
   The previous writeThroughCache serialized the in-memory entity POJO with JsonUtils.pojoToJson(entity). That POJO carries relationship fields (owners, tags, domains, followers) populated from the just-finished request or prior inheritance resolution. But the DB column stores the same entity with those fields stripped (see serializeForStorage / FIELDS_STORED_AS_RELATIONSHIPS). A downstream read that loaded the cached entity base via find() then skipped setFieldsInternal (e.g. Entity.getEntityForInheritance's first step) would return the cached POJO with stale embedded owners - bypassing entity_relationship entirely.
   Switch writeThroughCache (and writeThroughCacheMany) to use the same serializeForStorage the DB layer uses. Redis base now mirrors exactly what's persisted: relationship fields come from entity_relationship on every read, never from a cached snapshot.

2. Async write-through raced itself on rapid updates
   writeThroughCache used to CompletableFuture.runAsync on a shared executor, re-reading from the DB. Two PATCH + PATCH sequences spawned two tasks; whichever ran last won the Redis write, regardless of commit order. Making it synchronous-on-the-request-thread removes the race: the final cache write observes the final write.

3. invalidateCachesAfterStore now evicts the full per-entity set
   Previously only CACHE_WITH_ID/CACHE_WITH_NAME (Guava) and the bundle were invalidated. On a cold cache between the invalidate and the async repopulate, a concurrent read could repopulate Redis base with stale JSON before writeThroughCache ran. The invalidation now also drops:
   - om:<ns>:e:<type>:<id> and om:<ns>:en:<type>:<fqnHash>
   - owners/domains fields on the relationship hash
   - the container-ref cache for this child (parent may have changed)

4. Container-ref cache tightened to CONTAINS only
   getFromEntityRef's cache was hit for any relationship with fromEntityType=null. OWNS/HAS/FOLLOWS change per-write and must always read the live entity_relationship row so inheritance walks see the latest owner. Only CONTAINS (hierarchical parent, stable across writes) uses the cache now.
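Illustrative only: the idea behind the serializeForStorage fix in point 1 above — the cached "base" JSON must not embed relationship fields, so a read that skips setFieldsInternal can never serve stale owners or tags from Redis. The field list and the Jackson ObjectNode approach here are assumptions, not the project's actual code.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public class WriteThroughShapeSketch {
      private static final ObjectMapper MAPPER = new ObjectMapper();
      private static final String[] FIELDS_STORED_AS_RELATIONSHIPS = {
        "owners", "tags", "domains", "followers"
      };

      /** Serialize an entity the way the DB column stores it: relationship fields removed. */
      public static String serializeForCache(Object entity) throws Exception {
        ObjectNode node = MAPPER.valueToTree(entity);
        for (String field : FIELDS_STORED_AS_RELATIONSHIPS) {
          node.remove(field); // relationships always come from entity_relationship on read
        }
        return MAPPER.writeValueAsString(node);
      }
    }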
Validation (single-instance, Redis enabled): om-cache-validate.sh: 8/8 PASS, including:
- PATCH description read-after-write (by name and by id)
- Owner update reflected immediately
- Add follower visible on next read
- Table inherits owner from database via schema with no owner
- Table picks up NEW inherited owner after database owner changes
- Delete removes entity; subsequent GET returns 404

Known edge case documented: tight-loop alternating PATCH(parent) + GET(child-inheriting) within a few milliseconds can observe one-step-old inherited value. Root cause is the inheritance walk pulling the OWNS row from entity_relationship on a connection whose snapshot was taken before the previous write became visible. Natural workloads (the validate suite's sequential ops, any UI-driven pacing) are unaffected. Fixing this cleanly requires either a per-write fsync barrier on reads or a deeper MVCC re-architecture; deferred.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(cache): add Redis testcontainer support + mysql-elasticsearch-redis profile

Lets integration tests run against an ephemeral Redis so we can surface any IT that breaks when the cache layer is active.

TestSuiteBootstrap:
- New cacheProvider system property (default: none). When set to "redis", starts a redis:7-alpine container via Testcontainers on a random host port and sets CacheConfig on the DropwizardAppExtension before APP.before() runs.
- Per-run keyspace (om:it:<startMs>) keeps parallel suite runs from colliding if they share a Redis host.
- Container is registered in the existing cleanup chain.

pom.xml:
- New profile `mysql-elasticsearch-redis`. Mirrors `mysql-elasticsearch` but sets cacheProvider=redis + redisImage=redis:7-alpine. Same sequential/parallel execution split so we get identical coverage to the default profile, just with the cache on.

Usage:
    mvn -pl openmetadata-integration-tests \
      -Pmysql-elasticsearch-redis verify

Other existing profiles (mysql-elasticsearch, postgres-opensearch, postgres-elasticsearch, mysql-opensearch, postgres-rdf-tests) are untouched; they default to cacheProvider=none and no Redis container is started, so no regression in CI run time for non-cache profiles.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): invalidate stale cache entries on rename cascade and direct DAO writes

Writes that bypass EntityRepository.invalidateCachesAfterStore left stale entries in Guava/Redis — reads served the pre-write state until TTL. Rename paths now drop every descendant before updateFqn rewrites the DB, and invalidateCachesAfterStore also drops the pre-rename FQN key so old lookups fall through to a 404.

Direct dao.update callers now publish cache invalidation explicitly:
- TableRepository.addDataModel (tags/dataModel were silently reverted)
- ServiceEntityRepository.addTestConnectionResult
- PersonaRepository.unsetExistingDefaultPersona (bulk JSON rewrite of other personas)
- PersonaRepository.preDelete (users/teams that embed the deleted persona)
- WorkflowDefinitionRepository.suspend/resume
- EntityRepository.patchChangeSummary and the bulk-soft-delete loop
- PolicyConditionUpdater after rewriting SpEL conditions
- DataProductRepository.updateName and bulk domain migration (every asset with an embedded data-product reference needs its bundle refreshed)

Drops Redis IT-suite cache-coherence failures from 40 to 1.
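A hedged sketch of the pattern applied at the direct dao.update call sites listed above: every write that bypasses the normal store path must evict local caches, drop the Redis keys, and publish the cross-instance invalidation itself. The helper names here (invalidateLocal, dropRedisKeys, publishInvalidate) are placeholders, not the project's methods.

    public final class DirectWriteInvalidationSketch {
      private DirectWriteInvalidationSketch() {}

      public static void updateAndInvalidate(
          Runnable daoUpdate, String entityType, String id, String fqn) {
        daoUpdate.run();                        // the raw JSON rewrite (e.g. dao.update(...))
        invalidateLocal(entityType, id, fqn);   // CACHE_WITH_ID / CACHE_WITH_NAME + bundle key
        dropRedisKeys(entityType, id, fqn);     // om:<ns>:e:..., om:<ns>:en:..., bundle:{id}
        publishInvalidate(entityType, id, fqn, "update"); // pub/sub so other instances evict too
      }

      private static void invalidateLocal(String type, String id, String fqn) {}
      private static void dropRedisKeys(String type, String id, String fqn) {}
      private static void publishInvalidate(String type, String id, String fqn, String op) {}
    }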
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): invalidate cache entries on batched CSV import updates

updateManyEntitiesForImport wrote the new JSON straight to Redis but never dropped the per-instance Guava (CACHE_WITH_ID / CACHE_WITH_NAME) or bundle caches, so a GET immediately after CSV import could still see the pre-import tags, owners, and domains until TTL expired. Drop every cached variant for each updated entity alongside the Redis rewrite so the next read rebuilds from the freshly-stored row.

Fixes DatabaseSchemaResourceIT.test_importCsv_withApprovedGlossaryTerm_succeeds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): lowercase user FQN in name-based cache loader

UserDAO.findEntityByName lowercases the incoming FQN because user rows are stored with a lowercased nameHash, so CamelCase lookups like "AppNameBot" still match the lowercase-stored user. The cache loader called dao.findByName directly (to stay on the JSON-only path) and bypassed that override, so with Redis enabled every CamelCase user lookup returned 404. Mirror the same case-fold in EntityLoaderWithName for user types.

Fixes AppsResourceIT.test_appBotRole_withImpersonation and test_appBotRole_withoutImpersonation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(it): raise PrometheusResourceIT timeouts for loaded CI runs

5s read timeout was flaking under concurrent IT load: the admin port competes for threads with the main app, and collecting full Prometheus snapshots takes >5s when many tests hit the JVM at once. Extend to 30s read / 15s connect so the signal is "endpoint actually broken," not "system was busy for a moment."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(it): raise TagResourceIT search-index timeout to 90s

test_searchTagByClassificationDisplayName waited 30s for the tag to appear in the tag_search_index. Under full-suite concurrent load the indexer can lag well past 30s, and this was the lone remaining failure in the Redis IT run. Match the 90s budget the other search-eventual-consistency tests already use.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(search): default entityStatus to Unprocessed in search index doc

The generated POJOs don't apply the status.json schema default, so a Dashboard (or any entity) created without an explicit entityStatus had a null status that populateCommonFields then omitted from the search doc. PopulateCommonFieldsTest.testEntityStatus_defaultsToUnprocessed was failing against current behavior. Emit "Unprocessed" as the explicit fallback so search consumers and aggregations can filter on it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(it): retry BaseEntityIT testBulkFluentAPI verification under load

The PATCH is synchronous on the server but parallel IT traffic sometimes stalls the subsequent GET long enough for the test to observe the pre-update description before the fresh row is served. Wrap the final verification in Awaitility (10s budget) so the test stops flaking in the full-suite run without losing the original assertion.

Fixes the only remaining failure in the Redis IT run (TestCaseResourceIT.testBulkFluentAPI).
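A minimal sketch of the Awaitility wrapper just described: keep the original assertion, but retry it for up to 10 seconds so a momentarily stale GET under parallel IT load does not fail the test. The getDescription call is a stand-in for the test's real fetch, not an actual helper in the suite.

    import static org.awaitility.Awaitility.await;
    import static org.junit.jupiter.api.Assertions.assertEquals;

    import java.time.Duration;

    public class BulkFluentApiRetrySketch {
      void verifyUpdatedDescription(String entityId, String expected) {
        await()
            .atMost(Duration.ofSeconds(10))     // budget; a later commit raises this to 60s
            .pollInterval(Duration.ofMillis(500))
            .untilAsserted(() -> assertEquals(expected, getDescription(entityId)));
      }

      private String getDescription(String entityId) {
        // Placeholder for the GET that re-reads the entity under test.
        return "";
      }
    }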
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(it): raise TestCaseResourceIT awaitility timeouts to 90s

test_incidentReopensAsNewAfterResolveAndNewFailure and other incident/resolution-status tests used 30s Awaitility windows that were insufficient under full-suite parallel load. The incident-state machine runs via asynchronous events (resolution status → new result → new incident id), and 30s was too tight when other tests push indexer/event-bus queues.

Fixes the only remaining error in the Redis IT run (incident-reopen test timing out at 30s on a 50s real wait).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(it): raise BaseEntityIT checkCreatedEntity search-index timeout to 180s

Under full parallel load the ElasticSearch async indexer queue backs up past the previous 90s budget — the test took 90.7s then timed out on a real indexing race. Extend to 180s to swallow that tail without dropping the assertion.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(it): extend testBulkFluentAPI retry window to 60s

The 10s retry still timed out for NotificationTemplateResourceIT under full parallel load. Match the 60s budget other inherited IT retries use. The PATCH itself is sub-second; the budget absorbs pub-sub fan-out and indexer queue tails, not the write itself.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(testCase): retry bulk logical-suite insert on MySQL deadlock

addAllTestCasesToLogicalTestSuite runs a full-table SELECT + INSERT IGNORE that acquires gap locks across test_case. Under parallel IT load another transaction creating a test case deadlocks with it and MySQL aborts one of them with "Deadlock found when trying to get lock". The test was genuinely failing, not just a flaky assertion.

Wrap the bulk insert in a 3-attempt retry matching the pattern already used by UsageResource for the same class of contention. Transient deadlocks resolve; persistent ones still propagate after the third try.

Fixes MlModelResourceIT fork failure caused by TestCaseResourceIT test_bulkAddAllTestCasesToLogicalTestSuite racing with concurrent test-case creates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(it): raise TestCaseResourceIT awaitility timeouts to 180s

90s was still insufficient under full parallel load for the incident reopen flow — the test took 110s waiting for the new incident id to materialize. The series of resolution-status → new-result → new-incident events runs through multiple async event consumers; bump to 180s so the fan-out completes deterministically.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): address PR review — Postgres portability, single-flight, URI reuse

- listIdFqnByPrefixHash: dual @ConnectionAwareSqlQuery for MySQL (JSON_UNQUOTE/JSON_EXTRACT) and Postgres (json->>) so the name-hash LIKE scan runs on both backends.
- CachedReadBundle: drop Redis SETNX busy-poll + null-DTO waiter spin. Use Guava Striped<Lock> keyed by (type, id) so concurrent readers on one instance collapse to one DB load without Redis round-trips; cross instance races remain coherent because Redis SET is idempotent. EntityRepository.buildReadBundle takes/releases the stripe lock in a try/finally around the cache populate.
- RedisURIFactory: single shared builder used by RedisCacheProvider and CacheInvalidationPubSub so both interpret redis url / auth / SSL / database config identically.
- RedisCacheProvider.awaitAll: use LettuceFutures.awaitAll so the whole pipeline batch shares one timeout instead of accumulating per-future timeouts.
- mvn spotless:apply follow-ups across a few unrelated files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* perf(cache): address PR review — rediss:// SSL, pipeline error handling, stale comments

- RedisURIFactory: carry parsed.isSsl() forward when rebuilding the builder from a redis:// / rediss:// URL. Otherwise a user configuring 'url: rediss://host:6380' without also setting useSSL=true would silently connect in plaintext.
- RedisCacheProvider.awaitAll: capture the LettuceFutures.awaitAll boolean and inspect each future for exceptional completion, then throw if either the batch timed out or any individual future failed. Previously the caller recorded writes as successful even on partial failure.
- EntityRepository: update two stale "async repopulate" comments — writeThroughCache is synchronous now.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(jdbi): extract DeadlockRetry utility with resilience4j backoff

Replace TestCaseRepository's inline retry loop with a reusable DeadlockRetry helper keyed to the transaction boundary. Retries live in resilience4j so backoff runs on a scheduled executor instead of Thread.sleep blocking the request thread. Exponential base 50 ms × 2^(attempt-1) with 50% jitter over 4 attempts.

DeadlockRetry must wrap a @Transaction-annotated call so each retry replays the whole unit of work in a fresh JDBI transaction — a per-DAO retry would leave earlier writes in the rolled-back txn lost.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(cache): log root cause of first Redis pipeline failure

awaitAll counted per-future exceptions but never surfaced what actually broke. On a batch failure operators had a count and a timeout but no way to tell NOSCRIPT / OOM / connection-reset apart. Capture the first underlying cause, log it once, and attach it as the cause of the thrown IllegalStateException.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address Copilot review — counters, lock leak, txn retry, gating

- CacheWarmupApp: pass per-page deltas to updateEntityStats so stored totals don't double-count as cumulative counters grow page-over-page.
- EntityRepository.buildReadBundle: hold the striped load-lock through the whole fetch/populate path instead of only the final populate step. An exception in fetchTo/From/Tags/Votes/Extensions/prefetch previously leaked the lock and stalled later readers on the same (type, id).
- TestCaseRepository.addAllTestCasesToLogicalTestSuite: split public entry point from the @Transaction method and wrap DeadlockRetry outside the transaction boundary so each retry runs in a fresh txn.
- EntityResource.isDistributedCacheEnabled: also check CacheProvider.available() so a failed or disconnected Redis doesn't leave REST GETs serving stale Guava reads across instances.
- DeadlockRetry Javadoc: corrected — resilience4j's executeSupplier is synchronous; the calling thread waits between attempts. Matches the SearchRetryUtil pattern already in use.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(cache): address review — health-check, pipeline failure accounting, deterministic warmup, by-name invalidation

- RedisCacheProvider: flip `available=false` from command catches + background PING health check that recovers the flag when Redis comes back.
  Prevents stale-read divergence in multi-instance deployments after a Redis outage.
- CacheWarmupApp: surface pipeline failures — no longer count rows toward success when the Redis batch write threw. Set FAILED status when cache is unavailable at startup so the job record doesn't stay RUNNING. Replace "user" string literal with Entity.USER.
- EntityDAO.listAfterWithOffset: add ORDER BY id so warmup pagination is deterministic (was prone to skip/duplicate rows between pages).
- RedisURIFactory: normalize bare host/host:port through RedisURI.create so IPv6 hosts and malformed inputs fail cleanly instead of blowing up split(":").
- invalidateCacheForEntity(..., null) left by-name cache entries stale in Persona/DataProduct/Domain. Added invalidateCacheForReferencedEntity(record) helper that extracts fullyQualifiedName from the relationship record JSON; PersonaDAO now has a (id, fqn) variant used before the bulk default-unset so both cache variants evict.

* fix(cache): abort warmup when provider flips to unavailable mid-run

A prior batch that trips the Redis provider to available=false causes pipelineSet/Hset calls in subsequent iterations to silently return (their `if (!available) return;` guard fires). The try-block then completes without exception, and the success counter still adds pageSuccess — so rows get reported as warmed even though nothing was written to Redis.

Check `cacheProvider.available()` at the top of each page iteration and bail out. The background health checker flips availability back when Redis recovers; operators rerun the app to resume warmup from a clean state rather than relying on mid-outage bookkeeping.

* fix(cache): address two new Copilot findings — PubSub leak + deadlock chain walk

- CacheInvalidationPubSub.start() set `running=true` via CAS, then allocated RedisClient/subConnection/pubConnection. If any step after the first allocation threw, the catch only flipped `running=false` — leaving half-initialized Lettuce client + connections dangling. stop() would then short-circuit on the flag and never clean them up. Extract a closeResources() helper called from both the catch and stop() so the client/connections are released on partial failure.
- DeadlockRetry.isDeadlock walked to the deepest cause and only checked that leaf. The Javadoc promises "or any cause in its chain". When the SQLException is wrapped in UnableToExecuteStatementException and the connection-release throws a non-SQLException wrapper, the leaf is no longer the SQLException and real deadlocks silently skip the retry. Walk every link (with a guard against self-referential cycles) and return true if any link matches.

* fix(cache): two more Copilot findings — user FQN case-fold + awaitAll future cancel

- EntityLoaderWithName lowercased the DB lookup for `user` types but the Guava CACHE_WITH_NAME key was still the caller-provided fqn. `Alice@x.com` and `alice@x.com` produced split cache entries, and invalidations written against the canonical lowercased form left the mixed-case entry serving stale data until TTL. Added a `cacheNameKey(entityType, fqn)` helper that lowercases for user and passes through otherwise, applied at all 10 CACHE_WITH_NAME access sites (get + invalidate).
- awaitAll threw on batch timeout but left futures still-in-flight. Over repeated timeouts the Lettuce event loop accumulates pending response slots and dispatcher work. Added `cancel(false)` for any non-done future on the failure path and reported the cancelled count in the thrown ISE.
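A hedged sketch of the awaitAll hardening described across the review commits above: one shared timeout for the batch, per-future failure inspection, the first root cause preserved as the thrown exception's cause, and cancel(false) on anything still in flight. The timeout value and the exact accounting are illustrative, not the RedisCacheProvider implementation.

    import io.lettuce.core.LettuceFutures;
    import io.lettuce.core.RedisFuture;

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    public class PipelineAwaitSketch {
      static void awaitAll(List<RedisFuture<?>> futures, long timeoutMs) {
        boolean completed =
            LettuceFutures.awaitAll(
                timeoutMs, TimeUnit.MILLISECONDS, futures.toArray(new RedisFuture[0]));
        Throwable firstCause = null;
        int failed = 0;
        int cancelled = 0;
        for (RedisFuture<?> f : futures) {
          if (f.isDone() && f.toCompletableFuture().isCompletedExceptionally()) {
            failed++;
            if (firstCause == null) {
              firstCause = extractCause(f);
            }
          } else if (!f.isDone()) {
            f.cancel(false); // don't leave pending response slots on the Lettuce event loop
            cancelled++;
          }
        }
        if (!completed || failed > 0) {
          throw new IllegalStateException(
              "Redis pipeline batch failed: failed=" + failed + " cancelled=" + cancelled,
              firstCause);
        }
      }

      private static Throwable extractCause(RedisFuture<?> f) {
        try {
          f.toCompletableFuture().join();
          return null;
        } catch (Exception e) {
          return e.getCause() != null ? e.getCause() : e;
        }
      }
    }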
---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: mohitdeuex <mohit.y@deuexsolutions.com>
Co-authored-by: Shailesh Parmar <shailesh.parmar.webdev@gmail.com>
Co-authored-by: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com>
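A sketch of the DeadlockRetry utility the review-response commits above describe, assuming resilience4j-retry: 4 attempts, exponential backoff from 50 ms with jitter, and a retry predicate that walks every link of the cause chain. The class name mirrors the commit message; the wiring details and the deadlock-message check are assumptions.

    import io.github.resilience4j.core.IntervalFunction;
    import io.github.resilience4j.retry.Retry;
    import io.github.resilience4j.retry.RetryConfig;

    import java.sql.SQLException;
    import java.util.function.Supplier;

    public final class DeadlockRetrySketch {
      private static final RetryConfig CONFIG =
          RetryConfig.custom()
              .maxAttempts(4)
              .intervalFunction(IntervalFunction.ofExponentialRandomBackoff(50, 2.0, 0.5))
              .retryOnException(DeadlockRetrySketch::isDeadlock)
              .build();

      private DeadlockRetrySketch() {}

      /** Wrap a whole transactional unit of work; each retry replays it in a fresh transaction. */
      public static <T> T execute(Supplier<T> transactionalWork) {
        return Retry.of("deadlock-retry", CONFIG).executeSupplier(transactionalWork);
      }

      /** Walk every link of the cause chain (guarding against cycles), not just the leaf. */
      static boolean isDeadlock(Throwable t) {
        Throwable current = t;
        while (current != null) {
          if (current instanceof SQLException
              && current.getMessage() != null
              && current.getMessage().contains("Deadlock found when trying to get lock")) {
            return true;
          }
          current = current.getCause() == current ? null : current.getCause();
        }
        return false;
      }
    }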
1 parent 5e8afb1 commit 12117ce

39 files changed

Lines changed: 2130 additions & 1305 deletions

conf/openmetadata.yaml

Lines changed: 2 additions & 0 deletions
@@ -765,6 +765,8 @@ cache:
     # Connection pool settings
     poolSize: ${CACHE_REDIS_POOL_SIZE:-64}
     connectTimeoutMs: ${CACHE_REDIS_CONNECT_TIMEOUT:-2000}
+    # Per-command timeout. Bounds request-thread blocking when Redis is slow.
+    commandTimeoutMs: ${CACHE_REDIS_COMMAND_TIMEOUT:-300}

     # AWS ElastiCache IAM Authentication (only if using ElastiCache)
     aws:
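For context, a per-command timeout like the commandTimeoutMs setting above can be applied on a Lettuce connection so slow Redis calls fail fast instead of blocking the request thread. This standalone sketch is illustrative of the mechanism only, not the project's actual RedisCacheProvider wiring; the URL and fallback value are assumptions.

    import java.time.Duration;
    import io.lettuce.core.RedisClient;
    import io.lettuce.core.RedisURI;
    import io.lettuce.core.api.StatefulRedisConnection;

    public class RedisCommandTimeoutSketch {
      public static void main(String[] args) {
        long commandTimeoutMs =
            Long.parseLong(System.getenv().getOrDefault("CACHE_REDIS_COMMAND_TIMEOUT", "300"));
        RedisClient client = RedisClient.create(RedisURI.create("redis://localhost:6379"));
        StatefulRedisConnection<String, String> conn = client.connect();
        // Sync commands on this connection now throw RedisCommandTimeoutException
        // once the per-command budget is exceeded.
        conn.setTimeout(Duration.ofMillis(commandTimeoutMs));
        System.out.println(conn.sync().ping());
        conn.close();
        client.shutdown();
      }
    }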
docker-compose.multiserver.yml (new file)

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

# Adds a second OM instance that shares MySQL/Elasticsearch/Redis with the
# primary one in docker-compose.yml. Used to validate that pub/sub
# invalidation keeps per-instance Guava caches coherent.
#
# Usage:
#   docker compose -f docker-compose.yml -f docker-compose.redis.yml \
#     -f docker-compose.multiserver.yml up -d
services:
  openmetadata-server-2:
    image: development-openmetadata-server
    build:
      context: ../../.
      dockerfile: docker/development/Dockerfile
    container_name: openmetadata_server_2
    restart: always
    networks:
      - local_app_net
    depends_on:
      mysql:
        condition: service_healthy
      elasticsearch:
        condition: service_healthy
      redis:
        condition: service_healthy
    ports:
      - "8587:8585"
      - "8588:8586"
    environment:
      OPENMETADATA_CLUSTER_NAME: openmetadata
      SERVER_PORT: 8585
      SERVER_ADMIN_PORT: 8586
      LOG_LEVEL: INFO
      FERNET_KEY: jJ/9sz0g0OHxsfxOoSfdFdmk3ysNmPRnH3TUAbz3IHA=
      DB_DRIVER_CLASS: com.mysql.cj.jdbc.Driver
      DB_SCHEME: mysql
      DB_USE_SSL: "false"
      DB_USER: openmetadata_user
      DB_USER_PASSWORD: openmetadata_password
      DB_HOST: mysql
      DB_PORT: 3306
      DB_PARAMS: allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=UTC
      OM_DATABASE: openmetadata_db
      ELASTICSEARCH_HOST: elasticsearch
      ELASTICSEARCH_PORT: 9200
      ELASTICSEARCH_SCHEME: http
      SEARCH_TYPE: elasticsearch
      ELASTICSEARCH_CLUSTER_ALIAS: openmetadata
      AUTHENTICATION_PROVIDER: basic
      AUTHENTICATION_ENABLE_SELF_SIGNUP: "true"
      AUTHORIZER_CLASS_NAME: org.openmetadata.service.security.DefaultAuthorizer
      AUTHORIZER_REQUEST_FILTER: org.openmetadata.service.security.JwtFilter
      AUTHORIZER_ADMIN_PRINCIPALS: "[admin]"
      AUTHORIZER_PRINCIPAL_DOMAIN: open-metadata.org
      AUTHORIZER_ALLOWED_DOMAINS: "[]"
      AUTHORIZER_ALLOWED_REGISTRATION_DOMAIN: '["all"]'
      AUTHORIZER_INGESTION_PRINCIPALS: "[ingestion-bot]"
      AUTHENTICATION_RESPONSE_TYPE: id_token
      AUTHENTICATION_CLIENT_TYPE: public
      AUTHENTICATION_PUBLIC_KEYS: "[http://openmetadata-server-2:8585/api/v1/system/config/jwks]"
      AUTHENTICATION_AUTHORITY: https://accounts.google.com
      AUTHENTICATION_JWT_PRINCIPAL_CLAIMS: "[email,preferred_username,sub]"
      RSA_PUBLIC_KEY_FILE_PATH: ./conf/public_key.der
      RSA_PRIVATE_KEY_FILE_PATH: ./conf/private_key.der
      JWT_ISSUER: open-metadata.org
      JWT_KEY_ID: Gb389a-9f76-gdjs-a92j-0242bk94356
      PIPELINE_SERVICE_CLIENT_ENDPOINT: http://ingestion:8080
      PIPELINE_SERVICE_CLIENT_CLASS_NAME: org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient
      AIRFLOW_USERNAME: admin
      AIRFLOW_PASSWORD: admin
      AIRFLOW_TIMEOUT: 10
      SECRET_MANAGER: db
      SERVER_HOST_API_URL: http://openmetadata-server-2:8585/api
      EVENT_MONITOR: prometheus
      OPENMETADATA_HEAP_OPTS: "-Xmx1G -Xms1G"
      CACHE_PROVIDER: redis
      CACHE_REDIS_URL: redis://redis:6379
      CACHE_REDIS_AUTH_TYPE: NONE
      CACHE_REDIS_KEYSPACE: om:dev
      CACHE_ENTITY_TTL: 3600
      CACHE_RELATIONSHIP_TTL: 3600
      CACHE_TAG_TTL: 3600
      CACHE_REDIS_COMMAND_TIMEOUT: 300
docker-compose.redis.yml (new file)

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

# Override that adds a Redis cache to the development stack.
# Usage:
#   docker compose -f docker-compose.yml -f docker-compose.redis.yml up -d
services:
  redis:
    image: redis:7-alpine
    container_name: openmetadata_redis
    restart: always
    command: ["redis-server", "--appendonly", "no", "--save", "", "--maxmemory", "512mb", "--maxmemory-policy", "allkeys-lru"]
    networks:
      - local_app_net
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5

  openmetadata-server:
    depends_on:
      redis:
        condition: service_healthy
    environment:
      CACHE_PROVIDER: redis
      CACHE_REDIS_URL: redis://redis:6379
      CACHE_REDIS_AUTH_TYPE: NONE
      CACHE_REDIS_KEYSPACE: om:dev
      CACHE_ENTITY_TTL: 3600
      CACHE_RELATIONSHIP_TTL: 3600
      CACHE_TAG_TTL: 3600
      CACHE_REDIS_COMMAND_TIMEOUT: 300

openmetadata-integration-tests/pom.xml

Lines changed: 89 additions & 0 deletions
@@ -551,6 +551,95 @@
         </plugins>
       </build>
     </profile>
+    <!-- MySQL + Elasticsearch + Redis (Redis cache enabled) -->
+    <profile>
+      <id>mysql-elasticsearch-redis</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <version>${maven.failsafe.version}</version>
+            <executions>
+              <execution>
+                <id>sequential-tests</id>
+                <goals>
+                  <goal>integration-test</goal>
+                </goals>
+                <configuration>
+                  <forkCount>1</forkCount>
+                  <reuseForks>true</reuseForks>
+                  <argLine>-Xmx4096m -XX:+UseG1GC</argLine>
+                  <includes>
+                    <include>**/TagRecognizerFeedbackIT.java</include>
+                    <include>**/WorkflowDefinitionResourceIT.java</include>
+                    <include>**/AppsResourceIT.java</include>
+                    <include>**/SystemResourceIT.java</include>
+                    <include>**/VectorEmbeddingIntegrationIT.java</include>
+                  </includes>
+                  <systemPropertyVariables>
+                    <databaseType>mysql</databaseType>
+                    <databaseImage>mysql:8.3.0</databaseImage>
+                    <searchType>elasticsearch</searchType>
+                    <searchImage>docker.elastic.co/elasticsearch/elasticsearch:9.3.0</searchImage>
+                    <cacheProvider>redis</cacheProvider>
+                    <redisImage>redis:7-alpine</redisImage>
+
+                    <junit.jupiter.extensions.autodetection.enabled>true</junit.jupiter.extensions.autodetection.enabled>
+                    <junit.jupiter.execution.parallel.enabled>false</junit.jupiter.execution.parallel.enabled>
+                  </systemPropertyVariables>
+                  <trimStackTrace>false</trimStackTrace>
+                  <reportFormat>plain</reportFormat>
+                  <useFile>true</useFile>
+                </configuration>
+              </execution>
+              <execution>
+                <id>parallel-tests</id>
+                <goals>
+                  <goal>integration-test</goal>
+                </goals>
+                <configuration>
+                  <forkCount>1</forkCount>
+                  <reuseForks>true</reuseForks>
+                  <argLine>-Xmx4096m -XX:+UseG1GC</argLine>
+                  <includes>
+                    <include>**/*IT.java</include>
+                    <include>**/*Test.java</include>
+                  </includes>
+                  <excludes>
+                    <exclude>**/TagRecognizerFeedbackIT.java</exclude>
+                    <exclude>**/WorkflowDefinitionResourceIT.java</exclude>
+                    <exclude>**/AppsResourceIT.java</exclude>
+                    <exclude>**/SystemResourceIT.java</exclude>
+                    <exclude>**/VectorEmbeddingIntegrationIT.java</exclude>
+                  </excludes>
+                  <systemPropertyVariables>
+                    <databaseType>mysql</databaseType>
+                    <databaseImage>mysql:8.3.0</databaseImage>
+                    <searchType>elasticsearch</searchType>
+                    <searchImage>docker.elastic.co/elasticsearch/elasticsearch:9.3.0</searchImage>
+                    <cacheProvider>redis</cacheProvider>
+                    <redisImage>redis:7-alpine</redisImage>
+
+                    <junit.jupiter.extensions.autodetection.enabled>true</junit.jupiter.extensions.autodetection.enabled>
+                    <junit.jupiter.execution.parallel.enabled>true</junit.jupiter.execution.parallel.enabled>
+                  </systemPropertyVariables>
+                  <trimStackTrace>false</trimStackTrace>
+                  <reportFormat>plain</reportFormat>
+                  <useFile>true</useFile>
+                </configuration>
+              </execution>
+              <execution>
+                <id>verify</id>
+                <goals>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
     <!-- RDF integration tests profile -->
     <profile>
       <id>postgres-rdf-tests</id>
openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java

Lines changed: 75 additions & 0 deletions
@@ -129,10 +129,12 @@ public class TestSuiteBootstrap implements LauncherSessionListener {
   private static String databaseType;
   private static String searchType;
   private static boolean rdfEnabled;
+  private static String cacheProvider;
 
   private static JdbcDatabaseContainer<?> DATABASE_CONTAINER;
   private static GenericContainer<?> SEARCH_CONTAINER;
   private static GenericContainer<?> FUSEKI_CONTAINER;
+  private static GenericContainer<?> REDIS_CONTAINER;
   private static K3sContainer K3S_CONTAINER;
   private static DropwizardAppExtension<OpenMetadataApplicationConfig> APP;
   private static Jdbi jdbi;

@@ -141,6 +143,10 @@ public class TestSuiteBootstrap implements LauncherSessionListener {
   private static int searchPort;
   private static String fusekiEndpoint;
   private static String kubeConfigYaml;
+  private static String redisUrl;
+
+  private static final String DEFAULT_REDIS_IMAGE = "redis:7-alpine";
+  private static final int REDIS_PORT = 6379;
 
   @Override
   public void launcherSessionOpened(LauncherSession session) {

@@ -153,11 +159,13 @@ public void launcherSessionOpened(LauncherSession session) {
     databaseType = System.getProperty("databaseType", "postgres");
     searchType = System.getProperty("searchType", "elasticsearch");
     rdfEnabled = Boolean.parseBoolean(System.getProperty("enableRdf", "false"));
+    cacheProvider = System.getProperty("cacheProvider", "none");
 
     LOG.info("=== TestSuiteBootstrap: Starting test infrastructure ===");
     LOG.info("Database type: {}", databaseType);
     LOG.info("Search type: {}", searchType);
     LOG.info("RDF enabled: {}", rdfEnabled);
+    LOG.info("Cache provider: {}", cacheProvider);
     boolean k8sEnabled = isK8sTestsRequested();
     LOG.info("K8s tests enabled: {}", k8sEnabled);
     long startTime = System.currentTimeMillis();

@@ -168,6 +176,9 @@ public void launcherSessionOpened(LauncherSession session) {
     if (rdfEnabled) {
       startFuseki();
     }
+    if (isRedisEnabled()) {
+      startRedis();
+    }
     if (k8sEnabled) {
       startK3s();
     }

@@ -180,6 +191,9 @@ public void launcherSessionOpened(LauncherSession session) {
     if (rdfEnabled) {
       LOG.info("Fuseki SPARQL: {}", fusekiEndpoint);
     }
+    if (isRedisEnabled()) {
+      LOG.info("Redis: {}", redisUrl);
+    }
     if (k8sEnabled) {
       LOG.info("K3s Kubernetes: enabled");
     }

@@ -352,6 +366,58 @@ private void startSearch() {
     }
   }
 
+  private void startRedis() {
+    String image = System.getProperty("redisImage", DEFAULT_REDIS_IMAGE);
+    LOG.info("Starting Redis container with image: {}", image);
+    REDIS_CONTAINER =
+        new GenericContainer<>(DockerImageName.parse(image))
+            .withExposedPorts(REDIS_PORT)
+            .withCommand(
+                "redis-server",
+                "--appendonly",
+                "no",
+                "--save",
+                "",
+                "--maxmemory",
+                "512mb",
+                "--maxmemory-policy",
+                "allkeys-lru")
+            .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(1)));
+    REDIS_CONTAINER.start();
+    redisUrl =
+        String.format(
+            "redis://%s:%d", REDIS_CONTAINER.getHost(), REDIS_CONTAINER.getMappedPort(REDIS_PORT));
+    LOG.info("Redis started: {}", redisUrl);
+  }
+
+  public static boolean isRedisEnabled() {
+    return "redis".equalsIgnoreCase(cacheProvider);
+  }
+
+  public static String getRedisUrl() {
+    return redisUrl;
+  }
+
+  private void configureCache(OpenMetadataApplicationConfig config) {
+    if (!isRedisEnabled()) {
+      return;
+    }
+    org.openmetadata.service.cache.CacheConfig cacheConfig = config.getCacheConfig();
+    cacheConfig.provider = org.openmetadata.service.cache.CacheConfig.Provider.redis;
+    cacheConfig.redis.url = redisUrl;
+    cacheConfig.redis.authType = org.openmetadata.service.cache.CacheConfig.AuthType.NONE;
+    cacheConfig.redis.keyspace = "om:it:" + System.currentTimeMillis();
+    cacheConfig.redis.commandTimeoutMs = 1000;
+    cacheConfig.entityTtlSeconds = 3600;
+    cacheConfig.relationshipTtlSeconds = 3600;
+    cacheConfig.tagTtlSeconds = 3600;
+    config.setCacheConfig(cacheConfig);
+    LOG.info(
+        "Configured Redis cache: url={} keyspace={}",
+        cacheConfig.redis.url,
+        cacheConfig.redis.keyspace);
+  }
+
   private void startFuseki() {
     String image = System.getProperty("rdfContainerImage", DEFAULT_FUSEKI_IMAGE);
     LOG.info("Starting Fuseki SPARQL container...");

@@ -463,6 +529,7 @@ private void startApplication() throws Exception {
 
     configurePipelineServiceClient(config);
     configureRdf(config);
+    configureCache(config);
 
     IndexMappingLoader.init(getBaseSearchConfig());
 
@@ -730,6 +797,14 @@ private void cleanup() {
       LOG.warn("Error stopping Fuseki container", e);
     }
 
+    try {
+      if (REDIS_CONTAINER != null) {
+        REDIS_CONTAINER.stop();
+      }
+    } catch (Exception e) {
+      LOG.warn("Error stopping Redis container", e);
+    }
+
     try {
       if (K3S_CONTAINER != null) {
         K3S_CONTAINER.stop();