polysemy-conc-0.2.0.0: Polysemy Effects for Concurrency
Safe Haskell: None
Language: Haskell2010

Polysemy.Conc

Introduction

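This library provides an assortment of tools for concurrency-related tasks: queues, MVar-style synchronization, racing and timeouts, signal handling, event channels and exception handling.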

Queues

data Queue d :: Effect Source #

Abstracts queues like TBQueue.

For documentation on the constructors, see the module Polysemy.Conc.Data.Queue.

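{-# LANGUAGE LambdaCase #-}

import Polysemy (Member, Sem)
import Polysemy.Conc (Queue, QueueResult)
import qualified Polysemy.Conc.Data.Queue as Queue
import qualified Polysemy.Conc.Data.QueueResult as QueueResult

prog :: Member (Queue Int) r => Sem r (QueueResult Int)
prog = do
  Queue.write 5
  Queue.write 10
  Queue.read >>= \case
    QueueResult.Success i -> fmap (i +) <$> Queue.read
    r -> pure r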

Instances

type DefiningModule Queue Source #

Defined in Polysemy.Conc.Data.Queue

type DefiningModule Queue = "Polysemy.Conc.Data.Queue"

data QueueResult d Source #

Encodes failure reasons for queues.

For documentation on the constructors, see the module Polysemy.Conc.Data.QueueResult.

import qualified Polysemy.Conc.Data.QueueResult as QueueResult

Instances

Functor QueueResult Source #

Defined in Polysemy.Conc.Data.QueueResult

Methods

fmap :: (a -> b) -> QueueResult a -> QueueResult b #

(<$) :: a -> QueueResult b -> QueueResult a #

Eq d => Eq (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Ord d => Ord (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Show d => Show (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Generic (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Associated Types

type Rep (QueueResult d) :: Type -> Type #

Methods

from :: QueueResult d -> Rep (QueueResult d) x #

to :: Rep (QueueResult d) x -> QueueResult d #

Semigroup d => Semigroup (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Monoid d => Monoid (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

type Rep (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

type Rep (QueueResult d) = D1 ('MetaData "QueueResult" "Polysemy.Conc.Data.QueueResult" "polysemy-conc-0.2.0.0-inplace" 'False) (C1 ('MetaCons "Success" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 d)) :+: (C1 ('MetaCons "NotAvailable" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Closed" 'PrefixI 'False) (U1 :: Type -> Type)))

resultToMaybe :: QueueResult d -> Maybe d Source #

Turn a Success into Just.
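
Because the result type is Maybe d, every result other than Success has to map to Nothing. A minimal sketch of that behaviour follows. It uses a local stand-in for QueueResult with the constructor names listed in the Generic instance above, and the name resultToMaybeSketch is hypothetical; the real function is the one exported by this module.

```haskell
-- Local stand-in for Polysemy.Conc.Data.QueueResult.QueueResult;
-- defined here only so the sketch compiles without the library.
data QueueResult d
  = Success d
  | NotAvailable
  | Closed
  deriving (Eq, Show)

-- Hypothetical re-implementation for illustration.
resultToMaybeSketch :: QueueResult d -> Maybe d
resultToMaybeSketch (Success d) = Just d
resultToMaybeSketch _ = Nothing
```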

Interpreters

interpretQueueTBM Source #

Arguments

:: forall d r. Members [Resource, Race, Embed IO] r 
=> Int

Buffer size

-> InterpreterFor (Queue d) r 

Interpret Queue with a TBMQueue.

interpretQueueTB Source #

Arguments

:: forall d r. Members [Race, Embed IO] r 
=> Natural

Buffer size

-> InterpreterFor (Queue d) r 

Interpret Queue with a TBQueue.

interpretQueueListReadOnlyAtomic :: forall d r. Member (Embed IO) r => [d] -> InterpreterFor (Queue d) r Source #

Variant of interpretQueueListReadOnlyAtomicWith that interprets the AtomicState.

interpretQueueListReadOnlyAtomicWith :: forall d r. Member (AtomicState [d]) r => InterpreterFor (Queue d) r Source #

Reinterpret Queue as AtomicState with a list that cannot be written to. Useful for testing.

interpretQueueListReadOnlyState :: forall d r. Member (Embed IO) r => [d] -> InterpreterFor (Queue d) r Source #

Variant of interpretQueueListReadOnlyStateWith that interprets the State.

interpretQueueListReadOnlyStateWith :: forall d r. Member (State [d]) r => InterpreterFor (Queue d) r Source #

Reinterpret Queue as State with a list that cannot be written to. Useful for testing.

Combinators

loop :: Member (Queue d) r => (d -> Sem r ()) -> Sem r () Source #

Read from a Queue repeatedly until it is closed.

When an element is received, call action and recurse.

loopOr :: Member (Queue d) r => Sem r Bool -> (d -> Sem r Bool) -> Sem r () Source #

Read from a Queue repeatedly until it is closed.

When an element is received, call action and recurse if it returns True. When no element is available, evaluate na and recurse if it returns True.
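
The control flow described for loopOr can be modelled without the library. The sketch below is an assumption-laden model, not the library's implementation: it abstracts the queue as an arbitrary read action returning a local stand-in for QueueResult, and the names loopOrSketch, scriptedRead and collect are hypothetical.

```haskell
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Local stand-in for the library's QueueResult.
data QueueResult d = Success d | NotAvailable | Closed

-- Model of loopOr: stop on Closed, otherwise continue only while the
-- callback for the current case returns True.
loopOrSketch :: Monad m => m (QueueResult d) -> m Bool -> (d -> m Bool) -> m ()
loopOrSketch readQ na action = go
  where
    go = do
      result <- readQ
      case result of
        Success d -> action d >>= continueIf
        NotAvailable -> na >>= continueIf
        Closed -> pure ()
    continueIf keepGoing = if keepGoing then go else pure ()

-- Scripted read action: pops results from a list and reports Closed
-- once the list is exhausted.
scriptedRead :: IORef [QueueResult d] -> IO (QueueResult d)
scriptedRead ref = atomicModifyIORef' ref pop
  where
    pop [] = ([], Closed)
    pop (r : rs) = (rs, r)

-- Collect every element delivered before the script reaches Closed.
collect :: [QueueResult d] -> IO [d]
collect script = do
  input <- newIORef script
  seen <- newIORef []
  loopOrSketch (scriptedRead input) (pure True) (\d -> True <$ modifyIORef' seen (d :))
  reverse <$> readIORef seen
```

In this model, loop corresponds to passing callbacks that always return True, so only Closed ends the iteration.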

MVars

An MVar is abstracted as Sync since it can be used to synchronize threads.

data Sync d :: Effect Source #

Abstracts an MVar.

For documentation on the constructors, see the module Polysemy.Conc.Effect.Sync.

import Polysemy.Conc (Sync)
import qualified Polysemy.Conc.Effect.Sync as Sync

prog :: Member (Sync Int) r => Sem r Int
prog = do
  Sync.putTry 5
  Sync.takeBlock

Instances

type DefiningModule Sync Source #

Defined in Polysemy.Conc.Effect.Sync

type DefiningModule Sync = "Polysemy.Conc.Effect.Sync"

type ScopedSync res a = Scoped (SyncResources res) (Sync a) Source #

Interpreters

interpretSync :: forall d r. Members [Race, Embed IO] r => InterpreterFor (Sync d) r Source #

Interpret Sync with an empty MVar.

interpretSyncAs :: forall d r. Members [Race, Embed IO] r => d -> InterpreterFor (Sync d) r Source #

Interpret Sync with an MVar containing the specified value.
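
The difference between the two interpreters shows up in the example program above. The following sketch writes that program directly against an MVar from base. It assumes that putTry behaves like tryPutMVar and takeBlock like takeMVar, which the names suggest but this page does not state; runEmpty and runWith are hypothetical helpers.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, takeMVar, tryPutMVar)

-- The Sync example program, written against a plain MVar.
-- Assumption: putTry ~ tryPutMVar, takeBlock ~ takeMVar.
prog :: MVar Int -> IO Int
prog var = do
  _ <- tryPutMVar var 5 -- does nothing if the MVar is already full
  takeMVar var

-- Counterpart of interpretSync: the MVar starts out empty,
-- so the value written by prog is the one taken.
runEmpty :: IO Int
runEmpty = newEmptyMVar >>= prog

-- Counterpart of interpretSyncAs: the MVar starts out full,
-- so the non-blocking put fails and the initial value is taken.
runWith :: Int -> IO Int
runWith initial = newMVar initial >>= prog
```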

withSync :: forall d res r. Member (Scoped (SyncResources res) (Sync d)) r => InterpreterFor (Sync d) r Source #

Run an action with a locally scoped Sync variable.

interpretScopedSync :: forall d r. Members [Resource, Race, Embed IO] r => InterpreterFor (Scoped (SyncResources (MVar d)) (Sync d)) r Source #

Interpret Sync for locally scoped use with an empty MVar.

interpretScopedSyncAs :: forall d r. Members [Resource, Race, Embed IO] r => d -> InterpreterFor (Scoped (SyncResources (MVar d)) (Sync d)) r Source #

Interpret Sync for locally scoped use with an MVar containing the specified value.

Racing

Racing works like this:

prog =
 Polysemy.Conc.race (httpRequest "hackage.haskell.org") (readFile "/path/to/file") >>= \case
   Left _ -> putStrLn "hackage was faster"
   Right _ -> putStrLn "file was faster"

When the first thunk finishes, the other will be killed.

data Race :: Effect Source #

Abstract the concept of running two programs concurrently, aborting the other when one terminates. Timeout is a simpler variant, where one thread just sleeps for a given interval.

Instances

type DefiningModule Race Source #

Defined in Polysemy.Conc.Data.Race

type DefiningModule Race = "Polysemy.Conc.Data.Race"

race :: forall a b r. Member Race r => Sem r a -> Sem r b -> Sem r (Either a b) Source #

Run both programs concurrently, returning the result of the faster one.

race_ :: Member Race r => Sem r a -> Sem r a -> Sem r a Source #

Specialization of race for the case where both thunks return the same type, obviating the need for Either.
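
To make the semantics of race and race_ concrete, here is a small model in plain IO using only base. It is a sketch under simplifying assumptions (exceptions thrown by either action are not propagated), not how interpretRace is implemented; raceSketch, raceSketch_ and demo are hypothetical names.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (void)

-- Run both actions in their own threads, return whichever result
-- arrives first, then kill both threads.
raceSketch :: IO a -> IO b -> IO (Either a b)
raceSketch left right = do
  result <- newEmptyMVar
  l <- forkIO (left >>= void . tryPutMVar result . Left)
  r <- forkIO (right >>= void . tryPutMVar result . Right)
  winner <- takeMVar result
  killThread l -- harmless if the thread has already finished
  killThread r
  pure winner

-- Same-type variant: no Either needed.
raceSketch_ :: IO a -> IO a -> IO a
raceSketch_ left right = either id id <$> raceSketch left right

-- The right action finishes immediately, the left one sleeps for 200 ms.
demo :: IO (Either String String)
demo = raceSketch (threadDelay 200000 >> pure "slow") (pure "fast")
```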

timeout :: forall a b u r. TimeUnit u => Member Race r => a -> u -> Sem r b -> Sem r (Either a b) Source #

Return the fallback value if the given program doesn't finish within the specified interval.

timeout_ :: TimeUnit u => Member Race r => a -> u -> Sem r a -> Sem r a Source #

Specialization of timeout for the case where the thunk returns the same type as the fallback, obviating the need for Either.

timeoutU :: TimeUnit u => Member Race r => u -> Sem r () -> Sem r () Source #

Specialization of timeout for unit actions.
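
The relationship between the three timeout variants can be sketched in plain IO with System.Timeout from base. This is an illustrative model only: intervals are given in microseconds instead of a TimeUnit, and the names ending in Sketch are hypothetical.

```haskell
import Control.Concurrent (threadDelay)
import System.Timeout (timeout)

-- Left fallback if the action exceeds the interval, Right result otherwise.
timeoutSketch :: a -> Int -> IO b -> IO (Either a b)
timeoutSketch fallback micros action =
  maybe (Left fallback) Right <$> timeout micros action

-- Fallback and result share a type, so the Either can be collapsed.
timeoutSketch_ :: a -> Int -> IO a -> IO a
timeoutSketch_ fallback micros action =
  either id id <$> timeoutSketch fallback micros action

-- Unit actions need no explicit fallback.
timeoutSketchU :: Int -> IO () -> IO ()
timeoutSketchU = timeoutSketch_ ()

-- One action that exceeds its 10 ms budget and one that finishes in time.
demo :: IO (Either String Int, Either String Int)
demo = do
  slow <- timeoutSketch "too slow" 10000 (threadDelay 500000 >> pure 1)
  fast <- timeoutSketch "too slow" 500000 (pure 2)
  pure (slow, fast)
```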

retrying Source #

Arguments

:: forall e w u t d r a. TimeUnit w 
=> TimeUnit u 
=> Members [Race, Time t d] r 
=> w

The timeout after which the attempt is abandoned.

-> u

The waiting interval between two tries.

-> Sem r (Either e a) 
-> Sem r (Maybe a) 

Run an action repeatedly until it returns Right or the timeout has been exceeded.

retryingWithError Source #

Arguments

:: forall e w u t d r a. TimeUnit w 
=> TimeUnit u 
=> Members [Race, Time t d, Embed IO] r 
=> w

The timeout after which the attempt is abandoned.

-> u

The waiting interval between two tries.

-> Sem r (Either e a) 
-> Sem r (Maybe (Either e a)) 

Run an action repeatedly until it returns Right or the timeout has been exceeded.

If the action failed at least once, the last error will be returned in case of timeout.
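
The three possible outcomes of retryingWithError (success, timeout after at least one error, timeout before any attempt completed) can be modelled in plain IO. The sketch below is not the library's implementation: it uses microsecond intervals, System.Timeout and an IORef for the last error, and all names in it are hypothetical.

```haskell
import Control.Concurrent (threadDelay)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef, writeIORef)
import System.Timeout (timeout)

-- limit and interval are in microseconds.
retryingWithErrorSketch :: Int -> Int -> IO (Either e a) -> IO (Maybe (Either e a))
retryingWithErrorSketch limit interval action = do
  lastError <- newIORef Nothing
  let attempt = do
        result <- action
        case result of
          Right a -> pure a
          Left e -> do
            writeIORef lastError (Just e) -- remember the most recent failure
            threadDelay interval
            attempt
  outcome <- timeout limit attempt
  case outcome of
    Just a -> pure (Just (Right a))
    -- Timed out: report the last error, or Nothing if no attempt completed.
    Nothing -> fmap Left <$> readIORef lastError

-- Model of retrying: same loop, but the error is discarded.
retryingSketch :: Int -> Int -> IO (Either e a) -> IO (Maybe a)
retryingSketch limit interval action =
  (>>= either (const Nothing) Just) <$> retryingWithErrorSketch limit interval action

-- Test action: fails on the first two calls, then returns the number
-- of calls made before the current one.
flaky :: IORef Int -> IO (Either String Int)
flaky counter =
  atomicModifyIORef' counter (\n -> (n + 1, if n < 2 then Left "not yet" else Right n))
```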

Interpreters

interpretRace :: Member (Final IO) r => InterpreterFor Race r Source #

Interpret Race in terms of race and timeout. Since this has to pass higher-order thunks as IO arguments, it is interpreted in terms of Final IO.

Signal Handling

data Interrupt :: Effect Source #

The interrupt handler effect allows three kinds of interaction for interrupt signals:

  • Execute a callback when a signal is received
  • Block a thread until a signal is received
  • Kill a thread when a signal is received

For documentation on the constructors, see the module Polysemy.Conc.Data.Interrupt.

import qualified Polysemy.Conc.Data.Interrupt as Interrupt

prog = do
  Interrupt.register "task 1" (putStrLn "interrupted")
  Interrupt.killOnQuit $ forever do
   doSomeWork

Instances

type DefiningModule Interrupt Source #

Defined in Polysemy.Conc.Data.Interrupt

type DefiningModule Interrupt = "Polysemy.Conc.Data.Interrupt"

Interpreters

interpretInterrupt :: Members [Critical, Race, Async, Embed IO] r => InterpreterFor Interrupt r Source #

Interpret Interrupt by installing a signal handler.

Event Channels

data Events (token :: Type) (e :: Type) :: Effect Source #

An event publisher that can be consumed from multiple threads.

Instances

type DefiningModule Events Source #

Defined in Polysemy.Conc.Effect.Events

type DefiningModule Events = "Polysemy.Conc.Effect.Events"

publish :: forall token e r. MemberWithError (Events token e) r => e -> Sem r () Source #

consume :: forall e r. MemberWithError (Consume e) r => Sem r e Source #

subscribe :: forall e token r. Member (Scoped (EventToken token) (Consume e)) r => InterpreterFor (Consume e) r Source #

Create a new scope for Events, causing the nested program to get its own copy of the event stream. To be used with interpretEventsChan.

subscribeWhile :: forall e token r. Member (EventConsumer token e) r => (e -> Sem r Bool) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback. Stop when the action returns False.

subscribeLoop :: forall e token r. Member (EventConsumer token e) r => (e -> Sem r ()) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback.

data EventToken token Source #

Marker for the Scoped token for Events.

Instances

Eq token => Eq (EventToken token) Source #

Defined in Polysemy.Conc.Effect.Events

Methods

(==) :: EventToken token -> EventToken token -> Bool #

(/=) :: EventToken token -> EventToken token -> Bool #

Show token => Show (EventToken token) Source # 

Defined in Polysemy.Conc.Effect.Events

Methods

showsPrec :: Int -> EventToken token -> ShowS #

show :: EventToken token -> String #

showList :: [EventToken token] -> ShowS #

Generic (EventToken token) Source # 

Defined in Polysemy.Conc.Effect.Events

Associated Types

type Rep (EventToken token) :: Type -> Type #

Methods

from :: EventToken token -> Rep (EventToken token) x #

to :: Rep (EventToken token) x -> EventToken token #

type Rep (EventToken token) Source # 

Defined in Polysemy.Conc.Effect.Events

type Rep (EventToken token) = D1 ('MetaData "EventToken" "Polysemy.Conc.Effect.Events" "polysemy-conc-0.2.0.0-inplace" 'True) (C1 ('MetaCons "EventToken" 'PrefixI 'True) (S1 ('MetaSel ('Just "unEventToken") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 token)))

type EventChan e = EventToken (OutChan e) Source #

Convenience alias for the default EventToken that uses an OutChan.

type ChanEvents e = Events (OutChan e) e Source #

Convenience alias for the default Events that uses an OutChan.

type EventConsumer token e = Scoped (EventToken token) (Consume e) Source #

Convenience alias for the consumer effect.

type ChanConsumer e = Scoped (EventChan e) (Consume e) Source #

Convenience alias for the consumer effect using the default implementation.

Interpreters

interpretEventsChan :: forall e r. Members [Resource, Race, Async, Embed IO] r => InterpretersFor [Events (OutChan e) e, ChanConsumer e] r Source #

Interpret Events and Consume together by connecting them to the two ends of an unagi channel. Consume is only interpreted in a Scoped manner, ensuring that a new duplicate of the channel is created so that all consumers see all events (from the moment they are connected).

This should be used in conjunction with subscribe:

interpretEventsChan do
  async $ subscribe do
    putStrLn =<< consume
  publish "hello"

Whenever subscribe creates a new scope, this interpreter calls dupChan and passes the duplicate to interpretConsumeChan.
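
The effect of duplicating the channel per subscriber can be illustrated with Control.Concurrent.Chan from base, whose dupChan likewise yields a channel that starts empty and receives everything written afterwards. This is only an analogy under the assumption that the unagi channel behaves the same way in this respect; demo is a hypothetical name.

```haskell
import Control.Concurrent.Chan (dupChan, newChan, readChan, writeChan)

-- Two subscribers each get their own duplicate of the channel.
demo :: IO (String, String)
demo = do
  events <- newChan
  writeChan events "early" -- published before any subscriber is connected
  sub1 <- dupChan events   -- stands in for one subscribe scope
  sub2 <- dupChan events   -- stands in for a second subscribe scope
  writeChan events "hello" -- published after both are connected
  first <- readChan sub1
  second <- readChan sub2
  pure (first, second)
```

Both subscribers read "hello" as their first event; the event published before they connected is not replayed to them.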

Exceptions

data Critical :: Effect Source #

An effect that catches exceptions.

Provides the exact functionality of fromExceptionSem, but pushes the dependency on Final IO to the interpreter, and makes it optional.

Instances

type DefiningModule Critical Source #

Defined in Polysemy.Conc.Data.Critical

type DefiningModule Critical = "Polysemy.Conc.Data.Critical"

Interpreters

interpretCritical :: Member (Final IO) r => InterpreterFor Critical r Source #

Interpret Critical in terms of Final IO.

interpretCriticalNull :: InterpreterFor Critical r Source #

Interpret Critical by doing nothing.

Other Combinators

interpretAtomic :: forall a r. Member (Embed IO) r => a -> InterpreterFor (AtomicState a) r Source #

Convenience wrapper around runAtomicStateTVar that creates a new TVar.

withAsyncBlock :: Members [Resource, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the action to allow it to await its result.

When cancelling, this variant will wait indefinitely for the thread to be gone.

withAsync :: Members [Resource, Race, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the action to allow it to await its result.

When cancelling, this variant will wait for 500ms for the thread to be gone.

withAsync_ :: Members [Resource, Race, Async] r => Sem r b -> Sem r a -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Discards the handle, expecting the async action to either terminate or be cancelled.

When cancelling, this variant will wait for 500ms for the thread to be gone.
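
The run-then-cancel pattern shared by these combinators can be sketched in plain IO with bracket and killThread from base. This model is an assumption-heavy simplification: it has no Async handle, and killThread waits without a time limit, so it does not reproduce the 500ms bound. All names in it are hypothetical.

```haskell
import Control.Concurrent (forkIOWithUnmask, killThread, threadDelay)
import Control.Exception (bracket)
import Control.Monad (forever, void)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Run the background action in its own thread while the foreground
-- action executes, then cancel the background thread.
withAsyncSketch_ :: IO b -> IO a -> IO a
withAsyncSketch_ background foreground =
  bracket
    -- bracket masks its acquire step; unmask so the thread can be killed.
    (forkIOWithUnmask (\unmask -> unmask (void background)))
    killThread
    (const foreground)

-- Background job that increments a counter roughly every millisecond.
ticker :: IORef Int -> IO ()
ticker ref = forever (modifyIORef' ref (+ 1) >> threadDelay 1000)

-- Returns the counter right after cancellation and again 30 ms later.
demo :: IO (Int, Int)
demo = do
  ref <- newIORef 0
  withAsyncSketch_ (ticker ref) (threadDelay 50000)
  atExit <- readIORef ref
  threadDelay 30000
  later <- readIORef ref
  pure (atExit, later)
```

If the background thread has really been cancelled, the two counter values returned by demo are equal.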