module Network.Legion.Runtime (
forkLegionary,
StartupMode(..),
Runtime,
makeRequest,
search,
) where
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (writeChan, newChan, Chan)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar)
import Control.Monad (void, forever, join)
import Control.Monad.Catch (catchAll, try, SomeException, throwM, finally)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (logWarn, logError, logInfo, LoggingT,
  MonadLoggerIO, runLoggingT, askLoggerIO, logDebug)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State (StateT, runStateT, get, put, modify)
import Data.Binary (encode, Binary)
import Data.Conduit (Source, ($$), (=$=), yield, await, awaitForever,
  transPipe, ConduitM, runConduit, Sink)
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.Serialization.Binary (conduitDecode)
import Data.Map (Map)
import Data.Set (Set)
import Data.String (IsString, fromString)
import Data.Text (pack)
import Data.Time (UTCTime, getCurrentTime)
import GHC.Generics (Generic)
import Network.Legion.Admin (runAdmin, AdminMessage(GetState, GetPart,
  Eject, GetIndex, GetDivergent, GetStates))
import Network.Legion.Application (LegionConstraints, Persistence,
  list, saveCluster)
import Network.Legion.BSockAddr (BSockAddr(BSockAddr))
import Network.Legion.ClusterState (ClusterPowerState)
import Network.Legion.Conduit (merge, chanToSink, chanToSource)
import Network.Legion.Distribution (Peer, newPeer)
import Network.Legion.Fork (forkC)
import Network.Legion.Index (IndexRecord(IndexRecord), irTag, irKey,
  SearchTag(SearchTag), indexEntries, Indexable)
import Network.Legion.LIO (LIO)
import Network.Legion.Lift (lift2, lift3)
import Network.Legion.PartitionKey (PartitionKey)
import Network.Legion.PartitionState (PartitionPowerState)
import Network.Legion.PowerState (Event)
import Network.Legion.Runtime.ConnectionManager (newConnectionManager,
  ConnectionManager, newPeers)
import Network.Legion.Runtime.PeerMessage (PeerMessage(PeerMessage),
  PeerMessagePayload(ForwardRequest, ForwardResponse, ClusterMerge,
  PartitionMerge, Search, SearchResponse, JoinNext, JoinNextResponse),
  MessageId, newSequence, nextMessageId, JoinNextResponse(Joined,
  JoinFinished))
import Network.Legion.Settings (RuntimeSettings(RuntimeSettings,
  adminHost, adminPort, peerBindAddr, joinBindAddr))
import Network.Legion.StateMachine (partitionMerge, clusterMerge,
  newNodeState, UserResponse(Forward, Respond), userRequest, eject,
  minimumCompleteServiceSet, joinNext, joinNextResponse)
import Network.Legion.StateMachine.Monad (NodeState, runSM, ClusterAction,
  SM, popActions, nsIndex)
import Network.Legion.UUID (getUUID)
import Network.Socket (Family(AF_INET, AF_INET6, AF_UNIX, AF_CAN),
  SocketOption(ReuseAddr), SocketType(Stream), accept, bind, close,
  defaultProtocol, listen, setSocketOption, socket, SockAddr(SockAddrInet,
  SockAddrInet6, SockAddrUnix, SockAddrCan), connect, getPeerName, Socket)
import Network.Socket.ByteString.Lazy (sendAll)
import System.IO (stderr, hPutStrLn)
import qualified Data.Conduit.List as CL
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.Legion.ClusterState as C
import qualified Network.Legion.PowerState as PS
import qualified Network.Legion.Runtime.ConnectionManager as CM
import qualified Network.Legion.StateMachine as SM
import qualified Network.Legion.StateMachine.Monad as SMM
-- | Run a legion node in the current thread.
--
-- Starts the peer listener, the admin service and the join-request
-- listener, builds the initial node state for the requested 'StartupMode',
-- and then feeds the merged stream of join, peer, user-request and admin
-- messages through 'messageSink'.
runLegionary :: (LegionConstraints e o s)
  => Persistence e o s
  -> RuntimeSettings
  -> StartupMode
  -> Source IO (RequestMsg e o)
  -> LoggingT IO ()
