Core.async pub/sub messaging in Clojure web application


#1

I’m thinking of using core.async publish/subscribe on the server side as a simple message bus to, well, send messages around. There’s a threading question that I can’t seem to wrap my head around, and I’d like to ask for your help.

My setup is a ClojureScript/Clojure application that uses Sente and http-kit.
On the client I’m using a slightly adapted version of the event system https://github.com/pointslope/remit, which simply lets you emit messages and subscribe to them. I would like to do the same on the server side.

Now here’s my question:

Suppose a message comes in through Sente. I would assume it arrives in its own thread, or continuation, or whatever http-kit uses to parallelize incoming requests.

So I do my work, maybe fetch a record from the database, and then emit an event to process that record further. Now imagine a couple of users do the same at the same time, emitting the same event.

Would that mean that, in the end, all emitted messages of the same type, but from different users, would end up in the same channel, effectively being queued and worked on one after the other, whereas without a message bus they would continue to be processed in parallel?

Asked the other way around: what do I have to do to ensure messages continue to be processed in parallel and don’t end up in a bottleneck?

Thanks,
Torsten.


#2

Hi @tuhlmann,

Great question.

I’ve used Sente before, but I haven’t used remit. I believe that Sente puts all messages onto the same channel. How you consume messages from that channel is up to you, and that is what determines how they get distributed across threads. So yes: if a single loop takes from that channel and does all the work itself, the messages are processed one after the other.
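To make that concrete, here’s a minimal sketch of the situation you describe. It assumes core.async’s own pub/sub stands in for a remit-style bus and that events are maps with a hypothetical :event-type key; process-record is a made-up handler that records how many copies of itself are running at once. Five "users" emit the same event at the same time, but one consumer loop drains the subscription:

```clojure
(require '[clojure.core.async :as async :refer [chan pub sub thread <!! >!! close!]])

;; remit-style bus (assumption): every event is a map with an :event-type key.
(def bus (chan 16))
(def bus-pub (pub bus :event-type))

;; One subscription channel for :record-fetched events.
(def record-fetched (chan 16))
(sub bus-pub :record-fetched record-fetched)

(def in-flight (atom 0))       ; handlers running right now
(def max-in-flight (atom 0))   ; highest concurrency observed
(def handled (atom []))

(defn process-record           ; hypothetical handler
  [evt]
  (swap! max-in-flight max (swap! in-flight inc))
  (Thread/sleep 20)            ; simulate some work
  (swap! handled conj (:id evt))
  (swap! in-flight dec))

;; A single consumer loop for the subscription: this is where events queue up.
(def consumer
  (thread
    (loop []
      (when-some [evt (<!! record-fetched)]
        (process-record evt)
        (recur)))))

;; Five "users" emit the same event type concurrently, each from its own thread.
(run! <!! (mapv (fn [i] (thread (>!! bus {:event-type :record-fetched :id i})))
                (range 5)))
(close! bus)    ; closing the source also closes the subscribed channel
(<!! consumer)  ; wait until the consumer has drained it
```

max-in-flight ends up at 1. The bus didn’t serialize anything by itself; the single consumer did. Also keep in mind that, as far as I know, pub delivers through one loop that waits for the subscribers of a topic to take each message, so one slow subscriber with a full buffer can stall delivery on the whole bus.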

Parallelization is a big topic, but there are some patterns you can use:

Imagine Sente’s message channel is called messages.

The go-block dispatcher

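(require '[clojure.core.async :refer [go <!]])

(go
  (loop []
    (when-some [msg (<! messages)] ;; take off one message; nil means messages was closed, so stop
      (go (handle-message msg))    ;; hand it to its own go block
      (recur))))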

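In this example, each message is dispatched to its own go block. Go blocks are multiplexed onto a small, fixed-size thread pool, so this adds parallelism up to the size of that pool. Just be sure whatever handle-message does is appropriate for a go block: no blocking I/O and no blocking calls like <!! or Thread/sleep, since each one ties up one of the pool’s few threads. (See http://www.lispcast.com/willy-wonka-core-async for more info about that.)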
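One way to keep handle-message go-friendly when it does need blocking I/O (a database fetch, say) is to push the blocking call onto a real thread with core.async’s thread and park on its result. A minimal sketch, where fetch-record! is a hypothetical stand-in for your DB call:

```clojure
(require '[clojure.core.async :as async :refer [go thread <! <!!]])

(defn fetch-record!              ; hypothetical blocking DB call
  [id]
  (Thread/sleep 10)
  {:id id :status :loaded})

(defn handle-message [msg]
  (go
    ;; The blocking call runs on a thread started by `thread`; this go block
    ;; only parks (freeing its pool thread) until the result arrives.
    (let [record (<! (thread (fetch-record! (:id msg))))]
      (assoc record :handled-by :go-dispatcher))))

(<!! (handle-message {:id 7}))
```

Since this handle-message already returns a go channel, the dispatcher could call it directly instead of wrapping it in another go.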

Thread pool

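(require '[clojure.core.async :as async :refer [<!!]])

(dotimes [_ 10]
  (async/thread
    (loop []
      (when-some [msg (<!! messages)]  ;; nil means messages was closed: let the thread finish
        (try
          (handle-message msg)
          (catch Throwable t
            (handle-error t)))         ;; keep the worker alive after an error
        (recur)))))                    ;; recur must sit outside the try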

This pattern creates 10 threads, each handling messages from the same channel. Java can handle a lot of threads, but remember the number is still finite. You could probably increase this a lot and still be fine, depending on what handle-message does: I/O-heavy handlers spend most of their time waiting and can use many threads, while CPU-heavy ones won’t gain much beyond the number of cores. You need the try/catch so that an exception doesn’t kill the thread and quietly leave you with one fewer worker.
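If you’d rather not manage the loop yourself, core.async also ships pipeline-blocking, which runs a transducer over a channel on n threads and keeps results in input order. This is a different tool from the hand-rolled pool above, not what that snippet does; a minimal sketch, with a hypothetical blocking handle-message:

```clojure
(require '[clojure.core.async :as async :refer [chan pipeline-blocking >!! <!! close!]])

(defn handle-message        ; hypothetical handler doing blocking work
  [msg]
  (Thread/sleep 10)
  (assoc msg :handled true))

(def messages (chan 100))
(def results  (chan 100))

;; Run handle-message on up to 10 threads at once; results come out in input order.
;; By default, results is closed once messages is closed and drained.
(pipeline-blocking 10 results (map handle-message) messages)

(doseq [i (range 20)] (>!! messages {:id i}))
(close! messages)

(def handled (<!! (async/into [] results)))
```

If handle-message can throw, the 6-arity form (pipeline-blocking n to xf from close? ex-handler) takes an ex-handler whose non-nil return value is put onto results in place of the failed item.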

Backpressure

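(require '[clojure.core.async :refer [chan go <! alt! timeout]])

(def blocked-messages (chan 100))

(go
  (loop []
    (when-some [msg (<! messages)]
      (alt!
        ;; put msg onto blocked-messages (a put needs the nested vector)
        [[blocked-messages msg]] nil
        ;; couldn't put within 100ms: tell the client we're busy
        (timeout 100) (respond-unavailable msg))
      (recur))))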

Backpressure is super important in a distributed or parallel system. If clients send you messages faster than you can handle them, the best thing to do is to respond “I’m busy!!!”. It’s better to respond quickly that you’re busy while you keep processing the messages you already have than to queue them up faster than you can process them and overload your system. Once that happens, errors start to cascade and no messages get through.

This pattern uses two things: a channel with a fixed-size buffer ((chan 100)), whose puts block once 100 messages are waiting, and alt! with a timeout that tries to put onto that channel for up to 100ms. If the put can’t complete within 100ms, reject the message and tell the client to back off for a while: we’ve already got 100 queued messages and they’re not being drained fast enough to free a slot in time. Of course, these numbers should be tuned for your system. You’ll also need something consuming messages from blocked-messages, such as the thread pool above.
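Here is the same idea pulled into a small helper so the behaviour is easy to see. offer-with-timeout is a hypothetical name, and the tiny buffer with no consumer is only there to make the queue fill up after two messages:

```clojure
(require '[clojure.core.async :as async :refer [chan go alt! timeout <!!]])

(defn offer-with-timeout
  "Hypothetical helper: try to put msg onto queue for up to timeout-ms.
  Returns a channel that yields :queued or :busy."
  [queue msg timeout-ms]
  (go
    (alt!
      [[queue msg]] :queued            ; put accepted: the buffer had room
      (timeout timeout-ms) :busy)))    ; buffer stayed full: reject

;; Tiny buffer and no consumer, so the queue is full after two messages.
(def queue (chan 2))

(def outcomes
  (mapv #(<!! (offer-with-timeout queue {:id %} 50)) (range 4)))
```

The first two messages fit in the buffer; the last two wait 50ms and come back :busy, which is your cue to send the “I’m busy” response.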

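Alternatively, you could skip the timeout and use offer! on the same fixed-size channel: it puts the message only if it can do so immediately and returns true if it did, so a falsey return means you’re busy. (A dropping buffer, (chan (dropping-buffer 100)), won’t tell you this: >! still returns true when the value is silently dropped.) Giving up the timeout may work for your system and is simpler (only the buffer size to tweak).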
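A quick way to see the difference between offer! on a fixed buffer and a plain put on a dropping buffer, using throwaway channels of size 2:

```clojure
(require '[clojure.core.async :as async :refer [chan offer! dropping-buffer >!! <!! close!]])

;; offer! on a fixed-size buffer: truthy only if the value was actually accepted.
(def queue (chan 2))
(def offered (mapv #(boolean (offer! queue %)) (range 4)))

;; A dropping buffer accepts every put, then silently discards the overflow.
(def dropping (chan (dropping-buffer 2)))
(def put-results (mapv #(>!! dropping %) (range 4)))
(close! dropping)
(def kept (<!! (async/into [] dropping)))
```

offered is [true true false false], so you know exactly which messages to reject. put-results is all true even though kept is only [0 1]: the dropped messages vanish without any signal.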

Hybrid

You’re likely going to do some hybrid of these approaches. Use multiple threads to handle messages in parallel, use go blocks judiciously for small coordination work, and build strong, protective backpressure into the system.
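Put together, a hybrid might look roughly like this sketch: a go loop does the cheap coordination (moving messages onto a bounded queue, rejecting on timeout), a fixed pool of threads does the real work, and each worker catches its own exceptions. start-workers!, start-dispatcher! and the handler functions are hypothetical names; in your app, messages would be Sente’s receive channel and the handlers your real ones.

```clojure
(require '[clojure.core.async :as async
           :refer [chan go thread alt! timeout <! <!! >!! close!]])

(defn start-workers!
  "Starts n threads that drain queue, calling handle-message on each message
  and handle-error on any exception. Returns a channel that closes once every
  worker has exited, i.e. after queue has been closed and drained."
  [n queue handle-message handle-error]
  (async/merge
    (for [_ (range n)]
      (thread
        (loop []
          (when-some [msg (<!! queue)]
            (try
              (handle-message msg)
              (catch Throwable t
                (handle-error t)))
            (recur)))))))

(defn start-dispatcher!
  "Go loop: moves messages from in onto the bounded queue, calling respond-busy
  (which must not block) when queue stays full for timeout-ms. Closes queue
  when in is closed."
  [in queue timeout-ms respond-busy]
  (go
    (loop []
      (if-some [msg (<! in)]
        (do
          (alt!
            [[queue msg]] :queued
            (timeout timeout-ms) (respond-busy msg))
          (recur))
        (close! queue)))))

;; Usage with hypothetical handlers standing in for your real ones.
(def handled (atom #{}))
(def messages (chan))      ; stands in for Sente's receive channel
(def queue (chan 100))

(def workers
  (start-workers! 4 queue
                  #(swap! handled conj (:id %))
                  #(println "handler failed:" %)))
(start-dispatcher! messages queue 100 #(println "busy, rejecting" (:id %)))

(doseq [i (range 10)] (>!! messages {:id i}))
(close! messages)
(<!! workers)              ; returns nil once all workers have finished
```

The pool size, queue size and timeout are the three knobs to tune; closing the input channel shuts the whole thing down cleanly.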

Rock on!
Eric