Skip to content

Commit 73464cc

Browse files
authored
Add K8s native scheduler (#24940)
1 parent 2220cb4 commit 73464cc

7 files changed

Lines changed: 3377 additions & 24 deletions

File tree

openmetadata-service/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,12 @@
641641
<version>${org.testcontainers.version}</version>
642642
<scope>test</scope>
643643
</dependency>
644+
<dependency>
645+
<groupId>org.testcontainers</groupId>
646+
<artifactId>k3s</artifactId>
647+
<version>${org.testcontainers.version}</version>
648+
<scope>test</scope>
649+
</dependency>
644650
<dependency>
645651
<groupId>org.opensearch</groupId>
646652
<artifactId>opensearch-testcontainers</artifactId>
@@ -926,6 +932,11 @@
926932
<artifactId>resilience4j-ratelimiter</artifactId>
927933
<version>${resilience4j-ratelimiter.version}</version>
928934
</dependency>
935+
<dependency>
936+
<groupId>io.github.resilience4j</groupId>
937+
<artifactId>resilience4j-retry</artifactId>
938+
<version>${resilience4j-ratelimiter.version}</version>
939+
</dependency>
929940
<dependency>
930941
<groupId>org.junit.platform</groupId>
931942
<artifactId>junit-platform-commons</artifactId>

openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/MeteredPipelineServiceClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ public MeteredPipelineServiceClient(PipelineServiceClientInterface decoratedClie
3434
this.decoratedClient = decoratedClient;
3535
}
3636

37+
/** Get the underlying decorated client. Used for testing to check the actual client type. */
38+
@com.google.common.annotations.VisibleForTesting
39+
public PipelineServiceClientInterface getDecoratedClient() {
40+
return decoratedClient;
41+
}
42+
3743
private <T> T executeWithMetering(String name, Supplier<T> operation) {
3844
try {
3945
T result = operation.get();

openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java

Lines changed: 82 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,39 +27,97 @@ private PipelineServiceClientFactory() {
2727
// Final class
2828
}
2929

30-
@Getter private static PipelineServiceClientInterface pipelineServiceClient;
30+
/** Lock for thread-safe client creation. */
31+
private static final Object LOCK = new Object();
3132

33+
/** Volatile to ensure visibility across threads. */
34+
@Getter private static volatile PipelineServiceClientInterface pipelineServiceClient;
35+
36+
/**
37+
* Reset the cached pipeline service client. This is used by tests that need to switch between
38+
* different pipeline client implementations (e.g., MockPipelineServiceClient vs
39+
* K8sPipelineClient).
40+
*/
41+
@com.google.common.annotations.VisibleForTesting
42+
public static void reset() {
43+
synchronized (LOCK) {
44+
pipelineServiceClient = null;
45+
}
46+
}
47+
48+
/**
49+
* Creates or returns the cached pipeline service client. Thread-safe with double-checked locking.
50+
*
51+
* @param config The pipeline service client configuration
52+
* @return The pipeline service client instance
53+
*/
3254
public static PipelineServiceClientInterface createPipelineServiceClient(
3355
PipelineServiceClientConfiguration config) {
34-
if (pipelineServiceClient != null || CommonUtil.nullOrEmpty(config)) {
56+
if (CommonUtil.nullOrEmpty(config)) {
3557
return pipelineServiceClient;
3658
}
3759

38-
if (Boolean.FALSE.equals(config.getEnabled())) {
39-
LOG.debug("Pipeline Service Client is disabled. Skipping initialization.");
40-
return null;
60+
// Fast path: check if we can reuse cached client without locking
61+
PipelineServiceClientInterface cached = pipelineServiceClient;
62+
if (cached != null && isSameClientType(cached, config.getClassName())) {
63+
return cached;
4164
}
4265

43-
String pipelineServiceClientClass = config.getClassName();
44-
LOG.debug("Registering PipelineServiceClient: {}", pipelineServiceClientClass);
66+
// Slow path: synchronized creation
67+
synchronized (LOCK) {
68+
// Double-check after acquiring lock
69+
cached = pipelineServiceClient;
70+
if (cached != null && isSameClientType(cached, config.getClassName())) {
71+
return cached;
72+
}
4573

46-
try {
47-
PipelineServiceClientInterface client =
48-
Class.forName(pipelineServiceClientClass)
49-
.asSubclass(PipelineServiceClient.class)
50-
.getConstructor(PipelineServiceClientConfiguration.class)
51-
.newInstance(config);
52-
pipelineServiceClient = new MeteredPipelineServiceClient(client);
53-
return pipelineServiceClient;
54-
} catch (ClassNotFoundException
55-
| NoSuchMethodException
56-
| InvocationTargetException
57-
| InstantiationException
58-
| IllegalAccessException e) {
59-
throw new PipelineServiceClientException(
60-
String.format(
61-
"Error trying to load PipelineServiceClient %s: %s",
62-
pipelineServiceClientClass, e.getCause()));
74+
if (cached != null) {
75+
LOG.info(
76+
"Requested PipelineServiceClient class {} differs from cached {}, creating new client",
77+
config.getClassName(),
78+
getActualClientClassName(cached));
79+
}
80+
81+
if (Boolean.FALSE.equals(config.getEnabled())) {
82+
LOG.debug("Pipeline Service Client is disabled. Skipping initialization.");
83+
return null;
84+
}
85+
86+
String pipelineServiceClientClass = config.getClassName();
87+
LOG.info("Creating PipelineServiceClient: {}", pipelineServiceClientClass);
88+
89+
try {
90+
PipelineServiceClientInterface client =
91+
Class.forName(pipelineServiceClientClass)
92+
.asSubclass(PipelineServiceClient.class)
93+
.getConstructor(PipelineServiceClientConfiguration.class)
94+
.newInstance(config);
95+
pipelineServiceClient = new MeteredPipelineServiceClient(client);
96+
return pipelineServiceClient;
97+
} catch (ClassNotFoundException
98+
| NoSuchMethodException
99+
| InvocationTargetException
100+
| InstantiationException
101+
| IllegalAccessException e) {
102+
throw new PipelineServiceClientException(
103+
String.format(
104+
"Error trying to load PipelineServiceClient %s: %s",
105+
pipelineServiceClientClass, e.getCause()));
106+
}
107+
}
108+
}
109+
110+
/** Check if the cached client is of the same type as requested. */
111+
private static boolean isSameClientType(
112+
PipelineServiceClientInterface cached, String requestedClassName) {
113+
return getActualClientClassName(cached).equals(requestedClassName);
114+
}
115+
116+
/** Get the actual client class name, unwrapping MeteredPipelineServiceClient if needed. */
117+
private static String getActualClientClassName(PipelineServiceClientInterface client) {
118+
if (client instanceof MeteredPipelineServiceClient) {
119+
return ((MeteredPipelineServiceClient) client).getDecoratedClient().getClass().getName();
63120
}
121+
return client.getClass().getName();
64122
}
65123
}

0 commit comments

Comments
 (0)