说明

UCS(Universal Calculation Standard)要求数据是以块进行组织的:

  • 1 数据的存储要按块
  • 2 数据的处理也是按块

内容

1 已经完成的部分

假设,数据是按照数值顺序编号的。最常见的是mysql的自增ID,因为这种范式比较好,所以我在Mongo(主库)里也实现了一个机制,可以自动进行编号。

三个函数的作用:

  • 1 get_brick_name : 获取某个id对应的brick编号,这个编号看起来像ip地址 0.0.0.0
  • 2 get_brick_list: 给到两个编号,给出中间所有的brick编号
  • 3 get_brick_bounds:给到一个编号,给出上下界id

每个编号(brick)对应1万条记录,然后编号之前是千进位的。所以理论上可以容纳10万亿。

#  ===================  获取编号
def get_brick_name(some_int_idx=None, shard_unit=1e13, part_unit=1e10, block_unit=1e7, brick_unit=1e4):
    # 确保 some_int_idx 是整数
    some_int_idx = int(some_int_idx)

    # 计算 shard 索引
    shard_idx = some_int_idx // shard_unit
    some_int_idx -= shard_idx * shard_unit

    # 计算 part 索引
    part_idx = some_int_idx // part_unit
    some_int_idx -= part_idx * part_unit

    # 计算 block 索引
    block_idx = some_int_idx // block_unit
    some_int_idx -= block_idx * block_unit

    # 计算 brick 索引
    brick_idx = some_int_idx // brick_unit

    # 将所有索引转换为整数
    shard_idx = int(shard_idx)
    part_idx = int(part_idx)
    block_idx = int(block_idx)
    brick_idx = int(brick_idx)

    # 返回格式化后的字符串
    return f'{shard_idx}.{part_idx}.{block_idx}.{brick_idx}'


这个函数是用来将整数索引转换为一种层次命名方案的。以下是函数的简要说明:

它接受一个整数索引作为输入,并可选地接受不同层级的缩放因子。
它依次将整数索引除以这些缩放因子,以提取不同粒度级别的索引(shard、part、block、brick)。
然后将每个级别格式化为一个字符串,并用点号分隔。


# ===================   生成列表
def get_brick_list(brick_name1=None, brick_name2=None, shard_unit=1e13, part_unit=1e10, block_unit=1e7, brick_unit=1e4):
    # 如果没有给定砖块名,则直接返回空列表
    if brick_name1 is None or brick_name2 is None:
        return []

    # 将砖块名解析为四个索引
    shard_idx1, part_idx1, block_idx1, brick_idx1 = map(int, brick_name1.split('.'))
    shard_idx2, part_idx2, block_idx2, brick_idx2 = map(int, brick_name2.split('.'))

    # 计算起始和结束的整数索引
    index1 = (shard_idx1 * shard_unit) + (part_idx1 * part_unit) + (block_idx1 * block_unit) + (brick_idx1 * brick_unit)
    index2 = (shard_idx2 * shard_unit) + (part_idx2 * part_unit) + (block_idx2 * block_unit) + (brick_idx2 * brick_unit)

    # 确定索引范围
    start_idx = min(index1, index2)
    stop_idx = max(index1, index2)

    # 生成两个索引之间的所有砖块名
    brick_names = []
    for idx in range(int(start_idx), int(stop_idx), int(brick_unit)):
        brick_name = get_brick_name(idx, shard_unit, part_unit, block_unit, brick_unit)
        brick_names.append(brick_name)

    return brick_names

# 测试
# brick_names = get_brick_list("0.0.2.3", "0.0.3.5")
# print(brick_names)


# =================== 获得每一个编号的上下界
def get_brick_bounds(brick_name, shard_unit=1e13, part_unit=1e10, block_unit=1e7, brick_unit=1e4):
    shard_idx, part_idx, block_idx, brick_idx = map(int, brick_name.split('.'))
    lower_bound = (shard_idx * shard_unit) + (part_idx * part_unit) + (block_idx * block_unit) + (brick_idx * brick_unit)
    upper_bound = lower_bound + brick_unit 
    return lower_bound, upper_bound


这个函数看起来是用来计算给定brick名的边界值的。以下是这个函数的说明:

它接受一个brick名作为输入,并可选地接受不同层级的缩放因子。
它将brick名分割成四个部分(shard、part、block、brick),然后计算brick的下限和上限。
下限是所有部分索引相乘并加总,上限则是下限加上brick单元大小再减1

