\ignore{ \begin{code}
{-#OPTIONS -cpp #-}
{-#LANGUAGE OverloadedStrings,MultiParamTypeClasses,DeriveGeneric,BangPatterns #-}
{-|
Module      :  Parry.Server
Copyright   :  (c) Pierre-Étienne Meunier 2014
License     :  GPL-3
Maintainer  :  pierre-etienne.meunier@lif.univ-mrs.fr
Stability   :  experimental
Portability :  All

Tools to build synchronization servers. For instance, to write a simple
server with just a web interface on port 8000, you would use:

> import Control.Concurrent
> import Parry.Server
> import Parry.WebUI
>
> main::IO ()
> main=do
>   state<-initState initialJobs initialResults
>   _<-forkIO $ webUI 8000 state
>   server defaultConfig state
-}
\end{code} } We now proceed to the proof of the server. The file is included for the sake of completeness. In particular, remark that the whole state of the server is defined as a single data type called {\tt State}. We will use this fact to prove invariants on the whole server state. \vspace{1em} \begin{code}
module Parry.Server (
  -- * Jobs on the server side
  Exhaustive(..),
  Result(..),
  -- * Server's internal state
  initState,
  stateFromFile,
  saveThread,
  State(..),
  -- * Server configuration and functions
  Config(..),defaultConfig,server
  ) where
import Control.Concurrent
import Control.Exception as E
import Control.Monad
import Control.Concurrent.MSem as Sem
import Network
import System.IO
import System.Directory
import Data.List
import Data.Time.Format()
import Data.Time.Clock.POSIX
#ifdef UNIX
import System.Posix.Signals
#endif

import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import qualified Data.Map as M
import qualified Data.Set as S
import GHC.Generics
import Data.Binary

import Codec.Crypto.RSA.Pure

import Parry.Protocol
import Parry.Util

-- | Jobs (and job results) that Parry knows how to schedule. Jobs and
-- results deliberately share one type: this keeps the types simple and
-- avoids conversions between them.
class Exhaustive j where
  -- | How deep the job lies in the explored tree. Queued jobs are
  -- ordered by depth, so the shallowest ones are handed out first,
  -- which saves network traffic.
  depth::j->Int

  -- | How many times the job has been killed. A job gets killed when it
  -- has to be reshared, or when the client working on it died; in both
  -- cases the server schedules it again.
  killed::j->Int

  -- | Marks a job as killed. Instances should guarantee
  -- @killed (kill j) >= killed j@ (for good resharing), and @kill j@
  -- must represent the same task as @j@ (the server's correctness
  -- argument assumes it).
  kill::j->j

-- | How finished jobs are folded into the result value kept in the
-- server state.
class Result j r where
  -- | @addResult host acc job@ merges the finished @job@, reported by the
  -- client running on @host@, into the accumulated result @acc@.
  addResult::HostName->r->j->r

-- | The complete state of a server. It is exported mainly so that
-- alternative user interfaces can inspect it; any modification should
-- go through the functions of this module, otherwise the correctness
-- argument of the protocol no longer applies.
data State j r=State
  { jobs::S.Set (Int,j)
    -- ^ Jobs waiting to be handed out, each paired with its 'depth', so
    -- that the shallowest job is the minimum of the set.
  , ongoing::M.Map Integer (HostName,PublicKey,j,Double,Double)
    -- ^ Working clients, indexed by client number: host name, RSA public
    -- key, current job, time at which the client was assigned a job
    -- (kept when it reshares), and last time it was heard from. Times
    -- are POSIX seconds.
  , unemployed::S.Set Integer
    -- ^ Numbers of the clients that asked for a job while none was queued.
  , results::r
    -- ^ Results accumulated so far with 'addResult'.
  , newId::Integer
    -- ^ Next client number to hand out. It only ever increases, so no
    -- two clients of a server run are given the same number.
  , killings::Int
    -- ^ Total number of jobs killed since the beginning (for
    -- benchmarking). NOTE(review): nothing in this module updates it.
  , solved::Integer
    -- ^ Number of jobs reported as finished (for benchmarking).
  , authorizedKeys::[PublicKey]
    -- ^ RSA public keys of the clients allowed to request jobs.
  } deriving (Show,Read,Generic)

instance (Binary j,Binary r)=>Binary (State j r)
\end{code} \hfill \begin{definition} In a server state {\tt st}, the \emph{current job} of a client is the job registered for it in the {\tt ongoing} field of {\tt st}. \end{definition} \begin{definition} \label{def:valid} We call a client \emph{valid} if both of the following hold: \begin{enumerate} \item \label{valid:newjobs} Its {\tt NewJobs} messages contain all the results found in the subjobs of its current job that have been completely explored, together with the subjobs of its current job that have not been completely explored, divided into three fields: the results it has found, its next current job, and the other subjobs. \item \label{valid:jobdone} It does not send a {\tt JobDone} message before the task represented by its current job is completely explored. \end{enumerate} \end{definition} The main function, {\tt answer}, keeps track of the clients. We now prove the following lemma: \begin{lemma} \label{lem:answer} If {\tt st} is a state of the server containing (in the union of {\tt jobs st} and {\tt ongoing st}) jobs representing all the tasks that have not yet been explored, and if, for any job {\tt j}, {\tt j} and {\tt kill j} represent the same task, then for any message {\tt m} sent by a valid client and for all values of {\tt t} and {\tt host}, {\tt answer t host st m} (the Haskell syntax for ``the value of function {\tt answer}, called with arguments {\tt t}, {\tt host}, {\tt st} and {\tt m}'') is a pair $(\mathtt{st'},\mathtt{m'})$, where {\tt st'} is a state of the server containing the roots of all subtrees that have not yet been explored ({\tt m'} is the message to be sent to the client). Moreover, all results sent by the clients are added to the server state using the {\tt addResult} function. \begin{proof} We prove it for all the cases. \begin{code}
-- | One step of the synchronization protocol, as a pure function.
-- Arguments: the current time (POSIX seconds, as passed by 'reply'), the
-- host name of the client, the server state, and a client message whose
-- signature 'reply' has already checked. Returns the new state together
-- with the message to send back to the client.
answer::(Exhaustive j,Result j r,Eq j,Ord j,Binary j)=>
        Double->String->State j r->ClientMessage j
        ->(State j r,ServerMessage j)
answer t host st (GetJob num key)=
  case M.lookup num (ongoing st) of
    Just (ho,key0,j0,t0,_)->
\end{code} If the client is registered as an ``ongoing'' job, and its host and public key are the ones recorded for it, we can simply send it the job it is supposed to be working on (otherwise, it is sent the {\tt Die} message and the state is not modified). In this case, the invariant is still maintained, as we do not change its recorded current job (here, we only update the time at which we last saw this client). \begin{code}
      -- Same host and same key as when the job was assigned: resend it.
      -- The Bool given to Job is True when some clients are idle (the
      -- same position holds the shareIt flag below).
      if ho==host && key0==key then
        -- Only the last-seen time (last tuple component) is refreshed.
        (st { ongoing=M.insert num (ho,key,j0,t0,t) (ongoing st) },
         Job (not $ S.null $ unemployed st) j0)
      else
        (st,Die)
    Nothing->
\end{code} Else, client {\tt num} is not in the map of ongoing jobs. If there are no more jobs to be done: \begin{itemize} \item if no job is being worked on either, we do not modify the state, and we tell the client to stop (with a {\tt Finished} message). \item else, we simply record that client as ``unemployed'' (and send it the {\tt Die} message). The next time a client reports its state, it will be asked to share its current job. This does not change the jobs registered in the server's state anyway. \end{itemize} \begin{code}
      if S.null (jobs st) then
        if M.null (ongoing st) then
          -- Nothing queued and nobody working: the exploration is over.
          (st,Finished)
        else
          -- Work remains but is all assigned: remember this client as idle.
          (st { unemployed=S.insert num (unemployed st) },Die)
\end{code} Else, if there are still jobs to be done, we pick the least deep one (using {\tt S.deleteFindMin}, since the elements of {\tt jobs st} are pairs whose first component is the depth of the job). According to the documentation of Haskell's {\tt Data.Set} module, $\mathtt{jobs\ st}$ is equal to $\{(\mathtt{d},\mathtt{h})\}\cup\mathtt{nextJobs}$, where {\tt d} is the depth recorded with {\tt h}. Therefore, since {\tt num} is not a member of {\tt ongoing st}, the returned state contains, in the union of its {\tt ongoing} and {\tt jobs} fields, exactly the same jobs as in {\tt st}. \begin{code}
      else
        -- Hand out the shallowest queued job. A job that has already been
        -- killed at least once is sent with shareIt set. The assignment
        -- time and the last-seen time both start at t.
        let ((_,h),nextJobs)=S.deleteFindMin (jobs st)
            shareIt=killed h>0
        in
         (st { jobs=nextJobs,
               unemployed=S.delete num (unemployed st),
               ongoing=M.insert num (host,key,h,t,t) (ongoing st) },
          Job shareIt h)
\end{code} \hfill Another message the server can receive is the {\tt NewJobs} message, which a client sends when it reshares its work. In this case, the client sends its number ({\tt clientId j}), the job it was given ({\tt currentJob j}), the new job it will now work on ({\tt nextJob j}), a list of jobs that need to be shared ({\tt newJobs j}), and a list of results ({\tt jobResults j}). We can think of this message as equivalent to \emph{``I, valid client {\tt clientId j}, hereby RSA-certify that the job {\tt currentJob j} you gave me has unexplored subjobs {\tt nextJob j} and {\tt newJobs j}, and results {\tt jobResults j}''}. \hfill \begin{code}
answer t host st j@(NewJobs {})=
  case M.lookup (clientId j) (ongoing st) of
    -- Accept the report only from the registered host, and only when it
    -- is about the job this client is currently recorded as running.
    Just (ho,key,j0,t0,_) | host==ho && j0==currentJob j->
      let -- Shared subjobs go back to the queue, keyed by depth.
          queued=foldl' (\s x->S.insert (depth x,x) s) (jobs st) (newJobs j)
          -- The client keeps its assignment time; its job becomes nextJob.
          running=M.insert (clientId j) (host,key,nextJob j,t0,t) (ongoing st)
          merged=foldl' (addResult host) (results st) (jobResults j)
      in (st { jobs=queued, ongoing=running, results=merged }, Ack)
    -- Unknown client, other host, or stale job: ignore the message.
    _->(st, Die)
\end{code} \hfill If the client is not registered as an ``ongoing job'', or if its host or its reported current job differ from the ones recorded in {\tt ongoing st}, this message is ignored: the state is not modified, and the client is sent the {\tt Die} message. Otherwise, by assumption, this {\tt NewJobs} message was sent by a valid client. Therefore, it contains all subjobs of its current job that have not been explored, along with the job it will start working on, and the list of all results that have been found during the exploration of the other subjobs of its current job. Since the shared subjobs are added to the {\tt jobs} field of the state, and the {\tt ongoing} field is updated with the client's new current job, our claim still holds. \hfill \begin{code}
answer _ _ st j@(JobDone {})=
  maybe (st,Die) finish (M.lookup (clientId j) (ongoing st))
  where
    -- The client reports that its whole current job is explored: drop it
    -- from the ongoing map, merge its results (credited to the host
    -- recorded for it), and count one more solved job. A report about any
    -- other job is answered with Die and leaves the state unchanged.
    finish (registered,_,j0,_,_)
      | j0==currentJob j=
          (st { ongoing=M.delete (clientId j) (ongoing st),
                results=foldl' (addResult registered) (results st) (jobResults j),
                solved=solved st+1 }, Ack)
      | otherwise=(st,Die)
\end{code} \hfill In this case, if the client is not registered as an ongoing job, or if the job it reports is not its recorded current job, we do not modify the state (and the client is sent the {\tt Die} message). Otherwise, we can safely delete the corresponding job from the state, and add its results to the state's results field: indeed, since we assumed that this message is sent by a valid client, that job has been explored completely. The intuitive version of this message is \emph{``I, valid client {\tt clientId j}, hereby RSA-certify that I have explored job {\tt currentJob j} completely, and that it contains exactly the results {\tt jobResults j}''}. \hfill The last case of {\tt answer} is when the client sends an ``Alive'' message: \begin{code}
answer t host st (Alive num)=
  case M.lookup num (ongoing st) of
    Just (ho,key,j,t0,_)
      | ho==host && keepWorking->
          -- Only the last-seen time of the client changes.
          (st { ongoing=M.insert num (ho,key,j,t0,t) (ongoing st) },Ack)
    _->(st,Die)
  where
    -- False exactly when some clients are idle and no job is queued for
    -- them; the reporting client is then answered Die.
    keepWorking=S.null (unemployed st) || not (S.null (jobs st))
\end{code} \hfill In this case, at most the time at which the client was last seen is modified; the jobs are not, and hence our claim holds. \end{proof} \end{lemma} Our next task is to prove the correctness of {\tt reply}, the network interface to the {\tt answer} function. We first need hypotheses on how this interface works, and especially on how the messages are written and read at both ends of the connection. \begin{definition} A client is \emph{fluent} if the messages it sends on the network are of exactly two kinds: \begin{itemize} \item Messages consisting of a single line containing exactly {\tt Hello}. \item Messages with two lines: \begin{itemize} \item the first line is the encoding via {\tt encode16l} of $m$, where $m$ is the encoding via {\tt encode} of a constructor of the {\tt ClientMessage} type. \item the second line is the encoding via {\tt encode16l} of the RSA signature of $m$, made with the client's private key. \end{itemize} \end{itemize} \end{definition} \begin{lemma} \label{lem:reply} If all the clients that have their public key in {\tt authorizedKeys st}, where {\tt st} is the state of the server, are valid and fluent, and {\tt st} contains all the jobs that have not been completely explored (in the {\tt ongoing} and {\tt jobs} fields), then it still does after one run of {\tt reply}, assuming that $\mathtt{decode}\circ\mathtt{encode}$ (from Haskell's {\tt Data.Binary} module) is the identity, and $\mathtt{decode16}\circ\mathtt{encode16l}$ (from module {\tt Parry.Util}) is the identity. \begin{proof} We prove this invariant on the code of the {\tt reply} function, which handles every connection to our server.\vspace{1em} \begin{code}
-- | Serves one connection. The request is either the single line @Hello@,
-- which is answered with a fresh client number, or a line holding an
-- encoded 'ClientMessage' followed by a line holding its RSA signature.
-- In the second case the signature is checked before 'answer' updates
-- the state. 'Die' is sent back when no acceptable key is known for the
-- sender, or when the signature does not verify.
reply::(Binary j,Exhaustive j,Result j r,Ord j)=>
       MVar (State j r) -> Handle -> HostName -> IO ()
