{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-|
Module      : Frames.Streamly.InCore
Description : Support for transformations between streamly streams and Frames Array of Structures (SoA).
Copyright   : (c) Adam Conner-Sax 2020
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

This module can be used in-place of Frames.InCore.  The pipes functions are all replaced by equivalent streamly functions.
Relevant classes and type-families are re-exported for convenience.
-}
module Frames.Streamly.InCore
    (
      inCoreSoA
    , inCoreAoS
    , inCoreAoS'
    , inCoreSoA_F
    , inCoreAoS_F
    , inCoreAoS'_F
    -- * Re-exports from Frames
    , toAoS
    , VectorFor
    , VectorMFor
    , VectorMs
    , Vectors
    , RecVec(..)

    )
where

import qualified Streamly                               as Streamly
import qualified Streamly.Prelude                       as Streamly
import qualified Streamly.Data.Fold                     as Streamly.Fold
import qualified Streamly.Internal.Data.Fold            as Streamly.Fold

import qualified Control.Monad.Primitive                as Prim

import qualified Data.Vinyl                             as Vinyl

import qualified Frames                                 as Frames
import qualified Frames.InCore                          as Frames
import           Frames.InCore                           (VectorFor, VectorMFor, VectorMs, Vectors, RecVec(..), toAoS)

import Data.Proxy (Proxy(..))


-- | Fold a stream of 'Vinyl' records into SoA (Structure-of-Arrays) form.
-- Here as a 'streamly' fold, so it may be deployed along with other folds or on only part of a stream.
inCoreSoA_F :: forall m rs. (Prim.PrimMonad m, Frames.RecVec rs)
          => Streamly.Fold.Fold m (Frames.Record rs) (Int, Vinyl.Rec (((->) Int) Frames.:. Frames.ElField) rs)
inCoreSoA_F = Streamly.Fold.mkFold feed initial fin
  where feed (!i, !sz, !mvs') row
          | i == sz = Frames.growRec (Proxy::Proxy rs) mvs'
                      >>= flip feed row . (i, sz*2,)
          | otherwise = do Frames.writeRec (Proxy::Proxy rs) i mvs' row
                           return (i+1, sz, mvs')

        initial = do
          mvs <- Frames.allocRec (Proxy :: Proxy rs) Frames.initialCapacity
          return (0, Frames.initialCapacity, mvs)

        fin (n, _, mvs') =
          do vs <- Frames.freezeRec (Proxy::Proxy rs) n mvs'
             return . (n,) $ Frames.produceRec (Proxy::Proxy rs) vs
{-# INLINE inCoreSoA_F #-}

-- | Perform the 'inCoreSoA_F' fold on a stream of records.
inCoreSoA :: forall m rs. (Prim.PrimMonad m, Frames.RecVec rs)
          => Streamly.SerialT m (Frames.Record rs)
          -> m (Int, Vinyl.Rec (((->) Int) Frames.:. Frames.ElField) rs)
inCoreSoA = Streamly.fold inCoreSoA_F
{-# INLINE inCoreSoA #-}

-- | Fold a stream of 'Vinyl' records into AoS (Array-of-Structures) form.
inCoreAoS_F :: forall m rs. (Prim.PrimMonad m, Frames.RecVec rs)
          => Streamly.Fold.Fold m (Frames.Record rs) (Frames.FrameRec rs)
inCoreAoS_F = fmap (uncurry Frames.toAoS) inCoreSoA_F
{-# INLINE inCoreAoS_F #-}

-- | Perform the 'inCoreAoS_F' fold on a stream of records.
inCoreAoS :: forall m rs. (Prim.PrimMonad m, Frames.RecVec rs)
          => Streamly.SerialT m (Frames.Record rs)
          -> m (Frames.FrameRec rs)
inCoreAoS = Streamly.fold inCoreAoS_F --fmap (uncurry Frames.toAoS) . inCoreSoA
{-# INLINE inCoreAoS #-}


-- | More general AoS fold, allowing for a, possible column changing, transformation of the records while in SoA form.
inCoreAoS'_F ::  forall ss rs m. (Prim.PrimMonad m, Frames.RecVec rs)
           => (Frames.Rec ((->) Int Frames.:. Frames.ElField) rs -> Frames.Rec ((->) Int Frames.:. Frames.ElField) ss)
           -> Streamly.Fold.Fold m (Frames.Record rs) (Frames.FrameRec ss)
inCoreAoS'_F f  = fmap (uncurry Frames.toAoS . aux) inCoreSoA_F
  where aux (x,y) = (x, f y)
{-# INLINE inCoreAoS'_F #-}

-- | Perform the more general AoS fold on a stream of records.
inCoreAoS' ::  forall ss rs m. (Prim.PrimMonad m, Frames.RecVec rs)
           => (Frames.Rec ((->) Int Frames.:. Frames.ElField) rs -> Frames.Rec ((->) Int Frames.:. Frames.ElField) ss)
           -> Streamly.SerialT m (Frames.Record rs)
           -> m (Frames.FrameRec ss)
inCoreAoS' f = Streamly.fold (inCoreAoS'_F f)
{-# INLINE inCoreAoS' #-}