2 本次要做的内容

还有一种常见的顺序编号是时间。将概念进行对应

shard ~ 年
part ~ 月
block ~ 日
brick ~ 时

例如: 2024.5.1.0。对应的进位关系一目了然,现在就看怎么进行函数封装。

嗯,想想发现,其实标准的时间格式,本来就做了层级的顺序编号。如2024-01-01 11:11:11,无非是要把2024,01,01,11提取出来即可。参照上面的三个函数:

  • 1 get_time_brick_name ~ get_brick_name, 只要将数据转换为字符,然后提取即可。从规范化的角度,可以是2024.01.01.11这样的格式。
  • 2 get_time_brick_bounds ~ get_brick_bounds, 获取当前时间,然后按规则下推一个就可以了。
  • 3 get_time_brick_list ~ get_brick_list ,这个应该是最需要修改的地方了。目前的基础是yymon_dict,
# op043_ATimer2.py
# 基于时间轴进行推算

import pandas as pd 
import numpy as np 
class ATimer2:

    @staticmethod
    # 分离月 + 日:时:分:秒
    def depart_mon_ddhhmmss(dt_str):
        pos = dt_str.rfind('-')
        yymon = dt_str[:pos]+'-01'
        ddhhmmss = dt_str[pos+1:]
        return yymon, ddhhmmss

    @staticmethod
    def ddhhmmss2s(dhms_str):
        _d, _hms = dhms_str.split()
        _h,_m,_s = _hms.split(':')
        return 86400*(int(_d)-1) + 3600 *(int(_h)) + 60 *(int(_m)) + int(_s)

    @staticmethod
    def gapts2day(some_ts):
        _d = some_ts//86400
        _res1 = some_ts % 86400
        _h = _res1 //3600
        _res2 =  _res1 % 3600
        _m = _res2 //60
        _s = _res2 % 60
        return "-{:02d} {:02d}:{:02d}:{:02d}".format( _d + 1,_h,_m,_s)

    # 初始化
    def __init__(self,yymon_dict = None):
        # 引入yymon_dict
        self.yymon_dict = yymon_dict
        # 获取有序列表的数值(时间戳)
        self.time_axis = np.array(list(yymon_dict.values()))
        # 获取有序列表的字符(日期)
        self.dt_axis = list(yymon_dict.keys())

    # 【变换】 标准时间字符:  2000-01-01 11:11:11
    def char2num(self, some_dt_str = None):
        yymon, dhms  =self.depart_mon_ddhhmmss(some_dt_str)
        the_ts = self.yymon_dict[yymon]  + self.ddhhmmss2s(dhms)
        return the_ts

    # 【变换】 
    def num2char(self, the_ts = None):
        # 将时间戳转为字符
        pos= max(np.where(self.time_axis <= the_ts)[0])
        # 月字符
        base_dt = self.dt_axis[pos]
        # 月字符对应的时间戳
        base_ts = self.time_axis[pos]
        gap_ts = the_ts - base_ts
        char_ts = base_dt[:7] + self.gapts2day(gap_ts)
        return char_ts

    # 【偏移】 - 默认的时间戳偏移是不必计算的
    def n_period_btw(self, start_ts = None, end_ts = None):
        pass 
    # 【偏移】- 通用的字符时间
    def c_period_btw(self, start_dt = None, end_dt = None):
        start_ts = self.char2num(start_dt)
        end_ts = self.char2num(end_dt)
        return end_ts - start_ts

    # 【偏移】 - 使用特定轴的时间偏移:采用轴 + 位置索引 + 偏差
    # 1 维持分钟级时间戳
    # 2 每分钟数据落库则更新时间戳(因此会有1分钟的延迟)
    # A_trade_slot_axis 传入交易的分钟级时间轴
    def n_period_btw_A_trade(self, start_ts = None, end_ts = None, A_trade_slot_axis = None):
        pass

思路:

根据起止时间,可以获得两个时间对应的起始月。因此,只要生成从起始月(1号)开始的所有天,然后再根据小时就可以生成所有的brick_name。然后再按照起止时间进行筛选即可。

