Apache服务器传输数据的核心算法之桶队算法(bucket brigade algorithm)

水桶队(Bucket Brigade)是指一种消防技术,它通过人的链条将水引至火灾中。这种方法主要在机械化消防设备出现之前使用,在世界上一些不发达地区仍然可以使用。在这项技术中,一个人在水源附近装满一个水桶,水桶里的水可以用来灭火然后手把手地传递到最后一个扔到火上的人手中。水桶队仍然偶尔被用作消防演示和比赛的一个项目。

bucket
美: [ˈbʌkət]
英: [ˈbʌkɪt]
n. 吊桶;大量;(有提梁的)桶;水斗
v. 用桶装;〈美〉用桶打(水);〈英口〉催(马)猛奔
网络 水桶;铲斗;桶子

brigade
美: [brɪˈɡeɪd]
英: [brɪ’ɡeɪd]
n. 旅(陆军编制单位);(主张相同或其他某方面相似的)伙
v. 【军】把…编成旅;把…编成队[组]
网络 大队;班;大部队

Bucket Brigade 算法基于Bucket和Brigade的概念,是Apache服务器内部用来传输数据的算法。它通过创建一个包含多个Bucket的Brigade对象,并将其传递给下一个处理程序,从而实现对请求和响应数据流的管理和传输。

在Bucket Brigade算法中,每个模块可以使用Bucket API来创建和操作Bucket,然后将其添加到Brigade中。而Brigade则是由多个Bucket链接而成的链表结构,其中每个Bucket都有一个指向下一个Bucket的指针,这种链接方式类似于“接力棒”,因此得名Bucket Brigade

当请求或响应数据到达一个模块时,Bucket Brigade会将其打包为一个Brigade对象,并传递给下一个模块进行处理。下一个模块可以读取Brigade中的Bucket,并向其中添加新的Bucket,然后再将Brigade传递给下一个模块。这样,请求或响应数据就可以在一系列模块之间流动,并逐步被处理和修改,最终返回给客户端。

什么是网络流处理算法Bucket Brigad

网络流处理算法Bucket Brigade是一种基于Bucket Brigade传统概念的优化算法,它用于解决网络流问题。网络流问题是指在一个有向图中,从源节点到汇节点之间的最大或最小流量问题。

Bucket Brigade算法将网络流问题转化为多阶段的装载-卸载过程。它将每个节点看作一个桶,桶的容量表示节点的流量限制。算法从源节点开始,将数据通过每个节点(即桶)进行传递,并将数据带入目标节点(即汇节点)。不同于其他网络流处理算法,Bucket Brigade算法通过对数据进行拆分和合并来实现流量的平衡。

具体地说,Bucket Brigade算法使用了两个遗传算法操作:交叉和变异。在交叉操作中,算法从一个桶中选择一个元素,并将其插入到另一个桶中。在变异操作中,算法随机地改变一个元素在其所在的桶中的位置。这样在多次迭代后,算法能够找到一个最优的流量分配方案。

Bucket Brigade算法在解决网络流问题时表现出了较好的效果。它可以在较短时间内找到一个接近最优解的解决方案,并且能够处理大规模的网络流问题。

Bucket Brigad算法使用场景

Bucket Brigade算法是一种用于处理网络流的算法,它可以用于解决诸如路由、负载均衡和防火墙等问题。Bucket Brigade算法不依赖于Apache模块开发场景,也可以在其他网络应用中使用。

在路由方面,Bucket Brigade算法可以实现基于内容的路由,根据目标地址和请求内容来选择最佳的服务器进行响应。在负载均衡方面,Bucket Brigade算法可以帮助将流量尽可能平均地分配到可用的服务器上,从而提高系统的吞吐量和可靠性。在防火墙方面,Bucket Brigade算法可以协助检测和过滤网络流量中的恶意请求,从而保护网络安全。

总之,Bucket Brigade算法是一种通用的网络流处理算法,可以在各种网络应用中使用,并能够提高网络性能和安全性。

Bucket Brigade算法在开源产品中使用

  1. Apache Traffic Server:Apache Traffic Server是一个高性能的代理服务器和缓存,使用Bucket Brigade算法来提高网络性能和可靠性。
  2. NGINX:NGINX是一个流行的Web服务器和反向代理,使用Bucket Brigade算法来加速HTTP请求和响应处理。
  3. HAProxy:HAProxy是一种开源的负载均衡器和代理服务器,使用Bucket Brigade算法来平衡流量并提高系统的可伸缩性。
  4. Varnish Cache:Varnish Cache是一个高性能的HTTP加速器和反向代理,使用Bucket Brigade算法来提高响应速度和减少延迟。
  5. Squid Cache:Squid Cache是一个流行的代理缓存服务器,使用Bucket Brigade算法来提高请求和响应处理的效率。

