-- ----------------------------------------------------------------------------
{- |
  Module     : Examples2.SimpleMR.SimpleDMapReduce
  Copyright  : Copyright (C) 2009 Sebastian Reese
  License    : MIT

  Maintainer : Sebastian Reese (str@holumbus.org)
  Stability  : experimental
  Portability: portable
  Version    : 0.1
-}
-- ----------------------------------------------------------------------------
module Holumbus.Distribution.SimpleDMapReduceIO
(
   MapFunction
 , ReduceFunction
 , client
 , worker
 , partition'
 , Priority(..)
 , putTimeStamp
)
where

import           Holumbus.Network.PortRegistry.PortRegistryPort
import           Holumbus.MapReduce.Types
import           Holumbus.Common.FileHandling
import           Data.Binary
--import           Holumbus.Common.MRBinary
import           Data.Maybe
import           Control.Parallel.Strategies
import qualified Holumbus.Distribution.DMapReduce               as MR
import qualified Holumbus.FileSystem.FileSystem                 as FS
import qualified Holumbus.MapReduce.DaemonInterface             as DI
import qualified Holumbus.Data.KeyMap                           as KMap
import           Holumbus.Common.Logging
import           Holumbus.Common.Utils ( handleAll )
import           System.Log.Logger
import           System.Environment
import           System.Exit
import           Data.Time.Clock.POSIX
import qualified Data.ByteString.Lazy as B

{-
splitConfiguration
  :: (Hash k1, NFData v1, NFData k1, Binary a, Binary k1, Binary v1)
  => SplitConfiguration a k1 v1
splitConfiguration
  = SplitConfiguration
      defaultSplit
      defaultInputReader
      defaultOutputWriter
-}

mapConfiguration
  :: (Hash k2, NFData v1, NFData k1, NFData v2, NFData k2, Ord k2, Binary a, Binary k1, Binary v1, Binary k2, Binary v2)
  => MapFunction a k1 v1 k2 v2
  -> MapConfiguration a k1 v1 k2 v2
mapConfiguration fct
  = MapConfiguration
      fct
      hashedPartition
      defaultInputReader
      defaultOutputWriter

reduceConfiguration
  :: (Hash k2, NFData v2, NFData k2, NFData v3, Ord k2, Binary a, Binary k2, Binary v2, Binary v3)
  => ReduceFunction a k2 v2 v3
  -> ReduceConfiguration a k2 v2 v3
reduceConfiguration fct
  = ReduceConfiguration
      defaultMerge
      fct
      hashedPartition
      defaultInputReader
      defaultOutputWriter


{-
actionConfig
-}
actionConfig :: (Hash k1, Hash k2, Binary a, NFData k1, NFData k2, Ord k2, Binary k1, Binary k2, NFData v1, NFData v4, NFData v2, NFData v3, Binary v1, Binary v3, Binary v2, Binary v4) => MapFunction a k1 v1 k2 v2 -> ReduceFunction a k2 v3 v4 -> ActionConfiguration a k1 v1 k2 v2 v3 v4
actionConfig m r = (defaultActionConfiguration "ID") {
           ac_Split   = Nothing -- Just splitConfiguration  
         , ac_Map     = Just . mapConfiguration    $ m
         , ac_Reduce  = Just . reduceConfiguration $ r
         }

{- ---------------------------------------------------------------------------------------------------------
   The simple client fucntions
  --------------------------------------------------------------------------------------------------------- -}   
  
  
{-
 The simple client
 
 doMapReduce :: ActionConfiguration a k1 v1 k2 v2 v3 v4
  -> a               -- ^ options
  -> [(k1,v1)]       -- ^ input (Tuples)
  -> [FileId]        -- ^ input (Files)
  -> Int             -- ^ number of splitters
  -> Int             -- ^ number of mappers
  -> Int             -- ^ number of reducers
  -> Int             -- ^ number of results
  -> TaskOutputType  -- ^ type of the result (file of raw)
  -> mr -> IO ([(k2,v4)],[FileId])
-}
client :: ( Show k1, Show k2, Show v1, Show v2, Show v3, Show v4
          , Binary v1, Binary v3, Binary v2, Binary v4
          , Binary a, Binary k1, Binary k2
          , NFData k1, NFData k2, NFData v1, NFData v4, NFData v2, NFData v3
          , Ord k2, Hash k1, Hash k2) =>
          MapFunction a k1 v1 k2 v2 -> ReduceFunction a k2 v3 v4
          -> a -> (Int,Int,Int) -> [[(k1,v1)]] -> IO [(k2,v4)]
client m r a (splitters,mappers,reducers) lss = do
      -- create port registry
      p <- newPortRegistryFromXmlFile "/tmp/registry.xml"
      setPortRegistry p      
      
      -- make filesystem
      fs <- FS.mkFileSystemClient FS.defaultFSClientConfig
      siteid <- FS.getMySiteId fs
      putStrLn $ "My FS Siteid is: " ++ show siteid
      
      -- create mapreduce data
      mr <- initializeData
      
      --_ <- readline "Press enter to continue .."
      -- create the filenames and store the data to the map reduce filesystem
