使比较2张表的速度更快Postgres / SQLAlchemy

如何解决使比较2张表的速度更快Postgres / SQLAlchemy

我用python写了一个代码来操作数据库中的表。我正在使用SQL Alchemy。基本上我有表1,其中有2 500 000条目。我还有另一个具有20000个条目的表2。基本上,我想做的是将表1中的源ip和dest ip与表2中的源ip和dest ip进行比较。如果存在匹配,我将表1中的ip源和ip dest替换为以下数据:匹配表2中的ip source和ip dest,并在表3中添加条目。我的代码还检查条目是否不在新表中。如果是这样,它将跳过它,然后继续下一行。 我的问题是它非常慢。我昨天启动了我的脚本,在24小时内,它仅经历了2500万个条目中的47 000个条目。我想知道是否仍然可以加快该过程。这是一个postgres db,我无法确定花费这么多时间的脚本是否合理,或者是否有问题。如果有人对类似的事情有类似的经验,那么完成之前需要花费多少时间? 非常感谢。

session = Session()
i = 0
start_id = 1
flows = session.query(Table1).filter(Table1.id >= start_id).all()
result_number = len(flows)
vlan_list = {"['0050']","['0130']","['0120']","['0011']","['0110']"}
while i < result_number:
    for flow in flows:
        if flow.vlan_destination in vlan_list:
            usage = session.query(Table2).filter(Table2.ip ==
                                                                                     str(flow.ip_destination)).all()
            if len(usage) > 0:
                usage = usage[0].usage
            else:
                usage = str(flow.ip_destination)
            usage_ip_src = session.query(Table2).filter(Table2.ip ==
                                                                                                    str(flow.ip_source)).all()
            if len(usage_ip_src) > 0:
                usage_ip_src = usage_ip_src[0].usage
            else:
                usage_ip_src = str(flow.ip_source)
            if flow.protocol == "17":
                protocol = func.REPLACE(flow.protocol,"17",'UDP')
            elif flow.protocol == "1":
                protocol = func.REPLACE(flow.protocol,"1",'ICMP')
            elif flow.protocol == "6":
                protocol = func.REPLACE(flow.protocol,"6",'TCP')
            else:
                protocol = flow.protocol
            is_in_db = session.query(Table3).filter(Table3.protocol ==
                                                                                            protocol)\
                .filter(Table3.application == flow.application)\
                .filter(Table3.destination_port == flow.destination_port)\
                .filter(Table3.vlan_destination == flow.vlan_destination)\
                .filter(Table3.usage_source == usage_ip_src)\
                .filter(Table3.state == flow.state)\
                .filter(Table3.usage_destination == usage).count()
            if is_in_db == 0:
                to_add = Table3(usage_ip_src,usage,protocol,flow.application,flow.destination_port,flow.vlan_destination,flow.state)
                session.add(to_add)
                session.flush()
                session.commit()
                print("added " + str(i))
            else:
                print("usage already in DB")
        i = i + 1

session.close()

编辑根据要求,这里有更多详细信息:表1有11列,我们感兴趣的两列是源ip和dest ip。 Table 1 在这里,我有表2:Table 2。它有一个IP和一个用法。我的脚本在做什么,它从表1中获取源ip和dest ip,并在表2中查找是否匹配。如果是,它将按用法替换ip地址,并将其与ip中的某些列一起添加表3中的表1:[Table3] [3] 为此,将协议列添加到表3中时,它将写协议名称而不是数字,只是为了使其更具可读性。

编辑2 我试图以不同的方式思考这个问题,所以我绘制了一个问题图Diagram (X problem) 我要弄清楚的是我的代码(Y解决方案)是否按预期工作。我只用python编码一个月,感觉好像搞砸了。我的代码应该从表1中获取每一行,然后将其与表2进行比较,然后将数据添加到表3中。我的表1拥有超过200万个条目,可以理解,虽然需要花费一些时间,但是它太慢了。例如,当我不得不将数据从API加载到数据库时,它的速度比我试图处理数据库中所有数据的比较要快。我正在具有足够内存的虚拟机上运行我的代码,因此我确定这是我的代码不足,因此我需要改进的方向。我的表格的屏幕截图:

