From 1479cfc924b309c2d1165148b188fc0d330b13ad Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:26:42 +0800 Subject: [PATCH 1/4] Fixed multiple bugs of insertion (#17570) * insert-fix * source/sink * source/sink-2 * sptls * fix * sink * compile --- .../sink/protocol/IoTDBConfigRegionSink.java | 9 +- .../db/pipe/agent/task/PipeDataNodeTask.java | 28 +- .../execution/PipeSinkSubtaskExecutor.java | 2 +- .../task/stage/PipeTaskProcessorStage.java | 16 +- .../agent/task/stage/PipeTaskSinkStage.java | 13 +- .../agent/task/stage/PipeTaskSourceStage.java | 19 +- .../airgap/IoTDBSchemaRegionAirGapSink.java | 7 +- .../PipeTransferTabletBatchEventHandler.java | 10 +- ...eTransferTabletInsertNodeEventHandler.java | 2 +- ...peTransferTabletInsertionEventHandler.java | 7 +- .../PipeTransferTabletRawEventHandler.java | 6 +- .../handler/PipeTransferTrackableHandler.java | 24 +- .../handler/PipeTransferTsFileHandler.java | 27 +- .../realtime/assigner/DisruptorQueue.java | 4 +- .../PipeInsertionDataNodeListener.java | 38 +-- .../plan/node/write/InsertTabletNode.java | 140 +++++++++- .../storageengine/dataregion/DataRegion.java | 248 +++++++++++++----- .../task/stage/SubscriptionTaskSinkStage.java | 11 +- .../dataregion/DataRegionTest.java | 226 ++++++++++++++++ .../commons/client/ClientPoolFactory.java | 4 +- .../iotdb/commons/concurrent/ThreadName.java | 12 +- .../iotdb/commons/conf/CommonConfig.java | 93 ++++--- .../commons/pipe/config/PipeDescriptor.java | 8 +- 23 files changed, 692 insertions(+), 262 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java index 5ee983d7945bd..b5f62ef4ccf48 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java @@ -88,7 +88,7 @@ protected IoTDBSyncClientManager constructClient( protected PipeTransferFilePieceReq getTransferSingleFilePieceReq( final String fileName, final long position, final byte[] payLoad) { throw new UnsupportedOperationException( - "The config region connector does not support transferring single file piece req."); + "The config region sink does not support transferring single file piece req."); } @Override @@ -105,13 +105,13 @@ protected void mayLimitRateAndRecordIO(final long requiredBytes) { @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { throw new UnsupportedOperationException( - "IoTDBConfigRegionConnector can't transfer TabletInsertionEvent."); + "IoTDBConfigRegionSink can't transfer TabletInsertionEvent."); } @Override public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception { throw new UnsupportedOperationException( - "IoTDBConfigRegionConnector can't transfer TsFileInsertionEvent."); + "IoTDBConfigRegionSink can't transfer TsFileInsertionEvent."); } @Override @@ -121,8 +121,7 @@ public void transfer(final Event event) throws Exception { } else if (event instanceof PipeConfigRegionSnapshotEvent) { doTransferWrapper((PipeConfigRegionSnapshotEvent) event); } else if (!(event instanceof PipeHeartbeatEvent)) { - LOGGER.warn( - "IoTDBConfigRegionConnector does not support transferring generic event: {}.", event); + LOGGER.warn("IoTDBConfigRegionSink does not support transferring generic event: {}.", event); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java index d33ec44a86e53..0d0b955c2109d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java @@ -32,32 +32,32 @@ public class PipeDataNodeTask implements PipeTask { private final String pipeName; private final int regionId; - private final PipeTaskStage extractorStage; + private final PipeTaskStage sourceStage; private final PipeTaskStage processorStage; - private final PipeTaskStage connectorStage; + private final PipeTaskStage sinkStage; private volatile boolean isCompleted = false; public PipeDataNodeTask( final String pipeName, final int regionId, - final PipeTaskStage extractorStage, + final PipeTaskStage sourceStage, final PipeTaskStage processorStage, - final PipeTaskStage connectorStage) { + final PipeTaskStage sinkStage) { this.pipeName = pipeName; this.regionId = regionId; - this.extractorStage = extractorStage; + this.sourceStage = sourceStage; this.processorStage = processorStage; - this.connectorStage = connectorStage; + this.sinkStage = sinkStage; } @Override public void create() { final long startTime = System.currentTimeMillis(); - extractorStage.create(); + sourceStage.create(); processorStage.create(); - connectorStage.create(); + sinkStage.create(); LOGGER.info( "Create pipe DN task {} successfully within {} ms", this, @@ -67,9 +67,9 @@ public void create() { @Override public void drop() { final long startTime = System.currentTimeMillis(); - extractorStage.drop(); + sourceStage.drop(); processorStage.drop(); - connectorStage.drop(); + sinkStage.drop(); LOGGER.info( "Drop pipe DN task {} successfully within {} ms", this, @@ -79,9 +79,9 @@ public void drop() { @Override public void start() { final long startTime = System.currentTimeMillis(); - extractorStage.start(); + sourceStage.start(); processorStage.start(); - connectorStage.start(); + sinkStage.start(); LOGGER.info( "Start pipe DN task {} successfully within {} ms", this, @@ -91,9 +91,9 @@ public void start() { @Override public void stop() { final long startTime = System.currentTimeMillis(); - extractorStage.stop(); + sourceStage.stop(); processorStage.stop(); - connectorStage.stop(); + sinkStage.stop(); LOGGER.info( "Stop pipe DN task {} successfully within {} ms", this, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java index 9a88ad74d7ccb..4abc8daed3002 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java @@ -31,7 +31,7 @@ public class PipeSinkSubtaskExecutor extends PipeSubtaskExecutor { public PipeSinkSubtaskExecutor() { super( PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(), - ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL.getName() + "-" + id.get(), + ThreadName.PIPE_SINK_EXECUTOR_POOL.getName() + "-" + id.get(), ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName() + "-" + id.getAndIncrement(), true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java index ddc194716d2d1..2809ec9ec93fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java @@ -54,8 +54,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage { * @param creationTime pipe creation time * @param pipeProcessorParameters used to create {@link PipeProcessor} * @param regionId {@link DataRegion} id - * @param pipeExtractorInputEventSupplier used to input {@link Event}s from {@link PipeExtractor} - * @param pipeConnectorOutputPendingQueue used to output {@link Event}s to {@link PipeConnector} + * @param pipeSourceInputEventSupplier used to input {@link Event}s from {@link PipeExtractor} + * @param pipeSinkOutputPendingQueue used to output {@link Event}s to {@link PipeConnector} * @throws PipeException if failed to {@link PipeProcessor#validate(PipeParameterValidator)} or * {@link PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)}} */ @@ -64,8 +64,8 @@ public PipeTaskProcessorStage( final long creationTime, final PipeParameters pipeProcessorParameters, final int regionId, - final EventSupplier pipeExtractorInputEventSupplier, - final UnboundedBlockingPendingQueue pipeConnectorOutputPendingQueue, + final EventSupplier pipeSourceInputEventSupplier, + final UnboundedBlockingPendingQueue pipeSinkOutputPendingQueue, final PipeProcessorSubtaskExecutor executor, final PipeTaskMeta pipeTaskMeta, final boolean forceTabletFormat, @@ -99,9 +99,9 @@ public PipeTaskProcessorStage( // removed, the new subtask will have the same pipeName and regionId as the // old one, so we need creationTime to make their hash code different in the map. final String taskId = pipeName + "_" + regionId + "_" + creationTime; - final PipeEventCollector pipeConnectorOutputEventCollector = + final PipeEventCollector pipeSinkOutputEventCollector = new PipeEventCollector( - pipeConnectorOutputPendingQueue, + pipeSinkOutputPendingQueue, creationTime, regionId, forceTabletFormat, @@ -112,9 +112,9 @@ public PipeTaskProcessorStage( pipeName, creationTime, regionId, - pipeExtractorInputEventSupplier, + pipeSourceInputEventSupplier, pipeProcessor, - pipeConnectorOutputEventCollector); + pipeSinkOutputEventCollector); this.executor = executor; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java index a22fbb536d704..88eac560cde2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java @@ -38,7 +38,7 @@ public class PipeTaskSinkStage extends PipeTaskStage { protected final int regionId; protected final Supplier executor; - protected String connectorSubtaskId; + protected String sinkSubtaskId; public PipeTaskSinkStage( String pipeName, @@ -56,7 +56,7 @@ public PipeTaskSinkStage( } protected void registerSubtask() { - this.connectorSubtaskId = + this.sinkSubtaskId = PipeSinkSubtaskManager.instance() .register( executor, @@ -71,21 +71,20 @@ public void createSubtask() throws PipeException { @Override public void startSubtask() throws PipeException { - PipeSinkSubtaskManager.instance().start(connectorSubtaskId); + PipeSinkSubtaskManager.instance().start(sinkSubtaskId); } @Override public void stopSubtask() throws PipeException { - PipeSinkSubtaskManager.instance().stop(connectorSubtaskId); + PipeSinkSubtaskManager.instance().stop(sinkSubtaskId); } @Override public void dropSubtask() throws PipeException { - PipeSinkSubtaskManager.instance() - .deregister(pipeName, creationTime, regionId, connectorSubtaskId); + PipeSinkSubtaskManager.instance().deregister(pipeName, creationTime, regionId, sinkSubtaskId); } public UnboundedBlockingPendingQueue getPipeSinkPendingQueue() { - return PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId); + return PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(sinkSubtaskId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java index 6acc0fc3d4af2..240b5499e92b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java @@ -44,32 +44,31 @@ public class PipeTaskSourceStage extends PipeTaskStage { public PipeTaskSourceStage( String pipeName, long creationTime, - PipeParameters extractorParameters, + PipeParameters sourceParameters, int regionId, PipeTaskMeta pipeTaskMeta) { pipeExtractor = StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) - ? PipeDataNodeAgent.plugin().dataRegion().reflectSource(extractorParameters) - : PipeDataNodeAgent.plugin().schemaRegion().reflectSource(extractorParameters); + ? PipeDataNodeAgent.plugin().dataRegion().reflectSource(sourceParameters) + : PipeDataNodeAgent.plugin().schemaRegion().reflectSource(sourceParameters); - // Validate and customize should be called before createSubtask. this allows extractor exposing + // Validate and customize should be called before createSubtask. this allows source exposing // exceptions in advance. try { - // 1. Validate extractor parameters - pipeExtractor.validate(new PipeParameterValidator(extractorParameters)); + // 1. Validate source parameters + pipeExtractor.validate(new PipeParameterValidator(sourceParameters)); - // 2. Customize extractor + // 2. Customize source final PipeTaskRuntimeConfiguration runtimeConfiguration = new PipeTaskRuntimeConfiguration( new PipeTaskSourceRuntimeEnvironment(pipeName, creationTime, regionId, pipeTaskMeta)); - pipeExtractor.customize(extractorParameters, runtimeConfiguration); + pipeExtractor.customize(sourceParameters, runtimeConfiguration); } catch (Exception e) { try { pipeExtractor.close(); } catch (Exception closeException) { LOGGER.warn( - "Failed to close extractor after failed to initialize extractor. " - + "Ignore this exception.", + "Failed to close source after failed to initialize source. " + "Ignore this exception.", closeException); } throw new PipeException(e.getMessage(), e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java index 8280709446a3b..6b7787ba16282 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java @@ -47,13 +47,13 @@ public class IoTDBSchemaRegionAirGapSink extends IoTDBDataNodeAirGapSink { @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { throw new UnsupportedOperationException( - "IoTDBSchemaRegionAirGapConnector can't transfer TabletInsertionEvent."); + "IoTDBSchemaRegionAirGapSink can't transfer TabletInsertionEvent."); } @Override public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception { throw new UnsupportedOperationException( - "IoTDBSchemaRegionAirGapConnector can't transfer TsFileInsertionEvent."); + "IoTDBSchemaRegionAirGapSink can't transfer TsFileInsertionEvent."); } @Override @@ -68,8 +68,7 @@ public void transfer(final Event event) throws Exception { doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event); } else if (!(event instanceof PipeHeartbeatEvent)) { LOGGER.warn( - "IoTDBSchemaRegionAirGapConnector does not support transferring generic event: {}.", - event); + "IoTDBSchemaRegionAirGapSink does not support transferring generic event: {}.", event); } } catch (final IOException e) { isSocketAlive.set(socketIndex, false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java index 110d3cb645027..8bcb9d47009c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java @@ -69,7 +69,7 @@ public PipeTransferTabletBatchEventHandler( public void transfer(final AsyncPipeDataTransferServiceClient client) throws TException { for (final Map.Entry, Long> entry : pipeName2BytesAccumulated.entrySet()) { - connector.rateLimitIfNeeded( + sink.rateLimitIfNeeded( entry.getKey().getLeft(), entry.getKey().getRight(), client.getEndPoint(), @@ -92,13 +92,11 @@ protected boolean onCompleteInternal(final TPipeTransferResp response) { // Only handle the failed statuses to avoid string format performance overhead if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - connector - .statusHandler() - .handle(status, response.getStatus().getMessage(), events.toString()); + sink.statusHandler().handle(status, response.getStatus().getMessage(), events.toString()); } for (final Pair redirectPair : LeaderCacheUtils.parseRecommendedRedirections(status)) { - connector.updateLeaderCache(redirectPair.getLeft(), redirectPair.getRight()); + sink.updateLeaderCache(redirectPair.getLeft(), redirectPair.getRight()); } events.forEach( @@ -123,7 +121,7 @@ protected void onErrorInternal(final Exception exception) { events.size(), events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet())); } finally { - connector.addFailureEventsToRetryQueue(events, exception); + sink.addFailureEventsToRetryQueue(events, exception); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java index 70ba7f4cfc5b7..912a1e724f748 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java @@ -46,7 +46,7 @@ protected void doTransfer( @Override protected void updateLeaderCache(final TSStatus status) { - connector.updateLeaderCache( + sink.updateLeaderCache( ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), status.getRedirectNode()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java index 66a1f4a013b22..a8f1136b897d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java @@ -54,7 +54,7 @@ protected PipeTransferTabletInsertionEventHandler( public void transfer(final AsyncPipeDataTransferServiceClient client) throws TException { if (event instanceof EnrichedEvent) { - connector.rateLimitIfNeeded( + sink.rateLimitIfNeeded( ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent) event).getCreationTime(), client.getEndPoint(), @@ -77,8 +77,7 @@ protected boolean onCompleteInternal(final TPipeTransferResp response) { // Only handle the failed statuses to avoid string format performance overhead if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - connector - .statusHandler() + sink.statusHandler() .handle(response.getStatus(), response.getStatus().getMessage(), event.toString()); } if (event instanceof EnrichedEvent) { @@ -109,7 +108,7 @@ protected void onErrorInternal(final Exception exception) { event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitterKey() : null, event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitIds() : null); } finally { - connector.addFailureEventToRetryQueue(event, exception); + sink.addFailureEventToRetryQueue(event, exception); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java index ff1daa05c2859..b64e446827aff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java @@ -32,8 +32,8 @@ public class PipeTransferTabletRawEventHandler extends PipeTransferTabletInserti public PipeTransferTabletRawEventHandler( final PipeRawTabletInsertionEvent event, final TPipeTransferReq req, - final IoTDBDataRegionAsyncSink connector) { - super(event, req, connector); + final IoTDBDataRegionAsyncSink sink) { + super(event, req, sink); } @Override @@ -45,7 +45,7 @@ protected void doTransfer( @Override protected void updateLeaderCache(final TSStatus status) { - connector.updateLeaderCache( + sink.updateLeaderCache( ((PipeRawTabletInsertionEvent) event).getDeviceId(), status.getRedirectNode()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index 21f7c144bed22..a8b4a3b7a79a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -36,18 +36,18 @@ public abstract class PipeTransferTrackableHandler implements AsyncMethodCallback, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileHandler.class); - protected final IoTDBDataRegionAsyncSink connector; + protected final IoTDBDataRegionAsyncSink sink; protected volatile AsyncPipeDataTransferServiceClient client; - public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink connector) { - this.connector = connector; + public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) { + this.sink = sink; } @Override public void onComplete(final TPipeTransferResp response) { - if (connector.isClosed()) { + if (sink.isClosed()) { clearEventsReferenceCount(); - connector.eliminateHandler(this, true); + sink.eliminateHandler(this, true); return; } @@ -56,7 +56,7 @@ public void onComplete(final TPipeTransferResp response) { // completed // NOTE: We should not clear the reference count of events, as this would cause the // `org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3` test to fail. - connector.eliminateHandler(this, false); + sink.eliminateHandler(this, false); } } @@ -67,14 +67,14 @@ public void onError(final Exception exception) { client.setPrintLogWhenEncounterException(false); } - if (connector.isClosed()) { + if (sink.isClosed()) { clearEventsReferenceCount(); - connector.eliminateHandler(this, true); + sink.eliminateHandler(this, true); return; } onErrorInternal(exception); - connector.eliminateHandler(this, false); + sink.eliminateHandler(this, false); } /** @@ -93,10 +93,10 @@ protected boolean tryTransfer( this.client = client; } // track handler before checking if connector is closed - connector.trackHandler(this); - if (connector.isClosed()) { + sink.trackHandler(this); + if (sink.isClosed()) { clearEventsReferenceCount(); - connector.eliminateHandler(this, true); + sink.eliminateHandler(this, true); client.setShouldReturnSelf(true); client.returnSelf( (e) -> { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index a9426ed7b8b3b..6742199ac1043 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -159,7 +159,7 @@ public void transfer( if (client == null) { LOGGER.warn( "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", - connector.isClosed() ? "CLOSED" : "NOT CLOSED", + sink.isClosed() ? "CLOSED" : "NOT CLOSED", tsFile); return; } @@ -168,7 +168,7 @@ public void transfer( client.setTimeoutDynamically(clientManager.getConnectionTimeout()); PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize); - if (connector.isEnableSendTsFileLimit()) { + if (sink.isEnableSendTsFileLimit()) { TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize); } final int readLength = reader.read(readBuffer); @@ -192,11 +192,11 @@ public void transfer( ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq( modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()) : PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()); - final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq); + final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq); pipeName2WeightMap.forEach( (pipePair, weight) -> - connector.rateLimitIfNeeded( + sink.rateLimitIfNeeded( pipePair.getLeft(), pipePair.getRight(), client.getEndPoint(), @@ -219,11 +219,11 @@ public void transfer( currentFile.getName(), position, payload) : PipeTransferTsFilePieceReq.toTPipeTransferReq( currentFile.getName(), position, payload); - final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq); + final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq); pipeName2WeightMap.forEach( (pipePair, weight) -> - connector.rateLimitIfNeeded( + sink.rateLimitIfNeeded( pipePair.getLeft(), pipePair.getRight(), client.getEndPoint(), @@ -241,7 +241,7 @@ public void onComplete(final TPipeTransferResp response) { try { super.onComplete(response); } finally { - if (connector.isClosed()) { + if (sink.isClosed()) { returnClientIfNecessary(); } } @@ -255,8 +255,7 @@ protected boolean onCompleteInternal(final TPipeTransferResp response) { // Only handle the failed statuses to avoid string format performance overhead if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - connector - .statusHandler() + sink.statusHandler() .handle( status, String.format( @@ -330,9 +329,7 @@ protected boolean onCompleteInternal(final TPipeTransferResp response) { // Only handle the failed statuses to avoid string format performance overhead if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - connector - .statusHandler() - .handle(status, response.getStatus().getMessage(), tsFile.getName()); + sink.statusHandler().handle(status, response.getStatus().getMessage(), tsFile.getName()); } } @@ -404,7 +401,7 @@ protected void onErrorInternal(final Exception exception) { returnClientIfNecessary(); } finally { if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { - connector.addFailureEventsToRetryQueue(events, exception); + sink.addFailureEventsToRetryQueue(events, exception); } } } @@ -415,7 +412,7 @@ private void returnClientIfNecessary() { return; } - if (connector.isClosed()) { + if (sink.isClosed()) { closeClient(); } @@ -439,7 +436,7 @@ protected void doTransfer( if (client == null) { LOGGER.warn( "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", - connector.isClosed() ? "CLOSED" : "NOT CLOSED", + sink.isClosed() ? "CLOSED" : "NOT CLOSED", tsFile); return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java index 52ac137ae4e6f..2019eba85603b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java @@ -35,13 +35,13 @@ import java.util.function.Consumer; -import static org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR; +import static org.apache.iotdb.commons.concurrent.ThreadName.PIPE_SOURCE_DISRUPTOR; public class DisruptorQueue { private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorQueue.class); private static final IoTDBDaemonThreadFactory THREAD_FACTORY = - new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName()); + new IoTDBDaemonThreadFactory(PIPE_SOURCE_DISRUPTOR.getName()); private final PipeMemoryBlock allocatedMemoryBlock; private final Disruptor disruptor; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index aaa98220178bb..882d4aff0d8eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -41,34 +41,34 @@ * *

All events extracted by this listener will be first published to different * PipeEventDataRegionAssigners (identified by data region id), and then PipeEventDataRegionAssigner - * will filter events and assign them to different PipeRealtimeEventDataRegionExtractors. + * will filter events and assign them to different PipeRealtimeEventDataRegionSources. */ public class PipeInsertionDataNodeListener { private final ConcurrentMap dataRegionId2Assigner = new ConcurrentHashMap<>(); - private final AtomicInteger listenToTsFileExtractorCount = new AtomicInteger(0); - private final AtomicInteger listenToInsertNodeExtractorCount = new AtomicInteger(0); + private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0); + private final AtomicInteger listenToInsertNodeSourceCount = new AtomicInteger(0); //////////////////////////// start & stop //////////////////////////// public synchronized void startListenAndAssign( - String dataRegionId, PipeRealtimeDataRegionSource extractor) { + final String dataRegionId, final PipeRealtimeDataRegionSource source) { dataRegionId2Assigner .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId)) - .startAssignTo(extractor); + .startAssignTo(source); - if (extractor.isNeedListenToTsFile()) { - listenToTsFileExtractorCount.incrementAndGet(); + if (source.isNeedListenToTsFile()) { + listenToTsFileSourceCount.incrementAndGet(); } - if (extractor.isNeedListenToInsertNode()) { - listenToInsertNodeExtractorCount.incrementAndGet(); + if (source.isNeedListenToInsertNode()) { + listenToInsertNodeSourceCount.incrementAndGet(); } } public synchronized void stopListenAndAssign( - final String dataRegionId, final PipeRealtimeDataRegionSource extractor) { + final String dataRegionId, final PipeRealtimeDataRegionSource source) { PipeDataRegionAssigner assignerToClose = null; synchronized (this) { @@ -77,13 +77,13 @@ public synchronized void stopListenAndAssign( return; } - assigner.stopAssignTo(extractor); + assigner.stopAssignTo(source); - if (extractor.isNeedListenToTsFile()) { - listenToTsFileExtractorCount.decrementAndGet(); + if (source.isNeedListenToTsFile()) { + listenToTsFileSourceCount.decrementAndGet(); } - if (extractor.isNeedListenToInsertNode()) { - listenToInsertNodeExtractorCount.decrementAndGet(); + if (source.isNeedListenToInsertNode()) { + listenToInsertNodeSourceCount.decrementAndGet(); } if (assigner.notMoreSourceNeededToBeAssigned()) { @@ -104,8 +104,8 @@ public synchronized void stopListenAndAssign( public void listenToTsFile( final String dataRegionId, final TsFileResource tsFileResource, final boolean isLoaded) { - // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on purpose - // because extractors may use tsfile events when some exceptions occur in the + // We don't judge whether listenToTsFileSourceCount.get() == 0 here on purpose + // because sources may use tsfile events when some exceptions occur in the // insert nodes listening process. final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); @@ -120,8 +120,8 @@ public void listenToTsFile( } public void listenToInsertNode( - String dataRegionId, InsertNode insertNode, TsFileResource tsFileResource) { - if (listenToInsertNodeExtractorCount.get() == 0) { + final String dataRegionId, final InsertNode insertNode, final TsFileResource tsFileResource) { + if (listenToInsertNodeSourceCount.get() == 0) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index a366aed71b3e1..1deb5515eb1e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -1076,68 +1076,184 @@ public R accept(PlanVisitor visitor, C context) { return visitor.visitInsertTablet(this, context); } - public TimeValuePair composeLastTimeValuePair(int measurementIndex) { + public TimeValuePair composeLastTimeValuePair( + int measurementIndex, int startOffset, int endOffset) { if (measurementIndex >= columns.length || Objects.isNull(dataTypes[measurementIndex])) { return null; } // get non-null value - int lastIdx = rowCount - 1; + int lastIdx = Math.min(endOffset - 1, rowCount - 1); if (bitMaps != null && bitMaps[measurementIndex] != null) { BitMap bitMap = bitMaps[measurementIndex]; - while (lastIdx >= 0) { + while (lastIdx >= startOffset) { if (!bitMap.isMarked(lastIdx)) { break; } lastIdx--; } } - if (lastIdx < 0) { + if (lastIdx < startOffset) { + return null; + } + return composeTimeValuePair(measurementIndex, lastIdx); + } + + public TimeValuePair composeLastTimeValuePair(int measurementIndex) { + return composeLastTimeValuePair(measurementIndex, 0, rowCount); + } + + protected TimeValuePair composeLastTimeValuePair( + final int measurementIndex, + final TSStatus[] results, + final int startOffset, + final int endOffset) { + if (results == null) { + return composeLastTimeValuePair(measurementIndex, startOffset, endOffset); + } + if (measurementIndex >= columns.length || Objects.isNull(dataTypes[measurementIndex])) { return null; } + final BitMap bitMap = bitMaps == null ? null : bitMaps[measurementIndex]; + int lastIdx = Math.min(endOffset - 1, rowCount - 1); + while (lastIdx >= startOffset) { + if (results[lastIdx] != null + && results[lastIdx].getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + lastIdx--; + continue; + } + if (bitMap != null && bitMap.isMarked(lastIdx)) { + lastIdx--; + continue; + } + break; + } + return lastIdx < startOffset ? null : composeTimeValuePair(measurementIndex, lastIdx); + } + + private TimeValuePair composeTimeValuePair(final int measurementIndex, final int rowIndex) { TsPrimitiveType value; switch (dataTypes[measurementIndex]) { case INT32: case DATE: int[] intValues = (int[]) columns[measurementIndex]; - value = new TsPrimitiveType.TsInt(intValues[lastIdx]); + value = new TsPrimitiveType.TsInt(intValues[rowIndex]); break; case INT64: case TIMESTAMP: long[] longValues = (long[]) columns[measurementIndex]; - value = new TsPrimitiveType.TsLong(longValues[lastIdx]); + value = new TsPrimitiveType.TsLong(longValues[rowIndex]); break; case FLOAT: float[] floatValues = (float[]) columns[measurementIndex]; - value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]); + value = new TsPrimitiveType.TsFloat(floatValues[rowIndex]); break; case DOUBLE: double[] doubleValues = (double[]) columns[measurementIndex]; - value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]); + value = new TsPrimitiveType.TsDouble(doubleValues[rowIndex]); break; case BOOLEAN: boolean[] boolValues = (boolean[]) columns[measurementIndex]; - value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]); + value = new TsPrimitiveType.TsBoolean(boolValues[rowIndex]); break; case TEXT: case BLOB: case STRING: Binary[] binaryValues = (Binary[]) columns[measurementIndex]; - value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]); + value = new TsPrimitiveType.TsBinary(binaryValues[rowIndex]); break; default: throw new UnSupportedDataTypeException( String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex])); } - return new TimeValuePair(times[lastIdx], value); + return new TimeValuePair(times[rowIndex], value); + } + + public IDeviceID getDeviceID(int rowIdx) { + if (deviceID != null) { + return deviceID; + } + deviceID = DeviceIDFactory.getInstance().getDeviceID(targetPath); + return deviceID; + } + + protected static class PartitionSplitInfo { + + // for each List in split, they are range1.start, range1.end, range2.start, range2.end, ... + List ranges = new ArrayList<>(); + List timePartitionSlots = new ArrayList<>(); + List replicaSets; + } + + /** + * Split the tablet of the given range according to Table deviceID. + * + * @param start inclusive + * @param end exclusive + * @return each the number in the pair is the end offset of a device + */ + public List> splitByDevice(int start, int end) { + return Collections.singletonList(new Pair<>(getDeviceID(), end)); + } + + /** + * @param results insertion result of each row + * @param ttl the ttl + * @return the position of the first alive row + * @throws OutOfTTLException if all rows have expired the TTL + */ + public int checkTTL(TSStatus[] results, long ttl) throws OutOfTTLException { + return checkTTLInternal(results, ttl, true); + } + + protected int checkTTLInternal(TSStatus[] results, long ttl, boolean breakOnFirstAlive) + throws OutOfTTLException { + + /* + * assume that batch has been sorted by client + */ + int loc = 0; + int firstAliveLoc = -1; + while (loc < getRowCount()) { + long currTime = getTimes()[loc]; + // skip points that do not satisfy TTL + if (!isAlive(currTime, ttl)) { + results[loc] = + RpcUtils.getStatus( + TSStatusCode.OUT_OF_TTL, + String.format( + "Insertion time [%s] is less than ttl time bound [%s]", + DateTimeUtils.convertLongToDate(currTime), + DateTimeUtils.convertLongToDate(CommonDateTimeUtils.currentTime() - ttl))); + } else { + if (firstAliveLoc == -1) { + firstAliveLoc = loc; + } + if (breakOnFirstAlive) { + break; + } + } + loc++; + } + + if (firstAliveLoc == -1) { + // no alive data + throw new OutOfTTLException( + getTimes()[getTimes().length - 1], (CommonDateTimeUtils.currentTime() - ttl)); + } + return firstAliveLoc; } public void updateLastCache(final String databaseName) { + updateLastCache(databaseName, null); + } + + public void updateLastCache(final String databaseName, final TSStatus[] results) { final String[] rawMeasurements = getRawMeasurements(); final TimeValuePair[] timeValuePairs = new TimeValuePair[rawMeasurements.length]; for (int i = 0; i < rawMeasurements.length; i++) { - timeValuePairs[i] = composeLastTimeValuePair(i); + timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount); } DataNodeSchemaCache.getInstance() .updateLastCacheIfExists( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 3cad57962e51e..791d01f970911 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1193,7 +1193,7 @@ public void insertTablet(InsertTabletNode insertTabletNode) if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) { // disable updating last cache on follower startTime = System.nanoTime(); - tryToUpdateInsertTabletLastCache(insertTabletNode); + tryToUpdateInsertTabletLastCache(insertTabletNode, results); PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost( System.nanoTime() - startTime); } @@ -1261,13 +1261,13 @@ private boolean insertTabletToTsFileProcessor( return true; } - TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); - if (tsFileProcessor == null) { + final TsFileProcessor tsFileProcessor; + try { + tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); + } catch (WriteProcessException e) { + final TSStatus failureStatus = RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); for (int i = start; i < end; i++) { - results[i] = - RpcUtils.getStatus( - TSStatusCode.INTERNAL_SERVER_ERROR, - "can not create TsFileProcessor, timePartitionId: " + timePartitionId); + results[i] = failureStatus; } return false; } @@ -1289,8 +1289,9 @@ private boolean insertTabletToTsFileProcessor( return true; } - private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) { - node.updateLastCache(getDatabaseName()); + private void tryToUpdateInsertTabletLastCache( + final InsertTabletNode node, final TSStatus[] results) { + node.updateLastCache(getDatabaseName(), results); } private TsFileProcessor insertToTsFileProcessor( @@ -1300,9 +1301,6 @@ private TsFileProcessor insertToTsFileProcessor( return null; } TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); - if (tsFileProcessor == null) { - return null; - } long[] costsForMetrics = new long[4]; tsFileProcessor.insert(insertRowNode, costsForMetrics); PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); @@ -1325,9 +1323,11 @@ private List insertToTsFileProcessors( if (insertRowNode.allMeasurementFailed()) { continue; } - TsFileProcessor tsFileProcessor = - getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]); - if (tsFileProcessor == null) { + final TsFileProcessor tsFileProcessor; + try { + tsFileProcessor = getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]); + } catch (WriteProcessException e) { + insertRowsNode.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); continue; } int finalI = i; @@ -1335,43 +1335,28 @@ private List insertToTsFileProcessors( tsFileProcessor, (k, v) -> { if (v == null) { - v = new InsertRowsNode(insertRowsNode.getPlanNodeId()); - v.setSearchIndex(insertRowNode.getSearchIndex()); - v.setAligned(insertRowNode.isAligned()); - if (insertRowNode.isGeneratedByPipe()) { - v.markAsGeneratedByPipe(); - } - if (insertRowNode.isGeneratedByRemoteConsensusLeader()) { - v.markAsGeneratedByRemoteConsensusLeader(); - } + v = createGroupedInsertRowsNode(insertRowsNode, insertRowNode); } - if (v.isAligned() != insertRowNode.isAligned()) { - v.setMixingAlignment(true); - } - v.addOneInsertRowNode(insertRowNode, finalI); - v.updateProgressIndex(insertRowNode.getProgressIndex()); + appendInsertRowNode(v, insertRowNode, finalI); return v; }); } List executedInsertRowNodeList = new ArrayList<>(); for (Map.Entry entry : tsFileProcessorMap.entrySet()) { - TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { - tsFileProcessor.insert(subInsertRowsNode, costsForMetrics); + List insertedProcessors = + insertRowsWithTypeConsistencyCheck(entry.getKey(), subInsertRowsNode, costsForMetrics); + executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList()); + for (TsFileProcessor tsFileProcessor : insertedProcessors) { + // check memtable size and may asyncTryToFlush the work memtable + if (tsFileProcessor.shouldFlush()) { + fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + } + } } catch (WriteProcessException e) { - insertRowsNode - .getResults() - .put( - subInsertRowsNode.getInsertRowNodeIndexList().get(0), - RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); - } - executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList()); - - // check memtable size and may asyncTryToFlush the work memtable - if (entry.getKey().shouldFlush()) { - fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + recordInsertRowsFailure(insertRowsNode, subInsertRowsNode, e); } } @@ -1382,6 +1367,127 @@ private List insertToTsFileProcessors( return executedInsertRowNodeList; } + private List insertRowsWithTypeConsistencyCheck( + TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode, long[] costsForMetrics) + throws WriteProcessException { + try { + tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics); + return Collections.singletonList(tsFileProcessor); + } catch (DataTypeInconsistentException e) { + InsertRowNode firstRow = subInsertRowsNode.getInsertRowNodeList().get(0); + long timePartitionId = TimePartitionUtils.getTimePartitionId(firstRow.getTime()); + // flush both MemTables so that the new type can be inserted into a new MemTable + flushWorkingProcessorsForTimePartition(timePartitionId); + return retryInsertRowsAfterFlush(subInsertRowsNode, timePartitionId, costsForMetrics); + } + } + + private InsertRowsNode createGroupedInsertRowsNode( + final InsertRowsNode sourceInsertRowsNode, final InsertRowNode firstInsertRowNode) { + final InsertRowsNode groupedInsertRowsNode = sourceInsertRowsNode.emptyClone(); + initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode); + return groupedInsertRowsNode; + } + + private InsertRowsNode createGroupedInsertRowsNode( + final InsertRowsOfOneDeviceNode sourceInsertRowsNode, + final InsertRowNode firstInsertRowNode) { + final InsertRowsNode groupedInsertRowsNode = + new InsertRowsNode(sourceInsertRowsNode.getPlanNodeId()); + initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode); + return groupedInsertRowsNode; + } + + private void initializeGroupedInsertRowsNode( + final InsertRowsNode groupedInsertRowsNode, final InsertRowNode firstInsertRowNode) { + groupedInsertRowsNode.setSearchIndex(firstInsertRowNode.getSearchIndex()); + groupedInsertRowsNode.setAligned(firstInsertRowNode.isAligned()); + if (firstInsertRowNode.isGeneratedByPipe()) { + groupedInsertRowsNode.markAsGeneratedByPipe(); + } + if (firstInsertRowNode.isGeneratedByRemoteConsensusLeader()) { + groupedInsertRowsNode.markAsGeneratedByRemoteConsensusLeader(); + } + } + + private void appendInsertRowNode( + final InsertRowsNode groupedInsertRowsNode, + final InsertRowNode insertRowNode, + final int insertRowNodeIndex) { + if (groupedInsertRowsNode.isAligned() != insertRowNode.isAligned()) { + groupedInsertRowsNode.setMixingAlignment(true); + } + groupedInsertRowsNode.addOneInsertRowNode(insertRowNode, insertRowNodeIndex); + groupedInsertRowsNode.updateProgressIndex(insertRowNode.getProgressIndex()); + } + + private void flushWorkingProcessorsForTimePartition(final long timePartitionId) { + TsFileProcessor workSequenceProcessor = workSequenceTsFileProcessors.get(timePartitionId); + if (workSequenceProcessor != null) { + fileFlushPolicy.apply(this, workSequenceProcessor, workSequenceProcessor.isSequence()); + } + TsFileProcessor workUnsequenceProcessor = workUnsequenceTsFileProcessors.get(timePartitionId); + if (workUnsequenceProcessor != null) { + fileFlushPolicy.apply(this, workUnsequenceProcessor, workUnsequenceProcessor.isSequence()); + } + } + + private List retryInsertRowsAfterFlush( + final InsertRowsNode subInsertRowsNode, + final long timePartitionId, + final long[] costsForMetrics) + throws WriteProcessException { + final Map retriedProcessorMap = new HashMap<>(); + for (int i = 0; i < subInsertRowsNode.getInsertRowNodeList().size(); i++) { + final InsertRowNode insertRowNode = subInsertRowsNode.getInsertRowNodeList().get(i); + final boolean isSequence = + config.isEnableSeparateData() + && insertRowNode.getTime() + > lastFlushTimeMap.getFlushedTime(timePartitionId, insertRowNode.getDeviceID()); + final TsFileProcessor retriedProcessor = + getOrCreateTsFileProcessor(timePartitionId, isSequence); + final int insertRowNodeIndex = subInsertRowsNode.getInsertRowNodeIndexList().get(i); + retriedProcessorMap.compute( + retriedProcessor, + (k, v) -> { + if (v == null) { + v = createGroupedInsertRowsNode(subInsertRowsNode, insertRowNode); + } + appendInsertRowNode(v, insertRowNode, insertRowNodeIndex); + return v; + }); + } + + final List insertedProcessors = new ArrayList<>(retriedProcessorMap.size()); + for (Entry retriedEntry : retriedProcessorMap.entrySet()) { + final TsFileProcessor retriedProcessor = retriedEntry.getKey(); + retriedProcessor.insertRows(retriedEntry.getValue(), costsForMetrics); + insertedProcessors.add(retriedProcessor); + } + return insertedProcessors; + } + + private void recordInsertRowsFailure( + final InsertRowsNode sourceInsertRowsNode, + final InsertRowsNode failedInsertRowsNode, + final WriteProcessException exception) { + final TSStatus failureStatus = + RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage()); + for (Integer failedIndex : failedInsertRowsNode.getInsertRowNodeIndexList()) { + sourceInsertRowsNode.getResults().put(failedIndex, failureStatus); + } + } + + private void recordInsertRowsFailure( + final InsertRowsOfOneDeviceNode sourceInsertRowsNode, + final InsertRowsNode failedInsertRowsNode, + final WriteProcessException exception) { + final TSStatus failureStatus = + RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage()); + for (Integer failedIndex : failedInsertRowsNode.getInsertRowNodeIndexList()) { + sourceInsertRowsNode.getResults().put(failedIndex, failureStatus); + } + } private void tryToUpdateInsertRowsLastCache(List nodeList) { for (InsertRowNode node : nodeList) { node.updateLastCache(databaseName); @@ -1440,7 +1546,8 @@ public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) { } } - private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) { + protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) + throws WriteProcessException { TsFileProcessor tsFileProcessor = null; int retryCnt = 0; do { @@ -1466,7 +1573,7 @@ private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean seq "disk space is insufficient when creating TsFile processor, change system mode to read-only", e); CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly); - break; + throw new WriteProcessException(e.getMessage(), e.getErrorCode(), true); } catch (IOException e) { if (retryCnt < 3) { logger.warn("meet IOException when creating TsFileProcessor, retry it again", e); @@ -1475,11 +1582,15 @@ private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean seq logger.error( "meet IOException when creating TsFileProcessor, change system mode to error", e); CommonDescriptor.getInstance().getConfig().handleUnrecoverableError(); - break; + throw new WriteProcessException( + String.format( + "Failed to create TsFileProcessor for database %s, timePartitionId %s", + databaseName, timeRangeId), + e); } } catch (ExceedQuotaException e) { logger.error(e.getMessage()); - break; + throw new WriteProcessException(e.getMessage(), e.getErrorCode(), true); } } while (tsFileProcessor == null); return tsFileProcessor; @@ -3663,8 +3774,13 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) config.isEnableSeparateData() && insertRowNode.getTime() > lastFlushTimeMap.getFlushedTime(timePartitionId, insertRowNode.getDeviceID()); - TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, isSequence); - if (tsFileProcessor == null) { + final TsFileProcessor tsFileProcessor; + try { + tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, isSequence); + } catch (WriteProcessException e) { + insertRowsOfOneDeviceNode + .getResults() + .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); continue; } int finalI = i; @@ -3672,39 +3788,27 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) tsFileProcessor, (k, v) -> { if (v == null) { - v = new InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId()); - v.setSearchIndex(insertRowNode.getSearchIndex()); - v.setAligned(insertRowNode.isAligned()); - if (insertRowNode.isGeneratedByPipe()) { - v.markAsGeneratedByPipe(); - } - if (insertRowNode.isGeneratedByRemoteConsensusLeader()) { - v.markAsGeneratedByRemoteConsensusLeader(); - } + v = createGroupedInsertRowsNode(insertRowsOfOneDeviceNode, insertRowNode); } - v.addOneInsertRowNode(insertRowNode, finalI); - v.updateProgressIndex(insertRowNode.getProgressIndex()); + appendInsertRowNode(v, insertRowNode, finalI); return v; }); } List executedInsertRowNodeList = new ArrayList<>(); for (Map.Entry entry : tsFileProcessorMap.entrySet()) { - TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { - tsFileProcessor.insert(subInsertRowsNode, costsForMetrics); + List insertedProcessors = + insertRowsWithTypeConsistencyCheck(entry.getKey(), subInsertRowsNode, costsForMetrics); + executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList()); + for (TsFileProcessor tsFileProcessor : insertedProcessors) { + // check memtable size and may asyncTryToFlush the work memtable + if (tsFileProcessor.shouldFlush()) { + fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + } + } } catch (WriteProcessException e) { - insertRowsOfOneDeviceNode - .getResults() - .put( - subInsertRowsNode.getInsertRowNodeIndexList().get(0), - RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); - } - executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList()); - - // check memtable size and may asyncTryToFlush the work memtable - if (tsFileProcessor.shouldFlush()) { - fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + recordInsertRowsFailure(insertRowsOfOneDeviceNode, subInsertRowsNode, e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java index 73fca57a1fd69..45c6d6f86cfc8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java @@ -41,7 +41,7 @@ public SubscriptionTaskSinkStage( @Override protected void registerSubtask() { - this.connectorSubtaskId = + this.sinkSubtaskId = SubscriptionSinkSubtaskManager.instance() .register( executor.get(), @@ -56,22 +56,21 @@ public void createSubtask() throws PipeException { @Override public void startSubtask() throws PipeException { - SubscriptionSinkSubtaskManager.instance().start(connectorSubtaskId); + SubscriptionSinkSubtaskManager.instance().start(sinkSubtaskId); } @Override public void stopSubtask() throws PipeException { - SubscriptionSinkSubtaskManager.instance().stop(connectorSubtaskId); + SubscriptionSinkSubtaskManager.instance().stop(sinkSubtaskId); } @Override public void dropSubtask() throws PipeException { SubscriptionSinkSubtaskManager.instance() - .deregister(pipeName, creationTime, regionId, connectorSubtaskId); + .deregister(pipeName, creationTime, regionId, sinkSubtaskId); } public UnboundedBlockingPendingQueue getPipeSinkPendingQueue() { - return SubscriptionSinkSubtaskManager.instance() - .getPipeConnectorPendingQueue(connectorSubtaskId); + return SubscriptionSinkSubtaskManager.instance().getPipeConnectorPendingQueue(sinkSubtaskId); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index 5082b8fc4f09a..8eab3e6be49c7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.DataRegionId; @@ -27,14 +28,17 @@ import org.apache.iotdb.commons.exception.ShutdownException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.DataRegionException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; @@ -62,6 +66,8 @@ import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.PlainDeviceID; @@ -78,6 +84,7 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +94,15 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode; +import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; public class DataRegionTest { private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -126,6 +142,7 @@ public void tearDown() throws Exception { dataRegion.syncDeleteDataFiles(); StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0)); } + DataNodeSchemaCache.getInstance().cleanUp(); EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR); CompactionTaskManager.getInstance().stop(); EnvironmentUtils.cleanEnv(); @@ -975,6 +992,189 @@ public void testInsertUnSequenceRows() dataRegion1.syncDeleteDataFiles(); } + @Test + public void testInsertRowPropagatesTsFileProcessorCreationFailure() + throws IllegalPathException, DataRegionException, TsFileProcessorException { + final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, "root.fail_row"); + dataRegion1.setTsFileProcessorSupplier( + (timePartitionId, sequence) -> { + throw new WriteProcessRejectException("mock creation failure"); + }); + + final TSRecord record = new TSRecord(1, "root.fail_row"); + record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1))); + final InsertRowNode insertRowNode = buildInsertRowNodeByTSRecord(record); + + try { + dataRegion1.insert(insertRowNode); + Assert.fail("Expected WriteProcessRejectException"); + } catch (WriteProcessRejectException e) { + Assert.assertTrue(e.getMessage().contains("mock creation failure")); + } catch (WriteProcessException e) { + Assert.fail("Expected WriteProcessRejectException but got " + e.getClass().getSimpleName()); + } finally { + dataRegion1.syncDeleteDataFiles(); + } + } + + @Test + public void testInsertRowsMarkAllFailedRowsForSameProcessor() throws Exception { + final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, "root.fail_rows"); + final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class); + Mockito.doThrow(new WriteProcessException("mock insert rows failure")) + .when(processor) + .insertRows(any(InsertRowsNode.class), any(long[].class)); + Mockito.when(processor.shouldFlush()).thenReturn(false); + Mockito.when(processor.isSequence()).thenReturn(true); + dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) -> processor); + + final List indexList = Arrays.asList(0, 1); + final List nodes = new ArrayList<>(); + for (long time : new long[] {1, 2}) { + final TSRecord record = new TSRecord(time, "root.fail_rows"); + record.addTuple( + DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(time))); + nodes.add(buildInsertRowNodeByTSRecord(record)); + } + final InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""), indexList, nodes); + + try { + dataRegion1.insert(insertRowsNode); + Assert.fail("Expected BatchProcessException"); + } catch (BatchProcessException e) { + Assert.assertEquals(2, insertRowsNode.getResults().size()); + Assert.assertEquals( + TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(), + insertRowsNode.getResults().get(0).getCode()); + Assert.assertEquals( + TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(), + insertRowsNode.getResults().get(1).getCode()); + } finally { + dataRegion1.syncDeleteDataFiles(); + } + } + + @Test + public void testInsertRowsLastCacheSkipsFailedRows() throws Exception { + final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable(); + COMMON_CONFIG.setLastCacheEnable(true); + + final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, "root.cache_rows"); + final TsFileProcessor successProcessor = Mockito.mock(TsFileProcessor.class); + Mockito.when(successProcessor.shouldFlush()).thenReturn(false); + Mockito.when(successProcessor.isSequence()).thenReturn(true); + final long failingTime = TimePartitionUtils.getTimePartitionInterval() + 1; + final long failingPartitionId = TimePartitionUtils.getTimePartitionId(failingTime); + dataRegion1.setTsFileProcessorSupplier( + (timePartitionId, sequence) -> { + if (timePartitionId == failingPartitionId) { + throw new WriteProcessException("mock row failure"); + } + return successProcessor; + }); + + final MeasurementPath lastCachePath = + new MeasurementPath( + "root.cache_rows", + measurementId, + new MeasurementSchema( + measurementId, TSDataType.INT32, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED)); + DataNodeSchemaCache.getInstance().declareLastCache(dataRegion1.getDatabaseName(), lastCachePath); + + final List indexList = Arrays.asList(0, 1); + final List nodes = new ArrayList<>(); + final long[] times = new long[] {1, failingTime}; + final int[] values = new int[] {10, 20}; + for (int i = 0; i < times.length; i++) { + final long time = times[i]; + final TSRecord record = new TSRecord(time, "root.cache_rows"); + record.addTuple( + DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(values[i]))); + nodes.add(buildInsertRowNodeByTSRecord(record)); + } + final InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""), indexList, nodes); + + try { + dataRegion1.insert(insertRowsNode); + Assert.fail("Expected BatchProcessException"); + } catch (BatchProcessException e) { + final TimeValuePair lastCache = + DataNodeSchemaCache.getInstance().getLastCache(lastCachePath); + Assert.assertNotNull(lastCache); + Assert.assertEquals(1, lastCache.getTimestamp()); + Assert.assertEquals(10, lastCache.getValue().getInt()); + } finally { + dataRegion1.syncDeleteDataFiles(); + COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable); + } + } + + @Test + public void testInsertTabletLastCacheSkipsFailedRows() throws Exception { + final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable(); + COMMON_CONFIG.setLastCacheEnable(true); + + final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, "root.cache_tablet"); + final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class); + Mockito.doAnswer( + invocation -> { + TSStatus[] results = invocation.getArgument(3); + results[0] = RpcUtils.SUCCESS_STATUS; + results[1] = + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), "mock row failure"); + throw new WriteProcessException("mock tablet failure"); + }) + .when(processor) + .insertTablet(any(InsertTabletNode.class), anyInt(), anyInt(), any(TSStatus[].class)); + Mockito.when(processor.shouldFlush()).thenReturn(false); + Mockito.when(processor.isSequence()).thenReturn(true); + dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) -> processor); + + final MeasurementPath lastCachePath = + new MeasurementPath( + "root.cache_tablet", + measurementId, + new MeasurementSchema( + measurementId, TSDataType.INT32, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED)); + DataNodeSchemaCache.getInstance().declareLastCache(dataRegion1.getDatabaseName(), lastCachePath); + + final String[] measurements = new String[] {measurementId}; + final TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32}; + final MeasurementSchema[] measurementSchemas = + new MeasurementSchema[] { + new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN) + }; + final long[] times = new long[] {1, 2}; + final Object[] columns = new Object[] {new int[] {10, 20}}; + final InsertTabletNode insertTabletNode = + new InsertTabletNode( + new QueryId("test_write").genPlanNodeId(), + new PartialPath("root.cache_tablet"), + false, + measurements, + dataTypes, + measurementSchemas, + times, + null, + columns, + times.length); + + try { + dataRegion1.insertTablet(insertTabletNode); + Assert.fail("Expected BatchProcessException"); + } catch (BatchProcessException e) { + final TimeValuePair lastCache = + DataNodeSchemaCache.getInstance().getLastCache(lastCachePath); + Assert.assertNotNull(lastCache); + Assert.assertEquals(1, lastCache.getTimestamp()); + Assert.assertEquals(10, lastCache.getValue().getInt()); + } finally { + dataRegion1.syncDeleteDataFiles(); + COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable); + } + } + @Test public void testSmallReportProportionInsertRow() throws WriteProcessException, @@ -1490,6 +1690,32 @@ public DummyDataRegion(String systemInfoDir, String storageGroupName) } } + private interface TsFileProcessorSupplier { + TsFileProcessor get(long timePartitionId, boolean sequence) throws WriteProcessException; + } + + private static class HookedDataRegion extends DummyDataRegion { + private TsFileProcessorSupplier tsFileProcessorSupplier; + + private HookedDataRegion(String systemInfoDir, String storageGroupName) + throws DataRegionException { + super(systemInfoDir, storageGroupName); + } + + private void setTsFileProcessorSupplier(TsFileProcessorSupplier tsFileProcessorSupplier) { + this.tsFileProcessorSupplier = tsFileProcessorSupplier; + } + + @Override + protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) + throws WriteProcessException { + if (tsFileProcessorSupplier != null) { + return tsFileProcessorSupplier.get(timeRangeId, sequence); + } + return super.getOrCreateTsFileProcessor(timeRangeId, sequence); + } + } + // -- test for deleting data directly // -- delete data and file only when: // 1. tsfile is closed diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index 3ff47a2c5e1d6..46280ede66cb1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -284,7 +284,7 @@ public GenericKeyedObjectPool cre .setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled()) .setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber()) .build(), - ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()), + ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()), new ClientPoolProperty.Builder() .setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber()) .build() @@ -310,7 +310,7 @@ public GenericKeyedObjectPool cre .setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber()) .setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException()) .build(), - ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()), + ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()), new ClientPoolProperty.Builder() .setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber()) .build() diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 696b6b8ce07fb..9a8beac5a4380 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -129,10 +129,10 @@ public enum ThreadName { GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"), GROUP_MANAGEMENT("groupManagement"), // -------------------------- Compute -------------------------- - PIPE_EXTRACTOR_DISRUPTOR("Pipe-Extractor-Disruptor"), + PIPE_SOURCE_DISRUPTOR("Pipe-Source-Disruptor"), PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"), - PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"), PIPE_CONSENSUS_EXECUTOR_POOL("Pipe-Consensus-Executor-Pool"), + PIPE_SINK_EXECUTOR_POOL("Pipe-Sink-Executor-Pool"), PIPE_CONFIGNODE_EXECUTOR_POOL("Pipe-ConfigNode-Executor-Pool"), PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"), PIPE_TSFILE_ASYNC_SEND_POOL("Pipe-TsFile-Async-Send-Pool"), @@ -142,7 +142,7 @@ public enum ThreadName { PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"), PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER( "Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"), - PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"), + PIPE_ASYNC_SINK_CLIENT_POOL("Pipe-Async-Sink-Client-Pool"), PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"), PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"), PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"), @@ -289,9 +289,9 @@ public enum ThreadName { private static final Set computeThreadNames = new HashSet<>( Arrays.asList( - PIPE_EXTRACTOR_DISRUPTOR, + PIPE_SOURCE_DISRUPTOR, PIPE_PROCESSOR_EXECUTOR_POOL, - PIPE_CONNECTOR_EXECUTOR_POOL, + PIPE_SINK_EXECUTOR_POOL, PIPE_CONSENSUS_EXECUTOR_POOL, PIPE_CONFIGNODE_EXECUTOR_POOL, PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL, @@ -300,7 +300,7 @@ public enum ThreadName { PIPE_RUNTIME_PROCEDURE_SUBMITTER, PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR, PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER, - PIPE_ASYNC_CONNECTOR_CLIENT_POOL, + PIPE_ASYNC_SINK_CLIENT_POOL, PIPE_RECEIVER_AIR_GAP_AGENT, PIPE_AIR_GAP_RECEIVER, PIPE_PARALLEL_EXECUTION_POOL, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index ec9ce06fd1d41..3b7229927d1f4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -277,11 +277,11 @@ public class CommonConfig { private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20; private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30; private long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = 500; - private int pipeAsyncConnectorSelectorNumber = + private int pipeAsyncSinkSelectorNumber = Math.max(4, Runtime.getRuntime().availableProcessors() / 2); - private int pipeAsyncConnectorMaxClientNumber = + private int pipeAsyncSinkMaxClientNumber = Math.max(32, Runtime.getRuntime().availableProcessors() * 2); - private int pipeAsyncConnectorMaxTsFileClientNumber = + private int pipeAsyncSinkMaxTsFileClientNumber = Math.max(16, Runtime.getRuntime().availableProcessors()); private boolean printLogWhenEncounterException = false; @@ -289,8 +289,7 @@ public class CommonConfig { private double pipeAllSinksRateLimitBytesPerSecond = -1; private int rateLimiterHotReloadCheckIntervalMs = 1000; - private int pipeConnectorRequestSliceThresholdBytes = - (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8); + private int pipeSinkRequestSliceThresholdBytes = (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8); private boolean isSeperatedPipeHeartbeatEnabled = true; private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 3; @@ -1002,7 +1001,7 @@ public int getPipeSinkHandshakeTimeoutMs() { } public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) { - final int fPipeConnectorHandshakeTimeoutMs = this.pipeSinkHandshakeTimeoutMs; + final int fPipeSinkHandshakeTimeoutMs = this.pipeSinkHandshakeTimeoutMs; try { this.pipeSinkHandshakeTimeoutMs = Math.toIntExact(pipeSinkHandshakeTimeoutMs); } catch (ArithmeticException e) { @@ -1010,7 +1009,7 @@ public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) { logger.warn( "Given pipe connector handshake timeout is too large, set to {} ms.", Integer.MAX_VALUE); } finally { - if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) { + if (fPipeSinkHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) { logger.info("pipeSinkHandshakeTimeoutMs is set to {}.", this.pipeSinkHandshakeTimeoutMs); } } @@ -1041,16 +1040,16 @@ public int getPipeSinkTransferTimeoutMs() { } public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) { - final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs; + final int fPipeSinkTransferTimeoutMs = this.pipeSinkTransferTimeoutMs; try { this.pipeSinkTransferTimeoutMs = Math.toIntExact(pipeSinkTransferTimeoutMs); } catch (ArithmeticException e) { this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE; logger.warn( - "Given pipe connector transfer timeout is too large, set to {} ms.", Integer.MAX_VALUE); + "Given pipe sink transfer timeout is too large, set to {} ms.", Integer.MAX_VALUE); } finally { - if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) { - logger.info("pipeConnectorTransferTimeoutMs is set to {}.", pipeSinkTransferTimeoutMs); + if (fPipeSinkTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) { + logger.info("pipeSinkTransferTimeoutMs is set to {}.", pipeSinkTransferTimeoutMs); } } } @@ -1064,7 +1063,7 @@ public void setPipeSinkReadFileBufferSize(int pipeSinkReadFileBufferSize) { return; } this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize; - logger.info("pipeConnectorReadFileBufferSize is set to {}.", pipeSinkReadFileBufferSize); + logger.info("pipeSinkReadFileBufferSize is set to {}.", pipeSinkReadFileBufferSize); } public boolean isPipeSinkReadFileBufferMemoryControlEnabled() { @@ -1164,60 +1163,58 @@ public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() { } public int getPipeAsyncSinkSelectorNumber() { - return pipeAsyncConnectorSelectorNumber; + return pipeAsyncSinkSelectorNumber; } - public void setPipeAsyncConnectorSelectorNumber(int pipeAsyncConnectorSelectorNumber) { - if (pipeAsyncConnectorSelectorNumber <= 0) { + public void setPipeAsyncSinkSelectorNumber(int pipeAsyncSinkSelectorNumber) { + if (pipeAsyncSinkSelectorNumber <= 0) { logger.info( - "pipeAsyncConnectorSelectorNumber should be greater than 0, configuring it not to change."); + "pipeAsyncSinkSelectorNumber should be greater than 0, configuring it not to change."); return; } - pipeAsyncConnectorSelectorNumber = Math.max(4, pipeAsyncConnectorSelectorNumber); - if (this.pipeAsyncConnectorSelectorNumber == pipeAsyncConnectorSelectorNumber) { + pipeAsyncSinkSelectorNumber = Math.max(4, pipeAsyncSinkSelectorNumber); + if (this.pipeAsyncSinkSelectorNumber == pipeAsyncSinkSelectorNumber) { return; } - this.pipeAsyncConnectorSelectorNumber = pipeAsyncConnectorSelectorNumber; - logger.info("pipeAsyncConnectorSelectorNumber is set to {}.", pipeAsyncConnectorSelectorNumber); + this.pipeAsyncSinkSelectorNumber = pipeAsyncSinkSelectorNumber; + logger.info("pipeAsyncSinkSelectorNumber is set to {}.", pipeAsyncSinkSelectorNumber); } public int getPipeAsyncSinkMaxClientNumber() { - return pipeAsyncConnectorMaxClientNumber; + return pipeAsyncSinkMaxClientNumber; } - public void setPipeAsyncConnectorMaxClientNumber(int pipeAsyncConnectorMaxClientNumber) { - if (pipeAsyncConnectorMaxClientNumber <= 0) { + public void setPipeAsyncSinkMaxClientNumber(int pipeAsyncSinkMaxClientNumber) { + if (pipeAsyncSinkMaxClientNumber <= 0) { logger.info( - " pipeAsyncConnectorMaxClientNumber should be greater than 0, configuring it not to change."); + " pipeAsyncSinkMaxClientNumber should be greater than 0, configuring it not to change."); return; } - pipeAsyncConnectorMaxClientNumber = Math.max(32, pipeAsyncConnectorMaxClientNumber); - if (this.pipeAsyncConnectorMaxClientNumber == pipeAsyncConnectorMaxClientNumber) { + pipeAsyncSinkMaxClientNumber = Math.max(32, pipeAsyncSinkMaxClientNumber); + if (this.pipeAsyncSinkMaxClientNumber == pipeAsyncSinkMaxClientNumber) { return; } - this.pipeAsyncConnectorMaxClientNumber = pipeAsyncConnectorMaxClientNumber; - logger.info( - "pipeAsyncConnectorMaxClientNumber is set to {}.", pipeAsyncConnectorMaxClientNumber); + this.pipeAsyncSinkMaxClientNumber = pipeAsyncSinkMaxClientNumber; + logger.info("pipeAsyncSinkMaxClientNumber is set to {}.", pipeAsyncSinkMaxClientNumber); } public int getPipeAsyncSinkMaxTsFileClientNumber() { - return pipeAsyncConnectorMaxTsFileClientNumber; + return pipeAsyncSinkMaxTsFileClientNumber; } - public void setPipeAsyncConnectorMaxTsFileClientNumber( - int pipeAsyncConnectorMaxTsFileClientNumber) { - if (pipeAsyncConnectorMaxTsFileClientNumber <= 0) { + public void setPipeAsyncSinkMaxTsFileClientNumber(int pipeAsyncSinkMaxTsFileClientNumber) { + if (pipeAsyncSinkMaxTsFileClientNumber <= 0) { logger.info( - "pipeAsyncConnectorMaxTsFileClientNumber should be greater than 0, configuring it not to change."); + "pipeAsyncSinkMaxTsFileClientNumber should be greater than 0, configuring it not to change."); return; } - pipeAsyncConnectorMaxTsFileClientNumber = Math.max(16, pipeAsyncConnectorMaxTsFileClientNumber); - if (this.pipeAsyncConnectorMaxTsFileClientNumber == pipeAsyncConnectorMaxTsFileClientNumber) { + pipeAsyncSinkMaxTsFileClientNumber = Math.max(16, pipeAsyncSinkMaxTsFileClientNumber); + if (this.pipeAsyncSinkMaxTsFileClientNumber == pipeAsyncSinkMaxTsFileClientNumber) { return; } - this.pipeAsyncConnectorMaxTsFileClientNumber = pipeAsyncConnectorMaxTsFileClientNumber; + this.pipeAsyncSinkMaxTsFileClientNumber = pipeAsyncSinkMaxTsFileClientNumber; logger.info( - "pipeAsyncConnectorMaxClientNumber is set to {}.", pipeAsyncConnectorMaxTsFileClientNumber); + "pipeAsyncSinkMaxTsFileClientNumber is set to {}.", pipeAsyncSinkMaxTsFileClientNumber); } public boolean isPrintLogWhenEncounterException() { @@ -1321,12 +1318,12 @@ public long getPipeSinkRetryIntervalMs() { return pipeSinkRetryIntervalMs; } - public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) { - if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) { + public void setPipeSinkRetryIntervalMs(long pipeSinkRetryIntervalMs) { + if (this.pipeSinkRetryIntervalMs == pipeSinkRetryIntervalMs) { return; } - this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs; - logger.info("pipeSinkRetryIntervalMs is set to {}", pipeConnectorRetryIntervalMs); + this.pipeSinkRetryIntervalMs = pipeSinkRetryIntervalMs; + logger.info("pipeSinkRetryIntervalMs is set to {}", pipeSinkRetryIntervalMs); } public boolean isPipeSinkRetryLocallyForConnectionError() { @@ -2127,18 +2124,16 @@ public void setRateLimiterHotReloadCheckIntervalMs(int rateLimiterHotReloadCheck } public int getPipeSinkRequestSliceThresholdBytes() { - return pipeConnectorRequestSliceThresholdBytes; + return pipeSinkRequestSliceThresholdBytes; } - public void setPipeConnectorRequestSliceThresholdBytes( - int pipeConnectorRequestSliceThresholdBytes) { - if (this.pipeConnectorRequestSliceThresholdBytes == pipeConnectorRequestSliceThresholdBytes) { + public void setPipeSinkRequestSliceThresholdBytes(int pipeSinkRequestSliceThresholdBytes) { + if (this.pipeSinkRequestSliceThresholdBytes == pipeSinkRequestSliceThresholdBytes) { return; } - this.pipeConnectorRequestSliceThresholdBytes = pipeConnectorRequestSliceThresholdBytes; + this.pipeSinkRequestSliceThresholdBytes = pipeSinkRequestSliceThresholdBytes; logger.info( - "pipeConnectorRequestSliceThresholdBytes is set to {}", - pipeConnectorRequestSliceThresholdBytes); + "pipeConnectorRequestSliceThresholdBytes is set to {}", pipeSinkRequestSliceThresholdBytes); } public long getTwoStageAggregateMaxCombinerLiveTimeInMs() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 02284671803d5..83db711e5ec63 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -443,7 +443,7 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr "rate_limiter_hot_reload_check_interval_ms", String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs())))); - config.setPipeConnectorRequestSliceThresholdBytes( + config.setPipeSinkRequestSliceThresholdBytes( Integer.parseInt( properties.getProperty( "pipe_connector_request_slice_threshold_bytes", @@ -613,7 +613,7 @@ public static void loadPipeExternalConfig( "pipe_async_connector_selector_number", isHotModify); if (value != null) { - config.setPipeAsyncConnectorSelectorNumber(Integer.parseInt(value)); + config.setPipeAsyncSinkSelectorNumber(Integer.parseInt(value)); } value = @@ -623,7 +623,7 @@ public static void loadPipeExternalConfig( "pipe_async_connector_max_client_number", isHotModify); if (value != null) { - config.setPipeAsyncConnectorMaxClientNumber(Integer.parseInt(value)); + config.setPipeAsyncSinkMaxClientNumber(Integer.parseInt(value)); } value = @@ -633,7 +633,7 @@ public static void loadPipeExternalConfig( "pipe_async_connector_max_tsfile_client_number", isHotModify); if (value != null) { - config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value)); + config.setPipeAsyncSinkMaxTsFileClientNumber(Integer.parseInt(value)); } value = From 05b61214a4c89ee12e79c7029c4dd2f013f406ca Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 6 May 2026 16:09:59 +0800 Subject: [PATCH 2/4] fix-bug --- .../task/stage/PipeTaskProcessorStage.java | 6 +- .../plan/node/write/InsertTabletNode.java | 77 +------------------ .../storageengine/dataregion/DataRegion.java | 4 +- .../dataregion/DataRegionTest.java | 20 ++--- 4 files changed, 13 insertions(+), 94 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java index 2809ec9ec93fa..c5f58a248e265 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java @@ -101,11 +101,7 @@ public PipeTaskProcessorStage( final String taskId = pipeName + "_" + regionId + "_" + creationTime; final PipeEventCollector pipeSinkOutputEventCollector = new PipeEventCollector( - pipeSinkOutputPendingQueue, - creationTime, - regionId, - forceTabletFormat, - skipParsing); + pipeSinkOutputPendingQueue, creationTime, regionId, forceTabletFormat, skipParsing); this.pipeProcessorSubtask = new PipeProcessorSubtask( taskId, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 1deb5515eb1e8..e4da2ea41242e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; @@ -37,6 +38,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; import org.apache.iotdb.db.utils.QueryDataSetUtils; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.NotImplementedException; @@ -1170,81 +1172,6 @@ private TimeValuePair composeTimeValuePair(final int measurementIndex, final int return new TimeValuePair(times[rowIndex], value); } - public IDeviceID getDeviceID(int rowIdx) { - if (deviceID != null) { - return deviceID; - } - deviceID = DeviceIDFactory.getInstance().getDeviceID(targetPath); - return deviceID; - } - - protected static class PartitionSplitInfo { - - // for each List in split, they are range1.start, range1.end, range2.start, range2.end, ... - List ranges = new ArrayList<>(); - List timePartitionSlots = new ArrayList<>(); - List replicaSets; - } - - /** - * Split the tablet of the given range according to Table deviceID. - * - * @param start inclusive - * @param end exclusive - * @return each the number in the pair is the end offset of a device - */ - public List> splitByDevice(int start, int end) { - return Collections.singletonList(new Pair<>(getDeviceID(), end)); - } - - /** - * @param results insertion result of each row - * @param ttl the ttl - * @return the position of the first alive row - * @throws OutOfTTLException if all rows have expired the TTL - */ - public int checkTTL(TSStatus[] results, long ttl) throws OutOfTTLException { - return checkTTLInternal(results, ttl, true); - } - - protected int checkTTLInternal(TSStatus[] results, long ttl, boolean breakOnFirstAlive) - throws OutOfTTLException { - - /* - * assume that batch has been sorted by client - */ - int loc = 0; - int firstAliveLoc = -1; - while (loc < getRowCount()) { - long currTime = getTimes()[loc]; - // skip points that do not satisfy TTL - if (!isAlive(currTime, ttl)) { - results[loc] = - RpcUtils.getStatus( - TSStatusCode.OUT_OF_TTL, - String.format( - "Insertion time [%s] is less than ttl time bound [%s]", - DateTimeUtils.convertLongToDate(currTime), - DateTimeUtils.convertLongToDate(CommonDateTimeUtils.currentTime() - ttl))); - } else { - if (firstAliveLoc == -1) { - firstAliveLoc = loc; - } - if (breakOnFirstAlive) { - break; - } - } - loc++; - } - - if (firstAliveLoc == -1) { - // no alive data - throw new OutOfTTLException( - getTimes()[getTimes().length - 1], (CommonDateTimeUtils.currentTime() - ttl)); - } - return firstAliveLoc; - } - public void updateLastCache(final String databaseName) { updateLastCache(databaseName, null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 791d01f970911..35d4d427209b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1488,6 +1488,7 @@ private void recordInsertRowsFailure( sourceInsertRowsNode.getResults().put(failedIndex, failureStatus); } } + private void tryToUpdateInsertRowsLastCache(List nodeList) { for (InsertRowNode node : nodeList) { node.updateLastCache(databaseName); @@ -3799,7 +3800,8 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) InsertRowsNode subInsertRowsNode = entry.getValue(); try { List insertedProcessors = - insertRowsWithTypeConsistencyCheck(entry.getKey(), subInsertRowsNode, costsForMetrics); + insertRowsWithTypeConsistencyCheck( + entry.getKey(), subInsertRowsNode, costsForMetrics); executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList()); for (TsFileProcessor tsFileProcessor : insertedProcessors) { // check memtable size and may asyncTryToFlush the work memtable diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index 8eab3e6be49c7..a4df661449d03 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -38,8 +38,8 @@ import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.QueryId; -import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; @@ -94,13 +94,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode; -import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -1079,7 +1073,8 @@ public void testInsertRowsLastCacheSkipsFailedRows() throws Exception { measurementId, new MeasurementSchema( measurementId, TSDataType.INT32, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED)); - DataNodeSchemaCache.getInstance().declareLastCache(dataRegion1.getDatabaseName(), lastCachePath); + DataNodeSchemaCache.getInstance() + .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath); final List indexList = Arrays.asList(0, 1); final List nodes = new ArrayList<>(); @@ -1098,8 +1093,7 @@ public void testInsertRowsLastCacheSkipsFailedRows() throws Exception { dataRegion1.insert(insertRowsNode); Assert.fail("Expected BatchProcessException"); } catch (BatchProcessException e) { - final TimeValuePair lastCache = - DataNodeSchemaCache.getInstance().getLastCache(lastCachePath); + final TimeValuePair lastCache = DataNodeSchemaCache.getInstance().getLastCache(lastCachePath); Assert.assertNotNull(lastCache); Assert.assertEquals(1, lastCache.getTimestamp()); Assert.assertEquals(10, lastCache.getValue().getInt()); @@ -1137,7 +1131,8 @@ public void testInsertTabletLastCacheSkipsFailedRows() throws Exception { measurementId, new MeasurementSchema( measurementId, TSDataType.INT32, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED)); - DataNodeSchemaCache.getInstance().declareLastCache(dataRegion1.getDatabaseName(), lastCachePath); + DataNodeSchemaCache.getInstance() + .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath); final String[] measurements = new String[] {measurementId}; final TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32}; @@ -1164,8 +1159,7 @@ public void testInsertTabletLastCacheSkipsFailedRows() throws Exception { dataRegion1.insertTablet(insertTabletNode); Assert.fail("Expected BatchProcessException"); } catch (BatchProcessException e) { - final TimeValuePair lastCache = - DataNodeSchemaCache.getInstance().getLastCache(lastCachePath); + final TimeValuePair lastCache = DataNodeSchemaCache.getInstance().getLastCache(lastCachePath); Assert.assertNotNull(lastCache); Assert.assertEquals(1, lastCache.getTimestamp()); Assert.assertEquals(10, lastCache.getValue().getInt()); From 66aa32439d1a237803f24337d703f1e9624f1450 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 6 May 2026 16:11:36 +0800 Subject: [PATCH 3/4] no-compose --- .../plan/planner/plan/node/write/InsertTabletNode.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index e4da2ea41242e..faacc10eccded 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -1101,10 +1101,6 @@ public TimeValuePair composeLastTimeValuePair( return composeTimeValuePair(measurementIndex, lastIdx); } - public TimeValuePair composeLastTimeValuePair(int measurementIndex) { - return composeLastTimeValuePair(measurementIndex, 0, rowCount); - } - protected TimeValuePair composeLastTimeValuePair( final int measurementIndex, final TSStatus[] results, From 998742e0048699b740519e038c08652400262c3f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 6 May 2026 16:35:32 +0800 Subject: [PATCH 4/4] fix-13 --- .../db/storageengine/dataregion/DataRegion.java | 17 +++++------------ .../dataregion/DataRegionTest.java | 2 +- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 35d4d427209b9..d06362ffd3794 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1370,21 +1370,14 @@ private List insertToTsFileProcessors( private List insertRowsWithTypeConsistencyCheck( TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode, long[] costsForMetrics) throws WriteProcessException { - try { - tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics); - return Collections.singletonList(tsFileProcessor); - } catch (DataTypeInconsistentException e) { - InsertRowNode firstRow = subInsertRowsNode.getInsertRowNodeList().get(0); - long timePartitionId = TimePartitionUtils.getTimePartitionId(firstRow.getTime()); - // flush both MemTables so that the new type can be inserted into a new MemTable - flushWorkingProcessorsForTimePartition(timePartitionId); - return retryInsertRowsAfterFlush(subInsertRowsNode, timePartitionId, costsForMetrics); - } + tsFileProcessor.insert(subInsertRowsNode, costsForMetrics); + return Collections.singletonList(tsFileProcessor); } private InsertRowsNode createGroupedInsertRowsNode( final InsertRowsNode sourceInsertRowsNode, final InsertRowNode firstInsertRowNode) { - final InsertRowsNode groupedInsertRowsNode = sourceInsertRowsNode.emptyClone(); + final InsertRowsNode groupedInsertRowsNode = + new InsertRowsNode(sourceInsertRowsNode.getPlanNodeId()); initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode); return groupedInsertRowsNode; } @@ -1461,7 +1454,7 @@ private List retryInsertRowsAfterFlush( final List insertedProcessors = new ArrayList<>(retriedProcessorMap.size()); for (Entry retriedEntry : retriedProcessorMap.entrySet()) { final TsFileProcessor retriedProcessor = retriedEntry.getKey(); - retriedProcessor.insertRows(retriedEntry.getValue(), costsForMetrics); + retriedProcessor.insert(retriedEntry.getValue(), costsForMetrics); insertedProcessors.add(retriedProcessor); } return insertedProcessors; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index a4df661449d03..f7a79eb8bf8e5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -1017,7 +1017,7 @@ public void testInsertRowsMarkAllFailedRowsForSameProcessor() throws Exception { final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class); Mockito.doThrow(new WriteProcessException("mock insert rows failure")) .when(processor) - .insertRows(any(InsertRowsNode.class), any(long[].class)); + .insert(any(InsertRowsNode.class), any(long[].class)); Mockito.when(processor.shouldFlush()).thenReturn(false); Mockito.when(processor.isSequence()).thenReturn(true); dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) -> processor);