diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java index e037a5493baf..83d817dcca0a 100644 --- a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java +++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java @@ -76,6 +76,11 @@ public void configure(Binder binder) .addBinding(SCHEME) .to(OssTimestampVersionedDataFinder.class) .in(LazySingleton.class); + + // Segment killer, mover and archiver use type "oss_zip" (instead of "oss") to align with OssLoadSpec. + // This means OssDataSegmentKiller cannot currently be injected as DataSegmentKiller + // and would only be invoked via OmniDataSegmentKiller. + // If needed in the future, the OssDataSegmentKiller would need to be bound to the "oss" scheme as well. 
Binders.dataSegmentKillerBinder(binder) .addBinding(SCHEME_ZIP) .to(OssDataSegmentKiller.class) diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssStorageDruidModuleTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssStorageDruidModuleTest.java index 153a5698cb96..22092a266b4a 100644 --- a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssStorageDruidModuleTest.java +++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssStorageDruidModuleTest.java @@ -24,6 +24,7 @@ import com.google.inject.Injector; import org.apache.druid.guice.ConfigModule; import org.apache.druid.guice.DruidGuiceExtensions; +import org.apache.druid.guice.LocalDataStorageDruidModule; import org.apache.druid.jackson.JacksonModule; import org.apache.druid.segment.loading.OmniDataSegmentArchiver; import org.apache.druid.segment.loading.OmniDataSegmentKiller; @@ -78,6 +79,7 @@ private static Injector createInjector() new DruidGuiceExtensions(), new JacksonModule(), new ConfigModule(), + new LocalDataStorageDruidModule(), new OssStorageDruidModule(), binder -> { final Properties properties = new Properties(); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java index 22da901cf0b7..76b6dcfd1951 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java @@ -32,6 +32,7 @@ import org.apache.druid.guice.DruidGuiceExtensions; import org.apache.druid.guice.JsonConfigurator; import org.apache.druid.guice.LazySingleton; +import 
org.apache.druid.guice.LocalDataStorageDruidModule; import org.apache.druid.jackson.JacksonModule; import org.apache.druid.segment.loading.OmniDataSegmentKiller; import org.easymock.EasyMock; @@ -288,6 +289,7 @@ private Injector makeInjectorWithProperties(final Properties props) binder.bind(JsonConfigurator.class).in(LazySingleton.class); binder.bind(Properties.class).toInstance(props); }, + new LocalDataStorageDruidModule(), new AzureStorageDruidModule() )); } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java index e45877c89f4d..b2185f009f54 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java @@ -20,13 +20,15 @@ package org.apache.druid.storage.google; import com.google.cloud.storage.Storage; -import com.google.common.collect.ImmutableList; import com.google.inject.Injector; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.LocalDataStorageDruidModule; import org.apache.druid.segment.loading.OmniDataSegmentKiller; import org.junit.Assert; import org.junit.Test; +import java.util.List; + public class GoogleStorageDruidModuleTest { @Test @@ -37,7 +39,9 @@ public void testSegmentKillerBoundedSingleton() // HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded // lazily, the exception should end up thrown. // 2. That the same object is returned. 
- Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule())); + Injector injector = GuiceInjectors.makeStartupInjectorWithModules( + List.of(new LocalDataStorageDruidModule(), new GoogleStorageDruidModule()) + ); OmniDataSegmentKiller killer = injector.getInstance(OmniDataSegmentKiller.class); Assert.assertTrue(killer.getKillers().containsKey(GoogleStorageDruidModule.SCHEME)); Assert.assertSame( @@ -57,7 +61,7 @@ public void testLazyInstantiation() // HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded // lazily, the exception should end up thrown. // 2. That the same object is returned. - Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule())); + Injector injector = GuiceInjectors.makeStartupInjectorWithModules(List.of(new GoogleStorageDruidModule())); final GoogleStorage instance = injector.getInstance(GoogleStorage.class); Assert.assertSame(instance, injector.getInstance(GoogleStorage.class)); } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java index a5111d2159c0..ea68ee0bb521 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java @@ -27,6 +27,7 @@ import org.apache.druid.guice.JsonConfigurator; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; +import org.apache.druid.guice.LocalDataStorageDruidModule; import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -89,6 +90,7 @@ 
private Injector makeInjectorWithProperties(final Properties props) binder.bind(Properties.class).toInstance(props); binder.bind(ServiceEmitter.class).toInstance(new ServiceEmitter("test", "localhost", new NoopEmitter())); }, + new LocalDataStorageDruidModule(), new HdfsStorageDruidModule() ) ); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java index a0c7f9866558..c9c12f377f0d 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java @@ -97,6 +97,11 @@ public void configure(Binder binder) .addBinding(SCHEME_S3N) .to(S3TimestampVersionedDataFinder.class) .in(LazySingleton.class); + + // Segment killer, mover and archiver use type "s3_zip" (instead of "s3") to align with S3LoadSpec. + // This means S3DataSegmentKiller cannot currently be injected as DataSegmentKiller + // and would only be invoked via OmniDataSegmentKiller. + // If needed in the future, the S3DataSegmentKiller would need to be bound to the "s3" scheme as well. 
Binders.dataSegmentKillerBinder(binder) .addBinding(SCHEME_S3_ZIP) .to(S3DataSegmentKiller.class) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java index 5d4fc4f188f2..52b1aac94dd1 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java @@ -23,6 +23,7 @@ import com.google.inject.Injector; import org.apache.druid.common.aws.AWSModule; import org.apache.druid.guice.DruidSecondaryModule; +import org.apache.druid.guice.LocalDataStorageDruidModule; import org.apache.druid.guice.ServerModule; import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.segment.loading.OmniDataSegmentArchiver; @@ -74,6 +75,7 @@ private static Injector createInjector() final Injector startupInjector = new StartupInjectorBuilder().forServer().build(); return Guice.createInjector( startupInjector.getInstance(DruidSecondaryModule.class), + new LocalDataStorageDruidModule(), new AWSModule(), new S3StorageDruidModule(), new ServerModule() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 2764c26c6af6..4f235bfa4f0f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -59,7 +59,6 @@ import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException; import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner.SubTaskSpecStatus; -import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager; import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; @@ -1837,13 +1836,11 @@ private Pair, Map> doGetRowStatsAndUnparseab public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception { try { - toolbox.getDataSegmentKiller().killRecursively( - DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(getId()) - ); + toolbox.getIntermediaryDataManager().deletePartitions(getId()); } catch (Exception e) { // Best effort cleanup, do not fail the task if cleanup fails - LOG.warn(e, "Failed recursive deep storage cleanup for intermediary path for task[%s]", getId()); + LOG.warn(e, "Failed cleanup of intermediary data for task[%s]", getId()); } if (!isCompactionTask) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java index 336973cee822..892ea45d5c47 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java @@ -24,6 +24,8 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.batch.parallel.DeepStoragePartitionStat; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BucketNumberedShardSpec; 
@@ -38,6 +40,7 @@ public class DeepStorageIntermediaryDataManager implements IntermediaryDataManag { public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data"; private final DataSegmentPusher dataSegmentPusher; + private final DataSegmentKiller dataSegmentKiller; /** * Deep storage path to the directory that holds all shuffle intermediate files for {@code supervisorTaskId}, @@ -49,10 +52,22 @@ public static String retrieveShuffleDataStoragePath(String supervisorTaskId) return SHUFFLE_DATA_DIR_PREFIX + "/" + supervisorTaskId; } + /** + * Used by Guice to create an instance of {@link DeepStorageIntermediaryDataManager}. + * + * @param dataSegmentPusher Always non-null + * @param dataSegmentKiller Can be null in certain cases such as on MiddleManagers + * when using druid.storage.type=s3 since the respective + * S3DataSegmentKiller uses scheme "s3_zip" instead of "s3" + */ @Inject - public DeepStorageIntermediaryDataManager(DataSegmentPusher dataSegmentPusher) + public DeepStorageIntermediaryDataManager( + DataSegmentPusher dataSegmentPusher, + @Nullable DataSegmentKiller dataSegmentKiller + ) { this.dataSegmentPusher = dataSegmentPusher; + this.dataSegmentKiller = dataSegmentKiller; } @Override @@ -105,22 +120,13 @@ public Optional findPartitionFile(String supervisorTaskId, String su throw new UnsupportedOperationException("Not supported, get partition file using segment loadspec"); } - /** - * Not implemented for deep storage mode. Unlike {@link LocalIntermediaryDataManager}, - * which can walk the local filesystem to find and delete files by supervisorTaskId, - * this manager has no way to discover what files were pushed: it has no - * {@link org.apache.druid.segment.loading.DataSegmentKiller}, does not track pushed - * paths, and runs on short-lived peon processes whose state is lost on exit. - *

- * Deep storage shuffle cleanup is handled in {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask#cleanUp} - * via {@link org.apache.druid.segment.loading.DataSegmentKiller#killRecursively} on - * {@link #retrieveShuffleDataStoragePath(String)} (recursive delete of that directory). - */ @Override - public void deletePartitions(String supervisorTaskId) + public void deletePartitions(String supervisorTaskId) throws IOException { - throw new UnsupportedOperationException( - "Deep storage shuffle cleanup is handled by ParallelIndexSupervisorTask, not by the data manager" - ); + if (dataSegmentKiller == null) { + throw new ISE("No instance was bound for the DataSegmentKiller"); + } else { + dataSegmentKiller.killRecursively(retrieveShuffleDataStoragePath(supervisorTaskId)); + } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java index 17f898a6d32d..af7510e1042d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java @@ -75,10 +75,9 @@ public interface IntermediaryDataManager Optional findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId); /** - * Delete the partitions + * Deletes all intermediary partitions for the given supervisor task. 
* * @param supervisorTaskId - Supervisor task id of the partitions to delete - * */ void deletePartitions(String supervisorTaskId) throws IOException; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java index d37fc88d6a5b..614312521f0b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java @@ -112,6 +112,11 @@ public Response getPartition( } } + /** + * Not directly invoked by any Druid service. + * {@code IntermediaryDataManager.deletePartitions()} is invoked explicitly by + * {@code ParallelIndexSupervisorTask.cleanUp()}. + */ @DELETE @Path("/task/{supervisorTaskId}") public Response deletePartitions(@PathParam("supervisorTaskId") String supervisorTaskId) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 19dc225b6c98..753d7fc9a8ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -37,7 +37,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.TuningConfigBuilder; -import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager; +import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; import org.apache.druid.java.util.common.Intervals; import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.rpc.HttpResponseException; @@ -47,7 +47,6 @@ import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec; @@ -503,9 +502,9 @@ public void testCompactionTaskDoesntCleanup() throws Exception // Compaction skips super.cleanUp but still runs killRecursively for intermediary deep-storage files. TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class); - final DataSegmentKiller killer = EasyMock.createNiceMock(DataSegmentKiller.class); - EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer).anyTimes(); - EasyMock.replay(toolbox, killer); + final IntermediaryDataManager intermediaryDataManager = EasyMock.createNiceMock(IntermediaryDataManager.class); + EasyMock.expect(toolbox.getIntermediaryDataManager()).andReturn(intermediaryDataManager).anyTimes(); + EasyMock.replay(toolbox, intermediaryDataManager); new ParallelIndexSupervisorTask( null, @@ -570,11 +569,11 @@ public void testCleanUpInvokesKillRecursivelyForIntermediates() throws Exception final String supervisorTaskId = "index_parallel_cleanup_supervisor_id"; TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class); - final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); - EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer); - killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId)); + final IntermediaryDataManager intermediaryDataManager = EasyMock.createStrictMock(IntermediaryDataManager.class); + 
EasyMock.expect(toolbox.getIntermediaryDataManager()).andReturn(intermediaryDataManager); + intermediaryDataManager.deletePartitions(supervisorTaskId); EasyMock.expectLastCall(); - EasyMock.replay(toolbox, killer); + EasyMock.replay(toolbox, intermediaryDataManager); new ParallelIndexSupervisorTask( supervisorTaskId, @@ -586,7 +585,7 @@ public void testCleanUpInvokesKillRecursivelyForIntermediates() throws Exception true ).cleanUp(toolbox, null); - EasyMock.verify(toolbox, killer); + EasyMock.verify(toolbox, intermediaryDataManager); } @Test @@ -597,14 +596,14 @@ public void testCleanUp_nonCompactionRunsAbstractTaskCleanUp() throws Exception final String supervisorTaskId = "index_parallel_ds_2024-01-01"; final TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class); final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class); - final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + final IntermediaryDataManager intermediaryDataManager = EasyMock.createStrictMock(IntermediaryDataManager.class); - EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer); - killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId)); + EasyMock.expect(toolbox.getIntermediaryDataManager()).andReturn(intermediaryDataManager); + intermediaryDataManager.deletePartitions(supervisorTaskId); EasyMock.expectLastCall(); EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig); EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false); - EasyMock.replay(toolbox, taskConfig, killer); + EasyMock.replay(toolbox, taskConfig, intermediaryDataManager); new ParallelIndexSupervisorTask( supervisorTaskId, @@ -616,7 +615,7 @@ public void testCleanUp_nonCompactionRunsAbstractTaskCleanUp() throws Exception false ).cleanUp(toolbox, null); - EasyMock.verify(toolbox, taskConfig, killer); + EasyMock.verify(toolbox, taskConfig, intermediaryDataManager); } @Test @@ -627,14 +626,14 @@ public void 
testCleanUp_killRecursivelyFailureDoesNotAbortCleanUp() throws Excep final String supervisorTaskId = "index_parallel_deep_storage_cleanup_fail"; final TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class); final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class); - final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + final IntermediaryDataManager intermediaryDataManager = EasyMock.createStrictMock(IntermediaryDataManager.class); - EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer); - killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId)); + EasyMock.expect(toolbox.getIntermediaryDataManager()).andReturn(intermediaryDataManager); + intermediaryDataManager.deletePartitions(supervisorTaskId); EasyMock.expectLastCall().andThrow(new IOException("deep storage cleanup failed")); EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig); EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false); - EasyMock.replay(toolbox, taskConfig, killer); + EasyMock.replay(toolbox, taskConfig, intermediaryDataManager); new ParallelIndexSupervisorTask( supervisorTaskId, @@ -646,7 +645,7 @@ public void testCleanUp_killRecursivelyFailureDoesNotAbortCleanUp() throws Excep false ).cleanUp(toolbox, null); - EasyMock.verify(toolbox, taskConfig, killer); + EasyMock.verify(toolbox, taskConfig, intermediaryDataManager); } private static ParallelIndexIngestionSpec buildParallelIngestionSpecForCleanUpTests() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java index 70e734f1b224..6179c8ef6d8f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java @@ -39,6 +39,7 @@ import org.apache.druid.rpc.indexing.NoopOverlordClient; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.loading.LoadSpec; +import org.apache.druid.segment.loading.LocalDataSegmentKiller; import org.apache.druid.segment.loading.LocalDataSegmentPuller; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; @@ -107,16 +108,18 @@ public void setup() throws IOException intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient); } else if (DEEPSTORE.equals(intermediateDataStore)) { localDeepStore = temporaryFolder.newFolder("localStorage"); + final LocalDataSegmentPusherConfig pusherConfig = new LocalDataSegmentPusherConfig() + { + @Override + public File getStorageDirectory() + { + return localDeepStore; + } + }; intermediaryDataManager = new DeepStorageIntermediaryDataManager( - new LocalDataSegmentPusher( - new LocalDataSegmentPusherConfig() - { - @Override - public File getStorageDirectory() - { - return localDeepStore; - } - })); + new LocalDataSegmentPusher(pusherConfig), + new LocalDataSegmentKiller(pusherConfig) + ); } intermediaryDataManager.start(); segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager); diff --git a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java index 29ae993d8e96..0088ba72e75b 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java @@ -94,16 +94,26 @@ default void killQuietly(DataSegment segment) } /** - * Like a nuke. Use wisely. 
Used by the 'reset-cluster' command, and of the built-in deep storage implementations, it - * is only implemented by local and HDFS. + * Deletes all files (segment or otherwise) under the configured deep storage root. + * + * @deprecated since this is a relatively risky operation and is currently not + * being used anywhere in the core Druid code. */ + @Deprecated void killAll() throws IOException; /** - * Recursively removes a directory (or object-store prefix) under the configured deep storage root. The path is - * relative to that root: no leading slash, no {@code ..} segments, no backslashes. If the path does not exist, this - * is a no-op. The default implementation does nothing; extensions that cannot recurse should keep the default. - * HDFS currently only implements this method. + * Recursively removes a directory (or object-store prefix) under the configured + * deep storage root. Currently used by the {@code IntermediaryDataManager} to + * clean up intermediary data at the end of a {@code ParallelIndexSupervisorTask}. + *

+ * The default implementation does nothing; extensions that cannot recurse + * should keep the default. {@code HdfsDataSegmentKiller} is the only variant + * that currently implements this method. + * + * @param relativePath Path to delete, relative to the root. It must not contain + * any leading slash, {@code ..} segments, or backslashes. + * If this path does not exist, the method is a no-op. * * @throws IOException if deletion fails */ diff --git a/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java b/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java index 8f1d44c3fe38..16789b023930 100644 --- a/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java +++ b/server/src/main/java/org/apache/druid/guice/LocalDataStorageDruidModule.java @@ -24,7 +24,9 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; +import com.google.inject.Provides; import com.google.inject.multibindings.MapBinder; +import com.google.inject.name.Named; import org.apache.druid.data.SearchableVersionedDataFinder; import org.apache.druid.initialization.DruidModule; import org.apache.druid.segment.loading.DataSegmentKiller; @@ -36,12 +38,14 @@ import org.apache.druid.segment.loading.LocalLoadSpec; import java.util.List; +import java.util.Properties; /** */ public class LocalDataStorageDruidModule implements DruidModule { public static final String SCHEME = "local"; + public static final String STORAGE_TYPE_PROPERTY = "druid.storage.type"; @Override public void configure(Binder binder) @@ -50,19 +54,26 @@ public void configure(Binder binder) PolyBind.createChoice( binder, - "druid.storage.type", + STORAGE_TYPE_PROPERTY, Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class) ); PolyBind.createChoice( binder, - "druid.storage.type", + STORAGE_TYPE_PROPERTY, Key.get(DataSegmentKiller.class), Key.get(LocalDataSegmentKiller.class) ); } + @Provides +
@Named(STORAGE_TYPE_PROPERTY) + public String getDeepStorageType(Properties properties) + { + return properties.getProperty(STORAGE_TYPE_PROPERTY, SCHEME); + } + private static void bindDeepStorageLocal(Binder binder) { MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class) diff --git a/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java b/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java index f0afa5d19635..b20a4a8536b5 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java +++ b/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java @@ -24,6 +24,9 @@ import com.google.common.base.Suppliers; import com.google.inject.Inject; import com.google.inject.Provider; +import com.google.inject.name.Named; +import org.apache.druid.guice.LocalDataStorageDruidModule; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.timeline.DataSegment; @@ -38,13 +41,16 @@ */ public class OmniDataSegmentKiller implements DataSegmentKiller { + private final String deepStorageType; private final Map> killers; @Inject public OmniDataSegmentKiller( + @Named(LocalDataStorageDruidModule.STORAGE_TYPE_PROPERTY) String deepStorageType, Map> killers ) { + this.deepStorageType = deepStorageType; this.killers = new HashMap<>(); for (Map.Entry> entry : killers.entrySet()) { String type = entry.getKey(); @@ -97,27 +103,26 @@ private DataSegmentKiller getKiller(DataSegment segment) throws SegmentLoadingEx @Override public void killAll() { + // Do not invoke killAll() for cloud-specific DataSegmentKiller implementations + // as this is a potentially dangerous operation throw new UnsupportedOperationException("not implemented"); } @Override public void killRecursively(String relativePath) throws IOException { - IOException firstFailure = null; - for (Supplier supplier : killers.values()) { 
- try { - supplier.get().killRecursively(relativePath); - } - catch (IOException e) { - if (firstFailure == null) { - firstFailure = e; - } else { - firstFailure.addSuppressed(e); - } - } - } - if (firstFailure != null) { - throw firstFailure; + // Delegate kill to the currently bound deep storage implementation only. + // It is unnecessary and also potentially unsafe to kill all files under the + // same folder name on inactive deep storage bindings provided by other extensions. + final Supplier killer = killers.get(deepStorageType); + + if (killer == null) { + throw new ISE( + "Unknown segment killer type[%s]. Known types are %s", + deepStorageType, killers.keySet() + ); + } else { + killer.get().killRecursively(relativePath); } } diff --git a/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java b/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java index 97e9f4025544..816e5f80bb53 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java @@ -28,6 +28,7 @@ import org.apache.druid.guice.Binders; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LocalDataStorageDruidModule; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.TombstoneShardSpec; @@ -94,6 +95,7 @@ private static Injector createInjector(@Nullable DataSegmentKiller killer) { return GuiceInjectors.makeStartupInjectorWithModules( ImmutableList.of( + (Module) new LocalDataStorageDruidModule(), binder -> { MapBinder mapBinder = Binders.dataSegmentKillerBinder(binder); if (killer != null) { @@ -111,6 +113,7 @@ private static Injector createInjector(@Nullable DataSegmentKiller killer) private static Injector createInjectorFromMap(@NotNull Map killerMap) { 
ImmutableList.Builder moduleListBuilder = ImmutableList.builder(); + moduleListBuilder.add(new LocalDataStorageDruidModule()); for (Map.Entry typeToKiller : killerMap.entrySet()) { moduleListBuilder.add(binder -> { MapBinder mapBinder = Binders.dataSegmentKillerBinder(binder); @@ -141,7 +144,7 @@ public void testKillTombstone() throws Exception } @Test - public void testKillRecursively_delegatesToAllKillers() throws IOException + public void testKillRecursively_delegatesToOnlyBoundKiller() throws IOException { final DataSegmentKiller killerA = Mockito.mock(DataSegmentKiller.class); final DataSegmentKiller killerB = Mockito.mock(DataSegmentKiller.class); @@ -153,8 +156,8 @@ public void testKillRecursively_delegatesToAllKillers() throws IOException final String relativePath = "intermediate/batch_1"; segmentKiller.killRecursively(relativePath); - Mockito.verify(killerA).killRecursively(relativePath); - Mockito.verify(killerB).killRecursively(relativePath); + Mockito.verify(killerA, Mockito.never()).killRecursively(relativePath); + Mockito.verify(killerB, Mockito.never()).killRecursively(relativePath); } @Test