我试图通过node.js和typescript中的udp数据报实现自定义通信协议。在这个协议中,我有一些命令必须以特定的顺序发送给微控制器,每个命令都必须等待微控制器的前一个ACK才能发送。但是,鉴于No.js’ccc>模块的异步和“套接字中心”的思想,我很难找到正确的方法来实现这一点。
到目前为止,我创建和dgram,以及各种具体的孩子(abstract class ProtocolCommandStartFirmwareUpgradeCommandWriteCommand)。所有类都由一个EndFirmwareUpgradeCommand类使用,该类应该协调所有要执行的命令。我将在下面附加抽象类和一个示例类。此外,命令的数量是可变的(更具体地说,在StimeStdRealSub升级之后,我有可变数量的写入命令,在其中,我将固件字节发送给微控制器)。
Protocol命令:

import q = require('q');

export abstract class ProtocolCommand {
    protected socket:dgram.Socket;
    protected ip:string;
    protected port:number;
    protected deferred;

    constructor(socket:dgram.Socket, ip:string, port:number, deferred) {
        this.socket = socket;
        this.ip = ip;
        this.port = port;
        this.deferred = deferred;
    }

    protected callback(data, sender) {
        this.socket.removeListener('message', this.callback);
        this.deferred.resolve(data);
    }

    abstract executeCommand():void;
}

startfirmwareupgrade命令:
import dgram = require('dgram');
import {ProtocolCommand} from "./ProtocolCommand";
import CRC = require('./CRC');
import q = require('q');

export class StartFirmwareUpgradeCommand extends ProtocolCommand {
    private header = [0x00, 0x0C, 0x00, 0x19, 0x00, 0x00];
    private data = [0x31, 0x32, 0x33, 0x34, 0x35, 0x36];

    constructor(socket:dgram.Socket, ip:string, port:number, deferred) {
        super(socket, ip, port, deferred);
    }

    executeCommand() {
        let commandBytes = this.header.concat(this.data);
        let crcBytes = CRC.CRC16(commandBytes);
        commandBytes = commandBytes.concat(Math.floor(crcBytes / 0x100), crcBytes % 0x100);
        this.socket.on('message', (data, sender) => {
            this.callback(data, sender);
        });
        this.socket.send(new Buffer(commandBytes), 0, commandBytes.length, this.port, this.ip);
        return this.deferred.promise;
    }
}

协议:
import dgram = require('dgram');
import {StartFirmwareUpgradeCommand} from "./StartFirmwareUpgradeCommand";
import {EndFirmwareUpgradeCommand} from "./EndFirmwareUpgradeCommand";
import {DiscoveryCommand} from "./DiscoveryCommand";
import q = require('q');

export class Protocol {
    private socket;
    private ip:string;
    private port:number;

    constructor(ip:string, port:number) {
        this.ip = ip;
        this.port = port;
        this.socket = dgram.createSocket('udp4');
        this.socket.bind();
    }

    upgradeFirmware(data:Uint8Array) {
        let globalDeferred = q.defer();

        //FIXME UGLY AS HELL!
        new StartFirmwareUpgradeCommand(this.socket, this.ip, this.port, globalDeferred).executeCommand()
            .then((data) => {

            })
            .then((data) => {

            });
        //TODO send n*write firmware command, wait for every ack
        for (let i = 0; i < data.length / 128; i++) {

        }
        //new EndFirmwareUpgradeCommand(this.socket, this.ip, this.port).executeCommand();
        //TODO send end firmware command, wait for ack
    }
}

正如你所看到的,我目前正在使用Protocol使用承诺,尽量避免回调,但我真的很努力找到一个体面的方式来编码一切。任何帮助都将不胜感激。

最佳答案

根据@gilamran的建议,这里有一个rxjs实现。它只等待收到响应后再发送下一个请求。它无法处理错误要求您刷新队列的情况。

import dgram = require('dgram');
import Rx    = require('rx');   // or use rx.lite if you need something smaller.

let commandQueue = Rx.Subject();
let socket       = dgram.createSocket('udp4');
let ip           = '127.0.0.1';
let port         = '10000';

// let `req` be an object with { ip, port, header, data }.
// sendCommand :: Request -> Observable of Responses
function sendCommand (req) {
    // return this Observable so we can use Rx.Observable.concat later to
    // block while waiting for a response.
    return Rx.Observable.create(obs => {
        let   commandBytes = req.header.concat(req.data);
        const crcBytes     = CRC.CRC16(commandBytes);
        commandBytes       = commandBytes.concat(Math.floor(crcBytes / 0x100), crcBytes % 0x100);
        this.socket.on('message', (data, sender) => {
            // pass this information on for further processing?
            obs.onNext({
                data,
                sender,
            });
            obs.onCompleted();  // close this observable so `.concat` switches to next request.
        });
        socket.send(new Buffer(commandBytes), 0, commandBytes.length, req.port, req.ip)
    });
}

// ok. let's setup the downstream side of our queue.
// 1. take a request, send a packet, return an observable of one response.
// 2. wait for the observable of one response to complete.

let responses = commandQueue
    .concatMap(sendCommand);    // takes command requests and turns them into data/sender responses.

// we must subscribe to pull values through our observable chain.
responses.subscribe();

let cmdStartFirmwareUpgrade = (ip, port) => {
    return {
        ip,
        port,
        header = [0x00, 0x0C, 0x00, 0x19, 0x00, 0x00],
        data   = [0x31, 0x32, 0x33, 0x34, 0x35, 0x36],
    };
};

let cmdDiscovery = (ip, port) => { /* ... */ };
let cmdEndFirmwareUpgrade = (ip, port) => { /* ... */ };

// now let's put some commands in the queue.
commandQueue.onNext(cmdStartFirmwareUpgrade(ip, port));
commandQueue.onNext(cmdDiscovery(ip, port));
commandQueue.onNext(cmdEndFirmwareUpgrade(ip, port));

这个例子当然可能很枯燥,但是我更喜欢使用ramda curried函数而不是类,所以我没有提到这一点。
您可以在https://runkit.com/boxofrox/rxjs-queue找到这个设计的顺序性质的演示。

09-18 04:22