微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

造数据：pandas + SQLAlchemy

import pandas as pd
from sqlalchemy import create_engine
from urllib import parse
import uuid
import numpy as  np
import  time

def write_data_MysqL(db_conf, mode='r', table_name=None, dbname='finance_task_center_0', times=6, batch_size=500000):
    """Read a sample row from, or bulk-append generated rows to, a MySQL table.

    :param db_conf: connection settings; the keys ``user``, ``pwd``, ``host``
        and ``port`` are read. (``db_name`` is NOT read -- use ``dbname``.)
    :param mode: ``'r'`` prints the column names and first row of
        ``table_name``; ``'w'`` appends ``times`` batches of ``batch_size``
        rows produced by ``gen_data``.
    :param table_name: target (sharded) table name.
    :param dbname: database name, used both in the connection URL and as the
        ``schema`` argument of ``DataFrame.to_sql``.
    :param times: number of batches to write in ``'w'`` mode.
    :param batch_size: rows per batch in ``'w'`` mode.
    :return: None. With the defaults, ``'w'`` mode writes 6 x 500000 =
        3,000,000 rows in total.
    :raises ValueError: if ``mode`` is neither ``'r'`` nor ``'w'``.
    """
    if mode not in ('r', 'w'):
        # Previously an unknown mode silently did nothing.
        raise ValueError(f"mode must be 'r' or 'w', got {mode!r}")

    user = db_conf["user"]
    # Percent-encode the password so characters such as '@' cannot break
    # the URL.
    pwd = parse.quote_plus(db_conf["pwd"])
    host = db_conf["host"]
    port = db_conf["port"]
    cols = ['task_id', 'id', 'company_id', 'zid', 'uid', 'type_id', 'sid', 'seller_id', 'run_type', 'origin_type',
            'execution_time', 'expiration_time', 'finish_time', 'market_id', 'market_code', 'repeat', 'message_id',
            'min_version', 'version', 'priority', 'status', 'data', 'result', 'gmt_create', 'gmt_modified']

    # SQLAlchemy resolves dialect/driver names case-sensitively: the scheme
    # must be "mysql+pymysql" ("MysqL+pyMysqL" fails to load the dialect).
    connect_info = f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{dbname}"
    engine = create_engine(connect_info)
    try:
        if mode == 'r':
            df = pd.read_sql_query('select * from {} limit 1'.format(table_name), engine)
            print('===========读取数据==============:')
            print(df.columns.tolist())
            rows = df.values.tolist()
            # An empty table would otherwise raise IndexError on rows[0].
            print(rows[0] if rows else 'empty table')
        else:
            for t in range(times):
                st = time.time()
                rows = gen_data(batch_size)
                df = pd.DataFrame(rows, columns=cols)
                print(df)
                df.to_sql(name=table_name, con=engine, if_exists='append', schema=dbname, index=False)
                # Stop the clock after to_sql so the reported cost includes
                # the database write, not just row generation.
                end = time.time()
                print(f"=第{t+1}次写入行数{batch_size}==========finish=============== table_name : {table_name},cost time {end-st} 秒")
            print(f"\n@@@@@@@@=========={table_name}写入{(times*batch_size)} 行完成=================@@@@@@@@@@@@@@")
    finally:
        # Release the pooled connections of this function-local engine.
        engine.dispose()

def gen_data(nums):
    """Build ``nums`` synthetic task rows (25 fields each) for bulk insertion.

    Every field is a fixed literal except:

    * ``task_id`` -- ``abs(hash(uuid.uuid1().hex))``, a fresh non-negative
      int per row. ``str`` hashes are salted per process, so these values
      are not reproducible across runs.
    * ``type_id`` -- drawn from 1..5 with probabilities 0.4/0.3/0.1/0.1/0.1
      using numpy's global RNG.

    :param nums: number of rows to generate; values <= 0 yield ``[]``.
    :return: list of ``nums`` row lists, in the column order task_id, id,
        company_id, zid, uid, type_id, sid, seller_id, run_type,
        origin_type, execution_time, expiration_time, finish_time,
        market_id, market_code, repeat, message_id, min_version, version,
        priority, status, data, result, gmt_create, gmt_modified.
    """
    if nums <= 0:
        # range(nums) produced no rows here; np.random.choice would raise
        # on a negative size, so short-circuit explicitly.
        return []

    # One vectorised draw instead of one np.random.choice call per row.
    # Per-call overhead dominated for large batches; the distribution is
    # unchanged.
    type_ids = np.random.choice([1, 2, 3, 4, 5], nums, True, [0.4, 0.3, 0.1, 0.1, 0.1])

    data = []
    for type_id in type_ids:
        taskid = abs(hash(uuid.uuid1().hex))
        row_data = [
            taskid,                 # task_id
            4,                      # id
            9012499355034237,       # company_id
            100026,                 # zid
            0,                      # uid
            type_id,                # type_id
            117,                    # sid
            'A303OEQ77COZA8',       # seller_id
            1,                      # run_type
            1,                      # origin_type
            '2022-05-28 19:26:00',  # execution_time
            '2022-05-28 20:00:00',  # expiration_time
            # NOTE(review): a zero date is rejected by MySQL when sql_mode
            # includes NO_ZERO_DATE -- confirm the target server's mode.
            '0000-00-00 00:00:00',  # finish_time
            1,                      # market_id
            'US',                   # market_code
            0,                      # repeat
            '',                     # message_id
            '*',                    # min_version
            '',                     # version
            12,                     # priority
            0,                      # status
            '{"report_date_type":"custom", "report_date_month":"2022-04", '
            '"report_custom_date_end":"2022-04-02", "report_custom_date_start":"2022-04-02"}',  # data
            '',                     # result
            '2022-05-31 17:37:58',  # gmt_create
            '2022-05-31 17:37:58',  # gmt_modified
        ]
        data.append(row_data)
    return data
if __name__ == '__main__':
    # Demo run: append 2 batches of 1 generated row to client_tasks_33.
    # NOTE(review): credentials are hard-coded for a test database; prefer
    # loading them from environment variables or an uncommitted config file.
    db_conf2 = {
        "user": "test",
        "pwd": "test@vW9JMvL",
        "host": "192.188.119.1",
        "port": "3306",
        "db_name": "finance_task_center_0",
    }

    table_name = 'client_tasks_33'
    # write_data_MysqL does not read db_conf["db_name"]; pass it explicitly
    # so the configured database is the one actually used.
    write_data_MysqL(db_conf2, table_name=table_name, mode='w', times=2, batch_size=1,
                     dbname=db_conf2["db_name"])

  

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