runLegionary
    persistence
    settings@RuntimeSettings {adminHost, adminPort}
    startupMode
    requestSource
  = do
    -- The message sources are started before the node state is built,
    -- in this order.
    peerS <- startPeerListener settings >>= loggingC
    adminS <- runAdmin adminPort adminHost >>= loggingC
    joinS <- loggingC (joinMsgSource settings)
    (self, nodeState, peers) <- makeNodeState persistence settings startupMode
    rts <- newRuntimeState self peers
    let
      inbound = transPipe lift (
          (joinS =$= CL.map J) `merge`
          (peerS =$= CL.map P) `merge`
          (requestSource =$= CL.map R) `merge`
          (adminS =$= CL.map A)
        )
    void
      . runRTS persistence nodeState rts
      . runConduit
      $ inbound =$= messageSink
  where
    -- Initial runtime state: nothing forwarded, no searches in flight,
    -- an empty communication clock and a fresh message id sequence.
    newRuntimeState :: (Binary e, Binary o, Binary s)
      => Peer
      -> Map Peer BSockAddr
      -> LoggingT IO (RuntimeState e o s)
    newRuntimeState self peers = do
      connections <- newConnectionManager peers
      firstMessageId <- newSequence
      return RuntimeState {
          self,
          cm = connections,
          nextId = firstMessageId,
          forwarded = Map.empty,
          searches = Map.empty,
          commClock = Map.empty
        }

    -- Turn a conduit over the logging monad into one over plain IO by
    -- capturing the logger that is currently in effect.
    loggingC :: ConduitM e o LIO r -> LIO (ConduitM e o IO r)
    loggingC c = unwrap <$> askLoggerIO
      where
        unwrap logging = transPipe (`runLoggingT` logging) c
-- | A message submitted to the runtime by the local user. Each
-- constructor carries the callback used to deliver the answer.
data RequestMsg e o
  = Request PartitionKey e (o -> IO ())
    -- ^ Apply a request to the partition identified by the key.
  | SearchDispatch SearchTag (Maybe IndexRecord -> IO ())
    -- ^ Run an index search for the given tag.
-- | The response callbacks cannot be shown, so they are rendered as @_@.
instance (Show e) => Show (RequestMsg e o) where
  show = \case
    Request k e _ -> concat ["(Request ", show k, " ", show e, " _)"]
    SearchDispatch s _ -> concat ["(SearchDispatch ", show s, " _)"]
-- | Consume runtime messages forever. For every message: log it, record
-- the receive time when it came from a peer, handle it, refresh the
-- connection manager's peer list, and send out any queued cluster actions.
messageSink :: (LegionConstraints e o s)
  => Sink (RuntimeMessage e o s) (RTS e o s) ()
messageSink = awaitForever $ \msg -> do
    $(logDebug) (pack ("Receieved: " ++ show msg))
    lift $ do
      noteReceipt msg
      handleMessage msg
      updatePeers
      clusterActions
  where
    -- Only peer messages update the receive clock.
    noteReceipt (P (PeerMessage source _ _)) = updateRecvClock source
    noteReceipt _ = return ()
-- | Pop every pending cluster action from the state machine and send the
-- corresponding message to the target peer.
clusterActions :: RTS e o s ()
clusterActions = popActions >>= mapM_ dispatch
  where
    -- Each action maps onto exactly one outgoing peer message; the
    -- message id returned by 'send' is not needed here.
    dispatch :: ClusterAction e o s -> RTS e o s ()
    dispatch action = void $ case action of
      SMM.ClusterMerge peer ps -> send peer (ClusterMerge ps)
      SMM.PartitionMerge peer key ps -> send peer (PartitionMerge key ps)
      SMM.PartitionJoin peer keys -> send peer (JoinNext keys)
-- | Hand the state machine's current view of the peers to the connection
-- manager.
updatePeers :: RTS e o s ()
updatePeers = do
  known <- SM.getPeers
  connections <- cm <$> lift get
  lift2 (newPeers connections known)
