module Network.Server.ScalableServer ( -- * Introduction -- $intro runServer, RequestPipeline(..), RequestCreator, RequestProcessor) where import Network.Socket import Network.Socket.Enumerator (enumSocket) import qualified Network.Socket.ByteString as BinSock import Network.BSD import Control.Exception (finally, try, throwIO, SomeException) import Control.Monad (forever, liftM, replicateM, void) import Control.Monad.Trans (liftIO) import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.BoundedChan (newBoundedChan, writeChan, readChan, BoundedChan) import Data.Enumerator (($$), run_) import qualified Data.Enumerator as E import Data.Attoparsec.Enumerator (iterParser) import Blaze.ByteString.Builder (toByteStringIO, Builder) import Data.Enumerator (yield, continue, Iteratee, Stream(..)) import qualified Data.Attoparsec as Atto import qualified Data.Attoparsec.Char8 as AttoC import qualified Data.ByteString.Char8 as S import qualified Data.ByteString as WS import qualified Data.ByteString.Lazy.Char8 as B -- $intro -- -- 'ScalableServer' is a library that attempts to capture current best -- practices for writing fast/scalable socket servers in Haskell. -- -- Currently, that involves providing the right glue for hooking up -- to enumerator/attoparsec-enumerator/blaze-builder and network-bytestring -- -- It provides a relatively simple parse/generate toolchain for -- plugging into these engines -- -- Servers written using this library support "pipelining"; that is, a client -- can issue many requests serially before the server has responded to the -- first -- -- Server written using this library also can be invoked with +RTS -NX -- invocation for multicore support -- |The 'RequestPipeline' acts as a specification for your service, -- indicating both a parser/request object generator, the RequestCreator, -- and the processor of these requests, one that ultimately generates a -- response expressed by a blaze 'Builder' data RequestPipeline a = RequestPipeline (RequestCreator a) (RequestProcessor a) PipelineSize -- |The RequestCreator is an Attoparsec parser that yields some request -- object 'a' type RequestCreator a = Atto.Parser a -- |The RequestProcessor is a function in the IO monad (for DB access, etc) -- that returns a builder that can generate the response type RequestProcessor a = a -> IO Builder type RequestChannel a = BoundedChan (Maybe a) type PipelineSize = Int -- |Given a pipeline specification and a port, run TCP traffic using the -- pipeline for parsing, processing and response. -- -- Note: there is currently no way for a server to specific the socket -- should be disconnected runServer :: RequestPipeline a -> PortNumber -> IO () runServer pipe port = do proto <- getProtocolNumber "tcp" s <- socket AF_INET Stream proto setSocketOption s ReuseAddr 1 bindSocket s (SockAddrInet port iNADDR_ANY) serverListenLoop pipe s serverListenLoop :: RequestPipeline a -> Socket -> IO () serverListenLoop pipe s = do listen s 100 finally ( forever $ do (c, _) <- accept s forkIO $ connHandler pipe c ) $ sClose s connHandler :: RequestPipeline a -> Socket -> IO () connHandler (RequestPipeline reqParse reqProc size) s = do chan <- newBoundedChan size (do let enum = enumSocket 4096 s let parser = iterParser reqParse void $ forkIO $ processRequests chan reqProc s void $ run_ (enum $$ E.sequence parser $$ requestHandler chan s) ) `finally` ( (writeChan chan Nothing) >> sClose s ) requestHandler :: RequestChannel a -> Socket -> Iteratee a IO () requestHandler chan s = do continue requestConsume where requestConsume (Chunks mrs) = do liftIO $ mapM_ (\m -> writeChan chan $ Just m) mrs continue requestConsume requestConsume EOF = do yield () EOF processRequests :: RequestChannel a -> RequestProcessor a -> Socket -> IO () processRequests chan proc s = do next <- readChan chan case next of Just a -> do mresp <- try $ proc a case mresp of Right resp -> do toByteStringIO (BinSock.sendAll s) $ resp processRequests chan proc s Left (e :: SomeException) -> do sClose s throwIO e Nothing -> return ()