Skip to content

Commit 8c20e1f

Browse files
authored
Merge pull request #6 from tinybirdco/shutdown
improve shutdown
2 parents c5a43dc + 78d3236 commit 8c20e1f

3 files changed

Lines changed: 240 additions & 108 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "tinybird-python-sdk"
7-
version = "0.3.4"
7+
version = "0.3.5"
88
description = "Python SDK for Tinybird"
99
readme = "README.md"
1010
authors = [

tb/a/api.py

Lines changed: 121 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ def __init__(
3535
self.retry_total = retry_total
3636

3737
self.token_error = TOKEN_ERROR
38+
self._shutdown = False
39+
self._pending_requests = 0
3840

3941
def ui_url(self) -> str:
4042
return self.api_url.replace("api", "ui")
@@ -48,8 +50,27 @@ async def _get_session(self) -> aiohttp.ClientSession:
4850

4951
async def close(self) -> None:
5052
"""Close the aiohttp session."""
51-
if self._session and not self._session.closed:
52-
await self._session.close()
53+
try:
54+
# Set shutdown flag to prevent new operations
55+
self._shutdown = True
56+
57+
# Wait for any pending requests to complete
58+
if self._pending_requests > 0:
59+
logging.info(
60+
f"Waiting for {self._pending_requests} pending requests to complete..."
61+
)
62+
# Give a short time for requests to complete
63+
for _ in range(5): # Try for up to 5 seconds
64+
if self._pending_requests == 0:
65+
break
66+
await asyncio.sleep(1)
67+
68+
# Close the session
69+
if self._session and not self._session.closed:
70+
await self._session.close()
71+
except asyncio.CancelledError:
72+
# If we're cancelled during close, just propagate the error
73+
raise
5374

5475
async def __aenter__(self):
5576
"""Support for async context manager."""
@@ -61,84 +82,127 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
6182

6283
async def _handle_rate_limit(self) -> None:
6384
"""Handle rate limiting by waiting if necessary."""
64-
if self.rate_limit_remaining == 0:
65-
time_to_sleep = min((self.rate_limit_reset - time.time()), 10)
66-
time_to_sleep = max(time_to_sleep, 1) + 1
67-
logging.info(f"Waiting {str(time_to_sleep)} seconds before retrying...")
68-
await asyncio.sleep(time_to_sleep)
69-
logging.info("Retrying")
85+
try:
86+
if self.rate_limit_remaining == 0:
87+
time_to_sleep = min((self.rate_limit_reset - time.time()), 10)
88+
time_to_sleep = max(time_to_sleep, 1) + 1
89+
logging.info(f"Waiting {str(time_to_sleep)} seconds before retrying...")
90+
await asyncio.sleep(time_to_sleep)
91+
logging.info("Retrying")
92+
except asyncio.CancelledError:
93+
self._shutdown = True
94+
raise
7095

7196
def _set_rate_limit(self, response: aiohttp.ClientResponse) -> None:
7297
"""Update rate limit information from response headers."""
73-
headers = response.headers
74-
if "X-Ratelimit-Limit" in headers:
75-
self.rate_limit_points = int(headers.get("X-Ratelimit-Limit"))
76-
self.rate_limit_remaining = int(headers.get("X-Ratelimit-Remaining"))
77-
self.rate_limit_reset = int(headers.get("X-Ratelimit-Reset"))
78-
self.retry_after = int(headers.get("Retry-After", "0"))
98+
try:
99+
headers = response.headers
100+
if "X-Ratelimit-Limit" in headers:
101+
self.rate_limit_points = int(headers.get("X-Ratelimit-Limit"))
102+
self.rate_limit_remaining = int(headers.get("X-Ratelimit-Remaining"))
103+
self.rate_limit_reset = int(headers.get("X-Ratelimit-Reset"))
104+
self.retry_after = int(headers.get("Retry-After", "0"))
105+
except asyncio.CancelledError:
106+
self._shutdown = True
107+
raise
79108

80109
async def send(self, path: str, method: str = "POST", **kwargs):
81110
@backoff.on_exception(
82111
backoff.expo, (RateLimitError,), max_tries=self.retry_total
83112
)
84113
async def _send():
85-
session = await self._get_session()
86-
headers = {"Authorization": f"Bearer {self.token}"}
87-
88-
if "headers" in kwargs:
89-
kwargs["headers"].update(headers)
90-
else:
91-
kwargs["headers"] = headers
92-
93-
url = f"{self.api_url}/{self.version}/{path.lstrip('/')}"
94-
95-
while True:
96-
if method == "POST":
97-
response = await session.post(url, **kwargs)
98-
elif method == "DELETE":
99-
response = await session.delete(url, **kwargs)
100-
else:
101-
response = await session.get(url, **kwargs)
102-
103-
self._set_rate_limit(response)
104-
105-
if response.status == 429:
106-
logging.warning(
107-
f"Too many requests, you can do {self.rate_limit_points} requests per minute..."
108-
)
109-
raise RateLimitError()
110-
else:
111-
break
112-
113-
if response.status == 403:
114-
logging.error(self.token_error)
115-
116-
response.raise_for_status()
117-
return response
114+
try:
115+
# If we're shutting down, don't start new requests
116+
if self._shutdown:
117+
raise asyncio.CancelledError()
118+
119+
# Increment pending requests counter
120+
self._pending_requests += 1
121+
122+
try:
123+
session = await self._get_session()
124+
headers = {"Authorization": f"Bearer {self.token}"}
125+
126+
if "headers" in kwargs:
127+
kwargs["headers"].update(headers)
128+
else:
129+
kwargs["headers"] = headers
130+
131+
url = f"{self.api_url}/{self.version}/{path.lstrip('/')}"
132+
133+
while True:
134+
if method == "POST":
135+
response = await session.post(url, **kwargs)
136+
elif method == "DELETE":
137+
response = await session.delete(url, **kwargs)
138+
else:
139+
response = await session.get(url, **kwargs)
140+
141+
self._set_rate_limit(response)
142+
143+
if response.status == 429:
144+
logging.warning(
145+
f"Too many requests, you can do {self.rate_limit_points} requests per minute..."
146+
)
147+
raise RateLimitError()
148+
else:
149+
break
150+
151+
if response.status == 403:
152+
logging.error(self.token_error)
153+
154+
response.raise_for_status()
155+
return response
156+
finally:
157+
# Decrement pending requests counter
158+
self._pending_requests -= 1
159+
except asyncio.CancelledError:
160+
self._shutdown = True
161+
raise
118162

119163
return await _send()
120164

121165
async def post(self, path: str, **kwargs) -> aiohttp.ClientResponse:
122166
"""Send a POST request to the Tinybird API."""
123-
return await self.send(path, method="POST", **kwargs)
167+
try:
168+
return await self.send(path, method="POST", **kwargs)
169+
except asyncio.CancelledError:
170+
self._shutdown = True
171+
raise
124172

125173
async def get(self, path: str, **kwargs) -> aiohttp.ClientResponse:
126174
"""Send a GET request to the Tinybird API."""
127-
return await self.send(path, method="GET", **kwargs)
175+
try:
176+
return await self.send(path, method="GET", **kwargs)
177+
except asyncio.CancelledError:
178+
self._shutdown = True
179+
raise
128180

129181
async def delete(self, path: str, **kwargs) -> aiohttp.ClientResponse:
130182
"""Send a DELETE request to the Tinybird API."""
131-
return await self.send(path, method="DELETE", **kwargs)
183+
try:
184+
return await self.send(path, method="DELETE", **kwargs)
185+
except asyncio.CancelledError:
186+
self._shutdown = True
187+
raise
132188

133189
async def get_json(self, path: str, **kwargs) -> Dict[str, Any]:
134190
"""Send a GET request and return the JSON response."""
135-
response = await self.get(path, **kwargs)
136-
return await response.json()
191+
try:
192+
response = await self.get(path, **kwargs)
193+
return await response.json()
194+
except asyncio.CancelledError:
195+
self._shutdown = True
196+
raise
137197

138198
async def post_json(self, path: str, **kwargs) -> Dict[str, Any]:
139199
"""Send a POST request and return the JSON response."""
140-
response = await self.post(path, **kwargs)
141-
return await response.json()
200+
try:
201+
response = await self.post(path, **kwargs)
202+
return await response.json()
203+
except asyncio.CancelledError:
204+
self._shutdown = True
205+
raise
142206

143207
async def initialize(self) -> None:
144208
"""Initialize the API by checking the token validity."""
@@ -148,3 +212,6 @@ async def initialize(self) -> None:
148212
if e.status == 403:
149213
logging.error(self.token_error)
150214
sys.exit(-1)
215+
except asyncio.CancelledError:
216+
self._shutdown = True
217+
raise

0 commit comments

Comments
 (0)