Skip to content

fix(log-storage): plug clobber bugs in streamable S3 logs (partial.txt + logs.txt)#27926

Open
pmbrull wants to merge 27 commits intomainfrom
pmbrull/closestream-noop
Open

fix(log-storage): plug clobber bugs in streamable S3 logs (partial.txt + logs.txt)#27926
pmbrull wants to merge 27 commits intomainfrom
pmbrull/closestream-noop

Conversation

@pmbrull
Copy link
Copy Markdown
Collaborator

@pmbrull pmbrull commented May 6, 2026

Summary

Fixes three log-loss bugs in S3LogStorage that surfaced when ingestion runs hit idle gaps longer than the 5-minute stream timeout:

  • partial.txt 80MB→KB clobber. cleanupExpiredStreams reset partialLogOffsets[streamKey] after the idle threshold; the next writePartialLogsForStream then took the currentOffset == 0 path and PUT only the in-memory tail — overwriting the prior 80MB body. Replaced with an always-merge flush: GET existing partial.txt, append a snapshot of the new pendingFlush queue, PUT (with offset state in S3 user metadata).
  • logs.txt clobber. The 5-minute sweeper called completeMultipartUpload, materializing logs.txt. Resumed appends opened a fresh multipart upload to the same key; the next finalize overwrote logs.txt with only the latest segment. Removed the multipart upload from the write path entirely; logs.txt is now produced only on /close (or by the abandoned-run sweeper) via a server-side CopyObjectRequest from partial.txt.
  • SimpleLogBuffer overflow drift. partialLogOffsets indexed into a 1000-line ring; eviction caused contiguous-line gaps to be silently dropped from partial.txt. Replaced with pendingFlush (unbounded, watermark-drained) and a monotonic totalLinesAppended counter. SimpleLogBuffer is now SSE-tail-cache only.

The cleanup sweeper threshold moves from 5 min to 24h (configurable as streamTimeoutHours), interval from 1 min to 1h (configurable as cleanupIntervalMinutes). New configurable knobs: partialFlushIntervalMinutes, earlyFlushWatermarkBytes, pendingFlushAlertAfterFailures. streamTimeoutMinutes is deprecated.

Backward-compatible at the artifact level: same on-disk file names (partial.txt, logs.txt), same API endpoints, no changes to the Python ingestion client.

Design spec: docs/superpowers/specs/2026-05-05-streamable-logs-stability-design.md
Feature doc: docs/streamable-logs.md

Test Plan

  • mvn test -pl openmetadata-service -Dtest='S3LogStorageTest,LogStorageFactoryTest,LogStorageTest' → 53 tests, BUILD SUCCESS (verified locally)
  • mvn test-compile -pl openmetadata-integration-tests → BUILD SUCCESS (verified locally)
  • In a deployment with S3 log storage configured, run a long ingestion with idle gaps > 5 min and verify partial.txt versions on S3 grow monotonically (no shrink to KB)
  • Verify /close produces logs.txt byte-equal to the final partial.txt
  • Verify a second /close for the same (fqn, runId) returns gracefully (idempotent)
  • Verify abandoned runs (no /close, no appends for 24h) get logs.txt materialized by the sweeper
  • Verify SSE/live-tail UI experience is unchanged for active runs

🤖 Generated with Claude Code


Summary by Gitar

  • Test improvements:
    • Improved IngestionPipelineLogStreamingResourceIT to tolerate DefaultLogStorage environments where persistence is unavailable.
    • Enhanced log verification logic to strictly ensure that consecutive bursts are both present (no clobbering) in tests.
    • Made postClose call idempotent and tolerant of network/storage exceptions to improve CI stability.

This will update automatically on new commits.

