Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public void configure(Binder binder)
.addBinding(SCHEME)
.to(OssTimestampVersionedDataFinder.class)
.in(LazySingleton.class);

// Segment killer, mover and archiver use type "oss_zip" (instead of "oss") to align with OssLoadSpec.
// This means OssDataSegmentKiller cannot currently be injected as DataSegmentKiller
// and would only be invoked via OmniDataSegmentKiller.
// If needed in the future, the OssDataSegmentKiller would need to be bound to the "oss" scheme as well.
Binders.dataSegmentKillerBinder(binder)
.addBinding(SCHEME_ZIP)
.to(OssDataSegmentKiller.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.inject.Injector;
import org.apache.druid.guice.ConfigModule;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
Expand Down Expand Up @@ -78,6 +79,7 @@ private static Injector createInjector()
new DruidGuiceExtensions(),
new JacksonModule(),
new ConfigModule(),
new LocalDataStorageDruidModule(),
new OssStorageDruidModule(),
binder -> {
final Properties properties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.easymock.EasyMock;
Expand Down Expand Up @@ -288,6 +289,7 @@ private Injector makeInjectorWithProperties(final Properties props)
binder.bind(JsonConfigurator.class).in(LazySingleton.class);
binder.bind(Properties.class).toInstance(props);
},
new LocalDataStorageDruidModule(),
new AzureStorageDruidModule()
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
package org.apache.druid.storage.google;

import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class GoogleStorageDruidModuleTest
{
@Test
Expand All @@ -37,7 +39,9 @@ public void testSegmentKillerBoundedSingleton()
// HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded
// lazily, the exception should end up thrown.
// 2. That the same object is returned.
Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule()));
Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
List.of(new LocalDataStorageDruidModule(), new GoogleStorageDruidModule())
);
OmniDataSegmentKiller killer = injector.getInstance(OmniDataSegmentKiller.class);
Assert.assertTrue(killer.getKillers().containsKey(GoogleStorageDruidModule.SCHEME));
Assert.assertSame(
Expand All @@ -57,7 +61,7 @@ public void testLazyInstantiation()
// HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded
// lazily, the exception should end up thrown.
// 2. That the same object is returned.
Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule()));
Injector injector = GuiceInjectors.makeStartupInjectorWithModules(List.of(new GoogleStorageDruidModule()));
final GoogleStorage instance = injector.getInstance(GoogleStorage.class);
Assert.assertSame(instance, injector.getInstance(GoogleStorage.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
Expand Down Expand Up @@ -89,6 +90,7 @@ private Injector makeInjectorWithProperties(final Properties props)
binder.bind(Properties.class).toInstance(props);
binder.bind(ServiceEmitter.class).toInstance(new ServiceEmitter("test", "localhost", new NoopEmitter()));
},
new LocalDataStorageDruidModule(),
new HdfsStorageDruidModule()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public void configure(Binder binder)
.addBinding(SCHEME_S3N)
.to(S3TimestampVersionedDataFinder.class)
.in(LazySingleton.class);

// Segment killer, mover and archiver use type "s3_zip" (instead of "s3") to align with S3LoadSpec.
// This means S3DataSegmentKiller cannot currently be injected as DataSegmentKiller
// and would only be invoked via OmniDataSegmentKiller.
// If needed in the future, the S3DataSegmentKiller would need to be bound to the "s3" scheme as well.
Binders.dataSegmentKillerBinder(binder)
.addBinding(SCHEME_S3_ZIP)
.to(S3DataSegmentKiller.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.inject.Injector;
import org.apache.druid.common.aws.AWSModule;
import org.apache.druid.guice.DruidSecondaryModule;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.guice.ServerModule;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
Expand Down Expand Up @@ -74,6 +75,7 @@ private static Injector createInjector()
final Injector startupInjector = new StartupInjectorBuilder().forServer().build();
return Guice.createInjector(
startupInjector.getInstance(DruidSecondaryModule.class),
new LocalDataStorageDruidModule(),
new AWSModule(),
new S3StorageDruidModule(),
new ServerModule()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner.SubTaskSpecStatus;
import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
Expand Down Expand Up @@ -1837,13 +1836,11 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
{
try {
toolbox.getDataSegmentKiller().killRecursively(
DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(getId())
);
toolbox.getIntermediaryDataManager().deletePartitions(getId());
}
catch (Exception e) {
// Best effort cleanup, do not fail the task if cleanup fails
LOG.warn(e, "Failed recursive deep storage cleanup for intermediary path for task[%s]", getId());
LOG.warn(e, "Failed cleanup of intermediary data for task[%s]", getId());
}

if (!isCompactionTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.batch.parallel.DeepStoragePartitionStat;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
Expand All @@ -38,6 +40,7 @@ public class DeepStorageIntermediaryDataManager implements IntermediaryDataManag
{
public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data";
private final DataSegmentPusher dataSegmentPusher;
private final DataSegmentKiller dataSegmentKiller;

/**
* Deep storage path to the directory that holds all shuffle intermediate files for {@code supervisorTaskId},
Expand All @@ -49,10 +52,22 @@ public static String retrieveShuffleDataStoragePath(String supervisorTaskId)
return SHUFFLE_DATA_DIR_PREFIX + "/" + supervisorTaskId;
}

/**
* Used by Guice to create an instance of {@link DeepStorageIntermediaryDataManager}.
*
* @param dataSegmentPusher Always non-null
* @param dataSegmentKiller Can be null in certain cases such as on MiddleManagers
* when using druid.storage.type=s3 since the respective
* S3DataSegmentKiller uses scheme "s3_zip" instead of "s3"
*/
@Inject
public DeepStorageIntermediaryDataManager(DataSegmentPusher dataSegmentPusher)
public DeepStorageIntermediaryDataManager(
DataSegmentPusher dataSegmentPusher,
@Nullable DataSegmentKiller dataSegmentKiller
)
{
this.dataSegmentPusher = dataSegmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
}

@Override
Expand Down Expand Up @@ -105,22 +120,13 @@ public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String su
throw new UnsupportedOperationException("Not supported, get partition file using segment loadspec");
}

/**
* Not implemented for deep storage mode. Unlike {@link LocalIntermediaryDataManager},
* which can walk the local filesystem to find and delete files by supervisorTaskId,
* this manager has no way to discover what files were pushed: it has no
* {@link org.apache.druid.segment.loading.DataSegmentKiller}, does not track pushed
* paths, and runs on short-lived peon processes whose state is lost on exit.
* <p>
* Deep storage shuffle cleanup is handled in {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask#cleanUp}
* via {@link org.apache.druid.segment.loading.DataSegmentKiller#killRecursively} on
* {@link #retrieveShuffleDataStoragePath(String)} (recursive delete of that directory).
*/
@Override
public void deletePartitions(String supervisorTaskId)
public void deletePartitions(String supervisorTaskId) throws IOException
{
throw new UnsupportedOperationException(
"Deep storage shuffle cleanup is handled by ParallelIndexSupervisorTask, not by the data manager"
);
if (dataSegmentKiller == null) {
throw new ISE("No instance was bound for the DataSegmentKiller");
} else {
dataSegmentKiller.killRecursively(retrieveShuffleDataStoragePath(supervisorTaskId));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,9 @@ public interface IntermediaryDataManager
Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId);

/**
* Delete the partitions
* Deletes all intermediary partitions for the given supervisor task.
*
* @param supervisorTaskId - Supervisor task id of the partitions to delete
*
*/
void deletePartitions(String supervisorTaskId) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public Response getPartition(
}
}

/**
* Not directly invoked by any Druid service.
* {@code IntermediaryDataManager.deletePartitions()} is invoked explicitly by
* the {@code ParallelIndexSupervisorTask.cleanup()}.
*/
@DELETE
@Path("/task/{supervisorTaskId}")
public Response deletePartitions(@PathParam("supervisorTaskId") String supervisorTaskId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.rpc.HttpResponseException;
Expand All @@ -47,7 +47,6 @@
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec;
Expand Down Expand Up @@ -503,9 +502,9 @@ public void testCompactionTaskDoesntCleanup() throws Exception

// Compaction skips super.cleanUp but still runs killRecursively for intermediary deep-storage files.
TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
final DataSegmentKiller killer = EasyMock.createNiceMock(DataSegmentKiller.class);
EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer).anyTimes();
EasyMock.replay(toolbox, killer);
final IntermediaryDataManager intermediaryDataManager = EasyMock.createNiceMock(IntermediaryDataManager.class);
EasyMock.expect(toolbox.getIntermediaryDataManager()).andReturn(intermediaryDataManager).anyTimes();
EasyMock.replay(toolbox, intermediaryDataManager);

new ParallelIndexSupervisorTask(
null,
Expand Down Expand Up @@ -570,11 +569,11 @@ public void testCleanUpInvokesKillRecursivelyForIntermediates() throws Exception

final String supervisorTaskId = "index_parallel_cleanup_supervisor_id";
TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);
EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
final IntermediaryDataManager intermediaryDataManager = EasyMock.createStrictMock(IntermediaryDataManager.class);
EasyMock.expect(toolbox.getIntermediaryDataManager()).andReturn(intermediaryDataManager);
intermediaryDataManager.deletePartitions(supervisorTaskId);
EasyMock.expectLastCall();
EasyMock.replay(toolbox, killer);
EasyMock.replay(toolbox, intermediaryDataManager);

new ParallelIndexSupervisorTask(
supervisorTaskId,
Expand All @@ -586,7 +585,7 @@ public void testCleanUpInvokesKillRecursivelyForIntermediates() throws Exception
true
).cleanUp(toolbox, null);

EasyMock.verify(toolbox, killer);
EasyMock.verify(toolbox, intermediaryDataManager);
}

@Test
Expand All @@ -597,14 +596,14 @@ public void testCleanUp_nonCompactionRunsAbstractTaskCleanUp() throws Exception
final String supervisorTaskId = "index_parallel_ds_2024-01-01";
final TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);
final IntermediaryDataManager intermediaryDataManager = EasyMock.createStrictMock(IntermediaryDataManager.class);

EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
EasyMock.expect(toolbox.getIntermediaryDataManager()).andReturn(intermediaryDataManager);
intermediaryDataManager.deletePartitions(supervisorTaskId);
EasyMock.expectLastCall();
EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig);
EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false);
EasyMock.replay(toolbox, taskConfig, killer);
EasyMock.replay(toolbox, taskConfig, intermediaryDataManager);

new ParallelIndexSupervisorTask(
supervisorTaskId,
Expand All @@ -616,7 +615,7 @@ public void testCleanUp_nonCompactionRunsAbstractTaskCleanUp() throws Exception
false
).cleanUp(toolbox, null);

EasyMock.verify(toolbox, taskConfig, killer);
EasyMock.verify(toolbox, taskConfig, intermediaryDataManager);
}