reply state rhandle host=(do
  l<-B.hGetLine rhandle
  if l==B.pack "Hello" then do
      -- Reserve a fresh client number while holding the lock, but send it
      -- after the lock is released, so that writing to a slow client never
      -- blocks the other threads that use the state.
      num<-modifyMVar state $ \st->
        let !n=newId st in return (st { newId=n+1 },n)
      LB.hPutStrLn rhandle $ encode16l $ encode num
\end{code} \hfill When the first line is the initial {\tt Hello} message, the claim holds: indeed, the only field of the server state that is modified is the {\tt newId} one, which represents the first unused client number. In all other cases, we do the following: \hfill \begin{code}
    else do
    -- Snapshot of the state, only used to choose the key the signature is
    -- checked against; 'answer' below runs on the current state.
    st<-readMVar state
    sig<-B.hGetLine rhandle
    let dec=LB.fromStrict $ decode16 l
        msg=decode dec
        -- Every client message carries the number of its sender.
        num=case msg of
          GetJob x _->x
          JobDone x _ _->x
          NewJobs x _ _ _ _->x
          Alive x->x
        -- GetJob carries its own key, accepted only if it is authorized;
        -- the other messages are checked against the key recorded for
        -- the client in 'ongoing'.
        key=case msg of
          GetJob _ pub->
            if pub `elem` authorizedKeys st then
              Just pub
            else
              Nothing
          _->(case M.lookup num $ ongoing st of
                 Just (_,pub,_,_,_)->Just pub
                 Nothing->Nothing)
\end{code} \hfill We will now verify the message signature, using either the public key registered for this client in the {\tt ongoing} field of the server's state, or the public key sent by the client itself, in the case of the {\tt GetJob} message (if that key is registered in the {\tt authorizedKeys} field of the server state): \hfill \begin{code}
    message<-case key of
      Nothing->return Die
      Just pub->
        case verify pub dec (LB.fromStrict $ decode16 sig) of
          Right True->do
            t<-getPOSIXTime
            -- Evaluate the answer while the lock is held. Bangs nested in a
            -- let-bound tuple (@let (!a,!b)=...@) are lazy: they would leave
            -- an unevaluated thunk in the MVar instead.
            modifyMVar state $ \st0->
              case answer (realToFrac t) host st0 msg of
                (!a,!b)->return (a,b)
          _->return Die
    LB.hPutStrLn rhandle (encode16l $ encode message)
  )