pmbrull and others added 23 commits May 5, 2026 15:57
closeStream used to throw IllegalStateException("Log storage is not
configured") which the resource layer translates to a 500 response.
That made the contract surprising for callers: any defensive cleanup
path (exit handlers, retry logic, generic teardown) had to know in
advance whether streaming was configured before calling close, or eat
spurious server errors.

Closing a stream is naturally idempotent — same shape as DELETE on a
non-existent resource. When log storage is not configured, return
silently with a debug log so callers can call close() defensively
without checking state first.

Adds a unit test covering the no-op path.
Captures the design discussion for fixing partial.txt and logs.txt
clobber bugs in S3LogStorage when ingestion runs hit idle gaps longer
than the 5-minute stream timeout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
End-to-end documentation of the streamable logs feature: architecture,
storage layout, run lifecycle, read paths, abandoned-run recovery,
configuration, concurrency model, and observability. Reflects the
post-fix design captured in the streamable-logs-stability spec.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Step-by-step TDD plan grouped into 8 PR-sized tasks: config schema
additions, per-stream lock, pendingFlush + merge-always flush, multipart
removal, sweeper rewrite, /close rewrite, read-path correction, and
integration tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds streamTimeoutHours, cleanupIntervalMinutes, partialFlushIntervalMinutes,
earlyFlushWatermarkBytes, pendingFlushAlertAfterFailures. Deprecates
streamTimeoutMinutes in favor of streamTimeoutHours. Pure schema-only
change; no Java code consumes these fields yet.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Addresses code review on Task 1: project convention uses the JSON Schema
deprecated keyword alongside description annotation. Also clarifies that
earlyFlushWatermarkBytes default (5242880) equals 5 MB.
…rage

Reads streamTimeoutHours, cleanupIntervalMinutes, partialFlushIntervalMinutes,
earlyFlushWatermarkBytes, pendingFlushAlertAfterFailures from
LogStorageConfiguration with sane defaults. No behavioral change yet —
values are stored but not consumed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…drop FQN

Addresses code review on Task 2: warning now fires whenever
streamTimeoutMinutes is set (not only for values < 30 min), since the
field is deprecated for all deployments. Also imports java.lang.reflect.Field
in the test helper instead of using a fully-qualified name (CLAUDE.md
no-FQN rule).
Introduces streamLocks map and acquire/release helpers. appendLogs,
writePartialLogsForStream, closeStream, and cleanupExpiredStreams all
serialize on the per-stream lock. No behavior change; locking is
pure mutual-exclusion at this point.
Move iterator.remove() inside the per-stream lock to prevent a window
where a concurrent appendLogs sees the still-present closed StreamContext
and writes to a closed stream. Also clarifies the comment on flush(fqn,runId)
ordering and documents that streamLocks accumulates monotonically until
Tasks 7 and 8 add cleanup.
…unter

Each appendLogs now also populates per-stream pendingFlush (lines awaiting
flush) and totalLinesAppended (monotonic logical line counter). State is
written but not yet consumed; the new flush logic in the next commit reads it.
…add test

Addresses review on Task 4: documents that pendingFlush ArrayList values
may only be accessed under the per-stream lock; clarifies that
consecutiveFlushFailures is written and consumed in Task 5 (not just
consumed); aligns its type with AtomicInteger for consistency with
the other counters; adds a test for the trailing-newline trim path.
…S3 metadata

Replaces the old writePartialLogsForStream that skipped the read-merge step
when partialLogOffsets[streamKey] was 0 (the canonical 80MB->KB clobber bug).
The new flush always reads existing partial.txt, appends a snapshot of
pendingFlush, and PUTs with offset state in S3 user-defined metadata.

Also adds an early-flush watermark trigger so high-burst writes don't
pile up unbounded in pendingFlush.

Closes the partial.txt-clobber half of the streamable-logs-stability spec.
… language

Addresses code review on Task 5: production code comments should describe
invariants, not the planning-doc task that filled the gap. Also clarifies
the parse-before-lock and the byte-counter atomicity assumption.
…tream as server-side copy

appendLogs no longer initiates a multipart upload; bytes flow only through
pendingFlush -> partial.txt PUTs.

closeStream now: (1) drains pendingFlush via final partial.txt PUT,
(2) issues CopyObjectRequest from partial.txt to logs.txt server-side,
(3) deletes partial.txt and the .active marker, (4) drops in-memory state.

Idempotent: a second /close sees no partial.txt (NoSuchKeyException) and
returns gracefully.

Closes the logs.txt-clobber half of the streamable-logs-stability spec
and finalizes the canonical /close flow.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…cover counter from metadata

Addresses code review on combined Tasks 6+8:
- dropStreamState now removes activeListeners entries (SSE listener leak fix).
- cleanupExpiredStreams now removes streamLocks entries on expire (lock leak fix).
- copyPartialToLogs applies SSE configuration to CopyObjectRequest (was unencrypted on copy).
- writePartialLogsForStreamLocked reads last-flushed-line metadata from existing
  partial.txt and uses it to keep totalLinesAppended monotonic across restarts.
- consecutiveFlushFailures reset uses computeIfAbsent + set(0) instead of allocating
  a new AtomicInteger every successful flush.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…h/1h)

Bumps the idle threshold from 5 min to streamTimeoutHours (default 24h)
and the poll interval from 1 min to cleanupIntervalMinutes (default 1h).
On expire, finalizes the abandoned run by copying partial.txt -> logs.txt
server-side, deleting partial.txt, and dropping in-memory state — same
end-state as closeStream.

