-- | Buckets wrap a file handle together with the starting position and
--   (optional) length of the section of the file they cover, along with
--   functions to open, close, seek, read and write them.
module Data.Repa.Flow.IO.Bucket
( Bucket (..)
, openBucket
, hBucket
-- * Reading
, fromFiles, fromFiles'
, fromDir
, fromSplitFile
, fromSplitFileAt
-- * Writing
, toFiles, toFiles'
, toDir
, toDirs
-- * Bucket IO
, bClose
, bIsOpen
, bAtEnd
, bSeek
, bGetArray
, bPutArray)
where
import Data.Repa.Array.Material as A
import Data.Repa.Array.Generic as A
import Data.Repa.Array.Meta.Dense as A
import Data.Repa.Array.Meta.RowWise as A
import Data.Repa.Array.Auto.IO as A
import qualified Foreign.Storable as Foreign
import qualified Foreign.Marshal.Alloc as Foreign
import Control.Monad
import Data.Word
import System.IO
import System.FilePath
import System.Directory
import Prelude as P
-- | A bucket describes a section of a file (or of some other stream)
--   that is accessed through its own handle.
data Bucket
        = Bucket
        { -- | Path of the underlying file, when there is one.
          bucketFilePath        :: Maybe FilePath

          -- | Position in the file at which the bucket starts.
        , bucketStartPos        :: Integer

          -- | Length of the bucket, when it is known.
        , bucketLength          :: Maybe Integer

          -- | Handle used to access the data of the bucket.
        , bucketHandle          :: Handle }
-- | Open a whole file as a single bucket.
--
--   The file is opened in binary mode. Its length is measured by seeking
--   to the end of the file, after which the handle is rewound to the
--   start, so the returned bucket covers the entire file.
openBucket :: FilePath -> IOMode -> IO Bucket
openBucket path mode
 = do   hFile   <- openBinaryFile path mode
        size    <- measure hFile
        return  $ Bucket
                { bucketFilePath        = Just path
                , bucketStartPos        = 0
                , bucketLength          = Just size
                , bucketHandle          = hFile }
 where
        -- Find the position of the end of the file,
        -- leaving the handle back at the start.
        measure h
         = do   hSeek h SeekFromEnd 0
                end     <- hTell h
                hSeek h AbsoluteSeek 0
                return end
-- | Wrap an existing handle as a single bucket.
--
--   The bucket has no associated file path and no known length,
--   and its starting position is recorded as zero.
hBucket :: Handle -> IO Bucket
hBucket h
 = return bucket
 where  bucket
         = Bucket
         { bucketHandle         = h
         , bucketFilePath       = Nothing
         , bucketStartPos       = 0
         , bucketLength         = Nothing }
-- | Open the listed files for reading, one bucket per file, and pass
--   the array of buckets to the given consumer.
--
--   This is 'fromFiles'' specialised to a boxed array built from a list.
fromFiles
        :: [FilePath]                   -- ^ Files to open.
        -> (Array B Bucket -> IO b)     -- ^ Consumer of the buckets.
        -> IO b
fromFiles files use
 = let  paths   = A.fromList B files
   in   fromFiles' paths use
-- | Like 'fromFiles', but take the file paths from an array.
--
--   Every file is opened in 'ReadMode' via 'openBucket'. The resulting
--   buckets are packed into an array with the same layout as the array
--   of paths, which is then passed to the consumer.
fromFiles'
        :: (Bulk l FilePath, Target l Bucket)
        => Array l FilePath             -- ^ Files to open.
        -> (Array l Bucket -> IO b)     -- ^ Consumer of the buckets.
        -> IO b
fromFiles' paths use
 = do   buckets <- mapM openForRead (A.toList paths)

        -- There is one bucket per path, so the buckets are expected
        -- to fit the layout of the original array.
        let Just arr = A.fromListInto (A.layout paths) buckets
        use arr
 where
        openForRead path = openBucket path ReadMode
-- | Open every entry of a directory for reading, one bucket per entry,
--   and pass the buckets to the given consumer.
--
--   The special entries @.@ and @..@ are skipped. The remaining names
--   are prefixed with the directory path before being opened.
fromDir :: FilePath                     -- ^ Directory to read.
        -> (Array B Bucket -> IO b)     -- ^ Consumer of the buckets.
        -> IO b
