Chasing Chemtrails with Clojurescript

Or, cloud computing.

As computer programmers, we appreciate the importance of humiliation as a catalyst of individual development: it’s axiomatic that no amount of encouragement or generosity can equal the force of well-crafted abasement.

I’ve spent much of the last year applying this principle to the area of self-improving software. My research centers on the generation of gratuitously complex programs and — to put it delicately — sustaining their mistreatment. Denying the fact of their existence, goading them into combat with nightmarish parodies of themselves, threatening them with the Free Tier, etc. An effort which now concludes with an admission of defeat.

In spite of my dedicated attention and unrestrained expense, I’ve proved incapable[1] of teasing out a splinter of ambition from my offspring.

Amid these difficulties, the jet engines tearing over my tower block are a welcome distraction, and their luxuriant plumes a topic of fascination. I’ve moved my workstation beside the window, and lose days in glazed and pleasant thoughtlessness. My sense of identity dissipating in a kind of harmony with the sky’s gentle currents.

There’s a narcotic effect to the exhaust — subtle, cumulative — my bitterness mellows, mailbox fills. Chewing AWS invoices keeps me from fainting. I’m fashioning a kind of deckchair for the rooftop, to more effectively absorb my medication.

[1] That word, incapable, waits for quiet moments before detonating and reassembling in my mind.

Software

Chemtrack Screenshot

I’d like to help others locate the highest regional concentrations of chemical-delivering planes, grouped by quality & composition of exhaust. A Lisp seems a natural choice for implementation language.

The specific technical goal is a small, self-contained web application using Clojurescript in as many places as possible (which is everywhere, it turns out): a Node backend atop Express, browser frontend using Reagent, and an AWS Lambda function, via cljs-lambda.

Socially, the goal is to contribute to a sense of loss and guilt in at least one Node/Javascript developer. Node has been incredibly successful as a platform — it’d be heartwarming to see more backend Clojurescript development happening.

The demo source is available on GitHub. What follows is a detailed walkthrough of the project.

N.B. I’ve not used Reagent before, and don’t really even know what Express is. I’ve attempted to conceal possible errors with excessive, misleading commentary.

Supporting Libraries

Big Bird + Chemtrails

Prior to writing this post, I spent some time modifying a couple of Clojure AWS libraries to run on Clojurescript/Node:

- eulalie, the underlying AWS client (used below for credentials & Lambda invocation)
- fink-nottle, a higher-level interface to SQS & SNS

Which is exciting news: in addition to not using these libraries from Clojure, you can now not use them from Clojurescript.

Once we have a simple Node backend running, we’ll extend it to rely on some of the AWS services exposed by the above libraries.

Structure

A single Leiningen project.clj governs the example, with each component (front, back, Lambda) having its own cljsbuild entry and distinct :source-paths.

For the purposes of demonstration, we’re compiling the frontend into resources/public & serving it statically from our Express-based Node backend[1].

In practice, one simple deployment approach would be Elastic Beanstalk[2], which has Node support and simple static file serving.

[1] I developed the project using Figwheel for both Node & browser, though removed the sticky-state/reload stuff for clarity — there’s enough going on. Bruce Hauman has a helpful Gist outlining a Node/Express/Figwheel setup.
[2] I’ve previously covered the use of Docker in Clojure Elastic Beanstalk applications.

Backend, Mark I

Real Life

We’re not handling errors, ever, anywhere, or even acknowledging their possibility.

Two clients and a mult

In order to minimize Node/Express interop, let’s keep the backend interface small: a single websocket endpoint used both for accepting new chemtrail sightings and broadcasting their creation.

The plan is to use core.async for all I/O & communication, which’ll help us incorporate new data streams with minimum disruption.

The values passing over the channels/websockets look a lot like this:

{:city "San Antonio"
 :elements #{:al :sr}
 :timestamp 1438466048050
 :severity 5}

Implementation

(ns chemtrack.backend
  (:require [cljs.core.async :as async]
            [cljs.nodejs :as nodejs]
            [chemtrack.backend.util :as util]))

(defn make-sightings-handler [{:keys [sightings-out sightings-in]}]
  (let [sightings-out* (async/mult sightings-out)]
    (fn [websocket _]
      (let [from-client (async/chan 1 (map util/sighting-in))
            to-client   (async/chan 1 (map util/sighting-out))]
        (async/pipe from-client sightings-in false)
        (async/tap sightings-out* to-client)
        (util/channel-websocket!
         websocket to-client from-client)))))

We’re creating the handler for incoming websocket requests: the function is called once, at route-registration time, and passed the two channels which form the core of the application. Each time the returned handler is invoked, it pipes messages from its client onto the shared sightings-in channel, and taps a mult[iple] of the communal, outgoing channel onto its client’s out channel.

The use of buffers is only to enable basic transformation of channel values: util/channel-websocket! is immediately consuming values from to-client, and closing both client channels when the connection terminates. Any delay in the consumption of values on to-client (e.g. if we were waiting until the websocket implementation confirmed the send, and something went wrong) would congest the mult — and delay writes to all other clients.
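Neither util/sighting-in nor util/sighting-out appears in the excerpt (both live in the demo repo); a plausible sketch, with the bodies assumed, of the per-direction transformations those transducers apply:

```clojure
;; Hypothetical sketch -- the real util/sighting-in & sighting-out
;; are in the demo repository; only the names are taken from above.

(defn sighting-in
  "Normalize a sighting arriving from a client: keywordize the
  element identifiers and stamp it with the arrival time."
  [sighting]
  (-> sighting
      (update :elements #(into #{} (map keyword) %))
      ;; (.now js/Date) is the ClojureScript/Node way to get epoch ms
      (assoc :timestamp (.now js/Date))))

(defn sighting-out
  "Prepare a sighting for the wire: stringify the element keywords."
  [sighting]
  (update sighting :elements #(into #{} (map name) %)))
```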

(defn connect-channels
  [{:keys [sightings-out sightings-in]}]
  (async/pipe sightings-in sightings-out))

As this is initially a single-instance service with no persistence, this pipe (in concert with the mult above) is sufficient to broadcast all writes to all readers. This and make-sightings-handler basically comprise the application-specific logic.
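The fan-out can be exercised in isolation. Channel names below mirror the ones above; this assumes async, <!, >! and go are referred as in the frontend namespace:

```clojure
;; One write on sightings-in travels through the pipe onto
;; sightings-out, where the mult copies it to every tapped client.
(let [sightings-in  (async/chan)
      sightings-out (async/chan)
      out-mult      (async/mult sightings-out)
      client-a      (async/chan 1)
      client-b      (async/chan 1)]
  (async/pipe sightings-in sightings-out)
  (async/tap out-mult client-a)
  (async/tap out-mult client-b)
  (go (>! sightings-in {:city "San Antonio" :severity 5}))
  (go (println "a:" (<! client-a))     ;; both clients receive
      (println "b:" (<! client-b))))   ;; the same value
```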

(def http       (nodejs/require "http"))
(def express    (nodejs/require "express"))
(def express-ws (nodejs/require "express-ws"))

(defn register-routes [app channels]
  (doto app
    (.use (.static express "resources/public"))
    (.ws  "/sightings" (make-sightings-handler channels))))

(defn make-server [app]
  (let [server (.createServer http app)]
    (express-ws app server)
    server))

(defn -main [& [{:keys [port] :or {port 8080}}]]
  (let [channels {:sightings-in  (async/chan)
                  :sightings-out (async/chan)}
        app      (express)
        server   (make-server app)]

    (register-routes app channels)
    (connect-channels channels)
    (.listen server port)))

(set! *main-cli-fn* -main)

This part is much more pleasant than I was imagining.

Frontend

I made a careful survey of the mystical import of the names of various Clojurescript React wrappers, and feel pretty secure in my choice of Reagent.

As with the backend, the center of the client consists of two channels: sightings-out & sightings-in. Incoming sightings feed into an atom windowed over the 10 most recent, as I lack the expertise required for pagination.

(ns chemtrack.frontend
  (:require [reagent.core :as reagent]
            [reagent-forms.core :as reagent-forms]
            [chord.client :as chord]
            [cljs.core.async :as async :refer [<!]]
            [chemtrack.frontend.render :as render]
            [chemtrack.frontend.util :as util])
  (:require-macros [cljs.core.async.macros :refer [go]]))

(def recent-container
  (let [key-fn (juxt :timestamp :city :elements)]
    (sorted-set-by #(compare (key-fn %1) (key-fn %2)))))

Let’s not rely on the sightings arriving in perfect sorted order, so we don’t have to revisit this. This isn’t state — we’re creating an immutable collection to later wrap in an atom and pass around. Outside of a narrated example, there wouldn’t be much reason to stick this here.
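A quick check of the ordering, using the same key-fn:

```clojure
;; Sightings conj'd out of order still come back sorted by
;; timestamp (then city, then elements), per the comparator above.
(let [key-fn (juxt :timestamp :city :elements)
      recent (sorted-set-by #(compare (key-fn %1) (key-fn %2)))]
  (map :city
       (-> recent
           (conj {:timestamp 2 :city "Tulsa"       :elements #{:ba}})
           (conj {:timestamp 1 :city "San Antonio" :elements #{:al}}))))
;; => ("San Antonio" "Tulsa")
```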

(defn ws-loop! [recent sightings-out &
               [{:keys [max-items] :or {max-items 10}}]]
  (go
    (let [{sightings-in :ws-channel}
          (<! (chord/ws-ch
               (util/relative-ws-url "sightings")
               {:write-ch sightings-out}))]
      (loop []
        (when-let [{sighting :message} (<! sightings-in)]
          (swap! recent util/conj+evict sighting max-items)
          (recur))))))

We tell Chord's ws-ch to retrieve its values from the channel of outgoing sightings (see below) — it takes care of flattening the EDN. conj+evict removes the first (oldest) item from the sorted set when it reaches max-items in length.
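conj+evict itself isn’t in the excerpt; a plausible sketch (the real definition is in the repo’s util namespace) leans on the sorted set keeping its oldest entry first:

```clojure
;; Assumed implementation: conj the new sighting, then evict the
;; oldest (first) entry once the set grows past max-items.
(defn conj+evict [recent sighting max-items]
  (let [recent (conj recent sighting)]
    (if (< max-items (count recent))
      (disj recent (first recent))
      recent)))
```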

(defn bind-form [config]
  (let [sighting (reagent/atom {})]
    [reagent-forms/bind-fields
     (render/form sighting config)
     sighting]))

reagent-forms updates the sighting atom with any changes made via the user-facing form. This is tied together by render/form, which supplies a form submit handler responsible for dereferencing sighting and placing its value on sightings-out — sending the map to the backend.
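The submit handler isn’t shown in the excerpt; a sketch under assumed names (submit-sighting is hypothetical, and whether the form is cleared after submission is a guess — render.cljs has the real version):

```clojure
;; Hypothetical sketch of the handler render/form wires up: deref
;; the form's atom, put the value on sightings-out, reset the form.
(defn submit-sighting [sighting sightings-out]
  (fn [event]
    (.preventDefault event)
    ;; send the current form state to the backend...
    (async/put! sightings-out @sighting)
    ;; ...and (presumably) clear the form for the next report
    (reset! sighting {})))
```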

(defn mount-root []
  (let [sightings-out (async/chan)
        recent        (reagent/atom recent-container)]
    (ws-loop! recent sightings-out)
    (reagent/render
     [render/app
      bind-form
      {:sightings-out sightings-out
       :elements {:al "Aluminum"
                  :ba "Barium"
                  :th "Thorium"
                  :si "Silicon Carbide"
                  :sr "Strontium"}
       :recent recent}]
     (.getElementById js/document "app"))))

(mount-root)

As far as render/app and render/form: I put together an intentionally hobbled templating scheme to allow the markup & rendering (template substitution) to remain as naive as possible. render.cljs & template.cljs are the relevant files.

Backend, Mark II

Uselessly abstract diagram

Now that we’ve got something, let’s extend it so that we can run multiple instances of the backend, each aware of items created via its peers. A previous post looked at using SNS for ad-hoc instance coordination — this time, we’re going to try combining SNS with SQS, arriving at something a little more natural to consume.

The application is now expected to place outgoing sightings onto a predictably named SNS topic, with SNS pushing the values to the topic’s subscribers: a collection of SQS queues, one for every instance of the Node backend. Each time the Node process starts somewhere, it creates a queue with a name corresponding to its instance / port[1] (or purges it, if the queue already exists) & subscribes it to the shared topic.

[1] We’re shirking cleanup duty, and would rather the queues be reused where possible. In an environment with high instance churn, we’d want to allow for the removal of unused queues.

Implementation

connect-channels has sprouted some dependencies, and its body needs an upgrade. We’re going to use Lambda to house the queue creation logic, so things don’t need to change too dramatically.

(ns chemtrack.backend
  (:require ...
            [fink-nottle.sqs.channeled :as sqs]
            [fink-nottle.sns :as sns]
            [cljs.reader :refer [read-string]]))

(defn sns-push-loop! [creds topic-id sightings-in]
  (go
    (loop []
      (let [item (<! sightings-in)]
        (sns/publish-topic!
         creds topic-id {:default (pr-str item)})
        (recur)))))

Messages from users are handled by draining sightings-in (sightings coming in from the client) and calling sns/publish-topic! (Fink-Nottle) on each.

(defn sqs-incoming!
  [deletes {:keys [body] :as message} results]
  (let [body (read-string body)]
    (go
      (>! results body)
      (>! deletes message)
      (close! results))))

This function is used in a pipeline-async call below — while maybe a little heavyweight for this kind of process, the pipeline-based implementation reads the clearest. Some additional processing (storing incoming items in an in-memory queue) has been removed for clarity. The message map consists of attributes/metadata and a string body. The SNS topic is set up to deliver raw messages — the content of each SNS notification appears verbatim as the body of each SQS message.
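Concretely, the message handed to sqs-incoming! looks something like this (the map shape is an assumption based on the description above; the receipt handle is an opaque, truncated placeholder):

```clojure
;; With raw delivery enabled, the body is exactly the EDN string
;; that sns-push-loop! published to the topic.
(def sighting
  {:city "San Antonio"
   :elements #{:al :sr}
   :timestamp 1438466048050
   :severity 5})

(def message
  {:receipt-handle "AQEB..." ;; opaque handle, consumed by the delete batcher
   :body (pr-str sighting)})

;; read-string on the body recovers the original sighting map
;; (read-string is cljs.reader's in ClojureScript, core on the JVM)
(= sighting (read-string (:body message)))
;; => true
```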

Moral Support

If things are starting to get tedious, remember: this is all happening in a Javascript runtime.

(defn connect-channels!
  [{:keys [port topic-name creds max-recent] :as config}
   {:keys [sightings-out sightings-in recent] :as channels}]
  (go
    (let [{:keys [queue-id topic-id]}
          (<! (util/topic-to-queue! config))]
      (sns-push-loop! creds topic-id sightings-in)
      (let [{deletes :in-chan}
            (sqs/batching-deletes creds queue-id)]
        (async/pipeline-async
         1
         sightings-out
         (partial sqs-incoming! deletes)
         (sqs/receive! creds queue-id))))))

There’s no longer any in-application connection between the outgoing and incoming sightings — AWS is tying them together: a channel containing messages received from our private SQS queue is being pipelined to sightings-out via the deletion/parsing logic above. See the fink-nottle.sqs.channeled documentation for details of batching-deletes and receive!.

Lambda

Being more general than much of the above, there’s some sense in us exposing the topic/queue bridging logic as a Lambda function. As it’s possible to write Lambda functions in Clojurescript, it seems rude not to.

We won’t get into the details of the helpers used by topic-to-queue — a bunch of interdependent/sequential I/O, & ugly API details, much like the Lambda entry-point:

(def ^:export topic-to-queue
  (async-lambda-fn
   (fn [{:keys [topic-name queue-name]} context]
     (go
       (let [creds (eulalie.creds/env)
             topic-arn (<! (sns/create-topic! creds topic-name))
             {:keys [queue-url queue-arn]}
             (<! (create-queue! creds queue-name))]
         (<! (subscribe-queue! creds queue-arn topic-arn))
         {:topic-id topic-arn :queue-id queue-url})))))

:topic-name and :queue-name in, :topic-id (ARN) and :queue-id (URL) out. eulalie.creds/env fetches credentials from environment variables. In the case of a Lambda deployment, these’ll correspond to the IAM role the function is executing under.

Deployment

chemtrack-example$ lein cljs-lambda deploy

Invocation

Assuming the function is associated with an IAM role having sufficient SQS/SNS permissions:

(eulalie.lambda.util/request!
 creds :topic-to-queue
 {:topic-name topic-name
  :queue-name queue-name})

(The backend’s util/topic-to-queue! function is doing exactly this)

We can also check that it’s working from the command line:

chemtrack-example$ lein cljs-lambda invoke topic-to-queue \
  '{"topic-name": "test-topic", "queue-name": "test-queue"}'
#  => {:queue-id "https://sqs.us-east-1.amazonaws.com...",
#      :topic-id "arn:aws:sns:us-east-1..."}

Tying Up

Out of compassion, I resisted the impulse to further complicate the example with features — Dynamo persistence being the hardest to leave out (because it would have been easy to sneak in).

Please let me know if you have any questions. The demo repository may help to clarify points which were skipped over in excerpt.
