-- | Projections over datasets, dataframes and columns: the '//' and '/-'
-- operators, the pair accessors '_1' and '_2', and the static and dynamic
-- projection types these operators dispatch on.
--
-- NOTE(review): no LANGUAGE pragmas are visible in this view. The code below
-- uses functional dependencies, type families, multi-parameter type classes,
-- explicit @forall@ in instance heads and overloaded string literals --
-- confirm the extensions are enabled in the file header or build settings.
module Spark.Core.Internal.Projections(
  -- * Result type and dispatch class
  ProjectReturn,
  Project,
  -- * Projection operators
  (//),
  (/-),
  -- * Fixed projections on pairs
  _1,
  _2,
  -- * Static and dynamic projections
  StaticColProjection(..),
  DynamicColProjection,
  unsafeStaticProjection,
  dynamicProjection,
  ) where
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Maybe(fromMaybe)
import Formatting
import Data.Text(Text)
import Spark.Core.Try
import Spark.Core.StructuresInternal
import Spark.Core.Internal.TypesStructures
import Spark.Core.Internal.ColumnFunctions
import Spark.Core.Internal.ColumnStructures
import Spark.Core.Internal.DatasetFunctions
import Spark.Core.Internal.DatasetStructures
import Spark.Core.Internal.Utilities
-- | A projection whose source type @from@ and result type @to@ are both
-- tracked in the type parameters.
data StaticColProjection from to = StaticColProjection {
  -- | Given the SQL type of the source, returns the field path to follow
  -- together with the SQL type of the result, or an error.
  _staticProj :: SQLType from -> Try (FieldPath, SQLType to)
}
-- | A projection that carries no type-level information about its source or
-- its result; everything is checked when the wrapped function is applied.
data DynamicColProjection = DynamicColProjection {
  -- | Given the (untyped) data type of the source, returns the field path to
  -- follow together with the data type of the result, or an error.
  _dynProjTry :: DataType -> Try (FieldPath, DataType)
}
-- | Marker type for the projection onto the first component of a pair
-- (the type of '_1').
data FixedProjection1 = FixedProjection1
-- | Marker type for the projection onto the second component of a pair
-- (the type of '_2').
data FixedProjection2 = FixedProjection2
-- | Projection expressed with a functional dependency: the source type
-- @from@ and the projection type @proj@ together determine the result
-- type @to@.
--
-- This class is not in the module's export list; the exported operators go
-- through 'Project' instead.
class Projection from proj to | from proj -> to where
  -- | Applies the projection @proj@ to the source @from@.
  _performProjection :: from -> proj -> to
-- | Projection operator: applies the projection on the right to the source
-- on the left. The result type is computed by 'ProjectReturn'.
(//) :: forall from proj. Project from proj => from -> proj -> ProjectReturn from proj
source // proj = _performProject source proj
-- | Same as '//', with the projection argument fixed to 'Text'.
(/-) :: forall from. Project from Text => from -> Text -> ProjectReturn from Text
source /- fieldName = _performProject source fieldName
-- | Computes the result type of projecting @proj@ out of @from@.
--
-- This is a closed type family: equations are tried from top to bottom.
-- Dataframes and dynamic columns always produce a 'DynColumn'. A 'Dataset'
-- produces a typed 'Column' for static and fixed projections, and a
-- 'DynColumn' for dynamic and textual ones.
type family ProjectReturn from proj where
  -- Dataframe sources.
  ProjectReturn DataFrame DynamicColProjection = DynColumn
  ProjectReturn DataFrame (StaticColProjection from to) = DynColumn
  ProjectReturn DataFrame Text = DynColumn
  -- Dynamic column sources.
  ProjectReturn DynColumn DynamicColProjection = DynColumn
  ProjectReturn DynColumn Text = DynColumn
  -- Dataset sources: pair accessors keep the full static typing.
  ProjectReturn (Dataset (x1, x2)) FixedProjection1 = Column (x1, x2) x1
  ProjectReturn (Dataset (x1, x2)) FixedProjection2 = Column (x1, x2) x2
  ProjectReturn (Dataset x) DynamicColProjection = DynColumn
  -- The static projection's source type must equal the dataset's element type.
  ProjectReturn (Dataset x) (StaticColProjection x y) = Column x y
  ProjectReturn (Dataset x) Text = DynColumn
-- | Values that can be converted to 'Text'.
class MyString x where
  -- | Converts the value to 'Text'.
  convertToText :: x -> Text
-- | The instance head matches any type, and the equality constraint then
-- requires that type to be 'Text'; the conversion is the identity.
instance (a ~ Text) => MyString a where
  convertToText txt = txt
-- | Projection class behind the '//' and '/-' operators. The result type is
-- not a class parameter; it is computed by the 'ProjectReturn' type family.
class Project from proj where
  -- | Applies the projection @proj@ to the source @from@.
  _performProject :: from -> proj -> ProjectReturn from proj
-- | Dynamic projection of a dynamic column; delegates to 'projectDColDCol'.
instance Project DynColumn DynamicColProjection where
  _performProject col proj = projectDColDCol col proj
-- | Dynamic projection of a dataframe; delegates to 'projectDFDyn'.
instance Project DataFrame DynamicColProjection where
  _performProject df proj = projectDFDyn df proj
-- | Static projection of a dataframe: the projection is first converted to a
-- dynamic one, then applied with 'projectDFDyn'.
instance forall a b. Project DataFrame (StaticColProjection a b) where
  _performProject df = projectDFDyn df . colStaticProjToDynProj
-- | Static projection of a dataset into a typed column; delegates to
-- 'projectDsCol'.
instance forall a b. Project (Dataset a) (StaticColProjection a b) where
  _performProject ds proj = projectDsCol ds proj
-- | Dynamic projection of a dataset; delegates to 'projectDSDyn'.
instance forall a. Project (Dataset a) DynamicColProjection where
  _performProject ds proj = projectDSDyn ds proj
-- | Projection of a dynamic column by a textual field path.
instance Project DynColumn Text where
  _performProject dc s = _performProjection dc (stringToDynColProj fieldName)
    where
      -- The 'Projection' instance selected above is the one for
      -- 'DynamicColProjection', since that is the type of the argument.
      fieldName = T.unpack (convertToText s)
-- | Projection of a dataframe by a textual field path.
instance Project DataFrame Text where
  _performProject df s = projectDFDyn df (stringToDynColProj fieldName)
    where
      fieldName = T.unpack (convertToText s)
-- | Projection of a dataset by a textual field path; the result is dynamic.
instance Project (Dataset a) Text where
  _performProject ds s = projectDSDyn ds (stringToDynColProj fieldName)
    where
      fieldName = T.unpack (convertToText s)
-- | '_1' on a dataset of pairs: a static projection onto the first field of
-- the underlying struct.
instance forall x1 x2. Project (Dataset (x1, x2)) FixedProjection1 where
  _performProject ds _ = projectDsCol ds firstField
    where
      firstField = StaticColProjection (_projectNthField 1)
-- | '_2' on a dataset of pairs: a static projection onto the second field of
-- the underlying struct.
instance forall x1 x2. Project (Dataset (x1, x2)) FixedProjection2 where
  _performProject ds _ = projectDsCol ds secondField
    where
      secondField = StaticColProjection (_projectNthField 2)
-- | Static projection of a dataset into a typed column.
instance forall a to. Projection (Dataset a) (StaticColProjection a to) (Column a to) where
  _performProjection ds proj = projectDsCol ds proj
-- | Dynamic projection of a dataset into a dynamic column.
instance forall a. Projection (Dataset a) DynamicColProjection DynColumn where
  _performProjection ds proj = projectDSDyn ds proj
-- | Projection of a dataset by a field path given as a 'String'.
instance forall a . Projection (Dataset a) String DynColumn where
  _performProjection ds = projectDSDyn ds . stringToDynColProj
-- | Dynamic projection of a dataframe.
instance Projection DataFrame DynamicColProjection DynColumn where
  _performProjection df proj = projectDFDyn df proj
-- | Static projection of a dataframe: converted to a dynamic projection
-- first, since the dataframe itself carries no static type.
instance forall a to. Projection DataFrame (StaticColProjection a to) DynColumn where
  _performProjection df = projectDFDyn df . colStaticProjToDynProj
-- | Projection of a dataframe by a field path given as a 'String'.
instance Projection DataFrame String DynColumn where
  _performProjection df = projectDFDyn df . stringToDynColProj
-- | Static projection of a typed column; the reference type @ref@ is kept.
instance forall ref a to. Projection (Column ref a) (StaticColProjection a to) (Column ref to) where
  _performProjection col proj = projectColCol col proj
-- | Dynamic projection of a dynamic column.
instance Projection DynColumn DynamicColProjection DynColumn where
  _performProjection col proj = projectDColDCol col proj
-- | Static projection of a dynamic column: converted to a dynamic projection
-- first, then applied with 'projectDColDCol'.
instance forall a to. Projection DynColumn (StaticColProjection a to) DynColumn where
  _performProjection dc = projectDColDCol dc . colStaticProjToDynProj
-- | Projection of a dynamic column by a field path given as a 'String'.
instance Projection DynColumn String DynColumn where
  -- Not a recursive call: the argument is a 'DynamicColProjection' after the
  -- conversion, so the instance for that projection type is the one selected.
  _performProjection dc = _performProjection dc . stringToDynColProj
-- | Projection onto the second component of a pair. The 'Project' instance
-- for @Dataset (x1, x2)@ turns it into a @Column (x1, x2) x2@.
_2 :: FixedProjection2
_2 = FixedProjection2
-- | Projection onto the first component of a pair. The 'Project' instance
-- for @Dataset (x1, x2)@ turns it into a @Column (x1, x2) x1@.
_1 :: FixedProjection1
_1 = FixedProjection1
-- | Builds a static projection from the SQL type of the source and a textual
-- field path.
--
-- The path is parsed with 'fieldPath' and unwrapped with 'forceRight'; a path
-- that 'extractPathUnsafe' cannot find in @sqlt@ is reported through
-- 'failure'. Neither problem is returned as a 'Try', hence the name.
--
-- The resulting projection compares the type it is applied to against @sqlt@
-- and returns an error when they differ.
unsafeStaticProjection :: forall from to. (HasCallStack) =>
  SQLType from
  -> String
  -> StaticColProjection from to
unsafeStaticProjection sqlt field = StaticColProjection checkedProj
  where
    path = forceRight (fieldPath (T.pack field))
    target = fromMaybe
      (failure (sformat ("unsafeStaticProjection: Cannot find the field "%sh%" in type "%sh) field sqlt))
      (extractPathUnsafe sqlt path)
    checkedProj inSqlt
      | inSqlt == sqlt = pure (path, target)
      | otherwise =
          tryError $ "Expected type " <> show' sqlt <> " but received type " <> show' inSqlt
-- | Builds a dynamic projection from a textual field path.
--
-- A path that 'fieldPath' rejects yields a projection that returns an error
-- for every input type; the error mentions the path and the parser message.
dynamicProjection :: String -> DynamicColProjection
dynamicProjection txt = either invalidPath pathToDynColProj (fieldPath (T.pack txt))
  where
    invalidPath msg = DynamicColProjection $ const $
      tryError $ sformat ("dynamicProjection: invalid syntax for path "%shown%": "%shown) txt msg
-- | Builds a dynamic projection from a textual field path.
--
-- The path is parsed inside the projection function. A parse failure is
-- returned as an error carrying the parser's message unchanged.
stringToDynColProj :: String -> DynamicColProjection
stringToDynColProj s = DynamicColProjection resolve
  where
    resolve dt = either
      (tryError . T.pack)
      (\fp -> _dynProjTry (pathToDynColProj fp) dt)
      (fieldPath (T.pack s))
-- | Builds a dynamic projection from an already parsed field path.
--
-- When applied to a data type, the projection looks the path up with
-- 'extractPathUnsafe'. It returns the path together with the data type found
-- there, or an error when the path does not exist in that type.
pathToDynColProj :: FieldPath -> DynamicColProjection
pathToDynColProj fpath = DynamicColProjection lookupPath
  where
    lookupPath dt = case extractPathUnsafe (SQLType dt) fpath of
      Just (SQLType dt') -> pure (fpath, dt')
      Nothing ->
        -- The message names this function; it previously carried the prefix
        -- of 'unsafeStaticProjection', which is not where the error arises.
        tryError $ sformat ("pathToDynColProj: Cannot find the field "%shown%" in type "%shown) fpath dt
-- | Forgets the static types of a projection: the input data type is wrapped
-- in 'SQLType' before calling the static function, and the resulting SQL
-- type is unwrapped again with 'unSQLType'.
colStaticProjToDynProj :: forall from to. StaticColProjection from to -> DynamicColProjection
colStaticProjToDynProj (StaticColProjection fProj) = DynamicColProjection untyped
  where
    untyped dt =
      fProj (SQLType dt) >>= \(fp, sqlt) -> return (fp, unSQLType sqlt)
-- | Applies a dynamic projection to a dataset: the projection is resolved
-- against the dataset's node type, and the resulting path and data type are
-- handed to 'colExtraction'.
projectDSDyn :: Dataset from -> DynamicColProjection -> DynColumn
projectDSDyn ds proj =
  _dynProjTry proj (unSQLType (nodeType ds)) >>= \(p, dt) -> colExtraction ds dt p
-- | Applies a dynamic projection to a dataframe by binding the dataset out
-- of it and delegating to 'projectDSDyn'.
projectDFDyn :: DataFrame -> DynamicColProjection -> DynColumn
projectDFDyn df proj = df >>= \node -> projectDSDyn node proj
-- | Applies a static projection to a dataset and builds the typed column
-- with 'iEmptyCol'.
--
-- The projection result is unwrapped with 'forceRight', so an error from the
-- projection is not returned as a 'Try'.
projectDsCol :: (HasCallStack) => Dataset from -> StaticColProjection from to -> Column from to
projectDsCol ds proj = iEmptyCol ds target path
  where
    (path, target) = forceRight (_staticProj proj (nodeType ds))
-- | Applies a static projection to a typed column, keeping its reference.
--
-- The projection result is unwrapped with 'forceRight', so an error from the
-- projection is not returned as a 'Try'.
projectColCol :: Column ref from -> StaticColProjection from to -> Column ref to
projectColCol c (StaticColProjection fProj) = unsafeProjectCol c path dt
  where
    (path, SQLType dt) = forceRight (fProj (colType c))
-- | Applies a dynamic projection to column data. The projection is resolved
-- against the column's data type; on success the column, with its reference
-- dropped by 'dropColReference', is projected with 'unsafeProjectCol'.
projectColDynCol :: ColumnData ref a -> DynamicColProjection -> DynColumn
projectColDynCol cd proj = resolved <&> uncurry projectUntyped
  where
    resolved = _dynProjTry proj (_cType cd)
    projectUntyped = unsafeProjectCol (dropColReference cd)
-- | Applies a dynamic projection to a dynamic column by binding the column
-- data out of it and delegating to 'projectColDynCol'.
projectDColDCol :: DynColumn -> DynamicColProjection -> DynColumn
projectDColDCol c proj = c >>= \cd -> projectColDynCol cd proj
-- | Projects onto the n-th field (counting from 1) of a struct type.
--
-- Returns a single-element field path made of that field's name, together
-- with the field's type. An error is returned when the input is not a
-- strict struct type, or when no field exists at position @n@ (including
-- @n < 1@).
_projectNthField :: Int -> SQLType a -> Try (FieldPath, SQLType b)
_projectNthField n (SQLType (StrictType (Struct (StructType v)))) =
  let extractNth :: Int -> [StructField] -> Try (FieldPath, SQLType b)
      extractNth 1 (f1 : _) =
        pure (FieldPath . V.singleton . structFieldName $ f1, SQLType . structFieldType $ f1)
      -- Skip one field and count down. The guard and the decrement both use
      -- the running counter n', not the outer n.
      extractNth n' (_ : t) | n' > 1 = extractNth (n' - 1) t
      -- Reached when the fields run out or the index is below 1.
      extractNth n' l = tryError $ "_projectNthField: n = "<>show' n'<>" l="<>show' l
  in extractNth n (V.toList v)
-- The message names this function rather than "_1", since '_2' also ends up
-- here.
_projectNthField _ sqlt = tryError $ "_projectNthField: Expected a struct, got " <> show' sqlt