@@ -385,35 +385,101 @@ public Stream<T> parallelStreamFlattened() {
385385 // --- Spliterators ---
386386
387387 /**
388- * Abstract base class for dataset spliterators.
388+ * Abstract base class for dataset spliterators with caching .
389389 *
390390 * @param <R> the type of elements produced by the spliterator
391391 */
392- private abstract class AbstractSpliterator <R > implements Spliterator <R > {
392+ public abstract class AbstractSpliterator <R > implements Spliterator <R > {
393393 private long currentIndex ;
394394 private final long limit ;
395395 private final long recordSize ;
396+ private final int cacheSize = 10000 ; // Max records to cache
397+ private ByteBuffer cacheBuffer ; // Cache for raw bytes
398+ private int cachePosition = 0 ; // Current position in cache (record index)
399+ private int cacheLimit = 0 ; // Number of records in cache
396400
397401 public AbstractSpliterator (long start , long limit , long recordSize ) {
398402 this .currentIndex = start ;
399403 this .limit = limit ;
400404 this .recordSize = recordSize ;
405+ this .cacheBuffer = ByteBuffer .allocate ((int ) Math .min (cacheSize * recordSize , Integer .MAX_VALUE ));
401406 }
402407
403408 @ Override
404409 public boolean tryAdvance (Consumer <? super R > action ) {
405410 if (currentIndex >= limit ) {
406411 return false ;
407412 }
408- long offset = currentIndex * recordSize ;
409- ByteBuffer buffer = null ;
413+
414+ // Refresh cache if empty
415+ if (cachePosition >= cacheLimit ) {
416+ if (!fillCache ()) {
417+ return false ;
418+ }
419+ }
420+
410421 try {
411- buffer = readBytes (offset , recordSize );
412- R record = populateRecord (buffer );
422+ // Process one record from cache
423+ cacheBuffer .position (cachePosition * (int ) recordSize );
424+ R record = populateRecord (cacheBuffer );
413425 action .accept (record );
426+ cachePosition ++;
414427 currentIndex ++;
415428 return true ;
416- } catch (InvocationTargetException | InstantiationException | IllegalAccessException | IOException e ) {
429+ } catch (IOException | InvocationTargetException | InstantiationException | IllegalAccessException e ) {
430+ throw new IllegalStateException (e );
431+ }
432+ }
433+
434+ @ Override
435+ public void forEachRemaining (Consumer <? super R > action ) {
436+ while (currentIndex < limit ) {
437+ // Refresh cache if empty
438+ if (cachePosition >= cacheLimit ) {
439+ if (!fillCache ()) {
440+ break ;
441+ }
442+ }
443+
444+ // Process all records in cache
445+ while (cachePosition < cacheLimit && currentIndex < limit ) {
446+ try {
447+ cacheBuffer .position (cachePosition * (int ) recordSize );
448+ R record = populateRecord (cacheBuffer );
449+ action .accept (record );
450+ cachePosition ++;
451+ currentIndex ++;
452+ } catch (IOException | InvocationTargetException | InstantiationException | IllegalAccessException e ) {
453+ throw new IllegalStateException (e );
454+ }
455+ }
456+ }
457+ }
458+
459+ private boolean fillCache () {
460+ if (currentIndex >= limit ) {
461+ return false ;
462+ }
463+
464+ try {
465+ // Determine how many records to read: min(remaining, cacheSize)
466+ int toRead = (int ) Math .min (limit - currentIndex , cacheSize );
467+ long offset = currentIndex * recordSize ;
468+ cacheBuffer .clear ();
469+ cacheBuffer .limit (toRead * (int ) recordSize );
470+
471+ // Read bytes into cache
472+ ByteBuffer readBuffer = readBytes (offset , toRead * recordSize );
473+ if (readBuffer .remaining () != toRead * recordSize ) {
474+ throw new IllegalStateException ("Incomplete read: expected " + (toRead * recordSize ) + " bytes, got " + readBuffer .remaining ());
475+ }
476+
477+ cacheBuffer .put (readBuffer ); // Copy read bytes to cache
478+ cacheBuffer .flip (); // Prepare for reading
479+ cachePosition = 0 ;
480+ cacheLimit = toRead ;
481+ return true ;
482+ } catch (IOException | InvocationTargetException | InstantiationException | IllegalAccessException e ) {
417483 throw new IllegalStateException (e );
418484 }
419485 }
@@ -424,9 +490,14 @@ public Spliterator<R> trySplit() {
424490 if (remaining <= 1 ) {
425491 return null ;
426492 }
493+
494+ // Split remaining in half
427495 long splitIndex = currentIndex + remaining / 2 ;
428496 Spliterator <R > newSpliterator = createNewSpliterator (currentIndex , splitIndex , recordSize );
429497 currentIndex = splitIndex ;
498+
499+ // Abandon cache after split
500+ cachePosition = cacheLimit = 0 ;
430501 return newSpliterator ;
431502 }
432503
@@ -458,7 +529,6 @@ public int characteristics() {
458529 */
459530 protected abstract Spliterator <R > createNewSpliterator (long start , long end , long recordSize );
460531 }
461-
462532 /**
463533 * Spliterator for streaming vector (1D) elements.
464534 */
0 commit comments