


I wish to spawn a long-running sub-process from Clojure and communicate with this process via the standard streams.

Using the conch library, I can spawn the process and read data from its out stream:

(require '[me.raynes.conch.low-level :as sh])

(def my-process (sh/proc "my_dumb_process"))
;; Read 10 lines from my-process's stdout. Blocks until 10 lines are available.
(take 10 (line-seq (clojure.java.io/reader (:out my-process))))


I want to invoke an asynchronous callback whenever my-process prints to stdout, that is, whenever data is available in the stdout stream.


I'm a bit new to Clojure. Is there an idiomatic, Clojure-y way to do this? I've looked through core.async, which is nice, but I can't find a non-blocking solution for streams.



Here is a sample shell script for our purposes. Make it executable and place it in the root of your Clojure project for easy testing:

$ cat dumb.sh
#!/bin/bash

for i in 1 2 3 4 5
do
    echo "Loop iteration $i"
    sleep 2
done

Now we define the process to execute, start it, get its stdout via (.getInputStream process), and read one line at a time in a loop until the stream ends. Lines are read in real time, as the process emits them:

(defn run-proc
  [proc-name arg-string callback]
  (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
        process (.start pbuilder)]
    (with-open [reader (clojure.java.io/reader (.getInputStream process))]
      (loop []
        ;; .readLine returns nil at end of stream, which ends the loop
        (when-let [line (.readLine ^java.io.BufferedReader reader)]
          (callback line)
          (recur))))))


(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil


This function will block, as will the call to your callback. You can wrap the callback call in a future if you want it to run in a separate thread:

(future (callback line))
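
If you would rather keep the caller unblocked as well, one option is to move the whole read loop into a future. This is only a minimal sketch: the name run-proc-bg and its variadic argument list are assumptions made for illustration, not part of the code above.

```clojure
(require '[clojure.java.io :as io])

(defn run-proc-bg
  "Hypothetical helper: starts cmd with args and reads its stdout on a
  background thread, calling callback once per line. Returns a future
  that yields the process exit code once stdout is exhausted."
  [callback cmd & args]
  (let [pbuilder (ProcessBuilder. ^java.util.List (into [cmd] args))
        process  (.start pbuilder)]
    (future
      (with-open [reader (io/reader (.getInputStream process))]
        ;; line-seq stops when the process closes its stdout
        (doseq [line (line-seq reader)]
          (callback line)))
      (.waitFor process))))
```

A call such as (run-proc-bg println "./dumb.sh") returns immediately; dereferencing the returned future blocks until the script has finished.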


For a core.async-based approach:

;; assumes (require '[clojure.core.async :as async])
(defn run-proc-async
  [proc-name arg-string callback]
  (let [ch (async/chan 1000 (map callback))]
    ;; read on a separate thread so the channel can be returned immediately
    (async/thread
      (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
            process (.start pbuilder)]
        (with-open [reader (clojure.java.io/reader (.getInputStream process))]
          (loop []
            (when-let [line (.readLine ^java.io.BufferedReader reader)]
              (async/>!! ch line)
              (recur))))))
    ch))


This applies your callback function as a transducer onto the channel, with the result being placed on the channel which the function returns:

(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
                                  (println "Counted" cnt "characters")
                                  cnt))

#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters

(async/<!! *1)
=> 16
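
To see the transducer behaviour in isolation, without a sub-process, here is a small sketch. The var name counted is an assumption made for illustration; the channel applies (map count) to each value as it is put.

```clojure
(require '[clojure.core.async :as async])

;; A buffered channel whose transducer replaces each string with its length.
;; A channel with a transducer must have a buffer.
(def counted (async/chan 10 (map count)))

(async/>!! counted "Loop iteration 1")
(async/<!! counted) ; => 16
```

The value taken from the channel is the callback's return value, not the original line.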


In this example the channel has a buffer of 1000, so unless you begin taking from the channel, calls to >!! will block once 1000 lines have been buffered. You could alternatively use put! with a callback, but a channel accepts at most 1024 pending puts, and you should be processing the results anyway.
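
One way to keep the buffer from filling up is to drain the channel as results arrive. The sketch below assumes a hypothetical consume-lines helper. Note that run-proc-async never closes its channel, so with that channel the loop simply parks once the process has finished.

```clojure
(require '[clojure.core.async :as async])

(defn consume-lines
  "Hypothetical helper: takes values from ch until it is closed,
  passing each one to handler. Returns a channel that closes when
  the loop ends. handler runs inside a go block, so it should not
  block for long."
  [ch handler]
  (async/go-loop []
    ;; <! yields nil once ch is closed and drained, which ends the loop
    (when-some [v (async/<! ch)]
      (handler v)
      (recur))))
```

For example, (consume-lines (run-proc-async "./dumb.sh" "" count) println) would print each line's character count as it becomes available.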

