Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public static Collection<Object[]> data() {
new Object[][] {
{LastCacheLoadStrategy.CLEAN_ALL},
{LastCacheLoadStrategy.UPDATE},
{LastCacheLoadStrategy.UPDATE_NO_BLOB},
{LastCacheLoadStrategy.CLEAN_DEVICE}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
import org.apache.iotdb.rpc.TSStatusCode;
Expand All @@ -49,7 +50,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1001,4 +1004,34 @@ public void testNegativeTimestamp() throws Exception {
TableModelUtils.assertData("test", "test", -200, 100, receiverEnv, handleFailure);
}
}

@Test
public void testHistoryDataWithEmptyField() {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"CREATE DATABASE iot_table_stream_attr",
"USE iot_table_stream_attr",
"CREATE TABLE table1 (region STRING TAG, device_id STRING TAG, model_id STRING ATTRIBUTE, maintenance STRING ATTRIBUTE COMMENT 'maintenance', temperature FLOAT FIELD COMMENT 'temperature', humidity STRING ATTRIBUTE COMMENT 'humidity', plant_id STRING TAG) COMMENT 'table1'",
String.format(
"create pipe test with source ('inclusion'='all') with sink('node-urls'='%s')",
receiverEnv.getDataNodeWrapper(0).getIpAndPortString()),
"select * from table1 order by time",
"INSERT INTO table1(region, plant_id, device_id, model_id, maintenance, time, temperature, humidity) VALUES ('north', null, 'd101', 'red', null, '2025-11-26 13:38:00', 91.0, null), (null, '1003', null, null, 'maint-a', '2025-11-26 13:39:00', null, '36.2'), (null, null, null, 'green', 'maint-b', '2025-11-26 13:40:00', 88.8, '34.9')",
"INSERT INTO table1(region, plant_id, device_id, model_id, maintenance, time, temperature, humidity) VALUES ('south', '1005', 'd105', null, null, '2025-11-26 13:41:00', 87.5, null)",
"INSERT INTO table1(region, plant_id, device_id, model_id, maintenance, time, temperature, humidity) VALUES ('west', '1006', 'd106', 'blue', 'maint-c', '2025-11-26 13:42:00', null, '36.8')"),
BaseEnv.TABLE_SQL_DIALECT);
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from iot_table_stream_attr.table1 order by time",
"time,region,device_id,model_id,maintenance,temperature,humidity,plant_id,",
new HashSet<>(
Arrays.asList(
"2025-11-26T13:38:00.000Z,north,d101,red,null,91.0,null,null,",
"2025-11-26T13:39:00.000Z,null,null,null,maint-a,null,36.2,1003,",
"2025-11-26T13:40:00.000Z,null,null,green,maint-b,88.8,34.9,null,",
"2025-11-26T13:41:00.000Z,south,d105,null,null,87.5,null,1005,",
"2025-11-26T13:42:00.000Z,west,d106,blue,maint-c,null,36.8,1006,")),
(String) null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4313,9 +4313,7 @@ public void setLastCacheLoadStrategy(LastCacheLoadStrategy lastCacheLoadStrategy
}

public boolean isCacheLastValuesForLoad() {
return (lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE
|| lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE_NO_BLOB)
&& cacheLastValuesForLoad;
return lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE && cacheLastValuesForLoad;
}

public void setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2529,10 +2529,15 @@ private void loadLoadTsFileProps(TrimProperties properties) {
"load_disk_select_strategy_for_pipe_and_iotv2",
ILoadDiskSelector.LoadDiskSelectorType.INHERIT_LOAD.getValue()));

conf.setLastCacheLoadStrategy(
LastCacheLoadStrategy.valueOf(
properties.getProperty(
"last_cache_operation_on_load", LastCacheLoadStrategy.UPDATE.name())));
final String lastCacheOperationOnLoad =
properties.getProperty("last_cache_operation_on_load", LastCacheLoadStrategy.UPDATE.name());
if ("UPDATE_NO_BLOB".equals(lastCacheOperationOnLoad)) {
LOGGER.warn(
"last_cache_operation_on_load=UPDATE_NO_BLOB is deprecated and treated as UPDATE.");
conf.setLastCacheLoadStrategy(LastCacheLoadStrategy.UPDATE);
} else {
conf.setLastCacheLoadStrategy(LastCacheLoadStrategy.valueOf(lastCacheOperationOnLoad));
}