表2

Table2

表3

Table3

表1

Table1

编辑3 :Postgresql查询

SELECT
  coalesce(table2_1.usage,table1.ip_source) AS coalesce_1,coalesce(table2_2.usage,table1.ip_destination) AS coalesce_2,CASE table1.protocol WHEN %(param_1) s THEN %(param_2) s WHEN %(param_3) s THEN %(param_4) s WHEN %(param_5) s THEN %(param_6) s ELSE table1.protocol END AS anon_1,table1.application AS table1_application,table1.destination_port AS table1_destination_port,table1.vlan_destination AS table1_vlan_destination,table1.state AS table1_state
FROM
  table1
  LEFT OUTER JOIN table2 AS table2_2 ON table2_2.ip = table1.ip_destination
  LEFT OUTER JOIN table2 AS table2_1 ON table2_1.ip = table1.ip_source
WHERE
  table1.vlan_destination IN (
    %(vlan_destination_1) s,%(vlan_destination_2) s,%(vlan_destination_3) s,%(vlan_destination_4) s,%(vlan_destination_5) s
  )
  AND NOT (
    EXISTS (
      SELECT
        1
      FROM
        table3
      WHERE
        table3.usage_source = coalesce(table2_1.usage,table1.ip_source)
        AND table3.usage_destination = coalesce(table2_2.usage,table1.ip_destination)
        AND table3.protocol = CASE table1.protocol WHEN %(param_1) s THEN %(param_2) s WHEN %(param_3) s THEN %(param_4) s WHEN %(param_5) s THEN %(param_6) s ELSE table1.protocol END
        AND table3.application = table1.application
        AND table3.destination_port = table1.destination_port
        AND table3.vlan_destination = table1.vlan_destination
        AND table3.state = table1.state
    )
  )


解决方法

鉴于当前的问题,我认为这至少与您可能追求的目标接近。这个想法是在数据库中执行整个操作,而不是获取所有内容(整个2,500,000行)并在Python中进行过滤等。

from sqlalchemy import func,case
from sqlalchemy.orm import aliased


def newhotness(session,vlan_list):
    # The query needs to join Table2 twice,so it has to be aliased
    dst = aliased(Table2)
    src = aliased(Table2)

    # Prepare required SQL expressions
    usage = func.coalesce(dst.usage,Table1.ip_destination)
    usage_ip_src = func.coalesce(src.usage,Table1.ip_source)
    protocol = case({"17": "UDP","1": "ICMP","6": "TCP"},value=Table1.protocol,else_=Table1.protocol)

    # Form a query producing the data to insert to Table3
    flows = session.query(
            usage_ip_src,usage,protocol,Table1.application,Table1.destination_port,Table1.vlan_destination,Table1.state).\
        outerjoin(dst,dst.ip == Table1.ip_destination).\
        outerjoin(src,src.ip == Table1.ip_source).\
        filter(Table1.vlan_destination.in_(vlan_list),~session.query(Table3).
                   filter_by(usage_source=usage_ip_src,usage_destination=usage,protocol=protocol,application=Table1.application,destination_port=Table1.destination_port,vlan_destination=Table1.vlan_destination,state=Table1.state).
                   exists())

    stmt = insert(Table3).from_select(
        ["usage_source","usage_destination","protocol","application","destination_port","vlan_destination","state"],flows)

    return session.execute(stmt)

如果vlan_list是选择性的,换句话说,过滤掉了大多数行,这将在数据库中执行更少的操作。根据{{​​1}}的大小,您可能会受益于索引Table2,但首先进行测试。如果相对较小,我猜想PostgreSQL将在那里执行散列或嵌套循环连接。如果用于过滤Table2.ip中重复项的列中的某些列是唯一的,则可以执行Table3而不是使用INSERT ... ON CONFLICT ... DO NOTHING子查询表达式删除SELECT中的重复项(哪个PostgreSQL将作为反连接执行)。如果NOT EXISTS查询可能会产生重复,请向其添加一个对flows的调用。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res