-- | A distributed data set: a 'Cloud' computation that yields references
-- ('PartRef') to the partitions of the data, typically one result per
-- partition, each partition being held by some node.
data DDS a= Loggable a => DDS  (Cloud (PartRef a))

-- | Reference to a partition: the node that holds it, its name ('Path') and
-- whether it is saved to disk ('Save'). The type parameter is not stored in
-- any field; it only records the type of the partition contents.
data PartRef a= Ref Node Path Save deriving (Typeable, Read, Show)

-- | A partition: the same fields as 'PartRef' plus the contents themselves.
-- The derived 'Read'/'Show' instances are the serialization format used by
-- the 'IResource' instance below (write with 'show', read back with 'read').
data Partition a=  Part Node Path Save a deriving (Typeable,Read,Show)

-- | Whether a partition is written to disk by its 'IResource' instance.
type Save= Bool

-- | A partition is indexed by the key that 'keyp' derives from its path and
-- its 'Save' flag.
instance Indexable (Partition a) where
    key (Part _ path persistent _) = keyp path persistent

-- | Storage key of a partition: its path prefixed with @PartP\@@ when the
-- partition is saved to disk and with @PartT\@@ otherwise.  The character at
-- index 4 ('P' or 'T') is what 'readResourceByKey' inspects to decide whether
-- a key can be read back from disk.
keyp :: String -> Bool -> String
keyp s True  = "PartP@" ++ s
keyp s False = "PartT@" ++ s

-- | Persistence of partitions through the resource cache.  Partitions are
-- stored as the 'show' of the whole 'Partition' value and parsed back with
-- 'read'.
instance Loggable a => IResource (Partition a) where

    keyResource= key

    -- Only keys with 'P' at index 4 (the "PartP@" prefix that 'keyp' gives to
    -- saved partitions) are read from disk; any other key, including one too
    -- short to have an index 4, is reported as not found instead of crashing
    -- on a partial '!!'.
    readResourceByKey k=  r
      where
      -- type-level helper: only used to select the 'defPath' of this resource type
      typePart :: IO (Maybe a) -> a
      typePart = undefined

      r = case drop 4 k of
            ('P' : _) -> defaultReadByKey  (defPath (typePart r) ++ k) >>= return . fmap ( read . unpack)
            _         -> return Nothing

    -- Only partitions whose 'Save' flag is True are written to disk.
    writeResource (s@(Part _ _ save _))=
          when save $ defaultWrite (defPath s ++ key s) (pack $ show s)

-- | Unwrap a 'DDS' into the 'Cloud' computation that yields its partition
-- references.
eval :: DDS a -> Cloud (PartRef a)
eval dds = case dds of
  DDS refs -> refs

-- | Name of a partition (see 'generateRef'); 'keyp' builds its storage key
-- from it.
type Path=String

-- | Orphan 'F.Foldable' instance for unboxed vectors.  It exists only so that
-- 'DVU.Vector' satisfies the 'F.Foldable' superclass of 'Distributable'.
--
-- A working instance cannot be written: folding an unboxed vector needs a
-- @DVU.Unbox a@ constraint, which the 'F.Foldable' methods do not provide.
-- The previous definitions (@foldr = foldr@ and so on) referred back to the
-- 'F.Foldable' methods at this same type, i.e. to this very instance, and so
-- never terminated.  Failing with an explicit message is preferable to a
-- silent hang.
instance F.Foldable DVU.Vector where
  foldr _ _ _ = unboxedFoldUnsupported "foldr"
  foldl _ _ _ = unboxedFoldUnsupported "foldl"
  foldr1 _ _  = unboxedFoldUnsupported "foldr1"
  foldl1 _ _  = unboxedFoldUnsupported "foldl1"

-- | Error raised by every method of the 'F.Foldable' instance of 'DVU.Vector'.
unboxedFoldUnsupported :: String -> b
unboxedFoldUnsupported method =
  error $ "Transient.MapReduce: Foldable." ++ method
       ++ " is not supported on unboxed vectors (it needs an Unbox constraint)"

--foldlIt' :: V.Unbox a => (b -> a -> b) -> b -> V.Vector a -> b

--foldlIt' f z0 xs= V.foldr f' id xs z0