conf.setCacheLastValuesForLoad(
Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
private List<ColumnCategory> columnTypes;
private List<String> measurementList;
private List<TSDataType> dataTypeList;
private List<IMeasurementSchema> fieldSchemaList;
private int deviceIdSize;

private List<ModsOperationUtil.ModsInfo> modsInfoList;
Expand Down Expand Up @@ -194,7 +195,7 @@

long size = 0;
List<AbstractAlignedChunkMetadata> iChunkMetadataList =
reader.getAlignedChunkMetadata(pair.left, true);
reader.getAlignedChunkMetadata(pair.left, false);

Iterator<AbstractAlignedChunkMetadata> chunkMetadataIterator =
iChunkMetadataList.iterator();
Expand All @@ -213,27 +214,7 @@
continue;
}

Iterator<IChunkMetadata> iChunkMetadataIterator =
alignedChunkMetadata.getValueChunkMetadataList().iterator();
while (iChunkMetadataIterator.hasNext()) {
IChunkMetadata iChunkMetadata = iChunkMetadataIterator.next();
if (iChunkMetadata == null) {
iChunkMetadataIterator.remove();
continue;
}

if (!modifications.isEmpty()
&& ModsOperationUtil.isAllDeletedByMods(
pair.getLeft(),
iChunkMetadata.getMeasurementUid(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime(),
modifications)) {
iChunkMetadataIterator.remove();
}
}

if (alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) {
if (areAllFieldsDeletedByMods(pair.getLeft(), alignedChunkMetadata)) {
chunkMetadataIterator.remove();
continue;
}
Expand Down Expand Up @@ -267,6 +248,7 @@
dataTypeList = new ArrayList<>();
columnTypes = new ArrayList<>();
measurementList = new ArrayList<>();
fieldSchemaList = new ArrayList<>();

for (int i = 0; i < columnSchemaSize; i++) {
final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i);
Expand All @@ -280,6 +262,9 @@
measurementList.add(measurementName);
dataTypeList.add(schema.getType());
}
if (ColumnCategory.FIELD.equals(columnCategory)) {
fieldSchemaList.add(schema);
}
}
}
deviceIdSize = dataTypeList.size();
Expand Down Expand Up @@ -331,9 +316,9 @@
tablet =
new Tablet(
tableName,
measurementList,
dataTypeList,
columnTypes,
new ArrayList<>(measurementList),
new ArrayList<>(dataTypeList),
new ArrayList<>(columnTypes),
rowCountAndMemorySize.getLeft());
tablet.initBitMaps();
isFirstRow = false;
Expand Down Expand Up @@ -376,6 +361,20 @@
long size = timeChunkSize;

