Skip to content

Commit da7a9e1

Browse files
authored
fix(migration): introduce ServiceLoader SPI for extension migration providers (#27760)
1 parent 3754109 commit da7a9e1

5 files changed

Lines changed: 113 additions & 38 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcess.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@
4040
* Even for pure Java migrations, the directory structure MUST exist - SQL files can be empty but the
4141
* directory structure is mandatory.
4242
*
43-
* <p>Java migrations must follow this naming convention:
43+
* <p>Native OpenMetadata Java migrations must follow this naming convention:
4444
* {@code org.openmetadata.service.migration.[dbPackageName].[versionPackage].Migration}
4545
* Example: {@code org.openmetadata.service.migration.postgres.v120.Migration}
4646
*
47-
* In collate:
48-
* {@code io.collate.service.migration.[dbPackageName].[versionPackage].Migration}
47+
* <p>Migrations that ship outside of OpenMetadata (extension migration directories) are resolved
48+
* by implementations of {@link MigrationProcessExtensionProvider} registered via
49+
* {@code java.util.ServiceLoader}. When no provider handles a given extension version, the
50+
* workflow falls back to {@link MigrationProcessImpl} (SQL changes only, no Java data migration).
4951
*
5052
* <p>Java migrations should extend {@code MigrationProcessImpl} and override required methods,
5153
* particularly {@code runDataMigration()} and {@code getMigrationOps()}.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2021 Collate
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
14+
package org.openmetadata.service.migration.api;
15+
16+
import java.util.Optional;
17+
import org.openmetadata.service.migration.utils.MigrationFile;
18+
19+
/**
20+
* SPI for resolving a {@link MigrationProcess} for migration directories that ship outside of
21+
* OpenMetadata. Commercial distributions or downstream forks register implementations via
22+
* {@code META-INF/services/org.openmetadata.service.migration.api.MigrationProcessExtensionProvider}
23+
* to plug in their own Java migration classes without OpenMetadata having to know about them.
24+
*
25+
* <p>The migration workflow only consults providers for files where {@code MigrationFile.isExtension}
26+
* is true. If no provider returns a present value, the workflow falls back to {@link
27+
* MigrationProcessImpl}, which runs the SQL changes from the version's directory and performs no
28+
* Java-level data migration.
29+
*/
30+
public interface MigrationProcessExtensionProvider {
31+
32+
/**
33+
* Resolve a migration process for the given extension migration file.
34+
*
35+
* @param file the extension migration file (guaranteed {@code file.isExtension == true})
36+
* @return the {@link MigrationProcess} to run for this version, or {@link Optional#empty()} if
37+
* this provider does not handle the version (the workflow will try the next provider, or
38+
* fall back to {@link MigrationProcessImpl}).
39+
*/
40+
Optional<MigrationProcess> provide(MigrationFile file);
41+
}

openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
import java.util.Map;
1515
import java.util.Objects;
1616
import java.util.Optional;
17+
import java.util.ServiceLoader;
1718
import java.util.Set;
1819
import java.util.UUID;
1920
import java.util.regex.Matcher;
2021
import java.util.regex.Pattern;
2122
import java.util.stream.Stream;
23+
import java.util.stream.StreamSupport;
2224
import lombok.Getter;
2325
import lombok.extern.slf4j.Slf4j;
2426
import org.flywaydb.core.api.configuration.ClassicConfiguration;
@@ -162,6 +164,7 @@ public List<MigrationFile> getMigrationFilesFromPath(
162164
private List<MigrationProcess> filterAndGetMigrationsToRun(
163165
List<MigrationFile> availableMigrations) {
164166
List<MigrationFile> applyMigrations = resolveApplyMigrations(availableMigrations);
167+
List<MigrationProcessExtensionProvider> extensionProviders = loadExtensionProviders();
165168
List<MigrationProcess> processes = new ArrayList<>();
166169
try {
167170
for (MigrationFile file : applyMigrations) {
@@ -172,29 +175,37 @@ private List<MigrationProcess> filterAndGetMigrationsToRun(
172175
file.version);
173176
continue;
174177
}
175-
String extClazzName = null;
176-
if (file.version.contains("collate")) {
177-
extClazzName = file.getMigrationProcessExtClassName();
178-
}
179-
if (extClazzName != null) {
180-
MigrationProcess collateProcess =
181-
(MigrationProcess)
182-
Class.forName(extClazzName).getConstructor(MigrationFile.class).newInstance(file);
183-
processes.add(collateProcess);
184-
} else {
185-
String clazzName = file.getMigrationProcessClassName();
186-
MigrationProcess openMetadataProcess =
187-
(MigrationProcess)
188-
Class.forName(clazzName).getConstructor(MigrationFile.class).newInstance(file);
189-
processes.add(openMetadataProcess);
190-
}
178+
processes.add(resolveMigrationProcess(file, extensionProviders));
191179
}
192180
} catch (Exception e) {
193181
LOG.error("Failed to list and add migrations to run due to ", e);
194182
}
195183
return processes;
196184
}
197185

186+
private MigrationProcess resolveMigrationProcess(
187+
MigrationFile file, List<MigrationProcessExtensionProvider> extensionProviders)
188+
throws ReflectiveOperationException {
189+
if (file.isExtension) {
190+
// No provider handled this extension version: run SQL only, skip Java data migration.
191+
// Critical: do not fall through to OM's same-version native migration class.
192+
return extensionProviders.stream()
193+
.map(provider -> provider.provide(file))
194+
.flatMap(Optional::stream)
195+
.findFirst()
196+
.orElseGet(() -> new MigrationProcessImpl(file));
197+
}
198+
String clazzName = file.getMigrationProcessClassName();
199+
return (MigrationProcess)
200+
Class.forName(clazzName).getConstructor(MigrationFile.class).newInstance(file);
201+
}
202+
203+
private List<MigrationProcessExtensionProvider> loadExtensionProviders() {
204+
return StreamSupport.stream(
205+
ServiceLoader.load(MigrationProcessExtensionProvider.class).spliterator(), false)
206+
.toList();
207+
}
208+
198209
private static int compareVersions(String version1, String version2) {
199210
int[] v1Parts = parseVersion(version1);
200211
int[] v2Parts = parseVersion(version2);

openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/MigrationFile.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -119,16 +119,12 @@ public String getMigrationProcessClassName() {
119119
return clazzName;
120120
}
121121

122-
public String getMigrationProcessExtClassName() {
123-
String clazzName =
124-
String.format(
125-
"io.collate.service.migration.%s.%s.Migration", dbPackageName, getVersionPackageName());
126-
try {
127-
Class.forName(clazzName);
128-
} catch (ClassNotFoundException e) {
129-
return null;
122+
public String getVersionPackageName() {
123+
StringBuilder arrayAsString = new StringBuilder();
124+
for (int versionNumber : versionNumbers) {
125+
arrayAsString.append(versionNumber);
130126
}
131-
return clazzName;
127+
return "v" + arrayAsString;
132128
}
133129

134130
public String getMigrationsFilePath() {
@@ -189,14 +185,6 @@ private int compareVersionNumbers(int[] another) {
189185
return 0;
190186
}
191187

192-
private String getVersionPackageName() {
193-
StringBuilder arrayAsString = new StringBuilder();
194-
for (int versionNumber : versionNumbers) {
195-
arrayAsString.append(versionNumber);
196-
}
197-
return "v" + arrayAsString;
198-
}
199-
200188
public boolean isReprocessing() {
201189
return reprocessing;
202190
}

openmetadata-service/src/test/java/org/openmetadata/service/migration/api/MigrationWorkflowTest.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,35 @@ void loadMigrationsReprocessesCurrentExtensionDespiteHigherExecutedCoreVersions(
218218
assertEquals(List.of("0.0.1", "0.0.1-collate"), getMigrationVersions(workflow));
219219
}
220220

221+
@Test
222+
void loadMigrationsFallsBackToNoOpWhenNoExtensionProviderHandlesVersion() throws Exception {
223+
// Regression: a Collate version directory without a registered Java provider must resolve to
224+
// MigrationProcessImpl (no-op data migration), NOT to OM's same-numeric-version migration
225+
// class. Otherwise OM data migrations like migrateThreadTasksToTaskEntity would be re-run for
226+
// every Collate version that shares its major.minor.patch.
227+
Path nativeRoot = Files.createDirectories(tempDir.resolve("native"));
228+
Path extensionRoot = Files.createDirectories(tempDir.resolve("extension"));
229+
createMigrationDir(extensionRoot, "1.12.1-collate", "SELECT 22;");
230+
when(migrationDAO.getMigrationVersions()).thenReturn(List.of());
231+
232+
MigrationWorkflow workflow =
233+
new MigrationWorkflow(
234+
jdbi,
235+
nativeRoot.toString(),
236+
ConnectionType.POSTGRES,
237+
extensionRoot.toString(),
238+
null,
239+
config,
240+
false);
241+
242+
workflow.loadMigrations();
243+
244+
List<MigrationProcess> resolved = getMigrations(workflow);
245+
assertEquals(1, resolved.size());
246+
assertEquals("1.12.1-collate", resolved.get(0).getVersion());
247+
assertEquals(MigrationProcessImpl.class, resolved.get(0).getClass());
248+
}
249+
221250
@Test
222251
void loadMigrationsFallsBackToRunningEverythingWhenMigrationLookupFails() throws Exception {
223252
Path nativeRoot = Files.createDirectories(tempDir.resolve("native"));
@@ -724,10 +753,14 @@ private void createMigrationDir(Path root, String version, String sql) throws Ex
724753

725754
@SuppressWarnings("unchecked")
726755
private List<String> getMigrationVersions(MigrationWorkflow workflow) throws Exception {
756+
return getMigrations(workflow).stream().map(MigrationProcess::getVersion).toList();
757+
}
758+
759+
@SuppressWarnings("unchecked")
760+
private List<MigrationProcess> getMigrations(MigrationWorkflow workflow) throws Exception {
727761
Field field = MigrationWorkflow.class.getDeclaredField("migrations");
728762
field.setAccessible(true);
729-
List<MigrationProcess> migrations = (List<MigrationProcess>) field.get(workflow);
730-
return migrations.stream().map(MigrationProcess::getVersion).toList();
763+
return (List<MigrationProcess>) field.get(workflow);
731764
}
732765

733766
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)