# Tools used:
#   1. storage          - MongoDB
#   2. scheduled tasks  - crontab
#
# Data storage ( database = fs ), collections:
#   functions  (function definitions)
#   dispatch   (schedule definitions)
#   joblist    (job instances waiting to run)
#   result     (results)
#
# 0. Scheduling logic:
#   x logs are imported into MongoDB by a scheduled task
#   x for each statistics requirement an analysis script ( functions ) is written:
#     the function definition plus a description of its parameters
#   x a schedule ( dispatch ) is then defined: it names the period and the function to run
#   x one crontab entry walks the dispatch collection and generates the job list;
#     a second crontab entry runs the already-instantiated jobs found in joblist
#   x results are stored in result
#
# crontab entries:
#   */2 * * * * sh /data/shell/gmodel/dw_model/cron_mongodb.sh joblist_create
#   1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39,41,43,45,47,49,51,53,55,57,59 * * * *   sh /data/shell/gmodel/dw_model/cron_mongodb.sh job_run
#
# Scheduled system dispatch script ( cron_mongodb.sh ):
#!/bin/sh
# Usage: cron_mongodb.sh joblist_create|job_run
# Every invocation appends its output to /tmp/mongo.log.
act=$1
echo "-$act----------------------- " >> /tmp/mongo.log

# Prints how many processes in `ps -ef` mention both cron_mongodb and the
# given action name (grep's own processes are filtered out).
running_count() {
  ps -ef | grep -v grep | grep cron_mongodb | grep "$1" | wc -l | tr -d ' '
}

# "$act" is quoted so that a missing argument does not make `[` fail.
if [ "$act" = "joblist_create" ]; then
  echo "joblist_create run .." >> /tmp/mongo.log
  running=$(running_count joblist_create)
  echo "$running"
  # Only start another mongo shell while fewer than 5 matching processes are listed.
  if [ "$running" -lt 5 ]; then
    /root/tools/mongodb-linux-x86_64-1.4.0/bin/mongo --eval "
    load('/data/shell/gmodel/dw_model/utils/utils.js');
    print( 'joblist_create ',new Date,tojson(joblist_create()) );
 " >> /tmp/mongo.log
  fi
fi

if [ "$act" = "job_run" ]; then
  echo "job_run run .." >> /tmp/mongo.log
  running=$(running_count job_run)
  if [ "$running" -lt 5 ]; then
    /root/tools/mongodb-linux-x86_64-1.4.0/bin/mongo --eval "
    load('/data/shell/gmodel/dw_model/utils/utils.js');
    print( 'job_run ',new Date,tojson( job_run() ) );
 " >> /tmp/mongo.log
  fi
fi
# 1.
// Initialization ( init_mong.js ) - collections of the 'fs' database.
var fs = db.getMongo().getDB('fs').functions ;
var ds = db.getMongo().getDB('fs').dispatch ;
var js = db.getMongo().getDB('fs').joblist ;
var rs = db.getMongo().getDB('fs').result ;

// Reset step: remove any previous content. This has to happen BEFORE the
// collections and their unique indexes are (re)created, because dropping a
// collection also removes its indexes.
fs.drop();
ds.drop();
rs.drop() ;
js.drop();

var cfs = db.getMongo().getDB('fs').createCollection("functions") ;
var dfs = db.getMongo().getDB('fs').createCollection("dispatch") ;
var jfs = db.getMongo().getDB('fs').createCollection("joblist") ;
var rfs = db.getMongo().getDB('fs').createCollection("result") ;

// function name and version number are jointly unique
fs.ensureIndex({ "name":1,"version":1}, {unique: true});
// dispatch name is unique
ds.ensureIndex({ "name":1 }, {unique: true});
// job list / result: name, timestamp and active stamp ( function parameters ) are jointly unique
js.ensureIndex({ "name":1,"active_stamp":1,"timestamp":1 }, {unique: true});
rs.ensureIndex({ "name":1,"active_stamp":1,"timestamp":1 }, {unique: true});

