module Data.Repa.Flow.Generic.IO.Sieve
        (sieve_o)
where
import Data.Repa.Flow.Generic.Base
import Data.Repa.Array.Material         as A
import Data.Repa.Array.Generic          as A
import Data.Repa.Array.Auto.IO          as A
import qualified Data.HashTable.IO      as Hash
import qualified System.Mem             as System
import System.IO
import Data.Word
import Data.IORef

import qualified Data.Vector.Mutable    as M
import qualified Data.Vector            as V

#include "repa-flow.h"


-- | Create an output sieve that writes data to an indeterminate number of
--   output files. Each new element is appended to its associated file.
--
--   Elements for which the provided function yields `Nothing` are discarded.
--   Buffered data is written to the file system once accepting a chunk
--   would exceed either of the given limits, and again whenever the
--   sink is ejected.
--
---
--   We don't want to open and close a file every time we receive data.
--   To avoid this, we instead batch data in memory for each file until
--   we have enough to warrant performing the IO operation.
--
sieve_o :: Int                  -- ^ Max payload size of in-memory data.
        -> Int                  -- ^ Max number of in-memory chunks.
        -> (a -> Maybe (FilePath, Array F Word8))   
                                -- ^ Produce the desired file path and output
                                --   record for this element, or `Nothing` if
                                --   it should be discarded.
        -> IO (Sinks () IO a)

sieve_o sizeLimit chunksLimit diag
 = do
        -- Store an array of chunks for each file.
        -- We use a mutable vector of chunks, and store the number of used 
        -- slots in that vector separately.
        (ht :: Hash.CuckooHashTable FilePath (Int, M.IOVector (Array F Word8)))
         <- Hash.newSized 1024

        -- Total payload length and number of chunks currently held in memory.
        !refSize   <- newIORef 0
        !refChunks <- newIORef 0

        -- Flush the chunks for a single file to disk.
        let flush_path (path, (n, mvec))
             = do 
                  !vec   <- V.unsafeFreeze mvec

                  -- Append the used prefix of the chunk vector to the file.
                  -- 'withBinaryFile' closes the handle even if one of the
                  -- writes throws, so a failed flush does not leak it.
                  withBinaryFile path AppendMode
                   $ \h -> V.mapM_ (hPutArray h . convert A)
                         $ V.slice 0 n vec

                  -- Delete the entry from the hash table.
                  -- This allows the space for the mutable vector of chunks to be reclaimed.
                  Hash.delete ht path


        -- Flush all the chunks we have stored, and reset the counters
        -- so that they keep describing what is actually held in memory.
        let flush_all 
             = do Hash.mapM_ flush_path ht
                  writeIORef refSize   0
                  writeIORef refChunks 0

        -- Remember that we've accumulated this chunk into memory.
        -- When we end up with too much data then we flush the whole lot
        -- to the file system. The chunk being accounted for has already
        -- been stored in the table, so it is written out by that flush.
        let acc_size !len
             = do !sizeCurrent    <- readIORef refSize
                  !chunksCurrent  <- readIORef refChunks

                  if  (sizeCurrent   + len) > sizeLimit
                   || (chunksCurrent + 1)   > chunksLimit
                   then flush_all

                   else do
                        let !sizeCurrent'   = sizeCurrent + len
                        let !chunksCurrent' = chunksCurrent + 1
                        writeIORef refSize   sizeCurrent'
                        writeIORef refChunks chunksCurrent'

        -- Accept a single incoming element.
        let push_sieve _ !e

             = case diag e of

                -- The provided diag function told us to drop this
                -- element on the floor.
                Nothing 
                 -> return ()

                -- Accumulate a new chunk.
                Just (path, arr)
                 -> do  
                        -- See if we already have a buffer for this file.
                        !mElem   <- Hash.lookup ht path
                        case mElem of

                         -- We haven't seen chunks for this file before, 
                         -- so create a new vector to hold them.
                         Nothing
                          -> do !mvec    <- M.new 256
                                M.write mvec 0 arr
                                Hash.insert ht path (1, mvec)
                                acc_size (A.length arr)

                         -- We already have a chunk vector for this file.
                         Just (n, mvec)
                          -> do 
                                -- If the chunk vector has no space then expand it,
                                -- doubling its current length.
                                !mvec'  <- if n >= M.length mvec
                                                then M.grow mvec (M.length mvec)
                                                else return mvec

                                M.write mvec' n arr
                                let !n' = n + 1
                                Hash.insert ht path (n', mvec')
                                acc_size (A.length arr)

        -- Write out everything still buffered, then run a major GC.
        let eject_sieve _ 
             = do flush_all
                  System.performMajorGC

        return  $ Sinks () push_sieve eject_sieve
{-# INLINE sieve_o #-}