-- | Functions that describe operations: short display names for node and
-- column operations, the extra JSON payload attached to a node operation,
-- and the contribution of a node operation to a SHA-256 hash context.
module Spark.Core.Internal.OpFunctions(
simpleShowOp,
extraNodeOpData,
hashUpdateNodeOp,
prettyShowColOp,
) where
import qualified Data.Text as T
import qualified Data.Aeson as A
import qualified Data.Vector as V
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import Data.Text(Text)
import Data.Aeson((.=), toJSON)
import Data.Char(isSymbol)
import qualified Crypto.Hash.SHA256 as SHA
import Spark.Core.Internal.OpStructures
import Spark.Core.Internal.Utilities
-- | A short textual name for a node operation.
--
-- Constructors that wrap a named operator report that operator's 'soName';
-- the remaining ones map to fixed @org.spark.*@ identifiers. A local
-- aggregator reduction is named after its merge-buffer operator.
simpleShowOp :: NodeOp -> T.Text
simpleShowOp nodeOp = case nodeOp of
  NodeLocalOp op -> soName op
  NodeDistributedOp op -> soName op
  NodeLocalLit _ _ -> "org.spark.LocalConstant"
  NodeOpaqueAggregator op -> soName op
  NodeAggregatorReduction uao -> reductionName (uaoInitialOuter uao)
  NodeAggregatorLocalReduction ua -> _prettyShowSGO (uaoMergeBuffer ua)
  NodeStructuredTransform _ -> "org.spark.Select"
  NodeDistributedLit _ _ -> "org.spark.Constant"
  NodeGroupedReduction _ -> "org.spark.GroupedReduction"
  NodeReduction _ -> "org.spark.Reduction"
  NodeBroadcastJoin -> "org.spark.BroadcastJoin"
  where
    -- An opaque outer transform keeps its own name; anything else is
    -- reported as a structured reduction.
    reductionName (OpaqueAggTransform so) = soName so
    reductionName _ = "org.spark.StructuredReduction"
-- | Renders a column operation as text.
--
-- Extractions and literals use the 'show' form of their payload, function
-- applications are laid out by '_prettyShowColFun', and structures are
-- written as @struct(...)@ with the rendered field values separated by
-- commas (no space).
prettyShowColOp :: ColOp -> T.Text
prettyShowColOp colOp = case colOp of
  ColExtraction fpath -> T.pack (show fpath)
  ColFunction txt cols ->
    _prettyShowColFun txt (map prettyShowColOp (V.toList cols))
  ColLit _ cell -> T.pack (show cell)
  ColStruct s ->
    let fields = map (prettyShowColOp . tfValue) (V.toList s)
    in T.concat ["struct(", T.intercalate "," fields, ")"]
