Skip to content

Commit 7983ad0

Browse files
authored
Merge pull request #4 from tinybirdco/async
Async
2 parents 2660627 + 53ee653 commit 7983ad0

12 files changed

Lines changed: 532 additions & 34 deletions

File tree

README.md

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,47 +15,38 @@ with Datasource(datasource_name, token) as ds:
1515
ds << {'key': 'value', 'key1': 'value1'}
1616
```
1717

18-
```python
19-
from tb.datasource import Datasource
20-
21-
with Datasource(datasource_name, token, api_url='https://api.us-east.tinybird.co') as ds:
22-
ds << {'key': 'value', 'key1': 'value1'}
23-
```
24-
25-
Alternatively you can do:
18+
You can also use the async version:
2619

2720
```python
28-
from tb.datasource import Datasource
29-
30-
ds = Datasource(datasource_name, token)
31-
for json_obj in list_of_json:
32-
ds << json_obj
21+
from tb.a.datasource import AsyncDatasource
3322

34-
# just remember to flush the remaining json_obj at the end
35-
ds.flush()
23+
async with AsyncDatasource(datasource_name, token, api_url='https://api.us-east.tinybird.co') as ds:
24+
await ds << {'key': 'value', 'key1': 'value1'}
3625
```
3726

3827
Notes:
39-
- The `Datasource` object does some in-memory buffering and uses the [events API](https://docs.tinybird.co/api-reference/datasource-api.html#post-v0-events).
28+
- The `Datasource` object does some in-memory buffering and uses the [events API](https://www.tinybird.co/docs/v2/get-data-in/events-api).
4029
- It only supports `ndjson` data
41-
- It automatically handles [Rate Limits](https://docs.tinybird.co/api-reference/api-reference.html#limits)
30+
- It automatically handles [Rate Limits](https://www.tinybird.co/docs/get-started/plans/limits#ingestion-limits-api)
4231

4332
## Ingest using an API instance
4433

4534
```python
4635

47-
from tb.api import API
48-
49-
api = API(token, api_url)
50-
api.post('/v0/datasources', params={
51-
'name': 'datasource_name',
52-
'mode': 'append',
53-
'format': 'ndjson',
54-
'url': 'https://storage.googleapis.com/davidm-wadus/events.ndjson',
55-
})
36+
from tb.a.api import AsyncAPI
37+
38+
async with AsyncAPI(token, api_url) as api:
39+
await api.post('datasources',
40+
params={
41+
'name': 'datasource_name',
42+
'mode': 'append',
43+
'format': 'ndjson',
44+
'url': 'https://storage.googleapis.com/davidm-wadus/events.ndjson',
45+
}
46+
)
5647
```
5748

58-
- It automatically handle [Rate Limits](https://docs.tinybird.co/api-reference/api-reference.html#limits)
49+
- It automatically handles [Rate Limits](https://docs.tinybird.co/api-reference/api-reference.html#limits)
5950
- Works with any Tinybird API
6051
- The `post`, `get`, `send` methods signatures are equivalent to the [requests](https://docs.python-requests.org/en/latest/) library.
6152

@@ -105,3 +96,31 @@ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(messag
10596
handler.setFormatter(formatter)
10697
logger.addHandler(handler)
10798
```
99+
100+
## Logging from Litellm to a Tinybird Data Source
101+
102+
Install the `ai` extra:
103+
104+
```
105+
pip install tinybird-python-sdk[ai]
106+
```
107+
108+
Then use the following handler:
109+
110+
```python
111+
from tb.litellm.handler import TinybirdLitellmHandler
112+
113+
customHandler = TinybirdLitellmHandler(
114+
api_url="https://api.us-east.aws.tinybird.co",
115+
tinybird_token=os.getenv("TINYBIRD_TOKEN"),
116+
datasource_name="litellm"
117+
)
118+
119+
litellm.callbacks = [customHandler]
120+
121+
response = await acompletion(
122+
model="gpt-3.5-turbo",
123+
messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
124+
stream=True
125+
)
126+
```

