问题描述
我希望从clojure和通过标准流与此过程进行通信.
I wish to spawn a long-running sub-process from clojure andcommunicate with this process via the standard streams.
使用 conch 库,我可以生成并读取该过程,并从out
流中读取数据:
Using the conch library, I canspawn and read the process, and read data from the out
stream:
(def my-process (sh/proc "my_dumb_process"))
; read 10 lines from my-process's stdout. Will block until 10 lines taken
(take 10 (line-seq (clojure.java.io/reader (:out p))))
我想在我的进程打印时调用异步回调到stdout-只要stdout流中有可用数据.
I want to invoke an asynchronous callback whenever my-process printsto stdout - whenever data is available in the stdout stream.
我对Clojure有点陌生-是否有一种惯用的Clojur-ey方法这?我已经看过core.async,这很好,但是我找不到流的非阻塞解决方案.
I'm a bit new to clojure - is there an idiomatic clojur-ey way to dothis? I've looked through core.async which is nice but I can't find anon-blocking solution for streams.
推荐答案
出于我们的目的(请确保使其可执行)的示例shell脚本,请将其放置在clojure项目的根目录中以方便测试:
A sample shell script for our purposes (be sure to make it executable), place it in the root of your clojure project for easy testing:
$ cat dumb.sh
#!/bin/bash
for i in 1 2 3 4 5
do
echo "Loop iteration $i"
sleep 2
done
现在,我们将定义要执行,启动它并获取stdout((.getInputStream process)
),一次读取一行并循环直到完成的过程.实时读取.
Now we will define the process to execute, start it, and get stdout ((.getInputStream process)
), read one line at a time and loop until we're done. Reads in real time.
(defn run-proc
[proc-name arg-string callback]
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(callback line)
(recur))))))
要测试:
(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil
此函数将被阻止,对您的callback
的调用也将被阻止;如果您希望future
在单独的线程中运行,则可以将其包装起来:
This function will block, as will the call to your callback
; you can wrap in a future
if you want it to run in a separate thread:
(future (callback line))
对于基于core.async的方法:
For a core.async-based approach:
(defn run-proc-async
[proc-name arg-string callback]
(let [ch (async/chan 1000 (map callback))]
(async/thread
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(async/>!! ch line)
(recur))))))
ch))
这会将您的callback
函数作为换能器应用到通道上,结果放置在该函数返回的通道上:
This applies your callback
function as a transducer onto the channel, with the result being placed on the channel which the function returns:
(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
(println "Counted" cnt "characters")
cnt))
#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
(async/<!! *1)
=> 16
在此示例中,通道上有1000个缓冲区.因此,除非您开始从该频道接听,否则在读取1000行之后,对>!!
的调用将被阻止.您也可以将put!
与回调一起使用,但是这里有一个内置的1024个限制,无论如何您都应该处理结果.
In this example there is a buffer of 1000 on the channel. So, unless you begin to take from the channel, calls to >!!
will block after 1000 lines are read. You could alternatively use put!
with a callback, but there is a built-in 1024 limit here, and you should be processing the result anyway.
这篇关于如何在Clojure中从子流程执行非阻塞读取stdout?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!