-- | Handle a single 'RuntimeMessage'. There is one equation per kind of
-- message: messages from peers (@P@), requests from the local user (@R@),
-- join requests (@J@) and admin messages (@A@).
handleMessage :: (LegionConstraints e o s)
=> RuntimeMessage e o s
-> RTS e o s ()
-- JoinNextResponse from a peer: pass the answer on to the state machine,
-- translating 'Joined' into @Just (key, partition)@ and 'JoinFinished'
-- into @Nothing@.
handleMessage
(P (PeerMessage source _ (JoinNextResponse _messageId response)))
=
joinNextResponse source (toMaybe response)
where
toMaybe
:: JoinNextResponse e o s
-> Maybe (PartitionKey, PartitionPowerState e o s)
toMaybe (Joined key partition) = Just (key, partition)
toMaybe JoinFinished = Nothing
-- JoinNext from a peer: ask the state machine, then reply to the sender
-- with either 'Joined' or 'JoinFinished', echoing the incoming message id.
handleMessage
(P (PeerMessage source messageId (JoinNext askKeys)))
=
joinNext source askKeys >>= \case
Nothing -> void $
send source (JoinNextResponse messageId JoinFinished)
Just (gotKey, partition) -> void $
send source (JoinNextResponse messageId (Joined gotKey partition))
-- PartitionMerge from a peer: merge the received partition state.
handleMessage
(P (PeerMessage _ _ (PartitionMerge key ps)))
=
partitionMerge key ps
-- ClusterMerge from a peer: merge the received cluster state.
handleMessage
(P (PeerMessage _ _ (ClusterMerge cs)))
=
clusterMerge cs
-- ForwardRequest from a peer: run the request through the state machine.
-- A response goes back to the sender under the original message id; if
-- the state machine says to forward, the original message is passed on
-- unchanged to the indicated peer.
handleMessage
(P (msg@(PeerMessage source mid (ForwardRequest key request))))
= do
output <- userRequest key request
case output of
Respond response -> void $ send source (ForwardResponse mid response)
Forward peer -> forward peer msg
-- ForwardResponse from a peer: look up and remove the responder that was
-- registered under this message id and call it. A response with no
-- registered responder is logged as a warning.
handleMessage
(msg@(P (PeerMessage _ _ (ForwardResponse mid response))))
= do
rts <- lift get
case lookupDelete mid (forwarded rts) of
(Nothing, fwd) -> do
$(logWarn) . pack $ "Unsolicited ForwardResponse: " ++ show msg
(lift . put) rts {forwarded = fwd}
(Just respond, fwd) -> do
lift2 $ respond response
(lift . put) rts {forwarded = fwd}
-- Request from the local user: respond directly when the state machine
-- produces a response; otherwise send a ForwardRequest to the indicated
-- peer and register the responder under the id of the sent message.
handleMessage
(R (Request key request respond))
= do
output <- userRequest key request
case output of
Respond response -> lift3 (respond response)
Forward peer -> do
messageId <- send peer (ForwardRequest key request)
(lift . modify) $ \rts@RuntimeState {forwarded} -> rts {
forwarded = Map.insert messageId (lift . respond) forwarded
}
-- SearchDispatch from the local user: if no search for the same tag is
-- in flight, send a Search to every peer in the minimum complete service
-- set and record the new search; otherwise just add this responder to the
-- search that is already in flight.
handleMessage
(R (SearchDispatch searchTag respond))
=
Map.lookup searchTag . searches <$> lift get >>= \case
Nothing -> do
mcss <- minimumCompleteServiceSet
mapM_ sendOne (Set.toList mcss)
rts@RuntimeState {searches} <- lift get
(lift . put) rts {
searches = Map.insert
searchTag
(mcss, Nothing, [lift . respond])
searches
}
Just (peers, best, responders) -> do
rts@RuntimeState {searches} <- lift get
(lift . put) rts {
searches = Map.insert
searchTag
(peers, best, (lift . respond):responders)
searches
}
where
sendOne :: Peer -> RTS e o s ()
sendOne peer =
void $ send peer (Search searchTag)
-- Search from a peer: run the search against the state machine and send
-- the result back to the sender.
handleMessage
(P (PeerMessage source _ (Search searchTag)))
= do
output <- SM.search searchTag
void $ send source (SearchResponse searchTag output)
-- SearchResponse from a peer: fold the result into the in-flight search
-- for this tag. Once the last outstanding peer has answered, every
-- responder is called with the best record and the search is removed.
-- Responses for unknown searches, or from peers that are not awaited, are
-- logged as warnings.
handleMessage
(msg@(P (PeerMessage source _ (SearchResponse searchTag response))))
=
Map.lookup searchTag . searches <$> lift get >>= \case
Nothing ->
$(logWarn) . pack $ "Unsolicited SearchResponse: " ++ show msg
Just (peers, best, responders) ->
if source `Set.member` peers
then
let peers2 = Set.delete source peers
in if null peers2
then do
lift2 $ mapM_ ($ bestOf best response) responders
rts@RuntimeState {searches} <- lift get
(lift . put) rts {searches = Map.delete searchTag searches}
else do
rts@RuntimeState {searches} <- lift get
(lift . put) rts {
searches = Map.insert
searchTag
(peers2, bestOf best response, responders)
searches
}
else
$(logWarn) . pack $ "Unsolicited SearchResponse: " ++ show msg
where
-- The better of two optional records is the smaller one; a missing
-- record always loses to a present one.
bestOf :: Maybe IndexRecord -> Maybe IndexRecord -> Maybe IndexRecord
bestOf (Just a) (Just b) = Just (min a b)
bestOf Nothing b = b
bestOf a Nothing = a
-- JoinRequest: let the state machine admit the new node and answer with
-- the peer and cluster state it returns.
handleMessage
(J (JoinRequest addy, respond))
= do
(peer, cluster) <- SM.join addy
lift2 $ respond (JoinOk peer cluster)
-- Admin GetState: respond with the current node state.
handleMessage
(A (GetState respond))
=
lift2 . respond =<< SMM.getNodeState
-- Admin GetPart: respond with the partition stored under the key.
handleMessage
(A (GetPart key respond))
=
lift2 . respond =<< SM.getPartition key
-- Admin Eject: eject the peer, then acknowledge.
handleMessage
(A (Eject peer respond))
= do
eject peer
lift2 $ respond ()
-- Admin GetIndex: respond with the index held in the node state.
handleMessage
(A (GetIndex respond))
=
lift2 . respond =<< SMM.nsIndex <$> SMM.getNodeState
-- Admin GetDivergent: respond with the second component of the
-- communication clock entry (the one 'updateRecvClock' sets) for every
-- peer that is divergent in at least one partition.
handleMessage
(A (GetDivergent respond))
= do
RuntimeState {commClock} <- lift get
diverging <- divergentPeers . SMM.partitions <$> SMM.getNodeState
lift2 . respond $ Map.fromAscList [
(peer, r)
| (peer, (_, r)) <- Map.toAscList commClock
, peer `Set.member` diverging
]
where
-- Union of the divergent peers over all partitions.
divergentPeers :: Map PartitionKey (PartitionPowerState e o s) -> Set Peer
divergentPeers =
foldr Set.union Set.empty . fmap (PS.divergent . snd) . Map.toList
-- Admin GetStates: respond with everything the persistence layer lists,
-- collected into a map.
handleMessage
(A (GetStates respond))
= do
persistence <- SMM.getPersistence
lift2 . respond . Map.fromList =<< runConduit (
transPipe liftIO (list persistence)
=$= CL.consume
)
-- | How a node should come up when the runtime starts.
data StartupMode
  = NewCluster
    -- ^ Start a brand new cluster with this node as its only member.
  | JoinCluster SockAddr
    -- ^ Join an existing cluster by sending a join request to the given
    -- address.
  | Recover Peer ClusterPowerState
    -- ^ Start up as the given peer with the given cluster state.
  deriving (Show, Eq)
