Skip to content

Commit eba1d8c

Browse files
committed
Refactor MemorySegmentRequest to make concurrent.
1 parent 1401350 commit eba1d8c

16 files changed

Lines changed: 239 additions & 197 deletions

src/main/java/org/apache/datasketches/common/MemorySegmentRequestExample.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@
2424
import java.lang.foreign.MemorySegment;
2525
import java.util.Enumeration;
2626
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.Objects;
2728

2829
/**
2930
* This is an example of a possible implementation of the MemorySegmentRequest interface
30-
* where all requested segments are allocated off-heap. A local ConcurrentHashMap tracks a newly created confined Arena
31-
* for every new MemorySegment allocated off-heap. This allows individual segments to be freed
31+
* where all requested segments are allocated off-heap. A local ConcurrentHashMap tracks a newly created
32+
* confined Arena for every new MemorySegment allocated off-heap. This allows individual segments to be freed
3233
* immediately upon receiving the {@link #requestClose(MemorySegment) requestClose(MemorySegment)} call.
3334
*/
3435
public final class MemorySegmentRequestExample implements MemorySegmentRequest {
@@ -50,24 +51,26 @@ public synchronized MemorySegment request(final long newByteSize) {
5051

5152
@Override
5253
public synchronized void requestClose(final MemorySegment segKey) {
54+
Objects.requireNonNull(segKey, "MemorySegment segKey must not be null");
5355
final Arena arena = map.get(segKey);
54-
if (arena == null) { throw new SketchesArgumentException("Given MemorySegment key is not mapped to an Arena!"); }
55-
if (arena.scope().isAlive()) {
56-
arena.close();
56+
if (arena != null) {
57+
if (arena.scope().isAlive()) { arena.close(); }
5758
map.remove(segKey);
59+
} else {
60+
//ignore or
61+
//throw new SketchesArgumentException("Given MemorySegment key is not mapped to an Arena!");
5862
}
5963
}
6064

6165
/**
62-
* This cleans up any unclosed, off-heap MemorySegments.
66+
* This closes any unclosed, off-heap MemorySegments and removes all mappings from the map.
6367
*/
6468
public synchronized void cleanup() {
65-
for (final Enumeration<Arena> e = map.elements(); e.hasMoreElements();) {
69+
for (final Enumeration<Arena> e = map.elements(); e.hasMoreElements(); ) {
6670
final Arena arena = e.nextElement();
67-
if (arena.scope().isAlive()) {
68-
arena.close();
69-
}
71+
if (arena.scope().isAlive()) { arena.close(); }
7072
}
73+
map.clear();
7174
}
7275

7376
}

src/main/java/org/apache/datasketches/theta/ConcurrentDirectQuickSelectSketch.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ final class ConcurrentDirectQuickSelectSketch extends DirectQuickSelectSketch
7575
final double maxConcurrencyError, final MemorySegment dstSeg) {
7676
super(lgNomLongs, seed, 1.0F, //p
7777
ResizeFactor.X1, //rf,
78-
dstSeg, false); //unionGadget
78+
dstSeg,
79+
null,
80+
false); //unionGadget
7981

8082
volatileThetaLong_ = Long.MAX_VALUE;
8183
volatileEstimate_ = 0;
@@ -91,6 +93,7 @@ final class ConcurrentDirectQuickSelectSketch extends DirectQuickSelectSketch
9193
super(sketch.getLgNomLongs(), seed, 1.0F, //p
9294
ResizeFactor.X1, //rf,
9395
dstSeg,
96+
null,
9497
false); //unionGadget
9598

9699
exactLimit_ = ConcurrentSharedThetaSketch.computeExactLimit(1L << getLgNomLongs(),
@@ -115,7 +118,7 @@ public double getEstimate() {
115118

116119
@Override
117120
public boolean isEstimationMode() {
118-
return (getRetainedEntries(false) > exactLimit_) || super.isEstimationMode();
121+
return getRetainedEntries(false) > exactLimit_ || super.isEstimationMode();
119122
}
120123

121124
@Override
@@ -164,7 +167,7 @@ public long getExactLimit() {
164167
@Override
165168
public boolean startEagerPropagation() {
166169
while (!sharedPropagationInProgress_.compareAndSet(false, true)) { /* busy wait till free */ }
167-
return (!isEstimationMode());// no eager propagation is allowed in estimation mode
170+
return !isEstimationMode();// no eager propagation is allowed in estimation mode
168171
}
169172

170173
@Override
@@ -206,8 +209,8 @@ public void initBgPropagationService() {
206209
public boolean propagate(final AtomicBoolean localPropagationInProgress,
207210
final Sketch sketchIn, final long singleHash) {
208211
final long epoch = epoch_;
209-
if ((singleHash != NOT_SINGLE_HASH) // namely, is a single hash and
210-
&& (getRetainedEntries(false) < exactLimit_)) { // a small sketch then propagate myself (blocking)
212+
if (singleHash != NOT_SINGLE_HASH // namely, is a single hash and
213+
&& getRetainedEntries(false) < exactLimit_) { // a small sketch then propagate myself (blocking)
211214
if (!startEagerPropagation()) {
212215
endPropagation(localPropagationInProgress, true);
213216
return false;

src/main/java/org/apache/datasketches/theta/DirectQuickSelectSketch.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.lang.foreign.MemorySegment;
6161

6262
import org.apache.datasketches.common.Family;
63+
import org.apache.datasketches.common.MemorySegmentRequest;
6364
import org.apache.datasketches.common.ResizeFactor;
6465
import org.apache.datasketches.common.SketchesArgumentException;
6566
import org.apache.datasketches.common.SuppressFBWarnings;
@@ -81,15 +82,19 @@
8182
class DirectQuickSelectSketch extends DirectQuickSelectSketchR {
8283
private static final double DQS_RESIZE_THRESHOLD = 15.0 / 16.0; //tuned for space
8384
int hashTableThreshold_; //computed and mutable, kept only on heap, never serialized.
85+
private final MemorySegmentRequest mSegReq;
8486

8587
/**
86-
* Construct this sketch as a result of a wrap operation where the given MemorySegment already has a sketch image.
87-
* @param wseg the given MemorySegment that has a sketch image.
88+
* Construct this sketch as a result of a wrap operation where the given MemorySegment already has an updatable sketch image.
89+
* @param wseg the given MemorySegment that has an updatable sketch image.
90+
* @param mSegReq an implementation of the MemorySegmentRequest interface or null.
8891
* @param seed <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>.
8992
*/
9093
private DirectQuickSelectSketch(
9194
final MemorySegment wseg,
95+
final MemorySegmentRequest mSegReq,
9296
final long seed) {
97+
this.mSegReq = mSegReq == null ? MemorySegmentRequest.DEFAULT : mSegReq;
9398
super(wseg, seed);
9499
}
95100

@@ -105,6 +110,7 @@ private DirectQuickSelectSketch(
105110
* <a href="{@docRoot}/resources/dictionary.html#resizeFactor">See Resize Factor</a>
106111
* @param dstSeg the given MemorySegment object destination. It cannot be null.
107112
* It will be cleared prior to use.
113+
* @param mSegReq an implementation of the MemorySegmentRequest interface or null.
108114
* @param unionGadget true if this sketch is implementing the Union gadget function.
109115
* Otherwise, it is behaving as a normal QuickSelectSketch.
110116
*/
@@ -114,6 +120,7 @@ private DirectQuickSelectSketch(
114120
final float p,
115121
final ResizeFactor rf,
116122
final MemorySegment dstSeg,
123+
final MemorySegmentRequest mSegReq,
117124
final boolean unionGadget) {
118125

119126
//Choose family, preambleLongs
@@ -129,7 +136,7 @@ private DirectQuickSelectSketch(
129136
final long curSegCapBytes = dstSeg.byteSize();
130137
if (curSegCapBytes < minReqBytes) {
131138
throw new SketchesArgumentException(
132-
"MemorySegment capacity is too small: " + curSegCapBytes + " < " + minReqBytes);
139+
"MemorySegment capacity is less than minimum required: " + curSegCapBytes + " < " + minReqBytes);
133140
}
134141

135142
//@formatter:off
@@ -153,17 +160,22 @@ private DirectQuickSelectSketch(
153160
//clear hash table area
154161
dstSeg.asSlice(preambleLongs << 3, Long.BYTES << lgArrLongs).fill((byte)0);
155162
hashTableThreshold_ = getOffHeapHashTableThreshold(lgNomLongs, lgArrLongs);
163+
this.mSegReq = mSegReq == null ? MemorySegmentRequest.DEFAULT : mSegReq;
156164
super(dstSeg, seed);
157165
}
158166

159167
/**
160-
* Wrap a sketch around the given source MemorySegment containing sketch data that originated from
161-
* this sketch.
168+
* Wrap a sketch around the given source MemorySegment containing sketch data that originated from this sketch.
162169
* @param srcSeg The given MemorySegment object must be in hash table form and not read only.
170+
* @param mSegReq an implementation of the MemorySegmentRequest interface or null.
163171
* @param seed <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>
164172
* @return instance of this sketch
165173
*/
166-
static DirectQuickSelectSketch writableWrap(final MemorySegment srcSeg, final long seed) {
174+
//called from UnionImpl and UpdateSketch
175+
static DirectQuickSelectSketch writableWrap(
176+
final MemorySegment srcSeg,
177+
final MemorySegmentRequest mSegReq,
178+
final long seed) {
167179
final int preambleLongs = extractPreLongs(srcSeg); //byte 0
168180
final int lgNomLongs = extractLgNomLongs(srcSeg); //byte 3
169181
final int lgArrLongs = extractLgArrLongs(srcSeg); //byte 4
@@ -176,7 +188,7 @@ static DirectQuickSelectSketch writableWrap(final MemorySegment srcSeg, final lo
176188
insertLgResizeFactor(srcSeg, ResizeFactor.X2.lg());
177189
}
178190

179-
final DirectQuickSelectSketch dqss = new DirectQuickSelectSketch(srcSeg, seed);
191+
final DirectQuickSelectSketch dqss = new DirectQuickSelectSketch(srcSeg, mSegReq, seed);
180192
dqss.hashTableThreshold_ = getOffHeapHashTableThreshold(lgNomLongs, lgArrLongs);
181193
return dqss;
182194
}
@@ -185,14 +197,19 @@ static DirectQuickSelectSketch writableWrap(final MemorySegment srcSeg, final lo
185197
* Fast-wrap a sketch around the given source MemorySegment containing sketch data that originated from
186198
* this sketch. This does NO validity checking of the given MemorySegment.
187199
* @param srcSeg The given MemorySegment must be in hash table form and not read only.
200+
* @param mSegReq an implementation of the MemorySegmentRequest interface or null.
188201
* @param seed <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>
189202
* @return instance of this sketch
190203
*/
191-
static DirectQuickSelectSketch fastWritableWrap(final MemorySegment srcSeg, final long seed) {
204+
//called from UnionImpl <- Union
205+
static DirectQuickSelectSketch fastWritableWrap(
206+
final MemorySegment srcSeg,
207+
final MemorySegmentRequest mSegReq,
208+
final long seed) {
192209
final int lgNomLongs = extractLgNomLongs(srcSeg); //byte 3
193210
final int lgArrLongs = extractLgArrLongs(srcSeg); //byte 4
194211

195-
final DirectQuickSelectSketch dqss = new DirectQuickSelectSketch(srcSeg, seed);
212+
final DirectQuickSelectSketch dqss = new DirectQuickSelectSketch(srcSeg, mSegReq, seed);
196213
dqss.hashTableThreshold_ = getOffHeapHashTableThreshold(lgNomLongs, lgArrLongs);
197214
return dqss;
198215
}
@@ -205,7 +222,7 @@ static DirectQuickSelectSketch fastWritableWrap(final MemorySegment srcSeg, fina
205222
public UpdateSketch rebuild() {
206223
final int lgNomLongs = getLgNomLongs();
207224
final int preambleLongs = wseg_.get(JAVA_BYTE, PREAMBLE_LONGS_BYTE) & 0X3F;
208-
if (getRetainedEntries(true) > (1 << lgNomLongs)) {
225+
if (getRetainedEntries(true) > 1 << lgNomLongs) {
209226
quickSelectAndRebuild(wseg_, preambleLongs, lgNomLongs);
210227
}
211228
return this;
@@ -279,10 +296,13 @@ UpdateReturnState hashUpdate(final long hash) {
279296
tgtLgArrLongs = Math.min(lgArrLongs + lgRF, lgNomLongs + 1);
280297
final int tgtArrBytes = 8 << tgtLgArrLongs;
281298
final int reqBytes = tgtArrBytes + preBytes;
282-
final MemorySegment newDstSeg = MemorySegment.ofArray(new byte[reqBytes]); //always on-heap //TODO ADD MemSegReq
299+
300+
final MemorySegment newDstSeg = mSegReq.request(reqBytes);
283301

284302
moveAndResize(wseg_, preambleLongs, lgArrLongs, newDstSeg, tgtLgArrLongs, thetaLong);
303+
final MemorySegment oldSeg = wseg_;
285304
wseg_ = newDstSeg;
305+
mSegReq.requestClose(oldSeg);
286306

287307
hashTableThreshold_ = getOffHeapHashTableThreshold(lgNomLongs, tgtLgArrLongs);
288308
return InsertedCountIncrementedResized;

src/main/java/org/apache/datasketches/theta/PreambleUtil.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ static String preambleToString(final MemorySegment seg) {
272272

273273
//Flags
274274
final int flags = extractFlags(seg);
275-
final String flagsStr = (flags) + ", 0x" + (Integer.toHexString(flags)) + ", "
275+
final String flagsStr = flags + ", 0x" + Integer.toHexString(flags) + ", "
276276
+ zeroPad(Integer.toBinaryString(flags), 8);
277277
final boolean readOnly = (flags & READ_ONLY_FLAG_MASK) > 0;
278278
final boolean empty = (flags & EMPTY_FLAG_MASK) > 0;
@@ -377,11 +377,11 @@ else if (preLongs == 3) {
377377
//@formatter:on
378378

379379
static int extractPreLongs(final MemorySegment seg) {
380-
return seg.get(JAVA_BYTE, PREAMBLE_LONGS_BYTE) & 0X3F;
380+
return seg.get(JAVA_BYTE, PREAMBLE_LONGS_BYTE) & 0X3F; //for SerVer 1,2,3
381381
}
382382

383383
static int extractLgResizeFactor(final MemorySegment seg) {
384-
return (seg.get(JAVA_BYTE, PREAMBLE_LONGS_BYTE) >>> LG_RESIZE_FACTOR_BIT) & 0X3;
384+
return seg.get(JAVA_BYTE, PREAMBLE_LONGS_BYTE) >>> LG_RESIZE_FACTOR_BIT & 0X3;
385385
}
386386

387387
static int extractLgResizeRatioV1(final MemorySegment seg) {
@@ -463,7 +463,7 @@ static void insertLgResizeFactor(final MemorySegment seg, final int rf) {
463463
final int curByte = seg.get(JAVA_BYTE, PREAMBLE_LONGS_BYTE) & 0xFF;
464464
final int shift = LG_RESIZE_FACTOR_BIT; // shift in bits
465465
final int mask = 3;
466-
final byte newByte = (byte) (((rf & mask) << shift) | (~(mask << shift) & curByte));
466+
final byte newByte = (byte) ((rf & mask) << shift | ~(mask << shift) & curByte);
467467
seg.set(JAVA_BYTE, PREAMBLE_LONGS_BYTE, newByte);
468468
}
469469

@@ -520,7 +520,7 @@ static void clearEmpty(final MemorySegment seg) {
520520
}
521521

522522
static boolean isEmptyFlag(final MemorySegment seg) {
523-
return ((extractFlags(seg) & EMPTY_FLAG_MASK) > 0);
523+
return (extractFlags(seg) & EMPTY_FLAG_MASK) > 0;
524524
}
525525

526526
/**

src/main/java/org/apache/datasketches/theta/Sketch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ public static int getMaxUpdateSketchBytes(final int nomEntries) {
340340
* @return the maximum number of storage bytes required for a UpdateSketch with the given lgNomEntries
341341
*/
342342
public static int getUpdateSketchMaxBytes(final int lgNomEntries) {
343-
return (1 << lgNomEntries << 4) + (Family.QUICKSELECT.getMaxPreLongs() << 3);
343+
return (16 << lgNomEntries) + (Family.QUICKSELECT.getMaxPreLongs() << 3);
344344
}
345345

346346
/**

src/main/java/org/apache/datasketches/theta/Sketches.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -355,12 +355,12 @@ public static UpdateSketch wrapUpdateSketch(final MemorySegment srcSeg) {
355355
* @return {@link UpdateSketch UpdateSketch}
356356
*/
357357
public static UpdateSketch wrapUpdateSketch(final MemorySegment srcSeg, final long expectedSeed) {
358-
return UpdateSketch.wrap(srcSeg, expectedSeed);
358+
return UpdateSketch.wrap(srcSeg, null, expectedSeed);
359359
}
360360

361361
//Restricted static methods
362362

363-
static void checkIfValidThetaSketch(final MemorySegment srcSeg) {
363+
private static void checkIfValidThetaSketch(final MemorySegment srcSeg) {
364364
final int fam = srcSeg.get(JAVA_BYTE, FAMILY_BYTE);
365365
if (!Sketch.isValidSketchID(fam)) {
366366
throw new SketchesArgumentException("Source MemorySegment not a valid Sketch. Family: "
@@ -371,7 +371,7 @@ static void checkIfValidThetaSketch(final MemorySegment srcSeg) {
371371
static boolean getEmpty(final MemorySegment srcSeg) {
372372
final int serVer = srcSeg.get(JAVA_BYTE, SER_VER_BYTE);
373373
if (serVer == 1) {
374-
return ((getThetaLong(srcSeg) == Long.MAX_VALUE) && (getRetainedEntries(srcSeg) == 0));
374+
return getThetaLong(srcSeg) == Long.MAX_VALUE && getRetainedEntries(srcSeg) == 0;
375375
}
376376
return (srcSeg.get(JAVA_BYTE, FLAGS_BYTE) & EMPTY_FLAG_MASK) != 0; //for SerVer 2 & 3
377377
}
@@ -384,7 +384,7 @@ static int getRetainedEntries(final MemorySegment srcSeg) {
384384
final int serVer = srcSeg.get(JAVA_BYTE, SER_VER_BYTE);
385385
if (serVer == 1) {
386386
final int entries = srcSeg.get(JAVA_INT_UNALIGNED, RETAINED_ENTRIES_INT);
387-
if ((getThetaLong(srcSeg) == Long.MAX_VALUE) && (entries == 0)) {
387+
if (getThetaLong(srcSeg) == Long.MAX_VALUE && entries == 0) {
388388
return 0;
389389
}
390390
return entries;
@@ -401,6 +401,6 @@ static int getRetainedEntries(final MemorySegment srcSeg) {
401401

402402
static long getThetaLong(final MemorySegment srcSeg) {
403403
final int preLongs = getPreambleLongs(srcSeg);
404-
return (preLongs < 3) ? Long.MAX_VALUE : srcSeg.get(JAVA_LONG_UNALIGNED, THETA_LONG); //for SerVer 1,2,3
404+
return preLongs < 3 ? Long.MAX_VALUE : srcSeg.get(JAVA_LONG_UNALIGNED, THETA_LONG); //for SerVer 1,2,3
405405
}
406406
}

src/main/java/org/apache/datasketches/theta/UnionImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ static UnionImpl initNewDirectInstance(
105105
final ResizeFactor rf,
106106
final MemorySegment dstSeg) {
107107
final UpdateSketch gadget = //create with UNION family
108-
new DirectQuickSelectSketch(lgNomLongs, seed, p, rf, dstSeg, true);
108+
new DirectQuickSelectSketch(lgNomLongs, seed, p, rf, dstSeg, null, true);
109109
final UnionImpl unionImpl = new UnionImpl(gadget, seed);
110110
unionImpl.unionThetaLong_ = gadget.getThetaLong();
111111
unionImpl.unionEmpty_ = gadget.isEmpty();
@@ -142,7 +142,7 @@ static UnionImpl fastWrapInstance(final MemorySegment srcSeg, final long expecte
142142
Family.UNION.checkFamilyID(extractFamilyID(srcSeg));
143143
final UpdateSketch gadget = srcSeg.isReadOnly()
144144
? DirectQuickSelectSketchR.fastReadOnlyWrap(srcSeg, expectedSeed)
145-
: DirectQuickSelectSketch.fastWritableWrap(srcSeg, expectedSeed);
145+
: DirectQuickSelectSketch.fastWritableWrap(srcSeg, null, expectedSeed);
146146
final UnionImpl unionImpl = new UnionImpl(gadget, expectedSeed);
147147
unionImpl.unionThetaLong_ = extractUnionThetaLong(srcSeg);
148148
unionImpl.unionEmpty_ = PreambleUtil.isEmptyFlag(srcSeg);
@@ -151,17 +151,17 @@ static UnionImpl fastWrapInstance(final MemorySegment srcSeg, final long expecte
151151

152152
/**
153153
* Wrap a Union object around a Union MemorySegment object containing data.
154-
* Called by SetOperation.
155154
* @param srcSeg The source MemorySegment object.
156155
* @param expectedSeed the seed used to validate the given MemorySegment image.
157156
* <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a>
158157
* @return this class
159158
*/
159+
//Called by SetOperation and Union
160160
static UnionImpl wrapInstance(final MemorySegment srcSeg, final long expectedSeed) {
161161
Family.UNION.checkFamilyID(extractFamilyID(srcSeg));
162162
final UpdateSketch gadget = srcSeg.isReadOnly()
163163
? DirectQuickSelectSketchR.readOnlyWrap(srcSeg, expectedSeed)
164-
: DirectQuickSelectSketch.writableWrap(srcSeg, expectedSeed);
164+
: DirectQuickSelectSketch.writableWrap(srcSeg, null, expectedSeed);
165165
final UnionImpl unionImpl = new UnionImpl(gadget, expectedSeed);
166166
unionImpl.unionThetaLong_ = extractUnionThetaLong(srcSeg);
167167
unionImpl.unionEmpty_ = PreambleUtil.isEmptyFlag(srcSeg);
@@ -269,7 +269,7 @@ public CompactSketch union(final Sketch sketchA, final Sketch sketchB, final boo
269269
public void union(final Sketch sketchIn) {
270270
//UNION Empty Rule: AND the empty states.
271271

272-
if ((sketchIn == null) || sketchIn.isEmpty()) {
272+
if (sketchIn == null || sketchIn.isEmpty()) {
273273
//null and empty is interpreted as (Theta = 1.0, count = 0, empty = T). Nothing changes
274274
return;
275275
}
@@ -287,7 +287,7 @@ public void union(final Sketch sketchIn) {
287287
final HashIterator it = sketchIn.iterator();
288288
while (it.next()) {
289289
final long hash = it.get();
290-
if ((hash < unionThetaLong_) && (hash < gadget_.getThetaLong())) {
290+
if (hash < unionThetaLong_ && hash < gadget_.getThetaLong()) {
291291
gadget_.hashUpdate(hash); // backdoor update, hash function is bypassed
292292
} else if (isOrdered) { break; }
293293
}

0 commit comments

Comments
 (0)