Skip to content

Commit 1401350

Browse files
committed
Updated MemorySegmentRequestExample to use ConcurrentHashMap.
1 parent 8ab3c35 commit 1401350

7 files changed

Lines changed: 81 additions & 80 deletions

File tree

src/main/java/org/apache/datasketches/common/MemorySegmentRequest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,13 @@
2929
public interface MemorySegmentRequest {
3030

3131
/**
32-
* Request a new MemorySegment with the given <i>newByteSize</i>.
32+
* Request a new heap MemorySegment with the given <i>newByteSize</i>.
3333
* Because we do not have a reference to an Arena, the default here is to
3434
* allocate a new MemorySegment on the heap. It is up to the user to override this as appropriate.
35-
* @param prevSeg the previous MemorySegment to be possibly closed here or by using the separate
36-
* {@link #requestClose requestClose} method. This is included for convenience, it may be null.
3735
* @param newByteSize The new <i>byteSize</i> being requested.
3836
* @return new MemorySegment with the requested <i>byteSize</i>.
3937
*/
40-
default MemorySegment request(final MemorySegment prevSeg, final long newByteSize) {
38+
default MemorySegment request(final long newByteSize) {
4139
if (newByteSize > Integer.MAX_VALUE) {
4240
throw new SketchesArgumentException("Requested size in bytes exceeds Integer.MAX_VALUE.");
4341
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.datasketches.common;
21+
22+
import java.lang.foreign.Arena;
23+
24+
import java.lang.foreign.MemorySegment;
25+
import java.util.Enumeration;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
28+
/**
29+
* This is an example of a possible implementation of the MemorySegmentRequest interface
30+
* where all requested segments are allocated off-heap. A local ConcurrentHashMap tracks a newly created confined Arena
31+
* for every new MemorySegment allocated off-heap. This allows individual segments to be freed
32+
* immediately upon receiving the {@link #requestClose(MemorySegment) requestClose(MemorySegment)} call.
33+
*/
34+
public final class MemorySegmentRequestExample implements MemorySegmentRequest {
35+
private final ConcurrentHashMap<MemorySegment, Arena> map = new ConcurrentHashMap<>();
36+
37+
/**
38+
* Request a new off-heap MemorySegment with the given <i>newByteSeze</i>.
39+
* An internal confined Arena is created to exclusively manage the new segment and it is associated
40+
* with the new segment with a ConcurrentHashMap.
41+
*/
42+
@Override
43+
public synchronized MemorySegment request(final long newByteSize) {
44+
final Arena arena = Arena.ofConfined();
45+
final MemorySegment seg = arena.allocate(newByteSize);
46+
map.put(seg, arena);
47+
return seg;
48+
49+
}
50+
51+
@Override
52+
public synchronized void requestClose(final MemorySegment segKey) {
53+
final Arena arena = map.get(segKey);
54+
if (arena == null) { throw new SketchesArgumentException("Given MemorySegment key is not mapped to an Arena!"); }
55+
if (arena.scope().isAlive()) {
56+
arena.close();
57+
map.remove(segKey);
58+
}
59+
}
60+
61+
/**
62+
* This cleans up any unclosed, off-heap MemorySegments.
63+
*/
64+
public synchronized void cleanup() {
65+
for (final Enumeration<Arena> e = map.elements(); e.hasMoreElements();) {
66+
final Arena arena = e.nextElement();
67+
if (arena.scope().isAlive()) {
68+
arena.close();
69+
}
70+
}
71+
}
72+
73+
}

src/main/java/org/apache/datasketches/common/MemorySegmentRequestExtension.java

Lines changed: 0 additions & 70 deletions
This file was deleted.

src/main/java/org/apache/datasketches/kll/KllHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ static MemorySegment memorySegmentSpaceMgmt(
357357
if (mSegReq == null) {
358358
mSegReq = MemorySegmentRequest.DEFAULT;
359359
}
360-
final MemorySegment newSeg = mSegReq.request(oldWseg, requiredSketchBytes);
360+
final MemorySegment newSeg = mSegReq.request(requiredSketchBytes);
361361
MemorySegment.copy(oldWseg, 0, newSeg, 0, DATA_START_ADR); //copy preamble (first 20 bytes)
362362
mSegReq.requestClose(oldWseg);
363363
return newSeg;

src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ private MemorySegment growCombinedSegBuffer(final int itemSpaceNeeded) {
358358

359359
mSegReq_ = (mSegReq_ == null) ? MemorySegmentRequest.DEFAULT : mSegReq_;
360360

361-
final MemorySegment newSeg = mSegReq_.request(seg_, needBytes);
361+
final MemorySegment newSeg = mSegReq_.request(needBytes);
362362
MemorySegment.copy(seg_, 0, newSeg, 0, segBytes);
363363
mSegReq_.requestClose(seg_);
364364
return newSeg;

src/test/java/org/apache/datasketches/kll/KllMemorySegmentRequestApp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.lang.foreign.Arena;
2828
import java.lang.foreign.MemorySegment;
2929

30-
import org.apache.datasketches.common.MemorySegmentRequestExtension;
30+
import org.apache.datasketches.common.MemorySegmentRequestExample;
3131
import org.testng.annotations.Test;
3232

3333

@@ -49,7 +49,7 @@ public void checkMemorySegmentRequestExtension() {
4949
final MemorySegment seg = arena.allocate(numBytes);
5050

5151
//Use the custom extension of the MemorySegmentRequest interface.
52-
final MemorySegmentRequestExtension mSegReqExt = new MemorySegmentRequestExtension();
52+
final MemorySegmentRequestExample mSegReqExt = new MemorySegmentRequestExample();
5353

5454
//Create a new KllLongsSketch and pass the custom extension
5555
final KllLongsSketch sk = KllLongsSketch.newDirectInstance(k, seg, mSegReqExt);

src/test/java/org/apache/datasketches/quantiles/ClassicQuantilesMemorySegmentRequestApp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.lang.foreign.Arena;
2626
import java.lang.foreign.MemorySegment;
2727

28-
import org.apache.datasketches.common.MemorySegmentRequestExtension;
28+
import org.apache.datasketches.common.MemorySegmentRequestExample;
2929
import org.testng.annotations.Test;
3030

3131
public class ClassicQuantilesMemorySegmentRequestApp {
@@ -47,7 +47,7 @@ public void checkMemorySegmentRequestExtension() {
4747
final MemorySegment seg = arena.allocate(initalBytes);
4848

4949
//Use the custom extension of the MemorySegmentRequest interface.
50-
final MemorySegmentRequestExtension mSegReqExt = new MemorySegmentRequestExtension();
50+
final MemorySegmentRequestExample mSegReqExt = new MemorySegmentRequestExample();
5151

5252
//Create a new KllLongsSketch and pass the custom extension
5353
final DoublesSketchBuilder bldr = DoublesSketch.builder().setK(k);

0 commit comments

Comments
 (0)