module Frames.InCore where
import Control.Monad.Primitive
import Control.Monad.ST (runST)
import Data.Proxy
import Data.Text (Text)
import qualified Data.Vector as VB
import qualified Data.Vector.Generic as VG
import qualified Data.Vector.Generic.Mutable as VGM
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vinyl as V
import Data.Vinyl.Functor (Identity(..))
import Frames.Col
import Frames.Frame
import Frames.Rec
import Frames.RecF
#if __GLASGOW_HASKELL__ < 800
import GHC.Prim (RealWorld)
#endif
import qualified Pipes as P
import qualified Pipes.Prelude as P
-- | The vector type used to hold a column whose elements have type
-- @t@. This is an open type family, so further element types can be
-- supported by adding more @type instance@ declarations.
type family VectorFor t :: * -> *
-- Bool, Int, Float and Double columns are stored in unboxed vectors.
type instance VectorFor Bool = VU.Vector
type instance VectorFor Int = VU.Vector
type instance VectorFor Float = VU.Vector
type instance VectorFor Double = VU.Vector
-- String and Text columns are stored in boxed vectors.
type instance VectorFor String = VB.Vector
type instance VectorFor Text = VB.Vector
-- | The mutable vector type that corresponds to @VectorFor a@, obtained
-- through the @Mutable@ type family of "Data.Vector.Generic".
type VectorMFor a = VG.Mutable (VectorFor a)
-- | Number of elements each mutable column vector is allocated with.
-- The in-core loaders in this module also use it as the starting
-- capacity they track before any growth happens.
initialCapacity :: Int
initialCapacity = 128
-- | Rewrites a list of column types so that each column @s :-> a@
-- becomes a column of the same name holding a mutable vector of @a@
-- values whose state token is @PrimState m@.
type family VectorMs m rs where
  VectorMs m '[] = '[]
  VectorMs m (s :-> a ': rs) =
    s :-> VectorMFor a (PrimState m) a ': VectorMs m rs
-- | Rewrites a list of column types so that each column @s :-> a@
-- becomes a column of the same name holding an immutable vector of
-- @a@ values, using the vector type chosen by @VectorFor a@.
type family Vectors rs where
  Vectors '[] = '[]
  Vectors (s :-> a ': rs) = s :-> VectorFor a a ': Vectors rs
-- | Operations for storing rows of type @Record rs@ column by column,
-- with one vector per field. The @proxy rs@ argument of every method
-- only selects the instance; its value is never inspected by the
-- instances in this module.
class RecVec rs where
  -- | Allocate a record of mutable vectors, one per column.
  allocRec   :: PrimMonad m
             => proxy rs -> m (Record (VectorMs m rs))
  -- | Turn a record of mutable vectors into immutable ones, keeping
  -- only the number of leading elements given by the @Int@ argument.
  -- The instance in this module uses an unsafe freeze, so the mutable
  -- vectors must not be written to afterwards.
  freezeRec  :: PrimMonad m
             => proxy rs -> Int -> Record (VectorMs m rs)
             -> m (Record (Vectors rs))
  -- | Return a record of larger mutable vectors holding the existing
  -- contents. The instance in this module grows each vector by its
  -- current length.
  growRec    :: PrimMonad m
             => proxy rs -> Record (VectorMs m rs) -> m (Record (VectorMs m rs))
  -- | Store each field of a row at the given index of the matching
  -- column vector. The instance in this module writes without bounds
  -- checking, so the index must lie within the current capacity.
  writeRec   :: PrimMonad m
             => proxy rs -> Int -> Record (VectorMs m rs) -> Record rs -> m ()
  -- | Rebuild the row found at the given index of the column vectors.
  indexRec   :: proxy rs -> Int -> Record (Vectors rs) -> Record rs
  -- | Build a record holding, for every column, a function from a row
  -- index to the value stored in that column.
  produceRec :: proxy rs -> Record (Vectors rs) -> V.Rec ((->) Int) rs
-- Base case: a record with no columns. Every method returns or
-- consumes the empty record and performs no vector operation.
--
-- On GHC versions before 8.0 each method that matches on the empty
-- record gets an additional equation whose body is an empty case
-- expression.
-- NOTE(review): presumably these exist to satisfy the pattern-match
-- checker of those compilers; confirm before removing them.
instance RecVec '[] where
  -- Nothing to allocate.
  allocRec _ = return Nil

  -- Nothing to freeze; the element count is ignored.
  freezeRec _ _ V.RNil = return V.RNil
#if __GLASGOW_HASKELL__ < 800
  freezeRec _ _ x = case x of
#endif

  -- Nothing to grow.
  growRec _ V.RNil = return V.RNil
#if __GLASGOW_HASKELL__ < 800
  growRec _ x = case x of
#endif

  -- Any index yields the empty row.
  indexRec _ _ _ = V.RNil

  -- Writing the empty row is a no-op.
  writeRec _ _ V.RNil V.RNil = return ()
#if __GLASGOW_HASKELL__ < 800
  writeRec _ _ x _ = case x of
#endif

  -- No columns, hence no indexing functions.
  produceRec _ V.RNil = V.RNil
#if __GLASGOW_HASKELL__ < 800
  produceRec _ x = case x of
#endif
  
-- Inductive case: handle the head column @s :-> a@ with the vector
-- type selected by @VectorFor a@, then recurse on the remaining
-- columns @rs@.
--
-- On GHC versions before 8.0 each method that pattern matches on the
-- record gets an additional equation whose body is an empty case
-- expression.
-- NOTE(review): presumably these exist to satisfy the pattern-match
-- checker of those compilers; confirm before removing them.
instance forall s a rs.
  (VGM.MVector (VectorMFor a) a,
   VG.Vector (VectorFor a) a,
   RecVec rs)
  => RecVec (s :-> a ': rs) where
  -- Allocate a mutable vector of initialCapacity elements for the head
  -- column, then allocate the remaining columns.
  allocRec _ = (&:) <$> VGM.new initialCapacity <*> allocRec (Proxy::Proxy rs)

  -- Keep the first n elements of the head vector and freeze them with
  -- unsafeFreeze; the mutable vector must not be modified afterwards.
  freezeRec _ n (Identity (Col x) V.:& xs) =
    (&:) <$> (VG.unsafeFreeze $ VGM.unsafeSlice 0 n x)
         <*> freezeRec (Proxy::Proxy rs) n xs
#if __GLASGOW_HASKELL__ < 800
  freezeRec _ _ x = case x of
#endif

  -- Grow the head vector by its own length, which doubles its size,
  -- then grow the remaining columns.
  growRec _ (Identity (Col x) V.:& xs) = (&:) <$> VGM.grow x (VGM.length x)
                                              <*> growRec (Proxy :: Proxy rs) xs
#if __GLASGOW_HASKELL__ < 800
  growRec _ x = case x of
#endif

  -- Write the head field at index i without bounds checking, then write
  -- the remaining fields. The bang patterns force the index and the
  -- record of vectors before any write happens.
  writeRec _ !i !(Identity (Col v) V.:& vs) (Identity (Col x) V.:& xs) =
    VGM.unsafeWrite v i x >> writeRec (Proxy::Proxy rs) i vs xs
#if __GLASGOW_HASKELL__ < 800
  writeRec _ _ _ x = case x of
#endif

  -- Read the head field at index i with the bounds-checked indexing
  -- operator, and prepend it to the row built from the other columns.
  indexRec _ !i !(Identity (Col x) V.:& xs) =
    x VG.! i &: indexRec (Proxy :: Proxy rs) i xs
#if __GLASGOW_HASKELL__ < 800
  indexRec _ _ x = case x of
#endif

  -- Prepend the indexing function of the head vector to the indexing
  -- functions of the remaining columns.
  produceRec _ (Identity (Col v) V.:& vs) = frameCons (v VG.!) $
                                            produceRec (Proxy::Proxy rs) vs
#if __GLASGOW_HASKELL__ < 800
  produceRec _ x = case x of
#endif
  
-- | Drain a producer of rows into in-memory column vectors. The result
-- pairs the number of rows read with a record that holds, for every
-- column, a function from a row index to the value stored there.
inCoreSoA :: forall m rs. (PrimMonad m, RecVec rs)
          => P.Producer (Record rs) m () -> m (Int, V.Rec ((->) Int) rs)
inCoreSoA xs = do
  fresh <- allocRec (Proxy :: Proxy rs)
  P.foldM step (return (0, initialCapacity, fresh)) finish xs
  where
    -- Fold state: next free row index, current capacity, column buffers.
    step (!next, !cap, !cols) row
      | next /= cap = do
          writeRec (Proxy :: Proxy rs) next cols row
          return (next + 1, cap, cols)
      -- The buffers are full: double them and retry the same row.
      | otherwise = do
          bigger <- growRec (Proxy :: Proxy rs) cols
          step (next, cap * 2, bigger) row
    -- Trim the buffers to the rows actually written and expose them as
    -- per-column indexing functions.
    finish (count, _, cols) = do
      frozen <- freezeRec (Proxy :: Proxy rs) count cols
      return (count, produceRec (Proxy :: Proxy rs) frozen)
-- | Drain a producer of rows into memory with 'inCoreSoA' and package
-- the resulting row count and column indexers as a frame of rows via
-- 'toAoS'.
inCoreAoS :: (PrimMonad m, RecVec rs)
          => P.Producer (Record rs) m () -> m (FrameRec rs)
inCoreAoS rows = fmap (uncurry toAoS) (inCoreSoA rows)
-- | Like 'inCoreAoS', but applies the given function to the record of
-- column indexers before the frame is assembled, so the resulting
-- frame may have a different set of columns than the input rows.
inCoreAoS' :: (PrimMonad m, RecVec rs)
           => (V.Rec ((->) Int) rs -> V.Rec ((->) Int) ss)
           -> P.Producer (Record rs) m () -> m (FrameRec ss)
inCoreAoS' f rows = fmap assemble (inCoreSoA rows)
  where
    -- Transform the column indexers, then build the frame.
    assemble soa = uncurry toAoS (transformCols soa)
    -- The row count passes through untouched.
    transformCols (n, cols) = (n, f cols)
-- | Build a frame of the given length from a record of per-column
-- indexing functions. Looking up a row applies every column function
-- to the row index and wraps each result in 'Identity'.
toAoS :: Int -> V.Rec ((->) Int) rs -> FrameRec rs
toAoS n cols = Frame n (rtraverse (fmap Identity) cols)
-- | Drain a producer of rows into in-memory column vectors and return
-- a new producer that yields the stored rows again, in order, from
-- index 0 up to the number of rows read. The returned producer may run
-- in a different monad than the one used for loading.
inCore :: forall m n rs. (PrimMonad m, RecVec rs, Monad n)
       => P.Producer (Record rs) m () -> m (P.Producer (Record rs) n ())
inCore xs = do
  fresh <- allocRec (Proxy :: Proxy rs)
  P.foldM step (return (0, initialCapacity, fresh)) finish xs
  where
    -- Fold state: next free row index, current capacity, column buffers.
    step (!next, !cap, !cols) row
      | next /= cap = do
          writeRec (Proxy :: Proxy rs) next cols row
          return (next + 1, cap, cols)
      -- The buffers are full: double them and retry the same row.
      | otherwise = do
          bigger <- growRec (Proxy :: Proxy rs) cols
          step (next, cap * 2, bigger) row
    -- Freeze the rows actually written and build the replaying producer.
    finish (count, _, cols) = do
      frozen <- freezeRec (Proxy :: Proxy rs) count cols
      let replay !ix
            | ix /= count = do
                P.yield (indexRec Proxy ix frozen)
                replay (ix + 1)
            | otherwise = pure ()
      return (replay 0)
-- | Build a frame from any foldable container of rows by streaming its
-- elements through 'inCoreAoS' inside 'runST'.
toFrame :: (P.Foldable f, RecVec rs) => f (Record rs) -> Frame (Record rs)
toFrame rows = runST (inCoreAoS (P.each rows))
-- | Build a new frame holding only the rows of the input frame that
-- satisfy the predicate. The surviving rows are streamed through
-- 'inCoreAoS' inside 'runST', so the result is backed by fresh vectors.
filterFrame :: RecVec rs => (Record rs -> Bool) -> FrameRec rs -> FrameRec rs
filterFrame keep frame =
  runST (inCoreAoS (P.each frame P.>-> P.filter keep))