threaded: Manage concurrently operating threads without having to spark them

[ bsd3, concurrent, library ]

Please see the README on GitHub at https://github.com/athanclark/threaded#readme



Versions 0.0.0
Change log ChangeLog.md
Dependencies async, base (>=4.7 && <5), chan, extractable-singleton, hashable, monad-control-aligned, mtl, stm, tmapmvar
License BSD-3-Clause
Copyright 2020 Athan Clark
Author Athan Clark
Maintainer athan.clark@gmail.com
Category Concurrent
Home page https://github.com/athanclark/threaded#readme
Bug tracker https://github.com/athanclark/threaded/issues
Source repo head: git clone https://github.com/athanclark/threaded
Uploaded by athanclark at 2020-01-13T01:45:30Z
Reverse Dependencies 1 direct, 0 indirect
Downloads 521 total (10 in the last 30 days)
Status Docs available
Last success reported on 2020-01-13

Readme for threaded-0.0.0


threaded

threaded aims to make managed horizontal scaling easier. Given a process that reads from a concurrent channel, writes to a concurrent channel, and returns when it is finished, the library runs one instance of that process per thread identifier, so the process scales horizontally with respect to that identifier:

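import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, cancel)
import Control.Concurrent.STM (atomically)
import Control.Monad (forever)
-- The two module names below are assumptions about the chan and threaded
-- packages; check each package's documentation for the exact paths.
import Control.Concurrent.STM.TChan.Typed
import Control.Concurrent.Threaded (threaded)

main :: IO ()
main = do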
  let mult inputs outputs = do
        -- get first input
        x <- atomically (readTChanRW inputs)
        -- get second input
        y <- atomically (readTChanRW inputs)
        let o :: Integer
            o = (x :: Integer) * (y :: Integer)

        -- write output
        atomically (writeTChanRW outputs o)
        -- returning here ends the process for this thread identifier

  -- incoming messages for specific threads
  incoming <- writeOnly <$> atomically newTChanRW

  (mainThread, outgoing) <- threaded incoming mult

  echoingThread <- async $ forever $ do
    -- do something with each thread's output
    (k,o) <- atomically (readTChanRW outgoing)
    putStrLn $ show k ++ ": " ++ show o

  atomically $ writeTChanRW incoming ("one",1)
  atomically $ writeTChanRW incoming ("two",2)
  atomically $ writeTChanRW incoming ("three",3)
  atomically $ writeTChanRW incoming ("one",1)
  atomically $ writeTChanRW incoming ("two",2)
  atomically $ writeTChanRW incoming ("three",3)

  threadDelay 1000000

  cancel echoingThread
  cancel mainThread

If no thread exists for the identifier attached to an input, the threaded manager spawns a new one. If a thread already exists, the manager forwards the input to that thread's input channel. Once the process returns, the thread with that identifier is killed and garbage collected.
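
The routing rule described above can be illustrated without the library itself. The following is a minimal sketch, not the package's implementation: `dispatch`, `mult`, and `runDemo` are hypothetical names, the worker table is a plain `Data.Map` inside a `TVar`, and the process receives a read action and an emit function instead of the typed channels used in the example. It depends only on `base`, `stm`, and `containers`.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.STM
import Control.Monad (forever, replicateM)
import Data.Foldable (for_)
import Data.List (sort)
import qualified Data.Map.Strict as Map

-- Hypothetical stand-in for the manager: route each (key, input) pair to a
-- per-key worker, spawning the worker the first time its key is seen.
dispatch
  :: Ord k
  => TChan (k, i)                        -- incoming keyed inputs
  -> TChan (k, o)                        -- outgoing keyed outputs
  -> (STM i -> (o -> STM ()) -> IO ())   -- process, run once per key
  -> IO ()
dispatch incoming outgoing process = do
  workers <- newTVarIO Map.empty
  forever $ do
    (k, x) <- atomically (readTChan incoming)
    -- Look up and deliver in one transaction so a worker cannot be
    -- registered or removed between the two steps.
    spawned <- atomically $ do
      table <- readTVar workers
      case Map.lookup k table of
        Just inputs -> writeTChan inputs x >> pure Nothing
        Nothing -> do
          inputs <- newTChan
          writeTChan inputs x
          writeTVar workers (Map.insert k inputs table)
          pure (Just inputs)
    -- A new key gets its own thread; when the process returns, the key is
    -- forgotten. Inputs that arrive between the process's last read and
    -- this removal are dropped, which a real manager would need to handle.
    for_ spawned $ \inputs -> forkIO $ do
      process (readTChan inputs) (\o -> writeTChan outgoing (k, o))
      atomically (modifyTVar' workers (Map.delete k))

-- Same job as the README's process: multiply two inputs, emit the product.
mult :: STM Integer -> (Integer -> STM ()) -> IO ()
mult readInput emit = do
  x <- atomically readInput
  y <- atomically readInput
  atomically (emit (x * y))

runDemo :: IO [(String, Integer)]
runDemo = do
  incoming <- newTChanIO
  outgoing <- newTChanIO
  manager <- forkIO (dispatch incoming outgoing mult)
  mapM_ (atomically . writeTChan incoming)
    [("one", 1), ("two", 2), ("three", 3), ("one", 1), ("two", 2), ("three", 3)]
  results <- replicateM 3 (atomically (readTChan outgoing))
  killThread manager
  pure (sort results)  -- arrival order is not deterministic, so sort

main :: IO ()
main = runDemo >>= mapM_ (\(k, o) -> putStrLn (k ++ ": " ++ show o))
-- prints (after sorting by key):
-- one: 1
-- three: 9
-- two: 4
```

Each key receives two inputs here, so the first message for a key creates its worker and the second lets it finish and emit a product. Collecting exactly three results replaces the fixed `threadDelay` in the example above, which is a design choice of this sketch rather than something the library prescribes.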