@@ -35,6 +35,8 @@ def __init__(
3535 self .retry_total = retry_total
3636
3737 self .token_error = TOKEN_ERROR
38+ self ._shutdown = False
39+ self ._pending_requests = 0
3840
3941 def ui_url (self ) -> str :
4042 return self .api_url .replace ("api" , "ui" )
@@ -48,8 +50,27 @@ async def _get_session(self) -> aiohttp.ClientSession:
4850
4951 async def close (self ) -> None :
5052 """Close the aiohttp session."""
51- if self ._session and not self ._session .closed :
52- await self ._session .close ()
53+ try :
54+ # Set shutdown flag to prevent new operations
55+ self ._shutdown = True
56+
57+ # Wait for any pending requests to complete
58+ if self ._pending_requests > 0 :
59+ logging .info (
60+ f"Waiting for { self ._pending_requests } pending requests to complete..."
61+ )
62+ # Give a short time for requests to complete
63+ for _ in range (5 ): # Try for up to 5 seconds
64+ if self ._pending_requests == 0 :
65+ break
66+ await asyncio .sleep (1 )
67+
68+ # Close the session
69+ if self ._session and not self ._session .closed :
70+ await self ._session .close ()
71+ except asyncio .CancelledError :
72+ # If we're cancelled during close, just propagate the error
73+ raise
5374
5475 async def __aenter__ (self ):
5576 """Support for async context manager."""
@@ -61,84 +82,127 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
6182
6283 async def _handle_rate_limit (self ) -> None :
6384 """Handle rate limiting by waiting if necessary."""
64- if self .rate_limit_remaining == 0 :
65- time_to_sleep = min ((self .rate_limit_reset - time .time ()), 10 )
66- time_to_sleep = max (time_to_sleep , 1 ) + 1
67- logging .info (f"Waiting { str (time_to_sleep )} seconds before retrying..." )
68- await asyncio .sleep (time_to_sleep )
69- logging .info ("Retrying" )
85+ try :
86+ if self .rate_limit_remaining == 0 :
87+ time_to_sleep = min ((self .rate_limit_reset - time .time ()), 10 )
88+ time_to_sleep = max (time_to_sleep , 1 ) + 1
89+ logging .info (f"Waiting { str (time_to_sleep )} seconds before retrying..." )
90+ await asyncio .sleep (time_to_sleep )
91+ logging .info ("Retrying" )
92+ except asyncio .CancelledError :
93+ self ._shutdown = True
94+ raise
7095
7196 def _set_rate_limit (self , response : aiohttp .ClientResponse ) -> None :
7297 """Update rate limit information from response headers."""
73- headers = response .headers
74- if "X-Ratelimit-Limit" in headers :
75- self .rate_limit_points = int (headers .get ("X-Ratelimit-Limit" ))
76- self .rate_limit_remaining = int (headers .get ("X-Ratelimit-Remaining" ))
77- self .rate_limit_reset = int (headers .get ("X-Ratelimit-Reset" ))
78- self .retry_after = int (headers .get ("Retry-After" , "0" ))
98+ try :
99+ headers = response .headers
100+ if "X-Ratelimit-Limit" in headers :
101+ self .rate_limit_points = int (headers .get ("X-Ratelimit-Limit" ))
102+ self .rate_limit_remaining = int (headers .get ("X-Ratelimit-Remaining" ))
103+ self .rate_limit_reset = int (headers .get ("X-Ratelimit-Reset" ))
104+ self .retry_after = int (headers .get ("Retry-After" , "0" ))
105+ except asyncio .CancelledError :
106+ self ._shutdown = True
107+ raise
79108
80109 async def send (self , path : str , method : str = "POST" , ** kwargs ):
81110 @backoff .on_exception (
82111 backoff .expo , (RateLimitError ,), max_tries = self .retry_total
83112 )
84113 async def _send ():
85- session = await self ._get_session ()
86- headers = {"Authorization" : f"Bearer { self .token } " }
87-
88- if "headers" in kwargs :
89- kwargs ["headers" ].update (headers )
90- else :
91- kwargs ["headers" ] = headers
92-
93- url = f"{ self .api_url } /{ self .version } /{ path .lstrip ('/' )} "
94-
95- while True :
96- if method == "POST" :
97- response = await session .post (url , ** kwargs )
98- elif method == "DELETE" :
99- response = await session .delete (url , ** kwargs )
100- else :
101- response = await session .get (url , ** kwargs )
102-
103- self ._set_rate_limit (response )
104-
105- if response .status == 429 :
106- logging .warning (
107- f"Too many requests, you can do { self .rate_limit_points } requests per minute..."
108- )
109- raise RateLimitError ()
110- else :
111- break
112-
113- if response .status == 403 :
114- logging .error (self .token_error )
115-
116- response .raise_for_status ()
117- return response
114+ try :
115+ # If we're shutting down, don't start new requests
116+ if self ._shutdown :
117+ raise asyncio .CancelledError ()
118+
119+ # Increment pending requests counter
120+ self ._pending_requests += 1
121+
122+ try :
123+ session = await self ._get_session ()
124+ headers = {"Authorization" : f"Bearer { self .token } " }
125+
126+ if "headers" in kwargs :
127+ kwargs ["headers" ].update (headers )
128+ else :
129+ kwargs ["headers" ] = headers
130+
131+ url = f"{ self .api_url } /{ self .version } /{ path .lstrip ('/' )} "
132+
133+ while True :
134+ if method == "POST" :
135+ response = await session .post (url , ** kwargs )
136+ elif method == "DELETE" :
137+ response = await session .delete (url , ** kwargs )
138+ else :
139+ response = await session .get (url , ** kwargs )
140+
141+ self ._set_rate_limit (response )
142+
143+ if response .status == 429 :
144+ logging .warning (
145+ f"Too many requests, you can do { self .rate_limit_points } requests per minute..."
146+ )
147+ raise RateLimitError ()
148+ else :
149+ break
150+
151+ if response .status == 403 :
152+ logging .error (self .token_error )
153+
154+ response .raise_for_status ()
155+ return response
156+ finally :
157+ # Decrement pending requests counter
158+ self ._pending_requests -= 1
159+ except asyncio .CancelledError :
160+ self ._shutdown = True
161+ raise
118162
119163 return await _send ()
120164
121165 async def post (self , path : str , ** kwargs ) -> aiohttp .ClientResponse :
122166 """Send a POST request to the Tinybird API."""
123- return await self .send (path , method = "POST" , ** kwargs )
167+ try :
168+ return await self .send (path , method = "POST" , ** kwargs )
169+ except asyncio .CancelledError :
170+ self ._shutdown = True
171+ raise
124172
125173 async def get (self , path : str , ** kwargs ) -> aiohttp .ClientResponse :
126174 """Send a GET request to the Tinybird API."""
127- return await self .send (path , method = "GET" , ** kwargs )
175+ try :
176+ return await self .send (path , method = "GET" , ** kwargs )
177+ except asyncio .CancelledError :
178+ self ._shutdown = True
179+ raise
128180
129181 async def delete (self , path : str , ** kwargs ) -> aiohttp .ClientResponse :
130182 """Send a DELETE request to the Tinybird API."""
131- return await self .send (path , method = "DELETE" , ** kwargs )
183+ try :
184+ return await self .send (path , method = "DELETE" , ** kwargs )
185+ except asyncio .CancelledError :
186+ self ._shutdown = True
187+ raise
132188
133189 async def get_json (self , path : str , ** kwargs ) -> Dict [str , Any ]:
134190 """Send a GET request and return the JSON response."""
135- response = await self .get (path , ** kwargs )
136- return await response .json ()
191+ try :
192+ response = await self .get (path , ** kwargs )
193+ return await response .json ()
194+ except asyncio .CancelledError :
195+ self ._shutdown = True
196+ raise
137197
138198 async def post_json (self , path : str , ** kwargs ) -> Dict [str , Any ]:
139199 """Send a POST request and return the JSON response."""
140- response = await self .post (path , ** kwargs )
141- return await response .json ()
200+ try :
201+ response = await self .post (path , ** kwargs )
202+ return await response .json ()
203+ except asyncio .CancelledError :
204+ self ._shutdown = True
205+ raise
142206
143207 async def initialize (self ) -> None :
144208 """Initialize the API by checking the token validity."""
@@ -148,3 +212,6 @@ async def initialize(self) -> None:
148212 if e .status == 403 :
149213 logging .error (self .token_error )
150214 sys .exit (- 1 )
215+ except asyncio .CancelledError :
216+ self ._shutdown = True
217+ raise
0 commit comments