module Network.Legion.Conduit (
chanToSource,
chanToSink,
mergeE,
merge,
) where
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan)
import Control.Monad (void, forever)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit (Source, Sink, ($$), ($=), yield, awaitForever)
import qualified Data.Conduit.List as CL (map)
chanToSource :: (MonadIO io) => Chan a -> Source io a
chanToSource chan = forever $ yield =<< liftIO (readChan chan)
chanToSink :: (MonadIO io) => Chan a -> Sink a io ()
chanToSink chan = awaitForever (liftIO . writeChan chan)
mergeE :: (MonadIO io) => Source IO a -> Source IO b -> Source io (Either a b)
mergeE left right = do
chan <- liftIO newChan
(liftIO . void . forkIO) (left $= CL.map Left $$ chanToSink chan)
(liftIO . void . forkIO) (right $= CL.map Right $$ chanToSink chan)
chanToSource chan
merge :: (MonadIO io) => Source IO a -> Source IO a -> Source io a
merge left right = mergeE left right $= CL.map unEither
where
unEither :: Either a a -> a
unEither (Left a) = a
unEither (Right a) = a