module Data.Conduit.FoldDebounceSpec (main, spec) where import Test.Hspec import Control.Applicative ((<$>)) import Control.Concurrent (threadDelay, myThreadId) import Control.Concurrent.STM (atomically, TVar, newTVarIO, writeTVar, readTVar, retry) import Control.Monad (forM_, void) import Control.Monad.IO.Class (liftIO) import Control.Monad.Trans.Resource (ResourceT, runResourceT, register) import Data.Conduit (Source, ConduitM, ($$), yield, addCleanup) import qualified Data.Conduit.FoldDebounce as F import qualified Data.Conduit.List as CL import Data.Maybe (isJust) import Data.Monoid (Monoid) import System.Timeout (timeout) main :: IO () main = hspec spec delayedSource :: [(Int, a)] -> Source (ResourceT IO) a delayedSource [] = return () delayedSource ((delay, item):rest) = do liftIO $ threadDelay delay yield item delayedSource rest periodicSource :: Int -> [a] -> Source (ResourceT IO) a periodicSource interval items = delayedSource $ zip (repeat interval) items terminationDetector :: IO (TVar Bool, (ConduitM i o (ResourceT IO) r -> ConduitM i o (ResourceT IO) r)) terminationDetector = do terminated <- newTVarIO False return (terminated, addCleanup (\completed -> if not completed then liftIO $ atomically $ writeTVar terminated True else return () ) ) attachResource :: Source (ResourceT IO) a -> IO (TVar Bool, Source (ResourceT IO) a) attachResource src = do released <- newTVarIO False let src' = do void $ register $ atomically $ writeTVar released True src return (released, src') debSum :: Int -> Source (ResourceT IO) Int -> Source (ResourceT IO) Int debSum delay = F.debounce F.Args { F.init = 0, F.fold = (+), F.cb = undefined } F.def { F.delay = delay } debMonoid :: Monoid i => Int -> Source (ResourceT IO) i -> Source (ResourceT IO) i debMonoid delay = F.debounce F.forMonoid F.def { F.delay = delay } shouldSatisfyEventually :: Int -> TVar a -> (a -> Bool) -> IO () shouldSatisfyEventually timeout_usec got predicate = do eventually_got <- timeout timeout_usec $ atomically $ doCheck fmap predicate eventually_got `shouldBe` Just True where doCheck = do ret <- readTVar got if predicate ret then return ret else retry shouldSatisfyEventually' :: TVar a -> (a -> Bool) -> IO () shouldSatisfyEventually' = shouldSatisfyEventually 20000000 spec :: Spec spec = do describe "debounce" $ do it "should fold inputs" $ do ret <- runResourceT $ debSum 500000 (periodicSource 10000 [1..10]) $$ CL.consume ret `shouldBe` [sum [1..10]] it "should debounce source" $ do let s = delayedSource [(1000, "a"), (1000, "b"), (200000, "c"), (1000, "d"), (1000, "e"), (200000, "f")] ret <- runResourceT $ debMonoid 100000 s $$ CL.consume ret `shouldBe` ["ab", "cde", "f"] it "should terminate debounced Source immediately if the original Source terminates immediately" $ do ret <- timeout 50000000 $ runResourceT $ debMonoid 60000000 (CL.sourceList ["A", "B", "C", "D", "E"]) $$ CL.consume ret `shouldBe` Just ["ABCDE"] it "should terminate the Sink for the original Source if the Sink for the debounced Source terminates" $ do (terminated, detector) <- terminationDetector let orig_source = detector $ periodicSource 10000 (repeat "a") ret <- runResourceT $ debMonoid 50000 orig_source $$ CL.take 4 length ret `shouldBe` 4 forM_ ret (`shouldContain` "a") terminated `shouldSatisfyEventually'` (== True) it "should terminate the debounced Source gracefully if the original Source throws exception" $ do -- For now, the exception in the original Source is not handled, -- i.e., we just let it terminate the thread. So we'll see the -- error message while running the test. let s = (periodicSource 1000 ["a", "b"]) >> error "Exception in origSource" >> (periodicSource 1000 ["c", "d"]) ret <- runResourceT $ debMonoid 100000 s $$ CL.consume ret `shouldBe` ["ab"] it "should terminated the original Source gracefully if the downstream Sink throws exception" $ do (terminated, detector) <- terminationDetector let orig_source = detector $ periodicSource 10000 (repeat "a") ret_sink = do taken <- CL.take 4 _ <- error "Exception in retSink" return taken runResourceT (debMonoid 50000 orig_source $$ ret_sink) `shouldThrow` errorCall "Exception in retSink" terminated `shouldSatisfyEventually'` (== True) it "should connect the original Source in another thread" $ do this_thread <- myThreadId orig_thread_t <- newTVarIO Nothing let orig_source = do liftIO $ atomically . writeTVar orig_thread_t . Just =<< myThreadId yield 10 return () ret <- runResourceT $ debSum 10000 orig_source $$ CL.consume ret `shouldBe` [10] orig_thread <- atomically (readTVar orig_thread_t) orig_thread `shouldSatisfy` isJust orig_thread `shouldSatisfy` (/= Just this_thread) it "should release the resource in the original Source when the original Source finishes" $ do (released, orig_source) <- attachResource $ periodicSource 10000 [1,2,3,4] ret <- runResourceT $ debSum 500000 orig_source $$ CL.consume ret `shouldBe` [10] released `shouldSatisfyEventually'` (== True) it "should release the resource in the original Source when the downstream Sink throws exception" $ do (released, orig_source) <- attachResource $ periodicSource 10000 $ repeat "a" let sink = do taken <- CL.take 4 _ <- error "Exception in downstream" return taken runResourceT (debMonoid 500000 orig_source $$ sink) `shouldThrow` errorCall "Exception in downstream" released `shouldSatisfyEventually'` (== True) it "should release the resource in the original Source when the original Source throws exception" $ do -- Because the error is not handled, we'll see the error message -- while running the test. (See above for the case "original -- Source throwing exception") (released, orig_source) <- attachResource (periodicSource 10000 ["a", "b"] >> error "Exception in source") ret <- runResourceT $ debMonoid 500000 orig_source $$ CL.consume ret `shouldBe` ["ab"] released `shouldSatisfyEventually'` (== True)