--      let filenames = map (\i -> "initial_input_"++show i) [1..(length lss)]
--      FS.createFiles (zipWith (\fn c -> (fn,listToByteString c)) filenames lss) fs
      let (files,filenames) = prepareFiles lss 0
      FS.createFiles files fs
      -- mapM_ (\(filename,ls) -> FS.createFile filename (listToByteString ls) fs) $ zip filenames lss
      
      -- do the map reduce job
      putTimeStamp "SimpleDMR Begin MR"
      (_,fids) <- MR.doMapReduce (actionConfig m r) a [] filenames splitters mappers reducers 1 TOTFile mr
      putTimeStamp "SimpleDMR End MR"
      
      -- get the results from filesystem
      result <- merge fids fs
      
      -- close file- and mapreducesystem
      deinitializeData mr
      FS.closeFileSystem fs
      
      -- finally, return the result
      return result

prepareFiles :: Binary a => [[a]] -> Int -> ([(String,B.ByteString)],[String])
prepareFiles []     _ = ([],[])
prepareFiles (x:xs) i = ((fn,bin):files,fn:filenames)
  where
  fn = ("Initial_input_"++show i)
  bin = listToByteString x
  (files,filenames) = prepareFiles xs (i+1)
      
merge :: (Show k2, Show v4, Hash k2, Binary k2, Binary v4, NFData k2, NFData v4) => [FS.FileId] -> FS.FileSystem -> IO [(k2,v4)]
merge fids fs = do
   mayberesult <- mapM ( flip FS.getFileContent fs) fids
   let result = concat . map parseByteStringToList $ catMaybes mayberesult
   return result
   

{-
The simple client's init function
-}
initializeData :: IO (MR.DMapReduce)
initializeData 
  = do    
    let config = MR.defaultMRClientConfig
    MR.mkMapReduceClient config

{-
The simple client's deinit function
-}
deinitializeData :: MR.DMapReduce -> IO ()
deinitializeData mr
  = do
    MR.closeMapReduce mr

{- ---------------------------------------------------------------------------------------------------------
   The simple worker fucntions
  --------------------------------------------------------------------------------------------------------- -}   
version :: String
version = "SimpleWorker v 0.1"

prompt :: String
prompt = "# "++version++" > "

localLogger :: String
localLogger = "Holumbus.Distribution.SimpleDMapReduceIO.worker"

pUsage :: IO ()
pUsage = do
  putStrLn "Usage: SimpleWorker ConsolePort Logfile"

params :: IO [String]
params = do
  args <- getArgs
  if length args /= 2 then do
    errorM localLogger "Wrong argument count"
    pUsage
    exitFailure
    else
      return args

{-
 The simpleWorker
-}
worker :: (Show k1, Show k2, Show v1, Show v2, Hash k1, Hash k2, Binary a, NFData k1, NFData k2, Ord k2, Binary k1, Binary k2, NFData v1,NFData v4, NFData v2, NFData v3, Binary v1, Binary v3, Binary v2, Binary v4, Show v4, Show v3) => MapFunction a k1 v1 k2 v2  -> ReduceFunction a k2 v3 v4 -> [(String,Priority)] -> IO ()
worker m r loggers = do
  handleAll (\e -> errorM localLogger $ "EXCEPTION: " ++ show e) $
    do 
    (s_cport:logfile:[]) <- params
    --initializeFileLogging logfile [(localLogger, INFO),("Holumbus.Network.DoWithServer",INFO),("Holumbus.MapReduce.Types", INFO)]
    initializeFileLogging logfile ([(localLogger, INFO),("Holumbus.Network.DoWithServer",INFO)]++loggers)
    p <- newPortRegistryFromXmlFile "/tmp/registry.xml"
    setPortRegistry p
    (mr,fs) <- initWorker m r
    DI.runDaemon mr version (read s_cport) prompt 
    deinitWorker (mr,fs)

{-
 The simpleWorker's init functin
-}
initWorker :: (Hash k1, Show k1, Show k2, Show v1, Show v2, Show v3, Show v4, Hash k2, Binary a, NFData k1, NFData k2, Ord k2, Binary k1, Binary k2, NFData v1, NFData v4, NFData v2, NFData v3, Binary v1, Binary v3, Binary v2, Binary v4) => MapFunction a k1 v1 k2 v2  -> ReduceFunction a k2 v3 v4 -> IO (MR.DMapReduce, FS.FileSystem)
initWorker m r
  = do
    fs <- FS.mkFileSystemNode FS.defaultFSNodeConfig
    mr <- MR.mkMapReduceWorker fs actionMap MR.defaultMRWorkerConfig
    return (mr,fs)
    where
      actionMap :: ActionMap
      actionMap = KMap.insert (readActionConfiguration (actionConfig m r)) KMap.empty

{-
 The simpleWorker's deinit functin
-}
deinitWorker :: (MR.DMapReduce, FS.FileSystem) -> IO ()
deinitWorker (mr,fs)
  = do
    MR.closeMapReduce mr
    FS.closeFileSystem fs
    
{-

partition'

first list, list of 
 -}
partition' :: [a] -> [[a]] -> [[a]]
partition'     _        [] = []
partition'    []       xss = xss
partition'    us   (_xs:[]) = [us]
partition' (u:us) (xs:xss) = partition' us (xss ++ [xs'])
  where xs' = (u:xs)

putTimeStamp :: String -> IO ()
putTimeStamp s = do
  t1 <- getPOSIXTime
  putStrLn (s++" : "++ show t1)