{-# LANGUAGE Rank2Types #-}

-- | Producer side of a topic: runs a user-supplied event source, prints a
-- coloured summary when it completes, and streams every event to the
-- consumers that connect to the topic's socket.
module Sarsi.Producer where

import Codec.Sarsi (Event (..), Level (..), Message (..), putEvent)
import Control.Concurrent.Async (async, cancel, wait)
import Control.Concurrent.Chan (dupChan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (modifyMVar_, newMVar, readMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (newTQueue, tryReadTQueue, writeTQueue)
import Control.Exception (IOException, bracket, finally, tryJust)
import Data.Binary.Machine (processPut)
import Data.List (foldl')
import Data.Machine (ProcessT, prepended, runT_, (<~))
import Data.Machine.Process (takingJusts)
import Network.Socket (Socket, accept, bind, close, listen, socketToHandle)
import Sarsi (Topic, createSockAddr, createSocket, removeTopic, title)
import System.Console.ANSI
import System.IO (Handle, IOMode (WriteMode), hClose)
import System.IO.Machine (byChunk, sinkHandle, sinkIO, sourceIO)

-- | Print the final summary line on stdout: the coloured 'title', then
-- @": "@, then the shown 'Finish' event built from the two counts.
--
-- The title is green when both counts are zero, yellow when only the
-- error count is zero, and red otherwise.
finishPrint
  :: Int -- ^ number of errors
  -> Int -- ^ number of warnings
  -> IO ()
finishPrint e w = do
  setSGR (sgr e w)
  putStr title
  -- Reset before printing the rest so that only the title is coloured.
  setSGR [Reset]
  putStr ": "
  print (Finish e w)
  where
    sgr 0 0 = [SetColor Foreground Dull Green]
    sgr 0 _ = [SetColor Foreground Dull Yellow]
    sgr _ _ = [SetColor Foreground Vivid Red]

-- | Count the notifications in a list of events.
--
-- Returns @(errors, warnings)@: the number of 'Notify' events whose message
-- level is 'Error' and 'Warning' respectively. Every other event is ignored.
finishCreate :: [Event] -> (Int, Int)
finishCreate = foldl' count (0, 0)
  where
    count (e, w) (Notify (Message _ Warning _)) = (e, w + 1)
    count (e, w) (Notify (Message _ Error _)) = (e + 1, w)
    count acc _ = acc

-- | Run an event producer for a topic and stream its events to consumers.
--
-- A background thread binds a listening socket for the topic and accepts
-- connections. The callback @f@ receives a sink; every event pushed into it
-- is recorded and broadcast to all connected consumers. Once @f@ returns, a
-- 'Finish' event carrying the error and warning counts is broadcast, the
-- summary is printed with 'finishPrint', and the consumer connections known
-- at that point are waited for.
--
-- The accept thread is cancelled and 'removeTopic' is called on every exit
-- path, including when @f@ (or a later step) throws.
--
-- Returns whatever @f@ returned; exceptions raised by @f@ propagate.
produce :: Topic -> (ProcessT IO Event Event -> IO a) -> IO a
produce t f = do
  conns <- atomically newTQueue
  chan <- newChan
  state <- newMVar []
  server <- async $ bracket bindSock close (serve (process conns chan state))
  -- 'finally' guarantees the accept thread is stopped and the topic removed
  -- even if the feeder fails, instead of leaking both.
  feedAndFinish conns chan state `finally` (cancel server >> removeTopic t)
  where
    -- Run the user callback to completion, then announce the end of the
    -- stream and wait for the consumers to catch up.
    feedAndFinish conns chan state = do
      feeder <- async $ f (sinkIO $ feed chan state)
      a <- wait feeder
      es <- readMVar state
      let (errs, warns) = finishCreate es
      writeChan chan $ Just $ Finish errs warns
      -- 'Nothing' is the end-of-stream marker: 'takingJusts' in each
      -- connection's pipeline stops when it sees it.
      writeChan chan Nothing
      finishPrint errs warns
      waitFinish conns
      return a

    -- Create the socket, bind it to the topic's address and start listening
    -- with a backlog of 1.
    bindSock = do
      sock <- createSocket
      addr <- createSockAddr t
      bind sock addr
      listen sock 1
      return sock

    -- Per-connection handler. Starts a writer thread that first replays the
    -- events recorded so far and then forwards everything broadcast on the
    -- channel until the end-of-stream marker. Always returns 'Nothing', so
    -- 'serve' keeps accepting further connections.
    process conns chan' state h = do
      -- Each consumer reads from its own duplicate of the broadcast channel.
      chan <- dupChan chan'
      es <- readMVar state
      conn <- async $ do
        -- 'feed' prepends, so the recorded events are newest-first; reverse
        -- them to replay in the order they were produced.
        runT_ $
          sinkHandle byChunk h
            <~ processPut putEvent
            <~ prepended (reverse es)
            <~ takingJusts
            <~ sourceIO (readChan chan)
        hClose h
      atomically $ writeTQueue conns conn
      return Nothing

    -- Record an event and broadcast it. A 'Start' event discards everything
    -- recorded before it; any other event is prepended to the record.
    feed chan state e = do
      modifyMVar_ state (return . record)
      writeChan chan $ Just e
      where
        record es = case e of
          Start _ -> [e]
          _ -> e : es

    -- Wait for every connection registered so far. The queue is drained in a
    -- single transaction so that all consumers known at this point get the
    -- chance to flush, not just the first one. An 'IOException' from a
    -- connection (rethrown by 'wait') is ignored on a per-connection basis so
    -- one failed consumer does not prevent waiting for the others.
    waitFinish conns = do
      pending <- atomically (drain conns)
      mapM_ (tryJust io . wait) pending
      where
        drain q =
          tryReadTQueue q >>= maybe (return []) (\c -> fmap (c :) (drain q))
        io :: IOException -> Maybe ()
        io _ = Just ()

-- | Accept connections on a listening socket until a handler yields a result.
--
-- Each accepted connection is turned into a write-only 'Handle' and passed
-- to @f@. When @f@ returns 'Nothing' the next connection is accepted; when it
-- returns @'Just' a@, @a@ is the result.
--
-- The recursive call is made from inside the 'bracket' body, so a handle's
-- 'hClose' finaliser only runs once the calls made after it have finished
-- (or an exception unwinds them); one bracket stays open per accepted
-- connection until then.
serve :: (Handle -> IO (Maybe a)) -> Socket -> IO a
serve f sock = bracket acceptHandle hClose process
  where
    acceptHandle = do
      (conn, _) <- accept sock
      socketToHandle conn WriteMode
    process h = do
      a <- f h
      maybe (serve f sock) return a