Skip to content

Commit 169cc86

Browse files
harishkotramohityadav766pmbrull
authored
Fixes #27549: Respect from/size pagination in /search/fieldQuery (#27590)
* Add scheduled workflow to sync fork main with upstream * Remove fork upstream sync workflow * Fix fieldQuery pagination by propagating from and size * Avoid truncating ingestion pipelines in DataInsights lookup --------- Co-authored-by: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
1 parent 0fdb82d commit 169cc86

9 files changed

Lines changed: 53 additions & 20 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ public static String getLiveSearchIndex(String index) {
156156
*/
157157
private List<Map> getIngestionPipelineStatus(String serviceName) {
158158
List<Map> combinedStatus = new ArrayList<>();
159+
final int pageSize = 100;
160+
final int maxResults = 5000;
159161

160162
try {
161163
if (serviceName == null || serviceName.trim().isEmpty()) {
@@ -175,16 +177,27 @@ private List<Map> getIngestionPipelineStatus(String serviceName) {
175177
SearchClient searchClient = Entity.getSearchRepository().getSearchClient();
176178
if (searchClient != null) {
177179
try {
178-
// Search for ingestion pipelines with the service name
179-
var response =
180-
searchClient.searchByField(
181-
"service.name.keyword", serviceName, INGESTION_PIPELINE, false);
180+
// Search for ingestion pipelines with the service name, paging through all results.
181+
for (int from = 0; from < maxResults; from += pageSize) {
182+
var response =
183+
searchClient.searchByField(
184+
"service.name.keyword", serviceName, INGESTION_PIPELINE, false, from, pageSize);
185+
186+
if (response == null || response.getStatus() != 200) {
187+
break;
188+
}
182189

183-
if (response != null && response.getStatus() == 200) {
184-
// Parse the response to extract pipeline information
185190
String responseBody =
186191
(String) ((OutboundJaxrsResponse) response).getContext().getEntity();
187-
combinedStatus.addAll(parseIngestionPipelineResponse(responseBody));
192+
List<Map> pageStatuses = parseIngestionPipelineResponse(responseBody);
193+
if (pageStatuses.isEmpty()) {
194+
break;
195+
}
196+
197+
combinedStatus.addAll(pageStatuses);
198+
if (pageStatuses.size() < pageSize) {
199+
break;
200+
}
188201
}
189202
} catch (Exception e) {
190203
LOG.error("Error searching for ingestion pipelines for service: {}", serviceName, e);

openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -584,10 +584,18 @@ public Response searchByField(
584584
@Parameter(description = "Filter documents by deleted param. By default deleted is false")
585585
@DefaultValue("false")
586586
@QueryParam("deleted")
587-
boolean deleted)
587+
boolean deleted,
588+
@Parameter(description = "From field to paginate the results, defaults to 0")
589+
@DefaultValue("0")
590+
@QueryParam("from")
591+
int from,
592+
@Parameter(description = "Size field to limit the no.of results returned, defaults to 10")
593+
@DefaultValue("10")
594+
@QueryParam("size")
595+
int size)
588596
throws IOException {
589597

590-
return searchRepository.searchByField(fieldName, fieldValue, index, deleted);
598+
return searchRepository.searchByField(fieldName, fieldValue, index, deleted, from, size);
591599
}
592600

593601
@GET

openmetadata-service/src/main/java/org/openmetadata/service/search/SearchManagementClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,13 @@ Response previewSearch(
6767
* @param fieldValue the value to match (supports wildcards)
6868
* @param index the index to search in
6969
* @param deleted whether to include deleted entities
70+
* @param from starting position for pagination
71+
* @param size maximum number of results to return
7072
* @return response containing matching entities
7173
* @throws IOException if search execution fails
7274
*/
73-
Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
75+
Response searchByField(
76+
String fieldName, String fieldValue, String index, Boolean deleted, int from, int size)
7477
throws IOException;
7578

7679
/**

openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2927,9 +2927,10 @@ public SearchLineageResult searchLineageForExport(
29272927
.withIsConnectedVia(isConnectedVia(entityType)));
29282928
}
29292929

2930-
public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
2930+
public Response searchByField(
2931+
String fieldName, String fieldValue, String index, Boolean deleted, int from, int size)
29312932
throws IOException {
2932-
return searchClient.searchByField(fieldName, fieldValue, index, deleted);
2933+
return searchClient.searchByField(fieldName, fieldValue, index, deleted, from, size);
29332934
}
29342935

29352936
public Response aggregate(AggregationRequest request) throws IOException {

openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,9 +461,10 @@ public Response searchBySourceUrl(String sourceUrl) throws IOException {
461461
}
462462

463463
@Override
464-
public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
464+
public Response searchByField(
465+
String fieldName, String fieldValue, String index, Boolean deleted, int from, int size)
465466
throws IOException {
466-
return searchManager.searchByField(fieldName, fieldValue, index, deleted);
467+
return searchManager.searchByField(fieldName, fieldValue, index, deleted, from, size);
467468
}
468469

469470
@Override

openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchSearchManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ public Response searchBySourceUrl(String sourceUrl) throws IOException {
169169
}
170170

171171
@Override
172-
public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
172+
public Response searchByField(
173+
String fieldName, String fieldValue, String index, Boolean deleted, int from, int size)
173174
throws IOException {
174175
if (!isClientAvailable) {
175176
throw new IOException("Elasticsearch client is not available");
@@ -179,6 +180,8 @@ public Response searchByField(String fieldName, String fieldValue, String index,
179180
SearchRequest.of(
180181
s ->
181182
s.index(Entity.getSearchRepository().getIndexOrAliasName(index))
183+
.from(from)
184+
.size(size)
182185
.query(
183186
q ->
184187
q.bool(

openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,9 +447,10 @@ public Response searchSchemaEntityRelationship(
447447
}
448448

449449
@Override
450-
public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
450+
public Response searchByField(
451+
String fieldName, String fieldValue, String index, Boolean deleted, int from, int size)
451452
throws IOException {
452-
return searchManager.searchByField(fieldName, fieldValue, index, deleted);
453+
return searchManager.searchByField(fieldName, fieldValue, index, deleted, from, size);
453454
}
454455

455456
@Override

openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchSearchManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,8 @@ public Response searchBySourceUrl(String sourceUrl) throws IOException {
189189
}
190190

191191
@Override
192-
public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
192+
public Response searchByField(
193+
String fieldName, String fieldValue, String index, Boolean deleted, int from, int size)
193194
throws IOException {
194195
if (!isClientAvailable) {
195196
throw new IOException("OpenSearch client is not available");
@@ -199,6 +200,8 @@ public Response searchByField(String fieldName, String fieldValue, String index,
199200
SearchRequest.of(
200201
s ->
201202
s.index(Entity.getSearchRepository().getIndexOrAliasName(index))
203+
.from(from)
204+
.size(size)
202205
.query(
203206
q ->
204207
q.bool(

openmetadata-service/src/test/java/org/openmetadata/service/search/SearchRepositoryBehaviorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2429,7 +2429,7 @@ void reportingWrappersDelegateToSearchClient() throws IOException {
24292429
.SearchSchemaEntityRelationshipResult();
24302430

24312431
when(filter.getCondition(Entity.TABLE)).thenReturn("deleted = false");
2432-
when(searchClient.searchByField("name", "orders", "table", false)).thenReturn(response);
2432+
when(searchClient.searchByField("name", "orders", "table", false, 0, 10)).thenReturn(response);
24332433
when(searchClient.aggregate("query", Entity.TABLE, searchAggregation, "deleted = false"))
24342434
.thenReturn(aggregationResult);
24352435
when(searchClient.genericAggregation("query", "table", searchAggregation)).thenReturn(report);
@@ -2449,7 +2449,7 @@ void reportingWrappersDelegateToSearchClient() throws IOException {
24492449
when(searchClient.getSchemaEntityRelationship("svc.db.schema", "{}", "*", 1, 2, 3, 4, false))
24502450
.thenReturn(schemaResult);
24512451

2452-
assertSame(response, repository.searchByField("name", "orders", "table", false));
2452+
assertSame(response, repository.searchByField("name", "orders", "table", false, 0, 10));
24532453
assertSame(
24542454
aggregationResult, repository.aggregate("query", Entity.TABLE, searchAggregation, filter));
24552455
assertSame(report, repository.genericAggregation("query", "table", searchAggregation));

0 commit comments

Comments
 (0)