--      where f' x k z = k $! f z x


--foldlIt1 :: V.Unbox a => (a -> a -> a) -> V.Vector a -> a

--foldlIt1 f xs = fromMaybe (error "foldl1: empty structure")

--                    (V.foldl mf Nothing xs)

--      where

--        mf m y = Just (case m of

--                         Nothing -> y

--                         Just x  -> f x y)

-- | Containers that can hold the partitions of a 'DDS'.
--
-- Besides the superclass requirements (foldable, serializable through
-- 'Loggable', concatenable through 'Monoid'), a container must support
-- building from elements and splitting.
class (F.Foldable c, Typeable c, Typeable a, Monoid (c a), Loggable (c a)) => Distributable c a where

   -- | A container with a single element.
   singleton :: a -> c a

   -- | Split into the first @n@ elements and the rest.
   splitAt :: Int -> c a -> (c a, c a)

   -- | Build a container from a list.
   fromList :: [a] -> c a

-- | Boxed vectors ("Data.Vector"): every method delegates to the library
-- function of the same name.
instance (Loggable a) => Distributable DV.Vector a where
   fromList xs  = DV.fromList xs
   singleton x  = DV.singleton x
   splitAt n xs = DV.splitAt n xs

-- | Unboxed vectors ("Data.Vector.Unboxed"): every method delegates to the
-- library function of the same name.
instance (Loggable a,DVU.Unbox a) => Distributable DVU.Vector a where
   fromList xs  = DVU.fromList xs
   singleton x  = DVU.singleton x
   splitAt n xs = DVU.splitAt n xs

-- | 'mapKey' specialised to boxed vectors ('DV.Vector'): map every element to
-- a key/value pair and group the values by key within each partition.
-- The result is meant to be consumed by 'reduce'.
mapKeyB :: (Loggable a, Loggable b, Loggable k, Ord k)
        => (a -> (k, b))
        -> DDS (DV.Vector a)
        -> DDS (M.Map k (DV.Vector b))
mapKeyB f dds = mapKey f dds

-- | 'mapKey' specialised to unboxed vectors ('DVU.Vector'): map every element
-- to a key/value pair and group the values by key within each partition.
-- The result is meant to be consumed by 'reduce'.
--
-- NOTE(review): folding the input goes through the orphan
-- @F.Foldable DVU.Vector@ instance of this module, which has no working
-- implementation (it lacks an Unbox constraint); confirm before relying on it.
mapKeyU :: (Loggable a, DVU.Unbox a, Loggable b, DVU.Unbox b, Loggable k, Ord k)
        => (a -> (k, b))
        -> DDS (DVU.Vector a)
        -> DDS (M.Map k (DVU.Vector b))
mapKeyU f dds = mapKey f dds

-- | Perform a map and partition the result with different keys.
--
-- Each partition of the input is processed on the node that holds it: every
-- element is mapped to a @(key, value)@ pair and the values are grouped by key
-- into a 'M.Map', which is stored as a new partition on that node.  The
-- result is meant to be consumed by 'reduce'.
mapKey :: (Distributable vector a,Distributable vector b, Loggable k,Ord k)
     => (a -> (k,b))
     -> DDS  (vector a)
     -> DDS (M.Map k (vector b))
mapKey f (DDS mx)= DDS $ loggedc $  do
        refs <-  mx
        process refs                            -- !> ("process",refs)
  where
  -- Run the map on the node that owns the partition and store the result there.
  process  (ref@(Ref node _ _))= runAt node $ local $ do
              xs <- getPartitionData ref         -- !> ("CMAP", ref,node)
              generateRef  $ map1 f xs

  -- Group the mapped values of one partition by key.  Values are collected in
  -- a list per key (O(1) cons) and converted to a container once at the end;
  -- appending a singleton container for every element copied the accumulated
  -- vector each time, which is quadratic in the group size.  As before, each
  -- group holds its values in reverse encounter order.
  map1 g v = M.map Transient.MapReduce.fromList $ F.foldl' f1 M.empty v
    where
      f1 acc x =
        let (k, r) = g x
        in M.insertWith (++) k [r] acc