@Test
Expand All @@ -627,14 +626,14 @@ public void testCleanUp_killRecursivelyFailureDoesNotAbortCleanUp() throws Excep
final String supervisorTaskId = "index_parallel_deep_storage_cleanup_fail";
final TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);
final IntermediaryDataManager intermediaryDataManager = EasyMock.createStrictMock(IntermediaryDataManager.class);

EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
EasyMock.expect(toolbox.getIntermediaryDataManager()).andReturn(intermediaryDataManager);
intermediaryDataManager.deletePartitions(supervisorTaskId);
EasyMock.expectLastCall().andThrow(new IOException("deep storage cleanup failed"));
EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig);
EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false);
EasyMock.replay(toolbox, taskConfig, killer);
EasyMock.replay(toolbox, taskConfig, intermediaryDataManager);

new ParallelIndexSupervisorTask(
supervisorTaskId,
Expand All @@ -646,7 +645,7 @@ public void testCleanUp_killRecursivelyFailureDoesNotAbortCleanUp() throws Excep
false
).cleanUp(toolbox, null);

EasyMock.verify(toolbox, taskConfig, killer);
EasyMock.verify(toolbox, taskConfig, intermediaryDataManager);
}

private static ParallelIndexIngestionSpec buildParallelIngestionSpecForCleanUpTests()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.loading.LoadSpec;
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
Expand Down Expand Up @@ -107,16 +108,18 @@ public void setup() throws IOException
intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient);
} else if (DEEPSTORE.equals(intermediateDataStore)) {
localDeepStore = temporaryFolder.newFolder("localStorage");
final LocalDataSegmentPusherConfig pusherConfig = new LocalDataSegmentPusherConfig()
{
@Override
public File getStorageDirectory()
{
return localDeepStore;
}
};
intermediaryDataManager = new DeepStorageIntermediaryDataManager(
new LocalDataSegmentPusher(
new LocalDataSegmentPusherConfig()
{
@Override
public File getStorageDirectory()
{
return localDeepStore;
}
}));
new LocalDataSegmentPusher(pusherConfig),
new LocalDataSegmentKiller(pusherConfig)
);
}
intermediaryDataManager.start();
segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager);
Expand Down
Loading
Loading