@@ -393,7 +393,7 @@ public abstract class AbstractSpliterator<R> implements Spliterator<R> {
393393 private long currentIndex ;
394394 private final long limit ;
395395 private final long recordSize ;
396- private final int cacheSize = 100_000 ; // Max records to cache
396+ private int cacheSize ; // Max records to cache
397397 private ByteBuffer cacheBuffer ; // Cache for raw bytes
398398 private int cachePosition = 0 ; // Current position in cache (record index)
399399 private int cacheLimit = 0 ; // Number of records in cache
@@ -402,7 +402,25 @@ public AbstractSpliterator(long start, long limit, long recordSize) {
402402 this .currentIndex = start ;
403403 this .limit = limit ;
404404 this .recordSize = recordSize ;
405- this .cacheBuffer = ByteBuffer .allocate ((int ) Math .min (cacheSize * recordSize , Integer .MAX_VALUE ));
405+
406+ // Default cacheSize and maximum buffer size (e.g., 100 MB)
407+ long defaultCacheSize = 100_000 ; // Default: 100,000 records
408+ long maxBufferSize = 100 * 1024 * 1024 ; // 100 MB
409+
410+ // Adjust cacheSize to keep buffer size within maxBufferSize
411+ this .cacheSize = (int ) Math .min (defaultCacheSize , maxBufferSize / recordSize );
412+ if (this .cacheSize <= 0 ) {
413+ throw new IllegalArgumentException ("Record size too large: " + recordSize );
414+ }
415+
416+ // Calculate buffer size
417+ long intendedSize = (long ) this .cacheSize * recordSize ;
418+ int bSize = (int ) Math .min (intendedSize , Integer .MAX_VALUE - 8 ); // Slightly below max
419+ if (bSize <= 0 || intendedSize < 0 ) {
420+ throw new IllegalArgumentException ("Invalid buffer size: " + intendedSize );
421+ }
422+
423+ this .cacheBuffer = ByteBuffer .allocate (bSize );
406424 }
407425
408426 @ Override
@@ -487,17 +505,26 @@ private boolean fillCache() {
487505 @ Override
488506 public Spliterator <R > trySplit () {
489507 long remaining = limit - currentIndex ;
490- if (remaining <= 1 ) {
491- return null ;
508+ if (remaining <= cacheSize * 2 ) {
509+ return null ; // Too few records to split efficiently
492510 }
493511
494- // Split remaining in half
495- long splitIndex = currentIndex + remaining / 2 ;
512+ // Align split with cacheSize boundaries
513+ long recordsPerCache = cacheSize ;
514+ long splitIndex = currentIndex + ((remaining / 2 + recordsPerCache - 1 ) / recordsPerCache ) * recordsPerCache ;
515+
516+ // Ensure splitIndex creates a meaningful split
517+ if (splitIndex >= limit ) {
518+ return null ; // Cannot split meaningfully
519+ }
520+
521+ // Create new Spliterator for [currentIndex, splitIndex)
496522 Spliterator <R > newSpliterator = createNewSpliterator (currentIndex , splitIndex , recordSize );
523+
524+ // Update current Spliterator for [splitIndex, limit)
497525 currentIndex = splitIndex ;
526+ cachePosition = cacheLimit = 0 ; // Reset cache for new range
498527
499- // Abandon cache after split
500- cachePosition = cacheLimit = 0 ;
501528 return newSpliterator ;
502529 }
503530
0 commit comments