fromDir dir use
 = do   entries <- getDirectoryContents dir
        fromFiles [ dir </> name | name <- entries, isEntry name ] use
 where
        -- Reject the self and parent directory entries.
        isEntry name = not (name == "." || name == "..")
-- | Open a file containing records and split it into several buckets,
--   starting from the beginning of the file.
--
--   This is 'fromSplitFileAt' with a starting offset of zero.
fromSplitFile
        :: Int                          -- ^ Number of buckets.
        -> (Word8 -> Bool)              -- ^ Detect the end of a record.
        -> FilePath                     -- ^ File to open.
        -> (Array B Bucket -> IO b)     -- ^ Consumer of the buckets.
        -> IO b
fromSplitFile n pEnd path use
 = fromSplitFileAt n pEnd path startOfFile use
 where  startOfFile = 0
-- | Open a file containing records and split it into several buckets,
--   where the first bucket starts at the given offset.
--
--   The file is opened once per bucket. Each handle after the first is
--   pushed an even distance into the remaining part of the file, and then
--   advanced to just past the next byte accepted by the end-of-record
--   predicate, so that every bucket starts on a record boundary.
--
--   When the number of buckets is zero or negative the consumer is given
--   an empty array and the file is not opened.
fromSplitFileAt
        :: Int                          -- ^ Number of buckets.
        -> (Word8 -> Bool)              -- ^ Detect the end of a record.
        -> FilePath                     -- ^ File to open.
        -> Integer                      -- ^ Starting offset of the first bucket.
        -> (Array B Bucket -> IO b)     -- ^ Consumer of the buckets.
        -> IO b
fromSplitFileAt n pEnd path offsetStart use
 | n <= 0
 = use (A.fromList B [])

 | otherwise
 = do
        -- Open the file first, just to find its total length.
        h0       <- openBinaryFile path ReadMode
        hSeek h0 SeekFromEnd 0
        lenTotal <- hTell h0
        hClose h0

        -- Open a separate handle for each of the buckets.
        -- They all start at the beginning of the file, so the first one
        -- is moved to the requested starting offset.
        hh      <- mapM (flip openBinaryFile ReadMode) (P.replicate n path)
        case hh of
         h1_ : _ -> hSeek h1_ AbsoluteSeek offsetStart
         []      -> return ()

        -- Advance the remaining handles to the start of their part of the
        -- file, yielding the starting position of every bucket.
        let loop_advances _ _ []
             = return []

            loop_advances _ pos1 [_]
             = return [pos1]

            loop_advances remain pos1 (h1 : h2 : hs)
             = do
                -- Push the next handle an even distance into the
                -- remaining part of the file.
                let lenWanted
                        = remain `div` fromIntegral (P.length (h1 : h2 : hs))
                let posWanted = pos1 + lenWanted
                hSeek h2 AbsoluteSeek posWanted

                -- Now move it forward to the end of the current record.
                pos2    <- advance h2 pEnd
                let remain' = lenTotal - pos2

                poss    <- loop_advances remain' pos2 (h2 : hs)
                return  $ pos1 : poss

        -- NOTE(review): the initial remaining length is the total length
        -- of the file rather than (lenTotal - offsetStart); confirm that
        -- this is intended for non-zero starting offsets.
        starts  <- loop_advances lenTotal offsetStart hh

        -- Each bucket ends where the next one starts,
        -- and the last one ends at the end of the file.
        let ends = P.drop 1 starts ++ [lenTotal]
        let lens = P.zipWith (\start end -> end - start) starts ends

        let mkBucket start len h
             = Bucket
             { bucketFilePath   = Just path
             , bucketStartPos   = start
             , bucketLength     = Just len
             , bucketHandle     = h }

        use $ A.fromList B (P.zipWith3 mkBucket starts lens hh)
-- | Advance a file handle until just after a byte accepted by the given
--   predicate has been read, or until no more bytes can be read.
--   Returns the position of the handle afterwards.
--
--   The one-byte read buffer is allocated with 'Foreign.allocaBytes', so
--   it is released even if reading from the handle throws an exception.
advance :: Handle -> (Word8 -> Bool) -> IO Integer
advance h pEnd
 = Foreign.allocaBytes 1 $ \buf
 -> do  let loop_advance
             = do  -- Read a single byte; a count of zero means that
                   -- nothing more could be read from the handle.
                   c <- hGetBuf h buf 1
                   unless (c == 0)
                    $ do x <- Foreign.peek buf
                         unless (pEnd x) loop_advance
        loop_advance
        hTell h