-- | Start listening for incoming peer connections on 'peerBindAddr' and
-- return a source producing every message decoded from those connections.
--
-- A critical thread accepts connections; each accepted connection is
-- served in its own thread. A failure on one connection (including a
-- failure to look up the remote address) is logged as a warning and only
-- ends that connection. The accepted socket is always closed when its
-- handler finishes, so connections do not leak file descriptors.
--
-- Failure to set up the listening socket is logged and rethrown.
startPeerListener :: (LegionConstraints e o s)
  => RuntimeSettings
  -> LIO (Source LIO (PeerMessage e o s))
startPeerListener RuntimeSettings {peerBindAddr} =
    catchAll (do
        (inputChan, so) <- lift $ do
          inputChan <- newChan
          so <- socket (fam peerBindAddr) Stream defaultProtocol
          setSocketOption so ReuseAddr 1
          bind so peerBindAddr
          listen so 5
          return (inputChan, so)
        forkC "peer socket acceptor" $ acceptLoop so inputChan
        return (chanToSource inputChan)
      ) (\err -> do
        $(logError) . pack
          $ "Couldn't start incomming peer message service, because of: "
          ++ show (err :: SomeException)
        throwM err
      )
  where
    -- Accept connections forever, serving each one in a new thread that
    -- logs with the logger in effect at accept time.
    acceptLoop :: (LegionConstraints e o s)
      => Socket
      -> Chan (PeerMessage e o s)
      -> LIO ()
    acceptLoop so inputChan =
        catchAll (
          forever $ do
            (conn, _) <- lift $ accept so
            logging <- askLoggerIO
            void
              . lift
              . forkIO
              . (`runLoggingT` logging)
              $ serve conn
        ) (\err -> do
          $(logError) . pack
            $ "error in peer message accept loop: "
            ++ show (err :: SomeException)
          throwM err
        )
      where
        msgSink = chanToSink inputChan

        -- Serve one connection: decode messages into the channel until
        -- the connection ends or fails, then close the socket. The remote
        -- address is resolved here, not in the accept loop, so that a
        -- connection that is already dead cannot take the acceptor down.
        serve :: Socket -> LIO ()
        serve conn =
          finally
            (try (lift (getPeerName conn)) >>= \case
              Left err ->
                $(logWarn) . pack
                  $ "Couldn't determine the address of an incoming peer "
                  ++ "connection because of: " ++ show (err :: SomeException)
              Right remoteAddr ->
                logErrors remoteAddr
                  (sourceSocket conn =$= conduitDecode $$ msgSink)
            )
            (lift (close conn))

        -- Run an action, turning any exception into a warning.
        logErrors :: SockAddr -> LIO () -> LIO ()
        logErrors remoteAddr io = do
          result <- try io
          case result of
            Left err ->
              $(logWarn) . pack
                $ "Incomming peer connection (" ++ show remoteAddr
                ++ ") crashed because of: " ++ show (err :: SomeException)
            Right v -> return v
