Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Synopsis
- data Actor message
- spawnStatefulIndividual :: state -> (state -> IO ()) -> (state -> message -> IO state) -> IO (Actor message)
- spawnStatefulBatched :: state -> (state -> IO ()) -> (state -> NonEmpty message -> IO state) -> IO (Actor message)
- spawnStatelessIndividual :: IO () -> (message -> IO ()) -> IO (Actor message)
- spawnStatelessBatched :: IO () -> (NonEmpty message -> IO ()) -> IO (Actor message)
- tell :: Actor message -> message -> IO ()
- kill :: Actor message -> IO ()
- wait :: Actor message -> IO ()
- firstAvailableOneOf :: [Actor message] -> Actor message
- byKeyHashOneOf :: (message -> Int) -> [Actor message] -> Actor message
- allOf :: [Actor message] -> Actor message
Documentation
Controls of an actor, which processes the messages of type message
.
The processing runs on a dedicated green thread.
Provides abstraction over the message channel, thread-forking and killing.
Monoid instance is not provided for the same reason it is not provided for numbers.
This type supports both sum and product composition. See allOf
, firstAvailableOneOf
and byKeyHashOneOf
.
Acquisition
spawnStatefulIndividual Source #
:: state | Initial state. |
-> (state -> IO ()) | Clean up when killed or exception is thrown. |
-> (state -> message -> IO state) | Process a message and update state. |
-> IO (Actor message) | Fork a thread to run the handler loop on and produce a handle to control it. |
Spawn an actor which processes messages in isolated executions and threads state.
:: state | Initial state. |
-> (state -> IO ()) | Clean up when killed or exception is thrown. |
-> (state -> NonEmpty message -> IO state) | Process a batch of messages and update state. |
-> IO (Actor message) | Fork a thread to run the handler loop on and produce a handle to control it. |
Spawn an actor which processes all available messages in one execution and threads state.
spawnStatelessIndividual Source #
:: IO () | Clean up when killed or exception is thrown. |
-> (message -> IO ()) | Interpret a message. |
-> IO (Actor message) | Fork a thread to run the handler loop on and produce a handle to control it. |
Spawn an actor which processes messages in isolated executions.
spawnStatelessBatched Source #
:: IO () | Clean up when killed or exception is thrown. |
-> (NonEmpty message -> IO ()) | Interpret a batch of messages. |
-> IO (Actor message) | Fork a thread to run the handler loop on and produce a handle to control it. |
Spawn an actor which processes all available messages in one execution.
Control
tell :: Actor message -> message -> IO () Source #
Add a message to the end of the queue of the messages to be processed by the provided actor.
wait :: Actor message -> IO () Source #
Block waiting for the actor to die either due to getting killed or due to its interpreter action throwing an exception. The exception will get rethrown here.
Composition
firstAvailableOneOf :: [Actor message] -> Actor message Source #
Distribute the message stream across actors. The message gets delivered to the first available one.
E.g., using this combinator in combination with replicateM
you can construct pools:
spawnPool :: Int -> IO (Actor message) -> IO (Actor message) spawnPool size spawn = firstAvailableOneOf <$> replicateM size spawn
You can consider this being an interface to the Sum monoid.
:: (message -> Int) | Function extracting the key from the message and hashing it. |
-> [Actor message] | Pool of actors. |
-> Actor message |
Dispatch the message across actors based on a key hash.
This lets you ensure of a property that messages with the same key will arrive to the same actor, letting you maintain a local associated state in the actors.
The implementation applies a modulo equal to the amount of actors to the hash and thus determines the index of the actor to dispatch the message to. This is inspired by how partitioning is done in Kafka.