I’ve used Sente before but I haven’t used remit. I believe that Sente puts all messages onto the same channel. The consumption of the messages from that channel is up to you, and it’s what determines how they will be distributed to other threads.
Parallelization is a big topic, but there are some patterns you can use:
Imagine Sente’s message channel is called messages.
The go-block dispatcher
(go-loop []
  (let [msg (<! messages)] ;; take off one message
    (go (handle-message msg))) ;; handle it in its own go block
  (recur))
In this example, each message is dispatched to its own go block. Go blocks run concurrently on a shared, fixed-size thread pool, so this adds a lot of parallelism to your code cheaply. Just be sure that whatever handle-message does is appropriate for a go block: it should not tie up a pool thread with blocking IO or long computations (see http://www.lispcast.com/willy-wonka-core-async for more info about that).
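If handle-message does block (database calls, HTTP requests), one option is to keep the go block only for taking from the channel and hand the work to thread instead. Here is a minimal sketch of that idea. The names messages, done, results, and the body of handle-message are stand-ins I made up for the demo; they are not part of Sente.

```clojure
(require '[clojure.core.async :refer [chan go-loop thread <! <!! >!! close!]])

(def messages (chan 10))   ; stand-in for Sente's channel
(def done (chan 10))       ; hypothetical: receives a value when a handler finishes
(def results (atom []))

(defn handle-message [msg] ; hypothetical handler that does blocking work
  (Thread/sleep 50)        ; simulated blocking IO
  (swap! results conj msg)
  msg)

(def dispatcher
  (go-loop []
    (when-some [msg (<! messages)]               ; nil means the channel was closed
      (thread (>!! done (handle-message msg)))   ; blocking work runs on a real thread
      (recur))))

(doseq [m [:a :b :c]] (>!! messages m))
(close! messages)
(<!! dispatcher)             ; wait until the dispatcher has drained the channel
(dotimes [_ 3] (<!! done))   ; wait for the three handlers to finish
```

The go block here does nothing but take and dispatch, so it never holds a pool thread for long.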
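(dotimes [_ 10]
  (thread
    (loop []
      (let [msg (<!! messages)] ;; blocking take on a real thread
        (try (handle-message msg) (catch Throwable t (println "handler failed:" t))))
      (recur))))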
This pattern creates 10 threads, each handling messages from the same channel. Java can handle a lot of threads, but remember it’s still finite. You could probably increase this a lot and still be fine, depending on what handle-message does (lots of IO vs lots of computation). You’re going to need to wrap it in a try/catch so that exceptions don’t crash the thread.
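To see why the try/catch matters, here is a small self-contained sketch of the same worker pattern where one message makes the handler throw. Everything named here (messages, handled, failures, the :bad message) is made up for illustration. I also exit the loop when the channel closes, which is an addition of mine so the demo can finish.

```clojure
(require '[clojure.core.async :refer [chan thread <!! >!! close!]])

(def messages (chan 10))     ; stand-in for Sente's channel
(def handled (atom []))
(def failures (atom 0))

(defn handle-message [msg]   ; hypothetical handler that throws on :bad
  (when (= msg :bad)
    (throw (ex-info "boom" {:msg msg})))
  (swap! handled conj msg))

(def workers
  (doall
    (for [_ (range 3)]       ; 3 workers keeps the demo small
      (thread
        (loop []
          (when-some [msg (<!! messages)]   ; nil means closed, so the worker exits
            (try
              (handle-message msg)
              (catch Throwable t
                (swap! failures inc)))      ; record the failure and keep going
            (recur)))))))

(doseq [m [:a :bad :b :c]] (>!! messages m))
(close! messages)
(doseq [w workers] (<!! w))  ; each thread's channel closes when its loop ends
```

The worker that hit :bad keeps taking messages afterwards instead of dying with the exception.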
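(def blocked-messages (chan 100))
(go-loop []
  (let [msg (<! messages)]
    (alt!
      ;; could not put within 100ms: tell the client we are busy
      (timeout 100) (respond-unavailable msg)
      ;; put msg onto blocked-messages
      [[blocked-messages msg]] nil)) ;; do nothing
  (recur))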
Backpressure is super important in a distributed or parallel system. If you’ve got clients sending you messages and they’re coming in too fast, the best thing to do is to respond “I’m busy!!!”. It’s better to quickly respond that you’re busy while you continue to process existing messages than to queue messages up faster than you can process them and overload your system. Once that happens, errors will start to cascade and no messages will get through.
This pattern uses two things: a blocking channel (returned by default from chan) and alt! with a timeout to attempt putting to the blocked channel for 100ms. If you can’t put within 100ms, you should reject the message and respond that the client needs to back off for a while, because we’ve already got 100 queued messages and they’re not being processed fast enough (100ms). Of course, these numbers should be tuned for your system. And you’ll need something consuming messages from blocked-messages.
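Putting the intake loop and a consumer together might look like the sketch below. It is only a demo under assumptions: the buffer size and timeout are shrunk so the queue fills quickly, respond-unavailable and handle-message are hypothetical stubs, and there is a single consumer thread where a real system would likely have several.

```clojure
(require '[clojure.core.async
           :refer [chan go-loop alt! timeout thread <! <!! >!! close!]])

(def messages (chan))             ; stand-in for Sente's channel
(def blocked-messages (chan 2))   ; tiny buffer so the demo fills up fast
(def rejected (atom []))
(def processed (atom []))

(defn respond-unavailable [msg]   ; hypothetical: would tell the client to back off
  (swap! rejected conj msg))

(defn handle-message [msg]        ; hypothetical slow handler
  (Thread/sleep 100)
  (swap! processed conj msg))

;; Intake: try to queue each message for up to 20ms, otherwise reject it.
(def intake
  (go-loop []
    (if-some [msg (<! messages)]
      (do (alt!
            [[blocked-messages msg]] :queued
            (timeout 20) (respond-unavailable msg))
          (recur))
      (close! blocked-messages))))   ; input closed, so let the consumer finish

;; Consumer: a real thread, because handle-message blocks.
(def consumer
  (thread
    (loop []
      (when-some [msg (<!! blocked-messages)]
        (try (handle-message msg)
             (catch Throwable t (println "handler failed:" t)))
        (recur)))))

(doseq [m (range 10)] (>!! messages m))
(close! messages)
(<!! consumer)   ; returns once everything that was queued has been processed
```

Every message ends up either processed or rejected; which ones get rejected depends on timing.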
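Alternatively, you could skip the timeout and use offer! on the same fixed-buffer channel. It never blocks, and its return value is truthy only if the put actually happened. A dropping buffer ((chan (dropping-buffer 100))) also never blocks, but >! on it returns true even when the message is silently dropped, so you can’t use its return value to detect overload. The offer! approach does not let you wait with a timeout, but that may be fine for your system, and it is simpler (only the buffer size to tweak).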
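Here is a minimal sketch of a non-blocking check using offer! on a fixed buffer, with a dropping buffer shown for contrast. The helper enqueue-or-reject! and the respond-unavailable stub are names I made up; nothing is consuming from the channel in this demo, so the buffer fills after two puts.

```clojure
(require '[clojure.core.async :refer [chan offer! >!! dropping-buffer]])

(def blocked-messages (chan 2))   ; tiny fixed buffer for the demo
(def rejected (atom []))

(defn respond-unavailable [msg]   ; hypothetical: would tell the client to back off
  (swap! rejected conj msg))

(defn enqueue-or-reject!
  "Hypothetical helper: queue msg if there is room right now, else reject it."
  [msg]
  (if (offer! blocked-messages msg)   ; true only when the put happened immediately
    :queued
    (do (respond-unavailable msg)
        :rejected)))

;; No consumer is running, so only the first two messages fit.
(def outcomes (mapv enqueue-or-reject! [:a :b :c]))

;; For contrast: a put on a dropping buffer reports true even when the
;; value is discarded, so it cannot signal overload.
(def dropping (chan (dropping-buffer 1)))
(def put-results [(>!! dropping 1) (>!! dropping 2)])
```

Because offer! never parks or blocks, you can call it from a go block or a plain thread.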
You’re likely going to do some hybrid of these approaches. Use multiple threads to handle messages in parallel, use go blocks judiciously for small coordination work, and build strong, protective backpressure into the system.