\end{code} \hfill Since we assumed that $\mathtt{decode}\circ\mathtt{encode}$ and $\mathtt{decode16}\circ\mathtt{encode16l}$ are both the identity function, variable {\tt msg} contains the message sent by the client. Moreover, the signature is checked either against a key of the {\tt authorizedKeys} field of the server state (for {\tt GetJob}), or against the key recorded in {\tt ongoing} when the client obtained its job, which was itself checked in that way; hence the message comes from a valid client. Apart from the {\tt Hello} case treated above, the only call modifying the state is a call to {\tt answer}, so we can conclude using Lemma \ref{lem:answer} that the invariant is maintained by the {\tt reply} function. \end{proof} \end{lemma} The last piece of server code that we need to prove is the {\tt cleanupThread} function, whose aim is to collect all dead machines. We need this function especially on standard clusters, whose walltimes are short compared to the duration of the whole task. \begin{lemma} \label{lem:cleanupThread} If the {\tt kill} function, defined on jobs, does not change the task represented by the job, and {\tt state} is a server state containing (in the {\tt ongoing} and {\tt jobs} fields) all the jobs that have not been explored, then it still does after one run of {\tt cleanupThread state}. \begin{proof} In the following function, the state is only modified by partitioning the {\tt ongoing st} map into two maps {\tt a} and {\tt b}, and adding all the jobs of {\tt a} to the {\tt jobs st} set, possibly calling {\tt kill} on some of them. Therefore, the tasks represented by jobs in the union of {\tt jobs st} and {\tt ongoing st} are not modified. \hfill \begin{code}
-- | Reclaims the jobs of clients that stopped reporting, every 30 seconds,
-- forever. A client not heard from for more than 600 seconds is removed
-- from 'ongoing', and its job goes back to the queue. The job is first
-- passed through 'kill' if the client was assigned it more than 3600
-- seconds earlier. NOTE(review): 'killings' is not updated here.
cleanupThread::(Ord j,Exhaustive j)=>MVar (State j r)->IO ()
cleanupThread state=forever $ do {
  now<-fmap realToFrac getPOSIXTime;
  modifyMVar_ state $ \st->
    let { (a,b)=M.partition (\(_,_,_,_,lastSeen)->now-lastSeen > 600) (ongoing st);
          -- Put a dead client's job back in the queue, killed if it is old.
          requeue set (_,_,job,started,_)=
            S.insert (depth job,if now-started > 3600 then kill job else job) set }
    in return st { jobs=M.foldl' requeue (jobs st) a, ongoing=b };
  threadDelay 30000000
  }
\end{code} \end{proof} \end{lemma} Finally, the entry point to our server library is the {\tt server} function: \begin{lemma} \label{lem:server} If: \begin{itemize} \item all tasks that have not been completely explored have job representatives in the {\tt ongoing} and {\tt jobs} fields of the {\tt state} argument to {\tt server}, \item all clients that sign their messages with a private RSA key whose corresponding public key is in the {\tt authorizedKeys} field of {\tt state} are valid and fluent, \item $\mathtt{decode}\circ\mathtt{encode}$ and $\mathtt{decode16}\circ\mathtt{encode16l}$ are both the identity function, \end{itemize} then after any number of messages received by the server, variable {\tt state} still contains jobs representing all the tasks that have not been completely explored, in the union of its {\tt ongoing} and {\tt jobs} fields. \begin{proof} The state is only modified by {\tt reply} and {\tt cleanupThread}. Each of them updates it atomically (with {\tt modifyMVar} or {\tt modifyMVar\_}) and maintains this invariant, by Lemmas \ref{lem:reply} for {\tt reply} and \ref{lem:cleanupThread} for {\tt cleanupThread}. \begin{code}
-- | Starts the synchronization server; this function never returns.
--
-- It listens on @'port' config@ and serves every connection with 'reply'
-- in a thread of its own. At most @'maxThreads' config@ connections are
-- served at once, and further connections wait for a free slot. Serving
-- connections in separate threads matters for two reasons: a slow or
-- silent client only ties up its own thread, and an exception raised
-- while serving one client (such as a malformed message) is logged
-- without closing the listening socket. A background 'cleanupThread'
-- reschedules the jobs of clients that stopped reporting. Errors are
-- appended to @'logFile' config@. If the listening socket itself fails,
-- it is reopened after 0.1 seconds.
--
-- NOTE(review): a client that connects and never sends a line still holds
-- its slot indefinitely; a per-connection timeout may be worth adding.
server::(Ord j, Binary j, Exhaustive j, Result j r)=>
        Config->MVar (State j r)->IO ()
server config state=withSocketsDo $ do {
#ifdef UNIX
  _<-installHandler sigPIPE Ignore Nothing;
#endif
  threads<-Sem.new $ maxThreads config;
  -- Serializes log writes: GHC refuses to open a file for writing twice
  -- at once, which concurrent appendFile calls would otherwise do. A
  -- failure to write the log is ignored rather than propagated.
  logLock<-newMVar ();
  let { logError e=withMVar logLock $ \_->
          appendFile (logFile config) (show (e::SomeException)++"\n")
            `E.catch` (\err->let _=err::IOException in return ()) };
  _<-forkIO $ cleanupThread state;
  forever $ do {
    E.catch (bracket (listenOn (port config)) sClose $
             \sock->forever $ do {
               (h,host,_)<-accept sock;
               -- Wait for a free slot before spawning the handler thread.
               Sem.wait threads `onException` hClose h;
               -- The handle is always closed and the slot always released,
               -- whether reply succeeds or throws.
               _<-forkFinally (reply state h host)
                    (\r->either logError return r
                           `finally` hClose h
                           `finally` Sem.signal threads);
               return ()
               })
      logError;
    threadDelay 100000;
    };
  }
\end{code} \end{proof} \end{lemma} \ignore{ \begin{code}
-- | Configuration of a synchronization server.
data Config=Config
  { port::PortID
    -- ^ Network port the synchronization server listens on.
  , maxThreads::Int
    -- ^ Bound on the number of simultaneous connection-handling threads
    -- (the size of the semaphore used by 'server').
  , logFile::FilePath
    -- ^ File to which server errors are appended.
  }

-- | Default server configuration, meant to match the client's defaults:
-- port 5129, at most 20 simultaneous connection threads, and errors
-- appended to @parry.err@. The configuration holds no key: a client is
-- only given jobs once its RSA public key has been added to the
-- 'authorizedKeys' field of the server state.
defaultConfig::Config
defaultConfig=Config {
  logFile="parry.err",
  maxThreads=20,
  port=PortNumber 5129
  }

-- | Creates a fresh server state whose queue holds the given initial jobs
-- (keyed by their 'depth') and whose results start at the given value.
-- No client is registered yet and no key is authorized: client keys must
-- be added to 'authorizedKeys' before any client can be given a job.
initState::(Exhaustive j,Ord j,Result j r)=>[j]->r->IO (MVar (State j r))
initState initial r0=
  -- Strict left fold, like the other queue insertions of this module,
  -- so that no chain of thunks is built over a long list of initial jobs.
  newMVar $ State { jobs=foldl' (\s j->S.insert (depth j,j) s) S.empty initial,
                    ongoing=M.empty,
                    results=r0,
                    unemployed=S.empty,
                    newId=0,
                    killings=0,solved=0,
                    authorizedKeys=[] }
\end{code} \begin{code}
-- | Loads a server state from file @f@ (in the 'Binary' encoding that
-- 'saveThread' writes) when that file exists. Otherwise, it builds a
-- fresh state from the initial jobs and results with 'initState'.
stateFromFile::(Binary r,Binary j,Exhaustive j, Result j r,Ord j)=>FilePath->[j]->r->IO (MVar (State j r))
stateFromFile f initial r0=do {
  exists<-doesFileExist f;
  if exists then decodeFile f>>=newMVar else initState initial r0
  }

-- | Saves the state to file @f@ every @del@ microseconds, keeping the
-- previous save as @f.last@. This function does not return, so it is
-- best called inside 'Control.Concurrent.forkIO'.
--
-- The state is snapshotted with 'readMVar', so encoding and disk I/O
-- happen without holding the lock, and clients are not blocked meanwhile.
-- The new state is first written completely to @f.tmp@ and only then
-- renamed to @f@, so a crash while writing never leaves a truncated @f@.
saveThread::(Binary r,Binary j)=>FilePath->Int->MVar (State j r)->IO ()
saveThread f del state=forever $ do {
  st<-readMVar state;
  let { tmp=f++".tmp" };
  LB.writeFile tmp $ encode st;
  e<-doesFileExist f;
  when e $ renameFile f (f++".last");
  renameFile tmp f;
  threadDelay del
  }
\end{code} }