module Network.Server.ScalableServer (runServer, RequestPipeline(..), RequestCreator, RequestProcessor) where import Network.Socket import qualified Network.Socket.ByteString as BinSock import Network.BSD import Control.Exception (finally) import Control.Monad (forever, liftM, replicateM, void) import Control.Monad.Trans (liftIO) import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.STM (newTChanIO, writeTChan, readTChan, atomically, TChan) import Data.Enumerator (($$), run_) import qualified Data.Enumerator as E import Data.Enumerator.Binary (enumHandle) import Data.Attoparsec.Enumerator (iterParser) import System.IO (hSetBuffering, hClose, BufferMode(..), IOMode(..), Handle) import Blaze.ByteString.Builder (toByteStringIO, Builder) import Data.Enumerator (yield, continue, Iteratee, Stream(..)) import qualified Data.Attoparsec as Atto import qualified Data.Attoparsec.Char8 as AttoC import qualified Data.ByteString.Char8 as S import qualified Data.ByteString as WS import qualified Data.ByteString.Lazy.Char8 as B -- 'ScalableServer' is a library that attempts to capture current best -- practices for writing fast/scalable socket servers in Haskell. -- -- Currently, that involves providing the right glue for hooking up -- to enumerator/attoparsec-enumerator/blaze-builder and network-bytestring -- -- It provides a relatively simple parse/generate toolchain for -- plugging into these engines -- -- Servers written using this library support "pipelining"; that is, a client -- can issue many requests serially before the server has responded to the -- first -- -- Server written using this library also can be invoked with +RTS -NX -- invocation for multicore support -- |The 'RequestPipeline' acts as a specification for your service, -- indicating both a parser/request object generator, the RequestCreator, -- and the processor of these requests, one that ultimately generates a -- response expressed by a blaze 'Builder' data RequestPipeline a = RequestPipeline (RequestCreator a) (RequestProcessor a) -- |The RequestCreator is an Attoparsec parser that yields some request -- object 'a' type RequestCreator a = Atto.Parser a -- |The RequestProcessor is a function in the IO monad (for DB access, etc) -- that returns a builder that can generate the response type RequestProcessor a = a -> IO Builder type RequestChannel a = TChan (Maybe a) -- |Given a pipeline specification and a port, run TCP traffic using the -- pipeline for parsing, processing and response. -- -- Note: there is currently no way for a server to specific the socket -- should be disconnected runServer :: RequestPipeline a -> PortNumber -> IO () runServer pipe port = do proto <- getProtocolNumber "tcp" s <- socket AF_INET Stream proto setSocketOption s ReuseAddr 1 bindSocket s (SockAddrInet port iNADDR_ANY) serverListenLoop pipe s serverListenLoop :: RequestPipeline a -> Socket -> IO () serverListenLoop pipe s = do listen s 100 forever $ do (c, _) <- accept s h <- socketToHandle c ReadMode hSetBuffering h NoBuffering forkIO $ connHandler pipe c h connHandler :: RequestPipeline a -> Socket -> Handle -> IO () connHandler (RequestPipeline reqParse reqProc) s h = do chan <- newTChanIO (do let enum = enumHandle 32768 h let parser = iterParser reqParse void $ forkIO $ processRequests chan reqProc s void $ run_ (enum $$ E.sequence parser $$ requestHandler chan s) ) `finally` ( (atomically $ writeTChan chan Nothing) >> hClose h ) requestHandler :: RequestChannel a -> Socket -> Iteratee a IO () requestHandler chan s = do continue requestConsume where requestConsume (Chunks mrs) = do liftIO $ mapM_ (\m -> atomically $ writeTChan chan $ Just m) mrs continue requestConsume requestConsume EOF = do yield () EOF processRequests :: RequestChannel a -> RequestProcessor a -> Socket -> IO () processRequests chan proc s = do next <- atomically $ readTChan chan case next of Just a -> do resp <- proc a -- XXX handle exceptions? toByteStringIO (BinSock.sendAll s) $ resp processRequests chan proc s Nothing -> return ()