本文介绍了如何在Clojure中从子流程执行非阻塞读取stdout?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望从clojure和通过标准流与此过程进行通信.

I wish to spawn a long-running sub-process from clojure andcommunicate with this process via the standard streams.

使用 conch 库,我可以生成并读取该过程,并从out流中读取数据:

Using the conch library, I canspawn and read the process, and read data from the out stream:

(def my-process (sh/proc "my_dumb_process"))
  ; read 10 lines from my-process's stdout. Will block until 10 lines taken
  (take 10 (line-seq (clojure.java.io/reader (:out p))))

我想在我的进程打印时调用异步回调到stdout-只要stdout流中有可用数据.

I want to invoke an asynchronous callback whenever my-process printsto stdout - whenever data is available in the stdout stream.

我对Clojure有点陌生-是否有一种惯用的Clojur-ey方法这?我已经看过core.async,这很好,但是我找不到流的非阻塞解决方案.

I'm a bit new to clojure - is there an idiomatic clojur-ey way to dothis? I've looked through core.async which is nice but I can't find anon-blocking solution for streams.

推荐答案

出于我们的目的(请确保使其可执行)的示例shell脚本,请将其放置在clojure项目的根目录中以方便测试:

A sample shell script for our purposes (be sure to make it executable), place it in the root of your clojure project for easy testing:

$ cat dumb.sh
#!/bin/bash

for i in 1 2 3 4 5
do
    echo "Loop iteration $i"
    sleep 2
done

现在,我们将定义要执行,启动它并获取stdout((.getInputStream process)),一次读取一行并循环直到完成的过程.实时读取.

Now we will define the process to execute, start it, and get stdout ((.getInputStream process)), read one line at a time and loop until we're done. Reads in real time.

(defn run-proc
  [proc-name arg-string callback]
  (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
        process (.start pbuilder)]
    (with-open [reader (clojure.java.io/reader (.getInputStream process))]
      (loop []
        (when-let [line (.readLine ^java.io.BufferedReader reader)]
          (callback line)
          (recur))))))

要测试:

(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil

此函数将被阻止,对您的callback的调用也将被阻止;如果您希望future在单独的线程中运行,则可以将其包装起来:

This function will block, as will the call to your callback; you can wrap in a future if you want it to run in a separate thread:

(future (callback line))

对于基于core.async的方法:

For a core.async-based approach:

(defn run-proc-async
  [proc-name arg-string callback]
  (let [ch (async/chan 1000 (map callback))]
    (async/thread
      (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
            process (.start pbuilder)]
        (with-open [reader (clojure.java.io/reader (.getInputStream process))]
          (loop []
            (when-let [line (.readLine ^java.io.BufferedReader reader)]
              (async/>!! ch line)
              (recur))))))
    ch))

这会将您的callback函数作为换能器应用到通道上,结果放置在该函数返回的通道上:

This applies your callback function as a transducer onto the channel, with the result being placed on the channel which the function returns:

(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
                                  (println "Counted" cnt "characters")
                                  cnt))

#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters

(async/<!! *1)
=> 16

在此示例中,通道上有1000个缓冲区.因此,除非您开始从该频道接听,否则在读取1000行之后,对>!!的调用将被阻止.您也可以将put!与回调一起使用,但是这里有一个内置的1024个限制,无论如何您都应该处理结果.

In this example there is a buffer of 1000 on the channel. So, unless you begin to take from the channel, calls to >!! will block after 1000 lines are read. You could alternatively use put! with a callback, but there is a built-in 1024 limit here, and you should be processing the result anyway.

这篇关于如何在Clojure中从子流程执行非阻塞读取stdout?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-16 05:47