```{-# LANGUAGE RankNTypes #-}
( JoinResult (..)
, joinSources
, joinResumableSources
) where

import Data.Conduit        (ConduitT, SealedConduitT, await, leftover, sealConduitT, yield, (\$\$++))

{-| A result value of joining two sources.

When sources are joined, the result value can be a value or
it be a leftover on either left or right side in case if one of the
streams is ehausted before another.
-}
data JoinResult a v b
= LeftoverL a     -- ^ Leftover on the left side, the right side is exhausted
| JoinValue v     -- ^ Result value
| LeftoverR b     -- ^ Leftover on the right side, the left side is exhausted
deriving (Show, Eq)

{-| Joins sources with the provided merging function.
Leftovers are considered valid values and are retuned as a part of a result stream.

@
import Data.Conduit
import Data.Conduit.List as CL

\-\- combining function just sums both values
comb :: (Ord a, Num a) => a -> a -> ([a], [a], [a])
comb a b
| a > b     = ([a - b], [b], [])
| b > a     = ([], [a], [b - a])
| otherwise = ([], [a], [])

let lst1 = CL.sourceList [1,2,3]
let lst2 = CL.sourceList [1,2,3,4,5]
runConduit \$ joinSources comb lst1 lst2 \$\$ CL.take 1000

['JoinValue' 2,'JoinValue' 4,'JoinValue' 6,'LeftoverR' 4,'LeftoverR' 5]
@
-}
=> (a -> b -> ([a], [v], [b]))
-- ^ Function to merge values.
--   The result contains values @v@ and possible leftovers @a@ and @b@
--   for left and right streams.
-> ConduitT () a m ()
-- ^ Left side source
-> ConduitT () b m ()
-- ^ Right side source
-> ConduitT () (JoinResult a v b) m ()
-- ^ Result source that can contain a value or leftovers on each side
joinSources f as bs = joinResumableSources f (sealConduitT as) (sealConduitT bs)

=> (a -> b -> ([a], [v], [b]))
-- ^ Function to merge values.
--   The result contains values @v@ and possible leftovers @a@ and @b@
--   for left and right streams.
-> SealedConduitT () a m ()
-- ^ Left side source
-> SealedConduitT () b m ()
-- ^ Right side source
-> ConduitT () (JoinResult a v b) m ()
-- ^ Result source that can contain a value or leftovers on each side
joinResumableSources f = go
where
go ras rbs = do
(ras', ma) <- lift \$ ras \$\$++ await
(rbs', mb) <- lift \$ rbs \$\$++ await
case (ma, mb) of
(Nothing, Nothing) -> pure ()
(Nothing, Just b)  -> yield (LeftoverR b) >> go ras' rbs'
(Just a,  Nothing) -> yield (LeftoverL a) >> go ras' rbs'
(Just a,  Just b)  -> do
let (ls, vs, rs) = f a b
mapM_ (yield . JoinValue) vs
ras'' <- lift \$ pushLeftovers ras' ls
rbs'' <- lift \$ pushLeftovers rbs' rs
go ras'' rbs''
pushLeftovers s ls = foldM (\vs' l -> fst <\$> (vs' \$\$++ leftover l)) s (reverse ls)

--         => Source m a
--         -> Source m b
--         -> Source m (Maybe a, Maybe b)
-- fullZip (ConduitM left0) (ConduitM right0) = ConduitM \$ \rest -> let
--     go (Leftover left ()) right = go left right
--     go left (Leftover right ())  = go left right
--     go (Done ()) (Done ()) = rest ()
--     go (Done ()) (HaveOutput src close y) = HaveOutput (go (Done ()) src) close (Nothing, Just y)
--     go (HaveOutput src close x) (Done ()) = HaveOutput (go src (Done ())) close (Just x, Nothing)
--     go (Done ()) (PipeM _) = rest ()
--     go (PipeM _) (Done ()) = rest ()
--     go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my)
--     go (PipeM mx) y@HaveOutput{} = PipeM ((`go` y) <\$> mx)
--     go x@HaveOutput{} (PipeM my) = PipeM (go x <\$>  my)
--     go (HaveOutput srcx closex x) (HaveOutput srcy closey y) =
--       HaveOutput (go srcx srcy) (closex >> closey) (Just x, Just y)
--     go (NeedInput _ c) right = go (c ()) right
--     go left (NeedInput _ c) = go left (c ())
--     in go (left0 Done) (right0 Done)
```