module Potoki.Core.Transform.Basic where

import Potoki.Core.Prelude hiding (take, takeWhile, filter, drop)
import Potoki.Core.Types
import qualified Potoki.Core.Fetch as A
import qualified Data.HashSet as C
import qualified Data.Vector as P
import qualified Acquire.Acquire as M
import qualified Data.Vector.Generic.Mutable as MutableGenericVector
import qualified Data.Vector.Generic as GenericVector
import qualified Text.Builder as TextBuilder
import qualified Potoki.Core.TextBuilder as TextBuilder

{-# INLINE mapFilter #-}
-- | Lift 'A.mapFilter' into a 'Transform': the upstream fetch is wrapped
-- with the given partial mapping. No resources are acquired.
mapFilter :: (input -> Maybe output) -> Transform input output
mapFilter mapping =
  Transform $ \ upstream -> pure (A.mapFilter mapping upstream)

{-# INLINE filter #-}
-- | Lift 'A.filter' into a 'Transform': the upstream fetch is wrapped
-- with the given predicate. No resources are acquired.
filter :: (input -> Bool) -> Transform input input
filter predicate =
  Transform $ \ upstream -> return (A.filter predicate upstream)

{-# INLINE just #-}
-- | Lift 'A.just' into a 'Transform' from @Maybe input@ elements to
-- @input@ elements. No resources are acquired.
just :: Transform (Maybe input) input
just =
  Transform $ \ upstream -> return (A.just upstream)

{-# INLINE takeWhile #-}
-- | Lift 'A.takeWhile' into a 'Transform': the upstream fetch is wrapped
-- with the given predicate. No resources are acquired.
takeWhile :: (input -> Bool) -> Transform input input
takeWhile predicate =
  Transform $ \ upstream -> return (A.takeWhile predicate upstream)

{-# INLINE drop #-}
-- | Discard the first @amount@ elements of the stream and pass the rest
-- through unchanged.
--
-- A non-positive @amount@ discards nothing. If the upstream ends while
-- elements are still being discarded, the end of the stream is reported
-- immediately.
drop :: Int -> Transform input input
drop amount =
  Transform $ \ (A.Fetch fetchIO) -> M.Acquire $ do
    -- Number of elements that still have to be discarded.
    countRef <- newIORef amount
    return $ (, return ()) $ A.Fetch $ let
      loop = do
        count <- readIORef countRef
        if count > 0
          then do
            writeIORef countRef $! pred count
            -- Pull one element, throw it away and re-check the counter.
            fetchIO >>= \case
              Just _ -> loop
              Nothing -> return Nothing
          else fetchIO
      in loop

{-# INLINE list #-}
-- | Flatten a stream of lists into a stream of their elements.
--
-- The remainder of the most recently fetched list is kept in a buffer and
-- drained before the upstream is consulted again. Empty lists are skipped,
-- so they do not end the stream.
list :: Transform [a] a
list =
  Transform $ \ (A.Fetch fetchListIO) -> liftIO $ do
    -- Elements of the current list that are not yet emitted.
    bufferRef <- newIORef []
    let
      -- Emit the next buffered element, or run the fallback when the
      -- buffer is exhausted.
      fetchFromBufferOr whenEmpty = do
        buffer <- readIORef bufferRef
        case buffer of
          (!x) : rest -> do
            writeIORef bufferRef rest
            return (Just x)
          _ -> whenEmpty
      -- Pull lists from the upstream until a non-empty one shows up or
      -- the upstream ends.
      fetchFromSource =
        fetchListIO >>= \case
          Just ((!x) : rest) -> do
            writeIORef bufferRef rest
            return (Just x)
          Just [] -> fetchFromSource
          Nothing -> return Nothing
    return $ A.Fetch $ fetchFromBufferOr fetchFromSource

{-# INLINABLE vector #-}
-- | Flatten a stream of vectors into a stream of their elements.
--
-- The current vector and a read position are kept in mutable references.
-- Once the position reaches the end of the vector, the next vector is
-- fetched from the upstream; empty vectors are skipped by the same loop.
vector :: GenericVector.Vector vector a => Transform (vector a) a
vector =
  Transform $ \ (Fetch fetchVectorIO) -> liftIO $ do
    indexRef <- newIORef 0
    vectorRef <- newIORef GenericVector.empty
    return $ Fetch $ let
      loop = do
        vectorVal <- readIORef vectorRef
        indexVal <- readIORef indexRef
        if indexVal < GenericVector.length vectorVal
          then do
            writeIORef indexRef $! succ indexVal
            -- The bounds check above makes the unchecked index safe.
            return (Just (GenericVector.unsafeIndex vectorVal indexVal))
          else fetchVectorIO >>= \case
            Just vectorVal' -> do
              writeIORef vectorRef vectorVal'
              writeIORef indexRef 0
              -- Re-enter the loop to emit from the freshly fetched vector.
              loop
            Nothing -> return Nothing
      in loop

{-# INLINABLE batch #-}
-- | Chunk the stream to vector batches of the given size.
--
-- It's useful in combination with 'concurrently' in cases where the lifted
-- transform's iteration is too light. Actually, there is a composed
-- variation of 'concurrently', which utilizes it: 'concurrentlyWithBatching'.
--
-- Every emitted batch holds exactly @size@ elements, except possibly the
-- last one, which holds whatever was collected when the upstream ended.
-- A @size@ below 1 yields the 'empty' fetch.
batch :: GenericVector.Vector vector a => Int -> Transform a (vector a)
batch size = if size < 1
  then Transform $ const $ liftIO $ return $ empty
  else Transform $ \ (Fetch fetch) -> liftIO $ do
    -- A single mutable buffer is reused for all batches; 'freeze' copies
    -- it, so emitted vectors are not affected by later writes.
    mvec <- MutableGenericVector.new size
    -- Number of elements currently written into the buffer.
    cursor <- newIORef 0
    -- Turns False once the upstream has reported its end.
    activeVar <- newIORef True
    return $ Fetch $ let
      loop = do
        active <- readIORef activeVar
        if active
          then do
            fetchingResult <- fetch
            case fetchingResult of
              Just !a -> do
                index <- readIORef cursor
                MutableGenericVector.unsafeWrite mvec index a
                let !nextIndex = succ index
                if nextIndex == size
                  then do
                    writeIORef cursor 0
                    !vec <- GenericVector.freeze mvec
                    return (Just vec)
                  else do
                    writeIORef cursor nextIndex
                    -- Batch not full yet: keep collecting.
                    loop
              Nothing -> do
                writeIORef activeVar False
                index <- readIORef cursor
                if index > 0
                  then do
                    -- Flush the partially filled batch.
                    !vec <- GenericVector.freeze (MutableGenericVector.unsafeSlice 0 index mvec)
                    return (Just vec)
                  else return Nothing
          else return Nothing
      in loop

{-# INLINE distinctBy #-}
-- | Pass through only those elements whose projection (as computed by the
-- given function) has not been produced by any earlier element.
--
-- All projections seen so far are kept in a hash set for the lifetime of
-- the transform.
distinctBy :: (Eq comparable, Hashable comparable) => (element -> comparable) -> Transform element element
distinctBy f =
  Transform $ \ (A.Fetch fetch) -> M.Acquire $ do
    seenRef <- newIORef mempty
    let
      next = do
        fetched <- fetch
        case fetched of
          Nothing -> return Nothing
          Just input -> do
            let key = f input
            !seen <- readIORef seenRef
            if C.member key seen
              -- Already seen: skip this element and try the next one.
              then next
              else do
                writeIORef seenRef $! C.insert key seen
                return (Just input)
    return (A.Fetch next, return ())

{-# INLINE distinct #-}
-- | Pass through only the first occurrence of every element.
-- This is 'distinctBy' with the element itself as the comparison key.
distinct :: (Eq element, Hashable element) => Transform element element
distinct = distinctBy (\ element -> element)

{-# INLINE mapInIO #-}
-- | Map every element with an effectful function, running the action as
-- the element is fetched. The end of the stream is passed through as is.
mapInIO :: (a -> IO b) -> Transform a b
mapInIO io =
  Transform $ \ (A.Fetch fetch) -> M.Acquire $ do
    let
      mapped = fetch >>= \case
        Nothing -> return Nothing
        Just a -> Just <$> io a
    return (A.Fetch mapped, return ())

{-# INLINE ioTransform #-}
-- | Build a transform from an IO action that produces one.
--
-- The action runs each time the resulting transform is applied to a fetch,
-- and the transform it returns is then applied to that fetch.
ioTransform :: IO (Transform a b) -> Transform a b
ioTransform io =
  Transform $ \ fetch ->
    liftIO io >>= \ (Transform acquire) -> acquire fetch

-- | Replace every element with its zero-based position in the stream.
--
-- The counter starts at 0 and the value emitted for an element is the
-- counter's value before it is incremented.
count :: Transform a Int
count = Transform $ \ (Fetch fetchIO) -> do
  counter <- liftIO (newIORef 0)
  return $ Fetch $ fetchIO >>= \case
    Nothing -> return Nothing
    Just _ -> fmap Just (atomicModifyIORef' counter (\ n -> (succ n, n)))

-- | Like 'mapInIO', but the handler additionally receives the zero-based
-- position of the element it is applied to.
--
-- The element is forced to weak head normal form before the counter is
-- advanced.
mapInIOWithCounter :: (Int -> a -> IO b) -> Transform a b
mapInIOWithCounter handler =
  ioTransform $ do
    counter <- newIORef 0
    let
      -- Yield the current counter value and advance the counter.
      nextIndex = atomicModifyIORef' counter (\ n -> (succ n, n))
    return $ mapInIO $ \ !a -> do
      index <- nextIndex
      handler index a

-- | Run the handler with the zero-based position of every element passing
-- through; the elements themselves are forwarded unchanged.
handleCount :: (Int -> IO ()) -> Transform a a
handleCount handler = mapInIOWithCounter notify
  where
    notify index element = do
      handler index
      return element

-- | Provides for progress monitoring by means of periodic measurement.
--
-- The handler receives the element counter supplied by 'handleCount', but
-- only for elements that arrive once the current deadline has passed.
-- The first deadline is @interval@ after the transform is set up, and each
-- firing moves the deadline forward by @interval@ from the previous
-- deadline (not from the current time).
handleCountOnInterval :: NominalDiffTime -> (Int -> IO ()) -> Transform a a
handleCountOnInterval interval handler = ioTransform $ do
  firstDeadline <- addUTCTime interval <$> getCurrentTime
  nextTimeRef <- newIORef firstDeadline
  return $ handleCount $ \ count -> do
    nextTime <- readIORef nextTimeRef
    time <- getCurrentTime
    when (time >= nextTime) $ do
      writeIORef nextTimeRef (addUTCTime interval nextTime)
      handler count

-- | Useful for debugging.
--
-- For every element passing through, renders the element counter supplied
-- by 'handleCount' with the given function and prints it with 'putStrLn'.
traceWithCounter :: (Int -> String) -> Transform a a
traceWithCounter shower = handleCount (putStrLn . shower)

{-# INLINE consume #-}
-- | Turn a 'Consume' into a transform by running it repeatedly over the
-- upstream; each fetch of the result runs the consumer once.
--
-- The consumer sees the upstream through a wrapper that records whether
-- it received at least one element and whether the upstream reported its
-- end. A run that hit the end without receiving any element produces no
-- output. A run that hit the end after receiving elements still emits its
-- output, and the following fetch reports the end of the stream.
consume :: Consume input output -> Transform input output
consume (Consume runFetch) =
  Transform $ \ (Fetch inputIO) -> do
    stoppedRef <- liftIO (newIORef False)
    let
      -- Upstream fetch instrumented with the two flags.
      observed emittedRef =
        Fetch $ inputIO >>= \case
          Nothing -> do
            writeIORef stoppedRef True
            return Nothing
          Just element -> do
            writeIORef emittedRef True
            return (Just element)
      -- Reset the end-of-input flag and report the end of the stream.
      finish = do
        writeIORef stoppedRef False
        return Nothing
      -- A single run of the consumer.
      runOnce = do
        emittedRef <- newIORef False
        output <- runFetch (observed emittedRef)
        upstreamEnded <- readIORef stoppedRef
        if upstreamEnded
          then do
            emitted <- readIORef emittedRef
            if emitted
              then return (Just output)
              else finish
          else return (Just output)
    return $ Fetch $ do
      stopped <- readIORef stoppedRef
      if stopped
        then finish
        else runOnce

{-# INLINABLE produce #-}
-- | Expand every input element into the stream of outputs of the
-- 'Produce' it maps to, concatenating those streams in input order.
--
-- At most one inner producer is active at a time. When it is exhausted,
-- its release action is run and the next input element is fetched.
--
-- NOTE(review): an inner producer that is still active when the consumer
-- stops fetching is not released by this transform; confirm whether that
-- needs to be handled via the outer acquisition.
produce :: (input -> Produce output) -> Transform input output
produce inputToProduce =
  Transform $ \ (Fetch inputFetchIO) -> do
    -- The fetch and release action of the currently active inner producer.
    stateRef <- liftIO $ newIORef Nothing
    return $ Fetch $ let
      loop = do
        state <- readIORef stateRef
        case state of
          Just (Fetch outputFetchIO, kill) -> do
            outputFetchResult <- outputFetchIO
            case outputFetchResult of
              Just x -> return (Just x)
              Nothing -> do
                -- Forget the producer before releasing it, so that it
                -- cannot be released twice.
                writeIORef stateRef Nothing
                kill
                loop
          Nothing -> do
            inputFetchResult <- inputFetchIO
            case inputFetchResult of
              Just input -> do
                case inputToProduce input of
                  Produce (Acquire produceIO) -> do
                    fetchAndKill <- produceIO
                    writeIORef stateRef (Just fetchAndKill)
                    -- Start draining the freshly acquired producer.
                    loop
              Nothing -> return Nothing
      in loop

{-# INLINE mapFetch #-}
-- | Build a transform from a pure function over fetches.
-- No resources are acquired.
mapFetch :: (Fetch a -> Fetch b) -> Transform a b
mapFetch mapping =
  Transform (\ fetch -> pure (mapping fetch))

{-# INLINE executeIO #-}
-- | Execute the IO action.
--
-- Every element of the input stream is an action; it is run when fetched
-- and its result is emitted in its place.
executeIO :: Transform (IO a) a
executeIO =
  mapFetch $ \ (Fetch fetchIO) -> Fetch (fetchIO >>= sequence)

{-# INLINE take #-}
-- | Pass through at most @amount@ elements, then report the end of the
-- stream without consulting the upstream any further.
--
-- For a non-positive @amount@ the upstream is never consulted.
take :: Int -> Transform input input
take amount
  | amount <= 0 =
    Transform $ \ _ -> return $ Fetch $ return Nothing
  | otherwise   =
    Transform $ \ (Fetch fetchIO) -> do
      -- Number of fetches that may still be forwarded to the upstream.
      countRef <- liftIO $ newIORef amount
      return $ Fetch $ do
        count <- readIORef countRef
        if count > 0
          then do
            writeIORef countRef $! pred count
            fetchIO
          else return Nothing

-- | Periodically pass a progress message to the given logging action.
--
-- Uses 'handleProgressAndCountOnInterval' with an interval of 1; the
-- message is built by 'TextBuilder.streamProgressMessage' from the two
-- counts that function supplies, and rendered to 'Text' with
-- 'TextBuilder.run'.
reportProgress :: (Text -> IO ()) -> Transform a a
reportProgress log = handleProgressAndCountOnInterval 1 $ \ progress count ->
  log $ TextBuilder.run $ TextBuilder.streamProgressMessage progress count

-- | Like 'handleCountOnInterval', but the handler receives two numbers:
-- the difference between the current count and the count at the previous
-- invocation (0 before the first one), followed by the current count.
handleProgressAndCountOnInterval :: NominalDiffTime -> (Int -> Int -> IO ()) -> Transform a a
handleProgressAndCountOnInterval interval handle = ioTransform $ do
  previousRef <- newIORef 0
  let
    onTick total = do
      previous <- readIORef previousRef
      writeIORef previousRef total
      handle (total - previous) total
  return (handleCountOnInterval interval onTick)

-- | Collapse runs of consecutive equal elements into their first element.
--
-- Only the most recently emitted element is remembered, so equal elements
-- that are separated by a different one are all passed through.
uniquify :: (Eq a) => Transform a a
uniquify =
  Transform $ \ (Fetch fetchIO) -> M.Acquire $ do
    -- The most recently emitted element, if any.
    previousRef <- newIORef Nothing
    return $ (, return ()) $ A.Fetch $ let
      loop = do
        previous <- readIORef previousRef
        fetched <- fetchIO
        case fetched of
          Nothing -> return Nothing
          Just _
            -- Same as the previously emitted element: skip it.
            | previous == fetched -> loop
            | otherwise -> do
                writeIORef previousRef fetched
                return fetched
      in loop