总之,Bucket Brigade算法是一个通用的网络流处理算法,被广泛应用于各种开源产品中,并且已经被证明可以显著提高网络性能和可靠性。

Bucket Brigade算法并不仅限于HTTP场景,它可以用于任何需要处理网络流的情况。除了HTTP协议,Bucket Brigade算法还可以应用于TCP、UDP等协议上。

例如,在TCP和UDP协议中,Bucket Brigade算法可以帮助实现数据包的拆分和合并,从而提高传输效率和可靠性。在SSH、FTP等应用中,Bucket Brigade算法可以帮助传输文件和命令,并提高数据传输的速度和稳定性。

Bucket Brigade算法的优点是可以快速处理大量的网络数据流,同时还能够有效地减少内存消耗和CPU负载,因此在各种网络应用中都有应用前景。

Apache Modules编程中的bucket和brigade

bucket和brigade在Filter的开发中是必须要搞明白的东西。在Apache2.x的输出Filter中,Apache采用的块方式传递数据给每个Filter。

Bucket是Apache服务器处理请求和响应时内部传输数据的基本单元。它可以包含任何类型的数据,并且可以被链接到其他的Bucket来构建数据流。常见的Bucket包括文件、字符串、头信息、EOS(end of stream)等。

Brigade则是一个Bucket的集合,它们按照特定的顺序组成了一条数据链。这条链可以在各个模块之间传递,实现对请求或响应的处理。Brigade通常由三种类型的Bucket组成:Input Bucket、Output Bucket 和 Control Bucket。Input Bucket 用于存储请求数据,而 Output Bucket 用于存储响应数据。Control Bucket 则用于控制数据流的行为,如修改内容或结束数据流的传输。

当 Apache 处理输入数据时,它会将数据交给 input filter 链上的第一个 filter 进行处理,随后经过一系列 filter 处理后最终到达 handler。而在处理输出数据时,则是先由 handler 生成输出内容,再通过 output filter 链上的一系列 filter 进行处理后发送给客户端。

Apache根据Content Handler产生内容的大小(目前是固定的切分),把输出流按照一定规则切分成1个大块,用brigade封装(由1个以上buckets组成)。

bucket其实就是一个数据块,brigade其实就是把一系列bucket串起来的双向链表环,我们知道每次传给Filter一个块的时候,我们经常需要删除其中的数据、插入数据、移动数据、修改数据,每次数据操作完之后都要维护其链表不被破坏,这些都是由brigade来维护的,bucket是一个具体的数据操作对象。

bucket才是负责具体小块数据生命周期的,所以当我们遍历bridage中的一系列bucket的时候,每个bucket不一定已经准备好数据,所以我们必须要进行一次读取操作(其实质就是调用一次bucket指向的read方法)。

过滤器所加工的数据,存储在一种称为 桶 bucket 的容器中。 bucket 的实际存储可以是 文件\管道(pipe)\流(socket stream )堆内存(heap)甚至是栈内存(stack)。apache 提供了apr_bucket_read 方法,将 bucket 中的数据读取到用户指定的内存中。

开源代码实例

modsecrity v2.9.7 源码
apache2\apache2_io.c

/**
 * Reads request body from a client.
 */
