{-# LANGUAGE TemplateHaskell, DeriveDataTypeable #-} module Happstack.State.Spread ( connectToCluster , changeEventMapping ) where import Happstack.State.Transaction import Happstack.Data.Serialize import Happstack.Data.SerializeTH import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as L import qualified Data.Map as Map import Data.Typeable import Spread.Client import qualified Control.Concurrent.Chan.Closeable as Closeable import System.Log.Logger import Control.Monad hiding (join) import Control.Concurrent import System.Random logSP :: Priority -> String -> IO () logSP = logM "Happstack.State.Spread" data Cluster = Cluster { clusterChan :: Closeable.Chan Closeable.R Message , clusterConnection :: Connection } type EventId = Int data ClusterMsg = RequestState | NewState B.ByteString -- A piece of state (map of all components) | StateTransferred -- The entire state has been transmitted. | ForeignEvent EventId Object | EventResponse EventId Object deriving (Typeable) instance Version ClusterMsg -- version 0 $(deriveSerialize ''ClusterMsg) {- | Connects the Happstack process to the local running spread daemon. The spread daemon must already be running. -} connectToCluster :: IO Cluster connectToCluster = do logSP NOTICE "Connecting to spread daemon on localhost" uniq <- randomRIO (0,30000) -- FIXME: this is a hack to get unique names. let name = mkPrivateName (B.pack $ "node" ++ show (uniq::Int)) (chan, conn) <- connect defaultConf{desiredName = name} startReceive conn join receiverGroup conn return $ Cluster chan conn receiverGroup :: Group receiverGroup = let Just g = makeGroup "receiver" in g {- | This function modifies any event handlers in the given EventMap to make them route the update information to the rest of the cluster -} changeEventMapping :: MVar TxControl -> EventMap -> Cluster -> IO EventMap changeEventMapping ctlVar localEventMap cluster = do logSP NOTICE "Create new event mapper" mems <- getClusterMembers cluster responseIndex <- newMVar Map.empty ready <- newEmptyMVar eventQueue <- newChan stateVar <- newMVar L.empty eidStore <- newMVar 0 -- eid's are locally unique. let newEID = modifyMVar eidStore (\store -> return (store+1,store+1)) pushStateBlock st = modifyMVar_ stateVar (\acc -> return $ acc `L.append` L.fromChunks [st]) insertEID eid = do mv <- newEmptyMVar modifyMVar_ responseIndex $ \idx -> return $ Map.insert eid mv idx return $ takeMVar mv returnResponse eid object = modifyMVar_ responseIndex $ \idx -> case Map.lookup eid idx of Nothing -> return idx -- We're already received the result from another node. Just mv -> do putMVar mv object -- Notify the caller about the new response. return $ Map.delete eid idx listener = forever $ do mbMsg <- Closeable.readChan (clusterChan cluster) case mbMsg of Nothing -> error "Disconnected from the spread daemon." Just (Regular msg) -> case fst (deserialize (L.fromChunks [inData msg])) of RequestState -> sendState ctlVar (inSender msg) cluster ForeignEvent eid object -> writeChan eventQueue (inSender msg, eid, object) NewState st -> pushStateBlock st -- FIXME: check that stateVar is non-empty? StateTransferred -> do restoreState =<< takeMVar stateVar putMVar ready () EventResponse eid object-> returnResponse eid object Just _ -> return () responder = forever $ do (sender, eid, object) <- readChan eventQueue response <- runObjectEventFunc object localEventMap sendClusterMsg [sender] (EventResponse eid response) cluster forkIO listener forkIO responder case mems of [] -> putMVar ready () -- The cluster is empty, use state from disk. (x:_) -> do logSP NOTICE $ "Requesting state from: " ++ show x sendClusterMsg [x] RequestState cluster logSP NOTICE "Waiting for ready signal" takeMVar ready logSP NOTICE "Ready signal received" let newEventMap = flip Map.map localEventMap $ \handler -> case handler of UpdateHandler runCold _ parse -> let runHot ev = do eid <- newEID logSP NOTICE $ "New eid: " ++ show eid wait <- insertEID eid sendClusterMsg [receiverGroup] (ForeignEvent eid (mkObject ev)) cluster response <- wait logSP NOTICE $ "Received response for: " ++ show eid return $ parseObject response in UpdateHandler runCold runHot parse QueryHandler{} -> handler return newEventMap sendState :: MVar TxControl -> Group -> Cluster -> IO () sendState ctlVar sender cluster = withMVar ctlVar $ \ctl -> do logSP NOTICE $ "Sending state to: " ++ show sender allStates <- mapM getState (ctlAllComponents ctl) let chunks = L.toChunks $ serialize $ zip (ctlAllComponents ctl) allStates forM_ chunks $ \chunk -> do sendClusterMsg [sender] (NewState chunk) cluster sendClusterMsg [sender] StateTransferred cluster logSP NOTICE $ "The state has been transmitted." restoreState :: L.ByteString -> IO () restoreState stateObject = do logSP NOTICE $ "Loading components from network." forM_ state $ \(stateType, stateData) -> do setNewState stateType stateData logSP NOTICE $ "All components successfully loaded" where state = fst (deserialize stateObject) sendClusterMsg :: [Group] -> ClusterMsg -> Cluster -> IO () sendClusterMsg to msg cluster = send outMsg (clusterConnection cluster) where outMsg = Outgoing { outOrdering = Safe , outDiscard = False , outData = B.concat (L.toChunks (serialize msg)) , outGroups = to , outMsgType = 0 } -- Cluster members not including self. getClusterMembers :: Cluster -> IO [PrivateGroup] getClusterMembers cluster = do mbMsg <- Closeable.readChan (clusterChan cluster) case mbMsg of Just (Membership membership) -> case membership of Reg{members=m} -> return $ filter (/= privateGroup (clusterConnection cluster)) m Transient{} -> error $ "Received a Transient message on connect." SelfLeave{} -> error $ "Received a SelfLeave message on connect." Just msg -> error $ "Expected membership message from cluster. Received a " ++ showMsgType msg ++ " message." Nothing -> error "Cluster channel unexpectedly closed." showMsgType :: Message -> String showMsgType (Regular _) = "regular" showMsgType (Membership _) = "membership" showMsgType (Rejected _) = "rejected"