From 1558b305c821101b3103fe295475044342eca2eb Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sun, 31 May 2026 21:10:11 +0530 Subject: [PATCH 1/5] Remove the Producer module and all its uses --- benchmark/Streamly/Benchmark/Data/Parser.hs | 4 +- .../Benchmark/Data/Parser/Producer.hs | 67 ---- benchmark/streamly-benchmarks.cabal | 1 - .../Internal/Data/Array/Generic/Type.hs | 7 +- core/src/Streamly/Internal/Data/Array/Type.hs | 11 +- .../Internal/Data/MutArray/Generic.hs | 25 +- .../Streamly/Internal/Data/MutArray/Type.hs | 23 +- core/src/Streamly/Internal/Data/Producer.hs | 81 ----- .../Streamly/Internal/Data/Producer/Source.hs | 314 ------------------ .../Streamly/Internal/Data/Producer/Type.hs | 190 ----------- core/src/Streamly/Internal/Data/StreamK.hs | 65 ---- core/streamly-core.cabal | 4 - test/Streamly/Test/Data/Parser.hs | 39 --- 13 files changed, 17 insertions(+), 814 deletions(-) delete mode 100644 benchmark/Streamly/Benchmark/Data/Parser/Producer.hs delete mode 100644 core/src/Streamly/Internal/Data/Producer.hs delete mode 100644 core/src/Streamly/Internal/Data/Producer/Source.hs delete mode 100644 core/src/Streamly/Internal/Data/Producer/Type.hs diff --git a/benchmark/Streamly/Benchmark/Data/Parser.hs b/benchmark/Streamly/Benchmark/Data/Parser.hs index 1a677c5d82..93ffc29cde 100644 --- a/benchmark/Streamly/Benchmark/Data/Parser.hs +++ b/benchmark/Streamly/Benchmark/Data/Parser.hs @@ -34,7 +34,6 @@ import Streamly.Benchmark.Data.Parser.Alternative as Alternative import Streamly.Benchmark.Data.Parser.Applicative as Applicative import Streamly.Benchmark.Data.Parser.Monad as Monad import Streamly.Benchmark.Data.Parser.Sequence as Sequence -import Streamly.Benchmark.Data.Parser.Producer as Producer import Streamly.Benchmark.Data.Parser.Interleave as Interleave import Streamly.Benchmark.Data.Parser.Groups as Groups @@ -46,13 +45,12 @@ benchmarkList :: -> BenchEnv -> [Array.Array Int] -> [(SpaceComplexity, Benchmark)] -benchmarkList value env arrays = +benchmarkList value env _arrays = Alternative.benchmarks value ++ Applicative.benchmarks value ++ Monad.benchmarks value ++ Sequence.benchmarks value ++ Sequence.benchmarksFileIO env - ++ Producer.benchmarks value arrays ++ Groups.benchmarks value ++ Interleave.benchmarks value diff --git a/benchmark/Streamly/Benchmark/Data/Parser/Producer.hs b/benchmark/Streamly/Benchmark/Data/Parser/Producer.hs deleted file mode 100644 index 739c275003..0000000000 --- a/benchmark/Streamly/Benchmark/Data/Parser/Producer.hs +++ /dev/null @@ -1,67 +0,0 @@ -#undef FUSION_CHECK -#ifdef FUSION_CHECK -{-# OPTIONS_GHC -ddump-simpl -ddump-to-file -dsuppress-all #-} -#endif - --- | --- Module : Streamly.Benchmark.Data.Parser.Producer --- Copyright : (c) 2020 Composewell Technologies --- --- License : BSD-3-Clause --- Maintainer : streamly@composewell.com - -{-# LANGUAGE CPP #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# OPTIONS_GHC -Wno-orphans #-} - -module Streamly.Benchmark.Data.Parser.Producer - ( - benchmarks - ) where - -import Control.DeepSeq (NFData(..)) -import Streamly.Internal.Data.Parser (ParseError(..)) - -import qualified Streamly.Internal.Data.Array as Array -import qualified Streamly.Internal.Data.Fold as Fold -import qualified Streamly.Internal.Data.Parser as PR -import qualified Streamly.Data.Stream as Stream -import qualified Streamly.Internal.Data.Producer as Producer - -import Test.Tasty.Bench hiding (env) -import Streamly.Benchmark.Common - -------------------------------------------------------------------------------- --- Parsing with unfolds -------------------------------------------------------------------------------- - -{-# INLINE parseManyUnfoldArrays #-} -parseManyUnfoldArrays :: Int -> [Array.Array Int] -> IO () -parseManyUnfoldArrays count arrays = do - let src = Producer.source (Just (Producer.OuterLoop arrays)) - let parser = PR.fromFold (Fold.take count Fold.drain) - let readSrc = - Producer.producer - $ Producer.concat Producer.fromList Array.producer - let streamParser = - Producer.simplify (Producer.parseMany parser readSrc) - Stream.fold Fold.drain $ Stream.unfold streamParser src - -------------------------------------------------------------------------------- --- Benchmarks -------------------------------------------------------------------------------- - -instance NFData ParseError where - {-# INLINE rnf #-} - rnf (ParseError x) = rnf x - -benchmarks :: Int -> [Array.Array Int] -> [(SpaceComplexity, Benchmark)] -benchmarks value arrays = - [ - -- parseMany Unfolds - (SpaceO_1, bench "parseMany/Unfold/1000 arrays/take all" - $ nfIO $ parseManyUnfoldArrays value arrays) - , (SpaceO_1, bench "parseMany/Unfold/1000 arrays/take 1" - $ nfIO $ parseManyUnfoldArrays 1 arrays) - ] diff --git a/benchmark/streamly-benchmarks.cabal b/benchmark/streamly-benchmarks.cabal index 04bfb191b3..794591a942 100644 --- a/benchmark/streamly-benchmarks.cabal +++ b/benchmark/streamly-benchmarks.cabal @@ -330,7 +330,6 @@ benchmark Data.Parser , Streamly.Benchmark.Data.Parser.Monad , Streamly.Benchmark.Data.Parser.Interleave , Streamly.Benchmark.Data.Parser.Sequence - , Streamly.Benchmark.Data.Parser.Producer , Streamly.Benchmark.Data.Parser.Groups if impl(ghcjs) buildable: False diff --git a/core/src/Streamly/Internal/Data/Array/Generic/Type.hs b/core/src/Streamly/Internal/Data/Array/Generic/Type.hs index ae6d80d927..dd2f776f15 100644 --- a/core/src/Streamly/Internal/Data/Array/Generic/Type.hs +++ b/core/src/Streamly/Internal/Data/Array/Generic/Type.hs @@ -87,10 +87,10 @@ import qualified Streamly.Internal.Data.Fold.Type as FL import qualified Streamly.Internal.Data.MutArray.Generic as MArray import qualified Streamly.Internal.Data.Parser.Type as ParserD import qualified Streamly.Internal.Data.ParserK.Type as ParserK -import qualified Streamly.Internal.Data.Producer as Producer import qualified Streamly.Internal.Data.RingArray.Generic as RB import qualified Streamly.Internal.Data.Stream.Type as D import qualified Streamly.Internal.Data.Stream.Generate as D +import qualified Streamly.Internal.Data.Unfold.Type as Unfold import qualified Text.ParserCombinators.ReadPrec as ReadPrec import Prelude hiding (Foldable(..), read) @@ -206,9 +206,8 @@ length arr = arrEnd arr - arrStart arr {-# INLINE_NORMAL reader #-} reader :: Monad m => Unfold m (Array a) a reader = - Producer.simplify - $ Producer.translate unsafeThaw unsafeFreeze - $ MArray.producerWith (return . unsafeInlineIO) + Unfold.lmap unsafeThaw + $ MArray.readerWith (return . unsafeInlineIO) ------------------------------------------------------------------------------- -- Elimination - to streams diff --git a/core/src/Streamly/Internal/Data/Array/Type.hs b/core/src/Streamly/Internal/Data/Array/Type.hs index df6dbc89c2..2f3b2a891c 100644 --- a/core/src/Streamly/Internal/Data/Array/Type.hs +++ b/core/src/Streamly/Internal/Data/Array/Type.hs @@ -119,7 +119,6 @@ module Streamly.Internal.Data.Array.Type , toList -- *** Unfolds - , producer -- experimental , unsafeReader , reader , readerRev @@ -233,7 +232,6 @@ import GHC.ForeignPtr (ForeignPtr(..), ForeignPtrContents(..)) import GHC.IO (unsafePerformIO) import GHC.Ptr (Ptr(..), nullPtr) -import Streamly.Internal.Data.Producer.Type (Producer(..)) import Streamly.Internal.Data.MutArray.Type (MutArray) import Streamly.Internal.Data.MutByteArray.Type (MutByteArray) import Streamly.Internal.Data.Fold.Type (Fold(..)) @@ -254,7 +252,6 @@ import qualified Streamly.Internal.Data.MutArray.Type as MA import qualified Streamly.Internal.Data.Stream.Type as D import qualified Streamly.Internal.Data.StreamK.Type as K import qualified Streamly.Internal.Data.MutByteArray.Type as Unboxed -import qualified Streamly.Internal.Data.Producer as Producer import qualified Streamly.Internal.Data.Scanl.Type as Scanl import qualified Streamly.Internal.Data.Unfold.Type as Unfold import qualified Text.ParserCombinators.ReadPrec as ReadPrec @@ -987,17 +984,11 @@ byteLength = MA.byteLength . unsafeThaw length :: Unbox a => Array a -> Int length arr = MA.length (unsafeThaw arr) -{-# INLINE_NORMAL producer #-} -producer :: forall m a. (Monad m, Unbox a) => Producer m (Array a) a -producer = - Producer.translate unsafeThaw unsafeFreeze - $ MA.producerWith (return . unsafeInlineIO) - -- | Unfold an array into a stream. -- {-# INLINE_NORMAL reader #-} reader :: forall m a. (Monad m, Unbox a) => Unfold m (Array a) a -reader = Producer.simplify producer +reader = Unfold.lmap unsafeThaw $ MA.readerWith (return . unsafeInlineIO) -- | Unfold an array into a stream, does not check the end of the array, the -- user is responsible for terminating the stream within the array bounds. For diff --git a/core/src/Streamly/Internal/Data/MutArray/Generic.hs b/core/src/Streamly/Internal/Data/MutArray/Generic.hs index 5d9f2c2e2e..46d10be398 100644 --- a/core/src/Streamly/Internal/Data/MutArray/Generic.hs +++ b/core/src/Streamly/Internal/Data/MutArray/Generic.hs @@ -91,8 +91,7 @@ module Streamly.Internal.Data.MutArray.Generic -- ** Unfolds , reader -- , readerRev - , producerWith -- experimental - , producer -- experimental + , readerWith -- ** To containers , read @@ -200,13 +199,11 @@ import GHC.Base import GHC.IO (IO(..)) import GHC.Int (Int(..)) import Streamly.Internal.Data.Fold.Type (Fold(..)) -import Streamly.Internal.Data.Producer.Type (Producer (..)) import Streamly.Internal.Data.Unfold.Type (Unfold(..)) import Streamly.Internal.Data.Stream.Type (Stream) import Streamly.Internal.Data.SVar.Type (adaptState) import qualified Streamly.Internal.Data.Fold.Type as FL -import qualified Streamly.Internal.Data.Producer as Producer import qualified Streamly.Internal.Data.Stream.Type as D import qualified Streamly.Internal.Data.Stream.Generate as D import qualified Streamly.Internal.Data.Stream.Lift as D @@ -813,21 +810,17 @@ chunksOf n (D.Stream step state) = -- Unfolds ------------------------------------------------------------------------------- --- | Resumable unfold of an array. +-- | Unfold an array into a stream. -- -{-# INLINE_NORMAL producerWith #-} -producerWith :: Monad m => (forall b. IO b -> m b) -> Producer m (MutArray a) a -producerWith liftio = Producer step inject extract +{-# INLINE_NORMAL readerWith #-} +readerWith :: Monad m => (forall b. IO b -> m b) -> Unfold m (MutArray a) a +readerWith liftio = Unfold step inject where {-# INLINE inject #-} inject arr = return (arr, 0) - {-# INLINE extract #-} - extract (arr, i) = - return $ arr {arrStart = arrStart arr + i} - {-# INLINE_LATE step #-} step (arr, i) | i == length arr = return D.Stop @@ -835,17 +828,11 @@ producerWith liftio = Producer step inject extract x <- liftio $ unsafeGetIndex i arr return $ D.Yield x (arr, i + 1) --- | Resumable unfold of an array. --- -{-# INLINE_NORMAL producer #-} -producer :: MonadIO m => Producer m (MutArray a) a -producer = producerWith liftIO - -- | Unfold an array into a stream. -- {-# INLINE_NORMAL reader #-} reader :: MonadIO m => Unfold m (MutArray a) a -reader = Producer.simplify producer +reader = readerWith liftIO -------------------------------------------------------------------------------- -- Appending arrays diff --git a/core/src/Streamly/Internal/Data/MutArray/Type.hs b/core/src/Streamly/Internal/Data/MutArray/Type.hs index fe2e8b18a9..9e01b7621a 100644 --- a/core/src/Streamly/Internal/Data/MutArray/Type.hs +++ b/core/src/Streamly/Internal/Data/MutArray/Type.hs @@ -212,8 +212,7 @@ module Streamly.Internal.Data.MutArray.Type -- *** Unfolds -- experimental - , producerWith - , producer + , readerWith , reader , readerRevWith @@ -541,7 +540,6 @@ import GHC.Exts (byteArrayContents#, unsafeCoerce#) import Streamly.Internal.Data.Fold.Type (Fold(..)) import Streamly.Internal.Data.MutByteArray.Type (isPower2, roundUpLargeArray) -import Streamly.Internal.Data.Producer.Type (Producer (..)) import Streamly.Internal.Data.Scanl.Type (Scanl (..)) import Streamly.Internal.Data.Stream.Type (Stream) import Streamly.Internal.Data.Parser.Type (Parser (..)) @@ -555,7 +553,6 @@ import qualified Streamly.Internal.Data.Fold.Type as FL import qualified Streamly.Internal.Data.MutByteArray.Type as Unboxed import qualified Streamly.Internal.Data.Parser.Type as Parser -- import qualified Streamly.Internal.Data.Fold.Type as Fold -import qualified Streamly.Internal.Data.Producer as Producer import qualified Streamly.Internal.Data.Stream.Type as D import qualified Streamly.Internal.Data.Stream.Lift as D import qualified Streamly.Internal.Data.Stream.Generate as D @@ -2174,11 +2171,11 @@ fromArrayUnsafe :: ArrayUnsafe a -> MutArray a fromArrayUnsafe (ArrayUnsafe contents start end) = MutArray contents start end end -{-# INLINE_NORMAL producerWith #-} -producerWith :: +{-# INLINE_NORMAL readerWith #-} +readerWith :: forall m a. (Monad m, Unbox a) - => (forall b. IO b -> m b) -> Producer m (MutArray a) a -producerWith liftio = Producer step (return . toArrayUnsafe) extract + => (forall b. IO b -> m b) -> Unfold m (MutArray a) a +readerWith liftio = Unfold step (return . toArrayUnsafe) where {-# INLINE_LATE step #-} @@ -2192,19 +2189,11 @@ producerWith liftio = Producer step (return . toArrayUnsafe) extract !x <- liftio $ peekAt cur contents return $ D.Yield x (ArrayUnsafe contents (INDEX_NEXT(cur,a)) end) - extract = return . fromArrayUnsafe - --- | Resumable unfold of an array. --- -{-# INLINE_NORMAL producer #-} -producer :: forall m a. (MonadIO m, Unbox a) => Producer m (MutArray a) a -producer = producerWith liftIO - -- | Unfold an array into a stream. -- {-# INLINE_NORMAL reader #-} reader :: forall m a. (MonadIO m, Unbox a) => Unfold m (MutArray a) a -reader = Producer.simplify producer +reader = readerWith liftIO {-# INLINE_NORMAL readerRevWith #-} readerRevWith :: diff --git a/core/src/Streamly/Internal/Data/Producer.hs b/core/src/Streamly/Internal/Data/Producer.hs deleted file mode 100644 index 87c3f14c34..0000000000 --- a/core/src/Streamly/Internal/Data/Producer.hs +++ /dev/null @@ -1,81 +0,0 @@ --- | --- Module : Streamly.Internal.Data.Producer --- Copyright : (c) 2021 Composewell Technologies --- License : BSD-3-Clause --- Maintainer : streamly@composewell.com --- Stability : experimental --- Portability : GHC --- --- A 'Producer' is an 'Unfold' with an 'extract' function added to extract --- the state. It is more powerful but less general than an Unfold. --- --- A 'Producer' represents steps of a loop generating a sequence of elements. --- While unfolds are closed representation of imperative loops with some opaque --- internal state, producers are open loops with the state being accessible to --- the user. --- --- Unlike an unfold, which runs a loop till completion, a producer can be --- stopped in the middle, its state can be extracted, examined, changed, and --- then it can be resumed later from the stopped state. --- --- A producer can be used in places where a CPS stream would otherwise be --- needed, because the state of the loop can be passed around. However, it can --- be much more efficient than CPS because it allows stream fusion and --- unecessary function calls can be avoided. - -module Streamly.Internal.Data.Producer - ( - module Streamly.Internal.Data.Producer.Source - , module Streamly.Internal.Data.Producer.Type - - -- * Converting - , simplify - , fromStreamD - ) -where - -#include "inline.hs" - -import Streamly.Internal.Data.Stream.Step (Step(..)) -import Streamly.Internal.Data.Stream.Type (Stream(..)) -import Streamly.Internal.Data.SVar.Type (defState) -import Streamly.Internal.Data.Unfold.Type (Unfold(..)) - -import Streamly.Internal.Data.Producer.Source -import Streamly.Internal.Data.Producer.Type -import Prelude hiding (concat) - --- XXX We should write unfolds as producers where possible and define --- unfolds using "simplify". --- -------------------------------------------------------------------------------- --- Converting to unfolds -------------------------------------------------------------------------------- - --- | Simplify a producer to an unfold. --- --- /Pre-release/ -{-# INLINE simplify #-} -simplify :: Producer m a b -> Unfold m a b -simplify (Producer step inject _) = Unfold step inject - -------------------------------------------------------------------------------- --- Unfolds -------------------------------------------------------------------------------- - --- | Convert a StreamD stream into a producer. --- --- /Pre-release/ -{-# INLINE_NORMAL fromStreamD #-} -fromStreamD :: Monad m => Producer m (Stream m a) a -fromStreamD = Producer step return return - - where - - {-# INLINE_LATE step #-} - step (UnStream step1 state1) = do - r <- step1 defState state1 - return $ case r of - Yield x s -> Yield x (Stream step1 s) - Skip s -> Skip (Stream step1 s) - Stop -> Stop diff --git a/core/src/Streamly/Internal/Data/Producer/Source.hs b/core/src/Streamly/Internal/Data/Producer/Source.hs deleted file mode 100644 index 0f05560f4b..0000000000 --- a/core/src/Streamly/Internal/Data/Producer/Source.hs +++ /dev/null @@ -1,314 +0,0 @@ --- | --- Module : Streamly.Internal.Data.Producer.Source --- Copyright : (c) 2021 Composewell Technologies --- License : BSD-3-Clause --- Maintainer : streamly@composewell.com --- Stability : experimental --- Portability : GHC --- --- A 'Source' is a seed that can be unfolded to a stream with a buffer. Allows --- to 'unread' data i.e. push unused data back to the source buffer. This is --- useful in parsing applications with backtracking. --- - -module Streamly.Internal.Data.Producer.Source - ( Source - - -- * Creation - , source - - -- * Transformation - , unread - - -- * Consumption - , isEmpty - , producer - - -- * Parsing - , parse - , parseMany - , parseManyD - ) -where - -#include "inline.hs" - -import Control.Exception (assert) -import GHC.Exts (SpecConstrAnnotation(..)) -import GHC.Types (SPEC(..)) -import Streamly.Internal.Data.Parser - (ParseError(..), ParseErrorPos(..), Step(..), Final(..)) -import Streamly.Internal.Data.Producer.Type (Producer(..)) -import Streamly.Internal.Data.Stream.Step (Step(..)) - -import qualified Streamly.Internal.Data.Parser as ParserD --- import qualified Streamly.Internal.Data.Parser.ParserK.Type as ParserK - -import Prelude hiding (read) - --- | A seed with a buffer. It allows us to 'unread' or return some data --- after reading it. Useful in backtracked parsing. --- -data Source a b = Source [b] (Maybe a) - --- | Make a source from a seed value. The buffer would start as empty. --- --- /Pre-release/ -source :: Maybe a -> Source a b -source = Source [] - --- | Return some unused data back to the source. The data is prepended (or --- consed) to the source. --- --- /Pre-release/ -unread :: [b] -> Source a b -> Source a b -unread xs (Source ys seed) = Source (xs ++ ys) seed - --- | Determine if the source is empty. -isEmpty :: Source a b -> Bool -isEmpty (Source [] Nothing) = True -isEmpty _ = False - --- | Convert a producer to a producer from a buffered source. Any buffered data --- is read first and then the seed is unfolded. --- --- /Pre-release/ -{-# INLINE_NORMAL producer #-} -producer :: Monad m => Producer m a b -> Producer m (Source a b) b -producer (Producer step1 inject1 extract1) = Producer step inject extract - - where - - inject (Source [] (Just a)) = do - s <- inject1 a - return $ Left s - inject (Source xs a) = return $ Right (xs, a) - - {-# INLINE_LATE step #-} - step (Left s) = do - r <- step1 s - return $ case r of - Yield x s1 -> Yield x (Left s1) - Skip s1 -> Skip (Left s1) - Stop -> Stop - step (Right ([], Nothing)) = return Stop - step (Right ([], Just _)) = error "Bug: unreachable" - step (Right (x:[], Just a)) = do - s <- inject1 a - return $ Yield x (Left s) - step (Right (x:xs, a)) = return $ Yield x (Right (xs, a)) - - extract (Left s) = Source [] . Just <$> extract1 s - extract (Right (xs, a)) = return $ Source xs a - -------------------------------------------------------------------------------- --- Parsing -------------------------------------------------------------------------------- - --- GHC parser does not accept {-# ANN type [] NoSpecConstr #-}, so we need --- to make a newtype. -{-# ANN type List NoSpecConstr #-} -newtype List a = List {getList :: [a]} - -{-# INLINE_NORMAL parse #-} -parse - :: Monad m => - ParserD.Parser a m b - -> Producer m (Source s a) a - -> Source s a - -> m (Either ParseErrorPos b, Source s a) -parse - (ParserD.Parser pstep initial extract) - (Producer ustep uinject uextract) - seed = do - - res <- initial - case res of - ParserD.IPartial s -> do - state <- uinject seed - go SPEC state (List []) s 0 - ParserD.IDone b -> return (Right b, seed) - ParserD.IError err -> return (Left (ParseErrorPos 0 err), seed) - - where - - -- XXX currently we are using a dumb list based approach for backtracking - -- buffer. This can be replaced by a sliding/ring buffer using Data.Array. - -- That will allow us more efficient random back and forth movement. - go !_ st buf !pst i = do - r <- ustep st - case r of - Yield x s -> do - pRes <- pstep pst x - case pRes of - SPartial 1 pst1 -> go SPEC s (List []) pst1 i - SPartial m pst1 -> do - let n = 1 - m - assert (n <= length (x:getList buf)) (return ()) - let src0 = Prelude.take n (x:getList buf) - src = Prelude.reverse src0 - gobuf SPEC s (List []) (List src) pst1 (i + 1 - n) - SContinue 1 pst1 -> go SPEC s (List (x:getList buf)) pst1 (i + 1) - SContinue m pst1 -> do - let n = 1 - m - assert (n <= length (x:getList buf)) (return ()) - let (src0, buf1) = splitAt n (x:getList buf) - src = Prelude.reverse src0 - gobuf SPEC s (List buf1) (List src) pst1 (i + 1 - n) - SDone m b -> do - let n = 1 - m - assert (n <= length (x:getList buf)) (return ()) - let src0 = Prelude.take n (x:getList buf) - src = Prelude.reverse src0 - s1 <- uextract s - return (Right b, unread src s1) - SError err -> do - s1 <- uextract s - let src = Prelude.reverse (getList buf) - return - ( Left (ParseErrorPos (i + 1) err) - , unread (src ++ [x]) s1 - ) - Skip s -> go SPEC s buf pst i - Stop -> goStop buf pst i - - gobuf !_ s buf (List []) !pst i = go SPEC s buf pst i - gobuf !_ s buf (List (x:xs)) !pst i = do - pRes <- pstep pst x - case pRes of - SPartial 1 pst1 -> - gobuf SPEC s (List []) (List xs) pst1 (i + 1) - SPartial m pst1 -> do - let n = 1 - m - assert (n <= length (x:getList buf)) (return ()) - let src0 = Prelude.take n (x:getList buf) - src = Prelude.reverse src0 ++ xs - gobuf SPEC s (List []) (List src) pst1 (i + 1 - n) - SContinue 1 pst1 -> - gobuf SPEC s (List (x:getList buf)) (List xs) pst1 (i + 1) - SContinue m pst1 -> do - let n = 1 - m - assert (n <= length (x:getList buf)) (return ()) - let (src0, buf1) = splitAt n (x:getList buf) - src = Prelude.reverse src0 ++ xs - gobuf SPEC s (List buf1) (List src) pst1 (i + 1 - n) - SDone m b -> do - let n = 1 - m - assert (n <= length (x:getList buf)) (return ()) - let src0 = Prelude.take n (x:getList buf) - src = Prelude.reverse src0 - s1 <- uextract s - return (Right b, unread src s1) - SError err -> do - s1 <- uextract s - let src = Prelude.reverse (getList buf) - return - ( Left (ParseErrorPos (i + 1) err) - , unread (src ++ (x:xs)) s1 - ) - - -- This is a simplified gobuf - goExtract !_ buf (List []) !pst i = goStop buf pst i - goExtract !_ buf (List (x:xs)) !pst i = do - pRes <- pstep pst x - case pRes of - SPartial 1 pst1 -> - goExtract SPEC (List []) (List xs) pst1 (i + 1) - SPartial m pst1 -> do - let n = 1 - m - assert (n <= length (x:getList buf)) (return ()) - let src0 = Prelude.take n (x:getList buf) - src = Prelude.reverse src0 ++ xs - goExtract SPEC (List []) (List src) pst1 (i + 1 - n) - SContinue 1 pst1 -> - goExtract SPEC (List (x:getList buf)) (List xs) pst1 (i + 1) - SContinue m pst1 -> do - let n = 1 - m - assert (n <= length (x:getList buf)) (return ()) - let (src0, buf1) = splitAt n (x:getList buf) - src = Prelude.reverse src0 ++ xs - goExtract SPEC (List buf1) (List src) pst1 (i + 1 - n) - SDone m b -> do - let n = 1 - m - assert (n <= length (x:getList buf)) (return ()) - let src0 = Prelude.take n (x:getList buf) - src = Prelude.reverse src0 - return (Right b, unread src (source Nothing)) - SError err -> do - let src = Prelude.reverse (getList buf) - return - ( Left (ParseErrorPos (i + 1) err) - , unread (src ++ (x:xs)) (source Nothing) - ) - - -- This is a simplified goExtract - {-# INLINE goStop #-} - goStop buf pst i = do - pRes <- extract pst - case pRes of - FContinue 0 pst1 -> - goStop buf pst1 i - FContinue m pst1 -> do - let n = (- m) - assert (n <= length (getList buf)) (return ()) - let (src0, buf1) = splitAt n (getList buf) - src = Prelude.reverse src0 - goExtract SPEC (List buf1) (List src) pst1 (i - n) - FDone 0 b -> return (Right b, source Nothing) - FDone m b -> do - let n = (- m) - assert (n <= length (getList buf)) (return ()) - let src0 = Prelude.take n (getList buf) - src = Prelude.reverse src0 - return (Right b, unread src (source Nothing)) - FError err -> do - let src = Prelude.reverse (getList buf) - return (Left (ParseErrorPos i err), unread src (source Nothing)) - -{- --- | Parse a buffered source using a parser, returning the parsed value and the --- remaining source. --- --- /Pre-release/ -{-# INLINE [3] parseK #-} -parseK :: Monad m => - ParserK.Parser a m b - -> Producer m (Source s a) a - -> Source s a - -> m (Either ParseError b, Source s a) -parseK = parse . ParserK.toParser --} - -------------------------------------------------------------------------------- --- Nested parsing -------------------------------------------------------------------------------- - -{-# INLINE parseManyD #-} -parseManyD :: Monad m => - ParserD.Parser a m b - -> Producer m (Source x a) a - -> Producer m (Source x a) (Either ParseError b) -parseManyD parser reader = Producer step return return - - where - - {-# INLINE_LATE step #-} - step src = do - if isEmpty src - then return Stop - else do - (b, s1) <- parse parser reader src - case b of - Right b1 -> return $ Yield (Right b1) s1 - Left _ -> return Stop - --- | Apply a parser repeatedly on a buffered source producer to generate a --- producer of parsed values. --- --- /Pre-release/ -{-# INLINE parseMany #-} -parseMany :: Monad m => - ParserD.Parser a m b - -> Producer m (Source x a) a - -> Producer m (Source x a) (Either ParseError b) -parseMany = parseManyD diff --git a/core/src/Streamly/Internal/Data/Producer/Type.hs b/core/src/Streamly/Internal/Data/Producer/Type.hs deleted file mode 100644 index 3fdc3c9c0a..0000000000 --- a/core/src/Streamly/Internal/Data/Producer/Type.hs +++ /dev/null @@ -1,190 +0,0 @@ --- | --- Module : Streamly.Internal.Data.Producer.Type --- Copyright : (c) 2021 Composewell Technologies --- License : BSD-3-Clause --- Maintainer : streamly@composewell.com --- Stability : experimental --- Portability : GHC --- --- See "Streamly.Internal.Data.Producer" for introduction. --- - -module Streamly.Internal.Data.Producer.Type - ( - -- * Type - Producer (..) - - -- * Producers - , nil - , nilM - , unfoldrM - , fromList - - -- * Mapping - , translate - , lmap - - -- * Nesting - , NestedLoop (..) - , concat - ) -where - -#include "inline.hs" - -import Fusion.Plugin.Types (Fuse(..)) -import Streamly.Internal.Data.Stream.Step (Step(..)) -import Prelude hiding (concat, map) - ------------------------------------------------------------------------------- --- Type ------------------------------------------------------------------------------- - --- Note that this type cannot be made a Functor on the seed/result type because --- that requires bi-directional mapping between the two types, see translate --- and lmap below. - --- | A @Producer m a b@ is a generator of a stream of values of type @b@ from a --- seed of type 'a' in 'Monad' @m@. --- --- /Pre-release/ - -data Producer m a b = - -- | @Producer step inject extract@ - forall s. Producer (s -> m (Step s b)) (a -> m s) (s -> m a) - ------------------------------------------------------------------------------- --- Producers ------------------------------------------------------------------------------- - -{-# INLINE nilM #-} -nilM :: Monad m => (a -> m c) -> Producer m a b -nilM f = Producer step return return - - where - - {-# INLINE_LATE step #-} - step x = f x >> return Stop - -{-# INLINE nil #-} -nil :: Monad m => Producer m a b -nil = nilM (\_ -> return ()) - -{-# INLINE unfoldrM #-} -unfoldrM :: Monad m => (a -> m (Maybe (b, a))) -> Producer m a b -unfoldrM next = Producer step return return - - where - - {-# INLINE_LATE step #-} - step st = do - r <- next st - return $ case r of - Just (x, s) -> Yield x s - Nothing -> Stop - --- | Convert a list of pure values to a 'Stream' --- --- /Pre-release/ -{-# INLINE_LATE fromList #-} -fromList :: Monad m => Producer m [a] a -fromList = Producer step return return - - where - - {-# INLINE_LATE step #-} - step (x:xs) = return $ Yield x xs - step [] = return Stop - ------------------------------------------------------------------------------- --- Mapping ------------------------------------------------------------------------------- - --- | Interconvert the producer between two interconvertible input types. --- --- /Pre-release/ -{-# INLINE_NORMAL translate #-} -translate :: Functor m => - (a -> c) -> (c -> a) -> Producer m c b -> Producer m a b -translate f g (Producer step inject extract) = - Producer step (inject . f) (fmap g . extract) - --- | Map the producer input to another value of the same type. --- --- /Pre-release/ -{-# INLINE_NORMAL lmap #-} -lmap :: (a -> a) -> Producer m a b -> Producer m a b -lmap f (Producer step inject extract) = Producer step (inject . f) extract - ------------------------------------------------------------------------------- --- Functor ------------------------------------------------------------------------------- - --- | Map a function on the output of the producer (the type @b@). --- --- /Pre-release/ -{-# INLINE_NORMAL map #-} -map :: Functor m => (b -> c) -> Producer m a b -> Producer m a c -map f (Producer ustep uinject uextract) = Producer step uinject uextract - - where - - {-# INLINE_LATE step #-} - step st = fmap (fmap f) (ustep st) - --- | Maps a function on the output of the producer (the type @b@). -instance Functor m => Functor (Producer m a) where - {-# INLINE fmap #-} - fmap = map - ------------------------------------------------------------------------------- --- Nesting ------------------------------------------------------------------------------- - --- | State representing a nested loop. -{-# ANN type NestedLoop Fuse #-} -data NestedLoop s1 s2 = OuterLoop s1 | InnerLoop s1 s2 - --- | Apply the second unfold to each output element of the first unfold and --- flatten the output in a single stream. --- --- /Pre-release/ --- -{-# INLINE_NORMAL concat #-} -concat :: Monad m => - Producer m a b -> Producer m b c -> Producer m (NestedLoop a b) c -concat (Producer step1 inject1 extract1) (Producer step2 inject2 extract2) = - Producer step inject extract - - where - - inject (OuterLoop x) = do - s <- inject1 x - return $ OuterLoop s - inject (InnerLoop x y) = do - s1 <- inject1 x - s2 <- inject2 y - return $ InnerLoop s1 s2 - - {-# INLINE_LATE step #-} - step (OuterLoop st) = do - r <- step1 st - case r of - Yield x s -> do - innerSt <- inject2 x - return $ Skip (InnerLoop s innerSt) - Skip s -> return $ Skip (OuterLoop s) - Stop -> return Stop - - step (InnerLoop ost ist) = do - r <- step2 ist - return $ case r of - Yield x s -> Yield x (InnerLoop ost s) - Skip s -> Skip (InnerLoop ost s) - Stop -> Skip (OuterLoop ost) - - extract (OuterLoop s1) = OuterLoop <$> extract1 s1 - extract (InnerLoop s1 s2) = do - r1 <- extract1 s1 - r2 <- extract2 s2 - return (InnerLoop r1 r2) diff --git a/core/src/Streamly/Internal/Data/StreamK.hs b/core/src/Streamly/Internal/Data/StreamK.hs index a3baf87285..a84fcc496e 100644 --- a/core/src/Streamly/Internal/Data/StreamK.hs +++ b/core/src/Streamly/Internal/Data/StreamK.hs @@ -39,7 +39,6 @@ module Streamly.Internal.Data.StreamK , fold , foldBreak , foldEither - , foldConcat , ParserK.toParserK -- XXX move the code to this module , parseDBreak , parseD @@ -142,12 +141,10 @@ import Control.Monad (void, join) import Control.Monad.Catch (MonadCatch) import Control.Monad.IO.Class (MonadIO(..)) import Data.Ord (comparing) -import GHC.Types (SPEC(..)) import Streamly.Internal.Data.Array.Type (Array(..)) import Streamly.Internal.Data.Fold.Type (Fold(..)) import Streamly.Internal.Data.IOFinalizer (newIOFinalizer, runIOFinalizer) import Streamly.Internal.Data.ParserK.Type (ParserK) -import Streamly.Internal.Data.Producer.Type (Producer(..)) import Streamly.Internal.Data.SVar.Type (adaptState, defState) import Streamly.Internal.Data.Unbox (Unbox) @@ -396,68 +393,6 @@ foldBreak fld strm = do b <- final s return (b, nil) --- XXX Array folds can be implemented using this. --- foldContainers? Specialized to foldArrays. - --- | Generate streams from individual elements of a stream and fold the --- concatenation of those streams using the supplied fold. Return the result of --- the fold and residual stream. --- --- For example, this can be used to efficiently fold an Array Word8 stream --- using Word8 folds. --- --- /Internal/ -{-# INLINE foldConcat #-} -foldConcat :: Monad m => - Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a) -foldConcat - (Producer pstep pinject pextract) - (Fold fstep begin _ final) - stream = do - - res <- begin - case res of - FL.Partial fs -> go fs stream - FL.Done fb -> return (fb, stream) - - where - - go !acc m1 = do - let stop = do - r <- final acc - return (r, nil) - single a = do - st <- pinject a - res <- go1 SPEC acc st - case res of - Left fs -> do - r <- final fs - return (r, nil) - Right (b, s) -> do - x <- pextract s - return (b, fromPure x) - yieldk a r = do - st <- pinject a - res <- go1 SPEC acc st - case res of - Left fs -> go fs r - Right (b, s) -> do - x <- pextract s - return (b, x `cons` r) - in foldStream defState yieldk single stop m1 - - {-# INLINE go1 #-} - go1 !_ !fs st = do - r <- pstep st - case r of - Stream.Yield x s -> do - res <- fstep fs x - case res of - FL.Done b -> return $ Right (b, s) - FL.Partial fs1 -> go1 SPEC fs1 s - Stream.Skip s -> go1 SPEC fs s - Stream.Stop -> return $ Left fs - ------------------------------------------------------------------------------ -- Specialized folds ------------------------------------------------------------------------------ diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index 29fa3ae976..cc1833b768 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -349,7 +349,6 @@ library -- streamly-core-stream-types , Streamly.Internal.Data.SVar.Type , Streamly.Internal.Data.Refold.Type - , Streamly.Internal.Data.Producer -- streamly-core-array-types , Streamly.Internal.Data.MutByteArray @@ -542,9 +541,6 @@ library , Streamly.Internal.Data.Serialize.TH.Common , Streamly.Internal.Data.Serialize.TH.Bottom - , Streamly.Internal.Data.Producer.Type - , Streamly.Internal.Data.Producer.Source - , Streamly.Internal.Data.Time.Clock.Type , Streamly.Internal.FileSystem.Path.Common , Streamly.Internal.FileSystem.DirOptions diff --git a/test/Streamly/Test/Data/Parser.hs b/test/Streamly/Test/Data/Parser.hs index 65edbe9f5d..2d1ed9c614 100644 --- a/test/Streamly/Test/Data/Parser.hs +++ b/test/Streamly/Test/Data/Parser.hs @@ -26,8 +26,6 @@ import qualified Streamly.Internal.Data.Array as A import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Parser as P import qualified Streamly.Internal.Data.ParserK as PK -import qualified Streamly.Internal.Data.Producer as Producer -import qualified Streamly.Internal.Data.Unfold as Unfold import qualified Streamly.Internal.Data.Stream as SI import qualified Streamly.Internal.Data.StreamK as K import qualified Test.Hspec as H @@ -122,35 +120,6 @@ parseMany = listEquals (==) outs ins --- basic sanity test for parsing from arrays -parseUnfold :: Property -parseUnfold = do - let len = 200 - -- ls = input list (stream) - -- clen = chunk size - -- tlen = parser take size - forAll - ((,,) - <$> vectorOf len (chooseAny :: Gen Int) - <*> chooseInt (1, len) - <*> chooseInt (1, len)) $ \(ls, clen, tlen) -> - monadicIO $ do - arrays <- S.toList $ A.chunksOf clen (S.fromList ls) - let src = Producer.source (Just (Producer.OuterLoop arrays)) - let parser = P.fromFold (FL.take tlen FL.toList) - let readSrc = - Producer.producer - $ Producer.concat Producer.fromList A.producer - let streamParser = - Producer.simplify (Producer.parseManyD parser readSrc) - xs <- run - $ S.toList - $ S.unfoldEach Unfold.fromList - $ S.catRights - $ S.unfold streamParser src - - listEquals (==) xs ls - parserSequence :: Property parserSequence = forAll (vectorOf 11 (listOf (chooseAny :: Gen Int))) $ \ins -> @@ -326,13 +295,6 @@ quotedWordTest inp expected = do -- Parser sanity tests -------------------------------------------------------------------------------- -{- -TODO: -Add sanity tests for -- Producer.parse -- Producer.parseMany --} - sanityParseBreak :: [Move] -> SpecWith () sanityParseBreak jumps = it (show jumps) $ do (val, rest) <- SI.parseBreakPos (jumpParser jumps) $ S.fromList tape @@ -448,7 +410,6 @@ main = do describe "Stream parsing" $ do prop "parseMany" parseMany prop "parseMany2Events" parseMany2Events - prop "parseUnfold" parseUnfold prop "parserSequence" parserSequence describe "test for sequence parser" $ do From b2b94382dbfea394577a4f2e73a9ba201038ef1b Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Wed, 27 May 2026 12:34:32 +0530 Subject: [PATCH 2/5] Add a Transition module shared by Stream and Unfold --- .../Internal/Data/Stream/Transform.hs | 8 ++++- core/src/Streamly/Internal/Data/Transition.hs | 35 +++++++++++++++++++ core/src/Streamly/Internal/Data/Unfold.hs | 16 ++------- core/streamly-core.cabal | 2 ++ 4 files changed, 47 insertions(+), 14 deletions(-) create mode 100644 core/src/Streamly/Internal/Data/Transition.hs diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index 031030e0f0..b34ca17e9c 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -226,6 +226,7 @@ import qualified Streamly.Internal.Data.Array.Type as A import qualified Streamly.Internal.Data.Fold.Type as FL import qualified Streamly.Internal.Data.Pipe.Type as Pipe import qualified Streamly.Internal.Data.StreamK.Type as K +import qualified Streamly.Internal.Data.Transition as Transition import Prelude hiding ( drop, dropWhile, filter, map, mapM, reverse @@ -2166,7 +2167,12 @@ mapMaybe f = fmap fromJust . filter isJust . map f -- {-# INLINE_NORMAL mapMaybeM #-} mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b -mapMaybeM f = fmap fromJust . filter isJust . mapM f +mapMaybeM f (Stream step1 state1) = Stream step state1 + + where + + {-# INLINE_LATE step #-} + step gst = Transition.mapMaybeM f (step1 (adaptState gst)) -- | In a stream of 'Maybe's, discard 'Nothing's and unwrap 'Just's. -- diff --git a/core/src/Streamly/Internal/Data/Transition.hs b/core/src/Streamly/Internal/Data/Transition.hs new file mode 100644 index 0000000000..fc9a9ff56b --- /dev/null +++ b/core/src/Streamly/Internal/Data/Transition.hs @@ -0,0 +1,35 @@ +-- | +-- Module : Streamly.Internal.Data.Transition +-- Copyright : (c) 2026 Composewell Technologies +-- License : BSD-3-Clause +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC +-- +-- Combinators on stream transition functions of type +-- @s -> m (Step s a)@. These are shared by the @Stream@ and @Unfold@ +-- step functions. + +module Streamly.Internal.Data.Transition + ( + mapMaybeM + ) +where + +#include "inline.hs" + +import Streamly.Internal.Data.Stream.Step (Step(..)) + +{-# INLINE_LATE mapMaybeM #-} +mapMaybeM :: Monad m + => (b -> m (Maybe c)) -> (s -> m (Step s b)) -> (s -> m (Step s c)) +mapMaybeM f step1 st = do + r <- step1 st + case r of + Yield x s -> do + b <- f x + return $ case b of + Just c -> Yield c s + Nothing -> Skip s + Skip s -> return (Skip s) + Stop -> return Stop diff --git a/core/src/Streamly/Internal/Data/Unfold.hs b/core/src/Streamly/Internal/Data/Unfold.hs index 4ece9d0aa1..524e480924 100644 --- a/core/src/Streamly/Internal/Data/Unfold.hs +++ b/core/src/Streamly/Internal/Data/Unfold.hs @@ -142,6 +142,7 @@ import qualified Streamly.Internal.Data.Fold.Type as FL import qualified Streamly.Internal.Data.Scanl.Type as Scanl import qualified Streamly.Internal.Data.Stream.Type as D import qualified Streamly.Internal.Data.StreamK.Type as K +import qualified Streamly.Internal.Data.Transition as Transition import qualified Prelude import Streamly.Internal.Data.Unfold.Enumeration @@ -751,19 +752,8 @@ dropWhile f = dropWhileM (return . f) -- {-# INLINE_NORMAL mapMaybeM #-} mapMaybeM :: Monad m => (b -> m (Maybe c)) -> Unfold m a b -> Unfold m a c -mapMaybeM f (Unfold step1 inject1) = Unfold step inject1 - where - {-# INLINE_LATE step #-} - step st = do - r <- step1 st - case r of - Yield x s -> do - b <- f x - return $ case b of - Just c -> Yield c s - Nothing -> Skip s - Skip s -> return $ Skip s - Stop -> return Stop +mapMaybeM f (Unfold step1 inject1) = + Unfold (Transition.mapMaybeM f step1) inject1 -- | Map a 'Maybe' returning function on the output of the unfold, filter out -- the 'Nothing' elements, and return an unfold yielding the values extracted diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index cc1833b768..51e4750e09 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -519,6 +519,8 @@ library , Streamly.Internal.Data.Stream.Transformer , Streamly.Internal.Data.Stream.Type + , Streamly.Internal.Data.Transition + , Streamly.Internal.Data.StreamK.Type , Streamly.Internal.Data.StreamK.Transformer From 91da7ad8c3e89653a2446781dcf482527a445ed1 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Wed, 27 May 2026 12:40:18 +0530 Subject: [PATCH 3/5] Add a Transition type in the Transition module --- core/src/Streamly/Internal/Data/Transition.hs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/Streamly/Internal/Data/Transition.hs b/core/src/Streamly/Internal/Data/Transition.hs index fc9a9ff56b..75ae46a84e 100644 --- a/core/src/Streamly/Internal/Data/Transition.hs +++ b/core/src/Streamly/Internal/Data/Transition.hs @@ -20,9 +20,14 @@ where import Streamly.Internal.Data.Stream.Step (Step(..)) +-- | A stream transition: given the current state, produce the next 'Step'. +-- The state type @a@ is also the type carried inside 'Step', so a 'Yield' +-- delivers a new value alongside the updated state. +type Transition m a b = a -> m (Step a b) + {-# INLINE_LATE mapMaybeM #-} mapMaybeM :: Monad m - => (b -> m (Maybe c)) -> (s -> m (Step s b)) -> (s -> m (Step s c)) + => (b -> m (Maybe c)) -> Transition m s b -> Transition m s c mapMaybeM f step1 st = do r <- step1 st case r of From 713f2dbdee106332507619f999c3c7e879d125f3 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Wed, 27 May 2026 13:23:56 +0530 Subject: [PATCH 4/5] Move takeWhileM to Transition module --- core/src/Streamly/Internal/Data/Stream/Type.hs | 18 +++++++----------- core/src/Streamly/Internal/Data/Transition.hs | 13 +++++++++++++ core/src/Streamly/Internal/Data/Unfold/Type.hs | 15 ++++----------- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index ee2f054baa..2d4b1780f8 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -231,6 +231,7 @@ import Streamly.Internal.Data.Unfold.Type (Unfold(..)) import qualified Streamly.Internal.Data.Fold.Type as FL hiding (foldr) import qualified Streamly.Internal.Data.StreamK.Type as K +import qualified Streamly.Internal.Data.Transition as Transition import qualified Streamly.Internal.Data.Unfold.Type as Unfold #include "DocTestDataStream.hs" @@ -1162,17 +1163,12 @@ take n (Stream step state) = n `seq` Stream step' (state, 0) {-# INLINE_NORMAL takeWhileM #-} takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a -- takeWhileM p = scanMaybe (FL.takingEndByM_ (\x -> not <$> p x)) -takeWhileM f (Stream step state) = Stream step' state - where - {-# INLINE_LATE step' #-} - step' gst st = do - r <- step gst st - case r of - Yield x s -> do - b <- f x - return $ if b then Yield x s else Stop - Skip s -> return $ Skip s - Stop -> return Stop +takeWhileM f (Stream step1 state1) = Stream step state1 + + where + + {-# INLINE_LATE step #-} + step gst = Transition.takeWhileM f (step1 gst) -- | End the stream as soon as the predicate fails on an element. -- diff --git a/core/src/Streamly/Internal/Data/Transition.hs b/core/src/Streamly/Internal/Data/Transition.hs index 75ae46a84e..4e495b914f 100644 --- a/core/src/Streamly/Internal/Data/Transition.hs +++ b/core/src/Streamly/Internal/Data/Transition.hs @@ -13,6 +13,7 @@ module Streamly.Internal.Data.Transition ( mapMaybeM + , takeWhileM ) where @@ -38,3 +39,15 @@ mapMaybeM f step1 st = do Nothing -> Skip s Skip s -> return (Skip s) Stop -> return Stop + +{-# INLINE_LATE takeWhileM #-} +takeWhileM :: Monad m + => (b -> m Bool) -> Transition m s b -> Transition m s b +takeWhileM f step1 st = do + r <- step1 st + case r of + Yield x s -> do + b <- f x + return $ if b then Yield x s else Stop + Skip s -> return (Skip s) + Stop -> return Stop diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index d1d16a0aca..ed23ae1ad3 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -138,6 +138,8 @@ import Data.Void (Void) import Fusion.Plugin.Types (Fuse(..)) import Streamly.Internal.Data.Stream.Step (Step(..)) +import qualified Streamly.Internal.Data.Transition as Transition + import Prelude hiding (id, (.), map, mapM, concatMap, zipWith, takeWhile) #include "DocTestDataUnfold.hs" @@ -405,17 +407,8 @@ takeWhileM :: Monad m => (b -> m Bool) -> Unfold m a b -> Unfold m a b -- implementation below (the Tuple' should help eliminate the unused param): -- -- takeWhileM f = takeWhileMWithInput (\_ b -> f b) -takeWhileM f (Unfold step1 inject1) = Unfold step inject1 - where - {-# INLINE_LATE step #-} - step st = do - r <- step1 st - case r of - Yield x s -> do - b <- f x - return $ if b then Yield x s else Stop - Skip s -> return $ Skip s - Stop -> return Stop +takeWhileM f (Unfold step1 inject1) = + Unfold (Transition.takeWhileM f step1) inject1 -- | End the stream generated by the 'Unfold' as soon as the predicate fails -- on an element. From 4651f47bd2b465e9ae77db730e4c8236c0e34844 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sun, 31 May 2026 21:19:56 +0530 Subject: [PATCH 5/5] Rename Transition to Producer --- .../Internal/Data/{Transition.hs => Producer.hs} | 10 +++++----- core/src/Streamly/Internal/Data/Stream/Transform.hs | 4 ++-- core/src/Streamly/Internal/Data/Stream/Type.hs | 4 ++-- core/src/Streamly/Internal/Data/Unfold.hs | 4 ++-- core/src/Streamly/Internal/Data/Unfold/Type.hs | 4 ++-- core/streamly-core.cabal | 2 +- 6 files changed, 14 insertions(+), 14 deletions(-) rename core/src/Streamly/Internal/Data/{Transition.hs => Producer.hs} (82%) diff --git a/core/src/Streamly/Internal/Data/Transition.hs b/core/src/Streamly/Internal/Data/Producer.hs similarity index 82% rename from core/src/Streamly/Internal/Data/Transition.hs rename to core/src/Streamly/Internal/Data/Producer.hs index 4e495b914f..a0ecfb9f25 100644 --- a/core/src/Streamly/Internal/Data/Transition.hs +++ b/core/src/Streamly/Internal/Data/Producer.hs @@ -1,5 +1,5 @@ -- | --- Module : Streamly.Internal.Data.Transition +-- Module : Streamly.Internal.Data.Producer -- Copyright : (c) 2026 Composewell Technologies -- License : BSD-3-Clause -- Maintainer : streamly@composewell.com @@ -10,7 +10,7 @@ -- @s -> m (Step s a)@. These are shared by the @Stream@ and @Unfold@ -- step functions. -module Streamly.Internal.Data.Transition +module Streamly.Internal.Data.Producer ( mapMaybeM , takeWhileM @@ -24,11 +24,11 @@ import Streamly.Internal.Data.Stream.Step (Step(..)) -- | A stream transition: given the current state, produce the next 'Step'. -- The state type @a@ is also the type carried inside 'Step', so a 'Yield' -- delivers a new value alongside the updated state. -type Transition m a b = a -> m (Step a b) +type Producer m a b = a -> m (Step a b) {-# INLINE_LATE mapMaybeM #-} mapMaybeM :: Monad m - => (b -> m (Maybe c)) -> Transition m s b -> Transition m s c + => (b -> m (Maybe c)) -> Producer m s b -> Producer m s c mapMaybeM f step1 st = do r <- step1 st case r of @@ -42,7 +42,7 @@ mapMaybeM f step1 st = do {-# INLINE_LATE takeWhileM #-} takeWhileM :: Monad m - => (b -> m Bool) -> Transition m s b -> Transition m s b + => (b -> m Bool) -> Producer m s b -> Producer m s b takeWhileM f step1 st = do r <- step1 st case r of diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index b34ca17e9c..04d1696f79 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -226,7 +226,7 @@ import qualified Streamly.Internal.Data.Array.Type as A import qualified Streamly.Internal.Data.Fold.Type as FL import qualified Streamly.Internal.Data.Pipe.Type as Pipe import qualified Streamly.Internal.Data.StreamK.Type as K -import qualified Streamly.Internal.Data.Transition as Transition +import qualified Streamly.Internal.Data.Producer as Producer import Prelude hiding ( drop, dropWhile, filter, map, mapM, reverse @@ -2172,7 +2172,7 @@ mapMaybeM f (Stream step1 state1) = Stream step state1 where {-# INLINE_LATE step #-} - step gst = Transition.mapMaybeM f (step1 (adaptState gst)) + step gst = Producer.mapMaybeM f (step1 (adaptState gst)) -- | In a stream of 'Maybe's, discard 'Nothing's and unwrap 'Just's. -- diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 2d4b1780f8..bd436145cb 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -231,7 +231,7 @@ import Streamly.Internal.Data.Unfold.Type (Unfold(..)) import qualified Streamly.Internal.Data.Fold.Type as FL hiding (foldr) import qualified Streamly.Internal.Data.StreamK.Type as K -import qualified Streamly.Internal.Data.Transition as Transition +import qualified Streamly.Internal.Data.Producer as Producer import qualified Streamly.Internal.Data.Unfold.Type as Unfold #include "DocTestDataStream.hs" @@ -1168,7 +1168,7 @@ takeWhileM f (Stream step1 state1) = Stream step state1 where {-# INLINE_LATE step #-} - step gst = Transition.takeWhileM f (step1 gst) + step gst = Producer.takeWhileM f (step1 gst) -- | End the stream as soon as the predicate fails on an element. -- diff --git a/core/src/Streamly/Internal/Data/Unfold.hs b/core/src/Streamly/Internal/Data/Unfold.hs index 524e480924..a1ddf17032 100644 --- a/core/src/Streamly/Internal/Data/Unfold.hs +++ b/core/src/Streamly/Internal/Data/Unfold.hs @@ -142,7 +142,7 @@ import qualified Streamly.Internal.Data.Fold.Type as FL import qualified Streamly.Internal.Data.Scanl.Type as Scanl import qualified Streamly.Internal.Data.Stream.Type as D import qualified Streamly.Internal.Data.StreamK.Type as K -import qualified Streamly.Internal.Data.Transition as Transition +import qualified Streamly.Internal.Data.Producer as Producer import qualified Prelude import Streamly.Internal.Data.Unfold.Enumeration @@ -753,7 +753,7 @@ dropWhile f = dropWhileM (return . f) {-# INLINE_NORMAL mapMaybeM #-} mapMaybeM :: Monad m => (b -> m (Maybe c)) -> Unfold m a b -> Unfold m a c mapMaybeM f (Unfold step1 inject1) = - Unfold (Transition.mapMaybeM f step1) inject1 + Unfold (Producer.mapMaybeM f step1) inject1 -- | Map a 'Maybe' returning function on the output of the unfold, filter out -- the 'Nothing' elements, and return an unfold yielding the values extracted diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index ed23ae1ad3..ea8ab2756d 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -138,7 +138,7 @@ import Data.Void (Void) import Fusion.Plugin.Types (Fuse(..)) import Streamly.Internal.Data.Stream.Step (Step(..)) -import qualified Streamly.Internal.Data.Transition as Transition +import qualified Streamly.Internal.Data.Producer as Producer import Prelude hiding (id, (.), map, mapM, concatMap, zipWith, takeWhile) @@ -408,7 +408,7 @@ takeWhileM :: Monad m => (b -> m Bool) -> Unfold m a b -> Unfold m a b -- -- takeWhileM f = takeWhileMWithInput (\_ b -> f b) takeWhileM f (Unfold step1 inject1) = - Unfold (Transition.takeWhileM f step1) inject1 + Unfold (Producer.takeWhileM f step1) inject1 -- | End the stream generated by the 'Unfold' as soon as the predicate fails -- on an element. diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index 51e4750e09..e0b0c3a84b 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -519,7 +519,7 @@ library , Streamly.Internal.Data.Stream.Transformer , Streamly.Internal.Data.Stream.Type - , Streamly.Internal.Data.Transition + , Streamly.Internal.Data.Producer , Streamly.Internal.Data.StreamK.Type , Streamly.Internal.Data.StreamK.Transformer