diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/DatabaseServiceResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/DatabaseServiceResourceIT.java index df2fbbd2c35d..2c7433c58a5c 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/DatabaseServiceResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/DatabaseServiceResourceIT.java @@ -979,4 +979,259 @@ private String addDomainsToTableRow(String csvLine, String newDomains) { } return String.join(",", parts); } + + @Test + void test_recursiveImportCustomPropertyExtension(TestNamespace ns) + throws IOException, InterruptedException { + String propName = ns.prefix("potato"); + String serverUrl = SdkClients.getServerUrl(); + String token = SdkClients.getAdminToken(); + com.fasterxml.jackson.databind.ObjectMapper mapper = + new com.fasterxml.jackson.databind.ObjectMapper(); + HttpClient client = HttpClient.newHttpClient(); + + HttpRequest getStringTypeReq = + HttpRequest.newBuilder() + .uri(URI.create(serverUrl + "/v1/metadata/types/name/string")) + .header("Authorization", "Bearer " + token) + .GET() + .build(); + HttpResponse stringTypeResp = + client.send(getStringTypeReq, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, stringTypeResp.statusCode(), "Should fetch string type"); + + HttpRequest getTableTypeReq = + HttpRequest.newBuilder() + .uri(URI.create(serverUrl + "/v1/metadata/types/name/table")) + .header("Authorization", "Bearer " + token) + .GET() + .build(); + HttpResponse tableTypeResp = + client.send(getTableTypeReq, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, tableTypeResp.statusCode(), "Should fetch table type"); + + com.fasterxml.jackson.databind.JsonNode stringTypeNode = mapper.readTree(stringTypeResp.body()); + com.fasterxml.jackson.databind.JsonNode tableTypeNode = mapper.readTree(tableTypeResp.body()); + String tableTypeId = tableTypeNode.get("id").asText(); + + java.util.Map propertyTypeRef = + java.util.Map.of( + "id", stringTypeNode.get("id").asText(), + "type", "type", + "name", stringTypeNode.get("name").asText(), + "fullyQualifiedName", stringTypeNode.get("fullyQualifiedName").asText()); + String customPropertyBody = + mapper.writeValueAsString( + java.util.Map.of( + "name", + propName, + "description", + "Test extension property for recursive import", + "propertyType", + propertyTypeRef)); + + HttpRequest registerPropReq = + HttpRequest.newBuilder() + .uri(URI.create(serverUrl + "/v1/metadata/types/" + tableTypeId)) + .header("Authorization", "Bearer " + token) + .header("Content-Type", "application/json") + .PUT(HttpRequest.BodyPublishers.ofString(customPropertyBody)) + .build(); + HttpResponse registerResp = + client.send(registerPropReq, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, registerResp.statusCode(), "Should register custom property on table type"); + + try { + DatabaseService service = + createEntity(createMinimalRequest(ns).withName(ns.prefix("ext_svc"))); + Database database = + SdkClients.adminClient() + .databases() + .create( + new CreateDatabase() + .withName(ns.prefix("ext_db")) + .withService(service.getFullyQualifiedName())); + DatabaseSchema schema = + SdkClients.adminClient() + .databaseSchemas() + .create( + new CreateDatabaseSchema() + .withName(ns.prefix("ext_schema")) + .withDatabase(database.getFullyQualifiedName())); + + String tableName = ns.prefix("ext_tbl"); + String tableFqn = schema.getFullyQualifiedName() + "." + tableName; + + String validCsv = + buildRecursiveCsv( + database, schema, tableName, tableFqn, "", propName + ":s3://bucket/file.csv"); + CsvImportResult validResult = + importCsvRecursive(service.getFullyQualifiedName(), validCsv, true); + assertEquals(ApiStatus.SUCCESS, validResult.getStatus(), validResult.getImportResultsCsv()); + assertEquals(0, validResult.getNumberOfRowsFailed()); + assertEquals(3, validResult.getNumberOfRowsProcessed()); + assertEquals(3, validResult.getNumberOfRowsPassed()); + + String badExtCsv = + buildRecursiveCsv( + database, schema, tableName, tableFqn, "", "unknown_prop_xyz_test:somevalue"); + CsvImportResult badResult = + importCsvRecursive(service.getFullyQualifiedName(), badExtCsv, true); + assertEquals(ApiStatus.PARTIAL_SUCCESS, badResult.getStatus()); + assertEquals(1, badResult.getNumberOfRowsFailed()); + assertEquals(3, badResult.getNumberOfRowsProcessed()); + assertEquals(2, badResult.getNumberOfRowsPassed()); + + String dedupCsv = + buildRecursiveCsv( + database, + schema, + tableName, + tableFqn, + "invalidownerformat", + "unknown_prop_xyz_test:somevalue"); + CsvImportResult dedupResult = + importCsvRecursive(service.getFullyQualifiedName(), dedupCsv, true); + assertEquals( + 1, + dedupResult.getNumberOfRowsFailed(), + "Multi-field failure on one row must count as 1 failed row"); + + } finally { + removeCustomPropertyFromType(tableTypeId, propName, token); + } + } + + private String buildRecursiveCsv( + Database database, + DatabaseSchema schema, + String tableName, + String tableFqn, + String tableOwner, + String tableExtension) { + String header = + "name*,displayName,description,owner,tags,glossaryTerms,tiers,certification," + + "retentionPeriod,sourceUrl,domains,extension,entityType*,fullyQualifiedName," + + "column.dataTypeDisplay,column.dataType,column.arrayDataType,column.dataLength," + + "storedProcedure.code,storedProcedure.language"; + String dbRow = + csvRow( + database.getName(), + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "database", + database.getFullyQualifiedName(), + "", + "", + "", + "", + "", + ""); + String schemaRow = + csvRow( + schema.getName(), + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "databaseSchema", + schema.getFullyQualifiedName(), + "", + "", + "", + "", + "", + ""); + String tableRow = + csvRow( + tableName, + "", + "", + tableOwner, + "", + "", + "", + "", + "", + "", + "", + tableExtension, + "table", + tableFqn, + "", + "", + "", + "", + "", + ""); + return header + "\n" + dbRow + "\n" + schemaRow + "\n" + tableRow + "\n"; + } + + private void removeCustomPropertyFromType(String typeId, String propName, String token) + throws IOException, InterruptedException { + com.fasterxml.jackson.databind.ObjectMapper localMapper = + new com.fasterxml.jackson.databind.ObjectMapper(); + HttpClient client = HttpClient.newHttpClient(); + String baseUrl = SdkClients.getServerUrl(); + String getUrl = baseUrl + "/v1/metadata/types/" + typeId + "?fields=customProperties"; + HttpRequest getReq = + HttpRequest.newBuilder() + .uri(URI.create(getUrl)) + .header("Authorization", "Bearer " + token) + .GET() + .build(); + HttpResponse getResp = client.send(getReq, HttpResponse.BodyHandlers.ofString()); + if (getResp.statusCode() != 200) { + return; + } + com.fasterxml.jackson.databind.JsonNode typeNode = localMapper.readTree(getResp.body()); + com.fasterxml.jackson.databind.JsonNode customProps = typeNode.get("customProperties"); + if (customProps == null || !customProps.isArray()) { + return; + } + for (int i = 0; i < customProps.size(); i++) { + if (propName.equals(customProps.get(i).path("name").asText())) { + String patchBody = "[{\"op\":\"remove\",\"path\":\"/customProperties/" + i + "\"}]"; + HttpRequest patchReq = + HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/v1/metadata/types/" + typeId)) + .header("Authorization", "Bearer " + token) + .header("Content-Type", "application/json-patch+json") + .method("PATCH", HttpRequest.BodyPublishers.ofString(patchBody)) + .build(); + client.send(patchReq, HttpResponse.BodyHandlers.ofString()); + break; + } + } + } + + private String csvRow(String... fields) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < fields.length; i++) { + if (i > 0) sb.append(","); + String field = fields[i]; + if (field.contains(",") || field.contains("\"") || field.contains("\n")) { + sb.append('"').append(field.replace("\"", "\"\"")).append('"'); + } else { + sb.append(field); + } + } + return sb.toString(); + } } diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryResourceIT.java index 61a31f482257..e6bafbe091d1 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryResourceIT.java @@ -912,11 +912,8 @@ void test_bulkImportGlossaryTermsIncrementsVersion(TestNamespace ns) { importResult = JsonUtils.readValue(result, CsvImportResult.class); assertNotNull(importResult, "Should parse CsvImportResult from response"); assertEquals(ApiStatus.SUCCESS, importResult.getStatus(), "Import should succeed"); - // numberOfRowsProcessed = header row (1) + 3 data rows = 4 - assertEquals( - 4, importResult.getNumberOfRowsProcessed(), "Should process 4 rows (header + 3 data)"); - assertEquals( - 4, importResult.getNumberOfRowsPassed(), "All 4 rows should pass (header + 3 data)"); + assertEquals(3, importResult.getNumberOfRowsProcessed(), "Should process 3 data rows"); + assertEquals(3, importResult.getNumberOfRowsPassed(), "All 3 data rows should pass"); assertEquals(0, importResult.getNumberOfRowsFailed(), "No rows should fail"); assertFalse(importResult.getDryRun(), "Should not be a dry run"); } catch (Exception e) { diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TestCaseResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TestCaseResourceIT.java index 43605434bbc5..916040ad2e0a 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TestCaseResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TestCaseResourceIT.java @@ -2838,13 +2838,13 @@ void test_importCsvWithWildcardName_multipleTablesSucceeds(TestNamespace ns) { // Dry run with name="*" should succeed CsvImportResult dryRunResult = importCsvWithWildcard(client, csvData, true); assertEquals(ApiStatus.SUCCESS, dryRunResult.getStatus()); - assertEquals(3, dryRunResult.getNumberOfRowsProcessed()); + assertEquals(2, dryRunResult.getNumberOfRowsProcessed()); // Actual import with name="*" — previously failed because // processChangeEventForBulkImport would call getByName("*") CsvImportResult result = importCsvWithWildcard(client, csvData, false); assertEquals(ApiStatus.SUCCESS, result.getStatus()); - assertEquals(3, result.getNumberOfRowsProcessed()); + assertEquals(2, result.getNumberOfRowsProcessed()); // Verify test cases created on different tables TestCase tc1 = @@ -2905,7 +2905,7 @@ void test_importCsvWithWildcardName_explicitTestSuiteTracked(TestNamespace ns) { CsvImportResult result = importCsvWithWildcard(client, csvData, false); assertEquals(ApiStatus.SUCCESS, result.getStatus()); - assertEquals(2, result.getNumberOfRowsProcessed()); + assertEquals(1, result.getNumberOfRowsProcessed()); TestCase imported = client.testCases().getByName(table.getFullyQualifiedName() + "." + testName, "testSuite"); @@ -2966,7 +2966,7 @@ void test_importCsvWithWildcardName_dryRunDoesNotCreateEntities(TestNamespace ns CsvImportResult dryRunResult = importCsvWithWildcard(client, csvData, true); assertEquals(ApiStatus.SUCCESS, dryRunResult.getStatus()); - assertEquals(2, dryRunResult.getNumberOfRowsProcessed()); + assertEquals(1, dryRunResult.getNumberOfRowsProcessed()); // Entity should NOT exist after dry run String expectedFqn = table.getFullyQualifiedName() + "." + testName; diff --git a/openmetadata-service/src/main/java/org/openmetadata/csv/CsvUtil.java b/openmetadata-service/src/main/java/org/openmetadata/csv/CsvUtil.java index e5b0f662c20d..cde9f2c2fd8a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/csv/CsvUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/csv/CsvUtil.java @@ -353,12 +353,12 @@ public static List addExtension(List csvRecord, Object extension String extensionString = extensionMap.entrySet().stream() + .map(entry -> Map.entry(entry.getKey(), formatValue(entry.getValue()))) + .filter(entry -> !entry.getValue().isBlank()) .map( - entry -> { - String key = entry.getKey(); - Object value = entry.getValue(); - return CsvUtil.quoteCsvField(key + ENTITY_TYPE_SEPARATOR + formatValue(value)); - }) + entry -> + CsvUtil.quoteCsvField( + entry.getKey() + ENTITY_TYPE_SEPARATOR + entry.getValue())) .collect(Collectors.joining(FIELD_SEPARATOR)); csvRecord.add(extensionString); diff --git a/openmetadata-service/src/main/java/org/openmetadata/csv/EntityCsv.java b/openmetadata-service/src/main/java/org/openmetadata/csv/EntityCsv.java index 5d9096a00b76..031671480764 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/csv/EntityCsv.java +++ b/openmetadata-service/src/main/java/org/openmetadata/csv/EntityCsv.java @@ -141,6 +141,8 @@ public abstract class EntityCsv { protected final Map dryRunCreatedEntities = new HashMap<>(); protected final String importedBy; protected int recordIndex = 0; + protected String rowEntityType = null; + private final Set countedFailureRecords = new HashSet<>(); // Queue for batching entity creates/updates - processed after each batch of CSV records protected final List pendingEntityOperations = new ArrayList<>(); @@ -223,7 +225,6 @@ public final CsvImportResult importCsv( if (!validateHeaders(records.get(recordIndex++))) { return importResult; } - importResult.withNumberOfRowsPassed(importResult.getNumberOfRowsPassed() + 1); int totalRows = records.size() - 1; // Exclude header row int batchNumber = 0; @@ -603,10 +604,13 @@ public Map getExtension(CSVPrinter printer, CSVRecord csvRecord, String key = extensions.substring(0, separatorIndex); String value = extensions.substring(separatorIndex + 1); - if (key.isEmpty() || value.isEmpty()) { + if (key.isEmpty()) { deferredFailure(csvRecord, invalidExtension(fieldNumber, key, value)); return null; } + if (value.isEmpty()) { + continue; + } extensionMap.put(key, value); } @@ -614,20 +618,26 @@ public Map getExtension(CSVPrinter printer, CSVRecord csvRecord, return extensionMap; } + private String currentEntityType() { + return rowEntityType != null ? rowEntityType : entityType; + } + private void validateExtension( CSVPrinter printer, int fieldNumber, CSVRecord csvRecord, Map extensionMap) throws IOException { + String effectiveEntityType = currentEntityType(); for (Map.Entry entry : extensionMap.entrySet()) { String fieldName = entry.getKey(); Object fieldValue = entry.getValue(); - Schema jsonSchema = TypeRegistry.instance().getSchema(entityType, fieldName); + Schema jsonSchema = TypeRegistry.instance().getSchema(effectiveEntityType, fieldName); if (jsonSchema == null) { deferredFailure(csvRecord, invalidCustomPropertyKey(fieldNumber, fieldName)); return; } - String customPropertyType = TypeRegistry.getCustomPropertyType(entityType, fieldName); - String propertyConfig = TypeRegistry.getCustomPropertyConfig(entityType, fieldName); + String customPropertyType = + TypeRegistry.getCustomPropertyType(effectiveEntityType, fieldName); + String propertyConfig = TypeRegistry.getCustomPropertyConfig(effectiveEntityType, fieldName); switch (customPropertyType) { case "entityReference", "entityReferenceList" -> { @@ -979,10 +989,10 @@ private List padOrTrimColumns(List row) { } private boolean validateHeaders(CSVRecord csvRecord) { - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); if (expectedHeaders.equals(csvRecord.toList())) { return true; } + importResult.withNumberOfRowsProcessed(1); importResult.withNumberOfRowsFailed(1); documentFailure(invalidHeader(recordToString(expectedHeaders), recordToString(csvRecord))); return false; @@ -1089,7 +1099,7 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en } } catch (Exception ex) { pendingCsvResults.put(csvRecord, ex.getMessage()); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); importResult.withNumberOfRowsFailed(importResult.getNumberOfRowsFailed() + 1); importResult.setStatus(ApiStatus.FAILURE); return; @@ -1097,7 +1107,7 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en if (Response.Status.CREATED.equals(responseStatus)) { pendingCsvResults.put(csvRecord, ENTITY_CREATED); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); // For dry run, count as passed immediately since no batch operations occur // For actual import, will be counted after successful batch operations if (Boolean.TRUE.equals(importResult.getDryRun())) { @@ -1105,7 +1115,7 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en } } else { pendingCsvResults.put(csvRecord, ENTITY_UPDATED); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); // For dry run, count as passed immediately since no batch operations occur // For actual import, will be counted after successful batch operations if (Boolean.TRUE.equals(importResult.getDryRun())) { @@ -1180,7 +1190,7 @@ protected void createEntity( } } catch (Exception ex) { pendingCsvResults.put(csvRecord, ex.getMessage()); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); importResult.withNumberOfRowsFailed(importResult.getNumberOfRowsFailed() + 1); importResult.setStatus(ApiStatus.FAILURE); return; @@ -1188,7 +1198,7 @@ protected void createEntity( if (Response.Status.CREATED.equals(responseStatus)) { pendingCsvResults.put(csvRecord, ENTITY_CREATED); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); // For dry run, count as passed immediately since no batch operations occur // For actual import, will be counted after successful batch operations if (Boolean.TRUE.equals(importResult.getDryRun())) { @@ -1196,7 +1206,7 @@ protected void createEntity( } } else { pendingCsvResults.put(csvRecord, ENTITY_UPDATED); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); // For dry run, count as passed immediately since no batch operations occur // For actual import, will be counted after successful batch operations if (Boolean.TRUE.equals(importResult.getDryRun())) { @@ -1507,7 +1517,7 @@ protected void createUserEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, .submit(() -> createChangeEventForUserAndUpdateInES(response, importedBy)); } catch (Exception ex) { pendingCsvResults.put(csvRecord, ex.getMessage()); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); importResult.withNumberOfRowsFailed(importResult.getNumberOfRowsFailed() + 1); importResult.setStatus(ApiStatus.FAILURE); return; @@ -1864,7 +1874,7 @@ protected void createColumnEntity(CSVPrinter printer, CSVRecord csvRecord, Strin // Count this row as processed and passed - it will be persisted with the table if (processRecord) { pendingCsvResults.put(csvRecord, ENTITY_UPDATED); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); importResult.withNumberOfRowsPassed(importResult.getNumberOfRowsPassed() + 1); } return; @@ -1969,7 +1979,7 @@ protected void createColumnEntity(CSVPrinter printer, CSVRecord csvRecord, Strin tableContext.csvRecords.add(csvRecord); // Queue result for later - actual success/failure determined after batch patch pendingCsvResults.put(csvRecord, ENTITY_UPDATED); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); importResult.withNumberOfRowsPassed(importResult.getNumberOfRowsPassed() + 1); } } @@ -2243,15 +2253,17 @@ protected void importSuccess(CSVPrinter printer, CSVRecord inputRecord, String s List recordList = listOf(IMPORT_SUCCESS, successDetails); recordList.addAll(inputRecord.toList()); printer.printRecord(recordList); - importResult.withNumberOfRowsProcessed((int) inputRecord.getRecordNumber()); + importResult.withNumberOfRowsProcessed((int) inputRecord.getRecordNumber() - 1); importResult.withNumberOfRowsPassed(importResult.getNumberOfRowsPassed() + 1); } /** Helper method for deferred error handling to maintain CSV record ordering */ private void deferredFailure(CSVRecord csvRecord, String errorMessage) { pendingCsvResults.put(csvRecord, errorMessage); - importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber()); - importResult.withNumberOfRowsFailed(importResult.getNumberOfRowsFailed() + 1); + importResult.withNumberOfRowsProcessed((int) csvRecord.getRecordNumber() - 1); + if (countedFailureRecords.add(csvRecord.getRecordNumber())) { + importResult.withNumberOfRowsFailed(importResult.getNumberOfRowsFailed() + 1); + } importResult.setStatus(ApiStatus.FAILURE); processRecord = false; } @@ -2261,8 +2273,10 @@ protected void importFailure(CSVPrinter printer, String failedReason, CSVRecord List recordList = listOf(IMPORT_FAILED, failedReason); recordList.addAll(inputRecord.toList()); printer.printRecord(recordList); - importResult.withNumberOfRowsProcessed((int) inputRecord.getRecordNumber()); - importResult.withNumberOfRowsFailed(importResult.getNumberOfRowsFailed() + 1); + importResult.withNumberOfRowsProcessed((int) inputRecord.getRecordNumber() - 1); + if (countedFailureRecords.add(inputRecord.getRecordNumber())) { + importResult.withNumberOfRowsFailed(importResult.getNumberOfRowsFailed() + 1); + } processRecord = false; } @@ -2298,7 +2312,6 @@ public CsvImportResult importCsv( if (!validateHeaders(records.get(recordIndex++))) { return importResult; } - importResult.withNumberOfRowsPassed(importResult.getNumberOfRowsPassed() + 1); int totalRows = records.size() - 1; // Exclude header row int batchNumber = 0; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java index ff7edbc59f9f..f152e3847d7b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java @@ -682,6 +682,7 @@ protected void createEntityWithRecursion(CSVPrinter printer, List csv String entityType = csvRecord.size() > 12 ? csvRecord.get(12) : DATABASE_SCHEMA; String entityFQN = csvRecord.size() > 13 ? StringEscapeUtils.unescapeCsv(csvRecord.get(13)) : null; + rowEntityType = entityType; if (DATABASE_SCHEMA.equals(entityType)) { createSchemaEntity(printer, csvRecord, entityFQN); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java index c0d318d044ed..a929b4d94ecc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java @@ -828,6 +828,7 @@ protected void createEntityWithRecursion(CSVPrinter printer, List csv // Get entityType and fullyQualifiedName if provided String entityType = csvRecord.size() > 12 ? csvRecord.get(12) : TABLE; String entityFQN = csvRecord.size() > 13 ? csvRecord.get(13) : null; + rowEntityType = entityType; if (TABLE.equals(entityType)) { createTableEntity(printer, csvRecord, entityFQN); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseServiceRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseServiceRepository.java index c07ab5e7bb01..de18061482bb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseServiceRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseServiceRepository.java @@ -298,10 +298,14 @@ protected void createEntityWithoutRecursion(CSVPrinter printer, List protected void createEntityWithRecursion(CSVPrinter printer, List csvRecords) throws IOException { CSVRecord csvRecord = getNextRecord(printer, csvRecords); + if (csvRecord == null) { + return; + } // Get entityType and fullyQualifiedName if provided String entityType = csvRecord.size() > 12 ? csvRecord.get(12) : DATABASE; String entityFQN = csvRecord.size() > 13 ? csvRecord.get(13) : null; + rowEntityType = entityType; if (DATABASE.equals(entityType)) { createDatabaseEntity(printer, csvRecord, entityFQN); diff --git a/openmetadata-service/src/test/java/org/openmetadata/csv/CsvUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/csv/CsvUtilTest.java index c76b9bb79ecb..74b2b8d8a66e 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/csv/CsvUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/csv/CsvUtilTest.java @@ -14,6 +14,9 @@ package org.openmetadata.csv; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.openmetadata.common.utils.CommonUtil.listOf; import com.fasterxml.jackson.databind.JsonNode; @@ -21,7 +24,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.Test; import org.openmetadata.schema.entity.type.CustomProperty; import org.openmetadata.schema.type.EntityReference; @@ -107,6 +112,137 @@ void testAddRecord() { assertEquals(expectedRecord, CsvUtil.addExtension(actualRecord, jsonNode)); } + @Test + void testFormattingAndSplitHelpersHandleQuotedAndBlankFields() { + assertEquals("", CsvUtil.recordToString((List) null)); + assertEquals( + "plain,\"with,comma\",\"with;semicolon\"", + CsvUtil.recordToString(List.of("plain", "with,comma", "with;semicolon"))); + assertEquals("alpha,beta", CsvUtil.recordToString(new String[] {"alpha", "beta"})); + assertEquals("\"quoted\"", CsvUtil.quote("quoted")); + assertEquals( + "plain;\"needs,quote\";\"needs;quote\"", + CsvUtil.quoteField(List.of("plain", "needs,quote", "needs;quote"))); + + List booleanRecord = new ArrayList<>(); + CsvUtil.addField(booleanRecord, Boolean.TRUE); + CsvUtil.addField(booleanRecord, (Boolean) null); + assertEquals(List.of("true", ""), booleanRecord); + + assertEquals(List.of("one", "two", "three"), CsvUtil.fieldToStrings(" one ; two ; three ")); + assertNull(CsvUtil.fieldToStrings(" ")); + + assertEquals(List.of("user", "alice"), CsvUtil.fieldToEntities(" user : alice ")); + assertNull(CsvUtil.fieldToEntities(null)); + + assertEquals( + List.of("one", "two", "three"), CsvUtil.fieldToInternalArray(" one | two | three ")); + assertEquals(List.of(), CsvUtil.fieldToInternalArray(" ")); + } + + @Test + void testFieldParsersRespectQuotedDelimiters() throws Exception { + assertEquals( + List.of("key1:value1", "key2:value2", "key3:value;with;semicolon"), + CsvUtil.fieldToExtensionStrings("key1:value1;key2:value2;\"key3:value;with;semicolon\"")); + assertEquals(List.of(), CsvUtil.fieldToExtensionStrings(" ")); + + assertEquals( + List.of("value1", "value2", "value,with,comma"), + CsvUtil.fieldToColumns("value1,value2,\"value,with,comma\"")); + assertEquals(List.of(), CsvUtil.fieldToColumns(" ")); + } + + @Test + void testTagAndReferenceExportHelpersFilterExpectedValues() { + List tags = + List.of( + new TagLabel() + .withTagFQN("PersonalData.Personal") + .withSource(TagLabel.TagSource.CLASSIFICATION) + .withLabelType(TagLabel.LabelType.MANUAL), + new TagLabel() + .withTagFQN("Tier.Tier1") + .withSource(TagLabel.TagSource.CLASSIFICATION) + .withLabelType(TagLabel.LabelType.MANUAL), + new TagLabel() + .withTagFQN("Glossary.Term") + .withSource(TagLabel.TagSource.GLOSSARY) + .withLabelType(TagLabel.LabelType.MANUAL), + new TagLabel() + .withTagFQN("Ignore.Derived") + .withSource(TagLabel.TagSource.CLASSIFICATION) + .withLabelType(TagLabel.LabelType.DERIVED)); + + List csvRecord = new ArrayList<>(); + CsvUtil.addTagLabels(csvRecord, tags); + CsvUtil.addGlossaryTerms(csvRecord, tags); + CsvUtil.addTagTiers(csvRecord, tags); + CsvUtil.addOwners( + csvRecord, + List.of( + new EntityReference().withType("user").withName("alice"), + new EntityReference().withType("team").withName("engineering"))); + CsvUtil.addDomains( + csvRecord, + List.of( + new EntityReference().withFullyQualifiedName("finance"), + new EntityReference().withFullyQualifiedName("marketing"))); + CsvUtil.addReviewers( + csvRecord, List.of(new EntityReference().withType("user").withName("bob"))); + + assertEquals( + List.of( + "PersonalData.Personal", + "Glossary.Term", + "Tier.Tier1", + "user:alice;team:engineering", + "finance;marketing", + "user:bob"), + csvRecord); + } + + @Test + void testAddExtensionFormatsStructuredValues() { + LinkedHashMap extension = new LinkedHashMap<>(); + extension.put("owner", Map.of("type", "user", "fullyQualifiedName", "alice")); + extension.put("window", Map.of("start", 100, "end", 200)); + extension.put( + "matrix", + Map.of( + "columns", + List.of("name", "value"), + "rows", + List.of( + Map.of("name", "alpha", "value", "beta"), + Map.of("name", "gamma", "value", "delta,with,comma")))); + extension.put( + "reviewers", + List.of( + Map.of("type", "user", "fullyQualifiedName", "alice"), + Map.of("type", "team", "fullyQualifiedName", "engineering"))); + extension.put("options", List.of("one", "two")); + extension.put("empty", List.of()); + extension.put("blank", " "); + extension.put("count", 5); + extension.put("metadata", Map.of("key", "value")); + + List csvRecord = new ArrayList<>(); + CsvUtil.addExtension(csvRecord, extension); + + String extensionField = csvRecord.get(0); + assertTrue(extensionField.contains("owner:user:alice")); + assertTrue(extensionField.contains("window:100:200")); + assertTrue(extensionField.contains("reviewers:user:alice|team:engineering")); + assertTrue(extensionField.contains("options:one|two")); + assertFalse(extensionField.contains("empty")); + assertFalse(extensionField.contains("blank")); + assertTrue(extensionField.contains("count:5")); + assertTrue(extensionField.contains("metadata:{key=value}")); + assertTrue(extensionField.contains("matrix:alpha,beta|gamma")); + assertTrue(extensionField.contains("delta,with,comma")); + } + public static void assertCsv(String expectedCsv, String actualCsv) { // Break a csv text into records, sort it and compare List expectedCsvRecords = listOf(expectedCsv.split(CsvUtil.LINE_SEPARATOR)); diff --git a/openmetadata-service/src/test/java/org/openmetadata/csv/EntityCsvTest.java b/openmetadata-service/src/test/java/org/openmetadata/csv/EntityCsvTest.java index b173e36d649d..6c4357c1b0f5 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/csv/EntityCsvTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/csv/EntityCsvTest.java @@ -173,8 +173,8 @@ void test_importCsvWithProgressCallback() throws IOException { CsvImportResult importResult = testCsv.importCsv(csv, true, callback); - // 4 rows: 1 header + 3 data rows (numberOfRowsProcessed = last record number = 4) - assertSummary(importResult, ApiStatus.SUCCESS, 4, 4, 0); + // 3 data rows (header excluded from counts) + assertSummary(importResult, ApiStatus.SUCCESS, 3, 3, 0); assertTrue(callbackCount.get() >= 1, "Callback should be called at least once"); assertFalse(progressValues.isEmpty(), "Progress values should be recorded"); assertEquals(3, totalValues.get(0), "Total rows should be 3 (excluding header)"); @@ -202,8 +202,8 @@ void test_importCsvWithProgressCallback_multipleBatches() throws IOException { CsvImportResult importResult = testCsv.importCsv(csv, true, callback); - // numberOfRowsProcessed = header row (1) + totalRecords data rows - int expectedRowsProcessed = totalRecords + 1; + // numberOfRowsProcessed = data rows only (header excluded) + int expectedRowsProcessed = totalRecords; assertSummary(importResult, ApiStatus.SUCCESS, expectedRowsProcessed, expectedRowsProcessed, 0); assertEquals(2, callbackCount.get(), "Callback should be called twice for 2 batches"); assertEquals(1, batchNumbers.get(0), "First batch number should be 1"); @@ -242,8 +242,8 @@ void test_importCsvWithNullCallback() throws IOException { TestCsv testCsv = new TestCsv(); CsvImportResult importResult = testCsv.importCsv(csv, true, null); - // 2 rows: 1 header + 1 data row - assertSummary(importResult, ApiStatus.SUCCESS, 2, 2, 0); + // 1 data row (header excluded from counts) + assertSummary(importResult, ApiStatus.SUCCESS, 1, 1, 0); } private static class TestCsv extends EntityCsv { diff --git a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/BulkEditEntity.spec.ts b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/BulkEditEntity.spec.ts index 19d6e1605b79..b0fe8982d5e2 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/BulkEditEntity.spec.ts +++ b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/BulkEditEntity.spec.ts @@ -166,8 +166,8 @@ test.describe('Bulk Edit Entity', () => { await page.getByRole('button', { name: 'Next' }).click(); await validateImportStatus(page, { - passed: '2', - processed: '2', + passed: '1', + processed: '1', failed: '0', }); @@ -315,8 +315,8 @@ test.describe('Bulk Edit Entity', () => { await loader.waitFor({ state: 'hidden' }); await validateImportStatus(page, { - passed: '2', - processed: '2', + passed: '1', + processed: '1', failed: '0', }); @@ -460,8 +460,8 @@ test.describe('Bulk Edit Entity', () => { await page.getByRole('button', { name: 'Next' }).click(); await validateImportStatus(page, { - passed: '2', - processed: '2', + passed: '1', + processed: '1', failed: '0', }); const updateButtonResponse = page.waitForResponse( @@ -582,8 +582,7 @@ test.describe('Bulk Edit Entity', () => { .press('ArrowDown', { delay: 100 }); await page.click('[type="button"] >> text="Next"', { force: true }); - // total column count +1 for header row - const count = `${tableEntity.entityLinkColumnsName.length + 1}`; + const count = `${tableEntity.entityLinkColumnsName.length}`; await validateImportStatus(page, { passed: count, processed: count, @@ -691,8 +690,8 @@ test.describe('Bulk Edit Entity', () => { await loader.waitFor({ state: 'hidden' }); await validateImportStatus(page, { - passed: '2', - processed: '2', + passed: '1', + processed: '1', failed: '0', }); @@ -835,8 +834,8 @@ test.describe('Bulk Edit Entity', () => { await loader.waitFor({ state: 'hidden' }); await validateImportStatus(page, { - passed: '2', - processed: '2', + passed: '1', + processed: '1', failed: '0', }); diff --git a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/BulkImport.spec.ts b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/BulkImport.spec.ts index e9a8891dae55..57a4bf18c087 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/BulkImport.spec.ts +++ b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/BulkImport.spec.ts @@ -354,8 +354,8 @@ test.describe('Bulk Import Export', () => { await loader.waitFor({ state: 'hidden' }); await validateImportStatus(page, { - passed: '7', - processed: '7', + passed: '6', + processed: '6', failed: '0', }); const rowStatus = [ @@ -557,8 +557,8 @@ test.describe('Bulk Import Export', () => { }); await validateImportStatus(page, { - passed: '13', - processed: '13', + passed: '12', + processed: '12', failed: '0', }); @@ -743,8 +743,8 @@ test.describe('Bulk Import Export', () => { await page.getByRole('button', { name: 'Next' }).click(); await validateImportStatus(page, { - passed: '5', - processed: '5', + passed: '4', + processed: '4', failed: '0', }); @@ -829,8 +829,8 @@ test.describe('Bulk Import Export', () => { await fillColumnDetails(columnDetails2, page); await page.getByRole('button', { name: 'Next' }).click(); - // total column count +1 for header row and +2 for newly added columns - const count = `${tableEntity.entityLinkColumnsName.length + 3}`; + // total column count +2 for newly added columns + const count = `${tableEntity.entityLinkColumnsName.length + 2}`; await validateImportStatus(page, { passed: count, processed: count, @@ -918,8 +918,8 @@ test.describe('Bulk Import Export', () => { await page.getByRole('button', { name: 'Next' }).click(); await validateImportStatus(page, { - passed: '9', - processed: '9', + passed: '8', + processed: '8', failed: '0', }); @@ -987,8 +987,8 @@ test.describe('Bulk Import Export', () => { await page.getByRole('button', { name: 'Next' }).click(); await validateImportStatus(page, { - passed: '10', - processed: '10', + passed: '9', + processed: '9', failed: '0', }); diff --git a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/DataAssetRulesDisabled.spec.ts b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/DataAssetRulesDisabled.spec.ts index 99bc30584013..fa94e684fea6 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/DataAssetRulesDisabled.spec.ts +++ b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/DataAssetRulesDisabled.spec.ts @@ -374,8 +374,8 @@ test.describe( await page.getByRole('button', { name: 'Next' }).click(); await validateImportStatus(page, { - passed: '2', - processed: '2', + passed: '1', + processed: '1', failed: '0', }); @@ -519,8 +519,8 @@ test.describe( await loader.waitFor({ state: 'hidden' }); await validateImportStatus(page, { - passed: '2', - processed: '2', + passed: '1', + processed: '1', failed: '0', }); @@ -661,8 +661,8 @@ test.describe( await page.getByRole('button', { name: 'Next' }).click(); await validateImportStatus(page, { - passed: '2', - processed: '2', + passed: '1', + processed: '1', failed: '0', }); const updateButtonResponse = page.waitForResponse( diff --git a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/DataQuality/TestCaseImportExportBasic.spec.ts b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/DataQuality/TestCaseImportExportBasic.spec.ts index bc5ba4ad4de3..cc3bbe41471b 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/DataQuality/TestCaseImportExportBasic.spec.ts +++ b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/DataQuality/TestCaseImportExportBasic.spec.ts @@ -216,8 +216,8 @@ test.describe( await test.step('Verify Import Status', async () => { await waitForImportAsyncResponse(page); await validateImportStatus(page, { - passed: '4', - processed: '4', + passed: '3', + processed: '3', failed: '0', }); }); diff --git a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/GlossaryImportExport.spec.ts b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/GlossaryImportExport.spec.ts index 2221d6032210..be12a0c2258c 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/GlossaryImportExport.spec.ts +++ b/openmetadata-ui/src/main/resources/ui/playwright/e2e/Pages/GlossaryImportExport.spec.ts @@ -207,8 +207,8 @@ test.describe('Glossary Bulk Import Export', () => { await loader.waitFor({ state: 'hidden' }); await validateImportStatus(page, { - passed: '3', - processed: '3', + passed: '2', + processed: '2', failed: '0', }); @@ -250,14 +250,14 @@ test.describe('Glossary Bulk Import Export', () => { (el) => el.textContent ); - expect(processedRow).toBe('3'); + expect(processedRow).toBe('2'); const passedRow = await page.$eval( '[data-testid="passed-row"]', (el) => el.textContent ); - expect(passedRow).toBe('3'); + expect(passedRow).toBe('2'); const failedRow = await page.$eval( '[data-testid="failed-row"]', @@ -357,8 +357,8 @@ ${circularRefGlossary.data.name}.parent,child,child,

child

,,,,,,user:admin await loader.waitFor({ state: 'hidden' }); await validateImportStatus(page, { - passed: '4', - processed: '4', + passed: '3', + processed: '3', failed: '0', }); @@ -424,8 +424,8 @@ ${circularRefGlossary.data.name}.parent,child,child,

child

,,,,,,user:admin await loader.waitFor({ state: 'hidden' }); await validateImportStatus(page, { - passed: '3', - processed: '4', + passed: '2', + processed: '3', failed: '1', }); @@ -496,8 +496,8 @@ ${circularRefGlossary.data.name}.parent,child,child,

child

,,,,,,user:admin // Should show failure due to missing required field await validateImportStatus(page, { - passed: '1', - processed: '2', + passed: '0', + processed: '1', failed: '1', }); } diff --git a/openmetadata-ui/src/main/resources/ui/playwright/utils/testCases.ts b/openmetadata-ui/src/main/resources/ui/playwright/utils/testCases.ts index 031053f98539..4e14c11778f1 100644 --- a/openmetadata-ui/src/main/resources/ui/playwright/utils/testCases.ts +++ b/openmetadata-ui/src/main/resources/ui/playwright/utils/testCases.ts @@ -648,8 +648,8 @@ export const performE2EExportImportFlow = async ( await page.getByRole('button', { name: 'Next' }).click(); await validateImportStatus(page, { - passed: '3', - processed: '6', + passed: '2', + processed: '5', failed: '3', }); @@ -733,8 +733,8 @@ export const performE2EExportImportFlow = async ( await page.getByRole('button', { name: 'Next' }).click(); await validateImportStatus(page, { - passed: '3', - processed: '3', + passed: '2', + processed: '2', failed: '0', }); diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/EntityImport/BulkEntityImportPage/BulkEntityImportPage.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/EntityImport/BulkEntityImportPage/BulkEntityImportPage.tsx index 1c281cbea668..7c2f49032d66 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/EntityImport/BulkEntityImportPage/BulkEntityImportPage.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/EntityImport/BulkEntityImportPage/BulkEntityImportPage.tsx @@ -532,9 +532,14 @@ const BulkEntityImportPage = () => { if (websocketResponse.status === 'COMPLETED') { const importResults = websocketResponse.result; - // If the job is complete and the status is either failure or aborted - // then reset the validation data and active step - if (['failure', 'aborted'].includes(importResults?.status ?? '')) { + // If the job is aborted, or failed before processing any rows (e.g. malformed CSV), + // reset to upload step. If rows were processed but all failed, fall through to + // show the validation grid so the user can inspect and fix errors. + if ( + ['aborted'].includes(importResults?.status ?? '') || + (importResults?.status === 'failure' && + (importResults?.numberOfRowsProcessed ?? 0) === 0) + ) { setValidationData(importResults); handleActiveStepChange(VALIDATION_STEP.UPLOAD); @@ -643,7 +648,8 @@ const BulkEntityImportPage = () => { + })} + > {isBulkEdit ? ( { align="center" className="w-full justify-center p-lg text-center" direction="vertical" - size={16}> + size={16} + > + data-testid="abort-reason" + > {t('label.aborted')} {' '} @@ -734,7 +742,8 @@ const BulkEntityImportPage = () => { ghost data-testid="cancel-button" type="primary" - onClick={handleRetryCsvUpload}> + onClick={handleRetryCsvUpload} + > {t('label.back')} @@ -792,7 +801,8 @@ const BulkEntityImportPage = () => { className="m-l-sm" disabled={isValidating} type="primary" - onClick={handleValidate}> + onClick={handleValidate} + > {activeStep === 2 ? t('label.update') : t('label.next')} )}