| 1 | {-# LANGUAGE DeriveDataTypeable #-} |
|---|
| 2 | {-# LANGUAGE ScopedTypeVariables #-} |
|---|
| 3 | |
|---|
| 4 | import Control.Concurrent (forkIO, threadDelay) |
|---|
| 5 | import Control.Concurrent.MVar |
|---|
| 6 | import Control.Exception (handle, IOException) |
|---|
| 7 | import Control.Monad |
|---|
| 8 | import Data.Char (ord) |
|---|
| 9 | import Data.Ratio |
|---|
| 10 | import Data.Time.Clock.POSIX (getPOSIXTime) |
|---|
| 11 | import Foreign.Marshal.Alloc (mallocBytes) |
|---|
| 12 | import Network.Socket |
|---|
| 13 | import System.Console.CmdArgs |
|---|
| 14 | import System.IO |
|---|
| 15 | import System.IO.Unsafe |
|---|
| 16 | |
|---|
| 17 | import qualified Data.ByteString as B |
|---|
| 18 | import qualified Data.ByteString.Unsafe as U |
|---|
| 19 | |
|---|
| 20 | --import qualified Data.ByteString as B |
|---|
| 21 | |
|---|
| 22 | buffSize = 65000 :: Int |
|---|
| 23 | |
|---|
| 24 | data UdpCount = UdpCount { port :: Int |
|---|
| 25 | } deriving (Show, Data, Typeable) |
|---|
| 26 | |
|---|
| 27 | udpCount = cmdArgsMode $ UdpCount |
|---|
| 28 | { port = def &= argPos 0 &= typ "PORT" |
|---|
| 29 | } &= summary "udp v0.1 (c) Itai Zukerman 2011" |
|---|
| 30 | |
|---|
| 31 | counts = unsafePerformIO $ newMVar (0, 0) :: (MVar (Int, Int)) |
|---|
| 32 | timestamp = unsafePerformIO $ newMVar B.empty :: (MVar B.ByteString) |
|---|
| 33 | |
|---|
| 34 | space = B.pack [32] |
|---|
| 35 | eol = B.pack [10] |
|---|
| 36 | |
|---|
| 37 | main = do args <- cmdArgsRun udpCount |
|---|
| 38 | free <- newEmptyMVar |
|---|
| 39 | full <- newEmptyMVar |
|---|
| 40 | forkIO updateTime |
|---|
| 41 | replicateM_ 4 $ forkIO $ workerLoop free full |
|---|
| 42 | forkIO $ recvLoop free full $ port args |
|---|
| 43 | forever $ do threadDelay (10^6) |
|---|
| 44 | c <- readMVar counts |
|---|
| 45 | hPutStrLn stderr $ show c |
|---|
| 46 | |
|---|
| 47 | updateTime = do t <- getPOSIXTime |
|---|
| 48 | let b = t2b t |
|---|
| 49 | swapMVar timestamp b |
|---|
| 50 | threadDelay (10^3) |
|---|
| 51 | updateTime |
|---|
| 52 | where |
|---|
| 53 | t2b t = let i = toRational t |
|---|
| 54 | ms = (numerator i * 1000) `div` (denominator i) |
|---|
| 55 | in B.pack $ map (fromIntegral . ord) (show ms) |
|---|
| 56 | |
|---|
| 57 | recvLoop free full port = do sock <- socket AF_INET Datagram 0 |
|---|
| 58 | bindSocket sock $ SockAddrInet (fromIntegral port) iNADDR_ANY |
|---|
| 59 | forever $ do buff <- takeMVar free |
|---|
| 60 | (size, _) <- retryOnFail $ recvBufFrom sock buff buffSize |
|---|
| 61 | putMVar full (size, buff) |
|---|
| 62 | |
|---|
| 63 | retryOnFail action = handle (\(_ :: IOException) -> retryOnFail action) action |
|---|
| 64 | |
|---|
| 65 | workerLoop free full = do buff <- mallocBytes buffSize |
|---|
| 66 | putMVar free buff |
|---|
| 67 | loop |
|---|
| 68 | where |
|---|
| 69 | loop = do |
|---|
| 70 | (size, buff') <- takeMVar full |
|---|
| 71 | --bs <- U.unsafePackCStringLen (buff', size) |
|---|
| 72 | bs <- B.packCStringLen (buff', size) |
|---|
| 73 | ts <- readMVar timestamp |
|---|
| 74 | let line = B.concat [ts, space, bs, eol] |
|---|
| 75 | --B.hPutStr stdout line |
|---|
| 76 | (n, t) <- takeMVar counts |
|---|
| 77 | putMVar counts $! (n+1, t+size) |
|---|
| 78 | putMVar free buff' |
|---|
| 79 | loop |
|---|