examples/datasource.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import asyncio
2+
import os
3+
from tb.a.datasource import AsyncDatasource
4+
5+
6+
# Datasource example usage
7+
async def example():
8+
token = os.getenv("TINYBIRD_TOKEN")
9+
async with AsyncDatasource(
10+
datasource_name="my_datasource", token=token
11+
) as datasource:
12+
await datasource.append({"key": "value1"})
13+
14+
await (datasource << {"key": "value3"})
15+
16+
for i in range(100):
17+
if i % 2 == 0:
18+
await datasource.append({"index": i, "value": f"append_{i}"})
19+
else:
20+
await (datasource << {"index": i, "value": f"lshift_{i}"})
21+
22+
datasource = AsyncDatasource(datasource_name="my_datasource", token=token)
23+
try:
24+
await datasource.append({"method": "append"})
25+
await (datasource << {"method": "lshift"})
26+
27+
await datasource.flush()
28+
finally:
29+
await datasource.close()
30+
31+
32+
if __name__ == "__main__":
33+
asyncio.run(example())

examples/litellm_async.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import asyncio
2+
import litellm
3+
from litellm import acompletion
4+
import os
5+
from tb.litellm.handler import TinybirdLitellmHandler
6+
7+
8+
async def main():
9+
# Set up the handler
10+
customHandler = TinybirdLitellmHandler(
11+
api_url="https://api.us-east.aws.tinybird.co",
12+
tinybird_token=os.getenv("TINYBIRD_TOKEN"),
13+
datasource_name="litellm",
14+
)
15+
16+
litellm.callbacks = [customHandler]
17+
18+
print("Running async example...")
19+
response = await acompletion(
20+
model="gpt-3.5-turbo",
21+
messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
22+
stream=True,
23+
)
24+
25+
async for chunk in response:
26+
print(chunk)
27+
28+
# Wait for callbacks to complete
29+
await asyncio.sleep(2)
30+
31+
32+
if __name__ == "__main__":
33+
asyncio.run(main())

examples/litellm_sync.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import time
2+
import litellm
3+
from litellm import completion
4+
import os
5+
from tb.litellm.handler import TinybirdLitellmHandler
6+
7+
customHandler = TinybirdLitellmHandler(
8+
api_url="https://api.us-east.aws.tinybird.co",
9+
tinybird_token=os.getenv("TINYBIRD_TOKEN"),
10+
datasource_name="litellm",
11+
)
12+
13+
litellm.callbacks = [customHandler]
14+
15+
print("Running synchronous example...")
16+
response = completion(
17+
model="gpt-3.5-turbo",
18+
messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
19+
stream=True,
20+
)
21+
22+
for chunk in response:
23+
print(chunk)
24+
25+
time.sleep(2)
26+
27+
print("\nTo run the async example, use the litellm_async.py script")

pyproject.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,21 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "tinybird-python-sdk"
7-
version = "0.1.7"
7+
version = "0.2.0"
88
description = "Python SDK for Tinybird"
99
readme = "README.md"
1010
authors = [
1111
{name = "tinybird.co", email = "support@tinybird.co"},
1212
]
1313
dependencies = [
1414
"requests>=2.32.3",
15+
"aiohttp>=3.11.2",
16+
"backoff>=2.2.0",
1517
]
1618