// 2. function definition:
fs.save({
    // package name
    "package" : "statistics" ,
    // function name - fetched through utils getf('statistics','test_statistics')
    "name" : "test_statistics" ,
    // description
    "desc" : "测试 调度,没有实现用处" ,
    // function version - getf('statistics','test_statistics','0.0')
    // when getf gets no 3rd argument the highest version is returned
    "version" : "0.0" ,
    // description of the run-time parameters
    "param":{
       "ab":"产品ID",
       "st":"开始时间",
       "et":"结束时间"
    },
    // function body; the run-time arguments are read as this.param.ab ...
    "body" : function(){
      sleep(15000);
      return {"ab":this.param.ab,"st":this.param.st,"et":this.param.et};
    }
});

// 3. dispatch ( schedule ) definition:
ds.save({
    // dispatch name and description
    "name" : "测试-1" ,
    "desc" : "" ,
    // version and name of the function to run
    "version" : "0.0" ,
    "fun_name" : "test_statistics" ,
    // how the function parameters are produced
    "param" : {
          // The scheduled run time is available as the built-in variable
          // "dispatch_time" ( a Date ), and the helpers from utils can be used.
          // When a parameter value is marked with @ the rest of the value is
          // evaluated after load('utils.js') ; e.g. param.st = eval( .... ) ;
          "ab": "100008" ,
          "st": "@ time_displacement( dispatch_time ,'yyyy-MM-dd',-1 ) ",
          "et": "@ time_displacement( dispatch_time ,'yyyy-MM-dd' ) "
    },
    // timing, modelled on crontab
    "run_timing" : {
      "minute" : "00" , //0-59
      "hour" : "*" , //0-23
      "day" : "*" , //1-31
      "weekday" : "*" //'Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday'
    },
    // whether this dispatch is currently enabled
    "is_use" : "true",
    // last time this dispatch generated a job;
    // used for recovery - joblist entries that were not generated can be backfilled from it
    "last_run" : "2010-04-21 00:00"
});

// 4. utils.js date helpers - built on http://www.mattkruse.com/javascript/date/source.html
//    ( formatDate / parseDate come from that library ); one function is added on top of it.
/**
 * Time displacement: shifts a date and formats the result.
 *   time_displacement( new Date,'yyyy-MM-dd',-1 )
 *
 * @param ndt  a Date, or a date string ( strings are converted with parseDate )
 * @param fmt  output format handed to formatDate
 * @param day  optional number of days to add ( may be negative )
 * @param hour optional number of hours to add ( may be negative )
 * @param min  optional number of minutes to add ( may be negative )
 * @returns the shifted date formatted by formatDate( date, fmt )
 */
function time_displacement(ndt,fmt,day,hour,min){
  // The check must look at the argument itself; the local `ndate` does not
  // exist yet at this point, so testing it never detected string input.
  if(typeof ndt == "string") ndt = parseDate(ndt) ;
  // Work on a copy so a Date passed in by the caller is left untouched.
  var ndate = new Date( ndt );
  if(typeof day != "undefined") ndate.setDate( ndate.getDate()+day );
  if(typeof hour != "undefined") ndate.setHours( ndate.getHours()+hour );
  if(typeof min != "undefined") ndate.setMinutes( ndate.getMinutes()+min );
  return formatDate(ndate,fmt);
}

// 5. utils.js - fetching a stored function
/**
 * Loads a function definition from the fs.functions collection.
 *   var fun = getf("assert","structure");
 *   fun.body  fun.version
 *
 * @param pack_age package name
 * @param fname    function name
 * @param vers     optional version; when omitted the document that sorts
 *                 first by "version" descending is returned
 * @returns the stored document with its _id replaced by "~"
 * @throws Error when no matching function document exists
 */
