module Potoki.Core.Consume
(
  Consume(..),
  apConcurrently,
  list,
  sum,
  transform,
  count,
  head,
  last,
  reverseList,
  vector,
  concat,
  fold,
  foldInIO,
  folding,
  foldingInIO,
  execState,
  writeBytesToStdout,
  writeBytesToFile,
  writeBytesToFileWithoutBuffering,
  appendBytesToFile,
  deleteFiles,
  printBytes,
  printText,
  printString,
  parseBytes,
  parseText,
  concurrently,
)
where

import Potoki.Core.Prelude hiding (sum, head, fold, concat, last)
import Potoki.Core.Types
import qualified Potoki.Core.Fetch as A
import qualified Acquire.IO as B
import qualified Potoki.Core.Transform as J
import qualified Potoki.Core.IO.Fetch as L
import qualified Data.ByteString as C
import qualified Data.Attoparsec.ByteString as E
import qualified Data.Attoparsec.Text as F
import qualified Data.Attoparsec.Types as I
import qualified Data.Text.IO as K
import qualified Control.Foldl as D
import qualified System.Directory as G
import qualified Potoki.Core.Transform.Concurrency as B
import qualified Control.Monad.Trans.State.Strict as O
import qualified Control.Exception as H


-- * Instances

instance Profunctor Consume where
  -- Maps the fetched elements before they reach the consumer
  -- and maps the consumer's result afterwards.
  {-# INLINE dimap #-}
  dimap inputMapping outputMapping (Consume consume) =
    Consume (\ fetch -> fmap outputMapping (consume $ fmap inputMapping fetch))

instance Choice Consume where
  -- Feeds the wrapped consumer with the 'Right' elements only.
  -- The first 'Left' element is remembered, reported to the wrapped consumer
  -- as the end of the stream, and becomes the overall result.
  right' :: Consume a b -> Consume (Either c a) (Either c b)
  right' (Consume rightConsumeIO) =
    Consume $ \ (Fetch eitherFetchIO) -> do
      fetchedLeftMaybeRef <- newIORef Nothing
      consumedRight <-
        rightConsumeIO $ Fetch $ do
          eitherFetch <- eitherFetchIO
          case eitherFetch of
            Nothing -> return Nothing
            Just element -> case element of
              Right fetchedRight -> return $ Just fetchedRight
              Left fetchedLeft -> do
                writeIORef fetchedLeftMaybeRef $ Just fetchedLeft
                return Nothing
      fetchedLeftMaybe <- readIORef fetchedLeftMaybeRef
      case fetchedLeftMaybe of
        Nothing -> return $ Right consumedRight
        Just fetchedLeft -> return $ Left fetchedLeft

instance Functor (Consume input) where
  fmap = rmap

instance Applicative (Consume a) where
  pure x = Consume $ \ _ -> pure x
  -- Runs the left consumer and then the right one on the very same fetch,
  -- so the right consumer only sees whatever the left one has not consumed.
  Consume leftConsumeIO <*> Consume rightConsumeIO =
    Consume $ \ fetch -> leftConsumeIO fetch <*> rightConsumeIO fetch

instance Monad (Consume a) where
  -- Runs the left consumer, picks the next consumer based on its result
  -- and runs that one on the same fetch.
  Consume leftConsumeIO >>= toRightConsumeIO =
    Consume $ \ fetch -> do
      Consume rightConsumeIO <- toRightConsumeIO <$> leftConsumeIO fetch
      rightConsumeIO fetch

instance MonadIO (Consume a) where
  -- Executes the action without touching the input.
  liftIO a = Consume $ \ _ -> a


-- * Combinators

{-|
Applicative composition, which runs the right consumer in a forked thread
while the left one runs in the calling thread.
Each consumer gets its own fetch produced by @Potoki.Core.Fetch.duplicate@
(presumably so that both of them observe every element; see that function).

Both outputs are forced to WHNF.
An exception raised by the right consumer is rethrown in the calling thread
once the left consumer is done,
instead of leaving the caller blocked on a result that will never arrive.
-}
apConcurrently :: Consume a (b -> c) -> Consume a b -> Consume a c
apConcurrently (Consume leftConsumeIO) (Consume rightConsumeIO) =
  Consume $ \ fetch -> do
    (leftFetch, rightFetch) <- A.duplicate fetch
    rightResultVar <- newEmptyMVar
    _ <- forkIO $ do
      -- The result var must get filled no matter what,
      -- otherwise the "takeMVar" below would never return.
      rightResult <- H.try $ do
        !rightOutput <- rightConsumeIO rightFetch
        return rightOutput
      putMVar rightResultVar rightResult
    !leftOutput <- leftConsumeIO leftFetch
    rightResult <- takeMVar rightResultVar
    case rightResult of
      Left exception -> rethrow exception
      Right rightOutput -> return (leftOutput rightOutput)
  where
    -- Fixes the type of the exceptions intercepted in the forked thread.
    rethrow :: H.SomeException -> IO x
    rethrow = H.throwIO

{-|
Consumer, which does not touch the input and produces unit.
-}
unit :: Consume a ()
unit = Consume $ \ _ -> return ()

{-|
Collect all the elements into a list, preserving their order.
The list is accumulated as a function, which prepends the elements.
-}
{-# INLINABLE list #-}
list :: Consume input [input]
list =
  Consume $ \ (Fetch fetchIO) ->
    let
      build !acc = do
        fetch <- fetchIO
        case fetch of
          Nothing -> pure $ acc []
          Just !element -> build $ acc . (:) element
      in build id

{-|
Sum up all the elements, starting from zero.
-}
{-# INLINE sum #-}
sum :: Num num => Consume num num
sum =
  Consume $ \ (Fetch fetchIO) ->
    let
      build !acc = do
        fetch <- fetchIO
        case fetch of
          Nothing -> pure acc
          Just !element -> build $ element + acc
      in build 0

{-|
Preprocess the input of a consumer with a transform.
The resource of the transform is acquired for the time of consumption.
-}
{-# INLINABLE transform #-}
transform :: Transform input1 input2 -> Consume input2 output -> Consume input1 output
transform (Transform transformAcquire) (Consume consumeIO) =
  Consume $ \ fetch -> B.acquire (transformAcquire fetch) consumeIO

{-|
Fetch just one element, if there is any.
-}
{-# INLINABLE head #-}
head :: Consume input (Maybe input)
head = Consume (\ (A.Fetch fetchIO) -> fetchIO)

{-|
Consume everything and produce the last element, if there is any.
-}
{-# INLINABLE last #-}
last :: Consume input (Maybe input)
last = fold D.last

{-|
A faster alternative to 'list',
which, however, constructs the list in the reverse order.
-}
{-# INLINABLE reverseList #-}
reverseList :: Consume input [input]
reverseList =
  Consume $ \ (A.Fetch fetchIO) -> build fetchIO []
  where
    build fetchIO !acc =
      fetchIO >>= \case
        Nothing -> pure acc
        Just element -> build fetchIO (element : acc)

{-|
Collect all the elements into a vector using the @vectorM@ fold of "Control.Foldl".
-}
{-# INLINABLE vector #-}
vector :: Consume input (Vector input)
vector = foldInIO D.vectorM

{-|
Count the elements.
-}
{-# INLINABLE count #-}
count :: Consume input Int
count =
  Consume $ \ (A.Fetch fetchIO) ->
    let
      iterate !count = do
        fetchResult <- fetchIO
        case fetchResult of
          Just _ -> iterate (succ count)
          Nothing -> return count
      in iterate 0

{-|
Concatenate all the elements with '<>', starting from 'mempty'
and appending each new element on the right.
-}
{-# INLINABLE concat #-}
concat :: (Semigroup monoid, Monoid monoid) => Consume monoid monoid
concat =
  Consume $ \ (A.Fetch fetchIO) -> build fetchIO mempty
  where
    build fetchIO !acc =
      fetchIO >>= \case
        Nothing -> pure acc
        Just element -> build fetchIO (acc <> element)

{-|
Delegate to @Potoki.Core.IO.Fetch.fetchAndHandleAll@,
passing it the fetch followed by the two provided handlers:
the first one is named @stop@ and is presumably executed once the stream ends,
the second one is applied to the elements.
-}
{-# INLINABLE processInIO #-}
processInIO :: IO () -> (element -> IO ()) -> Consume element ()
processInIO stop process =
  Consume (\ fetch -> L.fetchAndHandleAll fetch stop process)

{-|
Output the chunks of bytes to stdout.
A newline character is passed as the stop handler (see 'processInIO').
-}
{-# INLINABLE printBytes #-}
printBytes :: Consume ByteString ()
printBytes = processInIO (putChar '\n') C.putStr

{-|
Output the chunks of text to stdout.
A newline character is passed as the stop handler (see 'processInIO').
-}
{-# INLINABLE printText #-}
printText :: Consume Text ()
printText = processInIO (putChar '\n') K.putStr

{-|
Output the strings to stdout.
A newline character is passed as the stop handler (see 'processInIO').
-}
{-# INLINABLE printString #-}
printString :: Consume String ()
printString = processInIO (putChar '\n') putStr

{-|
Write the chunks of bytes to stdout as they are, adding nothing.
-}
{-# INLINABLE writeBytesToStdout #-}
writeBytesToStdout :: Consume ByteString ()
writeBytesToStdout = processInIO (return ()) (C.hPut stdout)

{-|
Overwrite a file, using block buffering of the default size.

* IO exceptions are intercepted and produced as the 'Left' result
* Automatic resource management
-}
{-# INLINABLE writeBytesToFile #-}
writeBytesToFile :: FilePath -> Consume ByteString (Either IOException ())
writeBytesToFile = writeBytesToFileWithBuffering (BlockBuffering Nothing)

{-|
Overwrite a file, using the specified buffering mode of the handle.

* IO exceptions are intercepted and produced as the 'Left' result
* Automatic resource management
-}
{-# INLINABLE writeBytesToFileWithBuffering #-}
writeBytesToFileWithBuffering :: BufferMode -> FilePath -> Consume ByteString (Either IOException ())
writeBytesToFileWithBuffering bufferMode path =
  Consume $ \ fetch ->
    try $ withFile path WriteMode $ \ handle -> do
      hSetBuffering handle bufferMode
      L.fetchAndHandleAll fetch (return ()) (C.hPut handle)

{-|
Overwrite a file with the buffering of the handle turned off.

Intended to be more efficient than just writing to a file without buffering:
the input chunks are passed through @bufferizeFlushing 64@
and every batch it produces is concatenated and written at once.
-}
{-# INLINABLE writeBytesToFileWithoutBuffering #-}
writeBytesToFileWithoutBuffering :: FilePath -> Consume ByteString (Either IOException ())
writeBytesToFileWithoutBuffering =
  transform (arr mconcat . B.bufferizeFlushing 64) .
  writeBytesToFileWithBuffering NoBuffering

{-|
Append to a file.

* IO exceptions are intercepted and produced as the 'Left' result
* Automatic resource management
-}
{-# INLINABLE appendBytesToFile #-}
appendBytesToFile :: FilePath -> Consume ByteString (Either IOException ())
appendBytesToFile path =
  Consume $ \ fetch ->
    try $ withFile path AppendMode $ \ handleVal ->
      L.fetchAndHandleAll fetch (return ()) (C.hPut handleVal)

{-|
Remove the file at each of the fetched paths.
IO exceptions are intercepted and produced as the 'Left' result.
-}
{-# INLINABLE deleteFiles #-}
deleteFiles :: Consume FilePath (Either IOException ())
deleteFiles =
  Consume $ \ fetch ->
    try $ L.fetchAndHandleAll fetch (return ()) G.removeFile

{-|
Lift a pure fold of the \"foldl\" library.
The accumulator is forced to WHNF on each step.
-}
{-# INLINABLE fold #-}
fold :: D.Fold input output -> Consume input output
fold (D.Fold step initVal finish) =
  Consume $ \ (A.Fetch fetch) -> build fetch initVal
  where
    build fetch !acc =
      fetch >>= \case
        Nothing -> pure $ finish acc
        Just input -> build fetch (step acc input)

{-|
Lift a fold in IO of the \"foldl\" library.
The accumulator is forced to WHNF on each step.
-}
{-# INLINABLE foldInIO #-}
foldInIO :: D.FoldM IO input output -> Consume input output
foldInIO (D.FoldM step initVal finish) =
  Consume $ \ (A.Fetch fetch) -> build fetch =<< initVal
  where
    build fetch !acc =
      fetch >>= \case
        Nothing -> finish acc
        Just input -> step acc input >>= build fetch

{-|
Run a consumer, additionally feeding a pure fold by means of
@Potoki.Core.Fetch.handlingElements@
(presumably with every element that the consumer fetches),
and produce the results of both.
-}
{-# INLINABLE folding #-}
folding :: D.Fold a b -> Consume a c -> Consume a (b, c)
folding (D.Fold step initVal extract) (Consume consumeIO) =
  Consume $ \ fetch -> do
    foldStateRef <- newIORef initVal
    consumptionResult <-
      consumeIO
        (A.handlingElements
          (\ element -> do
            !newState <- flip step element <$> readIORef foldStateRef
            writeIORef foldStateRef newState)
          fetch)
    foldResult <- extract <$> readIORef foldStateRef
    return (foldResult, consumptionResult)

{-|
Same as 'folding', but for a fold in IO.
-}
{-# INLINABLE foldingInIO #-}
foldingInIO :: D.FoldM IO a b -> Consume a c -> Consume a (b, c)
foldingInIO (D.FoldM step initVal extract) (Consume consumeIO) =
  Consume $ \ fetch -> do
    foldStateRef <- newIORef =<< initVal
    consumptionResult <-
      consumeIO
        (A.handlingElements
          (\ element -> do
            !newState <- flip step element =<< readIORef foldStateRef
            writeIORef foldStateRef newState)
          fetch)
    foldResult <- extract =<< readIORef foldStateRef
    return (foldResult, consumptionResult)

{-|
Thread a state through the stream,
updating it with the strict State action built from each element,
and produce the final state.
The results of the actions are discarded.
-}
{-# INLINE execState #-}
execState :: (a -> O.State s b) -> s -> Consume a s
execState stateFn initialState =
  fold $
  D.Fold
    (\ currentState input -> snd $ O.runState (stateFn input) currentState)
    initialState
    id

{-|
Drive an incremental \"attoparsec\" parser with the chunks of input.

The continuation produced by a partial result is the one that gets fed
with the following chunk, so the parser state is carried across the chunks.
Empty chunks are skipped, because feeding an empty chunk to a continuation
is how the end of input is signalled to the parser.
When the stream ends, the end of input is signalled
without fetching from the stream again.

A failure is rendered as the message of the parser
prefixed with its contexts separated by @\" > \"@.
-}
{-# INLINABLE runParseResult #-}
runParseResult :: (Monoid input, Eq input) => (input -> I.IResult input output) -> Consume input (Either Text output)
runParseResult inputToResult =
  Consume $ \ (A.Fetch fetchInput) ->
    let
      -- Interprets a parsing result,
      -- delegating the continuation of a partial one to the provided handler.
      interpret onPartial = \case
        I.Partial continue -> onPartial continue
        I.Done _ parsed -> return (Right parsed)
        I.Fail _ contexts message -> return (Left (failureMessage contexts message))
      failureMessage contexts message =
        if null contexts
          then fromString message
          else fromString (showString (intercalate " > " contexts) (showString ": " message))
      -- Feeds the current continuation with the next non-empty chunk.
      consume continue =
        fetchInput >>= \case
          Nothing -> drain continue
          Just !input
            | input == mempty -> consume continue
            | otherwise -> interpret consume (continue input)
      -- Signals the end of input.
      -- The initial step of a parser may still ask for more after an empty chunk,
      -- hence the signal is repeated until the parser settles.
      drain continue = interpret drain (continue mempty)
      in consume inputToResult

{-|
Parse the stream of byte chunks with an \"attoparsec\" parser,
producing either the rendered failure or the parsed value.
-}
{-# INLINABLE parseBytes #-}
parseBytes :: E.Parser output -> Consume ByteString (Either Text output)
parseBytes = runParseResult . E.parse

{-|
Parse the stream of text chunks with an \"attoparsec\" parser,
producing either the rendered failure or the parsed value.
-}
{-# INLINABLE parseText #-}
parseText :: F.Parser output -> Consume Text (Either Text output)
parseText = runParseResult . F.parse

{-|
Execute a Consume concurrently and consume its results.

The first consumer is turned into a transform,
which is made concurrent with the specified amount
(presumably the number of workers; see @Potoki.Core.Transform.Concurrency.concurrently@),
and the second consumer is applied to its output.
-}
{-# INLINABLE concurrently #-}
concurrently :: NFData b => Int -> Consume a b -> Consume b c -> Consume a c
concurrently amount consume1 consume2 =
  transform (B.concurrently amount (J.consume consume1)) consume2