diff --git a/processing/src/main/java/org/apache/druid/java/util/common/guava/Comparators.java b/processing/src/main/java/org/apache/druid/java/util/common/guava/Comparators.java index 618698c4ac46..04e4f7f0fc05 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/guava/Comparators.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/guava/Comparators.java @@ -119,6 +119,34 @@ public int compare(Interval lhs, Interval rhs) } }; + private static final Comparator INTERVAL_BY_START = new Comparator<>() + { + private final DateTimeComparator dateTimeComp = DateTimeComparator.getInstance(); + + @Override + public int compare(Interval lhs, Interval rhs) + { + if (lhs.getChronology().equals(rhs.getChronology())) { + return Long.compare(lhs.getStartMillis(), rhs.getStartMillis()); + } + return dateTimeComp.compare(lhs.getStart(), rhs.getStart()); + } + }; + + private static final Comparator INTERVAL_BY_END = new Comparator<>() + { + private final DateTimeComparator dateTimeComp = DateTimeComparator.getInstance(); + + @Override + public int compare(Interval lhs, Interval rhs) + { + if (lhs.getChronology().equals(rhs.getChronology())) { + return Long.compare(lhs.getEndMillis(), rhs.getEndMillis()); + } + return dateTimeComp.compare(lhs.getEnd(), rhs.getEnd()); + } + }; + @Deprecated public static Comparator intervals() { @@ -135,4 +163,15 @@ public static Comparator intervalsByEndThenStart() return INTERVAL_BY_END_THEN_START; } + public static Comparator intervalsByStart() + { + return INTERVAL_BY_START; + } + + public static Comparator intervalsByEnd() + { + return INTERVAL_BY_END; + } + + } diff --git a/processing/src/main/java/org/apache/druid/timeline/IntervalTree.java b/processing/src/main/java/org/apache/druid/timeline/IntervalTree.java new file mode 100644 index 000000000000..98fc9e6cb858 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/timeline/IntervalTree.java @@ -0,0 +1,855 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline; + +import com.google.common.base.Predicate; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.VisibleForTesting; +import org.joda.time.Interval; + +import java.util.AbstractMap; +import java.util.AbstractSet; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; +import java.util.SortedMap; +import java.util.function.BiConsumer; + +/** + * A variation of Interval Trees (https://en.wikipedia.org/wiki/Interval_tree) + * Custom optimizations for faster interval search and additional support for specific joda Interval comparator + * arithmetic used in the project + *

+ *

+ * Multiple different intervals can be added to the tree. It can then be searched to find all intervals matching a given + * interval. The user specifies the match condition, such as encompassing the given interval, overlapping, etc. The + * search can return multiple results as multiple intervals in the tree could match the criteria. + *

+ * Using the tree, reduces the search time from O(N) iterating through all the intervals, to roughly O(log2(N)). + * Furthermore, a value can be associated with each interval, which is also returned in the search result. + * + *

+ * The tree is a binary search tree sorted by interval start time. The intervals are stored as nodes in the tree. + * Additional state containing the minimum and maximum interval bounds of the entire subtree under a node is also + * stored in each node. This helps speed up the search for matching intervals by skipping unsuitable subtrees that will + * not contain a matching candidate interval. + *

+ * To optimize the balancing cost w.r.t the operation time, the tree is not balanced on every modification operation. + * Rather, a configurable imbalance tolerance from the theoretical ideal height of log2(N) is allowed, breaching which + * triggers the rebalance. + *

+ * Not thread safe. + *

+ */ +public class IntervalTree extends AbstractMap implements NavigableMap +{ + // The compartor for comparing the interval start timnes + private final Comparator startComparator; + // The comparator for comparing interval end times + private final Comparator endComparator; + + @VisibleForTesting + private Node root; + private int size; + + // Deviation allowed from ideal height for the maximum height on either side of the tree, expressed as a + // percentage of ideal height + private int imbalanceTolerance = 50; + + private final EntrySet entrySet = new EntrySet(); + + public IntervalTree(Comparator startComparator, Comparator endComparator) + { + this.startComparator = startComparator; + this.endComparator = endComparator; + } + + public int getImbalanceTolerance() + { + return imbalanceTolerance; + } + + public void setImbalanceTolerance(int imbalanceTolerance) + { + this.imbalanceTolerance = imbalanceTolerance; + } + + static class Node implements Map.Entry + { + Interval interval; + T value; + int height; + // The full interval range of the subtree formed by this Node + Interval range; + Node parent; + Node left; + Node right; + + private static final String PRINT_FORMAT = "{\n" + + "%sinterval = %s\n" + + "%svalue = %s\n" + + "%sheight = %d\n" + + "%srange = %s\n" + + "%sleft = %s\n" + + "%sright = %s\n" + + "%s}"; + + private String print(int level) + { + String prefix = "\t".repeat(level); + String eprefix = "\t".repeat(level - 1); + return String.format(Locale.ENGLISH, PRINT_FORMAT, + prefix, interval, prefix, value, prefix, height, + prefix, range, + prefix, (left != null) ? left.print(level + 1) : null, + prefix, (right != null) ? right.print(level + 1) : null, + eprefix + ); + } + + @Override + public Interval getKey() + { + return interval; + } + + @Override + public T getValue() + { + return value; + } + + @Override + public T setValue(T value) + { + T oldValue = this.value; + this.value = value; + return oldValue; + } + } + + @Override + public T put(Interval interval, T value) + { + //root = insert(root, interval, value); + T oldValue = insert(null, false, interval, value); + checkRebalance(); + return oldValue; + } + + private T insert(Node parent, boolean left, Interval interval, T value) + { + // Passing parent so that when a new node is created, it can be added to parent, and we can still use return value + // for another purpose, namely returning the old value if the key already exists in the tree + Node node; + if (parent == null) { + node = root; + } else if (left) { + node = parent.left; + } else { + node = parent.right; + } + + if (node == null) { + node = new Node<>(); + node.interval = interval; + node.value = value; + node.height = 0; + node.range = interval; + if (root == null) { + root = node; + } else if (left) { + setLeftNode(parent, node); + } else { + setRightNode(parent, node); + } + ++size; + return null; + } + + T oldValue; + + int cmp = compareInterval(interval, node.interval); + + // If exact interval already exists, just replace the value and return + if (cmp == 0) { + oldValue = node.value; + node.value = value; + return oldValue; + } + + if (cmp < 0) { + // Go to the left + oldValue = insert(node, true, interval, value); + } else { + // Go to the right + oldValue = insert(node, false, interval, value); + } + recomputeState(node); + + //return node; + return oldValue; + } + + @Override + public T get(Object key) + { + if (!Interval.class.isAssignableFrom(key.getClass())) { + throw new ClassCastException("key must be an instance of Interval"); + } + Interval interval = (Interval) key; + + T value = null; + Node node = root; + while (node != null) { + int cmp = compareInterval(node.getKey(), interval); + if (cmp == 0) { + value = node.value; + break; + } else if (cmp > 0) { + node = node.left; + } else { + node = node.right; + } + } + return value; + } + + private int compareInterval(Interval interval1, Interval interval2) + { + int cmp = startComparator.compare(interval1, interval2); + if (cmp == 0) { + return endComparator.compare(interval1, interval2); + } + return cmp; + } + + public Map findEncompassing(Interval interval) + { + return findMatching(i -> i.contains(interval)); + } + + public Map findOverlapping(Interval interval) + { + return findMatching(i -> i.overlaps(interval)); + } + + /** + * Get all entries matching a given condition + * @param condition The match condition + * + * This condition should not only return true when a node matches the condition but also when a child node range + * matches. It is a convenience method for {@link #forEachMatching(Predicate, Predicate, BiConsumer)} and see the + * method's documentation for more information. It calls the method with rangeCondition set to be same as condition. + */ + public Map findMatching(Predicate condition) + { + Map result = new HashMap<>(); + forEachMatching(condition, result::put); + return result; + } + + /** + * Find entries matching a given condition by doing a full traversal. + * @param condition The match condition + * + * The method traverses through all the nodes of the tree looking for matches. + */ + public Map findMatchingFullTraversal(Predicate condition) + { + Map result = new HashMap<>(); + forEachMatchingFullTraversal(condition, result::put); + return result; + } + + /** + * Perform on action for matching nodes. + * @param condition The match condition + * @param action The action + * + * This condition should not only return true when a node matches the condition but also when the child node range + * matches. It is a convenience method for {@link #forEachMatching(Predicate, Predicate, BiConsumer)} and see the + * method's documentation for more information. It calls the method with rangeCondition set to be same as condition. + */ + public void forEachMatching(Predicate condition, BiConsumer action) + { + forEachMatching(condition, condition, action); + } + + /** + * Perform on action for matching nodes by doing a full traversal. + * @param condition The match condition + * @param action The action + * + * The method traverses through all the nodes of the tree looking for matches. + */ + public void forEachMatchingFullTraversal(Predicate condition, BiConsumer action) + { + forEachMatching(condition, null, action); + } + + /** + * Perform an action for matching nodes + * @param condition The condition to match for the node + * @param rangeCondition The condition to check a child node for, to determine whether to traverse the subtree + * @param action The action to perform + * + * The rangeCondition is applied on the interval range of the child node and only if the condition returns true is the + * child subtree traversed. Interval range is the min start time to max end time for all the nodes in the child + * subtree. This is a lookup speedup optimization. If rangeCondition is null, the check is skipped and all the + * children are traversed to find matches. + * + * In some cases such as finding nodes overlapping the given interval or encompassing the given interval, the same + * predicate can be used for condition and rangeCondition. In other situations a full traversal maybe needed and a + * null can be passed in for rangeCondition. There are helper methods for these. + */ + public void forEachMatching(Predicate condition, Predicate rangeCondition, BiConsumer action) + { + forEachMatching(root, condition, rangeCondition, action); + } + + private void forEachMatching(Node node, Predicate condition, Predicate rangeCondition, BiConsumer action) + { + + if (node == null) { + return; + } + + // Process in-order + + // Search left + if ((node.left != null) && ((rangeCondition == null) || rangeCondition.apply(node.left.range))) { + forEachMatching(node.left, condition, rangeCondition, action); + } + + if (condition.apply(node.interval)) { + action.accept(node.interval, node.value); + } + + // Search right + if (node.right != null && ((rangeCondition == null) || rangeCondition.apply(node.right.range))) { + forEachMatching(node.right, condition, rangeCondition, action); + } + } + + @Override + public T remove(Object key) + { + return remove((Interval) key); + } + + public T remove(Interval interval) + { + List oldValue = new ArrayList<>(1); + root = removeNode(root, interval, oldValue); + if (root != null) { + root.parent = null; + } + checkRebalance(); + return oldValue.size() == 1 ? oldValue.get(0) : null; + } + + private Node removeNode(Node node, Interval interval, List oldValue) + { + // When deleting a node, try to replace it with the right most leaf of the left sub-tree. + // If it is does not exist, i.e., the bottom most right node in the left subtree only has a left child and does not + // have a right child, this node becomes the replacement. Also, in this scenario, the left child (subtree) of this + // node is moved up to its parent as the parent's right child. + if (node == null) { + return null; + } + + int cmp = compareInterval(interval, node.interval); + + if (cmp == 0) { + // This is the node to delete + --size; + oldValue.add(node.value); + if ((node.left != null) && (node.right != null)) { + // Make the right bottom most child in the left subtree of the node, the new node at the current level + Node left = node.left; + Node newNode = unlinkRightLeaf(left); + // Make the current left and right children, the left and right children of the new node respectively. + // However, if the new node turns out to be the same as the left node, it means the left node did not have any + // right child. In this case, only set its right child to be the current node's right child. + if (left != newNode) { + // A right child exists + setLeftNode(newNode, left); + } + setRightNode(newNode, node.right); + recomputeState(newNode); + return newNode; + } else if (node.left != null) { + // Right node is null, make the left node the new node at current level + return node.left; + } else if (node.right != null) { + // Left node is null, make the right node the new node at current level + return node.right; + } + return null; + } + + // Current node didn't match, search children + if (cmp < 0) { + Node left = removeNode(node.left, interval, oldValue); + setLeftNode(node, left); + } else { + Node right = removeNode(node.right, interval, oldValue); + setRightNode(node, right); + } + + // Update our state as a modification may have happened somewhere in our subtree + recomputeState(node); + + return node; + } + + private Node unlinkRightLeaf(Node node) + { + if (node.right == null) { + return node; + } else { + Node rnode = unlinkRightLeaf(node.right); + // If the right node has a left child, make it new right child + if (rnode == node.right) { + setRightNode(node, rnode.left); + rnode.left = null; + } + recomputeState(node); + return rnode; + } + } + + private void inOrderTraverse(Node node, List> nodes) + { + if (node == null) { + return; + } + inOrderTraverse(node.left, nodes); + nodes.add(node); + inOrderTraverse(node.right, nodes); + } + + public void rebalance() + { + // In order traversal followed by repeated binary segmentation of the list + List> nodes = new ArrayList<>(size); + inOrderTraverse(root, nodes); + root = constructTree(nodes, 0, nodes.size()); + root.parent = null; + } + + private Node constructTree(List> nodes, int start, int end) + { + if (start == end) { + return null; + } + int mid = (start + end - 1) / 2; + Node node = nodes.get(mid); + + Node left = constructTree(nodes, start, mid); + setLeftNode(node, left); + + Node right = constructTree(nodes, mid + 1, end); + setRightNode(node, right); + + recomputeState(node); + return node; + } + + @Override + public Map.Entry lowerEntry(Interval key) + { + Node lnode = null; + Node node = root; + while (node != null) { + // Since we want to return a smaller entry even when there is an exact match, go left in the equality case too + if (compareInterval(key, node.getKey()) <= 0) { + node = node.left; + } else { + lnode = node; + node = node.right; + } + } + return lnode; + } + + @Override + public Interval lowerKey(Interval key) + { + Map.Entry entry = lowerEntry(key); + return entry != null ? entry.getKey() : null; + } + + @Override + public Map.Entry floorEntry(Interval key) + { + Node fnode = null; + Node node = root; + while (node != null) { + int cmp = compareInterval(node.getKey(), key); + if (cmp == 0) { + fnode = node; + break; + } else if (cmp > 0) { + node = node.left; + } else { + fnode = node; + node = node.right; + } + } + return fnode; + } + + @Override + public Interval floorKey(Interval key) + { + Map.Entry entry = floorEntry(key); + return entry != null ? entry.getKey() : null; + } + + @Override + public Map.Entry ceilingEntry(Interval key) + { + Node cnode = null; + Node node = root; + while (node != null) { + int cmp = compareInterval(node.getKey(), key); + if (cmp == 0) { + cnode = node; + break; + } else if (cmp > 0) { + cnode = node; + node = node.left; + } else { + node = node.right; + } + } + return cnode; + } + + @Override + public Interval ceilingKey(Interval key) + { + Entry entry = ceilingEntry(key); + return entry != null ? entry.getKey() : null; + } + + @Override + public Map.Entry higherEntry(Interval key) + { + Node hnode = null; + Node node = root; + while (node != null) { + if (compareInterval(key, node.getKey()) < 0) { + hnode = node; + node = node.left; + } else { + node = node.right; + } + } + return hnode; + } + + @Override + public Interval higherKey(Interval key) + { + Entry entry = higherEntry(key); + return entry != null ? entry.getKey() : null; + } + + @Override + public Map.Entry firstEntry() + { + return firstEntry(root); + } + + @Override + public Map.Entry lastEntry() + { + if (root == null) { + return null; + } + Node node = root; + while (node.right != null) { + node = node.right; + } + return node; + } + + @Override + public Interval firstKey() + { + Map.Entry entry = firstEntry(); + return entry != null ? entry.getKey() : null; + } + + @Override + public Interval lastKey() + { + Map.Entry entry = lastEntry(); + return entry != null ? entry.getKey() : null; + } + + @Override + public Map.Entry pollFirstEntry() + { + throw new UnsupportedOperationException(); + } + + @Override + public Map.Entry pollLastEntry() + { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableMap descendingMap() + { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableSet navigableKeySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableSet descendingKeySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableMap subMap(Interval fromKey, boolean fromInclusive, Interval toKey, boolean toInclusive) + { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableMap headMap(Interval toKey, boolean inclusive) + { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableMap tailMap(Interval fromKey, boolean inclusive) + { + throw new UnsupportedOperationException(); + } + + @Override + public Comparator comparator() + { + throw new UnsupportedOperationException(); + } + + @Override + public SortedMap subMap(Interval fromKey, Interval toKey) + { + throw new UnsupportedOperationException(); + } + + @Override + public SortedMap headMap(Interval toKey) + { + throw new UnsupportedOperationException(); + } + + @Override + public SortedMap tailMap(Interval fromKey) + { + throw new UnsupportedOperationException(); + } + + private void recomputeState(Node node) + { + int lheight = (node.left != null) ? node.left.height : -1; + int rheight = (node.right != null) ? node.right.height : -1; + node.height = Math.max(lheight, rheight) + 1; + node.range = computeRange(node.interval, node.left, node.right); + } + + @Override + public void clear() + { + root = null; + size = 0; + } + + @Override + public @NotNull Set> entrySet() + { + return entrySet; + } + + @Override + public int size() + { + return size; + } + + @VisibleForTesting + // returns the number of edges from root to leaf along the longest path + int height() + { + return (root != null) ? root.height : -1; + } + + class EntrySet extends AbstractSet> + { + + // Currently this returns a distinct collection when iterating + @Override + public Iterator> iterator() + { + return new EntrySetIterator(); + } + + @Override + public int size() + { + return IntervalTree.this.size; + } + + class EntrySetIterator implements Iterator> + { + + Node current = firstEntry(IntervalTree.this.root); + + @Override + public boolean hasNext() + { + return (current != null); + } + + @Override + public Entry next() + { + Entry entry = current; + if (entry == null) { + return entry; + } + // Move current to next node + if (current.right != null) { + current = firstEntry(current.right); + } else { + // No more right children, go up one level to the parent. + // However, if the current node is right child of parent, keep going up till you find a parent who is on the + // right side + Node prev; + do { + prev = current; + current = current.parent; + } while ((current != null) && (current.right == prev)); + } + return entry; + } + } + + } + + /** + * Perform a tree rebalance if the imbalance between the left and right sides of the tree has increased beyond a + * tolerated limit, as opposed to rebalancing all the time. This is to done to strike a balance between performance + * degradation arising from an imbalance tree and the processing overheard of rebalancing each time the contents of + * the tree changes. + * + * The limit is defined using a configurable tolerance percentage in excess of an ideal balanced tree height for the + * number of entries in the tree. + */ + private void checkRebalance() + { + if (root != null) { + int ideal = (int) Math.floor(Math.log10(size + 1) / Math.log10(2)); + double tolerance = ideal * imbalanceTolerance / 100.0; + int threshold = ideal + (int) tolerance; + if (root.height > threshold) { + rebalance(); + } + } + } + + private Node firstEntry(Node node) + { + if (node == null) { + return null; + } + while (node.left != null) { + node = node.left; + } + return node; + } + + private void setLeftNode(Node node, Node left) + { + if (node.left != left) { + node.left = left; + if (left != null) { + left.parent = node; + } + } + } + + private void setRightNode(Node node, Node right) + { + if (node.right != right) { + node.right = right; + if (right != null) { + right.parent = node; + } + } + } + + @VisibleForTesting + public String print() + { + return (root != null) ? root.print(1) : null; + } + + @SafeVarargs + private Interval computeRange(Interval interval, Node... nodes) + { + // Find the intervals that have the minimum start and the maximum end + Interval min = interval; + Interval max = interval; + for (Node node : nodes) { + if (node != null) { + if (startComparator.compare(node.range, min) < 0) { + min = node.range; + } + if (endComparator.compare(node.range, max) > 0) { + max = node.range; + } + } + } + // Return an interval with the min and max + return interval.withStart(min.getStart()).withEnd(max.getEnd()); + } + +} diff --git a/processing/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/processing/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java index a2f2699c41c9..c024b1a96940 100644 --- a/processing/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java +++ b/processing/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; import org.apache.druid.utils.CollectionUtils; @@ -74,21 +75,20 @@ public class VersionedIntervalTimeline> implements TimelineLookup { + private static final Logger logger = new Logger(VersionedIntervalTimeline.class); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // Below timelines stores only *visible* timelineEntries // adjusted interval -> timelineEntry - private final NavigableMap completePartitionsTimeline = new TreeMap<>( - Comparators.intervalsByStartThenEnd() - ); + private final NavigableMap completePartitionsTimeline; // IncompletePartitionsTimeline also includes completePartitionsTimeline // adjusted interval -> timelineEntry @VisibleForTesting - final NavigableMap incompletePartitionsTimeline = new TreeMap<>( - Comparators.intervalsByStartThenEnd() - ); + final NavigableMap incompletePartitionsTimeline; // true interval -> version -> timelineEntry private final Map> allTimelineEntries = new HashMap<>(); + // Only instantiated and used when fastIntervalSearch is enabled + private IntervalTree> allTimeIntervals; private final AtomicInteger numObjects = new AtomicInteger(); private final Comparator versionComparator; @@ -96,15 +96,38 @@ public class VersionedIntervalTimeline versionComparator) { this(versionComparator, false); } public VersionedIntervalTimeline(Comparator versionComparator, boolean skipObjectsWithNoData) + { + this(versionComparator, skipObjectsWithNoData, false); + } + + /** + * Constructor + * @param versionComparator The version comparator + * @param skipObjectsWithNoData Skip tombstones during lookup + * @param fastIntervalSearch Use the faster segment retrieval index based on interval tree + */ + public VersionedIntervalTimeline(Comparator versionComparator, boolean skipObjectsWithNoData, boolean fastIntervalSearch) { this.versionComparator = versionComparator; this.skipObjectsWithNoData = skipObjectsWithNoData; + this.fastIntervalSearch = fastIntervalSearch; + if (fastIntervalSearch) { + allTimeIntervals = new IntervalTree<>(Comparators.intervalsByStart(), Comparators.intervalsByEnd()); + this.completePartitionsTimeline = new IntervalTree<>(Comparators.intervalsByStart(), Comparators.intervalsByEnd()); + this.incompletePartitionsTimeline = new IntervalTree<>(Comparators.intervalsByStart(), Comparators.intervalsByEnd()); + } else { + this.completePartitionsTimeline = new TreeMap<>(Comparators.intervalsByStartThenEnd()); + this.incompletePartitionsTimeline = new TreeMap<>(Comparators.intervalsByStartThenEnd()); + } } public static > Iterable getAllObjects( @@ -210,6 +233,9 @@ public void addAll(final Iterator> TreeMap versionEntry = new TreeMap<>(versionComparator); versionEntry.put(version, entry); allTimelineEntries.put(interval, versionEntry); + if (fastIntervalSearch) { + allTimeIntervals.put(interval, versionEntry); + } numObjects.incrementAndGet(); } else { entry = exists.get(version); @@ -269,6 +295,9 @@ public PartitionChunk remove(Interval interval, VersionType version, versionEntries.remove(version); if (versionEntries.isEmpty()) { allTimelineEntries.remove(interval); + if (fastIntervalSearch) { + allTimeIntervals.remove(interval); + } } remove(incompletePartitionsTimeline, interval, entry, true); @@ -289,13 +318,40 @@ public PartitionChunk findChunk(Interval interval, VersionType versi { lock.readLock().lock(); try { - for (Entry> entry : allTimelineEntries.entrySet()) { - if (entry.getKey().equals(interval) || entry.getKey().contains(interval)) { - TimelineEntry foundEntry = entry.getValue().get(version); - if (foundEntry != null) { - return foundEntry.getPartitionHolder().getChunk(partitionNum); + + // Speed up search with an exact interval match lookup first + TreeMap versionEntries = allTimelineEntries.get(interval); + if (versionEntries != null) { + TimelineEntry foundEntry = versionEntries.get(version); + if (foundEntry != null) { + return foundEntry.getPartitionHolder().getChunk(partitionNum); + } + } + + // If an exact interval match is not found search for an encapsulating interval + + // If tree search is enabled use it else revert to checking all intervals + if (fastIntervalSearch) { + Map> possibleMatches = allTimeIntervals.findEncompassing(interval); + for (Entry> entry : possibleMatches.entrySet()) { + Interval enInterval = entry.getKey(); + if (enInterval.contains(interval)) { + TimelineEntry foundEntry = entry.getValue().get(version); + if (foundEntry != null) { + return foundEntry.getPartitionHolder().getChunk(partitionNum); + } + } + } + } else { + for (Entry> entry : allTimelineEntries.entrySet()) { + if (entry.getKey().contains(interval)) { + TimelineEntry foundEntry = entry.getValue().get(version); + if (foundEntry != null) { + return foundEntry.getPartitionHolder().getChunk(partitionNum); + } } } + } return null; @@ -747,21 +803,38 @@ private List> lookup(Interval inte timeline = completePartitionsTimeline; } - for (Entry entry : timeline.entrySet()) { - Interval timelineInterval = entry.getKey(); - TimelineEntry val = entry.getValue(); - - // exclude empty partition holders (i.e. tombstones) since they do not add value - // for higher level code...they have no data rows... - if ((!skipObjectsWithNoData || val.partitionHolder.hasData()) && timelineInterval.overlaps(interval)) { - retVal.add( - new TimelineObjectHolder<>( - timelineInterval, - val.getTrueInterval(), - val.getVersion(), - PartitionHolder.copyWithOnlyVisibleChunks(val.getPartitionHolder()) - ) - ); + if (fastIntervalSearch) { + IntervalTree tree = (IntervalTree) timeline; + tree.forEachMatching(timelineInterval -> timelineInterval.overlaps(interval), + (timelineInterval, val) -> { + if (!skipObjectsWithNoData || val.partitionHolder.hasData()) { + retVal.add( + new TimelineObjectHolder<>( + timelineInterval, + val.getTrueInterval(), + val.getVersion(), + PartitionHolder.copyWithOnlyVisibleChunks(val.getPartitionHolder()) + ) + ); + } + }); + } else { + for (Entry entry : timeline.entrySet()) { + Interval timelineInterval = entry.getKey(); + TimelineEntry val = entry.getValue(); + + // exclude empty partition holders (i.e. tombstones) since they do not add value + // for higher level code...they have no data rows... + if ((!skipObjectsWithNoData || val.partitionHolder.hasData()) && timelineInterval.overlaps(interval)) { + retVal.add( + new TimelineObjectHolder<>( + timelineInterval, + val.getTrueInterval(), + val.getVersion(), + PartitionHolder.copyWithOnlyVisibleChunks(val.getPartitionHolder()) + ) + ); + } } } diff --git a/processing/src/test/java/org/apache/druid/timeline/IntervalTreeTest.java b/processing/src/test/java/org/apache/druid/timeline/IntervalTreeTest.java new file mode 100644 index 000000000000..056f576b72ae --- /dev/null +++ b/processing/src/test/java/org/apache/druid/timeline/IntervalTreeTest.java @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.guava.Comparators; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +public class IntervalTreeTest +{ + + @Test + public void testSize() + { + IntervalTree tree = setupTree(baseData); + Assert.assertEquals("Size", 6, tree.size()); + } + + @Test + public void testPut() + { + IntervalTree tree = setupTree(baseData); + compareData(baseData, tree); + } + + @Test + public void testReplace() + { + IntervalTree tree = setupTree(baseData); + Pair entry = baseData.get(2); + Interval interval = entry.lhs; + String value = entry.rhs; + String newValue = value + "n"; + String oldValue = tree.put(interval, newValue); + Assert.assertEquals("Old value match", oldValue, value); + } + + @Test + public void testGet() + { + IntervalTree tree = setupTree(baseData); + baseData.forEach( + (Pair item) -> { + Interval interval = item.lhs; + String evalue = item.rhs; + String value = tree.get(interval); + Assert.assertEquals("value", evalue, value); + } + ); + } + + @Test + public void testValues() + { + IntervalTree tree = setupTree(baseData); + Collection values = tree.values(); + Collection bvalues = baseData.stream().map(entry -> entry.rhs).collect(Collectors.toList()); + Assert.assertTrue("values", CollectionUtils.isEqualCollection(bvalues, values)); + } + + @Test + public void testMatch() + { + IntervalTree tree = setupTree(baseData); + Map entries = tree.findEncompassing(Intervals.of("2025-01-04T00:00:00/P1D")); + + Assert.assertEquals(1, entries.size()); + Assert.assertEquals("Match", "v5", entries.get(Intervals.of("2025-01-04T00:00:00/P1D"))); + } + + @Test + public void testNoMatch() + { + IntervalTree tree = setupTree(baseData); + Map entries = tree.findEncompassing(Intervals.of("2025-01-07T00:00:00/P1D")); + + Assert.assertEquals(0, entries.size()); + } + + @Test + public void testOverlap() + { + IntervalTree tree = setupTree(overlapData); + Map entries = tree.findEncompassing(Intervals.of("2025-01-02T09:00:00/PT1H")); + + Assert.assertEquals(2, entries.size()); + Assert.assertEquals("Day match", "v4", entries.get(Intervals.of("2025-01-02T00:00:00/P1D"))); + Assert.assertEquals("Year match", "v7", entries.get(Intervals.of("2025-01-01T00:00:00/P1Y"))); + } + + @Test + public void testSparseOverlap() + { + IntervalTree tree = setupTree(sparseOverlapData); + Map entries = tree.findEncompassing(Intervals.of("2025-06-03T00:00:00/P1D")); + + Assert.assertEquals(4, entries.size()); + Assert.assertEquals("Match 1", "v1", entries.get(Intervals.of("2025-05-10T00:00:00/P1M"))); + Assert.assertEquals("Match 2", "v7", entries.get(Intervals.of("2025-06-03T00:00:00/P1D"))); + Assert.assertEquals("Match 3", "v13", entries.get(Intervals.of("2025-06-01T00:00:00/P1M"))); + Assert.assertEquals("Match 4", "v14", entries.get(Intervals.of("2025-01-01T00:00:00/P1Y"))); + } + + @Test + public void testRemove() + { + IntervalTree tree = setupTree(sparseOverlapData); + int size = tree.size(); + + // Remove node that does not exist + String intervalstr = "2025-03-11T00:00:00/P1M"; + String oldValue = tree.remove(Intervals.of(intervalstr)); + Assert.assertEquals("Size", size, tree.size()); + Assert.assertNull("Old value", oldValue); + List> expectedData = new ArrayList<>(sparseOverlapData); + compareData(expectedData, tree); + + // Remove leaf + intervalstr = "2025-06-01T00:00:00/P1M"; + String value = tree.get(Intervals.of(intervalstr)); + Assert.assertNotNull("Value", value); + + oldValue = tree.remove(Intervals.of(intervalstr)); + size--; + Assert.assertEquals("Size", size, tree.size()); + Assert.assertEquals("Old value", value, oldValue); + expectedData = new ArrayList<>(sparseOverlapData); + expectedData.remove(Pair.of(Intervals.of(intervalstr), value)); + compareData(expectedData, tree); + + // Remove node in penultimate level + intervalstr = "2025-09-04T00:00:00/P1D"; + value = tree.get(Intervals.of(intervalstr)); + Assert.assertNotNull("Value", value); + + oldValue = tree.remove(Intervals.of(intervalstr)); + size--; + Assert.assertEquals("Size", size, tree.size()); + Assert.assertEquals("Old value", value, oldValue); + expectedData = new ArrayList<>(expectedData); + expectedData.remove(Pair.of(Intervals.of(intervalstr), value)); + compareData(expectedData, tree); + + // Remove node at a higher level + intervalstr = "2025-07-12T00:00:00/P1D"; + value = tree.get(Intervals.of(intervalstr)); + Assert.assertNotNull("Value", value); + + oldValue = tree.remove(Intervals.of(intervalstr)); + size--; + Assert.assertEquals("Size", size, tree.size()); + Assert.assertEquals("Old value", value, oldValue); + expectedData = new ArrayList<>(expectedData); + expectedData.remove(Pair.of(Intervals.of(intervalstr), value)); + compareData(expectedData, tree); + } + + @Test + public void testRemoveRootAndMatch() + { + IntervalTree tree = setupTree(baseData); + tree.remove(Intervals.of("2025-01-03T00:00:00/P1D")); + Assert.assertEquals("Size", 5, tree.size()); + Map entries = tree.findEncompassing(Intervals.of("2025-01-04T00:00:00/P1D")); + Assert.assertEquals(1, entries.size()); + Assert.assertEquals("Match", "v5", entries.get(Intervals.of("2025-01-04T00:00:00/P1D"))); + } + + @Test + public void testRemoveMultiple() + { + IntervalTree tree = setupTree(sparseOverlapData); + int isize = tree.size(); + tree.remove(Intervals.of("2025-01-12T00:00:00/P1D")); + tree.remove(Intervals.of("2025-06-03T00:00:00/P1D")); + tree.remove(Intervals.of("2025-06-01T00:00:00/P1M")); + int csize = tree.size(); + Assert.assertEquals("Size", 3, isize - csize); + } + + @Test + public void testClear() + { + IntervalTree tree = setupTree(baseData); + tree.clear(); + Assert.assertEquals("Size", 0, tree.size()); + } + + @Test + public void testLargeLoadTree() + { + IntervalTree tree = new IntervalTree<>(Comparators.intervalsByStart(), Comparators.intervalsByEnd()); + List> expectedData = new ArrayList<>(); + Set existingIntervals = new HashSet<>(); + Random random = ThreadLocalRandom.current(); + int total = 100000; + int count = 0; + while (count < total) { + int year = random.nextInt(26) + 2000; + int month = random.nextInt(12) + 1; + int day = random.nextInt(28) + 1; + int hour = random.nextInt(23) + 1; + String intervalstr = year + "-" + month + "-" + day + "T" + hour + ":00:00/P" + ((count % 30) + 1) + "D"; + if (!existingIntervals.contains(intervalstr)) { + Interval interval = Intervals.of(intervalstr); + String value = "v" + count; + tree.put(interval, value); + expectedData.add(Pair.of(interval, value)); + existingIntervals.add(intervalstr); + ++count; + } + } + Assert.assertEquals("Size", total, tree.size()); + compareData(expectedData, tree); + } + + @Ignore + @Test + public void testPerf() + { + IntervalTree tree = new IntervalTree<>(Comparators.intervalsByStart(), Comparators.intervalsByEnd()); + List> expectedData = new ArrayList<>(); + Map mappedData = new HashMap<>(); + Set existingIntervals = new HashSet<>(); + Random random = ThreadLocalRandom.current(); + int total = 10000; + int count = 0; + while (count < total) { + int year = random.nextInt(26) + 2000; + int month = random.nextInt(12) + 1; + int day = random.nextInt(28) + 1; + int hour = random.nextInt(23) + 1; + String intervalstr = year + "-" + month + "-" + day + "T" + hour + ":00:00/P" + ((count % 30) + 1) + "D"; + if (!existingIntervals.contains(intervalstr)) { + Interval interval = Intervals.of(intervalstr); + String value = "v" + count; + tree.put(interval, value); + mappedData.put(interval, value); + expectedData.add(Pair.of(interval, value)); + existingIntervals.add(intervalstr); + ++count; + } + } + long start = System.currentTimeMillis(); + for (int i = 0; i < total; i++) { + Pair pair = expectedData.get(i); + Interval interval = pair.lhs; + for (Map.Entry entry : mappedData.entrySet()) { + if (entry.getKey().contains(interval)) { + break; + } + } + } + System.out.println("Seq find time " + (System.currentTimeMillis() - start)); + start = System.currentTimeMillis(); + for (int i = 0; i < total; i++) { + Pair pair = expectedData.get(i); + Interval interval = pair.lhs; + tree.findEncompassing(interval); + } + System.out.println("Tree find time " + (System.currentTimeMillis() - start)); + } + + @Test + public void testAutoRebalance() + { + IntervalTree tree = setupTree(sparseOverlapData); + Assert.assertEquals("Height", 4, tree.height()); + compareData(sparseOverlapData, tree); + } + + @Test + public void testManualRebalance() + { + // Set a high threshold so auto-rebalance does not happen + IntervalTree tree = setupTree(sparseOverlapData, t -> t.setImbalanceTolerance(100)); + Assert.assertEquals("Height", 4, tree.height()); + compareData(sparseOverlapData, tree); + tree.rebalance(); + Assert.assertEquals("Height", 3, tree.height()); + compareData(sparseOverlapData, tree); + } + + @Test + public void testIsEmpty() + { + IntervalTree tree = setupTree(sparseOverlapData); + Assert.assertFalse("Not Empty", tree.isEmpty()); + sparseOverlapData.forEach(t -> tree.remove(t.lhs)); + Assert.assertTrue("Empty", tree.isEmpty()); + } + + @Test + public void testFirstEntryAndKey() + { + IntervalTree tree = setupTree(sparseOverlapData); + Map.Entry entry = tree.firstEntry(); + Interval matchInterval = Intervals.of("2025-01-01T00:00:00/P1D"); + Assert.assertEquals("Entry interval", matchInterval, entry.getKey()); + Assert.assertEquals("Entry value", "v2", entry.getValue()); + + Interval interval = tree.firstKey(); + Assert.assertEquals("Interval key", matchInterval, interval); + } + + @Test + public void testLastEntryAndKey() + { + IntervalTree tree = setupTree(sparseOverlapData); + Map.Entry entry = tree.lastEntry(); + Interval matchInterval = Intervals.of("2025-10-06T00:00:00/P1M"); + Assert.assertEquals("Entry interval", matchInterval, entry.getKey()); + Assert.assertEquals("Entry value", "v12", entry.getValue()); + + Interval interval = tree.lastKey(); + Assert.assertEquals("Interval key", matchInterval, interval); + } + + @Test + public void testFloorKey() + { + IntervalTree tree = setupTree(sparseOverlapData); + + // Only one smaller entry + Interval floor = tree.floorKey(Intervals.of("2025-01-11T00:00:00/P1D")); + Assert.assertEquals("Floor key 1", Intervals.of("2025-01-01T00:00:00/P1Y"), floor); + + // Entry with same start date but different end date + floor = tree.lowerKey(Intervals.of("2025-01-01T00:00:00/P1M")); + Assert.assertEquals("Lower key 2", Intervals.of("2025-01-01T00:00:00/P1D"), floor); + + // Matching entry + floor = tree.floorKey(Intervals.of("2025-01-12T00:00:00/P1D")); + Assert.assertEquals("Floor key 3", Intervals.of("2025-01-12T00:00:00/P1D"), floor); + + // Random entry + floor = tree.floorKey(Intervals.of("2025-08-01T00:00:00/P1D")); + Assert.assertEquals("Floor key 4", Intervals.of("2025-07-12T00:00:00/P1D"), floor); + + // Last entry + floor = tree.floorKey(Intervals.of("2025-11-01T00:00:00/P1M")); + Assert.assertEquals("Floor key 5", Intervals.of("2025-10-06T00:00:00/P1M"), floor); + + // No smaller entry + floor = tree.floorKey(Intervals.of("2024-12-31T00:00:00/P1D")); + Assert.assertNull("Floor key 6", floor); + } + + @Test + public void testLowerKey() + { + IntervalTree tree = setupTree(sparseOverlapData); + + // Only one smaller entry + Interval lower = tree.lowerKey(Intervals.of("2025-01-11T00:00:00/P1D")); + Assert.assertEquals("Lower key 1", Intervals.of("2025-01-01T00:00:00/P1Y"), lower); + + // Entry with same start date but different end date + lower = tree.lowerKey(Intervals.of("2025-01-01T00:00:00/P1M")); + Assert.assertEquals("Lower key 2", Intervals.of("2025-01-01T00:00:00/P1D"), lower); + + // Matching entry + lower = tree.lowerKey(Intervals.of("2025-01-12T00:00:00/P1D")); + Assert.assertEquals("Lower key 3", Intervals.of("2025-01-01T00:00:00/P1Y"), lower); + + // Random entry + lower = tree.lowerKey(Intervals.of("2025-08-01T00:00:00/P1D")); + Assert.assertEquals("Lower key 4", Intervals.of("2025-07-12T00:00:00/P1D"), lower); + + // Last entry + lower = tree.lowerKey(Intervals.of("2025-11-01T00:00:00/P1M")); + Assert.assertEquals("Lower key 5", Intervals.of("2025-10-06T00:00:00/P1M"), lower); + + // No smaller entry + lower = tree.lowerKey(Intervals.of("2024-12-31T00:00:00/P1D")); + Assert.assertNull("Lower key 6", lower); + } + + @Test + public void testCeiinglKey() + { + IntervalTree tree = setupTree(sparseOverlapData); + + // First entry + Interval ceiling = tree.ceilingKey(Intervals.of("2024-12-31T00:00:00/P1D")); + Assert.assertEquals("Ceiling key 1", Intervals.of("2025-01-01T00:00:00/P1D"), ceiling); + + // Entry with same start date but different end date + ceiling = tree.ceilingKey(Intervals.of("2025-02-01T00:00:00/PT6H")); + Assert.assertEquals("Ceiling key 2", Intervals.of("2025-02-01T00:00:00/P1D"), ceiling); + + // Matching entry + ceiling = tree.ceilingKey(Intervals.of("2025-09-04T00:00:00/P1D")); + Assert.assertEquals("Ceiling key 3", Intervals.of("2025-09-04T00:00:00/P1D"), ceiling); + + // Random entry + ceiling = tree.ceilingKey(Intervals.of("2025-03-31T00:00:00/P1D")); + Assert.assertEquals("Ceiling key 4", Intervals.of("2025-04-02T00:00:00/P1D"), ceiling); + + // Only one greater entry + ceiling = tree.ceilingKey(Intervals.of("2025-09-28T00:00:00/P1D")); + Assert.assertEquals("Ceiling key 5", Intervals.of("2025-10-06T00:00:00/P1M"), ceiling); + + // No greater entry + ceiling = tree.ceilingKey(Intervals.of("2025-11-01T00:00:00/P1D")); + Assert.assertNull("Ceiling key 6", ceiling); + } + + @Test + public void testHigherKey() + { + IntervalTree tree = setupTree(sparseOverlapData); + + // First entry + Interval higher = tree.higherKey(Intervals.of("2024-12-31T00:00:00/P1D")); + Assert.assertEquals("Higher key 1", Intervals.of("2025-01-01T00:00:00/P1D"), higher); + + // Entry with same start date but different end date + higher = tree.ceilingKey(Intervals.of("2025-02-01T00:00:00/PT6H")); + Assert.assertEquals("Higher key 2", Intervals.of("2025-02-01T00:00:00/P1D"), higher); + + // Matching entry + higher = tree.higherKey(Intervals.of("2025-09-04T00:00:00/P1D")); + Assert.assertEquals("Higher key 3", Intervals.of("2025-10-06T00:00:00/P1M"), higher); + + // Random entry + higher = tree.higherKey(Intervals.of("2025-03-31T00:00:00/P1D")); + Assert.assertEquals("Higher key 4", Intervals.of("2025-04-02T00:00:00/P1D"), higher); + + // Only one greater entry + higher = tree.higherKey(Intervals.of("2025-09-28T00:00:00/P1D")); + Assert.assertEquals("Higher key 5", Intervals.of("2025-10-06T00:00:00/P1M"), higher); + + // No greater entry + higher = tree.higherKey(Intervals.of("2025-11-01T00:00:00/P1D")); + Assert.assertNull("Higher key 6", higher); + } + + private void compareData(List> inputData, IntervalTree tree) + { + //Iterator> iterator = tree.inOrderTraverse(); + Iterator> iterator = tree.entrySet().iterator(); + + List> expected = inputData.stream() + .sorted((p1, p2) -> Comparators.intervalsByStartThenEnd().compare(p1.lhs, p2.lhs)) + .collect(Collectors.toList()); + + compareEntries(expected.iterator(), iterator); + } + + private void compareEntries(Iterator> expected, Iterator> actual) + { + while (actual.hasNext()) { + Assert.assertTrue("Entry available", expected.hasNext()); + Pair expectedEntry = expected.next(); + Map.Entry actualEntry = actual.next(); + Assert.assertEquals("Interval match", expectedEntry.lhs, actualEntry.getKey()); + Assert.assertEquals("Value match", expectedEntry.rhs, actualEntry.getValue()); + } + Assert.assertFalse("No outstanding entries", expected.hasNext()); + } + + static List> baseData = new ArrayList<>(); + static List> overlapData = new ArrayList<>(); + static List> sparseOverlapData = new ArrayList<>(); + + static { + + baseData.add(Pair.of(Intervals.of("2025-01-03T00:00:00/P1D"), "v1")); + baseData.add(Pair.of(Intervals.of("2025-01-05T00:00:00/P1D"), "v2")); + baseData.add(Pair.of(Intervals.of("2025-01-01T00:00:00/P1D"), "v3")); + baseData.add(Pair.of(Intervals.of("2025-01-02T00:00:00/P1D"), "v4")); + baseData.add(Pair.of(Intervals.of("2025-01-04T00:00:00/P1D"), "v5")); + baseData.add(Pair.of(Intervals.of("2025-01-06T00:00:00/P1D"), "v6")); + + overlapData.addAll(baseData); + overlapData.add(Pair.of(Intervals.of("2025-01-01T00:00:00/P1Y"), "v7")); + + sparseOverlapData.add(Pair.of(Intervals.of("2025-05-10T00:00:00/P1M"), "v1")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-01-01T00:00:00/P1D"), "v2")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-02-01T00:00:00/P1D"), "v3")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-01-12T00:00:00/P1D"), "v4")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-07-12T00:00:00/P1D"), "v5")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-02-01T00:00:00/P1M"), "v6")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-06-03T00:00:00/P1D"), "v7")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-08-09T00:00:00/P1D"), "v8")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-08-02T00:00:00/P1M"), "v9")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-09-04T00:00:00/P1D"), "v10")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-04-02T00:00:00/P1D"), "v11")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-10-06T00:00:00/P1M"), "v12")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-06-01T00:00:00/P1M"), "v13")); + sparseOverlapData.add(Pair.of(Intervals.of("2025-01-01T00:00:00/P1Y"), "v14")); + + } + + private IntervalTree setupTree(List> inputData) + { + return setupTree(inputData, null); + } + + private IntervalTree setupTree(List> inputData, Consumer> setupFunc) + { + IntervalTree tree = new IntervalTree<>(Comparators.intervalsByStart(), Comparators.intervalsByEnd()); + if (setupFunc != null) { + setupFunc.accept(tree); + } + for (Pair entry : inputData) { + tree.put(entry.lhs, entry.rhs); + } + return tree; + } + +} diff --git a/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java b/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java index a41eda2fc360..46d57f06bbd0 100644 --- a/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java @@ -32,20 +32,40 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; /** * This test class is separated from {@link VersionedIntervalTimelineTest} because it populates specific data for tests * in {@link #setUp()}. */ +@RunWith(Parameterized.class) public class VersionedIntervalTimelineSpecificDataTest extends VersionedIntervalTimelineTestBase { + @Parameterized.Parameters + public static Collection parameters() + { + return Arrays.asList( + false, + true + ); + } + + public VersionedIntervalTimelineSpecificDataTest(boolean fastIntervalSearch) + { + this.fastIntervalSearch = fastIntervalSearch; + } + + private final boolean fastIntervalSearch; + @Before public void setUp() { - timeline = makeStringIntegerTimeline(); + timeline = makeStringIntegerTimeline(fastIntervalSearch); add("2011-04-01/2011-04-03", "1", 2); add("2011-04-03/2011-04-06", "1", 3); diff --git a/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java b/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java index bd2e61e20cbf..ca332773e829 100644 --- a/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java @@ -34,6 +34,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; @@ -43,13 +45,29 @@ /** */ +@RunWith(Parameterized.class) public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTestBase { + @Parameterized.Parameters + public static Collection parameters() + { + return Arrays.asList( + false, + true + ); + } + + public VersionedIntervalTimelineTest(boolean fastIntervalSearch) + { + this.fastIntervalSearch = fastIntervalSearch; + } + + private final boolean fastIntervalSearch; @Before public void setUp() { - timeline = makeStringIntegerTimeline(); + timeline = makeStringIntegerTimeline(fastIntervalSearch); } @Test diff --git a/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java b/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java index 55f5c8ddfa97..fa6986cf28ae 100644 --- a/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java +++ b/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java @@ -110,7 +110,12 @@ static void assertSingleElementChunks( static VersionedIntervalTimeline makeStringIntegerTimeline() { - return new VersionedIntervalTimeline<>(Ordering.natural()); + return makeStringIntegerTimeline(false); + } + + static VersionedIntervalTimeline makeStringIntegerTimeline(boolean fastIntervalSearch) + { + return new VersionedIntervalTimeline<>(Ordering.natural(), false, fastIntervalSearch); } VersionedIntervalTimeline timeline; diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java index 09b953204a43..7df8f23fe6e9 100644 --- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java @@ -33,6 +33,7 @@ import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.segment.DefaultColumnFormatConfig; import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.indexing.TimelineConfig; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLocalCacheManager; @@ -64,6 +65,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class); JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); JsonConfigProvider.bind(binder, "druid.indexing.formats", DefaultColumnFormatConfig.class); + JsonConfigProvider.bind(binder, "druid.segment.timeline", TimelineConfig.class); bindLocationSelectorStrategy(binder); binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null)); binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class).in(LazySingleton.class); diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TimelineConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/TimelineConfig.java new file mode 100644 index 000000000000..73c19aa99c55 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/indexing/TimelineConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.indexing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +public class TimelineConfig +{ + @JsonProperty + private final boolean fastIntervalSearch; + + @JsonCreator + public TimelineConfig(@JsonProperty("fastIntervalSearch") @Nullable Boolean fastIntervalSearch) + { + this.fastIntervalSearch = fastIntervalSearch != null && fastIntervalSearch; + } + + public boolean isFastIntervalSearch() + { + return fastIntervalSearch; + } +} diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index a3d58e102510..b2a21693dce3 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.SegmentMapFunction; import org.apache.druid.segment.SegmentReference; +import org.apache.druid.segment.indexing.TimelineConfig; import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.segment.join.table.ReferenceCountedIndexedTableProvider; import org.apache.druid.segment.loading.AcquireMode; @@ -71,12 +72,21 @@ public class SegmentManager private final SegmentCacheManager cacheManager; + private final TimelineConfig timelineConfig; + private final ConcurrentHashMap dataSources = new ConcurrentHashMap<>(); - @Inject public SegmentManager(SegmentCacheManager cacheManager) + { + this(cacheManager, new TimelineConfig(false)); + } + + + @Inject + public SegmentManager(SegmentCacheManager cacheManager, TimelineConfig timelineConfig) { this.cacheManager = cacheManager; + this.timelineConfig = timelineConfig; } @VisibleForTesting @@ -323,7 +333,7 @@ private void loadSegmentInternal( dataSources.compute( dataSegment.getDataSource(), (k, v) -> { - final DataSourceState dataSourceState = v == null ? new DataSourceState() : v; + final DataSourceState dataSourceState = v == null ? new DataSourceState(timelineConfig) : v; final VersionedIntervalTimeline loadedIntervals = dataSourceState.getTimeline(); final PartitionChunk entry = loadedIntervals.findChunk( @@ -500,8 +510,7 @@ public void shutdown() */ public static class DataSourceState { - private final VersionedIntervalTimeline timeline = - new VersionedIntervalTimeline<>(Ordering.natural()); + private final VersionedIntervalTimeline timeline; private final ConcurrentHashMap tablesLookup = new ConcurrentHashMap<>(); private long totalSegmentSize; @@ -509,6 +518,11 @@ public static class DataSourceState private long rowCount; private final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution(); + public DataSourceState(TimelineConfig timelineConfig) + { + timeline = new VersionedIntervalTimeline<>(Ordering.natural(), false, timelineConfig.isFastIntervalSearch()); + } + private void addSegment(DataSegment segment, long numOfRows) { totalSegmentSize += segment.getSize();