final List<Chunk> valueChunkList = new ArrayList<>();
final Map<String, IChunkMetadata> valueChunkMetadataMap =
alignedChunkMetadata.getValueChunkMetadataList().stream()
.filter(Objects::nonNull)
.filter(
metadata ->
!isFieldDeletedByMods(
metadata.getMeasurementUid(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime()))
.collect(
Collectors.toMap(
IChunkMetadata::getMeasurementUid,
metadata -> metadata,
(left, right) -> left));

// To ensure that the Tablet has the same alignedChunk column as the current one,
// you need to create a new Tablet to fill in the data.
Expand All @@ -392,50 +391,99 @@
measurementList.subList(deviceIdSize, measurementList.size()).clear();
dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear();

for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size(); ++offset) {
final IChunkMetadata metadata = alignedChunkMetadata.getValueChunkMetadataList().get(offset);
boolean hasSelectedField = fieldSchemaList.isEmpty();
boolean hasSelectedNonNullChunk = false;
for (; offset < fieldSchemaList.size(); ++offset) {

Check warning on line 396 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Reduce the total number of break and continue statements in this loop to use at most one.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ377Iy1BmOcEYzVqYQr&open=AZ377Iy1BmOcEYzVqYQr&pullRequest=17601
final IMeasurementSchema schema = fieldSchemaList.get(offset);
if (isFieldDeletedByMods(
schema.getMeasurementName(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime())) {
continue;
}

final IChunkMetadata metadata = valueChunkMetadataMap.get(schema.getMeasurementName());
Chunk chunk = null;
if (metadata != null) {
final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
if (valueChunkList.isEmpty()) {
chunk = reader.readMemChunk((ChunkMetadata) metadata);
final long newSize = size + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
if (newSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
if (!hasSelectedNonNullChunk) {
// If the first chunk exceeds the memory limit, we need to allocate more memory
size = newSize;
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, size);
columnTypes.add(ColumnCategory.FIELD);
measurementList.add(metadata.getMeasurementUid());
dataTypeList.add(metadata.getDataType());
valueChunkList.add(chunk);
++offset;
} else {
break;
}
break;
} else {
// Record the column information corresponding to Meta to fill in Tablet
columnTypes.add(ColumnCategory.FIELD);
measurementList.add(metadata.getMeasurementUid());
dataTypeList.add(metadata.getDataType());
valueChunkList.add(chunk);
size = newSize;
}
hasSelectedNonNullChunk = true;
}

columnTypes.add(ColumnCategory.FIELD);
measurementList.add(schema.getMeasurementName());
dataTypeList.add(schema.getType());
valueChunkList.add(chunk);
hasSelectedField = true;
}

if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) {
if (offset >= fieldSchemaList.size()) {
currentChunkMetadata = null;
}

if (!hasSelectedField) {
this.chunkReader = null;
this.batchData = null;
return;
}

this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
this.modsInfoList =
ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList, modifications);
}

private boolean areAllFieldsDeletedByMods(
final IDeviceID currentDeviceID, final AbstractAlignedChunkMetadata alignedChunkMetadata) {
if (modifications.isEmpty() || fieldSchemaList.isEmpty()) {
return false;
}

for (final IMeasurementSchema schema : fieldSchemaList) {
if (!ModsOperationUtil.isAllDeletedByMods(
currentDeviceID,
schema.getMeasurementName(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime(),
modifications)) {
return false;
}
}
return true;
}

private boolean isFieldDeletedByMods(
final String measurementID, final long startTime, final long endTime) {
return !modifications.isEmpty()
&& ModsOperationUtil.isAllDeletedByMods(
deviceID, measurementID, startTime, endTime, modifications);
}

private boolean fillMeasurementValueColumns(

Check failure on line 472 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 18 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ377Iy1BmOcEYzVqYQs&open=AZ377Iy1BmOcEYzVqYQs&pullRequest=17601
final BatchData data, final Tablet tablet, final int rowIndex) {
final TsPrimitiveType[] primitiveTypes = data.getVector();
final TsPrimitiveType[] primitiveTypes =
Objects.nonNull(data.getVector()) ? data.getVector() : new TsPrimitiveType[0];
boolean needFillTime = false;
boolean hasNonDeletedField = dataTypeList.size() == deviceIdSize;

for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
if (primitiveType == null
|| ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i))) {
final TsPrimitiveType primitiveType =
i - deviceIdSize < primitiveTypes.length ? primitiveTypes[i - deviceIdSize] : null;
final boolean isDeleted = ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i));
if (!isDeleted) {
hasNonDeletedField = true;
}
if (primitiveType == null || isDeleted) {
switch (dataTypeList.get(i)) {
case TEXT:
case BLOB:
Expand Down Expand Up @@ -480,7 +528,7 @@
throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType());
}
}
return needFillTime;
return needFillTime || hasNonDeletedField;
}

private void fillDeviceIdColumns(
Expand Down
Loading
Loading