Skip to content
Open
27 changes: 27 additions & 0 deletions DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,33 @@ For connector-specific development, see [skills/README.md](skills/README.md).
- Run `mvn spotless:apply` before every commit
- Every new REST endpoint needs a corresponding `*IT.java` in `openmetadata-integration-tests/`

---

## Development with Dev Containers

OpenMetadata provides two Dev Container configurations to streamline your development environment setup.

### 1. Standard Development (`.devcontainer/dev`)
- **Purpose**: Optimized for backend or frontend development.
- **Includes**: Java 21, Maven, Python 3.11, Node.js 22.
- **Setup**: Uses `post-create.sh` to install ANTLR4, yarn dependencies, and set up the ingestion virtual environment.
- **Note**: You must start external services (MySQL, Elasticsearch) separately using `docker/development/docker-compose.yml`.

### 2. Full Stack Development (`.devcontainer/full-stack`)
- **Purpose**: Starts the entire OpenMetadata stack (Server, Ingestion/Airflow, MySQL, Elasticsearch) within the container environment.
- **Setup**: Uses the same `post-create.sh` for environment initialization.
- **Note**: Services are orchestrated via Docker Compose and are available as soon as the container is ready.

### Environment Initialization
The `post-create.sh` script (located in `.devcontainer/dev/`) is used by both configurations to:
1. Install the ANTLR4 complete JAR.
2. Run `yarn install` for the UI.
3. Create a dedicated Python virtual environment (`.venv-devcontainer`) and run `make install_dev generate`.

There is no `post-start` script: all initialization logic is consolidated in `post-create.sh`, so it runs only once, when the container is created.

---

### React/TypeScript Frontend

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,40 @@ public ClusterCapacity getClusterCapacity(SearchRepository searchRepository) {
}

