module Potoki.Core.Transform.ByteString
where
import Potoki.Core.Prelude hiding (filter)
import Potoki.Core.Transform.Basic
import Potoki.Core.Transform.Instances ()
import Potoki.Core.Transform.Concurrency
import Potoki.Core.Types
import qualified Potoki.Core.Fetch as A
import qualified Potoki.Core.Produce as H
import qualified Ptr.Poking as C
import qualified Ptr.ByteString as D
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as E
import qualified Data.ByteString.Lazy as F
import qualified Acquire.Acquire as M
{-# INLINE builderChunks #-}
-- | Turn every incoming 'E.Builder' into the strict chunks of its lazy
-- bytestring rendering.
--
-- Each builder is run with 'E.toLazyByteString', the result is broken up
-- with 'F.toChunks', and the chunk list is handed to 'H.list' for
-- 'produce' to emit (presumably one output element per chunk; confirm
-- against 'produce' in "Potoki.Core.Transform.Basic").
builderChunks :: Transform E.Builder ByteString
builderChunks =
  produce $ \ builder ->
    H.list (F.toChunks (E.toLazyByteString builder))
-- | Re-chunk a stream of arbitrary byte chunks into lines, splitting on
-- byte 10 (line feed).
--
-- A line may span several upstream chunks: the unfinished tail of each
-- chunk is kept as a poking and prepended to the head of the next one.
-- Once upstream is exhausted, whatever is pending is emitted as a final
-- line. After a separator has been seen that pending part is always
-- present, so input ending in a line feed yields a trailing empty line.
--
-- The inner transform produces a list of lines per upstream chunk; it is
-- piped through 'list' (presumably flattening the lists into individual
-- elements; confirm in "Potoki.Core.Transform.Basic").
extractLines :: Transform ByteString ByteString
extractLines =
  linesPerChunk >>> list
  where
    -- One output list per upstream fetch; the list may be empty.
    linesPerChunk =
      Transform $ \ (Fetch pull) -> liftIO $ do
        -- 'Nothing' until the first non-empty chunk arrives; afterwards
        -- the poking of the line that is still being assembled.
        pendingRef <- newIORef Nothing
        return $ Fetch $ do
          pulled <- pull
          case pulled of
            Nothing -> drainPending pendingRef
            Just chunk -> absorbChunk pendingRef chunk

    -- Fold one upstream chunk into the state, returning the lines that it
    -- completed.
    absorbChunk pendingRef chunk =
      case B.split 10 chunk of
        -- An empty chunk splits into no segments at all.
        [] -> return (Just [])
        leading : rest -> do
          pending <- readIORef pendingRef
          let grown = fold pending <> C.bytes leading
          case unsnoc rest of
            -- No separator in this chunk: the line is still open.
            Nothing -> do
              writeIORef pendingRef (Just grown)
              return (Just [])
            -- At least one separator: the pending line is complete, the
            -- middle segments are whole lines, and the last segment
            -- starts a new pending line.
            Just (!middle, !trailing) -> do
              writeIORef pendingRef $! Just $! C.bytes trailing
              let !completed = D.poking grown
              return (Just (completed : middle))

    -- Upstream is exhausted: emit the pending line once, then report the
    -- end of the stream.
    drainPending pendingRef =
      readIORef pendingRef >>= \case
        Nothing -> return Nothing
        Just leftover -> do
          writeIORef pendingRef Nothing
          return (Just [D.poking leftover])
-- | Re-chunk a stream of arbitrary byte chunks into lines, splitting on
-- byte 10 (line feed), without emitting an empty trailing line.
--
-- The unfinished tail of each chunk is carried over as a poking and
-- prepended to the head of the next chunk. At end of input the carried
-- part is emitted only when 'C.null' reports it as non-empty, so input
-- that ends in a line feed does not produce a final empty line. Empty
-- lines in the middle of the input are still emitted.
--
-- The inner transform produces a list of lines per upstream chunk; it is
-- piped through 'list' (presumably flattening the lists into individual
-- elements; confirm in "Potoki.Core.Transform.Basic").
extractLinesWithoutTrail :: Transform ByteString ByteString
extractLinesWithoutTrail =
  linesPerChunk >>> list
  where
    -- One output list per upstream fetch; the list may be empty.
    linesPerChunk =
      Transform $ \ (Fetch pull) -> liftIO $ do
        -- Poking of the line that is still being assembled.
        carryRef <- newIORef mempty
        return $ Fetch $ pull >>= \case
          Nothing -> drainCarry carryRef
          Just chunk -> absorbChunk carryRef chunk

    -- Fold one upstream chunk into the carry, returning the lines that it
    -- completed.
    absorbChunk carryRef chunk =
      case B.split 10 chunk of
        -- An empty chunk splits into no segments at all.
        [] -> return (Just [])
        leading : rest -> do
          carry <- readIORef carryRef
          let extended = carry <> C.bytes leading
          case unsnoc rest of
            -- No separator in this chunk: keep accumulating.
            Nothing -> do
              writeIORef carryRef extended
              return (Just [])
            -- The carried line is complete, the middle segments are whole
            -- lines, and the last segment becomes the new carry.
            Just (!middle, trailing) -> do
              writeIORef carryRef (C.bytes trailing)
              let !finished = D.poking extended
              return (Just (finished : middle))

    -- Upstream is exhausted: emit a non-empty carry once, otherwise
    -- report the end of the stream.
    drainCarry carryRef = do
      carry <- readIORef carryRef
      if C.null carry
        then return Nothing
        else do
          let !remainder = D.poking carry
          writeIORef carryRef mempty
          return (Just [remainder])
-- | Line extraction in which the per-chunk split on byte 10 (line feed)
-- is delegated to 'concurrentlyInOrderWithBatching', and the resulting
-- segment lists are stitched back into lines by 'mergeLineChunks'.
--
-- The @concurrency@ argument and the constant @100@ are passed straight
-- through to 'concurrentlyInOrderWithBatching' (presumably the worker
-- count and the batch size respectively; confirm against
-- "Potoki.Core.Transform.Concurrency").
extractLinesConcurrently :: Int -> Transform ByteString ByteString
extractLinesConcurrently concurrency =
  splitStage >>> mergeLineChunks
  where
    -- Splits every chunk into its line-feed-separated segments.
    splitStage =
      concurrentlyInOrderWithBatching 100 concurrency (arr splitOnLineFeed)
    splitOnLineFeed chunk =
      B.split 10 chunk
-- | Stitch lists of line segments back into whole lines.
--
-- Each input list is treated as the segments of one upstream chunk, as
-- produced by splitting on the line separator: every segment except the
-- last is a finished line, while the last one may continue in the next
-- list. The last segment of each list is therefore held back and
-- prepended to the first segment of the following non-empty list.
--
-- When upstream ends, the held-back segment (if any) is emitted as the
-- final line, after which the fetch keeps reporting the end of the
-- stream.
mergeLineChunks :: Transform [ByteString] ByteString
mergeLineChunks =
  Transform $ \ (Fetch pullSegments) -> liftIO $ do
    -- Segments of the current list that are still to be served;
    -- 'Nothing' once upstream has been exhausted.
    queueRef <- newIORef (Just [])
    -- The held-back last segment of the most recently drained list.
    partialRef <- newIORef Nothing
    let
      -- Serve from the queue while it holds more than one segment.
      step =
        readIORef queueRef >>= \case
          Nothing ->
            return Nothing
          Just ((!ready) : next : later) -> do
            writeIORef queueRef (Just (next : later))
            return (Just ready)
          -- A single segment left: it may be continued by the next list,
          -- so hold it back instead of emitting it.
          Just ((!dangling) : _) -> do
            writeIORef queueRef (Just [])
            writeIORef partialRef (Just dangling)
            step
          Just _ ->
            refill
      -- The queue is empty: pull the next list and merge it with the
      -- held-back segment, if there is one.
      refill = do
        pulled <- pullSegments
        readIORef partialRef >>= \case
          Nothing -> do
            -- Nothing is held back, so the pulled list (or the end-of-input
            -- marker) simply becomes the new queue.
            writeIORef queueRef pulled
            step
          Just partial ->
            case pulled of
              -- End of input: flush the held-back segment and mark the
              -- stream as finished.
              Nothing -> do
                writeIORef queueRef Nothing
                return (Just partial)
              -- The first segment finishes the held-back line; the rest
              -- is queued for the following fetches.
              Just (piece : next : later) -> do
                joined <- evaluate (partial <> piece)
                writeIORef queueRef (Just (next : later))
                writeIORef partialRef Nothing
                return (Just joined)
              -- A lone segment contains no separator: the line goes on.
              Just (piece : _) -> do
                grown <- evaluate (partial <> piece)
                writeIORef partialRef (Just grown)
                step
              -- An empty list contributes nothing.
              Just _ ->
                step
    return (Fetch step)