{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}


module Spark.Core.Internal.OpFunctions(
  simpleShowOp,
  prettyShowOp,
  extraNodeOpData,
  hashUpdateNodeOp,
  prettyShowColOp,
  hdfsPath,
  updateSourceStamp,
  prettyShowColFun
) where

import qualified Data.Text as T
import qualified Data.Aeson as A
import qualified Data.Vector as V
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Data.HashMap.Strict as HM
import Data.Text(Text)
import Data.Aeson((.=), toJSON)
import Data.Char(isSymbol)
import qualified Crypto.Hash.SHA256 as SHA

import Spark.Core.Internal.OpStructures
import Spark.Core.Internal.Utilities
import Spark.Core.Try

-- (internal)
-- The serialized type of a node operation, as written in
-- the JSON description.
simpleShowOp :: NodeOp -> T.Text
simpleShowOp (NodeLocalOp op) = soName op
simpleShowOp (NodeDistributedOp op) = soName op
simpleShowOp (NodeLocalLit _ _) = "org.spark.LocalLiteral"
simpleShowOp (NodeOpaqueAggregator op) = soName op
simpleShowOp (NodeAggregatorReduction ua) =
  _jsonShowAggTrans . uaoInitialOuter $ ua
simpleShowOp (NodeAggregatorLocalReduction ua) = _jsonShowSGO . uaoMergeBuffer $ ua
simpleShowOp (NodeStructuredTransform _) = "org.spark.Select"
simpleShowOp (NodeDistributedLit _ _) = "org.spark.DistributedLiteral"
simpleShowOp (NodeGroupedReduction _) = "org.spark.GroupedReduction"
simpleShowOp (NodeReduction _) = "org.spark.Reduction"
simpleShowOp NodeBroadcastJoin = "org.spark.BroadcastJoin"
simpleShowOp (NodePointer _) = "org.spark.PlaceholderCache"

{-| A text representation of the operation that is appealing for humans.
-}
prettyShowOp :: NodeOp -> T.Text
prettyShowOp (NodeAggregatorReduction uao) =
  case uaoInitialOuter uao of
    OpaqueAggTransform so -> soName so
    -- Try to have a pretty name for the simple reductions
    InnerAggOp (AggFunction n _) -> n
    _ -> simpleShowOp (NodeAggregatorReduction uao)
prettyShowOp x = simpleShowOp x


-- A human-readable string that represents column operations.
prettyShowColOp :: ColOp -> T.Text
prettyShowColOp (ColExtraction fpath) = T.pack (show fpath)
prettyShowColOp (ColFunction txt cols) =
  prettyShowColFun txt (V.toList (prettyShowColOp <$> cols))
prettyShowColOp (ColLit _ cell) = show' cell
prettyShowColOp (ColStruct s) =
  "struct(" <> T.intercalate "," (prettyShowColOp . tfValue <$> V.toList s) <> ")"

{-| If the node is a reading operation, returns the HdfsPath of the source
that is going to be read.
-}
hdfsPath :: NodeOp -> Maybe HdfsPath
hdfsPath (NodeDistributedOp so) =
  if soName so == "org.spark.GenericDatasource"
  then case soExtra so of
    A.Object o -> case HM.lookup "inputPath" o of
      Just (A.String x) -> Just . HdfsPath $ x
      _ -> Nothing
    _ -> Nothing
  else Nothing
hdfsPath _ = Nothing

{-| Updates the input stamp if possible.

If the node cannot be updated, it is most likely a programming error: an error
is returned.
-}
updateSourceStamp :: NodeOp -> DataInputStamp -> Try NodeOp
updateSourceStamp (NodeDistributedOp so) (DataInputStamp dis) | soName so == "org.spark.GenericDatasource" =
  case soExtra so of
    A.Object o ->
      let extra' = A.Object $ HM.insert "inputStamp" (A.toJSON dis) o
          so' = so { soExtra = extra' }
      in pure $ NodeDistributedOp so'
    x -> tryError $ "updateSourceStamp: Expected dict, got " <> show' x
updateSourceStamp x _ =
  tryError $ "updateSourceStamp: Expected NodeDistributedOp, got " <> show' x

_jsonShowAggTrans :: AggTransform -> Text
_jsonShowAggTrans (OpaqueAggTransform op) = soName op
_jsonShowAggTrans (InnerAggOp _) = "org.spark.StructuredReduction"


_jsonShowSGO :: SemiGroupOperator -> Text
_jsonShowSGO (OpaqueSemiGroupLaw so) = soName so
_jsonShowSGO (UdafSemiGroupOperator ucn) = ucn
_jsonShowSGO (ColumnSemiGroupLaw sfn) = sfn