-- | Renders an aggregation operation as text.
--
-- A UDAF is written as its class name applied to the field it reads, a
-- named aggregation function is laid out by '_prettyShowColFun', and a
-- structure is written as @struct(...)@ over its rendered field values.
_prettyShowAggOp :: AggOp -> T.Text
_prettyShowAggOp aggOp = case aggOp of
  AggUdaf _ ucn fp -> T.concat [ucn, "(", show' fp, ")"]
  AggFunction sfn v -> _prettyShowColFun sfn (map show' (V.toList v))
  AggStruct v ->
    let fields = map (_prettyShowAggOp . afValue) (V.toList v)
    in T.concat ["struct(", T.intercalate "," fields, ")"]
-- | Renders an aggregation transform: the operator name for an opaque
-- transform, or the rendered aggregation operation for an inner one.
_prettyShowAggTrans :: AggTransform -> Text
_prettyShowAggTrans trans = case trans of
  OpaqueAggTransform op -> soName op
  InnerAggOp ao -> _prettyShowAggOp ao
-- | Renders a semigroup operator as the name it carries: the operator name
-- for an opaque law, or the text stored in the UDAF / column-law
-- constructors.
_prettyShowSGO :: SemiGroupOperator -> Text
_prettyShowSGO sgo = case sgo of
  OpaqueSemiGroupLaw so -> soName so
  UdafSemiGroupOperator ucn -> ucn
  ColumnSemiGroupLaw sfn -> sfn
-- | The extra JSON payload that accompanies a node operation.
--
-- * Literals carry their type and content (note the differing keys:
--   @type@ for local literals, @cellType@ for distributed ones).
-- * Structured transforms and aggregations are encoded through their
--   'A.ToJSON' instances.
-- * Every constructor that wraps a named operator forwards that operator's
--   'soExtra' value. This includes 'NodeLocalOp' and 'NodeOpaqueAggregator',
--   which previously fell through to 'A.Null': their parameters were then
--   missing from the payload and from any hash computed over it, so two
--   operators with the same name but different extra data were
--   indistinguishable.
-- * All remaining constructors have no payload and yield 'A.Null'.
extraNodeOpData :: NodeOp -> A.Value
extraNodeOpData (NodeLocalLit dt cell) =
  A.object
    [ "type" .= toJSON dt
    , "content" .= toJSON cell
    ]
extraNodeOpData (NodeStructuredTransform st) = toJSON st
extraNodeOpData (NodeDistributedLit dt lst) =
  A.object
    [ "cellType" .= toJSON dt
    , "content" .= toJSON lst
    ]
extraNodeOpData (NodeDistributedOp so) = soExtra so
extraNodeOpData (NodeLocalOp so) = soExtra so
extraNodeOpData (NodeOpaqueAggregator so) = soExtra so
extraNodeOpData (NodeGroupedReduction ao) = toJSON ao
extraNodeOpData (NodeAggregatorReduction ua) =
  case uaoInitialOuter ua of
    -- 'soExtra' is already a JSON value; no further conversion is needed.
    OpaqueAggTransform so -> soExtra so
    InnerAggOp ao -> toJSON ao
extraNodeOpData _ = A.Null
-- | Feeds a node operation into a hash context.
--
-- The operation is summarised as a JSON object holding its short name
-- (@op@, from 'simpleShowOp') and its extra payload (@extra@, from
-- 'extraNodeOpData'); that object is then hashed by '_hashUpdateJson'.
hashUpdateNodeOp :: SHA.Ctx -> NodeOp -> SHA.Ctx
hashUpdateNodeOp ctx op = _hashUpdateJson ctx summary
  where
    summary =
      A.object
        [ "op" .= simpleShowOp op
        , "extra" .= extraNodeOpData op
        ]
-- | Lays out a function application as text.
--
-- When the function name is symbolic (see '_isSym'), one argument is
-- rendered in prefix form (@name ++ arg@) and two arguments in infix form
-- (@arg1 ++ name ++ arg2@), with no separating spaces. Every other case is
-- rendered as @name(arg1, arg2, ...)@.
_prettyShowColFun :: T.Text -> [Text] -> T.Text
_prettyShowColFun txt cols = case cols of
  [col] | symbolic -> txt <> col
  [col1, col2] | symbolic -> col1 <> txt <> col2
  _ -> txt <> "(" <> T.intercalate ", " cols <> ")"
  where
    symbolic = _isSym txt
-- | Whether a function name consists solely of symbol characters, in which
-- case it is rendered as an operator rather than as @name(args)@.
--
-- The empty name is explicitly not symbolic: a plain @all@ over no
-- characters is vacuously true, which would render a nameless function as
-- an operator.
--
-- NOTE: 'isSymbol' only accepts the Unicode symbol categories (e.g. @+@,
-- @<@, @=@, @>@, @|@, @^@, @~@). Characters classified as punctuation,
-- such as @-@, @*@ and @/@, are not symbols and therefore keep the
-- @name(args)@ layout.
_isSym :: T.Text -> Bool
_isSym txt = not (T.null txt) && T.all isSymbol txt
-- | JSON encoding of a column operation.
--
-- Extractions, functions and literals are objects tagged with a @colOp@
-- key (@extraction@, @fun@, @literal@). A structure is encoded as a bare
-- array with one @{name, op}@ object per field; the literal's first
-- component is not part of the encoding.
instance A.ToJSON ColOp where
  toJSON colOp = case colOp of
    ColExtraction fp ->
      A.object
        [ "colOp" .= A.String "extraction"
        , "field" .= fp
        ]
    ColFunction txt cols ->
      A.object
        [ "colOp" .= A.String "fun"
        , "function" .= txt
        , "args" .= cols
        ]
    ColLit _ cell ->
      A.object
        [ "colOp" .= A.String "literal"
        , "lit" .= cell
        ]
    ColStruct v -> A.Array (V.map fieldToJson v)
    where
      -- One entry of a structure: the shown field name and its operation.
      fieldToJson (TransformField fn op) =
        A.object
          [ "name" .= T.pack (show fn)
          , "op" .= op
          ]
-- | Encodes the UDAF application mode as a JSON string: @"algebraic"@ or
-- @"complete"@.
instance A.ToJSON UdafApplication where
  toJSON app = A.String $ case app of
    Algebraic -> "algebraic"
    Complete -> "complete"
-- | Encodes an aggregation field as an object with the field name under
-- @name@ (rendered with @show'@) and the aggregation operation under @op@.
instance A.ToJSON AggField where
  toJSON (AggField fn aggOp) =
    let fieldName = show' fn
    in A.object
         [ "name" .= fieldName
         , "op" .= aggOp
         ]
-- | JSON encoding of an aggregation operation.
--
-- UDAFs and named functions are objects tagged with an @aggOp@ key
-- (@udaf@, @function@); a structure is encoded as a bare array of its
-- fields.
instance A.ToJSON AggOp where
  toJSON aggOp = case aggOp of
    AggUdaf ua ucn fp ->
      A.object
        [ "aggOp" .= A.String "udaf"
        , "udafApplication" .= ua
        , "className" .= ucn
        , "field" .= fp
        ]
    AggFunction sfn v ->
      A.object
        [ "aggOp" .= A.String "function"
        , "functionName" .= sfn
        , "fields" .= V.toList v
        ]
    AggStruct v -> toJSON (V.toList v)
-- | Feeds a JSON value into a hash context.
--
-- The value is first serialised with @encodeDeterministicPretty@; the
-- resulting lazy bytes are collapsed into one strict chunk and passed to
-- 'SHA.update'.
_hashUpdateJson :: SHA.Ctx -> A.Value -> SHA.Ctx
_hashUpdateJson ctx val =
  let encoded = encodeDeterministicPretty val
  in SHA.update ctx (LBS.toStrict encoded)