function getf(pack_age,fname,vers){
  var dbs = 'fs';
  var fs_coll = db.getMongo().getDB(dbs).functions ;
  var fun = null ;
  // Queries are passed as plain objects; nothing is assembled into source
  // text and evaluated, so quotes inside the arguments cannot alter the query.
  if(typeof vers == "undefined"){
    var newest = fs_coll.find({ "package":pack_age, "name":fname }).sort({"version":-1}).limit(1) ;
    if( newest.hasNext() ) fun = newest.next() ;
  }else{
    // versions are stored as strings ( e.g. "0.0" )
    fun = fs_coll.findOne({ "package":pack_age, "name":fname, "version":String(vers) }) ;
  }
  if( fun == null ){
    throw new Error( "getf: function not found - package=" + pack_age + " name=" + fname +
                     ( typeof vers == "undefined" ? "" : " version=" + vers ) );
  }
  // the caller embeds this document into other documents; neutralise the _id
  fun._id = "~";
  return fun ;
}
// 6.
// Core scheduling method #1 : generation of dispatch timestamps
/*
    Generates the run timestamps that fall inside a time window:
        every instant between stime and etime ( both inclusive, minute
        precision ) at which the crontab-like setting
        minute,hour,day,weekday would fire.
        Example : window 2010-04-10 00:01 .. 2010-04-11 23:59
                  with 00 10 * * fires twice.
    Call :
        get_timestamps( '2010-04-10 00:01','2010-04-11 23:59','00','10','*','*' )
        gives 2010-04-10 10:00 , 2010-04-11 10:00
    Parameters :
        stime,etime       a Date, or a string understood by parseDate
        minute,hour,day   two digits ( a single digit is zero padded ),
                          "*" or "" meaning "any"
        weekday           'Sunday','Monday','Tuesday','Wednesday','Thursday',
                          'Friday','Saturday', "*" or "" meaning "any"
    Returns :
        array of 'yyyy-MM-dd HH:mm' strings without duplicates, in the order
        they were found. Candidates that are not real calendar dates
        ( e.g. 04-31 ) are left out.
    Note : with minute "*" the window is walked minute by minute.
*/

// Left-pads a single digit with "0"; anything else is returned as a string unchanged.
function pad_two_digits(v){
    var s = String(v);
    return /^\d$/.test(s) ? "0" + s : s ;
}

// Turns a 'yyyy-MM-dd HH:mm' stamp into a Date; returns null when the text
// does not have that shape or does not name a real date / time of day.
function stamp_to_date(stamp){
    var m = /^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2})$/.exec(stamp);
    if( m == null ) return null ;
    var y = parseInt(m[1],10), mo = parseInt(m[2],10) - 1, d = parseInt(m[3],10);
    var h = parseInt(m[4],10), mi = parseInt(m[5],10);
    if( h > 23 || mi > 59 ) return null ;
    var dt = new Date( y, mo, d, h, mi );
    // Date silently rolls over impossible days ( 04-31 -> 05-01 ); reject those
    if( dt.getFullYear() != y || dt.getMonth() != mo || dt.getDate() != d ) return null ;
    return dt ;
}

function get_timestamps( stime,etime,
        minute,hour,day,weekday ){
    var day_names = ['Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday'];
    var stamp_fmt = 'yyyy-MM-dd HH:mm';

    if(typeof stime == "string") stime = parseDate(stime);
    if(typeof etime == "string") etime = parseDate(etime);
    if( stime == null || etime == null ) throw new Error("get_timestamps: stime / etime is not a valid date");
    if(stime.getTime() > etime.getTime() )return [];

    // window bounds as stamps; stamps of this format compare correctly as strings
    var first = formatDate(stime,stamp_fmt);
    var et = formatDate(etime,stamp_fmt);

    // which fields are pinned to a fixed value
    var fix_minute = ( minute != "*" && minute != "" );
    var fix_hour = ( hour != "*" && hour != "" );
    var fix_day = ( day != "*" && day != "" );
    var mm = fix_minute ? pad_two_digits(minute) : "" ;
    var hh = fix_hour ? pad_two_digits(hour) : "" ;
    var dd = fix_day ? pad_two_digits(day) : "" ;

    // step width in minutes : every minute, every hour or every day
    var min_cycle = null ;
    if( minute=="*" ) min_cycle = 1 ;
    else if( hour=="*" ) min_cycle = 1*60 ;
    else min_cycle = 1*60*24 ;
    print( "min_cycle = ",min_cycle );

    // Walk a private copy so a Date handed in by the caller is never modified.
    // The copy is moved back to the beginning of the stepping unit for every
    // pinned field; otherwise stepping from e.g. 23:00 in day steps would jump
    // over a run that is due at 10:00 on the last day of the window.
    var cursor = new Date( stime.getTime() );
    cursor.setSeconds(0);
    cursor.setMilliseconds(0);
    if( min_cycle >= 60 && fix_minute ) cursor.setMinutes(0);
    if( min_cycle == 1*60*24 && fix_hour ) cursor.setHours(0);

    var seen = {};
    var arr = [];
    while( cursor.getTime() <= etime.getTime() ){
        var st = formatDate(cursor,stamp_fmt);
        // replacer functions are used so the inserted digits can never be read
        // as part of a "$n" group reference
        if( fix_minute ) st = st.replace(/(.*)(\d{2})$/, function(all,head){ return head + mm ; });
        if( fix_hour ) st = st.replace(/(.*)(\d{2})(:\d{2})$/, function(all,head,old,tail){ return head + hh + tail ; });
        if( fix_day ) st = st.replace(/(\d{4}-\d{2}-)(\d{2})(.*)/, function(all,head,old,tail){ return head + dd + tail ; });

        // advance first, so none of the `continue` statements below can stall the loop
        cursor.setMinutes( cursor.getMinutes() + min_cycle ) ;

        var when = stamp_to_date(st);
        if( when == null ) continue ;                 // not a real date
        if( st < first || et < st ) continue ;        // outside the window
        if( weekday != "*" && weekday != "" && weekday != day_names[ when.getDay() ] ) continue ;

        // yyyy-MM-dd HH:mm
        if( !Object.prototype.hasOwnProperty.call(seen,st) ){
            seen[ st ] = 0;
            arr.push(st);
        }
    }
    return arr ;
}