_prettyShowAggOp :: AggOp -> T.Text
_prettyShowAggOp (AggUdaf _ ucn fp) = ucn <> "(" <> show' fp <> ")"
_prettyShowAggOp (AggFunction sfn v) = prettyShowColFun sfn r where
  r = V.toList (show' <$> v)
_prettyShowAggOp (AggStruct v) =
  "struct(" <> T.intercalate "," (_prettyShowAggOp . afValue <$> V.toList v) <> ")"

_prettyShowAggTrans :: AggTransform -> Text
_prettyShowAggTrans (OpaqueAggTransform op) = soName op
_prettyShowAggTrans (InnerAggOp ao) = _prettyShowAggOp ao

_prettyShowSGO :: SemiGroupOperator -> Text
_prettyShowSGO (OpaqueSemiGroupLaw so) = soName so
_prettyShowSGO (UdafSemiGroupOperator ucn) = ucn
_prettyShowSGO (ColumnSemiGroupLaw sfn) = sfn

-- (internal)
-- The extra data associated with the operation, and that is required
-- by the backend to successfully perform the operation.
-- We pass the type as seen by Karps (along with some extra information about
-- nullability). This information is required by spark to analyze the exact
-- type of some operations.
extraNodeOpData :: NodeOp -> A.Value
extraNodeOpData (NodeLocalLit dt cell) =
  A.object [ "type" .= toJSON dt,
             "content" .= toJSON cell]
extraNodeOpData (NodeStructuredTransform st) = toJSON st
extraNodeOpData (NodeDistributedLit dt lst) =
  -- The backend deals with all the details translating the augmented type
  -- as a SQL datatype.
  A.object [ "cellType" .= toJSON dt,
             "content" .= toJSON lst]
extraNodeOpData (NodeDistributedOp so) = soExtra so
extraNodeOpData (NodeGroupedReduction ao) = toJSON ao
extraNodeOpData (NodeAggregatorReduction ua) =
  case uaoInitialOuter ua of
    OpaqueAggTransform so -> toJSON (soExtra so)
    InnerAggOp ao -> toJSON ao
extraNodeOpData (NodeOpaqueAggregator so) = soExtra so
extraNodeOpData (NodeLocalOp so) = soExtra so
extraNodeOpData NodeBroadcastJoin = A.Null
extraNodeOpData (NodeReduction _) = A.Null -- TODO: should it send something?
extraNodeOpData (NodeAggregatorLocalReduction _) = A.Null -- TODO: should it send something?
extraNodeOpData (NodePointer p) =
    A.object [
      "computation" .= toJSON (pointerComputation p),
      "path" .= toJSON (pointerPath p)
    ]

-- Adds the content of a node op to a hash.
-- Right now, this builds the json representation and passes it
-- to the hash function, which simplifies the verification on
-- on the server side.
-- TODO: this depends on some implementation details such as the hashing
-- function used by Aeson.
hashUpdateNodeOp :: SHA.Ctx -> NodeOp -> SHA.Ctx
hashUpdateNodeOp ctx op = _hashUpdateJson ctx $ A.object [
  "op" .= simpleShowOp op,
  "extra" .= extraNodeOpData op]


prettyShowColFun :: T.Text -> [Text] -> T.Text
prettyShowColFun txt [col] | _isSym txt =
  T.concat [txt, " ", col]
prettyShowColFun txt [col1, col2] | _isSym txt =
  -- This is not perfect for complex operations, but it should get the job done
  -- for now.
  -- TODO eventually use operator priority here
  T.concat [col1, " ", txt, " ", col2]
prettyShowColFun txt cols =
  let vals = T.intercalate ", " cols in
  T.concat [txt, "(", vals, ")"]

_isSym :: T.Text -> Bool
_isSym txt = all isSymbol (T.unpack txt)

-- This schema is not great because there is some ambiguity about the final
-- nodes.
-- Someone could craft a JSON that would confuse the object detection.
-- Not sure if this is much of a security risk anyway.
instance A.ToJSON ColOp where
  toJSON (ColExtraction fp) = A.object [
    "colOp" .= T.pack "extraction",
    "field" .= toJSON fp]
  toJSON (ColFunction txt cols) = A.object [
    "colOp" .= T.pack "fun",
    "function" .= txt,
    "args" .= (toJSON <$> cols)]
  toJSON (ColLit _ cell) = A.object [
    "colOp" .= T.pack "literal",
    "lit" .= toJSON cell]
  toJSON (ColStruct v) =
    let fun (TransformField fn colOp) =
          A.object ["name" .= T.pack (show fn), "op" .= toJSON colOp]
    in A.Array $ fun <$> v

-- instance A.ToJSON AggTransform where
--   toJSON (OpaqueAggTransform so) = A.object [
--       "aggOpaqueTrans" .= toJSON so
--     ]

instance A.ToJSON UdafApplication where
  toJSON Algebraic = toJSON (T.pack "algebraic")
  toJSON Complete = toJSON (T.pack "complete")

instance A.ToJSON AggField where
  toJSON (AggField fn aggOp) =
    A.object ["name" .= show' fn, "op" .= toJSON aggOp]

instance A.ToJSON AggOp where
  toJSON (AggUdaf ua ucn fp) = A.object [
    "aggOp" .= T.pack "udaf",
    "udafApplication" .= toJSON ua,
    "className" .= ucn,
    "field" .= toJSON fp]
  toJSON (AggFunction sfn v) = A.object [
    "aggOp" .= toJSON (T.pack "function"),
    "functionName" .= toJSON sfn,
    "fields" .= toJSON (V.toList v)]
  toJSON (AggStruct v) = toJSON (V.toList v)

_hashUpdateJson :: SHA.Ctx -> A.Value -> SHA.Ctx
_hashUpdateJson ctx val = SHA.update ctx bs where
  bs = BS.concat . LBS.toChunks . encodeDeterministicPretty $ val