-- | Build the initial state of this node for the given 'StartupMode'.
--
-- Returns this node's peer identity, the initial node state, and the
-- peers of the cluster.
--
-- * 'NewCluster': verify the persistence layer is empty, create a new
--   peer and a new cluster, then continue as 'Recover'.
-- * 'JoinCluster': verify the persistence layer is empty, send a join
--   request to the given address, then continue as 'Recover' with the
--   peer and cluster state from the response.
-- * 'Recover': rebuild the index from the persisted partitions, save the
--   cluster state, and return.
makeNodeState :: (Event e o s, Indexable s)
  => Persistence e o s
  -> RuntimeSettings
  -> StartupMode
  -> LIO (Peer, NodeState e o s, Map Peer BSockAddr)
makeNodeState
    persistence
    settings@RuntimeSettings {peerBindAddr}
    NewCluster
  = do
    verifyClearPersistence persistence
    self <- newPeer
    clusterId <- getUUID
    let
      cluster = C.new clusterId self peerBindAddr
    makeNodeState persistence settings (Recover self cluster)
makeNodeState
    persistence
    settings@RuntimeSettings {peerBindAddr}
    (JoinCluster addr)
  = do
    verifyClearPersistence persistence
    $(logInfo) "Trying to join an existing cluster."
    (self, cluster) <- joinCluster (JoinRequest (BSockAddr peerBindAddr))
    makeNodeState persistence settings (Recover self cluster)
  where
    -- Send the join request and wait for the single response. The socket
    -- is closed whether the exchange succeeds or fails, so that a failed
    -- join does not leak the connection.
    joinCluster :: JoinRequest -> LIO (Peer, ClusterPowerState)
    joinCluster joinMsg = liftIO $ do
      so <- socket (fam addr) Stream defaultProtocol
      finally
        (do
          connect so addr
          sendAll so (encode joinMsg)
          sourceSocket so =$= conduitDecode $$ do
            response <- await
            case response of
              Nothing -> fail
                $ "Couldn't join a cluster because there was no response "
                ++ "to our join request!"
              Just (JoinOk self cps) ->
                return (self, cps)
        )
        (close so)
