module Data.Hadoop.Writable
( Writable(..)
, Collection
, Decoder(..)
, split
, vintSize
, bytesToVector
) where
import Control.Monad (replicateM_)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.ByteString.Internal (ByteString(..))
import Data.Int (Int8, Int16, Int32, Int64)
import Data.STRef
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Word (Word8)
import Foreign.ForeignPtr (castForeignPtr)
import Foreign.Storable (sizeOf)
import qualified Data.Vector as V
import qualified Data.Vector.Generic as G
import qualified Data.Vector.Generic.Mutable as M
import qualified Data.Vector.Unboxed as U
import qualified Data.Vector.Storable as S
class Collection a ~ c a => Writable c a where
javaType :: a -> Text
decoder :: Decoder (c a)
data Decoder a = Variable (ByteString -> U.Vector Int -> a)
| LE16 (ByteString -> a)
| LE32 (ByteString -> a)
| LE64 (ByteString -> a)
| BE16 (ByteString -> a)
| BE32 (ByteString -> a)
| BE64 (ByteString -> a)
deriving (Functor)
type family Collection a
type instance Collection () = U.Vector ()
type instance Collection Int16 = U.Vector Int16
type instance Collection Int32 = U.Vector Int32
type instance Collection Int64 = U.Vector Int64
type instance Collection Float = U.Vector Float
type instance Collection Double = U.Vector Double
type instance Collection Text = V.Vector Text
type instance Collection ByteString = V.Vector ByteString
instance Writable U.Vector () where
javaType _ = "org.apache.hadoop.io.NullWritable"
decoder = Variable $ \_ ls -> U.replicate (U.length ls) ()
instance Writable U.Vector Int16 where
javaType _ = "org.apache.hadoop.io.ShortWritable"
decoder = BE16 bytesToVector
instance Writable U.Vector Int32 where
javaType _ = "org.apache.hadoop.io.IntWritable"
decoder = BE32 bytesToVector
instance Writable U.Vector Int64 where
javaType _ = "org.apache.hadoop.io.LongWritable"
decoder = BE64 bytesToVector
instance Writable U.Vector Float where
javaType _ = "org.apache.hadoop.io.FloatWritable"
decoder = BE32 bytesToVector
instance Writable U.Vector Double where
javaType _ = "org.apache.hadoop.io.DoubleWritable"
decoder = BE64 bytesToVector
instance Writable V.Vector ByteString where
javaType _ = "org.apache.hadoop.io.BytesWritable"
decoder = Variable $ \bs ls -> split (B.drop 4) bs ls
instance Writable V.Vector Text where
javaType _ = "org.apache.hadoop.io.Text"
decoder = Variable $ \bs ls -> split go bs ls
where
go bs' | B.null bs' = T.empty
| otherwise = T.decodeUtf8 $ B.drop (vintSize (B.head bs')) bs'
bytesToVector :: forall a. (S.Storable a, U.Unbox a) => ByteString -> U.Vector a
bytesToVector (PS bs off nbytes) = U.convert
$ S.unsafeFromForeignPtr (castForeignPtr bs) off
$ nbytes `div` sizeOf (undefined :: a)
split :: G.Vector v a => (ByteString -> a) -> ByteString -> U.Vector Int -> v a
split f bs lens = G.create $ do
let numRecs = U.length lens
v <- M.unsafeNew numRecs
offRef <- newSTRef 0
idxRef <- newSTRef 0
replicateM_ numRecs $ do
idx <- readSTRef idxRef
off <- readSTRef offRef
let len = U.unsafeIndex lens idx
slice = unsafeSlice off len bs
writeSTRef idxRef (idx + 1)
writeSTRef offRef (off + len)
M.unsafeWrite v idx (f slice)
return v
unsafeSlice :: Int -> Int -> ByteString -> ByteString
unsafeSlice off len (PS ptr poff _) = PS ptr (poff + off) len
vintSize :: Word8 -> Int
vintSize = go . fromIntegral
where
go :: Int8 -> Int
go x | x >= 112 = 1
| x < 120 = fromIntegral (119 x)
| otherwise = fromIntegral (111 x)