1719
[project.optional-dependencies]
18-
dev = [
19-
"black>=23.12.1",
20-
"pyproject-toml>=0.0.10",
20+
ai = [
21+
"litellm>=1.62.1"
2122
]
2223

2324
[project.urls]

requirements.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

tb/a/__init__.py

Whitespace-only changes.

tb/a/api.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import asyncio
2+
import logging
3+
import sys
4+
import time
5+
from typing import Optional, Dict, Any
6+
7+
import aiohttp
8+
import backoff
9+
10+
11+
class RateLimitError(Exception):
12+
pass
13+
14+
15+
class AsyncAPI:
16+
def __init__(
17+
self,
18+
token: str,
19+
api_url: str = "https://api.tinybird.co",
20+
version: str = "v0",
21+
retry_total: int = 1,
22+
):
23+
self.api_url = api_url.rstrip("/")
24+
self.version = version
25+
TOKEN_ERROR = f"Token must be a valid Tinybird token for {self.api_url}. Check the `api_url` param is correct and the token has the right permissions. {self.ui_url()}/tokens"
26+
if not token:
27+
logging.critical(TOKEN_ERROR)
28+
sys.exit(-1)
29+
self.token = token
30+
self._session: Optional[aiohttp.ClientSession] = None
31+
32+
self.rate_limit_points = 6
33+
self.rate_limit_remaining = self.rate_limit_points
34+
self.rate_limit_reset = 0
35+
self.retry_after = 1
36+
self.retry_total = retry_total
37+
38+
self.token_error = TOKEN_ERROR
39+
40+
def ui_url(self) -> str:
41+
return self.api_url.replace("api", "ui")
42+
43+
async def _get_session(self) -> aiohttp.ClientSession:
44+
"""Get or create an aiohttp session."""
45+
if self._session is None or self._session.closed:
46+
timeout = aiohttp.ClientTimeout(total=60)
47+
self._session = aiohttp.ClientSession(timeout=timeout)
48+
return self._session
49+
50+
async def close(self) -> None:
51+
"""Close the aiohttp session."""
52+
if self._session and not self._session.closed:
53+
await self._session.close()
54+
55+
async def __aenter__(self):
56+
"""Support for async context manager."""
57+
return self
58+
59+
async def __aexit__(self, exc_type, exc_val, exc_tb):
60+
"""Support for async context manager."""
61+
await self.close()
62+
63+
async def _handle_rate_limit(self) -> None:
64+
"""Handle rate limiting by waiting if necessary."""
65+
if self.rate_limit_remaining == 0:
66+
time_to_sleep = min((self.rate_limit_reset - time.time()), 10)
67+
time_to_sleep = max(time_to_sleep, 1) + 1
68+
logging.info(f"Waiting {str(time_to_sleep)} seconds before retrying...")
69+
await asyncio.sleep(time_to_sleep)
70+
logging.info("Retrying")
71+
72+
def _set_rate_limit(self, response: aiohttp.ClientResponse) -> None:
73+
"""Update rate limit information from response headers."""
74+
headers = response.headers
75+
if "X-Ratelimit-Limit" in headers:
76+
self.rate_limit_points = int(headers.get("X-Ratelimit-Limit"))
77+
self.rate_limit_remaining = int(headers.get("X-Ratelimit-Remaining"))
78+
self.rate_limit_reset = int(headers.get("X-Ratelimit-Reset"))
79+
self.retry_after = int(headers.get("Retry-After", "0"))
80+
81+
async def send(self, path: str, method: str = "POST", **kwargs):
82+
@backoff.on_exception(
83+
backoff.expo, (RateLimitError,), max_tries=self.retry_total
84+
)
85+
async def _send():
86+
session = await self._get_session()
87+
headers = {"Authorization": f"Bearer {self.token}"}
88+
89+
if "headers" in kwargs:
90+
kwargs["headers"].update(headers)
91+
else:
92+
kwargs["headers"] = headers
93+
94+
url = f"{self.api_url}/{self.version}/{path.lstrip('/')}"
95+
96+
while True:
97+
if method == "POST":
98+
response = await session.post(url, **kwargs)
99+
elif method == "DELETE":
100+
response = await session.delete(url, **kwargs)
101+
else:
102+
response = await session.get(url, **kwargs)
103+
104+
self._set_rate_limit(response)
105+
106+
if response.status == 429:
107+
logging.warning(
108+
f"Too many requests, you can do {self.rate_limit_points} requests per minute..."
109+
)
110+
raise RateLimitError()
111+
else:
112+
break
113+
114+
if response.status == 403:
115+
logging.error(self.token_error)
116+
117+
response.raise_for_status()
118+
return response
119+
120+
return await _send()
121+
122+
async def post(self, path: str, **kwargs) -> aiohttp.ClientResponse:
123+
"""Send a POST request to the Tinybird API."""
124+
return await self.send(path, method="POST", **kwargs)
125+
126+
async def get(self, path: str, **kwargs) -> aiohttp.ClientResponse:
127+
"""Send a GET request to the Tinybird API."""
128+
return await self.send(path, method="GET", **kwargs)
129+
130+
async def delete(self, path: str, **kwargs) -> aiohttp.ClientResponse:
131+
"""Send a DELETE request to the Tinybird API."""
132+
return await self.send(path, method="DELETE", **kwargs)
133+
134+
async def get_json(self, path: str, **kwargs) -> Dict[str, Any]:
135+
"""Send a GET request and return the JSON response."""
136+
response = await self.get(path, **kwargs)
137+
return await response.json()
138+
139+
async def post_json(self, path: str, **kwargs) -> Dict[str, Any]:
140+
"""Send a POST request and return the JSON response."""
141+
response = await self.post(path, **kwargs)
142+
return await response.json()
143+
144+
async def initialize(self) -> None:
145+
"""Initialize the API by checking the token validity."""
146+
try:
147+
await self.get("/datasources")
148+
except aiohttp.ClientResponseError as e:
149+
if e.status == 403:
150+
logging.error(self.token_error)
151+
sys.exit(-1)

0 commit comments

Comments
 (0)