// joblist generation :
/* Generates the jobs that are due.
     ntime : the "current time" the scheduler works with; defaults to now.
             A Date is used as is, any other value goes through parseDate.
   Returns { all : dispatches visited, success : jobs created, error : dispatches that failed }.

   Format of a generated joblist document :
{
    "_id" : ObjectId("4bd55d4a6d623399d78fd793"),
    "name" : "测试-1",
    "timestamp" : "2010-04-22 00:00",
    "active_stamp" : "ab100008et2010-04-22st2010-04-21",
    "func_obj" : {
        "_id" : "~",
        "package" : "statistics",
        "name" : "test_statistics",
        "desc" : "测试 调度,没有实现用处",
        "version" : "0.0",
        "param" : {
            "ab" : "100008",
            "st" : "2010-04-21",
            "et" : "2010-04-22"
        },
        "body" : function cf__24__f_cf__12__f_() {
                    sleep(15000);
                    return {ab:this.param.ab, st:this.param.st, et:this.param.et};
                }
    },
    "running_time" : {
        "dispatch_start" : "",
        "start" : "",
        "end" : ""
    },
    "level" : "3",
    "run_status" : "init",
    "result" : ""
}
*/
function joblist_create(ntime){
  var nt ;
  if(typeof ntime == "undefined") nt = new Date() ;
  else if ( typeof ntime == "object" ) nt = ntime ;
  else nt = parseDate(ntime);

  var run_stat = {"all":0, "success":0,"error":0};
  var dbs = 'fs';
  var ds = db.getMongo().getDB(dbs).dispatch ;
  var js = db.getMongo().getDB(dbs).joblist ;
  var ff = ds.find({"is_use":"true"}) ;
  // walk every enabled dispatch definition
  while(ff.hasNext()){
    run_stat.all += 1 ;
    var disp = ff.next();
    try{
        // fetch the function once up front so a missing function fails the
        // dispatch before any timestamps are computed
        var fun = getf('statistics',disp.fun_name,disp.version);
        // latest time this dispatch generated a job; starts at "now" when unset.
        // Same 'yyyy-MM-dd HH:mm' format as the timestamps stored into it below.
        if(typeof disp.last_run == "undefined" || disp.last_run=="" ) disp.last_run = formatDate(nt,'yyyy-MM-dd HH:mm');
        // run times that are due between the last generation and ntime
        var tks = get_timestamps( disp.last_run , nt ,
                    disp.run_timing.minute,disp.run_timing.hour,disp.run_timing.day,disp.run_timing.weekday ) ;
        for(var i=0;i<tks.length;i++ ){
            // implicit variable that "@" parameter expressions may refer to;
            // it has to stay a local of this function under exactly this name
            var dispatch_time = parseDate( tks[i] ) ;
            // fresh copy per job, because its param values are overwritten below
            fun = getf('statistics',disp.fun_name,disp.version);
            var pks = [];
            // instantiate the parameters
            for(var pk in fun.param){
                pks.push(pk);
                var pv = disp.param[pk] ;
                // Only a value that STARTS with "@" is an expression; a literal
                // that merely contains "@" is stored unchanged.
                // NOTE(review): eval runs whatever is stored in the dispatch
                // collection - write access to it must be restricted to trusted users.
                if( typeof pv == "string" && /^\s*@/.test(pv) ) fun.param[pk] = eval( pv.replace(/^\s*@(.*)$/,"$1") );
                else fun.param[pk]=pv ;
            }
            // build the active stamp : sorted parameter names, each followed by its value
            var active_stamp = "" ;
            var pkss = pks.sort();
            // the index has to start at 0; left uninitialised the loop never ran
            // and every job ended up with an empty stamp
            for(var ii=0;ii<pkss.length;ii++){
                active_stamp += pkss[ii]+fun.param[pkss[ii]];
            }
            // this job has been generated before
            if( js.count({ "name": disp.name,"active_stamp":active_stamp ,"timestamp":tks[i]}) != 0 ) continue ;
            js.save({
                "name": disp.name ,
                "timestamp" : tks[i] ,
                "active_stamp":active_stamp,
                "func_obj" : fun ,
                "running_time":{
                   "dispatch_start" : "" ,
                   "start" : "" ,
                   "end" : ""
                },
                "level" : "3" ,
                "run_status" : "init" ,// init : created , wait : picked up , running , error : failed , end : finished
                "result" : "" // where the result goes - reserved for extension
            });
            run_stat.success += 1 ;
            disp.last_run = tks[i] ;
        }
        ds.save( disp );
    }catch(err){ print(err); run_stat.error += 1 ;}
  }
  return run_stat ;
}

