Skip to content

Commit 0ecc413

Browse files
author
zhanglongbin
committed
Fix the bug of dataflow with ID #44
1 parent c0a6889 commit 0ecc413

1 file changed

Lines changed: 33 additions & 18 deletions

File tree

  • data_server/datasource/services/connectors

data_server/datasource/services/connectors/mongodb.py

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,48 @@
11
from pymongo import MongoClient
2-
from pymongo.errors import ConnectionFailure
2+
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
33
from data_server.datasource.schemas import DataSourceCreate
44

55
class MongoDBConnector:
66
def __init__(self, datasource:DataSourceCreate):
77
self.datasource = datasource
8+
# default_timeout_setting_milliseconds
9+
self.timeout_ms = 5000 # 5s
810

911
def test_connection(self):
12+
client = None
1013
try:
1114
host = self.datasource.host
1215
uri = host
13-
client = MongoClient(uri)
16+
client = MongoClient(
17+
uri,
18+
serverSelectionTimeoutMS=self.timeout_ms,
19+
connectTimeoutMS=self.timeout_ms,
20+
socketTimeoutMS=self.timeout_ms
21+
)
1422
client.server_info()
1523
return {"success": True, "message": "Connection successful"}
24+
except ServerSelectionTimeoutError as e:
25+
return {"error": False, "message": f"连接超时: {str(e)}"}
1626
except ConnectionFailure as e:
17-
return {"success": False, "message": str(e)}
27+
return {"error": False, "message": f"连接失败: {str(e)}"}
1828
except Exception as e:
19-
return {"success": False, "message": str(e)}
29+
return {"error": False, "message": f"未知错误: {str(e)}"}
30+
finally:
31+
if client:
32+
client.close()
2033

21-
def execute_query(self, query):
34+
def _get_client(self):
2235
host = self.datasource.host
2336
uri = host
24-
client = MongoClient(uri)
37+
return MongoClient(
38+
uri,
39+
serverSelectionTimeoutMS=self.timeout_ms,
40+
connectTimeoutMS=self.timeout_ms,
41+
socketTimeoutMS=self.timeout_ms
42+
)
43+
44+
def execute_query(self, query):
45+
client = self._get_client()
2546
db = client[self.datasource.database]
2647
try:
2748
collection = db[query['collection']]
@@ -40,19 +61,15 @@ def execute_query(self, query):
4061
client.close()
4162

4263
def get_tables(self):
43-
host = self.datasource.host
44-
uri = host
45-
client = MongoClient(uri)
64+
client = self._get_client()
4665
try:
4766
db = client[self.datasource.database]
4867
return db.list_collection_names()
4968
finally:
5069
client.close()
5170

5271
def get_tables_and_columns(self):
53-
host = self.datasource.host
54-
uri = host
55-
client = MongoClient(uri)
72+
client = self._get_client()
5673
try:
5774
db = client[self.datasource.database]
5875
collections = db.list_collection_names()
@@ -77,9 +94,7 @@ def get_collection_document_count(self, collection_name):
7794
:param collection_name: Name of the collection
7895
:return: Number of documents
7996
"""
80-
host = self.datasource.host
81-
uri = host
82-
client = MongoClient(uri)
97+
client = self._get_client()
8398
try:
8499
db = client[self.datasource.database]
85100
collection = db[collection_name]
@@ -102,16 +117,16 @@ def query_collection(self, collection_name: str, offset: int, limit: int) -> lis
102117
Returns:
103118
list: List of query results, where each element is a dictionary containing all fields and values of the documents in the collection
104119
"""
105-
host = self.datasource.host
106-
uri = host
107-
client = MongoClient(uri)
120+
client = self._get_client()
108121
try:
109122
db = client[self.datasource.database]
110123
collection = db[collection_name]
111124

112125
results = list(collection.find().skip(offset).limit(limit))
113126

114127
return results
128+
except ServerSelectionTimeoutError as e:
129+
raise ConnectionError(f"MongoDB连接超时: {str(e)}")
115130
except ConnectionFailure as e:
116131
raise ConnectionError(f"Failed to connect to MongoDB: {str(e)}")
117132
except Exception as e:

0 commit comments

Comments
 (0)