makeNodeState persistence _ (Recover self cluster) = do
    index <- runConduit . transPipe liftIO $
      list persistence
      =$= CL.fold addIndexRecords Set.empty
    let
      nodeState = (newNodeState self cluster) {nsIndex = index}
    liftIO $ saveCluster persistence self cluster
    return (self, nodeState, C.getPeers cluster)
  where
    -- Add the index entries of one partition's projected value to the
    -- index built so far.
    addIndexRecords :: (Indexable s, Event e o s)
      => Set IndexRecord
      -> (PartitionKey, PartitionPowerState e o s)
      -> Set IndexRecord
    addIndexRecords index (key, partition) =
      let
        newRecords =
          Set.map
            (`IndexRecord` key)
            (indexEntries (PS.projectedValue partition))
      in Set.union index newRecords
-- | Make sure the persistence layer holds no data. If it lists even one
-- entry, the problem is reported to the log, to stderr and to stdout, and
-- then the program fails with 'error'.
verifyClearPersistence :: (MonadLoggerIO io) => Persistence e o s -> io ()
verifyClearPersistence persistence = do
    firstEntry <- liftIO (runConduit (list persistence =$= CL.head))
    case firstEntry of
      Nothing ->
        return ()
      Just _ -> do
        $(logError) complaint
        liftIO $ do
          hPutStrLn stderr complaint
          putStrLn complaint
          error complaint
  where
    -- Polymorphic so the same text serves as both Text and String.
    complaint :: (IsString a) => a
    complaint = fromString
      $ "We are trying to start up a new peer, but the persistence "
      ++ "layer already has data in it. This is an invalid state. "
      ++ "New nodes must be started from a totally clean, empty state."
-- | A source of join requests received on 'joinBindAddr'. Each request
-- is paired with the function used to send the response back over the
-- connection it arrived on.
--
-- A critical thread accepts connections; each accepted connection is
-- served in its own thread. A failure on one connection is logged as a
-- warning and only ends that connection. The accepted socket is always
-- closed when its handler finishes, so connections do not leak file
-- descriptors.
--
-- Failure to set up the listening socket is logged and rethrown.
joinMsgSource
  :: RuntimeSettings
  -> Source LIO (JoinRequest, JoinResponse -> LIO ())
joinMsgSource RuntimeSettings {joinBindAddr} = join . lift $
    catchAll (do
        (chan, so) <- lift $ do
          chan <- newChan
          so <- socket (fam joinBindAddr) Stream defaultProtocol
          setSocketOption so ReuseAddr 1
          bind so joinBindAddr
          listen so 5
          return (chan, so)
        forkC "join socket acceptor" $ acceptLoop so chan
        return (chanToSource chan)
      ) (\err -> do
        $(logError) . pack
          $ "Couldn't start join request service, because of: "
          ++ show (err :: SomeException)
        throwM err
      )
  where
    -- Accept connections forever, serving each one in a new thread that
    -- logs with the logger in effect at accept time.
    acceptLoop :: Socket -> Chan (JoinRequest, JoinResponse -> LIO ()) -> LIO ()
    acceptLoop so chan =
        catchAll (
          forever $ do
            (conn, _) <- lift $ accept so
            logging <- askLoggerIO
            (void . lift . forkIO . (`runLoggingT` logging)) (serve conn)
        ) (\err -> do
          $(logError) . pack
            $ "error in join request accept loop: "
            ++ show (err :: SomeException)
          throwM err
        )
      where
        -- Serve one connection until it ends or fails, then close the
        -- socket. Errors (including errors from closing) are logged.
        serve :: Socket -> LIO ()
        serve conn =
          logErrors $
            finally
              (
                sourceSocket conn
                =$= conduitDecode
                =$= attachResponder conn
                $$ chanToSink chan
              )
              (lift (close conn))

        -- Run an action, turning any exception into a warning.
        logErrors :: LIO () -> LIO ()
        logErrors io = do
          result <- try io
          case result of
            Left err ->
              $(logWarn) . pack
                $ "Incomming join connection crashed because of: "
                ++ show (err :: SomeException)
            Right v -> return v

    -- Pair every request with a responder, then block until the responder
    -- has been called and write the response back to the connection
    -- before reading the next request.
    attachResponder
      :: Socket
      -> ConduitM JoinRequest (JoinRequest, JoinResponse -> LIO ()) LIO ()
    attachResponder conn = awaitForever (\msg -> do
        mvar <- liftIO newEmptyMVar
        yield (msg, lift . putMVar mvar)
        response <- liftIO $ takeMVar mvar
        liftIO $ sendAll conn (encode response)
      )
