{-# LANGUAGE ExistentialQuantification, DeriveDataTypeable
           , FlexibleInstances, MultiParamTypeClasses, OverloadedStrings, CPP #-}


module Transient.MapReduce
(
Distributable(..), distribute, getText,
getUrl, getFile, textUrl, textFile,
mapKeyB, mapKeyU, reduce, eval,
PartRef)
 where

#ifdef ghcjs_HOST_OS
import Transient.Base
import Transient.Move hiding (pack)
import Transient.Logged
-- dummy Transient.MapReduce module for the GHCJS (browser) side: these stubs
-- only keep client code compiling; the real implementation runs in the server nodes
reduce _ _ = local stop :: Loggable a => Cloud a
mapKeyB _ _= undefined
mapKeyU _ _= undefined
distribute _ = undefined
getText _ _ = undefined
textFile _ = undefined
getUrl _ _ = undefined
textUrl _ = undefined
getFile _ _ = undefined
eval _= local stop

data DDS= DDS
class Distributable
data PartRef a=PartRef a

#else

import Transient.Internals

import Transient.Move.Internals hiding (pack)
import Transient.Indeterminism
import Control.Applicative
import System.Random
import Control.Monad.IO.Class

import Control.Monad
import Data.Monoid

import Data.Typeable
import Data.List hiding (delete, foldl')
import Control.Exception
import Control.Concurrent
--import Data.Time.Clock
import Network.HTTP
import Data.TCache hiding (onNothing)
import Data.TCache.Defs

import Data.ByteString.Lazy.Char8 (pack,unpack)
import qualified Data.Map.Strict as M
import Control.Arrow (second)
import qualified Data.Vector.Unboxed as DVU
import qualified Data.Vector as DV
import Data.Hashable
import System.IO.Unsafe

import qualified Data.Foldable as F
import qualified Data.Text as Text
import Data.IORef

-- A DDS (distributed data set) is a cloud computation returning a reference to
-- the chunk of data ('Partition') stored at each node. 'PartRef' carries the
-- node, the storage path and the 'Save' flag, which tells whether the partition
-- is persisted to disk ("PartP@" keys) or kept only in memory ("PartT@" keys).
data DDS a= Loggable a => DDS  (Cloud (PartRef a))
data PartRef a= Ref Node Path Save deriving (Typeable, Read, Show)
data Partition a=  Part Node Path Save a deriving (Typeable,Read,Show)
type Save= Bool


instance Indexable (Partition a) where
    key (Part _ string b _)= keyp string b



-- key prefixes: "PartP@" marks a persistent partition, "PartT@" a temporary one
keyp s True= "PartP@"++s :: String
keyp s False="PartT@"++s

instance Loggable a => IResource (Partition a) where
    keyResource= key
    readResourceByKey k=  r
      where
      typePart :: IO (Maybe a) -> a
      typePart = undefined
      -- only persistent partitions ("PartP@...") can be read back from disk
      r =  if k !! 4 /= 'P' then return Nothing   else
            defaultReadByKey  (defPath (typePart r) ++ k) >>= return . fmap ( read . unpack)
    writeResource (s@(Part _ _ save _))=
          when save $ defaultWrite (defPath s ++ key s) (pack $ show s)


-- | force the execution of the DDS pipeline and return the reference to the
-- resulting partitions
eval :: DDS a -> Cloud (PartRef a)
eval (DDS mx) =  mx


type Path=String


-- This instance exists only to discharge the 'F.Foldable' superclass of
-- 'Distributable' for unboxed vectors. A real instance cannot be written,
-- because the unboxed operations require a 'DVU.Unbox' constraint that the
-- 'Foldable' methods cannot carry. A self-referential definition such as
-- @foldr = foldr@ loops forever, so the methods fail loudly if they are ever
-- reached (e.g. through the default 'F.foldl'' used by 'mapKey').
instance F.Foldable DVU.Vector where
  foldr  _ _ _ = error "Foldable DVU.Vector: not foldable without an Unbox constraint"
  foldl  _ _ _ = error "Foldable DVU.Vector: not foldable without an Unbox constraint"
  foldr1 _ _   = error "Foldable DVU.Vector: not foldable without an Unbox constraint"
  foldl1 _ _   = error "Foldable DVU.Vector: not foldable without an Unbox constraint"

--foldlIt' :: V.Unbox a => (b -> a -> b) -> b -> V.Vector a -> b
--foldlIt' f z0 xs= V.foldr f' id xs z0
--      where f' x k z = k $! f z x
--
--foldlIt1 :: V.Unbox a => (a -> a -> a) -> V.Vector a -> a
--foldlIt1 f xs = fromMaybe (error "foldl1: empty structure")
--                    (V.foldl mf Nothing xs)
--      where
--        mf m y = Just (case m of
--                         Nothing -> y
--                         Just x  -> f x y)

-- | containers whose elements can be distributed among the nodes: they can be
-- built from lists, split in chunks and combined back with '<>'
class (F.Foldable c, Typeable c, Typeable a, Monoid (c a), Loggable (c a)) => Distributable c a where
   singleton :: a -> c a
   splitAt :: Int -> c a -> (c a, c a)
   fromList :: [a] -> c a

instance (Loggable a) => Distributable DV.Vector a where
   singleton = DV.singleton
   splitAt= DV.splitAt
   fromList = DV.fromList

instance (Loggable a,DVU.Unbox a) => Distributable DVU.Vector a where
   singleton= DVU.singleton
   splitAt= DVU.splitAt
   fromList= DVU.fromList
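
-- For instance (a sketch, not part of the API): split a boxed vector in two
-- halves through the class methods:
--
-- > halves :: Loggable a => DV.Vector a -> (DV.Vector a, DV.Vector a)
-- > halves v= Transient.MapReduce.splitAt (DV.length v `div` 2) v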




-- | perform a map that produces (key,value) pairs and group the values by key,
-- using boxed vectors. The resulting DDS of maps is meant to be consumed by 'reduce'.
mapKeyB :: (Loggable a, Loggable b,  Loggable k,Ord k)
     => (a -> (k,b))
     -> DDS  (DV.Vector a)
     -> DDS (M.Map k(DV.Vector b))
mapKeyB= mapKey
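
-- For example (a sketch): pair each word with a count of one, grouped by word:
--
-- > countOnes :: DDS (DV.Vector Text.Text) -> DDS (M.Map Text.Text (DV.Vector Int))
-- > countOnes= mapKeyB (\w -> (w, 1 :: Int))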

-- | perform a map that produces (key,value) pairs and group the values by key,
-- using unboxed vectors. The resulting DDS of maps is meant to be consumed by 'reduce'.
mapKeyU :: (Loggable a, DVU.Unbox a, Loggable b, DVU.Unbox b,  Loggable k,Ord k)
     => (a -> (k,b))
     -> DDS  (DVU.Vector a)
     -> DDS (M.Map k(DVU.Vector b))
mapKeyU= mapKey
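
-- For example (a sketch): group unboxed integers by parity:
--
-- > byParity :: DDS (DVU.Vector Int) -> DDS (M.Map Int (DVU.Vector Int))
-- > byParity= mapKeyU (\n -> (n `mod` 2, n))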

-- | perform a map that produces (key,value) pairs and group the values by key.
-- The resulting DDS of maps is meant to be consumed by 'reduce'.
mapKey :: (Distributable vector a,Distributable vector b, Loggable k,Ord k)
     => (a -> (k,b))
     -> DDS  (vector a)
     -> DDS (M.Map k (vector b))
mapKey f (DDS mx)= DDS $ loggedc $  do
        refs <-  mx
        process refs                            -- !> ("process",refs)

  where
--  process ::  Partition a -> Cloud [Partition b]
  process  (ref@(Ref node path sav))= runAt node $ local $ do
              xs <- getPartitionData ref         -- !> ("CMAP", ref,node)
              (generateRef  $ map1 f xs)



--  map1 :: (Ord k, F.Foldable vector) => (a -> (k,b)) -> vector a -> M.Map k(vector b)
  map1 f v=  F.foldl' f1  M.empty v
     where
     f1 map x=
           let (k,r) = f x
           in M.insertWith (<>) k (Transient.MapReduce.singleton r) map



-- messages interchanged through mailboxes during the shuffle phase: each mapper
-- sends 'Reduce' chunks to the node selected by hashing the key, then an
-- 'EndReduce' to every node when it has finished
data ReduceChunk a= EndReduce | Reduce a deriving (Typeable, Read, Show)

-- global counter that gives each 'reduce' invocation its own mailbox identifier
boxids= unsafePerformIO $ newIORef 0

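-- | reduce the maps of vectors produced by 'mapKeyB'/'mapKeyU': the values of
-- each key are folded with the given associative function. Each key is shuffled
-- to the node determined by its hash and the per-node results are mappended.
--
-- A word-count sketch (an illustration, assuming a running cluster of nodes):
--
-- > wordCount :: String -> Cloud (M.Map Text.Text Int)
-- > wordCount str= reduce (+) . mapKeyB (\w -> (w, 1 :: Int))
-- >                           $ getText (map Text.pack . words) str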
reduce ::  (Hashable k,Ord k, Distributable vector a, Loggable k,Loggable a)
             => (a -> a -> a) -> DDS (M.Map k (vector a)) ->Cloud (M.Map k a)

reduce red  (dds@(DDS mx))= loggedc $ do

   mboxid <- localIO $ atomicModifyIORef boxids $ \n -> let n'= n+1 in (n',n')
   nodes <- local getEqualNodes

   let lengthNodes = length nodes
       shuffler nodes = do
          localIO $ threadDelay 100000
          ref@(Ref node path sav) <- mx     -- return the resulting blocks of the map

          runAt node $ foldAndSend node nodes ref

          stop

--     groupByDestiny :: (Hashable k, Distributable vector a)  => M.Map k (vector a) -> M.Map Int [(k ,vector a)]
       groupByDestiny  map =  M.foldlWithKey' f M.empty  map
              where
--              f ::  M.Map Int [(k ,vector a)] -> k -> vector a -> M.Map Int [(k ,vector a)]
              f map k vs= M.insertWith (<>) (hash1 k) [(k,vs)] map
              hash1 k= abs $ hash k `rem` length nodes


--           foldAndSend :: (Hashable k, Distributable vector a) => Node -> [Node] -> PartRef (M.Map k (vector a)) -> Cloud ()
       foldAndSend node nodes ref=  do

             pairs <- onAll $ getPartitionData1 ref
                        <|>  return (error $ "DDS computed outside of its node: "++ show ref)
             let mpairs = groupByDestiny pairs

             nchunks <- local . return $ M.size mpairs

             let port2= nodePort node


             if  nchunks == 0 then sendEnd   nodes else do

                 nsent <-  onAll $ liftIO $ newMVar 0

                 (i,folded) <- local $ parallelize foldthem (M.assocs  mpairs)

                 n <- localIO  $ modifyMVar nsent $ \r -> return (r+1, r+1)

                 runAt (nodes !! i) $  local $ putMailbox' mboxid (Reduce folded)
--                                                     !> ("send",n,nchunks,port,i,folded))

--                 return () !> (port,n,nchunks)

                 when (n == nchunks) $ sendEnd   nodes
                 empty

             where


             foldthem (i,kvs)=  async . return
                                $ (i,map (\(k,vs) -> (k,foldl1 red vs)) kvs)


       sendEnd  nodes   =  onNodes nodes $ local
                            $  putMailbox'  mboxid (EndReduce `asTypeOf` paramOf dds)
--                                                  !> ("send ENDREDUCE ", port))

       onNodes nodes f = foldr (<|>) empty $ map (\n -> runAt n f) nodes

       sumNodes nodes f= foldr (<>) mempty $ map (\n -> runAt n f) nodes

       reducer nodes=   sumNodes nodes reduce1    -- run a reduce1 process in each node, get the results and mappend them

--     reduce1 :: (Ord k)  => Cloud (M.Map k v)

       reduce1 = local $ do
           reduceResults <- liftIO $ newMVar M.empty
           numberSent    <- liftIO $ newMVar 0

           minput <- getMailbox' mboxid  -- process each chunk as it arrives in the mailbox

           case minput  of

             EndReduce -> do

                n <- liftIO $ modifyMVar numberSent $ \r -> let r'= r+1 in return (r', r')


                if n == lengthNodes
--                                              !> ("END REDUCE RECEIVED",n, lengthNodes)
                 then do
                    cleanMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
                    r <- liftIO $ readMVar reduceResults
                    return r

                 else stop

             Reduce kvs ->  do
                let addIt (k,inp) = do
                        let input= inp `asTypeOf` atype dds
                        liftIO $ modifyMVar_ reduceResults
                               $ \map -> do
                                  let maccum =  M.lookup k map
                                  return $ M.insert k (case maccum of
                                    Just accum ->  red input accum
                                    Nothing    ->  input) map

                mapM addIt  (kvs `asTypeOf` paramOf' dds)
                                                                   --  !> ("Received Reduce",kvs)
                stop


   reducer  nodes  <|>  shuffler nodes
   where
     atype ::DDS(M.Map k (vector a)) ->  a
     atype = undefined -- type level

     paramOf  :: DDS (M.Map k (vector a)) -> ReduceChunk [( k,  a)]
     paramOf = undefined -- type level
     paramOf'  :: DDS (M.Map k (vector a)) ->  [( k,  a)]
     paramOf' = undefined -- type level




-- compose the computations with '<|>' so they run in parallel, yielding each
-- result non-deterministically
-- parallelize :: Loggable b => (a -> Cloud b) -> [a] -> Cloud b
parallelize f xs =  foldr (<|>) empty $ map f xs

-- like 'parallelize', but the results are combined monoidally
mparallelize f xs =  loggedc $ foldr (<>) mempty $ map f xs





-- read the partition data referenced by a 'PartRef'; calls 'error' if it is missing
getPartitionData :: Loggable a => PartRef a   -> TransIO  a
getPartitionData (Ref node path save)  = Transient $ do
    mp <- (liftIO $ atomically
                       $ readDBRef
                       $ getDBRef
                       $ keyp path save)
                  `onNothing` error ("not found DDS data: "++ keyp path save)
    case mp of
       (Part _ _ _ xs) -> return $ Just xs

-- like 'getPartitionData', but stops silently instead of failing when the data is missing
getPartitionData1 :: Loggable a => PartRef a   -> TransIO  a
getPartitionData1 (Ref node path save)  = Transient $ do
    mp <- liftIO $ atomically
                  $ readDBRef
                  $ getDBRef
                  $ keyp path save

    case mp of
      Just (Part _ _ _ xs) -> return $ Just xs
      Nothing -> return Nothing

-- like 'getPartitionData', but in the IO monad
getPartitionData2 :: Loggable a => PartRef a   -> IO  a
getPartitionData2 (Ref node path save)  =  do
    mp <- ( atomically
                       $ readDBRef
                       $ getDBRef
                       $ keyp path save)
                  `onNothing` error ("not found DDS data: "++ keyp path save)
    case mp of
       (Part _ _ _ xs) -> return  xs

-- in case of node failure, a clustered search for the path is launched:
--   if only one node has the chunk, it is copied to another node
--   and that node is set as the reference in Part
runAtP :: Loggable a => Node  -> (Path -> IO a) -> Path -> Cloud a
runAtP node f uuid= do
   r <- runAt node $ onAll . liftIO $ (SLast <$> f uuid) `catch` sendAnyError
   case r of
     SLast r -> return r
     SError e -> do
         nodes <-  mclustered $ search uuid
         when (length nodes < 1) $ asyncDuplicate node uuid
         runAtP ( head nodes) f uuid

search uuid= error $ "chunk failover not yet defined. Looking for: "++ uuid

asyncDuplicate node uuid= do
    forkTo node
    nodes <- onAll getEqualNodes
    let node'= head $ nodes \\ [node]
    content <- onAll . liftIO $ readFile uuid
    runAt node' $ local $ liftIO $ writeFile uuid content

sendAnyError :: SomeException -> IO (StreamData a)
sendAnyError e= return $ SError  e


-- | distribute a vector of values among the nodes.
-- If the data is static and shareable among all the nodes, better use the @get*@
-- primitives, since with them each node loads its own chunk independently.
distribute :: (Loggable a, Distributable vector a ) => vector a -> DDS (vector a)
distribute = DDS . distribute'
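
-- For example (a sketch): sum a list of integers across the cluster under a
-- single key:
--
-- > sumAll :: Cloud (M.Map Int Int)
-- > sumAll= reduce (+) . mapKeyB (\x -> (0 :: Int, x))
-- >                    $ distribute (DV.fromList [1..1000 :: Int])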

distribute' xs= loggedc $  do
   nodes <- local getEqualNodes                                        -- !> "DISTRIBUTE"
   let lnodes = length nodes
   let size= case F.length xs `div` lnodes of 0 -> 1 ; n -> n
       xss= split size lnodes 1 xs                                     -- !> size
   distribute'' xss nodes
   where
   split n s s' xs | s==s' = [xs]
   split n s s' xs=
      let (h,t)= Transient.MapReduce.splitAt n xs
      in h : split n s (s'+1) t

distribute'' :: (Loggable a, Distributable vector a)
             => [vector a] -> [Node] -> Cloud (PartRef (vector a))
distribute'' xss nodes =
   parallelize  move $ zip nodes xss   -- !> show xss
   where
   move (node, xs)=  runAt node $ local $ do
                        par <- generateRef  xs
                        return  par
          --   !> ("move", node,xs)

-- | input data from a text that must be static and shared by all the nodes.
-- The function parameter partitions the text into elements (for example, words)
getText  :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getText part str= DDS $ loggedc $ do
   nodes' <- local getEqualNodes                                        -- !> "getText"
   let nodes  = filter (not . isWebNode) nodes'
   let lnodes = length nodes

   parallelize  (process lnodes)  $ zip nodes [0..lnodes-1]
   where
   isWebNode node= "webnode" `elem` (map fst $ nodeServices node)

   process lnodes (node,i)= do
      runAt node $ local $ do
            let xs = part str
                size= case length xs `div` lnodes of 0 ->1 ; n -> n
                xss= Transient.MapReduce.fromList $
                       if i== lnodes-1 then drop (i* size) xs else  take size $ drop  (i *  size) xs
            generateRef  xss
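
-- For example (a sketch): every node takes its own chunk of the words of a
-- literal text:
--
-- > sharedWords :: DDS (DV.Vector Text.Text)
-- > sharedWords= getText (map Text.pack . words) "the text must be the same in all nodes"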

-- | get the words of a URL
textUrl :: String -> DDS (DV.Vector Text.Text)
textUrl= getUrl  (map Text.pack . words)

-- | generate a DDS from the content of a URL.
-- The first parameter is a function that divides the text into elements
getUrl :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getUrl partitioner url= DDS $ do
   nodes <- local getEqualNodes                                        -- !> "DISTRIBUTE"
   let lnodes = length nodes

   parallelize  (process lnodes)  $ zip nodes [0..lnodes-1]    -- !> show xss
   where
   process lnodes (node,i)=  runAt node $ local $ do
                        r <- liftIO . simpleHTTP $ getRequest url
                        body <- liftIO $  getResponseBody r
                         let xs = partitioner body
                             size= case length xs `div` lnodes of 0 -> 1 ; n -> n
                             -- the last node takes the remainder, as in getText
                             xss= Transient.MapReduce.fromList $
                                    if i == lnodes-1 then drop (i*size) xs
                                                     else take size $ drop (i*size) xs
                         generateRef  xss


-- | get the words of a file
textFile ::  String -> DDS (DV.Vector Text.Text)
textFile= getFile (map Text.pack . words)

-- | generate a DDS from a file. All the nodes must access the file under the same path.
-- The first parameter is the parser that generates elements from the content
getFile :: (Loggable a, Distributable vector a) => (String -> [a]) ->  String -> DDS (vector a)
getFile partitioner file= DDS $ do
   nodes <- local getEqualNodes                                        -- !> "DISTRIBUTE"
   let lnodes = length nodes

   parallelize  (process lnodes) $ zip nodes [0..lnodes-1]    -- !> show xss
   where
   process lnodes (node, i)=  runAt node $ local $ do
                        content <- liftIO $ readFile file
                        let xs = partitioner content
                            size= case length xs `div` lnodes of 0 -> 1 ; n -> n
                            -- the last node takes the remainder, as in getText
                            xss= Transient.MapReduce.fromList $
                                   if i == lnodes-1 then drop (i*size) xs
                                                    else take size $ drop (i*size) xs  -- !> size
                        generateRef    xss



generateRef :: Loggable a =>  a -> TransIO (PartRef a)
generateRef  x=  do
    node <- getMyNode
    liftIO $ do
       temp <- getTempName
       let reg=  Part node temp False  x
       atomically $ newDBRef reg
--       syncCache
       (return $ getRef reg)     -- !> ("generateRef",reg,node)

getRef (Part n t s x)= Ref n t s

getTempName :: IO String
getTempName=  ("DDS" ++) <$> replicateM  5 (randomRIO ('a','z'))


-------------- Distributed  Datasource Streams ---------
-- | produce a stream of DDSs that can be map-reduced, similar to Spark streams:
-- at each interval of time a new DDS is produced. (To be tested.)
streamDDS
  :: (Loggable a, Distributable vector a) =>
     Integer -> IO (StreamData a) -> DDS (vector a)
streamDDS time io= DDS $ do
     xs <- local . groupByTime time $ do
               r <- parallel io
               case r of
                    SDone -> empty
                    SLast x -> return x
                    SMore x -> return x
                    SError e -> error $ show e
     distribute'  $ Transient.MapReduce.fromList xs
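
-- A streaming word-count sketch ('lineSource' is a hypothetical input action;
-- the time argument is handed straight to 'groupByTime'):
--
-- > streamCount :: IO (StreamData Text.Text) -> Cloud (M.Map Text.Text Int)
-- > streamCount lineSource= reduce (+)
-- >        $ mapKeyB (\w -> (w, 1 :: Int))
-- >        $ streamDDS 10 lineSource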




#endif