Google Cloud Spanner: a SQL MERGE equivalent in Python using the Google API

How can the MERGE SQL below be performed in Google Cloud Spanner using the Google API?

MERGE INTO TABLE2 B
USING (SELECT COL1, COL2, SUM(TOTAL_CNT) AS TOTAL_CNT
         FROM TABLE1 GROUP BY COL1, COL2) A
   ON (B.COL1 = A.COL1 AND B.COL2 = A.COL2)
 WHEN MATCHED THEN
   UPDATE SET B.TOTAL_CNT = B.TOTAL_CNT + A.TOTAL_CNT
 WHEN NOT MATCHED THEN
   INSERT (COL1, COL2, TOTAL_CNT)
   VALUES (A.COL1, A.COL2, A.TOTAL_CNT)

Solution

Whenever you have to perform a MERGE SQL, break it into two steps. The first step is a LEFT JOIN against the target table that produces the desired values as a result set; the second step is a batched insert_or_update of that result set. This saves a lot of per-row lookups and is more efficient. I made the batched insert_or_update multithreaded so you can fire off more threads and finish the processing faster. If you don't need anything that fancy, you can run it as inline code.

    import threading
    import pandas as pd
    import datetime
    import time
    from google.cloud import spanner
    from merge_ins_upd_using_df import merge_ins_upd_using_df  # worker defined below

    # Instantiate a client.
    spanner_client = spanner.Client()

    # Your Cloud Spanner instance ID.
    instance_id = 'spanner-instance'

    # Get a Cloud Spanner instance by ID.
    instance = spanner_client.instance(instance_id)

    # Your Cloud Spanner database ID.
    database_id = 'database-id'

    max_thread_cnt = 30
    thread_list = []
    thread_count = 0
    thread_cnt_before = 0
    thread_counter = 0

         
    sql_stmt = """ (SELECT A.COL1,A.COL2,SUM(A.TOTAL_CNT +  COALESCE(B.TOTAL_CNT,0)) AS TOTAL_CNT
                      FROM (SELECT COL1,COL2,SUM(TOTAL_CNT) AS TOTAL_CNT 
                      FROM TABLE1 GROUP BY COL1,COL2) A 
                      LEFT JOIN TABLE2 B on (A.COL1 = B.COL1 AND A.COL2 = B.COL2) """
                    

    database = instance.database(database_id)
    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(sql_stmt)
        # Materialize the streamed result set while the snapshot is still open.
        df = pd.DataFrame(list(results))
    df.columns = ['COL1', 'COL2', 'TOTAL_CNT']

    process_cnt = 10  # rows per batch; size this so columns + index updates stay under the 20,000-mutation commit limit
    rec_cnt = df.shape[0]
    print('Total Rec Count: ' + str(rec_cnt))
    total_rec_processed = 0 
    from_index = 0 
    to_index = 0
    dest_table = 'TABLE2'

    ### Build the threads

    while True:
        from_index = to_index
        to_index = min(to_index + process_cnt, rec_cnt)
        thread_counter += 1
        df1 = df[from_index:to_index]
        thread_count += 1
        t = threading.Thread(target=merge_ins_upd_using_df,
                             args=(instance_id, database_id, df1, thread_counter, dest_table))
        thread_list.append(t)
        total_rec_processed += process_cnt
        if total_rec_processed >= rec_cnt:
            break

    begin = datetime.datetime.now()

    print("Thread Kick-off has Started : " + str(begin))

    print ("Thread Count before :" +  str(threading.active_count()))

    thread_cnt_before = threading.active_count()

    # Start the threads, throttling so that at most max_thread_cnt run at once.
    for thread in thread_list:
        while threading.active_count() >= max_thread_cnt:
            time.sleep(.05)
        thread.start()
        
    print ("Thread Count after :" +  str(threading.active_count()))

    print("All Threads have been kicked off : " + str(datetime.datetime.now()))    

    if thread_count > 0:
       while threading.active_count() > thread_cnt_before:
             time.sleep(2)

    end = datetime.datetime.now()
    diff = end-begin
    print("Total time for completion in minutes : " + str(diff.total_seconds()/60))


    #######  function - merge_ins_upd_using_df (save as merge_ins_upd_using_df.py)
    def merge_ins_upd_using_df(cs_instance, cs_database, df, thread_counter, dest_table):
        import datetime
        from google.cloud import spanner

        begin = datetime.datetime.now()

        spanner_client = spanner.Client()
        instance = spanner_client.instance(cs_instance)
        database = instance.database(cs_database)

        # insert_or_update inserts new keys and updates rows whose keys already exist.
        with database.batch() as batch:
            batch.insert_or_update(
                table=dest_table,
                columns=list(df.columns),
                values=df.values.tolist())

        end = datetime.datetime.now()
        diff = end - begin
        # TODO: add logic to handle exceptions and log diff / thread_counter as needed.
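If the threading is more than you need, the same two-step merge can run inline. A minimal non-threaded sketch, assuming `df`, `database`, and `process_cnt` are set up exactly as in the main script above:

    # Inline (non-threaded) variant of the batched insert_or_update step.
    # Chunk the DataFrame so each commit stays under the mutation limit.
    for start in range(0, df.shape[0], process_cnt):
        chunk = df[start:start + process_cnt]
        with database.batch() as batch:
            batch.insert_or_update(
                table='TABLE2',
                columns=list(chunk.columns),
                values=chunk.values.tolist())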

I'd add that you can achieve the same goal with similar SQL constructs (for example, UNION and INTERSECT); this post explains that in detail. I also think the JOIN-based answer above is a sound approach.
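One way to read that suggestion is to split the MERGE into two DML statements, an UPDATE for matched rows and an INSERT ... SELECT for unmatched ones, run in a single read-write transaction. A minimal sketch, assuming the TABLE1/TABLE2 schema from the question and row counts that fit within a single transaction's limits:

    from google.cloud import spanner

    spanner_client = spanner.Client()
    database = spanner_client.instance('spanner-instance').database('database-id')

    def merge_via_dml(transaction):
        # WHEN MATCHED: add the aggregated counts onto existing TABLE2 rows.
        transaction.execute_update("""
            UPDATE TABLE2 B
            SET TOTAL_CNT = B.TOTAL_CNT + (
                SELECT SUM(A.TOTAL_CNT) FROM TABLE1 A
                WHERE A.COL1 = B.COL1 AND A.COL2 = B.COL2)
            WHERE EXISTS (
                SELECT 1 FROM TABLE1 A
                WHERE A.COL1 = B.COL1 AND A.COL2 = B.COL2)""")
        # WHEN NOT MATCHED: insert aggregated rows that have no match yet.
        transaction.execute_update("""
            INSERT INTO TABLE2 (COL1, COL2, TOTAL_CNT)
            SELECT T.COL1, T.COL2, SUM(T.TOTAL_CNT)
            FROM TABLE1 T
            WHERE NOT EXISTS (
                SELECT 1 FROM TABLE2 B
                WHERE B.COL1 = T.COL1 AND B.COL2 = T.COL2)
            GROUP BY T.COL1, T.COL2""")

    database.run_in_transaction(merge_via_dml)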

