{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
module Network.Legion.Runtime (
forkLegionary,
StartupMode(..),
Runtime,
makeRequest,
search,
) where
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (writeChan, newChan, Chan)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar)
import Control.Monad (void, forever, join, (>=>))
import Control.Monad.Catch (catchAll, try, SomeException, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (logWarn, logError, logInfo, LoggingT,
MonadLoggerIO, runLoggingT, askLoggerIO, logDebug)
import Control.Monad.Trans.Class (lift)
import Data.Binary (encode, Binary)
import Data.Conduit (Source, ($$), (=$=), yield, await, awaitForever,
transPipe, ConduitM, runConduit, Sink)
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.Serialization.Binary (conduitDecode)
import Data.Map (Map)
import Data.Set (Set)
import Data.Text (pack)
import GHC.Generics (Generic)
import Network.Legion.Admin (runAdmin, AdminMessage(GetState, GetPart,
Eject))
import Network.Legion.Application (LegionConstraints, getState, Persistence)
import Network.Legion.BSockAddr (BSockAddr(BSockAddr))
import Network.Legion.ClusterState (ClusterPowerState)
import Network.Legion.Conduit (merge, chanToSink, chanToSource)
import Network.Legion.Distribution (Peer, newPeer)
import Network.Legion.Fork (forkC)
import Network.Legion.Index (IndexRecord(IndexRecord), irTag, irKey,
SearchTag(SearchTag))
import Network.Legion.LIO (LIO)
import Network.Legion.PartitionKey (PartitionKey)
import Network.Legion.Runtime.ConnectionManager (newConnectionManager,
send, ConnectionManager, newPeers)
import Network.Legion.Runtime.PeerMessage (PeerMessage(PeerMessage),
PeerMessagePayload(ForwardRequest, ForwardResponse, ClusterMerge,
PartitionMerge, Search, SearchResponse), MessageId, newSequence,
nextMessageId)
import Network.Legion.Settings (RuntimeSettings(RuntimeSettings,
adminHost, adminPort, peerBindAddr, joinBindAddr))
import Network.Legion.StateMachine (partitionMerge, clusterMerge,
NodeState, newNodeState, runSM, UserResponse(Forward, Respond),
userRequest, heartbeat, rebalance, migrate, propagate, ClusterAction,
eject, minimumCompleteServiceSet)
import Network.Legion.UUID (getUUID)
import Network.Socket (Family(AF_INET, AF_INET6, AF_UNIX, AF_CAN),
SocketOption(ReuseAddr), SocketType(Stream), accept, bind,
defaultProtocol, listen, setSocketOption, socket, SockAddr(SockAddrInet,
SockAddrInet6, SockAddrUnix, SockAddrCan), connect, getPeerName, Socket)
import Network.Socket.ByteString.Lazy (sendAll)
import qualified Data.Conduit.List as CL
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.Legion.ClusterState as C
import qualified Network.Legion.StateMachine as SM
runLegionary :: (LegionConstraints e o s)
=> Persistence e o s
-> RuntimeSettings
-> StartupMode
-> Source IO (RequestMsg e o)
-> LoggingT IO ()
runLegionary
persistence
settings@RuntimeSettings {adminHost, adminPort}
startupMode
requestSource
= do
peerS <- loggingC =<< startPeerListener settings
adminS <- loggingC =<< runAdmin adminPort adminHost
joinS <- loggingC (joinMsgSource settings)
(self, nodeState, peers) <- makeNodeState settings startupMode
cm <- newConnectionManager peers
firstMessageId <- newSequence
let
rts = RuntimeState {
forwarded = Map.empty,
nextId = firstMessageId,
cm,
self,
searches = Map.empty
}
runConduit $
(joinS `merge` (peerS `merge` (requestSource `merge` adminS)))
=$= CL.map toMessage
=$= messageSink persistence (rts, nodeState)
where
toMessage
:: Either
(JoinRequest, JoinResponse -> LIO ())
(Either
(PeerMessage e o s)
(Either
(RequestMsg e o)
(AdminMessage e o s)))
-> RuntimeMessage e o s
toMessage (Left m) = J m
toMessage (Right (Left m)) = P m
toMessage (Right (Right (Left m))) = R m
toMessage (Right (Right (Right m))) = A m
loggingC :: ConduitM e o LIO r -> LIO (ConduitM e o IO r)
loggingC c = do
logging <- askLoggerIO
return (transPipe (`runLoggingT` logging) c)
data RequestMsg e o
= Request PartitionKey e (o -> IO ())
| SearchDispatch SearchTag (Maybe IndexRecord -> IO ())
instance (Show e) => Show (RequestMsg e o) where
show (Request k e _) = "(Request " ++ show k ++ " " ++ show e ++ " _)"
show (SearchDispatch s _) = "(SearchDispatch " ++ show s ++ " _)"
messageSink :: (LegionConstraints e o s)
=> Persistence e o s
-> (RuntimeState e o s, NodeState e o s)
-> Sink (RuntimeMessage e o s) LIO ()
messageSink persistence states =
await >>= \case
Nothing -> return ()
Just msg -> do
$(logDebug) . pack
$ "Receieved: " ++ show msg
lift . handleMessage persistence msg
>=> lift . updatePeers persistence
>=> lift . clusterHousekeeping persistence
>=> messageSink persistence
$ states
updatePeers
:: Persistence e o s
-> (RuntimeState e o s, NodeState e o s)
-> LIO (RuntimeState e o s, NodeState e o s)
updatePeers persistence (rts, ns) = do
(peers, ns2) <- runSM persistence ns SM.getPeers
newPeers (cm rts) peers
return (rts, ns2)
clusterHousekeeping :: (LegionConstraints e o s)
=> Persistence e o s
-> (RuntimeState e o s, NodeState e o s)
-> LIO (RuntimeState e o s, NodeState e o s)
clusterHousekeeping persistence (rts, ns) = do
(actions, ns2) <- runSM persistence ns (
heartbeat
>> rebalance
>> migrate
>> propagate
)
rts2 <- foldr (>=>) return (clusterAction <$> actions) rts
return (rts2, ns2)
clusterAction
:: ClusterAction e o s
-> RuntimeState e o s
-> LIO (RuntimeState e o s)
clusterAction
(SM.ClusterMerge peer ps)
rts@RuntimeState {self, nextId, cm}
= do
send cm peer (PeerMessage self nextId (ClusterMerge ps))
return rts {nextId = nextMessageId nextId}
clusterAction
(SM.PartitionMerge peer key ps)
rts@RuntimeState {self, nextId, cm}
= do
send cm peer (PeerMessage self nextId (PartitionMerge key ps))
return rts {nextId = nextMessageId nextId}
handleMessage :: (LegionConstraints e o s)
=> Persistence e o s
-> RuntimeMessage e o s
-> (RuntimeState e o s, NodeState e o s)
-> LIO (RuntimeState e o s, NodeState e o s)
handleMessage
persistence
(P (PeerMessage source _ (PartitionMerge key ps)))
(rts, ns)
= do
((), ns2) <- runSM persistence ns (partitionMerge source key ps)
return (rts, ns2)
handleMessage
persistence
(P (PeerMessage source _ (ClusterMerge cs)))
(rts, ns)
= do
((), ns2) <- runSM persistence ns (clusterMerge source cs)
return (rts, ns2)
handleMessage
persistence
(P (msg@(PeerMessage source mid (ForwardRequest key request))))
(rts@RuntimeState {nextId, cm, self}, ns)
= do
(output, ns2) <- runSM persistence ns (userRequest key request)
case output of
Respond response -> do
send cm source (
PeerMessage self nextId (ForwardResponse mid response)
)
return (rts {nextId = nextMessageId nextId}, ns2)
Forward peer -> do
send cm peer msg
return (rts {nextId = nextMessageId nextId}, ns2)
handleMessage
_legionary
(msg@(P (PeerMessage _ _ (ForwardResponse mid response))))
(rts, ns)
=
case lookupDelete mid (forwarded rts) of
(Nothing, fwd) -> do
$(logWarn) . pack $ "Unsolicited ForwardResponse: " ++ show msg
return (rts {forwarded = fwd}, ns)
(Just respond, fwd) -> do
respond response
return (rts {forwarded = fwd}, ns)
handleMessage
persistence
(R (Request key request respond))
(rts@RuntimeState {self, cm, nextId, forwarded}, ns)
= do
(output, ns2) <- runSM persistence ns (userRequest key request)
case output of
Respond response -> do
lift (respond response)
return (rts, ns2)
Forward peer -> do
send cm peer (
PeerMessage self nextId (ForwardRequest key request)
)
return (
rts {
forwarded = Map.insert nextId (lift . respond) forwarded,
nextId = nextMessageId nextId
},
ns2
)
handleMessage
persistence
(R (SearchDispatch searchTag respond))
(rts@RuntimeState {cm, self, searches}, ns)
=
case Map.lookup searchTag searches of
Nothing -> do
(mcss, ns2) <- runSM persistence ns minimumCompleteServiceSet
rts2 <- foldr (>=>) return (sendOne <$> Set.toList mcss) rts
return (
rts2 {
searches = Map.insert
searchTag
(mcss, Nothing, [lift . respond])
searches
},
ns2
)
Just (peers, best, responders) ->
return (
rts {
searches = Map.insert
searchTag
(peers, best, (lift . respond):responders)
searches
},
ns
)
where
sendOne :: Peer -> RuntimeState e o s -> LIO (RuntimeState e o s)
sendOne peer r@RuntimeState {nextId} = do
send cm peer (PeerMessage self nextId (Search searchTag))
return r {nextId = nextMessageId nextId}
handleMessage
persistence
(P (PeerMessage source _ (Search searchTag)))
(rts@RuntimeState {nextId, cm, self}, ns)
= do
(output, ns2) <- runSM persistence ns (SM.search searchTag)
send cm source (PeerMessage self nextId (SearchResponse searchTag output))
return (rts {nextId = nextMessageId nextId}, ns2)
handleMessage
_legionary
(msg@(P (PeerMessage source _ (SearchResponse searchTag response))))
(rts@RuntimeState {searches}, ns)
=
case Map.lookup searchTag searches of
Nothing -> do
$(logWarn) . pack $ "Unsolicited SearchResponse: " ++ show msg
return (rts, ns)
Just (peers, best, responders) ->
if source `Set.member` peers
then
let peers2 = Set.delete source peers
in if null peers2
then do
mapM_ ($ bestOf best response) responders
return (
rts {searches = Map.delete searchTag searches},
ns
)
else
return (
rts {
searches = Map.insert
searchTag
(peers2, bestOf best response, responders)
searches
},
ns
)
else do
$(logWarn) . pack $ "Unsolicited SearchResponse: " ++ show msg
return (rts, ns)
where
bestOf :: Maybe IndexRecord -> Maybe IndexRecord -> Maybe IndexRecord
bestOf (Just a) (Just b) = Just (min a b)
bestOf Nothing b = b
bestOf a Nothing = a
handleMessage
persistence
(J (JoinRequest addy, respond))
(rts, ns)
= do
((peer, cluster), ns2) <- runSM persistence ns (SM.join addy)
respond (JoinOk peer cluster)
return (rts, ns2)
handleMessage
_legionary
(A (GetState respond))
(rts, ns)
=
respond ns >> return (rts, ns)
handleMessage
persistence
(A (GetPart key respond))
(rts, ns)
= do
respond =<< lift (getState persistence key)
return (rts, ns)
handleMessage
persistence
(A (Eject peer respond))
(rts, ns)
= do
((), ns2) <- runSM persistence ns (eject peer)
respond ()
return (rts, ns2)
data StartupMode
= NewCluster
| JoinCluster SockAddr
deriving (Show, Eq)
startPeerListener :: (LegionConstraints e o s)
=> RuntimeSettings
-> LIO (Source LIO (PeerMessage e o s))
startPeerListener RuntimeSettings {peerBindAddr} =
catchAll (do
(inputChan, so) <- lift $ do
inputChan <- newChan
so <- socket (fam peerBindAddr) Stream defaultProtocol
setSocketOption so ReuseAddr 1
bind so peerBindAddr
listen so 5
return (inputChan, so)
forkC "peer socket acceptor" $ acceptLoop so inputChan
return (chanToSource inputChan)
) (\err -> do
$(logError) . pack
$ "Couldn't start incomming peer message service, because of: "
++ show (err :: SomeException)
throwM err
)
where
acceptLoop :: (LegionConstraints e o s)
=> Socket
-> Chan (PeerMessage e o s)
-> LIO ()
acceptLoop so inputChan =
catchAll (
forever $ do
(conn, _) <- lift $ accept so
remoteAddr <- lift $ getPeerName conn
logging <- askLoggerIO
let runSocket =
sourceSocket conn
=$= conduitDecode
$$ msgSink
void
. lift
. forkIO
. (`runLoggingT` logging)
. logErrors remoteAddr
$ runSocket
) (\err -> do
$(logError) . pack
$ "error in peer message accept loop: "
++ show (err :: SomeException)
throwM err
)
where
msgSink = chanToSink inputChan
logErrors :: SockAddr -> LIO () -> LIO ()
logErrors remoteAddr io = do
result <- try io
case result of
Left err ->
$(logWarn) . pack
$ "Incomming peer connection (" ++ show remoteAddr
++ ") crashed because of: " ++ show (err :: SomeException)
Right v -> return v
makeNodeState
:: RuntimeSettings
-> StartupMode
-> LIO (Peer, NodeState e o s, Map Peer BSockAddr)
makeNodeState RuntimeSettings {peerBindAddr} NewCluster = do
self <- newPeer
clusterId <- getUUID
let
cluster = C.new clusterId self peerBindAddr
nodeState = newNodeState self cluster
return (self, nodeState, C.getPeers cluster)
makeNodeState RuntimeSettings {peerBindAddr} (JoinCluster addr) = do
$(logInfo) "Trying to join an existing cluster."
(self, clusterPS) <- joinCluster (JoinRequest (BSockAddr peerBindAddr))
let
cluster = C.initProp self clusterPS
nodeState = newNodeState self cluster
return (self, nodeState, C.getPeers cluster)
where
joinCluster :: JoinRequest -> LIO (Peer, ClusterPowerState)
joinCluster joinMsg = liftIO $ do
so <- socket (fam addr) Stream defaultProtocol
connect so addr
sendAll so (encode joinMsg)
sourceSocket so =$= conduitDecode $$ do
response <- await
case response of
Nothing -> fail
$ "Couldn't join a cluster because there was no response "
++ "to our join request!"
Just (JoinOk self cps) ->
return (self, cps)
Just (JoinRejected reason) -> fail
$ "The cluster would not allow us to re-join. "
++ "The reason given was: " ++ show reason
joinMsgSource
:: RuntimeSettings
-> Source LIO (JoinRequest, JoinResponse -> LIO ())
joinMsgSource RuntimeSettings {joinBindAddr} = join . lift $
catchAll (do
(chan, so) <- lift $ do
chan <- newChan
so <- socket (fam joinBindAddr) Stream defaultProtocol
setSocketOption so ReuseAddr 1
bind so joinBindAddr
listen so 5
return (chan, so)
forkC "join socket acceptor" $ acceptLoop so chan
return (chanToSource chan)
) (\err -> do
$(logError) . pack
$ "Couldn't start join request service, because of: "
++ show (err :: SomeException)
throwM err
)
where
acceptLoop :: Socket -> Chan (JoinRequest, JoinResponse -> LIO ()) -> LIO ()
acceptLoop so chan =
catchAll (
forever $ do
(conn, _) <- lift $ accept so
logging <- askLoggerIO
(void . lift . forkIO . (`runLoggingT` logging) . logErrors) (
sourceSocket conn
=$= conduitDecode
=$= attachResponder conn
$$ chanToSink chan
)
) (\err -> do
$(logError) . pack
$ "error in join request accept loop: "
++ show (err :: SomeException)
throwM err
)
where
logErrors :: LIO () -> LIO ()
logErrors io = do
result <- try io
case result of
Left err ->
$(logWarn) . pack
$ "Incomming join connection crashed because of: "
++ show (err :: SomeException)
Right v -> return v
attachResponder
:: Socket
-> ConduitM JoinRequest (JoinRequest, JoinResponse -> LIO ()) LIO ()
attachResponder conn = awaitForever (\msg -> do
mvar <- liftIO newEmptyMVar
yield (msg, lift . putMVar mvar)
response <- liftIO $ takeMVar mvar
liftIO $ sendAll conn (encode response)
)
fam :: SockAddr -> Family
fam SockAddrInet {} = AF_INET
fam SockAddrInet6 {} = AF_INET6
fam SockAddrUnix {} = AF_UNIX
fam SockAddrCan {} = AF_CAN
forkLegionary :: (LegionConstraints e o s, MonadLoggerIO io)
=> Persistence e o s
-> RuntimeSettings
-> StartupMode
-> io (Runtime e o)
forkLegionary persistence settings startupMode = do
logging <- askLoggerIO
liftIO . (`runLoggingT` logging) $ do
chan <- liftIO newChan
forkC "main legion thread" $
runLegionary persistence settings startupMode (chanToSource chan)
return Runtime {
rtMakeRequest = \key request -> liftIO $ do
responseVar <- newEmptyMVar
writeChan chan (Request key request (putMVar responseVar))
takeMVar responseVar,
rtSearch =
let
findNext :: SearchTag -> IO (Maybe IndexRecord)
findNext searchTag = do
responseVar <- newEmptyMVar
writeChan chan (SearchDispatch searchTag (putMVar responseVar))
takeMVar responseVar
in findNext
}
data Runtime e o = Runtime {
rtMakeRequest :: PartitionKey -> e -> IO o,
rtSearch :: SearchTag -> IO (Maybe IndexRecord)
}
makeRequest :: (MonadIO io) => Runtime e o -> PartitionKey -> e -> io o
makeRequest rt key = liftIO . rtMakeRequest rt key
search :: (MonadIO io) => Runtime e o -> SearchTag -> Source io IndexRecord
search rt tag =
liftIO (rtSearch rt tag) >>= \case
Nothing -> return ()
Just record@IndexRecord {irTag, irKey} -> do
yield record
search rt (SearchTag irTag (Just irKey))
data RuntimeMessage e o s
= P (PeerMessage e o s)
| R (RequestMsg e o)
| J (JoinRequest, JoinResponse -> LIO ())
| A (AdminMessage e o s)
instance (Show e, Show o, Show s) => Show (RuntimeMessage e o s) where
show (P m) = "(P " ++ show m ++ ")"
show (R m) = "(R " ++ show m ++ ")"
show (J (jr, _)) = "(J (" ++ show jr ++ ", _))"
show (A a) = "(A (" ++ show a ++ "))"
data RuntimeState e o s = RuntimeState {
self :: Peer,
forwarded :: Map MessageId (o -> LIO ()),
nextId :: MessageId,
cm :: ConnectionManager e o s,
searches :: Map
SearchTag
(Set Peer, Maybe IndexRecord, [Maybe IndexRecord -> LIO ()])
}
data JoinRequest = JoinRequest BSockAddr
deriving (Generic, Show)
instance Binary JoinRequest
data JoinResponse
= JoinOk Peer ClusterPowerState
| JoinRejected String
deriving (Generic)
instance Binary JoinResponse
lookupDelete :: (Ord k) => k -> Map k v -> (Maybe v, Map k v)
lookupDelete = Map.updateLookupWithKey (const (const Nothing))