From 54848b4cd0a7ed571ec7cbeb63266c347a83a4b0 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 12 Jun 2026 07:22:57 +0000 Subject: [PATCH 1/4] chore(bigframes): optimize system test teardown --- .../bigframes/session/anonymous_dataset.py | 33 +++++++++++-------- packages/bigframes/noxfile.py | 1 + 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/packages/bigframes/bigframes/session/anonymous_dataset.py b/packages/bigframes/bigframes/session/anonymous_dataset.py index 1a3d43655b79..dfb1bba3a8e9 100644 --- a/packages/bigframes/bigframes/session/anonymous_dataset.py +++ b/packages/bigframes/bigframes/session/anonymous_dataset.py @@ -16,6 +16,7 @@ import threading import uuid import warnings +from concurrent.futures import ThreadPoolExecutor from typing import List, Optional, Sequence import google.cloud.bigquery as bigquery @@ -170,19 +171,23 @@ def _cleanup_old_udfs(self): def close(self): """Delete tables that were created with this session's session_id.""" - for table_ref in self._table_ids: - self.bqclient.delete_table(table_ref, not_found_ok=True) + with ThreadPoolExecutor() as executor: + for table_ref in self._table_ids: + executor.submit(self.bqclient.delete_table, table_ref, not_found_ok=True) self._table_ids.clear() - try: - # Before closing the session, attempt to clean up any uncollected, - # old Python UDFs residing in the anonymous dataset. These UDFs - # accumulate over time and can eventually exceed resource limits. - # See more from b/450913424. - self._cleanup_old_udfs() - except Exception as e: - # Log a warning on the failure, do not interrupt the workflow. - msg = bfe.format_message( - f"Failed to clean up the old Python UDFs before closing the session: {e}" - ) - warnings.warn(msg, category=bfe.CleanupFailedWarning) + def run_cleanup(): + try: + # Before closing the session, attempt to clean up any uncollected, + # old Python UDFs residing in the anonymous dataset. These UDFs + # accumulate over time and can eventually exceed resource limits. + # See more from b/450913424. + self._cleanup_old_udfs() + except Exception as e: + # Log a warning on the failure, do not interrupt the workflow. + msg = bfe.format_message( + f"Failed to clean up the old Python UDFs before closing the session: {e}" + ) + warnings.warn(msg, category=bfe.CleanupFailedWarning) + + threading.Thread(target=run_cleanup, daemon=True, name="bigframes-udf-cleanup").start() diff --git a/packages/bigframes/noxfile.py b/packages/bigframes/noxfile.py index e7c105a552e8..0a33264aa8ea 100644 --- a/packages/bigframes/noxfile.py +++ b/packages/bigframes/noxfile.py @@ -354,6 +354,7 @@ def run_system( "py.test", "-v", f"-n={num_workers}", + "--dist=worksteal", # Any individual test taking longer than 15 mins will be terminated. f"--timeout={timeout_seconds}", # Log 20 slowest tests From 87cd5f7b991a3ef8552fb58d9e87b7485b2d5c84 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 12 Jun 2026 07:26:30 +0000 Subject: [PATCH 2/4] fix lint --- packages/bigframes/bigframes/session/anonymous_dataset.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/bigframes/bigframes/session/anonymous_dataset.py b/packages/bigframes/bigframes/session/anonymous_dataset.py index dfb1bba3a8e9..aa2755553fdd 100644 --- a/packages/bigframes/bigframes/session/anonymous_dataset.py +++ b/packages/bigframes/bigframes/session/anonymous_dataset.py @@ -173,7 +173,9 @@ def close(self): """Delete tables that were created with this session's session_id.""" with ThreadPoolExecutor() as executor: for table_ref in self._table_ids: - executor.submit(self.bqclient.delete_table, table_ref, not_found_ok=True) + executor.submit( + self.bqclient.delete_table, table_ref, not_found_ok=True + ) self._table_ids.clear() def run_cleanup(): @@ -190,4 +192,6 @@ def run_cleanup(): ) warnings.warn(msg, category=bfe.CleanupFailedWarning) - threading.Thread(target=run_cleanup, daemon=True, name="bigframes-udf-cleanup").start() + threading.Thread( + target=run_cleanup, daemon=True, name="bigframes-udf-cleanup" + ).start() From d595fee03c65991d9b8b0d5f113fce7bc3205618 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 12 Jun 2026 17:34:53 +0000 Subject: [PATCH 3/4] address PR feedback --- .../bigframes/session/anonymous_dataset.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/bigframes/bigframes/session/anonymous_dataset.py b/packages/bigframes/bigframes/session/anonymous_dataset.py index aa2755553fdd..624def41e0da 100644 --- a/packages/bigframes/bigframes/session/anonymous_dataset.py +++ b/packages/bigframes/bigframes/session/anonymous_dataset.py @@ -171,12 +171,17 @@ def _cleanup_old_udfs(self): def close(self): """Delete tables that were created with this session's session_id.""" - with ThreadPoolExecutor() as executor: - for table_ref in self._table_ids: - executor.submit( - self.bqclient.delete_table, table_ref, not_found_ok=True - ) - self._table_ids.clear() + if self._table_ids: + try: + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(self.bqclient.delete_table, table_ref, not_found_ok=True) + for table_ref in self._table_ids + ] + for future in futures: + future.result() + finally: + self._table_ids.clear() def run_cleanup(): try: From c7d1f83f23ad9b57c1328b2ad2404ae0fd815fed Mon Sep 17 00:00:00 2001 From: ohmayr Date: Fri, 12 Jun 2026 17:43:19 +0000 Subject: [PATCH 4/4] revert udf cleanup to sync --- .../bigframes/session/anonymous_dataset.py | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/packages/bigframes/bigframes/session/anonymous_dataset.py b/packages/bigframes/bigframes/session/anonymous_dataset.py index 624def41e0da..ed718ff909f0 100644 --- a/packages/bigframes/bigframes/session/anonymous_dataset.py +++ b/packages/bigframes/bigframes/session/anonymous_dataset.py @@ -175,7 +175,9 @@ def close(self): try: with ThreadPoolExecutor() as executor: futures = [ - executor.submit(self.bqclient.delete_table, table_ref, not_found_ok=True) + executor.submit( + self.bqclient.delete_table, table_ref, not_found_ok=True + ) for table_ref in self._table_ids ] for future in futures: @@ -183,20 +185,15 @@ def close(self): finally: self._table_ids.clear() - def run_cleanup(): - try: - # Before closing the session, attempt to clean up any uncollected, - # old Python UDFs residing in the anonymous dataset. These UDFs - # accumulate over time and can eventually exceed resource limits. - # See more from b/450913424. - self._cleanup_old_udfs() - except Exception as e: - # Log a warning on the failure, do not interrupt the workflow. - msg = bfe.format_message( - f"Failed to clean up the old Python UDFs before closing the session: {e}" - ) - warnings.warn(msg, category=bfe.CleanupFailedWarning) - - threading.Thread( - target=run_cleanup, daemon=True, name="bigframes-udf-cleanup" - ).start() + try: + # Before closing the session, attempt to clean up any uncollected, + # old Python UDFs residing in the anonymous dataset. These UDFs + # accumulate over time and can eventually exceed resource limits. + # See more from b/450913424. + self._cleanup_old_udfs() + except Exception as e: + # Log a warning on the failure, do not interrupt the workflow. + msg = bfe.format_message( + f"Failed to clean up the old Python UDFs before closing the session: {e}" + ) + warnings.warn(msg, category=bfe.CleanupFailedWarning)