"""Spark utility script: date helpers plus a partition-aware Hive insert.

Creates a module-level SparkSession with Hive support (broadcast join
disabled, speculative execution enabled) and turns on Hive dynamic
partitioning, then defines helpers to normalize dates, align a DataFrame
to a target table schema, and locate the nearest available partition.
"""
import os
import sys
from datetime import datetime, timedelta

import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Broadcast join disabled (threshold -1); speculative execution enabled.
# NOTE: Spark config keys are case-sensitive — the original used
# "autobroadcastJoinThreshold" / "convertmetastoreOrc", which are silently
# ignored; corrected casing below. ".getorCreate" was also a typo that
# would raise AttributeError at startup.
spark = SparkSession \
    .builder \
    .appName("app_test.py") \
    .enableHiveSupport() \
    .config("spark.dynamicAllocation.maxExecutors", "400") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.yarn.executor.memoryOverhead", 3702) \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.repartition.enabled", "true") \
    .config("spark.log.level", "ERROR") \
    .config("spark.speculation", "true") \
    .config("spark.sql.hive.convertMetastoreOrc", "true") \
    .getOrCreate()

# Allow inserts into dynamically determined partitions.
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.orc.split.strategy=ETL")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")


def get_date(dt, time_delta=0):
    """Return the date ``time_delta`` days BEFORE ``dt`` as a 'YYYY-MM-DD' string.

    :param dt: a date/datetime object, or a string in '%Y-%m-%d' or '%Y%m%d' format
    :param time_delta: number of days to subtract (default 0)
    :return: 'YYYY-MM-DD' string
    :raises ValueError: if ``dt`` is a string in neither supported format
    """
    if isinstance(dt, str):
        # Accept both dashed and compact date strings.
        try:
            dt = datetime.strptime(dt, "%Y-%m-%d")
        except ValueError:
            dt = datetime.strptime(dt, "%Y%m%d")
    result = dt + timedelta(days=-time_delta)
    return result.strftime("%Y-%m-%d")


def insert_tab(df, tab, spark):
    """Align ``df``'s columns to the schema of Hive table ``tab`` and overwrite-insert.

    Columns present in the table but missing from ``df`` are added as NULL,
    and columns are reordered to match the table, because ``insertInto``
    matches by position rather than by name.

    :param df: source DataFrame
    :param tab: fully qualified target table name
    :param spark: active SparkSession
    """
    target_cols = spark.sql("""select * from {tab} limit 1""".format(tab=tab)).columns
    missing = [c for c in target_cols if c not in df.columns]
    for c in missing:
        df = df.withColumn(c, F.lit(None))
    # Repartition by the partition columns to avoid many tiny output files.
    df.select(target_cols).repartition('dt', 'data_type') \
        .write.insertInto(tab, overwrite=True)


def search_dt(partitions_list, dt):
    """Return ``dt`` if it is in ``partitions_list``; otherwise the nearest partition.

    Prefers the closest partition BEFORE ``dt``; if none exists, falls back
    to the closest partition AFTER ``dt``. The 'ACTIVE' pseudo-partition is
    ignored. Does not mutate the caller's list (the original removed
    'ACTIVE' in place).

    :param partitions_list: list of 'YYYY-MM-DD' partition strings
    :param dt: desired partition date (string or date-like; normalized here)
    :return: chosen partition date as a 'YYYY-MM-DD' string
    :raises ValueError: if the list contains no real partitions at all
    """
    dt = get_date(dt, 0)  # normalize to 'YYYY-MM-DD'
    candidates = [p for p in partitions_list if p != 'ACTIVE']
    if dt in candidates:
        return dt
    dt_date = datetime.strptime(dt, '%Y-%m-%d').date()
    # Signed day offsets of each partition relative to the requested date.
    lags = [(datetime.strptime(p, '%Y-%m-%d').date() - dt_date).days
            for p in candidates]
    try:
        nearest = max(lag for lag in lags if lag < 0)   # closest earlier partition
    except ValueError:                                   # no earlier partition exists
        nearest = min(lag for lag in lags if lag > 0)   # closest later partition
    return (dt_date + timedelta(nearest)).strftime('%Y-%m-%d')


def get_nearest_dt(table_name, dt, spark):
    """Return ``dt`` if ``table_name`` has that partition, else the nearest one.

    :param table_name: Hive table to inspect via SHOW PARTITIONS
    :param dt: desired partition date
    :param spark: active SparkSession
    :return: chosen partition date as a 'YYYY-MM-DD' string
    """
    partitions = spark.sql("show partitions %s" % table_name).collect()
    # Rows look like 'dt=YYYY-MM-DD'; strip the 'dt=' prefix (3 chars).
    partitions_list = [row['partition'][3:] for row in partitions]
    return search_dt(partitions_list, dt)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。