apr_status_t read_request_body(modsec_rec *msr, char **error_msg) {
    request_rec *r = msr->r;
    unsigned int finished_reading;
    apr_bucket_brigade *bb_in;
    apr_bucket *bucket;

    if (error_msg == NULL) return -1;
    *error_msg = NULL;

    if (msr->reqbody_should_exist != 1) {
        if (msr->txcfg->debuglog_level >= 4) {
            msr_log(msr, 4, "Input filter: This request does not have a body.");
        }
        return 0;
    }

    if (msr->txcfg->reqbody_access != 1) {
        if (msr->txcfg->debuglog_level >= 4) {
            msr_log(msr, 4, "Input filter: Request body access not enabled.");
        }
        return 0;
    }

    if (msr->txcfg->debuglog_level >= 4) {
        msr_log(msr, 4, "Input filter: Reading request body.");
    }
    if (modsecurity_request_body_start(msr, error_msg) < 0) {
        return -1;
    }

    finished_reading = 0;
    msr->if_seen_eos = 0;
    bb_in = apr_brigade_create(msr->mp, r->connection->bucket_alloc);
    if (bb_in == NULL) return -1;
    do {
        apr_status_t rc;

        rc = ap_get_brigade(r->input_filters, bb_in, AP_MODE_READBYTES, APR_BLOCK_READ, HUGE_STRING_LEN);
        if (rc != APR_SUCCESS) {
            /* NOTE Apache returns AP_FILTER_ERROR here when the request is
             *      too large and APR_EGENERAL when the client disconnects.
             */
            switch(rc) {
                case APR_INCOMPLETE :
                    *error_msg = apr_psprintf(msr->mp, "Error reading request body: %s", get_apr_error(msr->mp, rc));
                    return -7;
                case APR_EOF :
                    *error_msg = apr_psprintf(msr->mp, "Error reading request body: %s", get_apr_error(msr->mp, rc));
                    return -6;
                case APR_TIMEUP :
                    *error_msg = apr_psprintf(msr->mp, "Error reading request body: %s", get_apr_error(msr->mp, rc));
                    return -4;
                case AP_FILTER_ERROR :
                    *error_msg = apr_psprintf(msr->mp, "Error reading request body: HTTP Error 413 - Request entity too large. (Most likely.)");
                    return -3;
                case APR_EGENERAL :
                    *error_msg = apr_psprintf(msr->mp, "Error reading request body: Client went away.");
                    return -2;
                default :
                    *error_msg = apr_psprintf(msr->mp, "Error reading request body: %s", get_apr_error(msr->mp, rc));
                    return -1;
            }
        }

        /* Loop through the buckets in the brigade in order
         * to extract the size of the data available.
         */
        for(bucket = APR_BRIGADE_FIRST(bb_in);
                bucket != APR_BRIGADE_SENTINEL(bb_in);
                bucket = APR_BUCKET_NEXT(bucket))
        {
            const char *buf;
            apr_size_t buflen;

            rc = apr_bucket_read(bucket, &buf, &buflen, APR_BLOCK_READ);
            if (rc != APR_SUCCESS) {
                *error_msg = apr_psprintf(msr->mp, "Failed reading input / bucket (%d): %s", rc, get_apr_error(msr->mp, rc));
                return -1;
            }

            if (msr->txcfg->debuglog_level >= 9) {
                msr_log(msr, 9, "Input filter: Bucket type %s contains %" APR_SIZE_T_FMT " bytes.",
                        bucket->type->name, buflen);
            }

            /* Check request body limit (should only trigger on chunked requests). */
            if (msr->reqbody_length + buflen > (apr_size_t)msr->txcfg->reqbody_limit) {
                if((msr->txcfg->is_enabled == MODSEC_ENABLED) && (msr->txcfg->if_limit_action == REQUEST_BODY_LIMIT_ACTION_REJECT)) {
                    *error_msg = apr_psprintf(msr->mp, "Request body is larger than the "
                            "configured limit (%ld).", msr->txcfg->reqbody_limit);
                    return -5;
                } else if((msr->txcfg->is_enabled == MODSEC_ENABLED) && (msr->txcfg->if_limit_action == REQUEST_BODY_LIMIT_ACTION_PARTIAL)) {

                    *error_msg = apr_psprintf(msr->mp, "Request body is larger than the "
                            "configured limit (%ld).", msr->txcfg->reqbody_limit);

                } else if ((msr->txcfg->is_enabled == MODSEC_DETECTION_ONLY) && (msr->txcfg->if_limit_action == REQUEST_BODY_LIMIT_ACTION_PARTIAL)){

                    *error_msg = apr_psprintf(msr->mp, "Request body is larger than the "
                            "configured limit (%ld).", msr->txcfg->reqbody_limit);

                } else if ((msr->txcfg->is_enabled == MODSEC_DETECTION_ONLY) && (msr->txcfg->if_limit_action == REQUEST_BODY_LIMIT_ACTION_REJECT)){

                    *error_msg = apr_psprintf(msr->mp, "Request body is larger than the "
                            "configured limit (%ld).", msr->txcfg->reqbody_limit);

                } else  {

                    *error_msg = apr_psprintf(msr->mp, "Request body is larger than the "
                            "configured limit (%ld).", msr->txcfg->reqbody_limit);

                    return -5;
                }
            }

            if (msr->txcfg->stream_inbody_inspection == 1)   {
#ifndef MSC_LARGE_STREAM_INPUT
                msr->stream_input_length+=buflen;
                modsecurity_request_body_to_stream(msr, buf, buflen, error_msg);
#else
                if (modsecurity_request_body_to_stream(msr, buf, buflen, error_msg) < 0) {
                    return -1;
                }
#endif
            }

            msr->reqbody_length += buflen;

            if (buflen != 0) {
                int rcbs = modsecurity_request_body_store(msr, buf, buflen, error_msg);

                if (msr->reqbody_length > (apr_size_t)msr->txcfg->reqbody_limit && msr->txcfg->if_limit_action == REQUEST_BODY_LIMIT_ACTION_PARTIAL) {
                    finished_reading = 1;
                }

                if (rcbs < 0) {
                    if (rcbs == -5) {
                        if((msr->txcfg->is_enabled == MODSEC_ENABLED) && (msr->txcfg->if_limit_action == REQUEST_BODY_LIMIT_ACTION_REJECT)) {
                            *error_msg = apr_psprintf(msr->mp, "Request body no files data length is larger than the "
                                    "configured limit (%ld).", msr->txcfg->reqbody_no_files_limit);
                            return -5;
                        } else if ((msr->txcfg->is_enabled == MODSEC_ENABLED) && (msr->txcfg->if_limit_action == REQUEST_BODY_LIMIT_ACTION_PARTIAL)) {
                            *error_msg = apr_psprintf(msr->mp, "Request body no files data length is larger than the "
                                    "configured limit (%ld).", msr->txcfg->reqbody_no_files_limit);
                        } else if ((msr->txcfg->is_enabled == MODSEC_DETECTION_ONLY) && (msr->txcfg->if_limit_action == REQUEST_BODY_LIMIT_ACTION_PARTIAL)) {
                            *error_msg = apr_psprintf(msr->mp, "Request body no files data length is larger than the "
                                    "configured limit (%ld).", msr->txcfg->reqbody_no_files_limit);
                        } else {
                            *error_msg = apr_psprintf(msr->mp, "Request body no files data length is larger than the "
                                    "configured limit (%ld).", msr->txcfg->reqbody_no_files_limit);
                            return -5;
                        }
                    }

                    if((msr->txcfg->is_enabled == MODSEC_ENABLED) && (msr->txcfg->if_limit_action == REQUEST_BODY_LIMIT_ACTION_REJECT))
                        return -1;
                }

            }

            if (APR_BUCKET_IS_EOS(bucket)) {
                finished_reading = 1;
                msr->if_seen_eos = 1;
            }
        }

        apr_brigade_cleanup(bb_in);
    } while(!finished_reading);

    apr_status_t rcbe = modsecurity_request_body_end(msr, error_msg);

    if (msr->txcfg->debuglog_level >= 4) {
        msr_log(msr, 4, "Input filter: Completed receiving request body (length %" APR_SIZE_T_FMT ").",
                msr->reqbody_length);
    }

    msr->if_status = IF_STATUS_WANTS_TO_RUN;

    return rcbe;
}

