{-# LANGUAGE ExistentialQuantification, DeriveDataTypeable, FlexibleInstances,
             MultiParamTypeClasses, OverloadedStrings, CPP #-}

module Transient.MapReduce
  ( Distributable(..), distribute
  , getText, getUrl, getFile, textUrl, textFile
  , mapKeyB, mapKeyU, reduce, eval
  -- * internals
  , DDS(..), Partition(..), PartRef(..)
  ) where

#ifdef ghcjs_HOST_OS

import Transient.Base
import Transient.Move hiding (pack)
import Transient.Logged

-- dummy Transient.MapReduce module for the browser (GHCJS) side
reduce _ _   = local stop :: Loggable a => Cloud a
mapKeyB _ _  = undefined
mapKeyU _ _  = undefined
distribute _ = undefined
getText _ _  = undefined
textFile _   = undefined
getUrl _ _   = undefined
textUrl _    = undefined
getFile _ _  = undefined
eval _       = local stop

data Partition
data DDS = DDS
class Distributable
data PartRef a = PartRef a

#else

import Transient.Internals hiding (Ref)
import Transient.Move.Internals hiding (pack)
import Transient.Indeterminism
import Control.Applicative
import System.Random
import Control.Monad.IO.Class
import Control.Monad
import Data.Monoid
import Data.Typeable
import Data.List hiding (delete, foldl')
import Control.Exception
import Control.Concurrent
--import Data.Time.Clock
import Network.HTTP
import Data.TCache hiding (onNothing)
import Data.TCache.Defs
import Data.ByteString.Lazy.Char8 (pack, unpack)
import qualified Data.Map.Strict as M
import Control.Arrow (second)
import qualified Data.Vector.Unboxed as DVU
import qualified Data.Vector as DV
import Data.Hashable
import System.IO.Unsafe
import qualified Data.Foldable as F
import qualified Data.Text as Text
import Data.IORef

-- | A distributed data set: a computation that yields a reference to a
-- partition of data stored in some node.
data DDS a = Loggable a => DDS (Cloud (PartRef a))

data PartRef a = Ref Node Path Save deriving (Typeable, Read, Show)

data Partition a = Part Node Path Save a deriving (Typeable, Read, Show)

type Save = Bool
type Path = String

instance Indexable (Partition a) where
  key (Part _ string b _) = keyp string b

keyp s True  = "PartP@" ++ s :: String
keyp s False = "PartT@" ++ s

instance Loggable a => IResource (Partition a) where
  keyResource = key

  readResourceByKey k = r
    where
      typePart :: IO (Maybe a) -> a
      typePart = undefined
      r = if k !! 4 /= 'P'
            then return Nothing
            else defaultReadByKey (defPath (typePart r) ++ k)
                   >>= return . fmap (read . unpack)

  writeResource s@(Part _ _ save _) =
    when save $ defaultWrite (defPath s ++ key s) (pack $ show s)

eval :: DDS a -> Cloud (PartRef a)
eval (DDS mx) = mx

-- NOTE: this instance is a placeholder. An unboxed vector cannot be a lawful
-- 'Foldable' (its operations need an 'Unbox' constraint), and these
-- self-referential definitions do not terminate if they are ever called.
instance F.Foldable DVU.Vector where
  {-# INLINE foldr #-}
  foldr = foldr
  {-# INLINE foldl #-}
  foldl = foldl
  {-# INLINE foldr1 #-}
  foldr1 = foldr1
  {-# INLINE foldl1 #-}
  foldl1 = foldl1

--foldlIt' :: V.Unbox a => (b -> a -> b) -> b -> V.Vector a -> b
--foldlIt' f z0 xs= V.foldr f' id xs z0
--      where f' x k z = k $! f z x
--
--foldlIt1 :: V.Unbox a => (a -> a -> a) -> V.Vector a -> a
--foldlIt1 f xs = fromMaybe (error "foldl1: empty structure")
--                    (V.foldl mf Nothing xs)
--      where
--        mf m y = Just (case m of
--                         Nothing -> y
--                         Just x  -> f x y)

-- | Containers that can be partitioned and distributed among the nodes.
class (F.Foldable c, Typeable c, Typeable a, Monoid (c a), Loggable (c a))
      => Distributable c a where
  singleton :: a -> c a
  splitAt   :: Int -> c a -> (c a, c a)
  fromList  :: [a] -> c a

instance Loggable a => Distributable DV.Vector a where
  singleton = DV.singleton
  splitAt   = DV.splitAt
  fromList  = DV.fromList

instance (Loggable a, DVU.Unbox a) => Distributable DVU.Vector a where
  singleton = DVU.singleton
  splitAt   = DVU.splitAt
  fromList  = DVU.fromList
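-- A small illustration of the class methods (not part of the module's API):
-- they simply mirror the underlying vector operations, so with the boxed
-- instance above:
--
-- > Transient.MapReduce.splitAt 2 (DV.fromList [1..5 :: Int])
-- >     == (DV.fromList [1,2], DV.fromList [3,4,5])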
-- | Perform a map and partition the result with different keys, using boxed
-- vectors. The final result will be used by 'reduce'.
mapKeyB :: (Loggable a, Loggable b, Loggable k, Ord k)
        => (a -> (k, b))
        -> DDS (DV.Vector a)
        -> DDS (M.Map k (DV.Vector b))
mapKeyB = mapKey

-- | Perform a map and partition the result with different keys, using unboxed
-- vectors. The final result will be used by 'reduce'.
mapKeyU :: (Loggable a, DVU.Unbox a, Loggable b, DVU.Unbox b, Loggable k, Ord k)
        => (a -> (k, b))
        -> DDS (DVU.Vector a)
        -> DDS (M.Map k (DVU.Vector b))
mapKeyU = mapKey

-- | Perform a map and partition the result with different keys.
-- The final result will be used by 'reduce'.
mapKey :: (Distributable vector a, Distributable vector b, Loggable k, Ord k)
       => (a -> (k, b))
       -> DDS (vector a)
       -> DDS (M.Map k (vector b))
mapKey f (DDS mx) = DDS $ loggedc $ do
    refs <- mx
    process refs                                     -- !> ("process",refs)
  where
    -- process :: Partition a -> Cloud [Partition b]
    process (ref@(Ref node path sav)) = runAt node $ local $ do
      xs <- getPartitionData ref                     -- !> ("CMAP", ref,node)
      generateRef $ map1 f xs

    -- apply the mapping function to each element and group the results by key
    -- map1 :: (Ord k, F.Foldable vector) => (a -> (k,b)) -> vector a -> M.Map k (vector b)
    map1 f v = F.foldl' f1 M.empty v
      where
        f1 map x = let (k, r) = f x
                   in  M.insertWith (<>) k (Transient.MapReduce.singleton r) map

data ReduceChunk a = EndReduce | Reduce a deriving (Typeable, Read, Show)

-- global counter used to generate fresh mailbox identifiers;
-- NOINLINE so the IORef is created exactly once
{-# NOINLINE boxids #-}
boxids = unsafePerformIO $ newIORef 0
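-- A usage sketch (the classic distributed word count, hypothetical code):
-- 'mapKeyB' tags each word with a count of 1, and 'reduce' (defined just
-- below) sums the counts per word across the nodes.
--
-- > wordCount :: DDS (DV.Vector String) -> Cloud (M.Map String Int)
-- > wordCount = reduce (+) . mapKeyB (\w -> (w, 1 :: Int))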
reduce :: (Hashable k, Ord k, Distributable vector a, Loggable k, Loggable a)
       => (a -> a -> a)
       -> DDS (M.Map k (vector a))
       -> Cloud (M.Map k a)
reduce red (dds@(DDS mx)) = loggedc $ do
    mboxid <- localIO $ atomicModifyIORef boxids $ \n -> let n' = n + 1 in (n', n')
    nodes  <- local getEqualNodes

    let lengthNodes = length nodes

        shuffler nodes = do
          localIO $ threadDelay 100000
          ref@(Ref node path sav) <- mx    -- the blocks resulting from the map phase
          runAt node $ foldAndSend node nodes ref
          stop

        -- groupByDestiny :: (Hashable k, Distributable vector a)
        --                => M.Map k (vector a) -> M.Map Int [(k, vector a)]
        groupByDestiny map = M.foldlWithKey' f M.empty map
          where
            -- f :: M.Map Int [(k,vector a)] -> k -> vector a -> M.Map Int [(k,vector a)]
            f map k vs = M.insertWith (<>) (hash1 k) [(k, vs)] map
            hash1 k    = abs $ hash k `rem` length nodes

        -- fold the local chunks and send each group to the node that owns its hash
        -- foldAndSend :: (Hashable k, Distributable vector a) => (Int,[(k,vector a)]) -> Cloud ()
        foldAndSend node nodes ref = do
            pairs <- onAll $ getPartitionData1 ref
                       <|> return (error $ "DDS computed out of its node: " ++ show ref)
            let mpairs = groupByDestiny pairs
            length <- local . return $ M.size mpairs
            let port2 = nodePort node
            if length == 0
              then sendEnd nodes
              else do
                nsent       <- onAll $ liftIO $ newMVar 0
                (i, folded) <- local $ parallelize foldthem (M.assocs mpairs)
                n <- localIO $ modifyMVar nsent $ \r -> return (r + 1, r + 1)
                (runAt (nodes !! i) $ local $ putMailbox' mboxid (Reduce folded))
                                !> ("send", n, length, i, folded)
                -- return () !> (port,n,length)
                when (n == length) $ sendEnd nodes
                empty
          where
            foldthem (i, kvs) = async . return
                $ (i, map (\(k, vs) -> (k, foldl1 red vs)) kvs)

        sendEnd nodes = onNodes nodes $ local
            $ putMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
            -- !> ("send ENDREDUCE ", port)

        onNodes  nodes f = foldr (<|>) empty  $ map (\n -> runAt n f) nodes
        sumNodes nodes f = foldr (<>)  mempty $ map (\n -> runAt n f) nodes

        -- a reduce1 process in each node; get the results and mappend them
        reducer nodes = sumNodes nodes reduce1

        -- reduce1 :: (Ord k) => Cloud (M.Map k v)
        reduce1 = local $ do
            reduceResults <- liftIO $ newMVar M.empty
            numberSent    <- liftIO $ newMVar 0
            minput <- getMailbox' mboxid   -- get each chunk as it arrives in the mailbox
            case minput of
              EndReduce -> do
                n <- liftIO $ modifyMVar numberSent $ \r -> let r' = r + 1 in return (r', r')
                if n == lengthNodes        -- !> ("END REDUCE RECEIVED",n, lengthNodes)
                  then do
                    cleanMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
                    liftIO $ readMVar reduceResults
                  else stop

              Reduce kvs -> do
                let addIt (k, inp) = do
                      let input = inp `asTypeOf` atype dds
                      liftIO $ modifyMVar_ reduceResults $ \map -> do
                        let maccum = M.lookup k map
                        return $ M.insert k (case maccum of
                                               Just accum -> red input accum
                                               Nothing    -> input) map
                mapM addIt (kvs `asTypeOf` paramOf' dds) !> ("Received Reduce", kvs)
                stop

    reducer nodes <|> shuffler nodes
  where
    atype :: DDS (M.Map k (vector a)) -> a
    atype = undefined                  -- type level only

    paramOf :: DDS (M.Map k (vector a)) -> ReduceChunk [(k, a)]
    paramOf = undefined                -- type level only

    paramOf' :: DDS (M.Map k (vector a)) -> [(k, a)]
    paramOf' = undefined               -- type level only

-- parallelize :: Loggable b => (a -> Cloud b) -> [a] -> Cloud b
parallelize  f xs = foldr (<|>) empty  $ map f xs

mparallelize f xs = loggedc $ foldr (<>) mempty $ map f xs

getPartitionData :: Loggable a => PartRef a -> TransIO a
getPartitionData (Ref node path save) = Transient $ do
    mp <- (liftIO $ atomically $ readDBRef $ getDBRef $ keyp path save)
            `onNothing` error ("not found DDS data: " ++ keyp path save)
    case mp of
      Part _ _ _ xs -> return $ Just xs

getPartitionData1 :: Loggable a => PartRef a -> TransIO a
getPartitionData1 (Ref node path save) = Transient $ do
    mp <- liftIO $ atomically $ readDBRef $ getDBRef $ keyp path save
    case mp of
      Just (Part _ _ _ xs) -> return $ Just xs
      Nothing              -> return Nothing

getPartitionData2 :: Loggable a => PartRef a -> IO a
getPartitionData2 (Ref node path save) = do
    mp <- (atomically $ readDBRef $ getDBRef $ keyp path save)
            `onNothing` error ("not found DDS data: " ++ keyp path save)
    case mp of
      Part _ _ _ xs -> return xs

-- If a node fails, a clustered search for the path is launched;
-- if only one node has the partition, it is copied to another node,
-- and that node is set as the reference in Part.
runAtP :: Loggable a => Node -> (Path -> IO a) -> Path -> Cloud a
runAtP node f uuid = do
    r <- runAt node $ onAll . liftIO $ (SLast <$> f uuid) `catch` sendAnyError
    case r of
      SLast r  -> return r
      SError e -> do
        nodes <- mclustered $ search uuid
        when (length nodes < 1) $ asyncDuplicate node uuid
        runAtP (head nodes) f uuid

search uuid = error $ "chunk failover not yet defined. Looking for: " ++ uuid

asyncDuplicate node uuid = do
    forkTo node
    nodes <- onAll getEqualNodes
    let node' = head $ nodes \\ [node]
    content <- onAll . liftIO $ readFile uuid
    runAt node' $ local $ liftIO $ writeFile uuid content

sendAnyError :: SomeException -> IO (StreamData a)
sendAnyError e = return $ SError e
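-- For reference, 'parallelize' above fans a Cloud computation out over a list
-- using the non-deterministic Alternative instance: each element runs as an
-- independent branch and every result is streamed back. A hypothetical sketch,
-- assuming a list of nodes is in scope:
--
-- > names <- parallelize (\n -> runAt n (local getMyNode)) nodes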
-- | Distribute a vector of values among many nodes.
-- If the vector is static and sharable, better use the get* primitives,
-- since with them each node loads its data independently.
distribute :: (Loggable a, Distributable vector a) => vector a -> DDS (vector a)
distribute = DDS . distribute'

distribute' xs = loggedc $ do
    nodes <- local getEqualNodes                     -- !> "DISTRIBUTE"
    let lnodes = length nodes
    let size = case F.length xs `div` length nodes of
                 0 -> 1
                 n -> n
        xss  = split size lnodes 1 xs                -- !> size
    distribute'' xss nodes
  where
    split n s s' xs | s == s' = [xs]
    split n s s' xs =
      let (h, t) = Transient.MapReduce.splitAt n xs
      in  h : split n s (s' + 1) t

distribute'' :: (Loggable a, Distributable vector a)
             => [vector a] -> [Node] -> Cloud (PartRef (vector a))
distribute'' xss nodes = parallelize move $ zip nodes xss        -- !> show xss
  where
    move (node, xs) = runAt node $ local $ generateRef xs
                                           -- !> ("move", node,xs)

-- | Input data from a text that must be static and shared by all the nodes.
-- The function parameter partitions the text into words.
getText :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getText part str = DDS $ loggedc $ do
    nodes <- local getEqualNodes                     -- !> "getText"
    let lnodes = length nodes
    parallelize (process lnodes) $ zip nodes [0 .. lnodes - 1]
  where
    process lnodes (node, i) = runAt node $ local $ do
      let xs   = part str
          size = case length xs `div` lnodes of
                   0 -> 1
                   n -> n
          xss  = Transient.MapReduce.fromList $
                   if i == lnodes - 1
                     then drop (i * size) xs
                     else take size $ drop (i * size) xs
      generateRef xss

-- | Get the words of a URL.
textUrl :: String -> DDS (DV.Vector Text.Text)
textUrl = getUrl (map Text.pack . words)

-- | Generate a DDS from the content of a URL.
-- The first parameter is a function that divides the text into words.
getUrl :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getUrl partitioner url = DDS $ do
    nodes <- local getEqualNodes                     -- !> "DISTRIBUTE"
    let lnodes = length nodes
    parallelize (process lnodes) $ zip nodes [0 .. lnodes - 1]   -- !> show xss
  where
    process lnodes (node, i) = runAt node $ local $ do
      r    <- liftIO . simpleHTTP $ getRequest url
      body <- liftIO $ getResponseBody r
      let xs   = partitioner body
          size = case length xs `div` lnodes of
                   0 -> 1
                   n -> n
          xss  = Transient.MapReduce.fromList $
                   if i == lnodes - 1
                     then drop (i * size) xs
                     else take size $ drop (i * size) xs
      generateRef xss

-- | Get the words of a file.
textFile :: String -> DDS (DV.Vector Text.Text)
textFile = getFile (map Text.pack . words)
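-- A hypothetical word count over a shared file ("words.txt" is an assumed
-- path that every node can read), combining 'textFile' with 'mapKeyB' and
-- 'reduce':
--
-- > r <- reduce (+) . mapKeyB (\w -> (w, 1 :: Int)) $ textFile "words.txt"
-- > localIO $ print r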
-- | Generate a DDS from a file. All the nodes must access the file with the
-- same path. The first parameter is the parser that generates elements from
-- the content.
getFile :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getFile partitioner file = DDS $ do
    nodes <- local getEqualNodes                     -- !> "DISTRIBUTE"
    let lnodes = length nodes
    parallelize (process lnodes) $ zip nodes [0 .. lnodes - 1]   -- !> show xss
  where
    process lnodes (node, i) = runAt node $ local $ do
      content <- do
        c <- liftIO $ readFile file
        length c `seq` return c        -- force the whole file so the handle is released
      let xs   = partitioner content
          size = case length xs `div` lnodes of
                   0 -> 1
                   n -> n
          xss  = Transient.MapReduce.fromList $
                   if i == lnodes - 1
                     then drop (i * size) xs
                     else take size $ drop (i * size) xs
      generateRef xss

generateRef :: Loggable a => a -> TransIO (PartRef a)
generateRef x = do
    node <- getMyNode
    liftIO $ do
      temp <- getTempName
      let reg = Part node temp False x
      atomically $ newDBRef reg
      -- syncCache
      return $ getRef reg              -- !> ("generateRef",reg,node)

getRef (Part n t s x) = Ref n t s

getTempName :: IO String
getTempName = ("DDS" ++) <$> replicateM 5 (randomRIO ('a', 'z'))

-------------- Distributed Datasource Streams ---------

-- | Produce a stream of DDSs that can be map-reduced, similar to Spark
-- streams: each interval of time, a new DDS is produced. (To be tested.)
streamDDS :: (Loggable a, Distributable vector a)
          => Integer -> IO (StreamData a) -> DDS (vector a)
streamDDS time io = DDS $ do
    xs <- local . groupByTime time $ do
      r <- parallel io
      case r of
        SDone    -> empty
        SLast  x -> return x
        SMore  x -> return x
        SError e -> error $ show e
    distribute' $ Transient.MapReduce.fromList xs

#endif
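-- An end-to-end sketch, assuming a transient cluster started with the
-- 'initNode' / 'inputNodes' helpers from Transient.Move.Utils (a hypothetical
-- program, not part of this module):
--
-- > main = keep . initNode $ inputNodes <|> do
-- >   local $ option "run" "run the distributed word count"
-- >   r <- reduce (+) . mapKeyB (\w -> (w, 1 :: Int))
-- >          $ distribute (DV.fromList $ words "hello world hello")
-- >   localIO $ print r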