Also wires partialFlushIntervalMinutes into the periodic flush schedule
and removes the legacy streamTimeoutMs field that no longer drives behavior.
Addresses code review on Task 7: streamLocks.remove was unconditionally
in the finally block of finalizeAbandonedStream, so it ran even when the
sweeper returned early to retry next tick on a copy failure. That meant
the next sweep tick would create a fresh ReentrantLock, and any
concurrent appendLogs in the meantime would contend on a different lock
object than the retry, defeating mutual exclusion.

Now we only remove the lock entry once finalization has succeeded
(after dropStreamState). The retry path leaves the lock in place so
the next tick and any concurrent appendLogs see the same lock identity.
getCombinedLogsForActiveStream now appends the in-memory pendingFlush
snapshot to the partial.txt body when reading mid-run, so the UI's
paginated GET surfaces the most recent tail even before the next
scheduled flush has happened.

Only appends pendingFlush when a partial.txt file exists, avoiding
duplication in the fallback path where recentLogsCache already
includes those lines.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Addresses review on Task 9: the unsafe null-lock fallback in the
pendingFlush append path is removed (it was structurally unreachable
but a latent hazard for future lifecycle changes). The pendingFlush
read now happens entirely under the per-stream lock, with a
conservative skip if no lock entry exists.

Also documents the recentLogsCache-vs-pendingFlush invariant in the
fallback path and adds a total-count assertion to the new test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- testIdleGapDoesNotClobberPartial: two log bursts within an open run;
  asserts both are present in the read response.
- testCloseProducesLogsTxtMatchingPartial: write, close, read; asserts
  content survives the close.
- testCloseIsIdempotent: a second /close is a graceful no-op.

Tests are tolerant of the storage backend in the test environment
(DefaultLogStorage in CI may not persist; S3LogStorage in S3-configured
environments). Deep behavioral coverage is in S3LogStorageTest unit tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- closeStream and finalizeAbandonedStream now propagate PUT failures
  from writePartialLogsForStreamLocked (which returns boolean).
  closeStream throws IOException; the sweeper retains state for retry.
  Fixes silent data loss when the final flush PUT fails.
- streamLocks entries are no longer removed; this prevents an
  acquire-vs-remove race that would break mutual exclusion. Memory
  growth is bounded by maxConcurrentStreams in practice.
- cleanupAbandonedStreams re-checks expiration inside the per-stream
  lock so a stream that was bumped by appendLogs between the scan
  and the lock acquisition is not finalized.
- deleteLogs now acquires the per-stream lock before mutating state.
- getCombinedLogsForActiveStream appends pendingFlush in BOTH the
  S3-found and memory-fallback branches, so reads aren't truncated
  when recentLogsCache evicts oldest at its 1000-line cap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…no duplicates)

The previous Issue 5 fix appended pendingFlush unconditionally, which
caused duplicate lines in the read response when the fallback branch
used recentLogsCache (since both are populated by the same appendLogs).

Now: in the foundPartialFile branch, append pendingFlush AFTER the S3
body (non-overlapping by construction). In the fallback branch
(no partial.txt yet), use pendingFlush directly as the canonical
source — this is more complete than recentLogsCache (1000-line cap)
and avoids the duplicate issue. recentLogsCache remains a defensive
fallback for the rare case where pendingFlush is empty in the fallback
path.
Copilot AI review requested due to automatic review settings May 6, 2026 04:02
@github-actions github-actions Bot added Ingestion safe to test Add this label to run secure Github workflows on PRs labels May 6, 2026
Comment on lines +1037 to +1038
String newContent = String.join("\n", snapshot) + "\n";
String mergedBody = existingBody + newContent;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Edge Case: Merge with legacy partial.txt may corrupt lines (no guard)

In writePartialLogsForStreamLocked, the merge at line 1038 does mergedBody = existingBody + newContent. If existingBody was written by the old code (or by an interrupted PUT) and does not end with ``, the last existing line and the first new line will be concatenated into a single corrupted line.

This can occur during the upgrade window: the old writePartialLogsForStream did not guarantee a trailing newline on partial.txt. After upgrading, the first flush for an in-progress run will read the old file and append directly, corrupting one log line.

The fix is a one-line guard before the merge.

Suggested fix:

