module Data.Repa.Flow.Generic.IO.Sieve
(sieve_o)
where
import Data.Repa.Flow.Generic.Base
import Data.Repa.Array.Material as A
import Data.Repa.Array.Generic as A
import Data.Repa.Array.Auto.IO as A
import qualified Data.HashTable.IO as Hash
import qualified System.Mem as System
import System.IO
import Data.Word
import Data.IORef
import qualified Data.Vector.Mutable as M
import qualified Data.Vector as V
#include "repa-flow.h"
-- | Create an output sieve that buffers chunks in memory, grouped by
--   destination file path, and appends them to those files when flushed.
--
--   Each pushed element is passed to the diag function. A result of
--   `Nothing` discards the element, while @Just (path, arr)@ queues @arr@
--   to be appended to the file at @path@.
--
--   All buffered chunks are written out when accounting for a new chunk
--   would take the summed chunk length over the size limit or the chunk
--   count over the chunk limit, and again when the sinks are ejected.
--   Files are opened in append mode for the duration of a single flush only.
sieve_o :: Int                  -- ^ Limit on the summed length of buffered chunks.
        -> Int                  -- ^ Limit on the number of buffered chunks.
        -> (a -> Maybe (FilePath, Array F Word8))
                                -- ^ Produce the file path and the chunk to
                                --   append for this element, or `Nothing`
                                --   if the element should be discarded.
        -> IO (Sinks () IO a)
sieve_o sizeLimit chunksLimit diag
 = do
        -- For each path, the number of chunks queued so far together with
        -- a mutable vector holding them. Only the first 'n' slots of the
        -- vector are valid.
        (ht :: Hash.CuckooHashTable FilePath (Int, M.IOVector (Array F Word8)))
         <- Hash.newSized 1024

        -- Summed length of all chunks accounted for since the last flush.
        !refSize   <- newIORef 0

        -- Number of chunks accounted for since the last flush.
        !refChunks <- newIORef 0

        -- Append the queued chunks for one path to its file, then drop
        -- the table entry.
        let flush_path (path, (n, mvec))
             = do
                  -- The mutable vector is not written to again after this
                  -- point, as its entry is removed from the table below.
                  !vec <- V.unsafeFreeze mvec

                  -- 'withBinaryFile' closes the handle even if one of the
                  -- writes throws, so a failed flush does not leak it.
                  withBinaryFile path AppendMode $ \h ->
                        V.mapM_ (hPutArray h . convert A) (V.slice 0 n vec)

                  -- Remove the entry so the table no longer references
                  -- the chunks that were just written.
                  -- NOTE(review): this runs while 'Hash.mapM_' is traversing
                  -- the same table -- confirm that the hash table
                  -- implementation tolerates deletion during traversal.
                  Hash.delete ht path

        -- Flush the chunks for every path currently in the table.
        let flush_all
             = Hash.mapM_ flush_path ht

        -- Account for a newly queued chunk of the given length, flushing
        -- everything (including that chunk) if either limit is exceeded.
        let acc_size !len
             = do !sizeCurrent   <- readIORef refSize
                  !chunksCurrent <- readIORef refChunks

                  if  (sizeCurrent + len) > sizeLimit
                   || (chunksCurrent + 1) > chunksLimit
                   then do
                        flush_all
                        writeIORef refSize   0
                        writeIORef refChunks 0

                   else do
                        let !sizeCurrent'   = sizeCurrent + len
                        let !chunksCurrent' = chunksCurrent + 1
                        writeIORef refSize   sizeCurrent'
                        writeIORef refChunks chunksCurrent'

        -- Queue the chunk produced for an element; the stream index is
        -- ignored.
        let push_sieve _ !e
             = case diag e of
                Nothing
                 -> return ()

                Just (path, arr)
                 -> do  !mElem <- Hash.lookup ht path
                        case mElem of
                         -- First chunk for this path: start a new vector.
                         Nothing
                          -> do !mvec <- M.new 256
                                M.write mvec 0 arr
                                Hash.insert ht path (1, mvec)

                         -- Existing path: double the vector if it is full,
                         -- then store the chunk in the next free slot.
                         Just (n, mvec)
                          -> do !mvec' <- if n >= M.length mvec
                                           then M.grow mvec (M.length mvec)
                                           else return mvec
                                M.write mvec' n arr
                                let !n' = n + 1
                                Hash.insert ht path (n', mvec')

                        -- The chunk must be in the table before this call,
                        -- because exceeding a limit flushes immediately.
                        acc_size (A.length arr)

        -- Write out whatever is still buffered, then request a major GC
        -- (presumably so the space held by the flushed chunks is
        -- reclaimed promptly).
        let eject_sieve _
             = do flush_all
                  System.performMajorGC

        return $ Sinks () push_sieve eject_sieve