module Potoki.Core.Transform.ByteString
where

import Potoki.Core.Prelude hiding (filter)
import Potoki.Core.Transform.Basic
import Potoki.Core.Transform.Instances ()
import Potoki.Core.Transform.Concurrency
import Potoki.Core.Types
import qualified Potoki.Core.Fetch as A
import qualified Potoki.Core.Produce as H
import qualified Ptr.Poking as C
import qualified Ptr.ByteString as D
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as E
import qualified Data.ByteString.Lazy as F
import qualified Acquire.Acquire as M


{-# INLINE builderChunks #-}
{-|
Render each incoming builder to a lazy bytestring
and emit the strict chunks that it consists of.
-}
builderChunks :: Transform E.Builder ByteString
builderChunks =
  produce $ \ builder ->
    H.list (F.toChunks (E.toLazyByteString builder))

{-|
Regroup arbitrarily sized bytestring chunks into lines:
the input is cut at every newline byte (10) and
pieces of a line that span several input chunks are joined into one output chunk,
regardless of how long the line is.

Once the input ends, whatever follows the last newline is emitted as the final line.
That happens as soon as at least one non-empty chunk has been seen,
so the final line may be empty.
-}
extractLines :: Transform ByteString ByteString
extractLines =
  splitChunks >>> list
  where
    splitChunks =
      Transform $ \ (Fetch fetchChunk) -> liftIO $ do
        -- The not yet terminated line collected so far.
        -- 'Nothing' means that there is nothing pending.
        pendingRef <- newIORef Nothing
        let
          -- Handle one input chunk, producing the lines that it completes.
          onChunk chunk =
            case B.split 10 chunk of
              -- An empty chunk carries no data at all.
              [] -> return (Just [])
              prefix : rest -> do
                pending <- readIORef pendingRef
                -- The first piece continues whatever is pending.
                let joined = fold pending <> C.bytes prefix
                case unsnoc rest of
                  -- No newline in this chunk: the line is still open.
                  Nothing -> do
                    writeIORef pendingRef (Just joined)
                    return (Just [])
                  -- The last piece starts a new pending line,
                  -- everything before it is complete.
                  Just (!complete, !unfinished) -> do
                    writeIORef pendingRef $! Just $! C.bytes unfinished
                    let !firstLine = D.poking joined
                    return (Just (firstLine : complete))
          -- Handle the end of input by flushing the pending line if there is one.
          onEnd =
            readIORef pendingRef >>= \case
              Nothing -> return Nothing
              Just leftover -> do
                writeIORef pendingRef Nothing
                return (Just [D.poking leftover])
        return $ Fetch $ fetchChunk >>= \case
          Just chunk -> onChunk chunk
          Nothing -> onEnd

{-|
Regroup arbitrarily sized bytestring chunks into lines:
the input is cut at every newline byte (10) and
pieces of a line that span several input chunks are joined into one output chunk.

Once the input ends, whatever follows the last newline is emitted as the final line,
but only if it is not empty.
-}
extractLinesWithoutTrail :: Transform ByteString ByteString
extractLinesWithoutTrail =
  splitChunks >>> list
  where
    splitChunks =
      Transform $ \ (Fetch fetchChunk) -> liftIO $ do
        -- The not yet terminated line collected so far.
        pendingRef <- newIORef mempty
        let
          -- Handle one input chunk, producing the lines that it completes.
          onChunk chunk =
            case B.split 10 chunk of
              -- An empty chunk carries no data at all.
              [] -> return (Just [])
              prefix : rest -> do
                pending <- readIORef pendingRef
                -- The first piece continues whatever is pending.
                let joined = pending <> C.bytes prefix
                case unsnoc rest of
                  -- No newline in this chunk: the line is still open.
                  Nothing -> do
                    writeIORef pendingRef joined
                    return (Just [])
                  -- The last piece starts a new pending line,
                  -- everything before it is complete.
                  Just (!complete, unfinished) -> do
                    writeIORef pendingRef (C.bytes unfinished)
                    let !firstLine = D.poking joined
                    return (Just (firstLine : complete))
          -- Handle the end of input by flushing the pending line unless it is empty.
          onEnd = do
            leftover <- readIORef pendingRef
            if C.null leftover
              then return Nothing
              else do
                let !finalLine = D.poking leftover
                writeIORef pendingRef mempty
                return (Just [finalLine])
        return $ Fetch $ fetchChunk >>= \case
          Just chunk -> onChunk chunk
          Nothing -> onEnd

{-|
Split the input chunks at newline bytes (10) using 'concurrentlyInOrderWithBatching'
and reassemble the resulting pieces into lines using 'mergeLineChunks'.

The argument is passed to 'concurrentlyInOrderWithBatching' as is,
next to the fixed value 100 (presumably the batch size - TODO confirm).
-}
extractLinesConcurrently :: Int -> Transform ByteString ByteString
extractLinesConcurrently concurrency =
  splitInParallel >>> mergeLineChunks
  where
    splitInParallel =
      concurrentlyInOrderWithBatching 100 concurrency (arr (B.split 10))

{-|
Flatten a stream of chunk lists into a stream of chunks,
gluing the last element of every list to the first element of the next non-empty list.

All elements of a list except the last one are emitted as they are
(the first one being prefixed with whatever is pending).
The last element is held back as incomplete and gets extended with
the head of the following lists until a list with at least two elements arrives.
When the input ends, the pending incomplete chunk is emitted as the final element.

In this module it consumes the results of @B.split 10@ (see 'extractLinesConcurrently').
-}
mergeLineChunks :: Transform [ByteString] ByteString
mergeLineChunks = Transform $ \ (Fetch fetchChunkList) -> liftIO $ do
  -- Elements of the current list that are still to be processed.
  -- 'Nothing' marks the stream as finished.
  cachedChunkListRef <- newIORef (Just [])
  -- The held back last element, waiting for its continuation.
  incompleteChunkRef <- newIORef Nothing
  return $ Fetch $ let
    loop = do
      cachedChunkListIfAny <- readIORef cachedChunkListRef
      case cachedChunkListIfAny of
        Just cachedChunkList -> case cachedChunkList of
          -- At least two elements are cached, so the first one is complete.
          (!a) : b : tail -> do
            writeIORef cachedChunkListRef (Just (b : tail))
            return (Just a)
          -- The only cached element is the last one of its list:
          -- hold it back and go on to fetch its continuation.
          (!a) : _ -> do
            writeIORef cachedChunkListRef (Just [])
            writeIORef incompleteChunkRef (Just a)
            loop
          -- The cache is exhausted, so the next list is needed.
          _ -> do
            newChunkListIfAny <- fetchChunkList
            incompleteChunkIfAny <- readIORef incompleteChunkRef
            case incompleteChunkIfAny of
              Just incompleteChunk -> case newChunkListIfAny of
                Just newChunkList -> case newChunkList of
                  -- The head of the new list completes the pending chunk,
                  -- the rest gets cached.
                  a : b : tail -> do
                    completeChunk <- evaluate (incompleteChunk <> a)
                    writeIORef cachedChunkListRef (Just (b : tail))
                    writeIORef incompleteChunkRef Nothing
                    return (Just completeChunk)
                  -- A single element only extends the pending chunk,
                  -- which remains incomplete.
                  a : _ -> do
                    newIncompleteChunk <- evaluate (incompleteChunk <> a)
                    writeIORef incompleteChunkRef (Just newIncompleteChunk)
                    loop
                  -- An empty list contributes nothing, fetch the next one.
                  _ -> loop
                -- The input is over: mark the stream as finished
                -- and emit the pending chunk as the final element.
                Nothing -> do
                  writeIORef cachedChunkListRef Nothing
                  return (Just incompleteChunk)
              -- Nothing is pending: cache the fetched list as it is.
              -- A fetched 'Nothing' is stored too, which finishes the stream.
              Nothing -> do
                writeIORef cachedChunkListRef newChunkListIfAny
                loop
        -- The stream is finished.
        Nothing -> return Nothing
    in loop