{-# LANGUAGE  ExistentialQuantification, DeriveDataTypeable
, FlexibleInstances, MultiParamTypeClasses, OverloadedStrings, CPP #-}


module Transient.MapReduce
(
Distributable(..), distribute, getText,
getUrl, getFile, textUrl, textFile,
mapKeyB, mapKeyU, reduce, eval,
PartRef)
 where

#ifdef ghcjs_HOST_OS
import Transient.Base
import Transient.Move hiding (pack)
import Transient.Logged
-- Dummy Transient.MapReduce definitions for the browser (GHCJS) build:
-- map-reduce computations run only on server-side nodes.
reduce _ _ = local stop :: Loggable a => Cloud a
mapKeyB _ _= undefined
mapKeyU _ _= undefined
distribute _ = undefined
getText _ _ = undefined
textFile _ = undefined
getUrl _ _ = undefined
textUrl _ = undefined
getFile _ _ = undefined
eval _= local stop

data DDS= DDS
class Distributable
data PartRef a=PartRef a

#else

import Transient.Internals

import Transient.Move hiding (pack)
import Transient.Logged
import Transient.Indeterminism
import Control.Applicative
import System.Random
import Control.Monad.IO.Class

import System.IO
import Control.Monad
import Data.Monoid
import Data.Maybe

import Data.Typeable
import Data.List hiding (delete, foldl')
import Control.Exception
import Control.Concurrent
--import Data.Time.Clock
import Network.HTTP
import Data.TCache hiding (onNothing)
import Data.TCache.Defs

import Data.ByteString.Lazy.Char8 (pack,unpack)
import Control.Monad.STM
import qualified Data.Map.Strict as M
import Control.Arrow (second)
import qualified Data.Vector.Unboxed as DVU
import qualified Data.Vector as DV
import Data.Hashable
import System.IO.Unsafe

import qualified Data.Foldable as F
import qualified Data.Text as Text
import Data.IORef

-- | A distributed data set: a delayed cloud computation that yields a
-- reference to the partition of data held by some node.
data DDS a= Loggable a => DDS  (Cloud (PartRef a))
data PartRef a= Ref Node Path Save deriving (Typeable, Read, Show)
data Partition a=  Part Node Path Save a deriving (Typeable,Read,Show)
-- | Whether a partition is persisted to disk or kept transient in memory.
type Save= Bool


instance Indexable (Partition a) where
    key (Part _ string b _)= keyp string b



-- Build the TCache key of a partition: "PartP@" marks persistent partitions,
-- "PartT@" transient ones, e.g. keyp "ab12c" True == "PartP@ab12c".
keyp :: String -> Bool -> String
keyp s True = "PartP@"++s
keyp s False= "PartT@"++s

instance Loggable a => IResource (Partition a) where
    keyResource= key
    readResourceByKey k= r
      where
      typePart :: IO (Maybe a) -> a
      typePart = undefined
      -- Only persistent partitions ("PartP@...") are read back from disk;
      -- transient ones ("PartT@...") exist only in memory.
      r = if k !! 4 /= 'P' then return Nothing else
            fmap (read . unpack) <$> defaultReadByKey (defPath (typePart r) ++ k)
    writeResource s@(Part _ _ save _)=
          when save $ defaultWrite (defPath s ++ key s) (pack $ show s)


-- | Trigger the execution of a DDS and return the reference to the resulting
-- partition.
eval :: DDS a -> Cloud (PartRef a)
eval (DDS mx) =  mx


type Path=String


-- NOTE: a faithful Foldable instance for unboxed vectors is impossible,
-- because every DVU fold needs an `Unbox` constraint that the Foldable class
-- methods cannot carry. Defining the methods as `foldr = foldr` would merely
-- call themselves and diverge, so they fail fast with a clear message instead.
instance F.Foldable DVU.Vector where
  foldr  _ _ _= error "Foldable DVU.Vector: foldr undefinable without Unbox"
  foldl  _ _ _= error "Foldable DVU.Vector: foldl undefinable without Unbox"
  foldr1 _ _  = error "Foldable DVU.Vector: foldr1 undefinable without Unbox"
  foldl1 _ _  = error "Foldable DVU.Vector: foldl1 undefinable without Unbox"

--foldlIt' :: V.Unbox a => (b -> a -> b) -> b -> V.Vector a -> b
--foldlIt' f z0 xs= V.foldr f' id xs z0
--      where f' x k z = k $! f z x
--
--foldlIt1 :: V.Unbox a => (a -> a -> a) -> V.Vector a -> a
--foldlIt1 f xs = fromMaybe (error "foldl1: empty structure")
--                    (V.foldl mf Nothing xs)
--      where
--        mf m y = Just (case m of
--                         Nothing -> y
--                         Just x  -> f x y)

-- | Containers whose contents can be split into chunks, shipped between nodes
-- and recombined monoidally.
class (F.Foldable c, Typeable c, Typeable a, Monoid (c a), Loggable (c a)) => Distributable c a where
   singleton :: a -> c a
   splitAt :: Int -> c a -> (c a, c a)
   fromList :: [a] -> c a

instance (Loggable a) => Distributable DV.Vector a where
   singleton = DV.singleton
   splitAt= DV.splitAt
   fromList = DV.fromList

instance (Loggable a,DVU.Unbox a) => Distributable DVU.Vector a where
   singleton= DVU.singleton
   splitAt= DVU.splitAt
   fromList= DVU.fromList
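
-- A hedged sketch: plain lists could be made distributable too, provided the
-- element type makes `Loggable [a]` hold (illustrative instance, not part of
-- this module):
--
-- > instance Loggable a => Distributable [] a where
-- >    singleton x= [x]
-- >    splitAt    = Prelude.splitAt
-- >    fromList   = id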




-- | Perform a map and partition the results by key, using boxed vectors.
-- The final result will be consumed by 'reduce'.
mapKeyB :: (Loggable a, Loggable b,  Loggable k,Ord k)
     => (a -> (k,b))
     -> DDS  (DV.Vector a)
     -> DDS (M.Map k(DV.Vector b))
mapKeyB= mapKey

-- | Perform a map and partition the results by key, using unboxed vectors.
-- The final result will be consumed by 'reduce'.
mapKeyU :: (Loggable a, DVU.Unbox a, Loggable b, DVU.Unbox b,  Loggable k,Ord k)
     => (a -> (k,b))
     -> DDS  (DVU.Vector a)
     -> DDS (M.Map k(DVU.Vector b))
mapKeyU= mapKey
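
-- A minimal word-count sketch combining these primitives (hedged: assumes a
-- running Transient cluster; `wordCount` is illustrative, not part of this
-- module's API):
--
-- > wordCount :: String -> Cloud (M.Map String Int)
-- > wordCount s= reduce (+)
-- >            $ mapKeyB (\w -> (w, 1 :: Int))
-- >            $ distribute (DV.fromList $ words s)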

-- | Perform a map and partition the results by key.
-- The final result will be consumed by 'reduce'.
mapKey :: (Distributable vector a,Distributable vector b, Loggable k,Ord k)
     => (a -> (k,b))
     -> DDS  (vector a)
     -> DDS (M.Map k (vector b))
mapKey f (DDS mx)= DDS $ loggedc $  do
        refs <-  mx
        process refs                            -- !> ("process",refs)

  where
--  process ::  Partition a -> Cloud [Partition b]
  process  (ref@(Ref node path sav))= runAt node $ local $ do
              xs <- getPartitionData ref         -- !> ("CMAP", ref,node)
              (generateRef  $ map1 f xs)



--  map1 :: (Ord k, F.Foldable vector) => (a -> (k,b)) -> vector a -> M.Map k(vector b)
  map1 f v=  F.foldl' f1  M.empty v
     where
     f1 map x=
           let (k,r) = f x
           in M.insertWith (<>) k (Transient.MapReduce.singleton r) map



-- Shuffle-phase messages sent through the node mailboxes: each mapper sends
-- Reduce chunks, then EndReduce when it is done.
data ReduceChunk a= EndReduce | Reduce a deriving (Typeable, Read, Show)

-- Global counter used to create a fresh mailbox id per reduce invocation.
boxids :: IORef Int
boxids= unsafePerformIO $ newIORef 0
{-# NOINLINE boxids #-}

reduce ::  (Hashable k,Ord k, Distributable vector a, Loggable k,Loggable a)
             => (a -> a -> a) -> DDS (M.Map k (vector a)) -> Cloud (M.Map k a)

reduce red  (dds@(DDS mx))= loggedc $ do

   mboxid <- localIO $ atomicModifyIORef boxids $ \n -> let n'= n+1 in (n',n')
   nodes <- local getNodes

   let lengthNodes = length nodes
       shuffler nodes = do

          ref@(Ref node path sav) <- mx     -- return the resulting blocks of the map

          runAt node $ foldAndSend node nodes ref

          stop

--     groupByDestiny :: (Hashable k, Distributable vector a)  => M.Map k (vector a) -> M.Map Int [(k ,vector a)]
       groupByDestiny  map =  M.foldlWithKey' f M.empty  map
              where
--              f ::  M.Map Int [(k ,vector a)] -> k -> vector a -> M.Map Int [(k ,vector a)]
              f map k vs= M.insertWith (<>) (hash1 k) [(k,vs)] map
              hash1 k= abs $ hash k `rem` length nodes


--           foldAndSend :: (Hashable k, Distributable vector a)=> (Int,[(k,vector a)]) -> Cloud ()
       foldAndSend node nodes ref=  do

             pairs <- onAll $ getPartitionData1 ref
                        <|>  return (error $ "DDS computed out of its node: "++ show ref)
             let mpairs = groupByDestiny pairs

             nchunks <- local . return $ M.size mpairs

--           let port2= nodePort node   -- only needed by the traces below

             if  nchunks == 0 then sendEnd nodes else do

                 nsent <-  onAll $ liftIO $ newMVar 0

                 (i,folded) <- local $ parallelize foldthem (M.assocs  mpairs)

                 n <- localIO  $ modifyMVar nsent $ \r -> return (r+1, r+1)

                 runAt (nodes !! i) $  local $ putMailbox' mboxid (Reduce folded)
--                                                     !> ("send",n,nchunks,port2,i,folded))

--                 return () !> (port2,n,nchunks)

                 when (n == nchunks) $ sendEnd nodes
                 empty

             where


             foldthem (i,kvs)=  async . return
                                $ (i,map (\(k,vs) -> (k,foldl1 red vs)) kvs)
             where


             foldthem (i,kvs)=  async . return
                                $ (i,map (\(k,vs) -> (k,foldl1 red vs)) kvs)


       sendEnd  nodes   =  onNodes nodes $ local
                            $  putMailbox'  mboxid (EndReduce `asTypeOf` paramOf dds)
--                                                  !> ("send ENDREDUCE ", port))

       onNodes nodes f = foldr (<|>) empty $ map (\n -> runAt n f) nodes

       sumNodes nodes f= foldr (<>) mempty $ map (\n -> runAt n f) nodes

       reducer nodes=   sumNodes nodes reduce1    -- run a reduce1 process on each node, gather the results and mappend them

--     reduce :: (Ord k)  => Cloud (M.Map k v)

       reduce1 = local $ do
           reduceResults <- liftIO $ newMVar M.empty
           numberSent    <- liftIO $ newMVar 0

           minput <- getMailbox' mboxid  -- process each chunk as it arrives in the mailbox

           case minput  of

             EndReduce -> do

                n <- liftIO $ modifyMVar numberSent $ \r -> let r'= r+1 in return (r', r')


                if n == lengthNodes
--                                              !> ("END REDUCE RECEIVED",n, lengthNodes)
                 then do
                    cleanMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
                    r <- liftIO $ readMVar reduceResults
                    return r

                 else stop

             Reduce kvs ->  do
                let addIt (k,inp) = do
                        let input= inp `asTypeOf` atype dds
                        liftIO $ modifyMVar_ reduceResults
                               $ \map -> do
                                  let maccum =  M.lookup k map
                                  return $ M.insert k (case maccum of
                                    Just accum ->  red input accum
                                    Nothing    ->  input) map

                mapM_ addIt  (kvs `asTypeOf` paramOf' dds)
                                                                   --  !> ("Received Reduce",kvs)
                stop


   reducer  nodes  <|>  shuffler nodes
   where
     atype ::DDS(M.Map k (vector a)) ->  a
     atype = undefined -- type level

     paramOf  :: DDS (M.Map k (vector a)) -> ReduceChunk [( k,  a)]
     paramOf = undefined -- type level
     paramOf'  :: DDS (M.Map k (vector a)) ->  [( k,  a)]
     paramOf' = undefined -- type level
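
-- For intuition, a pure single-node model of what 'reduce' computes once all
-- chunks have arrived (hedged sketch; the distributed version shuffles each
-- key to the node numbered `hash key `rem` length nodes` before folding):
--
-- > reduceModel :: Ord k => (a -> a -> a) -> M.Map k (DV.Vector a) -> M.Map k a
-- > reduceModel red= M.map (DV.foldl1' red)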




-- Run one computation per list element in non-deterministic parallel, merging
-- the results through the Alternative instance.
--parallelize :: Loggable b => (a -> Cloud b) -> [a] -> Cloud b
parallelize f xs =  foldr (<|>) empty $ map f xs

-- Like 'parallelize', but combines the results monoidally.
mparallelize f xs =  loggedc $ foldr (<>) mempty $ map f xs





getPartitionData :: Loggable a => PartRef a   -> TransIO  a
getPartitionData (Ref node path save)  = Transient $ do
    mp <- (liftIO $ atomically
                       $ readDBRef
                       $ getDBRef
                       $ keyp path save)
                  `onNothing` error ("not found DDS data: "++ keyp path save)
    case mp of
       (Part _ _ _ xs) -> return $ Just xs

getPartitionData1 :: Loggable a => PartRef a   -> TransIO  a
getPartitionData1 (Ref node path save)  = Transient $ do
    mp <- liftIO $ atomically
                  $ readDBRef
                  $ getDBRef
                  $ keyp path save

    case mp of
      Just (Part _ _ _ xs) -> return $ Just xs
      Nothing -> return Nothing

getPartitionData2 :: Loggable a => PartRef a   -> IO  a
getPartitionData2 (Ref node path save)  =  do
    mp <- ( atomically
                       $ readDBRef
                       $ getDBRef
                       $ keyp path save)
                  `onNothing` error ("not found DDS data: "++ keyp path save)
    case mp of
       (Part _ _ _ xs) -> return  xs

-- In case a node fails, a clustered search for the path is launched:
--   if only one node has it, it is copied to another node,
--   and that node is set as the reference in Part.
runAtP :: Loggable a => Node  -> (Path -> IO a) -> Path -> Cloud a
runAtP node f uuid= do
   r <- runAt node $ onAll . liftIO $ (SLast <$> f uuid) `catch` sendAnyError
   case r of
     SLast r -> return r
     SError e -> do
         nodes <-  mclustered $ search uuid
         when (length nodes < 1) $ asyncDuplicate node uuid
         runAtP ( head nodes) f uuid

search uuid= error $ "chunk failover not yet defined. Looking for: "++ uuid

asyncDuplicate node uuid= do
    forkTo node
    nodes <- onAll getNodes
    let node'= head $ nodes \\ [node]
    content <- onAll . liftIO $ readFile uuid
    runAt node' $ local $ liftIO $ writeFile uuid content

sendAnyError :: SomeException -> IO (StreamData a)
sendAnyError e= return $ SError  e


-- | Distribute a vector of values among the nodes.
-- If the data is static and shareable, prefer the get* primitives,
-- since with them each node loads its share of the data independently.
distribute :: (Loggable a, Distributable vector a ) => vector a -> DDS (vector a)
distribute = DDS . distribute'

distribute' xs= loggedc $  do
   nodes <- local getNodes                                        -- !> "DISTRIBUTE"
   let lnodes = length nodes
   let size= case F.length xs `div` lnodes of 0 -> 1; n -> n
       xss= split size lnodes 1 xs                                     -- !> size
   distribute'' xss nodes
   where
   split n s s' xs | s==s' = [xs]
   split n s s' xs=
      let (h,t)= Transient.MapReduce.splitAt n xs
      in h : split n s (s'+1) t
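   -- For example, with 3 nodes and 10 elements, size = 3 and the chunks get
   -- 3, 3 and 4 elements: the last node receives the remaining tail.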

distribute'' :: (Loggable a, Distributable vector a)
             => [vector a] -> [Node] -> Cloud (PartRef (vector a))
distribute'' xss nodes =
   parallelize  move $ zip nodes xss   -- !> show xss
   where
   move (node, xs)=  runAt node $ local $ generateRef xs
          --   !> ("move", node,xs)

-- | Create a DDS from a text that must be static and identical on all nodes.
-- The function parameter partitions the text into elements (for example, words).
getText  :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getText part str= DDS $ loggedc $ do
   nodes' <- local getNodes                                        -- !> "getText"
   let nodes  = filter (not . isWebNode) nodes'
   let lnodes = length nodes

   parallelize  (process lnodes)  $ zip nodes [0..lnodes-1]
   where
   isWebNode node= "webnode" `elem` (map fst $ nodeServices node)

   process lnodes (node,i)= do
      runAt node $ local $ do
            let xs = part str
                size= case length xs `div` lnodes of 0 ->1 ; n -> n
                xss= Transient.MapReduce.fromList $
                       if i== lnodes-1 then drop (i* size) xs else  take size $ drop  (i *  size) xs
            generateRef  xss

-- | Get the words of a URL.
textUrl :: String -> DDS (DV.Vector Text.Text)
textUrl= getUrl  (map Text.pack . words)
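
-- For example (hedged; the URL is illustrative and must be reachable from
-- every node):
--
-- > urlWords :: DDS (DV.Vector Text.Text)
-- > urlWords= textUrl "http://example.com/corpus.txt"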

-- | Generate a DDS from the content of a URL.
-- The first parameter is a function that divides the text into elements.
getUrl :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getUrl partitioner url= DDS $ do
   nodes <- local getNodes                                        -- !> "DISTRIBUTE"
   let lnodes = length nodes

   parallelize  (process lnodes)  $ zip nodes [0..lnodes-1]    -- !> show xss
   where
   process lnodes (node,i)=  runAt node $ local $ do
                        r <- liftIO . simpleHTTP $ getRequest url
                        body <- liftIO $ getResponseBody r
                        let xs = partitioner body
                            size= case length xs `div` lnodes of 0 -> 1; n -> n
                            -- the last node takes the remainder, as getText does
                            xss= Transient.MapReduce.fromList $
                                   if i == lnodes-1 then drop (i*size) xs
                                                    else take size $ drop (i*size) xs
                        generateRef xss


-- | Get the words of a file.
textFile ::  String -> DDS (DV.Vector Text.Text)
textFile= getFile (map Text.pack . words)
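
-- For example (hedged sketch; the file path must exist on every node):
--
-- > fileWordCount :: Cloud (M.Map Text.Text Int)
-- > fileWordCount= reduce (+) $ mapKeyB (\w -> (w, 1 :: Int))
-- >              $ textFile "input.txt"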

-- | Generate a DDS from a file. All the nodes must access the file by the same path.
-- The first parameter is the parser that generates elements from the content.
getFile :: (Loggable a, Distributable vector a) => (String -> [a]) ->  String -> DDS (vector a)
getFile partitioner file= DDS $ do
   nodes <- local getNodes                                        -- !> "DISTRIBUTE"
   let lnodes = length nodes

   parallelize  (process lnodes) $ zip nodes [0..lnodes-1]    -- !> show xss
   where
   process lnodes (node, i)=  runAt node $ local $ do
                        content <- liftIO $ readFile file
                        let xs = partitioner content
                            size= case length xs `div` lnodes of 0 -> 1; n -> n
                            -- the last node takes the remainder, as getText does
                            xss= Transient.MapReduce.fromList $
                                   if i == lnodes-1 then drop (i*size) xs
                                                    else take size $ drop (i*size) xs
                        generateRef xss



generateRef :: Loggable a =>  a -> TransIO (PartRef a)
generateRef  x=  do
    node <- getMyNode
    liftIO $ do
       temp <- getTempName
       let reg=  Part node temp False  x
       atomically $ newDBRef reg
--       syncCache
       (return $ getRef reg)     -- !> ("generateRef",reg,node)

getRef (Part n t s x)= Ref n t s

getTempName :: IO String
getTempName=  ("DDS" ++) <$> replicateM  5 (randomRIO ('a','z'))


-------------- Distributed Datasource Streams ---------

-- | Produce a stream of DDSs that can be map-reduced, similar to Spark
-- streams. Each time interval a new DDS is produced. (To be tested.)
streamDDS
  :: (Loggable a, Distributable vector a) =>
     Integer -> IO (StreamData a) -> DDS (vector a)
streamDDS time io= DDS $ do
     xs <- local . groupByTime time $ do
               r <- parallel io
               case r of
                    SDone -> empty
                    SLast x -> return x
                    SMore x -> return x
                    SError e -> error $ show e
     distribute'  $ Transient.MapReduce.fromList xs
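
-- A hedged usage sketch ('sampleInput' is illustrative; any IO action that
-- produces StreamData values will do):
--
-- > sums :: IO (StreamData Int) -> Cloud (M.Map Int Int)
-- > sums sampleInput= reduce (+) $ mapKeyB (\n -> (n `rem` 10, n))
-- >                 $ streamDDS 10 sampleInput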




#endif