-- | Open the listed files for writing, one bucket per file, and pass
--   the array of buckets to the given consumer.
--
--   This is 'toFiles'' specialised to a boxed array built from a list.
toFiles :: [FilePath]                   -- ^ Files to open.
        -> (Array B Bucket -> IO b)     -- ^ Consumer of the buckets.
        -> IO b
toFiles paths use
 = let  pathsArr = A.fromList B paths
   in   toFiles' pathsArr use
-- | Like 'toFiles', but take the file paths from an array.
--
--   Every file is opened in 'WriteMode' via 'openBucket'. The resulting
--   buckets are packed into an array with the same layout as the array
--   of paths, which is then passed to the consumer.
toFiles' :: (Bulk l FilePath, Target l Bucket)
        => Array l FilePath             -- ^ Files to open.
        -> (Array l Bucket -> IO b)     -- ^ Consumer of the buckets.
        -> IO b
toFiles' paths use
 = do   buckets <- mapM openForWrite (A.toList paths)

        -- There is one bucket per path, so the buckets are expected
        -- to fit the layout of the original array.
        let Just arr = A.fromListInto (A.layout paths) buckets
        use arr
 where
        openForWrite path = openBucket path WriteMode
-- | Create a new directory holding the given number of bucket files,
--   open them all for writing, and pass the buckets to the consumer.
--
--   The files are named after their index, left-padded with zeros to six
--   digits. When the number of buckets is zero or negative the consumer
--   is given an empty array and no directory is created.
toDir   :: Int                          -- ^ Number of buckets to create.
        -> FilePath                     -- ^ Path of the directory to create.
        -> (Array B Bucket -> IO b)     -- ^ Consumer of the buckets.
        -> IO b
toDir nBuckets path use
 | nBuckets <= 0
 = use (A.fromList B [])

 | otherwise
 = do
        createDirectory path

        -- Name each file after its index, zero-padded to six digits.
        let makeName i
                = let digits = show i
                  in  path </> (P.replicate (6 - P.length digits) '0' ++ digits)

        let names = [ makeName i | i <- [0 .. nBuckets - 1] ]

        -- The length of a freshly created output file is left unknown.
        let newBucket file
             = do  h <- openBinaryFile file WriteMode
                   return $ Bucket
                        { bucketFilePath        = Just file
                        , bucketStartPos        = 0
                        , bucketLength          = Nothing
                        , bucketHandle          = h }

        bs <- mapM newBucket names
        use (A.fromList B bs)
-- | Create several new directories, each holding the given number of
--   bucket files, open all the files for writing, and pass the buckets
--   to the consumer.
--
--   The buckets are collected directory by directory into a matrix built
--   with @A.matrix B (number of directories) (buckets per directory)@.
--   Files are named after their index within the directory, left-padded
--   with zeros to six digits.
--
--   When the number of buckets per directory is zero or negative the
--   consumer is given an empty matrix and no directories are created.
toDirs  :: Int                                  -- ^ Number of buckets per directory.
        -> [FilePath]                           -- ^ Paths of the directories to create.
        -> (Array (E B DIM2) Bucket -> IO b)    -- ^ Consumer of the buckets.
        -> IO b
toDirs nBucketsPerDir paths use
 | nBucketsPerDir <= 0
 = do   let Just bsArr
                = A.fromListInto (A.matrix B 0 0) []
        use bsArr

 | otherwise
 = do
        -- Name each file after its index, zero-padded to six digits.
        let makeName path i
                = let digits = show i
                  in  path </> (P.replicate (6 - P.length digits) '0' ++ digits)

        -- The length of a freshly created output file is left unknown.
        let newBucket file
             = do  h <- openBinaryFile file WriteMode
                   return $ Bucket
                        { bucketFilePath        = Just file
                        , bucketStartPos        = 0
                        , bucketLength          = Nothing
                        , bucketHandle          = h }

        -- Create one directory and open all of its bucket files.
        let newDir path
             = do  createDirectory path
                   mapM newBucket
                        [ makeName path i | i <- [0 .. nBucketsPerDir - 1] ]

        bs <- liftM P.concat $ P.mapM newDir paths

        -- Every directory contributes the same number of buckets,
        -- so the flat list is expected to fill the matrix exactly.
        let Just bsArr
                = A.fromListInto
                        (A.matrix B (P.length paths) nBucketsPerDir)
                        bs
        use bsArr