-- | Message exchanged through the mailboxes during 'reduce': 'Reduce' carries
-- a chunk of already folded key/value results, 'EndReduce' tells the
-- receiver that one sender has finished sending.
data ReduceChunk a= EndReduce | Reduce a deriving (Typeable, Read, Show)

-- | Global counter used to give each 'reduce' call its own mailbox id.
--
-- It is a top-level IORef created with 'unsafePerformIO'.  The NOINLINE
-- pragma is required: without it GHC is free to inline the definition at its
-- use sites, creating a fresh IORef each time, so the counter would not be
-- shared and mailbox ids could repeat.
{-# NOINLINE boxids #-}
boxids= unsafePerformIO $ newIORef 0

-- | Reduce the per-partition key maps produced by 'mapKey'.
--
-- For every partition reference produced by the input 'DDS', the node that
-- holds it folds the values of each key with @red@ and sends the results,
-- grouped by the hash of the key, as 'Reduce' chunks to the mailboxes of the
-- destination nodes; once it has sent all its chunks it sends 'EndReduce' to
-- every node.  Each node merges the chunks it receives with @red@ and, after
-- receiving as many 'EndReduce' messages as there are nodes, yields its map.
-- The maps of all nodes are combined with '<>'.
reduce ::  (Hashable k,Ord k, Distributable vector a, Loggable k,Loggable a)
             => (a -> a -> a) -> DDS (M.Map k (vector a)) ->Cloud (M.Map k a)
reduce red  (dds@(DDS mx))= loggedc $ do
   -- a fresh mailbox id per call keeps the messages of different reductions apart
   mboxid <- localIO $ atomicModifyIORef boxids $ \n -> let n'= n+1 in (n',n')
   nodes <- local getEqualNodes
   let lengthNodes = length nodes

       -- Fold and send the chunks of every partition from the node that holds
       -- it.  It yields no result itself ('empty'): only 'reducer' does, and
       -- the two are combined with '<|>' below, so their types must agree.
       shuffler nodes = do
          localIO $ threadDelay 100000
          ref@(Ref node path sav) <- mx     -- return the resulting blocks of the map
          runAt node $ foldAndSend node nodes ref
          empty

       -- Group the entries of a partition by destination: the index in
       -- 'nodes' obtained by hashing the key.
       groupByDestiny  m =  M.foldlWithKey' f M.empty  m
              where
              f acc k vs= M.insertWith (<>) (hash1 k) [(k,vs)] acc
              hash1 k= abs $ hash k `rem` length nodes

       -- Fold the values of each key locally and send every group to its
       -- destination node; the last send triggers 'sendEnd'.
       foldAndSend node nodes ref=  do
             pairs <- onAll $ getPartitionData1 ref
                        <|>  return (error $ "DDS computed out of his node:"++ show ref    )
             let mpairs = groupByDestiny pairs
             nchunks <- local . return $ M.size mpairs
             if  nchunks == 0 then sendEnd   nodes else do
                 nsent <-  onAll $ liftIO $ newMVar 0
                 (i,folded) <- local $ parallelize foldthem (M.assocs  mpairs)
                 n <- localIO  $ modifyMVar nsent $ \r -> return (r+1, r+1)
                 (runAt (nodes !! i) $  local $ putMailbox' mboxid (Reduce folded))
                                                     !> ("send",n,nchunks,i,folded)
                 when (n == nchunks) $ sendEnd   nodes
           where
           foldthem (i,kvs)=  async . return
                              $ (i,map (\(k,vs) -> (k,foldl1 red vs)) kvs)

       -- Tell every node that this sender has finished.
       sendEnd  nodes   =  onNodes nodes $ local
                            $  putMailbox'  mboxid (EndReduce `asTypeOf` paramOf dds)

       onNodes nodes f = foldr (<|>) empty $ map (\n -> runAt n f) nodes

       sumNodes nodes f= foldr (<>) mempty $ map (\n -> runAt n f) nodes

       -- a reduce1 process in each node, get the results and mappend them
       reducer nodes=   sumNodes nodes reduce1

       -- Accumulate the chunks arriving in this node's mailbox with 'red'.
       -- The MVars are created before 'getMailbox'', so (assuming, as the
       -- counting below requires, that getMailbox' runs the rest of the block
       -- once per incoming message) they are shared by all messages.  Only the
       -- message that completes the expected number of 'EndReduce's returns the
       -- accumulated map; every other message 'stop's.
       reduce1 = local $ do
           reduceResults <- liftIO $ newMVar M.empty
           numberSent    <- liftIO $ newMVar 0
           minput <- getMailbox' mboxid  -- get the chunk once it arrives to the mailbox
           case minput  of
             EndReduce -> do
                n <- liftIO $ modifyMVar numberSent $ \r -> let r'= r+1 in return (r', r')
                if n == lengthNodes
                 then do
                    cleanMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
                    liftIO $ readMVar reduceResults
                 else stop
             Reduce kvs ->  do
                let addIt (k,inp) = do
                        let input= inp `asTypeOf` atype dds
                        -- combine with the value already accumulated for k, if any:
                        -- insertWith calls (red input accum) when k is present
                        liftIO $ modifyMVar_ reduceResults
                               $ return . M.insertWith red k input
                mapM_ addIt  (kvs `asTypeOf` paramOf' dds)
                                                                     !> ("Received Reduce",kvs)
                stop

   reducer  nodes  <|>  shuffler nodes
  where
     -- type-level helpers: they only fix the types of the mailbox messages
     atype ::DDS(M.Map k (vector a)) ->  a
     atype = undefined -- type level
     paramOf  :: DDS (M.Map k (vector a)) -> ReduceChunk [( k,  a)]
     paramOf = undefined -- type level
     paramOf'  :: DDS (M.Map k (vector a)) ->  [( k,  a)]
     paramOf' = undefined -- type level

-- | Combine the actions @f x@ for every element of the list with '<|>',
-- starting from 'empty'.  In this module it is used with 'Cloud' and
-- 'TransIO' to run one branch per element.
parallelize f = foldr (<|>) empty . map f

-- | Run @f@ on every element and combine all the results with '<>', starting
-- from 'mempty', inside a 'loggedc' block.
mparallelize f = loggedc . foldr (<>) mempty . map f

-- | Read the contents of a partition from the resource cache of the current
-- node.  Fails with an error when the partition is not found.
getPartitionData :: Loggable a => PartRef a   -> TransIO  a
getPartitionData (Ref _ path save) = Transient $ do
    let partKey = keyp path save
    part <- liftIO (atomically (readDBRef (getDBRef partKey)))
              `onNothing` error ("not found DDS data: " ++ partKey)
    case part of
       Part _ _ _ xs -> return (Just xs)

-- | Like 'getPartitionData', but when the partition is not found the
-- computation yields no result (the 'Transient' action returns 'Nothing')
-- instead of failing with an error.
getPartitionData1 :: Loggable a => PartRef a   -> TransIO  a
getPartitionData1 (Ref _ path save) = Transient $ do
    found <- liftIO . atomically . readDBRef . getDBRef $ keyp path save
    maybe (return Nothing) (\(Part _ _ _ xs) -> return (Just xs)) found

-- | 'IO' version of 'getPartitionData': read the contents of a partition from
-- the local resource cache, failing with an error when it is not found.
getPartitionData2 :: Loggable a => PartRef a   -> IO  a
getPartitionData2 (Ref _ path save) = do
    let partKey = keyp path save
    part <- atomically (readDBRef (getDBRef partKey))
              `onNothing` error ("not found DDS data: " ++ partKey)
    case part of
       Part _ _ _ xs -> return xs

-- In case of a Node failure, a clustered search for the path is launched;

--   if only one node has it, it is copied to another node;

--   that node is set as the reference node in Part.

-- | Run, on the given node, an IO action on the chunk stored at the given path.
--
-- Any exception thrown by the action is caught (turned into an 'SError' by
-- 'sendAnyError'); the chunk is then searched for on the cluster and the
-- action is retried on the first node found.  If no node holds the chunk,
-- this fails with an error that names the chunk and the original exception.
runAtP :: Loggable a => Node  -> (Path -> IO a) -> Path -> Cloud a
runAtP node f uuid= do
   r <- runAt node $ onAll . liftIO $ (SLast <$> f uuid) `catch` sendAnyError
   case r of
     SLast r -> return r
     SError e -> do
         nodes <-  mclustered $ search uuid
         case nodes of
           -- with no holder there is nothing to retry on; report it explicitly
           -- rather than failing inside 'head' with an empty-list error
           [] -> error $ "runAtP: no node holds chunk " ++ uuid
                      ++ " after failure: " ++ show e
           -- NOTE(review): the design notes say that when only one node holds
           -- the chunk it should be copied to another one ('asyncDuplicate');
           -- that is not done here yet.
           holder : _ -> runAtP holder f uuid

-- | Placeholder for locating the nodes that hold a chunk.  Chunk failover is
-- not implemented, so this always fails with an error naming the chunk.
search uuid = error ("chunk failover not yet defined. Lookin for: " ++ uuid)

-- | Copy the file at path @uuid@ from @node@ to some other node of the cluster.
--
-- When the cluster has no node other than @node@ there is nowhere to copy to,
-- and the copy is skipped instead of failing on 'head' of an empty list.
asyncDuplicate node uuid= do
    forkTo node
    nodes <- onAll getEqualNodes
    case nodes \\ [node] of
      []        -> return ()
      node' : _ -> do
          content <- onAll . liftIO $ readFile uuid
          runAt node' $ local $ liftIO $ writeFile uuid content

-- | Exception handler used by 'runAtP': report any exception as an 'SError'
-- value instead of letting it propagate.
sendAnyError :: SomeException -> IO (StreamData a)
sendAnyError = return . SError

-- | Distribute a container of values among the nodes of the cluster.
--
-- If the data is static and sharable, prefer the get* primitives
-- ('getText', 'getUrl', 'getFile'), since then each node loads its share of
-- the data independently.
distribute :: (Loggable a, Distributable vector a ) => vector a -> DDS (vector a)
distribute xs = DDS (distribute' xs)

-- Implementation of 'distribute': cut the container into one chunk per node,
-- each of @length / nodes@ elements (at least one), the last chunk taking the
-- remainder, and store every chunk on its node.
distribute' xs= loggedc $  do
   nodes <- local getEqualNodes                                        -- !> "DISTRIBUTE"
   let lnodes = length nodes
       size= case F.length xs `div` lnodes of 0 ->1 ; n -> n
       xss= split size lnodes 1 xs                                     -- !> size
   distribute'' xss nodes
  where
   -- split n s s' xs: emit chunks of n elements while the chunk counter s'
   -- is below s; the s-th chunk gets everything that is left
   split n s s' xs | s==s' = [xs]
   split n s s' xs=
      let (h,t)= Transient.MapReduce.splitAt n xs
      in h : split n s (s'+1) t

-- | Store each chunk on the node it is paired with and yield one reference
-- per stored chunk.
distribute'' :: (Loggable a, Distributable vector a)
             => [vector a] -> [Node] -> Cloud (PartRef (vector a))
distribute'' xss nodes =
   parallelize  move $ zip nodes xss   -- !> show xss
  where
   -- store one chunk on its node and return a reference to it
   move (node, xs)=  runAt node $ local $ generateRef xs
          --   !> ("move", node,xs)

-- | Input data from a text that must be static and shared by all the nodes.
--
-- The function parameter partitions the text into elements (e.g. words).
-- Every node partitions the whole text and keeps only its own slice: node
-- number @i@ of @n@ takes the @i@-th block of @length / n@ elements (at least
-- one), and the last node also takes any remainder.
getText  :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getText part str= DDS $ loggedc $ do
   nodes <- local getEqualNodes                                        -- !> "getText"
   let lnodes = length nodes
   parallelize  (process lnodes)  $ zip nodes [0..lnodes-1]
  where
   process lnodes (node,i)=
      runAt node $ local $ do
            let xs = part str
                size= case length xs `div` lnodes of 0 ->1 ; n -> n
                xss= Transient.MapReduce.fromList $
                       if i== lnodes-1 then drop (i* size) xs else  take size $ drop  (i *  size) xs
            generateRef  xss

-- | Get the words of the content of a URL, as 'Text.Text' values.
textUrl :: String -> DDS (DV.Vector Text.Text)
textUrl url = getUrl (fmap Text.pack . words) url

-- | Generate a DDS from the content of a URL.
--
-- The first parameter is a function that divides the text into elements
-- (e.g. words).  Every node downloads the whole content, partitions it and
-- keeps only its own slice: node number @i@ of @n@ takes the @i@-th block of
-- @length / n@ elements (at least one), and the last node also takes any
-- remainder.
getUrl :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getUrl partitioner url= DDS $ do
   nodes <- local getEqualNodes                                        -- !> "DISTRIBUTE"
   let lnodes = length nodes
   parallelize  (process lnodes)  $ zip nodes [0..lnodes-1]    -- !> show xss
  where
   process lnodes (node,i)=  runAt node $ local $ do
                        r <- liftIO . simpleHTTP $ getRequest url
                        body <- liftIO $  getResponseBody r
                        let xs = partitioner body
                            size= case length xs `div` lnodes of 0 ->1 ; n -> n
                            xss= Transient.MapReduce.fromList $
                                  if i== lnodes-1 then drop (i* size) xs else  take size $ drop  (i *  size) xs
                        generateRef  xss

-- | Get the words of a file, as 'Text.Text' values.
textFile ::  String -> DDS (DV.Vector Text.Text)
textFile file = getFile (fmap Text.pack . words) file

-- | Generate a DDS from a file.  All the nodes must access the file with the
-- same path.
--
-- The first parameter is the parser that generates elements from the
-- content.  Every node reads the whole file, parses it and keeps only its own
-- slice: node number @i@ of @n@ takes the @i@-th block of @length / n@
-- elements (at least one), and the last node also takes any remainder.
getFile :: (Loggable a, Distributable vector a) => (String -> [a]) ->  String -> DDS (vector a)
getFile partitioner file= DDS $ do
   nodes <- local getEqualNodes                                        -- !> "DISTRIBUTE"
   let lnodes = length nodes
   parallelize  (process lnodes) $ zip nodes [0..lnodes-1]    -- !> show xss
  where
   process lnodes (node, i)=  runAt node $ local $ do
                        content <-  do
                              c <- liftIO $ readFile file
                              -- force the whole lazy read so the file is fully consumed here
                              length c `seq` return c
                        let xs = partitioner  content
                            size= case length xs `div` lnodes of 0 ->1 ; n -> n
                            xss= Transient.MapReduce.fromList $
                                   if i== lnodes-1 then drop (i* size) xs else  take size $ drop  (i *  size) xs
                        generateRef    xss

-- | Store a value as a new partition on the current node, under a fresh
-- random name and with 'Save' set to False (so 'writeResource' never writes
-- it to disk), and return a reference to it.
generateRef :: Loggable a =>  a -> TransIO (PartRef a)
generateRef x = do
    me <- getMyNode
    liftIO $ do
       name <- getTempName
       let part = Part me name False x
       _ <- atomically (newDBRef part)
       -- syncCache
       return (getRef part)     -- !> ("generateRef",part,me)

-- | Reference to a partition: its node, path and 'Save' flag, without the contents.
getRef (Part node path save _contents) = Ref node path save

-- | Random name for a new partition: @"DDS"@ followed by random lowercase
-- letters.
--
-- The name becomes the partition's storage key (see 'generateRef' and
-- 'keyp'), so two partitions drawing the same name would share a key.  With
-- the former 5 letters (26^5, about 1.2e7 names) a collision becomes likely
-- after a few thousand partitions (birthday bound); 10 letters (about 1.4e14
-- names) make it negligible.
getTempName :: IO String
getTempName = ("DDS" ++) <$> replicateM tempNameLength (randomRIO ('a', 'z'))
  where
    tempNameLength = 10

-- | Produce a stream of DDS's that can be map-reduced, similar to Spark
-- streams.
--
-- The values produced by the IO action are grouped with 'groupByTime' over
-- each @time@ interval, and every group is distributed as a new DDS.
-- 'SDone' produces nothing and 'SError' raises the error.  (To be tested.)
streamDDS
  :: (Loggable a, Distributable vector a) =>
     Integer -> IO (StreamData a) -> DDS (vector a)
streamDDS time io= DDS $ do
     xs <- local . groupByTime time $ do
               r <- parallel io
               case r of
                    SDone -> empty
                    SLast x -> return x
                    SMore x -> return x
                    SError e -> error $ show e
     distribute'  $ Transient.MapReduce.fromList xs