首先,添加一个函数

    # 生成一个月的brick
    def _gen_month_brick(self, month_idx = None ):
        cur_monyy_pos = month_idx
        next_monyy_pos = cur_monyy_pos + 1
        
        monyy_start = yymon_dict[self.dt_axis[cur_monyy_pos]]
        monyy_end =  yymon_dict[self.dt_axis[next_monyy_pos]]
        the_month_days = int((monyy_end - monyy_start)/86400)

        cur_yymonth = self.dt_axis[cur_monyy_pos][:7].replace('-','.')

        cur_month_bricks = []
        for i in range(1, the_month_days + 1):
            for j in range(24):
                _tem = '.'.join([cur_yymonth, str(i).zfill(2), str(j).zfill(2)])
                cur_month_bricks.append(_tem)
        return cur_month_bricks

at2._gen_month_brick(648)
['2024.01.01.00',
 '2024.01.01.01',
 '2024.01.01.02',
 ...
  '2024.01.31.21',
 '2024.01.31.22',
 '2024.01.31.23']

一个大月有744个brick,一年不到9000, 十年不到9万,感觉上这个切分还是可以的。

get_time_brick_name

    # 生成brick_name | 输入可以是标准时间字符或者是时间戳(int or float)
    def get_time_brick_name(self, char_or_num = None):
        if not isinstance(char_or_num, str):
            the_char = self.num2char(char_or_num)
        else:
            the_char = char_or_num

        dt, hms = the_char.split()
        brick_name = '.'.join(dt.split('-') +[hms.split(':')[0]] )
        return brick_name

# 样例字符
some_dt_str = '2024-01-31 11:11:11'
# 字符转数值
at2.char2num(some_dt_str)
# 数值转字符
at2.num2char(the_ts=1706670671)
'2024-01-31 11:11:11'
at2.get_time_brick_name(some_dt_str)
'2024.01.31.11'
at2.get_time_brick_name(1706670671)
'2024.01.31.11'

get_time_brick_bounds

    # 生成brick区间 | 可以返回两种区间:字符串区间和时间戳区间(数值型)
    def get_time_brick_bounds(self, some_brick_name = None, char_or_num = 'char'):
        y,m,d,h = some_brick_name.split('.')
        the_char_start = '-'.join([y,m,d]) + ' ' + ':'.join([h,'00','00'])
        the_num_start = self.char2num(the_char_start)
        the_num_end = the_num_start+3600
        the_char_end = self.num2char(the_num_end)
        if char_or_num =='char':
            return the_char_start,the_char_end
        else:
            return the_num_start,the_num_end

at2.get_time_brick_bounds('2024.01.31.11')
('2024-01-31 11:00:00', '2024-01-31 12:00:00')
at2.get_time_brick_bounds('2024.01.31.23')
('2024-01-31 23:00:00', '2024-02-01 00:00:00')
at2.get_time_brick_bounds('2024.01.31.11', char_or_num='num')
(1706670000, 1706673600)

get_time_brick_list

    # 生成brick_name区间
    def get_time_brick_list(self,brick_name1 = None,brick_name2= None):
        start_brick_name, end_brick_name = sorted(list([brick_name1, brick_name2]))
        start_y, start_m, start_d, start_h = start_brick_name.split('.')
        end_y, end_m, end_d, end_h = end_brick_name.split('.')

        start_yymon = '-'.join([start_y,start_m,'01'])
        end_yymon = '-'.join([end_y,end_m,'01'])

        start_pos = self.dt_axis.index(start_yymon)
        end_pos = self.dt_axis.index(end_yymon)

        brick_name_list = []
        for some_pos in range(start_pos, end_pos+1):
            tem_list = self._gen_month_brick(month_idx = some_pos)
            brick_name_list.append(tem_list)

        brick_name_list1 = sorted(self.flatten_list(brick_name_list))
        start_idx = brick_name_list1.index(start_brick_name)
        end_idx = brick_name_list1.index(end_brick_name)
        return brick_name_list1[start_idx:end_idx]

at2.get_time_brick_list('2024.01.31.11', '2024.01.31.13')
['2024.01.31.11', '2024.01.31.12']
at2.get_time_brick_list('2024.01.31.11', '2024.02.02.13')
['2024.01.31.11',
 '2024.01.31.12',
 '2024.01.31.13',
 '2024.01.31.14',
 ...
 '2024.02.02.10',
 '2024.02.02.11',
 '2024.02.02.12']

基本上ok了。

05-03 07:51