微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

消息处理之时间格式转化 | Pandas 真的加速吗?

​前言

消息中字符串转时间戳是比较耗时间的,Pandas 在这块是尖刀,不过用法上还是要留点心的,不小心就白干了一场。

消息处理与耗时

  • 单条信息为JSON,大约572字节
  • 消息时间为 '2021/09/28 00:03:45.227895784'
  • 单次批量为 1000条

原始处理抽象代码如下:

def to_timestamp(dt):
    """dt转化为时间戳"""
    return time.mktime((dt.timetuple()))


def make_key(message):
    """生成唯一建"""
    return '%s_%s' % (message.get('VolId'), message.get('Id'))


def formatter(message):
    """
    简单时间处理
    :param message:
        create_time: '2021/08/12 01:01:19.220461019' 
        Wait: '6333991us'
    :return:
    """
    wait = message.get('Wait')
    u_wait = wait[:-2]
    wait_delta = timedelta(microseconds=int(u_wait))

    log_time = message.get("create_time")
    u_time = log_time.split('.')[0]
    u_time_obj = datetime.strptime(u_time, '%Y/%m/%d %H:%M:%s')

    alert_begin = u_time_obj - wait_delta
    alert_ts = to_timestamp(alert_begin)
    message['begin_ts'] = alert_ts
    message['alert_ts'] = alert_ts
    message['alert_count'] = 0
    message['Wait'] = float(u_wait) / 1e6
    return message


def cls_message_pure(raws):
    """消息分拣"""
    slow_dict = {}
    pending_dict = {}
    for message in raws:
        key = make_key(message)
        if message.get('flag') == 'Pending':
            pending_dict[key] = formatter(message)
        elif message.get('flag') == 'Slow':
            slow_dict[key] = formatter(message)
        else:
            pass
    return slow_dict, pending_dict

对消息做初步处理,生产主键,消息创建时间格式化,以及一些报警相关初始化。然后再做数据进行分拣,简单统计耗时如下

def run(data):
    t = time.time()
    cls_message_pure(data)
    print("formatter dt items expand %s" %(time.time() - t))


formatter dt items expand 0.0396201610565

才1千条数据分拣,业务逻辑还没上,就花了39毫秒,其实很慢了。

Pandas粉墨登场 | 批量加速?

对照 Pandas手册,全部都能搞定,代码如下:

def pandas_formatter(raws):
    t = time.time()
    df = pandas.DataFrame(raws)
    print "init %s" % (time.time() - t)

    slow_dict = {}
    pending_dict = {}
    # 一行直接批量转为时间戳,真香!
    df['create_time'] = pd.to_datetime(df['create_time'], utc='Asia/Shanghai').astype('int64')/1e9
    df['Wait'] = df['Wait'].str[:-2].astype('int64')/1e6
    df['alert_ts'] = df['begin_ts'] = df['create_time'] - df['Wait']
    df['alert_count'] = 0
    # 直接字段合并生产主键,看起来也香(实际性能不太行)
    df['key'] = df['VolId'].str.cat(df['Id'].astype('str').str, sep='_')
    print "traslate %s" % (time.time() - t)


    # 直接分拣,看起来也香!(实际真呵呵)
    groups = df.groupby(df.flag)
    slow_df = groups.get_group('Slow')
    pending_df = groups.get_group('Pending')
    print "cls %s" % (time.time() - t)

    # 还有 to_dict,还真是贴心。(实际呵呵的 N 次方)
    for _, k in slow_df.iterrows():
        item = k.to_dict()
        slow_dict[item['key']] = item
    for _, k in pending_df.iterrows():
        item = k.to_dict()
        pending_dict[item['key']] = item

    return slow_dict, pending_dict

一套批量操作全搞定,感觉不错,加个统计日志,来验证下:

init 0.0113050937653
traslate 0.0349180698395
cls 0.0542259216309
formatter run_pd items expand 0.358073949814

看到这个结果,我瞬间石化了!

各取所长,综合实战

个人认为 Pandas 比较擅长列处理,在时间处理上有大幅度优化。在列表与 DataFrame 来回转化耗时很大,按列直接输出性能很高。最终代码如下:

def pandas_formatter2(raws):
    t = time.time()
    ts = list(i['create_time'] for i in raws)
    series = pd.to_datetime(ts, utc='Asia/Shanghai').astype('int64')/1e9
    df = series.to_list()

    slow_dict = {}
    pending_dict = {}
    for index, message in enumerate(raws):
        wait = message.get('Wait')
        u_wait = float(wait[:-2]) / 1e6
        message['alert_ts'] = message['begin_ts'] = df[index] - u_wait
        message['alert_count'] = 0
        message['Wait'] = u_wait

        key = make_key(message)
        if message.get('flag') == 'Pending':
            pending_dict[key] = message
        elif message.get('flag') == 'Slow':
            slow_dict[key] = message
        else:
            pass
    return slow_dict, pending_dict

 测试结果如下:

formatter run_pd2 items expand 0.00854301452637

终于起到加速效果

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