// job_run :
/* Runs waiting jobs.
     ntime : the "current time" to work with. When it is given, every recorded
             time is taken from it; when it is omitted, the start and end of
             each job are read from the clock at the moment they happen.
   Picks up to 5 jobs in state "init" ( sorted by "level" ), marks them "wait",
   then runs them one after another, storing each return value in the result
   collection.
   Returns { all : jobs picked up, success : jobs finished, error : jobs that threw }.
*/
function job_run(ntime){
  var nt ;
  if(typeof ntime == "undefined") nt = new Date() ;
  else if ( typeof ntime == "object" ) nt = ntime ;
  else nt = parseDate(ntime);
  var clock_is_fixed = ( typeof ntime != "undefined" );

  // stamp for "now" : the supplied ntime, otherwise the real clock
  function time_stamp(){
    return formatDate( clock_is_fixed ? nt : new Date() ,'yyyy-MM-dd HH:mm');
  }

  var run_stat = {"all":0,"success":0,"error":0};
  var dbs = 'fs';
  var js = db.getMongo().getDB(dbs).joblist ;
  var rs = db.getMongo().getDB(dbs).result ;
  var ajob = [];
  var ff = js.find({ "run_status":"init" }).sort({ "level":1 }).limit(5) ;
  var dispatch_start = formatDate(nt,'yyyy-MM-dd HH:mm');
  // phase 1 : claim the jobs
  while(ff.hasNext()){
    run_stat.all += 1 ;
    try{
        var picked = ff.next();
        picked.run_status = "wait" ;
        picked.running_time.dispatch_start = dispatch_start ;
        js.save(picked);
        ajob.push(picked);
    }catch(err){print(err);}
  }
  print("start run = ",run_stat.all );
  // phase 2 : run them
  for(var i=0;i<ajob.length;i++){
    var job = ajob[i] ;
    try{
        // A result for this job already exists - do not run it again.
        // NOTE(review): the job keeps the state "wait" in this case - confirm
        // that this is intended.
        if( rs.count({ "name":job.name,"active_stamp":job.active_stamp ,"timestamp":job.timestamp }) != 0 ) continue ;
        var fun = job.func_obj;
        job.run_status = "running" ;
        job.running_time.start = time_stamp();
        js.save(job);
        // called as a method so that the body can read this.param
        var res = fun.body() ;
        rs.save({
            "name" : job.name ,
            "param" : fun.param ,
            "timestamp" : job.timestamp ,
            "active_stamp":job.active_stamp ,
            "result" : res
        });
        job.running_time.end = time_stamp();
        job.run_status = "end" ;
        js.save(job);
        run_stat.success += 1 ;
    }catch(err){
        print(err);
        job.run_status = "error" ;
        job.running_time.end = time_stamp();
        js.save(job);
        run_stat.error += 1 ;
    }
  }
  return run_stat ;
}
10-10 01:00