如何在Python中使用Google API在Google Cloud Spanner中实现与MERGE SQL等效的操作
如何使用Google API在Google Cloud Spanner中执行以下所述的合并SQL?
-- Intended MERGE: add TABLE1's per-(COL1, COL2) totals into TABLE2,
-- updating rows that already exist and inserting the ones that do not.
MERGE INTO TABLE2 B
USING (SELECT COL1,COL2,SUM(TOTAL_CNT) AS TOTAL_CNT
FROM TABLE1 GROUP BY COL1,COL2) A
ON (B.COL1=A.COL1 AND B.COL2 = A.COL2)
WHEN MATCHED THEN
UPDATE SET B.TOTAL_CNT = B.TOTAL_CNT + A.TOTAL_CNT
WHEN NOT MATCHED THEN
INSERT (COL1,COL2,TOTAL_CNT)
VALUES (A.COL1,A.COL2,A.TOTAL_CNT)
解决方法
每当需要执行MERGE SQL时,都需要把它拆分为两个步骤。第一步是将源数据与目标表做左联接,得到所需的值;第二步是对该结果集执行批量insert_or_update。这样可以省去大量查找,效率更高。我把批量insert_or_update做成了多线程,这样可以同时启动多个线程,更快地完成处理。如果您不需要这么复杂,也可以把它直接写成内联代码。
'''
import threading
import pandas as pd
import datetime
import time
from merge_ins_upd_using_df import merge_ins_upd_using_df
from google.cloud import spanner
# --- Cloud Spanner connection ------------------------------------------------
# IDs of the Cloud Spanner instance and database this script works against.
instance_id = 'spanner-instance'
database_id = 'database-id'

# API client plus a handle to the instance named above.
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)

# --- Thread bookkeeping ------------------------------------------------------
# Ceiling that threading.active_count() is compared against before each
# worker thread is started further down in the script.
max_thread_cnt = 30
# NOTE(review): this semaphore is created but never acquired in the visible code.
threadLimiter = threading.BoundedSemaphore(max_thread_cnt)
# Worker threads are collected here first and started later.
thread_list = []
# Counters, all starting at zero.
thread_count = thread_cnt_before = thread_counter = 0
# Step 1 of the MERGE emulation: aggregate TABLE1 per (COL1, COL2), left-join
# the existing TABLE2 rows and compute the new TOTAL_CNT for every key.
# Keys missing from TABLE2 contribute 0 through COALESCE.
#
# Fixes relative to the previous statement:
#   * removed the leading "(" that was never closed (syntax error);
#   * added the outer GROUP BY that the outer SUM() requires.
sql_stmt = """
SELECT A.COL1, A.COL2,
       SUM(A.TOTAL_CNT + COALESCE(B.TOTAL_CNT, 0)) AS TOTAL_CNT
FROM (SELECT COL1, COL2, SUM(TOTAL_CNT) AS TOTAL_CNT
      FROM TABLE1
      GROUP BY COL1, COL2) A
LEFT JOIN TABLE2 B ON (A.COL1 = B.COL1 AND A.COL2 = B.COL2)
GROUP BY A.COL1, A.COL2
"""

# Reuse the instance handle created during setup instead of building a second
# client and instance object.
database = instance.database(database_id)

with database.snapshot() as snapshot:
    results = snapshot.execute_sql(sql_stmt)
    # Materialize the rows while the snapshot is still open, and pass the
    # column names up front so that an empty result set yields an empty
    # three-column frame instead of a length-mismatch error.
    df = pd.DataFrame(list(results), columns=['COL1', 'COL2', 'TOTAL_CNT'])
# Rows handed to each worker, i.e. rows per batch commit. Keep
# rows * (columns + affected indexes) below Cloud Spanner's per-commit
# mutation limit (20,000 when this was written -- check the current quota).
process_cnt = 10
rec_cnt = df.shape[0]
print('Total Rec Count: ' + str(rec_cnt))
total_rec_processed = 0
from_index = 0
to_index = 0
dest_table = 'TABLE2'

### Build the threads
# Step through the frame in process_cnt-sized slices. Using range() means an
# empty result set creates no worker at all, and the last (possibly shorter)
# slice is counted by its real size.
for from_index in range(0, rec_cnt, process_cnt):
    to_index = min(from_index + process_cnt, rec_cnt)
    thread_counter += 1
    thread_count += 1
    t = threading.Thread(
        target=merge_ins_upd_using_df,
        args=(instance_id, database_id, df[from_index:to_index],
              thread_counter, dest_table),
    )
    thread_list.append(t)
    total_rec_processed = to_index
begin = datetime.datetime.now()
print("Thread Kick-off has Started : " + str(begin))
print("Thread Count before :" + str(threading.active_count()))
thread_cnt_before = threading.active_count()

# Starts threads
for thread in thread_list:
    # Throttle: hold back while the process already has max_thread_cnt live
    # threads (the count includes the main thread).
    while threading.active_count() >= max_thread_cnt:
        time.sleep(.05)
    thread.start()

print("Thread Count after :" + str(threading.active_count()))
print("All Threads have been kicked off : " + str(datetime.datetime.now()))

# Wait for exactly the workers started above. join() returns as soon as each
# one finishes, so the reported duration is not rounded up by a polling sleep
# and is not affected by unrelated threads in the process.
for thread in thread_list:
    thread.join()

end = datetime.datetime.now()
diff = end - begin
print("Total time for completion in minutes : " + str(diff.total_seconds() / 60))
####### function - merge_ins_upd_using_df
class merge_ins_upd_using_df:
    """Write one DataFrame slice to a Cloud Spanner table via insert_or_update.

    All the work happens in the constructor, so the class itself can be used
    as a ``threading.Thread`` target.

    Args:
        cs_instance: Cloud Spanner instance ID.
        cs_database: Cloud Spanner database ID.
        df: pandas DataFrame whose column names are the destination column
            names and whose rows are the values to write.
        thread_counter: Optional identifier of the worker; stored only.
        dest_table: Name of the destination table.

    For backward compatibility the legacy four-argument positional form
    ``(cs_instance, cs_database, df, dest_table)`` is still accepted.

    Attributes:
        elapsed: ``datetime.timedelta`` spent on the commit, or ``None`` when
            ``df`` was empty and nothing was written.

    Raises:
        TypeError: if no destination table is supplied.
        Any exception raised by the Spanner client propagates to the caller.
    """

    def __init__(self, cs_instance, cs_database, df,
                 thread_counter=None, dest_table=None):
        if dest_table is None:
            # Legacy call shape: the fourth positional argument is the table.
            thread_counter, dest_table = None, thread_counter
        if dest_table is None:
            raise TypeError("dest_table is required")

        self.cs_instance = cs_instance
        self.cs_database = cs_database
        self.thread_counter = thread_counter
        self.df = df
        self.dest_table = dest_table
        self.elapsed = None

        # Nothing to write: skip creating a client and committing an empty batch.
        if df.empty:
            return

        # Imported here, as before, so the module can be imported without the
        # Spanner client library being loaded up front.
        from google.cloud import spanner
        import datetime

        begin = datetime.datetime.now()
        spanner_client = spanner.Client()
        instance = spanner_client.instance(cs_instance)
        database = instance.database(cs_database)
        # The batch is committed when the with-block exits.
        with database.batch() as batch:
            batch.insert_or_update(
                table=dest_table,
                # Plain list of names rather than a pandas Index object.
                columns=list(df.columns),
                values=df.values.tolist(),
            )
        self.elapsed = datetime.datetime.now() - begin
        # TODO: add logic to handle exceptions (e.g. retry or report failed slices).
,
我认为您也可以使用类似的SQL子句(例如UNION和INTERSECT)来实现您的目标,这篇帖子对此做了详细说明。另外,我觉得您在回答中使用JOIN的近似做法也不错。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。