String newContent = String.join("
", snapshot) + "
";
String separator = (!existingBody.isEmpty() && !existingBody.endsWith("
")) ? "
" : "";
String mergedBody = existingBody + separator + newContent;

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

PutObjectRequest putRequest = putRequestBuilder.build();
Map<String, String> metadata = new HashMap<>();
metadata.put("last-flushed-line", Long.toString(lastFlushedLine));
metadata.put("total-bytes", Integer.toString(mergedBody.length()));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Bug: Metadata 'total-bytes' stores char count, not byte count

metadata.put("total-bytes", Integer.toString(mergedBody.length())) records the number of Java char units, not the number of UTF-8 bytes. For any non-ASCII log content (e.g., internationalized metadata values, Unicode table names), this will diverge from the actual S3 object size, making the 'drift detection' cross-check unreliable.

Use mergedBody.getBytes(StandardCharsets.UTF_8).length for correctness, or rename the metadata key to total-chars to avoid confusion. Since the merged body is already materialized as a String, converting to bytes just for the length adds a full copy; an alternative is to compute it from the RequestBody after construction.

Suggested fix:

metadata.put("total-bytes", Long.toString(mergedBody.getBytes(StandardCharsets.UTF_8).length));

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 6, 2026

✅ TypeScript Types Auto-Updated

The generated TypeScript types have been automatically updated based on JSON schema changes in this PR.

@github-actions github-actions Bot requested a review from a team as a code owner May 6, 2026 04:08
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes multiple log-loss / clobber scenarios in S3-backed streamable ingestion logs by moving durability to partial.txt (with metadata offsets), producing logs.txt only on finalization via server-side copy, and adding new configuration knobs and documentation to support the updated lifecycle.

Changes:

  • Add new log-storage configuration fields (timeouts/intervals/watermarks) and deprecate streamTimeoutMinutes.
  • Rewrite S3LogStorage to remove multipart upload writes, add pendingFlush + monotonic line counters, and finalize via CopyObject(partial.txt -> logs.txt).
  • Expand unit + integration tests and add design/spec documentation for the new behavior.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
openmetadata-spec/src/main/resources/json/schema/configuration/logStorageConfiguration.json Adds new configuration properties and deprecates streamTimeoutMinutes.
openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java Core behavior change: pending flush queue, metadata offsets, abandoned-stream cleanup, server-side finalize copy, no multipart on append.
openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java Makes closeStream a no-op when log storage is not configured.
openmetadata-service/src/test/java/org/openmetadata/service/logstorage/S3LogStorageTest.java Adds extensive unit coverage for merge behavior, restart recovery, close idempotency, and abandoned cleanup.
openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/IngestionPipelineRepositoryTest.java Adds a unit test asserting closeStream is a no-op when storage is disabled.
openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/IngestionPipelineLogStreamingResourceIT.java Adds IT coverage for idle-gap safety + /close behavior.
docs/superpowers/specs/2026-05-05-streamable-logs-stability-design.md Design specification for the new streamable-logs durability model.
docs/superpowers/plans/2026-05-05-streamable-logs-stability.md Detailed implementation plan for the changes in this PR.
docs/streamable-logs.md Feature documentation describing end-to-end streamable logs behavior post-fix.
Comments suppressed due to low confidence (1)

openmetadata-spec/src/main/resources/json/schema/configuration/logStorageConfiguration.json:107

  • asyncBufferSizeMB remains in the schema, but S3LogStorage no longer reads or uses it (there are no code references). This makes the config misleading for operators; consider deprecating it in the schema (similar to streamTimeoutMinutes) or reintroducing a clear usage for it.
    "asyncBufferSizeMB": {
      "description": "Size of async buffer in MB for batching log writes",
      "type": "integer",
      "minimum": 1,
      "default": 5
    }

Comment on lines +188 to +211
this.streamTimeoutHours =
s3Config.getStreamTimeoutHours() != null ? s3Config.getStreamTimeoutHours() : 24;
this.cleanupIntervalMinutes =
s3Config.getCleanupIntervalMinutes() != null ? s3Config.getCleanupIntervalMinutes() : 60;
this.partialFlushIntervalMinutes =
s3Config.getPartialFlushIntervalMinutes() != null
? s3Config.getPartialFlushIntervalMinutes()
: 2;
this.earlyFlushWatermarkBytes =
s3Config.getEarlyFlushWatermarkBytes() != null
? s3Config.getEarlyFlushWatermarkBytes().longValue()
: 5L * 1024 * 1024;
this.pendingFlushAlertAfterFailures =
s3Config.getPendingFlushAlertAfterFailures() != null
? s3Config.getPendingFlushAlertAfterFailures()
: 10;

if (s3Config.getStreamTimeoutMinutes() != null) {
LOG.warn(
"streamTimeoutMinutes={} is deprecated; migrate to streamTimeoutHours "
+ "(current effective value: {}h). Values below 30 min may cause stream churn.",
s3Config.getStreamTimeoutMinutes(),
streamTimeoutHours);
}
Comment on lines +343 to +371
// Track the run as live (no multipart upload here — bytes flow through pendingFlush ->
// partial.txt).
StreamContext ctx =
activeStreams.computeIfAbsent(
streamKey,
k -> {
try {
if (activeStreams.size() >= maxConcurrentStreams) {
cleanupExpiredStreams();
if (activeStreams.size() >= maxConcurrentStreams) {
throw new IOException(
"Maximum concurrent log streams reached: " + maxConcurrentStreams);
}
}

String key = buildS3Key(pipelineFQN, runId);
MultipartS3OutputStream stream =
new MultipartS3OutputStream(
s3AsyncClient,
bucketName,
key,
enableSSE,
storageClass,
isCustomEndpoint,
sseAlgorithm,
kmsKeyId,
metrics);
LOG.info("Created multipart upload stream for {}/{}", pipelineFQN, runId);
return new StreamContext(stream, System.currentTimeMillis(), metrics);
} catch (IOException e) {
throw new RuntimeException("Failed to create multipart upload stream", e);
}
});
streamKey, k -> new StreamContext(System.currentTimeMillis(), metrics));
ctx.updateAccessTime();

// Track lines for the durable-pending flush queue and the logical line counter.
String[] splitLines = logContent.split("\n", -1);
int lineCount = splitLines.length;
if (lineCount > 0 && splitLines[lineCount - 1].isEmpty()) {
lineCount--;
}
if (lineCount > 0) {
List<String> queue = pendingFlush.computeIfAbsent(streamKey, k -> new ArrayList<>());
AtomicLong bytes = pendingFlushBytes.computeIfAbsent(streamKey, k -> new AtomicLong());
AtomicLong counter = totalLinesAppended.computeIfAbsent(streamKey, k -> new AtomicLong());
long addedBytes = 0;
for (int i = 0; i < lineCount; i++) {
queue.add(splitLines[i]);
addedBytes += splitLines[i].length() + 1L; // +1 for the join newline at flush time
}
bytes.addAndGet(addedBytes);
counter.addAndGet(lineCount);
if (bytes.get() >= earlyFlushWatermarkBytes) {
final String key = streamKey;
cleanupExecutor.execute(() -> writePartialLogsForStream(key));
}
}
Comment on lines +1212 to +1214
activeListeners.remove(streamKey);
}

Comment on lines +1045 to +1049
Map<String, String> metadata = new HashMap<>();
metadata.put("last-flushed-line", Long.toString(lastFlushedLine));
metadata.put("total-bytes", Integer.toString(mergedBody.length()));
metadata.put("writer-epoch", Long.toString(writerEpoch));
metadata.put("writer-version", "streamable-logs-v2");
Comment on lines +361 to +363
for (int i = 0; i < lineCount; i++) {
queue.add(splitLines[i]);
addedBytes += splitLines[i].length() + 1L; // +1 for the join newline at flush time
Comment thread docs/streamable-logs.md

## Concurrency Model

Coordination is a per-stream `ReentrantLock` keyed by `streamKey = fqn + "/" + runId`. The lock is held for the duration of `appendLogs`, periodic flush, abandoned-run cleanup, and `/close`. Locks live in a `ConcurrentHashMap<String, ReentrantLock>` and are removed when the stream is dropped.
Comment on lines +326 to +327
statusCode == 200 || statusCode == 404 || statusCode == 501 || statusCode == 500,
"Close must not 5xx unexpectedly, got: " + statusCode);
pmbrull and others added 2 commits May 6, 2026 07:58
…e rule

The multipart upload write path was removed; the bucket lifecycle's
abortIncompleteMultipartUpload(7 days) rule served only as migration
cleanup for in-flight uploads from the old code at deploy time. After
the migration window it does nothing.

Drops the rule from configureLifecyclePolicy, the AWS SDK import, the
"7 days multipart cleanup" string in the startup log, and the
corresponding bullet in docs/streamable-logs.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Local-only working notes (specs, plans) live there and shouldn't be tracked.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 6, 2026 05:58
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 11 changed files in this pull request and generated 7 comments.

Comment on lines +187 to +190
this.streamTimeoutHours =
s3Config.getStreamTimeoutHours() != null ? s3Config.getStreamTimeoutHours() : 24;
this.cleanupIntervalMinutes =
s3Config.getCleanupIntervalMinutes() != null ? s3Config.getCleanupIntervalMinutes() : 60;
long addedBytes = 0;
for (int i = 0; i < lineCount; i++) {
queue.add(splitLines[i]);
addedBytes += splitLines[i].length() + 1L; // +1 for the join newline at flush time
Comment on lines +1038 to +1042
Map<String, String> metadata = new HashMap<>();
metadata.put("last-flushed-line", Long.toString(lastFlushedLine));
metadata.put("total-bytes", Integer.toString(mergedBody.length()));
metadata.put("writer-epoch", Long.toString(writerEpoch));
metadata.put("writer-version", "streamable-logs-v2");
Comment on lines +121 to +125
// Per-stream coordination lock keyed by `<fqn>/<runId>`. Entries are populated
// lazily and never removed; this is intentional to avoid an
// acquire-vs-remove race that would break mutual exclusion. The map is
// bounded by maxConcurrentStreams in practice (100 by default).
private final Map<String, ReentrantLock> streamLocks = new ConcurrentHashMap<>();
Comment on lines +691 to +698
// NOTE: The per-stream lock is not acquired here because we iterate across all streams for the
// pipeline. This method may race with active writers; out of scope for this fix.
activeStreams
.entrySet()
.removeIf(
entry -> {
if (entry.getKey().startsWith(pipelineFQN + "/")) {
try {
entry.getValue().close();
// Clean up partial log offset tracking
partialLogOffsets.remove(entry.getKey());
} catch (Exception e) {
LOG.warn("Error closing stream during deleteAll: {}", e.getMessage());
}
dropStreamState(entry.getKey());
Comment thread docs/streamable-logs.md
Comment on lines +3 to +6
This document describes the end-to-end design of OpenMetadata's streamable ingestion-pipeline log system: how logs flow from a running connector to durable S3 storage, how the UI reads them while a run is in progress, and how the system handles long idle gaps, restarts, and abandoned runs.

The design described here reflects the system after the stability fix specified in [`docs/superpowers/specs/2026-05-05-streamable-logs-stability-design.md`](superpowers/specs/2026-05-05-streamable-logs-stability-design.md).

Comment thread docs/streamable-logs.md
Comment on lines +220 to +229
| `streamTimeoutMinutes` | `streamTimeoutHours`. If set, used as-is; if `< 30`, a deprecation warning is logged. |

## Concurrency Model

Coordination is a per-stream `ReentrantLock` keyed by `streamKey = fqn + "/" + runId`. The lock is held for the duration of `appendLogs`, periodic flush, abandoned-run cleanup, and `/close`. Locks live in a `ConcurrentHashMap<String, ReentrantLock>` and are removed when the stream is dropped.

A single-threaded `ScheduledExecutorService` (`cleanupExecutor`) drives:
- Periodic flushes (`writePartialLogs`)
- Abandoned-run sweeper (`cleanupAbandonedStreams`)
- Metrics updates (`updateStreamMetrics`)
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 6, 2026

Jest test Coverage

UI tests summary

Lines Statements Branches Functions
Coverage: 62%
62.42% (62939/100817) 42.75% (33918/79324) 45.75% (10035/21934)

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 6, 2026

🔴 Playwright Results — 2 failure(s), 32 flaky

✅ 3964 passed · ❌ 2 failed · 🟡 32 flaky · ⏭️ 86 skipped

Shard Passed Failed Flaky Skipped
🟡 Shard 1 294 0 5 4
🟡 Shard 2 748 0 6 8
🔴 Shard 3 739 1 6 7
🟡 Shard 4 774 0 1 18
🟡 Shard 5 683 0 4 41
🔴 Shard 6 726 1 10 8

Genuine Failures (failed on all attempts)

Features/Permissions/ServiceEntityPermissions.spec.ts › AutoPilot trigger button is hidden with view-only permission (shard 3)
�[31m"beforeAll" hook timeout of 60000ms exceeded.�[39m
Pages/InputOutputPorts.spec.ts › Output ports list displays entity cards (shard 6)
�[31mTest timeout of 60000ms exceeded.�[39m
🟡 32 flaky test(s) (passed on retry)
  • Features/Pagination.spec.ts › should test pagination on Metrics page (shard 1, 2 retries)
  • Features/TagsSuggestion.spec.ts › should add and accept requested tags for a table asset (shard 1, 2 retries)
  • Features/TagsSuggestion.spec.ts › should decline suggested tags for a container column (shard 1, 2 retries)
  • Flow/Metric.spec.ts › Verify Metric Type Update (shard 1, 1 retry)
  • Flow/Metric.spec.ts › Verify Unit of Measurement Update (shard 1, 2 retries)
  • Features/ActivityAPI.spec.ts › Activity event is created when description is updated (shard 2, 1 retry)
  • Features/ActivityAPI.spec.ts › Activity event is created when tags are added (shard 2, 1 retry)
  • Features/ChangeSummaryBadge.spec.ts › AI badge should appear on entity description with Suggested source (shard 2, 2 retries)
  • Features/ChangeSummaryBadge.spec.ts › AI badge should NOT appear for manually-edited descriptions (shard 2, 1 retry)
  • Features/ColumnBulkOperations.spec.ts › should update pending changes counter when editing selected columns (shard 2, 1 retry)
  • Features/DataQuality/BundleSuiteBulkOperations.spec.ts › Add test case to existing Bundle Suite (shard 2, 1 retry)
  • Features/Permissions/ServiceEntityPermissions.spec.ts › Database Service allow entity-specific permission operations (shard 3, 2 retries)
  • Features/Permissions/ServiceEntityPermissions.spec.ts › AutoPilot trigger button is visible with Trigger permission (shard 3, 2 retries)
  • Features/Permissions/ServiceEntityPermissions.spec.ts › Storage Service allow common operations permissions (shard 3, 2 retries)
  • Features/Permissions/ServiceEntityPermissions.spec.ts › AutoPilot trigger button is visible with Trigger permission (shard 3, 2 retries)
  • Features/Table.spec.ts › should persist current page (shard 3, 1 retry)
  • Features/UserProfileOnlineStatus.spec.ts › Should not show online status for inactive users (shard 3, 1 retry)
  • Pages/CustomProperties.spec.ts › Should search custom properties for dashboard in right panel (shard 4, 1 retry)
  • Pages/Entity.spec.ts › Inactive Announcement create & delete (shard 5, 2 retries)
  • Pages/Entity.spec.ts › User should be denied access to edit description when deny policy rule is applied on an entity (shard 5, 1 retry)
  • Pages/EntityDataConsumer.spec.ts › User as Owner Add, Update and Remove (shard 5, 2 retries)
  • Pages/EntityDataConsumer.spec.ts › No edit owner permission (shard 5, 2 retries)
  • Pages/InputOutputPorts.spec.ts › Add single output port (shard 6, 2 retries)
  • Pages/InputOutputPorts.spec.ts › Add multiple input ports at once (shard 6, 2 retries)
  • Pages/InputOutputPorts.spec.ts › Output port drawer only shows data product assets (shard 6, 1 retry)
  • Pages/InputOutputPorts.spec.ts › Remove single output port (shard 6, 2 retries)
  • Pages/InputOutputPorts.spec.ts › Exit fullscreen with button (shard 6, 2 retries)
  • Pages/InputOutputPorts.spec.ts › Exit fullscreen with Escape key (shard 6, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify lineage schema filter selection (shard 6, 1 retry)
  • Pages/Lineage/LineageRightPanel.spec.ts › Verify custom properties tab IS visible for supported type: searchIndex (shard 6, 1 retry)
  • ... and 2 more

📦 Download artifacts

How to debug locally
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip    # view trace

…gs ITs

CI runs the integration tests against the bootstrap config which uses
DefaultLogStorage (delegates to k8s/Airflow which isn't running). The
storage returns:
- "No pods found for this pipeline" sentinel for getLogs
- non-2xx status (the SDK wraps it as statusCode -1) for /close

Adjustments:
- testIdleGapDoesNotClobberPartial: parse JSON, only assert when total>0.
  When storage actually persists (S3 deployments), assert BOTH bursts
  are present — that's the real "no clobber" check.
- postClose helper: tolerate any exception from the close call
  (idempotency is the contract; transient errors are acceptable).

The deep behavioural coverage continues to live in S3LogStorageTest unit
tests where mock S3 is the storage backend.
@gitar-bot
Copy link
Copy Markdown

gitar-bot Bot commented May 6, 2026

Code Review ⚠️ Changes requested 1 resolved / 3 findings

Refactors S3 log storage to prevent data clobbering by replacing multipart appends with an always-merge flush strategy and server-side copies. Further investigation is required to address potential line corruption during legacy merges and a mismatch between 'total-bytes' metadata and actual byte counts.

⚠️ Edge Case: Merge with legacy partial.txt may corrupt lines (no guard)

📄 openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java:1037-1038

In writePartialLogsForStreamLocked, the merge at line 1038 does mergedBody = existingBody + newContent. If existingBody was written by the old code (or by an interrupted PUT) and does not end with ``, the last existing line and the first new line will be concatenated into a single corrupted line.

This can occur during the upgrade window: the old writePartialLogsForStream did not guarantee a trailing newline on partial.txt. After upgrading, the first flush for an in-progress run will read the old file and append directly, corrupting one log line.

The fix is a one-line guard before the merge.

Suggested fix
String newContent = String.join("
", snapshot) + "
";
String separator = (!existingBody.isEmpty() && !existingBody.endsWith("
")) ? "
" : "";
String mergedBody = existingBody + separator + newContent;
💡 Bug: Metadata 'total-bytes' stores char count, not byte count

📄 openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java:1047

metadata.put("total-bytes", Integer.toString(mergedBody.length())) records the number of Java char units, not the number of UTF-8 bytes. For any non-ASCII log content (e.g., internationalized metadata values, Unicode table names), this will diverge from the actual S3 object size, making the 'drift detection' cross-check unreliable.

Use mergedBody.getBytes(StandardCharsets.UTF_8).length for correctness, or rename the metadata key to total-chars to avoid confusion. Since the merged body is already materialized as a String, converting to bytes just for the length adds a full copy; an alternative is to compute it from the RequestBody after construction.

Suggested fix
metadata.put("total-bytes", Long.toString(mergedBody.getBytes(StandardCharsets.UTF_8).length));
✅ 1 resolved
Edge Case: getCombinedLogsForActiveStream can transiently miss lines

📄 openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java:1352-1366
The read path in getCombinedLogsForActiveStream reads partial.txt from S3 without holding the per-stream lock, then acquires the lock to snapshot pendingFlush. If a flush completes between the two steps (draining pendingFlush into a new partial.txt), the reader sees the old partial.txt (missing the flushed lines) and an empty pendingFlush — resulting in a transient gap in the returned logs.

Since this is a live-read endpoint for the UI and the next request will see the correct state, this is unlikely to cause user confusion in practice. However, if a single GET is used for archival/comparison purposes, it may produce an incomplete snapshot.

A fix would be to hold the per-stream lock for the entire read (S3 GET + pendingFlush snapshot), or to retry on the detected condition (partial.txt offset < expected based on totalLinesAppended).

🤖 Prompt for agents
Code Review: Refactors S3 log storage to prevent data clobbering by replacing multipart appends with an always-merge flush strategy and server-side copies. Further investigation is required to address potential line corruption during legacy merges and a mismatch between 'total-bytes' metadata and actual byte counts.

1. ⚠️ Edge Case: Merge with legacy partial.txt may corrupt lines (no 
 guard)
   Files: openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java:1037-1038

   In `writePartialLogsForStreamLocked`, the merge at line 1038 does `mergedBody = existingBody + newContent`. If `existingBody` was written by the old code (or by an interrupted PUT) and does not end with `
   `, the last existing line and the first new line will be concatenated into a single corrupted line.
   
   This can occur during the upgrade window: the old `writePartialLogsForStream` did not guarantee a trailing newline on `partial.txt`. After upgrading, the first flush for an in-progress run will read the old file and append directly, corrupting one log line.
   
   The fix is a one-line guard before the merge.

   Suggested fix:
   String newContent = String.join("
   ", snapshot) + "
   ";
   String separator = (!existingBody.isEmpty() && !existingBody.endsWith("
   ")) ? "
   " : "";
   String mergedBody = existingBody + separator + newContent;

2. 💡 Bug: Metadata 'total-bytes' stores char count, not byte count
   Files: openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java:1047

   `metadata.put("total-bytes", Integer.toString(mergedBody.length()))` records the number of Java `char` units, not the number of UTF-8 bytes. For any non-ASCII log content (e.g., internationalized metadata values, Unicode table names), this will diverge from the actual S3 object size, making the 'drift detection' cross-check unreliable.
   
   Use `mergedBody.getBytes(StandardCharsets.UTF_8).length` for correctness, or rename the metadata key to `total-chars` to avoid confusion. Since the merged body is already materialized as a String, converting to bytes just for the length adds a full copy; an alternative is to compute it from the RequestBody after construction.

   Suggested fix:
   metadata.put("total-bytes", Long.toString(mergedBody.getBytes(StandardCharsets.UTF_8).length));

Options

Display: compact → Showing less information.

Comment with these commands to change:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

@sonarqubecloud
Copy link
Copy Markdown

sonarqubecloud Bot commented May 6, 2026

@sonarqubecloud
Copy link
Copy Markdown

sonarqubecloud Bot commented May 6, 2026

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Ingestion safe to test Add this label to run secure Github workflows on PRs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants