Skip to content

Commit dbbab49

Browse files
committed
test(ray): Fix flaky ray tests
1 parent fe4a09e commit dbbab49

1 file changed

Lines changed: 39 additions & 9 deletions

File tree

tests/integrations/ray/test_ray.py

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import os
33
import shutil
4+
import time
45
import uuid
56

67
import pytest
@@ -54,7 +55,7 @@ def setup_sentry(span_streaming=False, transport=None):
5455
)
5556

5657

57-
def read_error_from_log(job_id, ray_temp_dir):
58+
def _parse_error_from_log(job_id, ray_temp_dir):
5859
# Find the actual session directory that Ray created
5960
session_dirs = [d for d in os.listdir(ray_temp_dir) if d.startswith("session_")]
6061
if not session_dirs:
@@ -66,14 +67,16 @@ def read_error_from_log(job_id, ray_temp_dir):
6667
if not os.path.exists(log_dir):
6768
raise FileNotFoundError(f"No logs directory found at {log_dir}")
6869

69-
log_file = [
70+
log_files = [
7071
f
7172
for f in os.listdir(log_dir)
7273
if "worker" in f and job_id in f and f.endswith(".out")
73-
][0]
74+
]
75+
if not log_files:
76+
return None
7477

7578
next_line_is_event_payload = False
76-
with open(os.path.join(log_dir, log_file), "r") as file:
79+
with open(os.path.join(log_dir, log_files[0]), "r") as file:
7780
for line in file:
7881
try:
7982
payload = json.loads(line)
@@ -89,7 +92,19 @@ def read_error_from_log(job_id, ray_temp_dir):
8992
return None
9093

9194

92-
def read_spans_from_log(job_id, ray_temp_dir):
95+
def read_error_from_log(job_id, ray_temp_dir, timeout=10):
96+
deadline = time.monotonic() + timeout
97+
while True:
98+
try:
99+
error = _parse_error_from_log(job_id, ray_temp_dir)
100+
except FileNotFoundError:
101+
error = None
102+
if error is not None or time.monotonic() >= deadline:
103+
return error
104+
time.sleep(0.1)
105+
106+
107+
def _parse_spans_from_log(job_id, ray_temp_dir):
93108
# Find the actual session directory that Ray created
94109
session_dirs = [d for d in os.listdir(ray_temp_dir) if d.startswith("session_")]
95110
if not session_dirs:
@@ -101,15 +116,17 @@ def read_spans_from_log(job_id, ray_temp_dir):
101116
if not os.path.exists(log_dir):
102117
raise FileNotFoundError(f"No logs directory found at {log_dir}")
103118

104-
log_file = [
119+
log_files = [
105120
f
106121
for f in os.listdir(log_dir)
107122
if "worker" in f and job_id in f and f.endswith(".out")
108-
][0]
123+
]
124+
if not log_files:
125+
return []
109126

110127
spans = []
111128
next_line_is_span_payload = False
112-
with open(os.path.join(log_dir, log_file), "r") as file:
129+
with open(os.path.join(log_dir, log_files[0]), "r") as file:
113130
for line in file:
114131
try:
115132
payload = json.loads(line)
@@ -125,6 +142,19 @@ def read_spans_from_log(job_id, ray_temp_dir):
125142
return spans
126143

127144

145+
def read_spans_from_log(job_id, ray_temp_dir, min_spans=1, timeout=10):
146+
deadline = time.monotonic() + timeout
147+
spans = []
148+
while True:
149+
try:
150+
spans = _parse_spans_from_log(job_id, ray_temp_dir)
151+
except FileNotFoundError:
152+
spans = []
153+
if len(spans) >= min_spans or time.monotonic() >= deadline:
154+
return spans
155+
time.sleep(0.1)
156+
157+
128158
def example_task(span_streaming: bool):
129159
if span_streaming:
130160
with sentry_sdk.traces.start_span(
@@ -205,7 +235,7 @@ def test_tracing_in_ray_tasks(task_options, task, span_streaming):
205235
ray.get(future)
206236

207237
job_id = future.job_id().hex()
208-
worker_spans = read_spans_from_log(job_id, ray_temp_dir)
238+
worker_spans = read_spans_from_log(job_id, ray_temp_dir, min_spans=2)
209239
finally:
210240
if os.path.exists(ray_temp_dir):
211241
shutil.rmtree(ray_temp_dir, ignore_errors=True)

0 commit comments

Comments
 (0)