-- | The socket address family that matches a socket address.
fam :: SockAddr -> Family
fam addr = case addr of
  SockAddrInet {} -> AF_INET
  SockAddrInet6 {} -> AF_INET6
  SockAddrUnix {} -> AF_UNIX
  SockAddrCan {} -> AF_CAN
-- | Fork the legion runtime in a background thread and return a handle
-- for talking to it.
--
-- Both operations of the returned 'Runtime' write a message to the
-- runtime's request channel and block until the runtime calls the
-- response callback.
forkLegionary :: (LegionConstraints e o s, MonadLoggerIO io)
  => Persistence e o s
  -> RuntimeSettings
  -> StartupMode
  -> io (Runtime e o)
forkLegionary persistence settings startupMode = do
    logging <- askLoggerIO
    liftIO . (`runLoggingT` logging) $ do
      chan <- liftIO newChan
      forkC "main legion thread" $
        runLegionary persistence settings startupMode (chanToSource chan)
      return Runtime {
          rtMakeRequest = \key request ->
            awaitReply chan (Request key request),
          rtSearch = \searchTag ->
            awaitReply chan (SearchDispatch searchTag)
        }
  where
    -- Build a message around a fresh response callback, enqueue it, and
    -- wait for the callback to be invoked.
    awaitReply :: Chan msg -> ((a -> IO ()) -> msg) -> IO a
    awaitReply chan mkMsg = do
      responseVar <- newEmptyMVar
      writeChan chan (mkMsg (putMVar responseVar))
      takeMVar responseVar
-- | A handle on a running legion runtime, as returned by 'forkLegionary'.
data Runtime e o = Runtime
  { rtMakeRequest :: PartitionKey -> e -> IO o
    -- ^ Apply a request to a partition and wait for its response.
  , rtSearch :: SearchTag -> IO (Maybe IndexRecord)
    -- ^ Look up a single index record for a search tag.
  }
-- | Send a request for the given partition key to the runtime and wait
-- for the response.
makeRequest :: (MonadIO io) => Runtime e o -> PartitionKey -> e -> io o
makeRequest rt key request = liftIO (rtMakeRequest rt key request)
-- | Stream the index records that match a search tag.
--
-- Records are fetched one at a time: after each record is yielded, the
-- search continues with a tag built from that record's tag and key, and
-- it stops when the runtime reports no further record.
search :: (MonadIO io) => Runtime e o -> SearchTag -> Source io IndexRecord
search rt tag = do
  found <- liftIO (rtSearch rt tag)
  case found of
    Nothing -> return ()
    Just record -> do
      yield record
      search rt (SearchTag (irTag record) (Just (irKey record)))
-- | Everything the runtime's message loop can receive, tagged by origin.
data RuntimeMessage e o s
  = P (PeerMessage e o s)
    -- ^ A message from another peer.
  | R (RequestMsg e o)
    -- ^ A request from the local user.
  | J (JoinRequest, JoinResponse -> LIO ())
    -- ^ A join request together with its responder.
  | A (AdminMessage e o s)
    -- ^ A message from the admin service.
-- | The join responder cannot be shown, so it is rendered as @_@.
instance (Show e, Show o, Show s) => Show (RuntimeMessage e o s) where
  show = \case
    P m -> concat ["(P ", show m, ")"]
    R m -> concat ["(R ", show m, ")"]
    J (jr, _) -> concat ["(J (", show jr, ", _))"]
    A a -> concat ["(A (", show a, "))"]
