module Network.Server.ScalableServer (
runServer,
RequestPipeline(..), RequestCreator,
RequestProcessor) where
import Network.Socket
import Network.Socket.Enumerator (enumSocket)
import qualified Network.Socket.ByteString as BinSock
import Network.BSD
import Control.Exception (finally, try, throwIO, SomeException)
import Control.Monad (forever, liftM, replicateM, void)
import Control.Monad.Trans (liftIO)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.BoundedChan (newBoundedChan, writeChan, readChan, BoundedChan)
import Data.Enumerator (($$), run_)
import qualified Data.Enumerator as E
import Data.Attoparsec.Enumerator (iterParser)
import Blaze.ByteString.Builder (toByteStringIO, Builder)
import Data.Enumerator (yield, continue, Iteratee, Stream(..))
import qualified Data.Attoparsec as Atto
import qualified Data.Attoparsec.Char8 as AttoC
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString as WS
import qualified Data.ByteString.Lazy.Char8 as B
data RequestPipeline a = RequestPipeline (RequestCreator a) (RequestProcessor a) PipelineSize
type RequestCreator a = Atto.Parser a
type RequestProcessor a = a -> IO Builder
type RequestChannel a = BoundedChan (Maybe a)
type PipelineSize = Int
runServer :: RequestPipeline a -> PortNumber -> IO ()
runServer pipe port = do
proto <- getProtocolNumber "tcp"
s <- socket AF_INET Stream proto
setSocketOption s ReuseAddr 1
bindSocket s (SockAddrInet port iNADDR_ANY)
serverListenLoop pipe s
serverListenLoop :: RequestPipeline a -> Socket -> IO ()
serverListenLoop pipe s = do
listen s 100
finally (
forever $ do
(c, _) <- accept s
forkIO $ connHandler pipe c
) $ sClose s
connHandler :: RequestPipeline a -> Socket -> IO ()
connHandler (RequestPipeline reqParse reqProc size) s = do
chan <- newBoundedChan size
(do
let enum = enumSocket 4096 s
let parser = iterParser reqParse
void $ forkIO $ processRequests chan reqProc s
void $ run_ (enum $$ E.sequence parser $$ requestHandler chan s)
) `finally` ( (writeChan chan Nothing) >> sClose s )
requestHandler :: RequestChannel a -> Socket -> Iteratee a IO ()
requestHandler chan s = do
continue requestConsume
where
requestConsume (Chunks mrs) = do
liftIO $ mapM_ (\m -> writeChan chan $ Just m) mrs
continue requestConsume
requestConsume EOF = do
yield () EOF
processRequests :: RequestChannel a -> RequestProcessor a -> Socket -> IO ()
processRequests chan proc s = do
next <- readChan chan
case next of
Just a -> do
mresp <- try $ proc a
case mresp of
Right resp -> do
toByteStringIO (BinSock.sendAll s) $ resp
processRequests chan proc s
Left (e :: SomeException) -> do
sClose s
throwIO e
Nothing -> return ()