private ClusterCapacity getOpenSearchCapacity(OpenSearchClient client) {
if (client.isAoss()) {
LOG.debug("AWS OpenSearch Serverless detected, using conservative capacity estimate");
return getConservativeEstimate();
}
try {
var clusterStats = client.clusterStats();

int totalNodes = clusterStats.nodes().count().total();
int totalShards =
clusterStats.indices().shards().total() != null
? clusterStats.indices().shards().total().intValue()
: 0;
int totalNodes = 1;
int totalShards = 0;

if (clusterStats != null) {
totalNodes =
(clusterStats.nodes() != null && clusterStats.nodes().count() != null)
? clusterStats.nodes().count().total()
: 1;
totalShards =
(clusterStats.indices() != null
&& clusterStats.indices().shards() != null
&& clusterStats.indices().shards().total() != null)
? clusterStats.indices().shards().total().intValue()
: 0;
Comment thread
gitar-bot[bot] marked this conversation as resolved.
}

int maxShardsPerNode = getMaxShardsPerNode(client);
int maxShards = totalNodes * maxShardsPerNode;

double usagePercent = maxShards > 0 ? (double) totalShards / maxShards : 0;
int availableShards = maxShards - totalShards;

LOG.debug(
"OpenSearch cluster capacity: {} current shards, {} max shards ({} nodes x {} per node), {:.1f}% used",
totalShards, maxShards, totalNodes, maxShardsPerNode, usagePercent * 100);
totalShards,
maxShards,
totalNodes,
maxShardsPerNode,
usagePercent * 100);

return new ClusterCapacity(totalShards, maxShards, usagePercent, availableShards);
} catch (Exception e) {
Expand Down Expand Up @@ -127,6 +143,9 @@ private ClusterCapacity getElasticSearchCapacity(ElasticSearchClient client) {
}

private int getMaxShardsPerNode(OpenSearchClient client) {
if (client.isAoss()) {
return DEFAULT_MAX_SHARDS_PER_NODE;
}
try {
var settings = client.clusterSettings();
if (settings != null && settings.persistent() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,13 @@ public void storeRelationships(TestCase test) {
protected void postDelete(TestCase testCase, boolean hardDelete) {
  // Post-delete hook: run the parent hook, then updateTestSuite for this test case.
  super.postDelete(testCase, hardDelete);
  updateTestSuite(testCase);

  if (!hardDelete) {
    // Anything other than a hard delete leaves the time-series data in place.
    return;
  }

  // Hard delete: remove the test case results and resolution statuses, both looked up by
  // the test case's fully qualified name.
  Entity.getEntityTimeSeriesRepository(Entity.TEST_CASE_RESULT)
      .delete(testCase.getFullyQualifiedName());
  Entity.getEntityTimeSeriesRepository(Entity.TEST_CASE_RESOLUTION_STATUS)
      .delete(testCase.getFullyQualifiedName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,10 @@ public interface SearchClient
boolean isNewClientAvailable();

ElasticSearchConfiguration.SearchType getSearchType();

/**
 * Indicates whether this client targets AWS OpenSearch Serverless (AOSS).
 *
 * <p>The default implementation reports {@code false}; implementations that can detect AOSS
 * override it.
 *
 * @return {@code true} if the backend is AOSS, {@code false} otherwise
 */
default boolean isAoss() {
return false;
}

<T> T getHighLevelClient();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,30 @@ private static SearchClusterMetrics fetchOpenSearchMetrics(
OpenSearchClient osClient,
long totalEntities,
int maxDbConnections) {
if (osClient.isAoss()) {
LOG.debug("AWS OpenSearch Serverless detected, using conservative metrics");
return getConservativeDefaults(searchRepository, totalEntities, maxDbConnections);
}
try {
var clusterStats = osClient.clusterStats();
var nodesStats = osClient.nodesStats();
var clusterSettings = osClient.clusterSettings();

LOG.debug("ClusterStats response: {}", clusterStats);
LOG.debug("NodesStats response: {}", nodesStats);

int totalNodes = clusterStats.nodes().count().total();
int totalShards =
clusterStats.indices().shards().total() != null
? clusterStats.indices().shards().total().intValue()
: 0;
int totalNodes = 1;
int totalShards = 0;

if (clusterStats != null) {
totalNodes =
(clusterStats.nodes() != null && clusterStats.nodes().count() != null)
? clusterStats.nodes().count().total()
: 1;
totalShards =
(clusterStats.indices() != null
&& clusterStats.indices().shards() != null
&& clusterStats.indices().shards().total() != null)
? clusterStats.indices().shards().total().intValue()
: 0;
}

double cpuUsagePercent = osClient.averageCpuPercentFromNodesStats(nodesStats);
var jvmStats = osClient.extractJvmMemoryStats(nodesStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import jakarta.json.JsonObject;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
import java.security.KeyStoreException;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -103,13 +104,15 @@ public class OpenSearchClient implements SearchClient {
private final OpenSearchDataInsightAggregatorManager dataInsightAggregatorManager;
private final OpenSearchSearchManager searchManager;

private final boolean isAoss;
private final NLQService nlqService;

/**
 * Creates a client from the given search configuration without an NLQ service.
 *
 * <p>Delegates to the two-argument constructor, passing {@code null} as the NLQ service.
 *
 * @param config search configuration forwarded to the two-argument constructor
 */
public OpenSearchClient(ElasticSearchConfiguration config) {
this(config, null);
}

public OpenSearchClient(ElasticSearchConfiguration config, NLQService nlqService) {
this.isAoss = checkIsAoss(config);
AwsConfiguration awsConfig = config != null ? config.getAws() : null;
boolean useIamAuth = isAwsIamAuthEnabled(awsConfig);

Expand Down Expand Up @@ -1075,6 +1078,51 @@ public SearchSchemaEntityRelationshipResult getSchemaEntityRelationship(
schemaFqn, queryFilter, includeSourceFields, offset, limit, from, size, deleted);
}

/**
 * Returns the AOSS flag stored in the {@code isAoss} field, which is assigned once in the
 * constructor from {@code checkIsAoss(config)}.
 *
 * @return {@code true} if the configuration was detected as AWS OpenSearch Serverless
 */
@Override
public boolean isAoss() {
return isAoss;
}

private static boolean checkIsAoss(ElasticSearchConfiguration config) {
if (config == null) {
return false;
}

// Secondary signal: Check AWS service name configuration
if (config.getAws() != null && "aoss".equalsIgnoreCase(config.getAws().getServiceName())) {
return true;
}

String hostConfig = config.getHost();
if (StringUtils.isBlank(hostConfig)) {
return false;
}
for (String host : hostConfig.split(",")) {
String trimmedHost = host.trim().toLowerCase();
if (trimmedHost.isEmpty()) {
continue;
}

String hostname = trimmedHost;
try {
// Add protocol if missing to make URI parsing easier
String uriString = trimmedHost.contains("://") ? trimmedHost : "https://" + trimmedHost;
Comment thread
gitar-bot[bot] marked this conversation as resolved.
URI uri = URI.create(uriString);
hostname = uri.getHost();
} catch (Exception e) {
// If URI parsing fails, strip port manually as fallback
if (hostname.contains(":")) {
hostname = hostname.split(":")[0];
}
}

if (hostname != null && hostname.endsWith(".aoss.amazonaws.com")) {
return true;
}
}
return false;
}

@Override
public void initializeLineageBuilders() {
if (lineageGraphBuilder == null && newClient != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,135 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.openmetadata.common.utils.CommonUtil;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.rds.RdsUtilities;
import software.amazon.awssdk.services.rds.model.GenerateAuthenticationTokenRequest;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

/**
* {@link DatabaseAuthenticationProvider} implementation for AWS RDS IAM Auth.
*
* @see <a href="https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/UsingWithRDS.IAMDBAuth.Enabling.html"></a>
*/
public class AwsRdsDatabaseAuthenticationProvider implements DatabaseAuthenticationProvider {
public class AwsRdsDatabaseAuthenticationProvider
implements DatabaseAuthenticationProvider, AutoCloseable {

public static final String AWS_REGION = "awsRegion";
public static final String ALLOW_PUBLIC_KEY_RETRIEVAL = "allowPublicKeyRetrieval";
public static final String ASSUME_ROLE_ARN = "assumeRoleArn";
public static final String PROTOCOL = "https://";

private final Map<String, AwsCredentialsProvider> credentialsProviderCache =
new ConcurrentHashMap<>();
private final Map<String, StsClient> stsClientCache = new ConcurrentHashMap<>();
private final Map<String, RdsUtilities> rdsUtilitiesCache = new ConcurrentHashMap<>();
private static final AwsCredentialsProvider DEFAULT_CREDENTIALS_PROVIDER =
DefaultCredentialsProvider.create();

@Override
public String authenticate(String jdbcUrl, String username, String password) {
public String authenticate(final String jdbcUrl, final String username, final String password) {
try {

URI uri = URI.create(PROTOCOL + removeProtocolFrom(jdbcUrl));
Map<String, String> queryParams = parseQueryParams(uri.toURL());
final URI uri = URI.create(PROTOCOL + removeProtocolFrom(jdbcUrl));
final Map<String, String> queryParams = parseQueryParams(uri.toURL());

// Set
String awsRegion = queryParams.get(AWS_REGION);
String allowPublicKeyRetrieval = queryParams.get(ALLOW_PUBLIC_KEY_RETRIEVAL);
final String awsRegion = queryParams.get(AWS_REGION);
final String allowPublicKeyRetrieval = queryParams.get(ALLOW_PUBLIC_KEY_RETRIEVAL);
final String assumeRoleArn = queryParams.get(ASSUME_ROLE_ARN);

// Validate
Objects.requireNonNull(awsRegion, "Parameter `awsRegion` shall be provided in the jdbc url.");
Objects.requireNonNull(
allowPublicKeyRetrieval,
"Parameter `allowPublicKeyRetrieval` shall be provided in the jdbc url.");
if (CommonUtil.nullOrEmpty(awsRegion)) {
throw new DatabaseAuthenticationProviderException(
"Parameter `awsRegion` shall be provided in the jdbc url.");
}
if (CommonUtil.nullOrEmpty(allowPublicKeyRetrieval)) {
throw new DatabaseAuthenticationProviderException(
"Parameter `allowPublicKeyRetrieval` shall be provided in the jdbc url.");
}
Comment on lines +49 to +56
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CommonUtil.nullOrEmpty only checks isEmpty() and will treat whitespace-only values as present. That means values like awsRegion=%20 or assumeRoleArn=%20 will pass validation and then fail later (e.g., Region.of(" ") or STS AssumeRole with an invalid ARN), producing a harder-to-diagnose error. Consider validating these parameters with a blank-aware check (e.g., trim + empty, or StringUtils.isBlank) so whitespace-only inputs are rejected as missing.

Copilot uses AI. Check for mistakes.

final AwsCredentialsProvider credentialsProvider =
getCredentialsProvider(awsRegion, assumeRoleArn);

// Prepare request
GenerateAuthenticationTokenRequest request =
final GenerateAuthenticationTokenRequest request =
GenerateAuthenticationTokenRequest.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.credentialsProvider(credentialsProvider)
.hostname(uri.getHost())
.port(uri.getPort())
.username(username)
.build();

// Return token
return RdsUtilities.builder()
.region(Region.of(awsRegion))
.build()
.generateAuthenticationToken(request);
return getRdsUtilities(awsRegion).generateAuthenticationToken(request);

} catch (MalformedURLException e) {
// Throw
throw new DatabaseAuthenticationProviderException(e);
} catch (Exception e) {
throw new DatabaseAuthenticationProviderException("Failed to generate AWS RDS IAM token", e);
}
}

/**
 * Returns the {@code RdsUtilities} instance for a region, building and caching it in
 * {@code rdsUtilitiesCache} the first time that region is requested.
 *
 * @param awsRegion region identifier used both as cache key and to configure the instance
 * @return the cached or newly built utilities for {@code awsRegion}
 */
private RdsUtilities getRdsUtilities(final String awsRegion) {
  return rdsUtilitiesCache.computeIfAbsent(
      awsRegion,
      regionName -> {
        final Region resolvedRegion = Region.of(regionName);
        return RdsUtilities.builder().region(resolvedRegion).build();
      });
}

/**
 * Selects the credentials provider for the token request.
 *
 * <p>When no role ARN is given, the shared {@code DEFAULT_CREDENTIALS_PROVIDER} is returned.
 * Otherwise an STS assume-role provider is built on first use and cached under the key
 * {@code awsRegion + ":" + assumeRoleArn}; the STS client it uses is cached per region.
 *
 * @param awsRegion region for the STS client; also the first part of the cache key
 * @param assumeRoleArn ARN of the role to assume; {@code null} or empty selects the default
 *     provider
 * @return the default provider, or the cached assume-role provider for this region and ARN
 */
private AwsCredentialsProvider getCredentialsProvider(
    final String awsRegion, final String assumeRoleArn) {
  if (!CommonUtil.nullOrEmpty(assumeRoleArn)) {
    return credentialsProviderCache.computeIfAbsent(
        awsRegion + ":" + assumeRoleArn,
        unusedKey -> {
          // One STS client per region, authenticated with the default credentials.
          final StsClient regionalSts =
              stsClientCache.computeIfAbsent(
                  awsRegion,
                  regionName ->
                      StsClient.builder()
                          .region(Region.of(regionName))
                          .credentialsProvider(DEFAULT_CREDENTIALS_PROVIDER)
                          .build());

          final AssumeRoleRequest roleRequest =
              AssumeRoleRequest.builder()
                  .roleArn(assumeRoleArn)
                  .roleSessionName("OpenMetadata-RDS-IAM-Auth")
                  .build();

          return StsAssumeRoleCredentialsProvider.builder()
              .stsClient(regionalSts)
              .refreshRequest(roleRequest)
              .build();
        });
  }
  return DEFAULT_CREDENTIALS_PROVIDER;
}

/**
 * Releases the cached credentials providers and STS clients and empties all three caches.
 *
 * <p>Closing is best-effort: a failure to close one resource does not prevent the remaining
 * resources from being closed, and the caches are cleared regardless. The static
 * {@code DEFAULT_CREDENTIALS_PROVIDER} is not stored in these caches and is left untouched.
 */
@Override
public void close() {
  try {
    for (AwsCredentialsProvider provider : credentialsProviderCache.values()) {
      closeQuietly(provider);
    }
    for (StsClient stsClient : stsClientCache.values()) {
      closeQuietly(stsClient);
    }
  } finally {
    credentialsProviderCache.clear();
    stsClientCache.clear();
    rdsUtilitiesCache.clear();
  }
}

/**
 * Closes {@code resource} if it is {@link AutoCloseable}, suppressing any exception.
 *
 * @param resource object to close; ignored when it is not {@link AutoCloseable}
 */
private static void closeQuietly(final Object resource) {
  if (resource instanceof AutoCloseable closeable) {
    try {
      closeable.close();
    } catch (Exception ignored) {
      // Best-effort cleanup: a failing close must not stop the remaining resources
      // from being released.
    }
  }
}
}
Loading