-- | The mutable state of the runtime's message loop.
data RuntimeState e o s = RuntimeState
  { self :: Peer
    -- ^ This node's own peer identity; used as the source of outgoing
    -- messages.
  , forwarded :: Map MessageId (o -> LIO ())
    -- ^ Responders for requests that were forwarded to another peer,
    -- keyed by the id of the forwarding message.
  , nextId :: MessageId
    -- ^ The id that the next outgoing message will carry.
  , cm :: ConnectionManager e o s
    -- ^ The connection manager used to send messages to peers.
  , commClock :: Map Peer (Maybe UTCTime, Maybe UTCTime)
    -- ^ Per-peer communication times. The second component is the time
    -- a message was last received from the peer. NOTE(review): the first
    -- component is presumably the last send time, but nothing in this
    -- module writes it; confirm.
  , searches :: Map
      SearchTag
      (Set Peer, Maybe IndexRecord, [Maybe IndexRecord -> LIO ()])
    -- ^ Searches in flight: the peers that have not answered yet, the
    -- best record seen so far, and the responders waiting for the result.
  }
-- | The request a new node sends to an existing cluster member in order
-- to join. It carries the address the new node binds for peer traffic.
newtype JoinRequest = JoinRequest BSockAddr
  deriving (Generic, Show)

instance Binary JoinRequest
-- | The answer to a 'JoinRequest': the peer identity assigned to the new
-- node and the cluster state it should start from.
data JoinResponse
  = JoinOk Peer ClusterPowerState
  deriving (Generic)

instance Binary JoinResponse
-- | Look a key up and remove it in one step. Returns the value that was
-- stored under the key, if any, together with the map without the key.
lookupDelete :: (Ord k) => k -> Map k v -> (Maybe v, Map k v)
lookupDelete key m = (Map.lookup key m, Map.delete key m)
-- | The monad the runtime's message handlers run in: the state machine
-- monad layered over the runtime state, layered over logging IO.
type RTS e o s = SM e o s (StateT (RuntimeState e o s) LIO)
-- | Run an 'RTS' action from the given node state and runtime state.
-- Returns the action's result along with the final node state, the
-- cluster actions reported by the state machine run, and the final
-- runtime state.
runRTS
  :: Persistence e o s
  -> NodeState e o s
  -> RuntimeState e o s
  -> RTS e o s a
  -> LIO (a, NodeState e o s, [ClusterAction e o s], RuntimeState e o s)
runRTS persistence ns rts action =
    regroup <$> runStateT (runSM persistence ns action) rts
  where
    -- Flatten the nested result of the two monad layers into one tuple.
    regroup ((result, nodeState, actions), runtimeState) =
      (result, nodeState, actions, runtimeState)
-- | Send a payload to a peer as a new message from this node. The
-- message takes the next message id, which is also returned; the id
-- sequence is advanced before the message is handed to the connection
-- manager.
send :: Peer -> PeerMessagePayload e o s -> RTS e o s MessageId
send target payload = do
  rts <- lift get
  let messageId = nextId rts
  (lift . put) rts {nextId = nextMessageId messageId}
  lift2 $ CM.send (cm rts) target (PeerMessage (self rts) messageId payload)
  return messageId
-- | Pass an existing message on to another peer unchanged, keeping its
-- original source and message id.
forward :: Peer -> PeerMessage e o s -> RTS e o s ()
forward target message = do
  connections <- cm <$> lift get
  lift2 (CM.send connections target message)
-- | Record the current time as the moment a message was last received
-- from the peer. The other component of the peer's clock entry is kept,
-- or starts out as 'Nothing' for a peer that has no entry yet.
updateRecvClock :: Peer -> RTS e o s ()
updateRecvClock peer = do
    now <- liftIO getCurrentTime
    lift . modify $ \rts ->
      let refreshed = Map.alter (stamp now) peer (commClock rts)
      -- Force the new map so that updates do not pile up as thunks.
      in refreshed `seq` rts {commClock = refreshed}
  where
    -- Matches on the old entry before building the new one, so the old
    -- entry is evaluated as soon as the map is.
    stamp
      :: UTCTime
      -> Maybe (Maybe UTCTime, Maybe UTCTime)
      -> Maybe (Maybe UTCTime, Maybe UTCTime)
    stamp now = \case
      Nothing -> Just (Nothing, Just now)
      Just (sent, _) -> Just (sent, Just now)