这段代码是一个名为“read_request_body”的函数,它从HTTP客户端读取请求正文。它有两个参数:一个名为“msr”的modsec_rec结构指针和一个指向char指针的指针“error_msg”。

该函数首先检查是否存在要读取的请求正文,并检查是否启用了请求正文访问。如果不是,则返回0。如果这些检查通过,则函数开始读取请求正文。

该函数设置了一个**桶队列以从客户端读取数据。然后它进入一个循环,直到所有数据都被读取为止。在循环的每次迭代中,它使用Apache的ap_get_brigade函数读取下一块数据。然后它检查brigade中的每个bucket并从中提取数据。**它还检查请求体的大小是否超过了配置的限制,并处理请求体过大的情况。

如果启用了stream_inbody_inspection配置,则该函数将请求正文存储在一个流缓冲区中以供检查。该函数还更新请求正文长度,并将请求正文存储在临时缓冲区中以进行进一步处理。

一旦所有数据都被读取,该函数清理bucket brigade,结束请求正文处理,并设置一个状态标志,表示函数已成功完成。它返回apr_status_t值,指示操作的结果。

APR_BRIGADE_SENTINEL是Apache Portable Runtime (APR)库中的一个宏,用于返回一个指向bucket brigade的哨兵元素。bucket brigade是一种用于在Apache中处理数据的特殊数据结构,在输入输出过滤器中广泛使用。

这个宏定义的作用是获取指向bucket brigade结尾的哨兵元素,方便遍历bucket brigade。在bucket brigade中,每个bucket都是一个带有数据的数据单元,而哨兵元素是一个特殊的、空的bucket,用于标志bucket brigade的结尾,通常被称为“哨兵bucket”。

因此, 在对bucket brigade进行循环遍历时,可以使用APR_BRIGADE_FIRST宏获取第一个bucket,然后通过不断访问APR_BUCKET_NEXT来得到下一个bucket,直到遍历到哨兵元素(APR_BRIGADE_SENTINEL),表示bucket brigade已经遍历完毕。

03-30 07:11