-- | Close a bucket by closing its underlying handle.
bClose :: Bucket -> IO ()
bClose = hClose . bucketHandle
-- | Check whether the underlying handle of a bucket is still open.
bIsOpen :: Bucket -> IO Bool
bIsOpen = hIsOpen . bucketHandle
-- | Check whether the handle is at the end of the bucket.
--
--   This is the case when the handle is at the end of the underlying
--   file, or when the bucket has a known length and the position
--   relative to the start of the bucket has reached that length.
--
--   Calls 'error' if the handle is positioned before the start of
--   the bucket.
bAtEnd :: Bucket -> IO Bool
bAtEnd bucket
 = do   let h   = bucketHandle bucket
        eof     <- hIsEOF h
        posFile <- hTell  h

        -- The handle should never be before the start of its own bucket.
        when (posFile < bucketStartPos bucket)
         $ error $ P.unlines
                [ "repa-flow.bAtEnd: handle position is outside bucket."
                , " bucket file path = " ++ show (bucketFilePath bucket)
                , " bucket start pos = " ++ show (bucketStartPos bucket)
                , " pos in file = " ++ show posFile ]

        -- Position of the handle relative to the start of the bucket.
        let posBucket = posFile - bucketStartPos bucket

        return $ eof || (case bucketLength bucket of
                                Nothing  -> False
                                Just len -> posBucket >= len)
-- | Seek to a position within a bucket.
--
--   * 'AbsoluteSeek' offsets are relative to the start of the bucket,
--     and negative offsets are treated as zero.
--
--   * 'RelativeSeek' offsets are relative to the current position of
--     the handle.
--
--   * 'SeekFromEnd' offsets count backwards from the end of the bucket,
--     and negative offsets are treated as zero. If the length of the
--     bucket is not known then the handle is moved to the end of the
--     underlying file instead.
--
--   The resulting position is clamped so that it is never before the
--   start of the bucket, and never after its end when the length of the
--   bucket is known.
bSeek :: Bucket -> SeekMode -> Integer -> IO ()
bSeek bucket mode offset
 = do   posFile <- hTell h

        -- Position in the file that the caller is asking for,
        -- or Nothing when it cannot be computed.
        let posWanted
             = case mode of
                AbsoluteSeek
                 -> Just $ start + max 0 offset

                RelativeSeek
                 -> Just $ posFile + offset

                SeekFromEnd
                 -> case bucketLength bucket of
                        Nothing  -> Nothing
                        Just len -> Just $ start + len - max 0 offset

        case fmap clamp posWanted of
         Nothing  -> hSeek h SeekFromEnd 0
         Just pos -> hSeek h AbsoluteSeek pos
 where
        h       = bucketHandle   bucket
        start   = bucketStartPos bucket

        -- Keep a file position within the extent of the bucket.
        clamp wanted
         | wanted < start
         = start

         | Just len <- bucketLength bucket
         , wanted > start + len
         = start + len

         | otherwise
         = wanted
-- | Read some bytes from a bucket, starting at the current position
--   of its handle.
--
--   When the length of the bucket is known, no more bytes are requested
--   than remain between the current position and the end of the bucket.
--   If the handle is already past the end of the bucket then zero bytes
--   are requested.
bGetArray :: Bucket -> Integer -> IO (Array F Word8)
bGetArray bucket lenWanted
 = do   let h   = bucketHandle bucket
        posFile <- hTell h

        -- Position of the handle relative to the start of the bucket.
        let posBucket = posFile - bucketStartPos bucket

        let len = case bucketLength bucket of
                        Nothing
                         -> lenWanted

                        Just lenMax
                         -> -- Clamp at zero so that a handle sitting past
                            -- the end never yields a negative length.
                            let lenRemain = max 0 (lenMax - posBucket)
                            in  min lenWanted lenRemain

        liftM (convert F)
         $ hGetArray h
         $ fromIntegral len
-- | Write an array of bytes to a bucket through its handle.
bPutArray :: Bucket -> Array F Word8 -> IO ()
bPutArray bucket arr
 = let  h       = bucketHandle bucket
        payload = convert A arr
   in   hPutArray h payload