Skip to content

Commit 5a7b444

Browse files
committed
refactor and add litellm schema
1 parent 154f9fa commit 5a7b444

6 files changed

Lines changed: 160 additions & 53 deletions

File tree

README.md

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Tinybird Python SDK
22

3-
SDK around Tinybird APIs.
3+
SDK around [Tinybird](https://www.tinybird.co/) APIs.
44

55
If you want to manage Workspaces, Data Sources and Pipes you might be looking for the [tinybird-cli](https://pypi.org/project/tinybird-cli/).
66

@@ -11,7 +11,7 @@ The SDK is meant to programmatically ingest `NDJSON` data or send any request to
1111
```python
1212
from tb.datasource import Datasource
1313

14-
with Datasource(datasource_name, token) as ds:
14+
with Datasource(datasource_name, tinybird_token) as ds:
1515
ds << {'key': 'value', 'key1': 'value1'}
1616
```
1717

@@ -20,7 +20,7 @@ You can also use the async version:
2020
```python
2121
from tb.a.datasource import AsyncDatasource
2222

23-
async with AsyncDatasource(datasource_name, token, api_url='https://api.us-east.tinybird.co') as ds:
23+
async with AsyncDatasource(datasource_name, tinybird_token, api_url='https://api.us-east.tinybird.co') as ds:
2424
await ds << {'key': 'value', 'key1': 'value1'}
2525
```
2626

@@ -35,7 +35,7 @@ Notes:
3535

3636
from tb.a.api import AsyncAPI
3737

38-
async with AsyncAPI(token, api_url) as api:
38+
async with AsyncAPI(tinybird_token, api_url) as api:
3939
await api.post('datasources',
4040
params={
4141
'name': 'datasource_name',
@@ -58,8 +58,8 @@ from tb.logger import TinybirdLoggingHandler
5858
from dotenv import load_dotenv
5959

6060
load_dotenv()
61-
TB_API_URL = os.getenv("TB_API_URL")
62-
TB_WRITE_TOKEN = os.getenv("TB_WRITE_TOKEN")
61+
TB_API_URL = os.getenv("TINYBIRD_API_URL")
62+
TB_WRITE_TOKEN = os.getenv("TINYBIRD_WRITE_TOKEN")
6363

6464
logger = logging.getLogger('your-logger-name')
6565
handler = TinybirdLoggingHandler(TB_API_URL, TB_WRITE_TOKEN, 'your-app-name')
@@ -87,8 +87,8 @@ from tb.logger import TinybirdLoggingQueueHandler
8787
from dotenv import load_dotenv
8888

8989
load_dotenv()
90-
TB_API_URL = os.getenv("TB_API_URL")
91-
TB_WRITE_TOKEN = os.getenv("TB_WRITE_TOKEN")
90+
TB_API_URL = os.getenv("TINYBIRD_API_URL")
91+
TB_WRITE_TOKEN = os.getenv("TINYBIRD_WRITE_TOKEN")
9292

9393
logger = logging.getLogger('your-logger-name')
9494
handler = TinybirdLoggingQueueHandler(Queue(-1), TB_API_URL, TB_WRITE_TOKEN, 'your-app-name', ds_name="your_tb_ds_name")
@@ -108,9 +108,9 @@ pip install tinybird-python-sdk[ai]
108108
Then use the following handler:
109109

110110
```python
111-
from tb.litellm.handler import TinybirdLitellmHandler
111+
from tb.litellm.handler import TinybirdLitellmAsyncHandler
112112

113-
customHandler = TinybirdLitellmHandler(
113+
customHandler = TinybirdLitellmAsyncHandler(
114114
api_url="https://api.us-east.aws.tinybird.co",
115115
tinybird_token=os.getenv("TINYBIRD_TOKEN"),
116116
datasource_name="litellm"
@@ -124,3 +124,45 @@ response = await acompletion(
124124
stream=True
125125
)
126126
```
127+
128+
This is the schema for the `litellm` data source:
129+
130+
```sql
131+
SCHEMA >
132+
`model` LowCardinality(String) `json:$.model` DEFAULT 'unknown',
133+
`messages` Array(Map(String, String)) `json:$.messages[:]` DEFAULT [],
134+
`user` String `json:$.user` DEFAULT 'unknown',
135+
`start_time` DateTime `json:$.start_time` DEFAULT now(),
136+
`end_time` DateTime `json:$.end_time` DEFAULT now(),
137+
`id` String `json:$.id` DEFAULT '',
138+
`stream` Boolean `json:$.stream` DEFAULT false,
139+
`call_type` LowCardinality(String) `json:$.call_type` DEFAULT 'unknown',
140+
`provider` LowCardinality(String) `json:$.provider` DEFAULT 'unknown',
141+
`api_key` String `json:$.api_key` DEFAULT '',
142+
`log_event_type` LowCardinality(String) `json:$.log_event_type` DEFAULT 'unknown',
143+
`llm_api_duration_ms` Float32 `json:$.llm_api_duration_ms` DEFAULT 0,
144+
`cache_hit` Boolean `json:$.cache_hit` DEFAULT false,
145+
`response_status` LowCardinality(String) `json:$.standard_logging_object_status` DEFAULT 'unknown',
146+
`response_time` Float32 `json:$.standard_logging_object_response_time` DEFAULT 0,
147+
`proxy_metadata` String `json:$.proxy_metadata` DEFAULT '',
148+
`organization` String `json:$.proxy_metadata.organization` DEFAULT '',
149+
`environment` String `json:$.proxy_metadata.environment` DEFAULT '',
150+
`project` String `json:$.proxy_metadata.project` DEFAULT '',
151+
`chat_id` String `json:$.proxy_metadata.chat_id` DEFAULT '',
152+
`response` String `json:$.response` DEFAULT '',
153+
`response_id` String `json:$.response.id` DEFAULT '',
154+
`response_object` String `json:$.response.object` DEFAULT 'unknown',
155+
`response_choices` Array(String) `json:$.response.choices[:]` DEFAULT [],
156+
`completion_tokens` UInt16 `json:$.response.usage.completion_tokens` DEFAULT 0,
157+
`prompt_tokens` UInt16 `json:$.response.usage.prompt_tokens` DEFAULT 0,
158+
`total_tokens` UInt16 `json:$.response.usage.total_tokens` DEFAULT 0,
159+
`cost` Float32 `json:$.cost` DEFAULT 0,
160+
`exception` String `json:$.exception` DEFAULT '',
161+
`traceback` String `json:$.traceback` DEFAULT '',
162+
`duration` Float32 `json:$.duration` DEFAULT 0
163+
164+
165+
ENGINE MergeTree
166+
ENGINE_SORTING_KEY start_time, organization, project, model
167+
ENGINE_PARTITION_KEY toYYYYMM(start_time)
168+
```

examples/litellm.datasource

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
SCHEMA >
2+
`model` LowCardinality(String) `json:$.model` DEFAULT 'unknown',
3+
`messages` Array(Map(String, String)) `json:$.messages[:]` DEFAULT [],
4+
`user` String `json:$.user` DEFAULT 'unknown',
5+
`start_time` DateTime `json:$.start_time` DEFAULT now(),
6+
`end_time` DateTime `json:$.end_time` DEFAULT now(),
7+
`id` String `json:$.id` DEFAULT '',
8+
`stream` Boolean `json:$.stream` DEFAULT false,
9+
`call_type` LowCardinality(String) `json:$.call_type` DEFAULT 'unknown',
10+
`provider` LowCardinality(String) `json:$.provider` DEFAULT 'unknown',
11+
`api_key` String `json:$.api_key` DEFAULT '',
12+
`log_event_type` LowCardinality(String) `json:$.log_event_type` DEFAULT 'unknown',
13+
`llm_api_duration_ms` Float32 `json:$.llm_api_duration_ms` DEFAULT 0,
14+
`cache_hit` Boolean `json:$.cache_hit` DEFAULT false,
15+
`response_status` LowCardinality(String) `json:$.standard_logging_object_status` DEFAULT 'unknown',
16+
`response_time` Float32 `json:$.standard_logging_object_response_time` DEFAULT 0,
17+
`proxy_metadata` String `json:$.proxy_metadata` DEFAULT '',
18+
`organization` String `json:$.proxy_metadata.organization` DEFAULT '',
19+
`environment` String `json:$.proxy_metadata.environment` DEFAULT '',
20+
`project` String `json:$.proxy_metadata.project` DEFAULT '',
21+
`chat_id` String `json:$.proxy_metadata.chat_id` DEFAULT '',
22+
`response` String `json:$.response` DEFAULT '',
23+
`response_id` String `json:$.response.id` DEFAULT '',
24+
`response_object` String `json:$.response.object` DEFAULT 'unknown',
25+
`response_choices` Array(String) `json:$.response.choices[:]` DEFAULT [],
26+
`completion_tokens` UInt16 `json:$.response.usage.completion_tokens` DEFAULT 0,
27+
`prompt_tokens` UInt16 `json:$.response.usage.prompt_tokens` DEFAULT 0,
28+
`total_tokens` UInt16 `json:$.response.usage.total_tokens` DEFAULT 0,
29+
`cost` Float32 `json:$.cost` DEFAULT 0,
30+
`exception` String `json:$.exception` DEFAULT '',
31+
`traceback` String `json:$.traceback` DEFAULT '',
32+
`duration` Float32 `json:$.duration` DEFAULT 0
33+
34+
35+
ENGINE MergeTree
36+
ENGINE_SORTING_KEY start_time, organization, project, model
37+
ENGINE_PARTITION_KEY toYYYYMM(start_time)

examples/litellm_async.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
import litellm
33
from litellm import acompletion
44
import os
5-
from tb.litellm.handler import TinybirdLitellmHandler
5+
from tb.litellm.handler import TinybirdLitellmAsyncHandler
66

77

88
async def main():
99
# Set up the handler
10-
customHandler = TinybirdLitellmHandler(
10+
customHandler = TinybirdLitellmAsyncHandler(
1111
api_url="https://api.us-east.aws.tinybird.co",
1212
tinybird_token=os.getenv("TINYBIRD_TOKEN"),
1313
datasource_name="litellm",
@@ -20,6 +20,8 @@ async def main():
2020
model="gpt-3.5-turbo",
2121
messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
2222
stream=True,
23+
user="test_user",
24+
metadata={"organization": "tinybird", "environment": "dev", "project": "litellm_test", "chat_id": "1234567890"}
2325
)
2426

2527
async for chunk in response:

examples/litellm_sync.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
import litellm
33
from litellm import completion
44
import os
5-
from tb.litellm.handler import TinybirdLitellmHandler
5+
from tb.litellm.handler import TinybirdLitellmSyncHandler
66

7-
customHandler = TinybirdLitellmHandler(
7+
customHandler = TinybirdLitellmSyncHandler(
88
api_url="https://api.us-east.aws.tinybird.co",
99
tinybird_token=os.getenv("TINYBIRD_TOKEN"),
1010
datasource_name="litellm",
@@ -14,9 +14,11 @@
1414

1515
print("Running synchronous example...")
1616
response = completion(
17-
model="gpt-3.5-turbo",
18-
messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
17+
model="anthropic/claude-3-5-sonnet-latest",
18+
messages=[{"role": "user", "content": "Hi 👋 - i'm claude"}],
1919
stream=True,
20+
user="test_user",
21+
metadata={"organization": "tinybird", "environment": "dev", "project": "litellm_test", "chat_id": "1234567890"}
2022
)
2123

2224
for chunk in response:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "tinybird-python-sdk"
7-
version = "0.2.2"
7+
version = "0.3.0"
88
description = "Python SDK for Tinybird"
99
readme = "README.md"
1010
authors = [

tb/litellm/handler.py

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
try:
2+
import litellm
23
from litellm.integrations.custom_logger import CustomLogger
34
except ImportError:
45
raise ImportError(
@@ -37,62 +38,85 @@ def __init__(
3738
self.token = tinybird_token
3839
self.api_url = api_url
3940
self.datasource_name = datasource_name
41+
self.api_version = api_version
4042
self.api = Tinybird(
41-
token=self.token, api_url=self.api_url, version=api_version
43+
token=self.token, api_url=self.api_url, version=self.api_version
4244
)
4345
self.async_api = AsyncTinybird(
44-
token=self.token, api_url=self.api_url, version=api_version
46+
token=self.token, api_url=self.api_url, version=self.api_version
4547
)
4648

49+
def _extract_data(self, kwargs, response_obj, start_time, end_time):
    """Build the serialized event payload for one LiteLLM call.

    Args:
        kwargs: Call details handed to the logging callback. Every field is
            read with ``.get`` so a missing key yields ``None`` or the
            default shown below.
        response_obj: Response handed to the callback. May be ``None``
            (NOTE(review): expected for failure events -- confirm against
            LiteLLM); in that case no response body or cost is recorded.
        start_time: Start of the call, a ``datetime`` or any value that
            supports subtraction with ``end_time``.
        end_time: End of the call, same kind of value as ``start_time``.

    Returns:
        The result of ``safe_json_dumps`` applied to the collected fields.
    """
    # ``or {}`` also covers a key that is present but explicitly None,
    # which a plain ``.get(key, {})`` default would not.
    standard_logging = kwargs.get("standard_logging_object") or {}
    litellm_params = kwargs.get("litellm_params") or {}

    data = {
        "model": kwargs.get("model"),
        "messages": kwargs.get("messages"),
        "user": kwargs.get("user"),
        "start_time": start_time,
        "end_time": end_time,
        "id": kwargs.get("litellm_call_id"),
        "stream": kwargs.get("stream", False),
        "call_type": kwargs.get("call_type", "completion"),
        "provider": kwargs.get("custom_llm_provider"),
        "api_key": kwargs.get("api_key"),
        "log_event_type": kwargs.get("log_event_type"),
        "llm_api_duration_ms": kwargs.get("llm_api_duration_ms"),
        "response_headers": kwargs.get("response_headers", {}),
        "cache_hit": kwargs.get("cache_hit", False),
        "standard_logging_object_id": standard_logging.get("id"),
        "standard_logging_object_status": standard_logging.get("status"),
        "standard_logging_object_response_time": standard_logging.get("response_time"),
        "standard_logging_object_saved_cache_cost": standard_logging.get("saved_cache_cost"),
        # Previously read the "status" key by mistake.
        "standard_logging_object_hidden_params": standard_logging.get("hidden_params"),
    }

    # Metadata supplied by the caller, e.g.
    # litellm.completion(model=..., messages=..., metadata={"hello": "world"})
    data["proxy_metadata"] = litellm_params.get("metadata", {})

    if response_obj is None:
        # Without a response there is nothing to serialize or to price;
        # calling .json() on None would raise and the event would be lost.
        data["response"] = None
        data["cost"] = None
    else:
        data["response"] = response_obj.json()
        try:
            data["cost"] = litellm.completion_cost(completion_response=response_obj)
        except Exception:
            # Cost calculation is best-effort: a pricing failure must not
            # prevent the event itself from being logged.
            data["cost"] = None

    data["exception"] = kwargs.get("exception", None)
    data["traceback"] = kwargs.get("traceback_exception", None)

    if isinstance(start_time, datetime) and isinstance(end_time, datetime):
        duration = (end_time - start_time).total_seconds()
    elif start_time is None or end_time is None:
        # No usable timestamps; leave the duration unset instead of raising.
        duration = None
    else:
        duration = end_time - start_time
    data["duration"] = duration

    return safe_json_dumps(data)
87+
88+
class TinybirdLitellmSyncHandler(TinybirdLitellmHandler):
89+
def __init__(self, *args, **kwargs):
90+
super().__init__(*args, **kwargs)
91+
4792
def log_success_event(self, kwargs, response_obj, start_time, end_time):
93+
data = self._extract_data(kwargs, response_obj, start_time, end_time)
4894
self.api.send(
4995
f"events?name={self.datasource_name}",
50-
data=safe_json_dumps(
51-
{
52-
"kwargs": kwargs,
53-
"response_obj": response_obj,
54-
"start_time": start_time,
55-
"end_time": end_time,
56-
}
57-
),
96+
data=data
5897
)
5998

6099
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
100+
data = self._extract_data(kwargs, response_obj, start_time, end_time)
61101
self.api.send(
62102
f"events?name={self.datasource_name}",
63-
data=safe_json_dumps(
64-
{
65-
"kwargs": kwargs,
66-
"response_obj": response_obj,
67-
"start_time": start_time,
68-
"end_time": end_time,
69-
}
70-
),
103+
data=data
71104
)
72105

73-
#### ASYNC #### - for acompletion/aembeddings
106+
class TinybirdLitellmAsyncHandler(TinybirdLitellmHandler):
107+
def __init__(self, *args, **kwargs):
108+
super().__init__(*args, **kwargs)
109+
74110
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
111+
data = self._extract_data(kwargs, response_obj, start_time, end_time)
75112
await self.async_api.send(
76113
f"events?name={self.datasource_name}",
77-
data=safe_json_dumps(
78-
{
79-
"kwargs": kwargs,
80-
"response_obj": response_obj,
81-
"start_time": start_time,
82-
"end_time": end_time,
83-
}
84-
),
85-
)
114+
data=data
115+
)
86116

87117
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
118+
data = self._extract_data(kwargs, response_obj, start_time, end_time)
88119
await self.async_api.send(
89120
f"events?name={self.datasource_name}",
90-
data=safe_json_dumps(
91-
{
92-
"kwargs": kwargs,
93-
"response_obj": response_obj,
94-
"start_time": start_time,
95-
"end_time": end_time,
96-
}
97-
),
121+
data=data
98122
)

0 commit comments

Comments
 (0)