
Creating a DataFrame from an AWS Athena query with Boto3


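I am using AWS Athena to query raw data from S3. Because Athena writes the query output to an S3 output bucket, I am using a Lambda function to load the Athena query results into a DataFrame: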

My code

def athena_query_to_dataframe(db, s3Bucket, query):

    import time

    import boto3
    import pandas as pd

    client = boto3.client('athena')
    # Athena query states: QUEUED, RUNNING, SUCCEEDED, FAILED, CANCELLED
    listofInitialStatus = ['RUNNING', 'QUEUED']

    print('Starting Query Execution:')

    temps3Path = 's3://{}'.format(s3Bucket)

    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': db},
        ResultConfiguration={'OutputLocation': temps3Path},
    )

    queryExecutionId = response['QueryExecutionId']

    status = client.get_query_execution(QueryExecutionId=queryExecutionId)['QueryExecution']['Status']['State']

    # Poll until the query leaves QUEUED/RUNNING, pausing between calls
    while status in listofInitialStatus:
        time.sleep(1)
        status = client.get_query_execution(QueryExecutionId=queryExecutionId)['QueryExecution']['Status']['State']

    if status != 'SUCCEEDED':
        # FAILED or CANCELLED: there are no results to read
        raise RuntimeError('Query ended in state {}'.format(status))

    print('Query Succeeded!')
    paginator = client.get_paginator('get_query_results')
    query_results = paginator.paginate(
        QueryExecutionId=queryExecutionId,
        PaginationConfig={'PageSize': 1000},
    )

    rows = []

    print('Processing Response')

    for page in query_results:
        for row in page['ResultSet']['Rows']:
            rows.append(row['Data'])

    # For a SELECT query, the first row holds the column names
    columns = rows[0]
    rows = rows[1:]

    columns_list = []
    for column in columns:
        columns_list.append(column['VarCharValue'])

    print('Creating Dataframe')

    dataframe = pd.DataFrame(columns=columns_list)

    for row in rows:
        df_row = []
        try:
            for data in row:
                df_row.append(data['VarCharValue'])
            dataframe.loc[len(dataframe)] = df_row
        except Exception:
            # NOTE: any error here is swallowed, so a row that raises
            # is skipped without any warning
            pass

    return dataframe

When I try to return df.shape, I only get (0, 20), which means the DataFrame's rows were never populated.
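One plausible cause, offered as an assumption rather than a confirmed diagnosis: Athena returns a NULL cell as an empty dict with no `VarCharValue` key, so `data['VarCharValue']` raises `KeyError` and the `try`/`except` around the row skips it entirely. If every row has at least one NULL among its 20 columns, no row is ever appended, which would match a shape of (0, 20). The sketch below shows a more tolerant way to parse the pages. `parse_result_pages` and `to_dataframe` are hypothetical helper names, and the code assumes a `SELECT` query, where the first returned row holds the column names.

```python
def parse_result_pages(pages):
    """Split Athena get_query_results pages into (columns, records).

    Assumes a SELECT query, where the first row of the first page is the header.
    """
    rows = []
    for page in pages:
        for row in page['ResultSet']['Rows']:
            # .get() yields None for NULL cells, which carry no 'VarCharValue' key
            rows.append([cell.get('VarCharValue') for cell in row['Data']])
    if not rows:
        return [], []
    return rows[0], rows[1:]


def to_dataframe(pages):
    """Build the DataFrame in one call instead of appending row by row."""
    import pandas as pd  # imported lazily so the parser itself has no dependencies

    columns, records = parse_result_pages(pages)
    return pd.DataFrame(records, columns=columns)
```

With the paginator from the question, this would be called as `to_dataframe(query_results)`. Every value arrives as a string (or `None`), so numeric or date columns still need an explicit dtype conversion afterwards.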

I am looking for the following:

  1. A fix for the problem above so that the rows are populated.
  2. A better way to get the DataFrame, if there is one.
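On the second point, one commonly used alternative is to skip `get_query_results` and read the CSV file that Athena has already written to the output bucket. The following is a minimal sketch under stated assumptions: the query has already reached the `SUCCEEDED` state, it was a `SELECT` (so the result file is a CSV), and the caller has permission to read the output bucket. `split_s3_uri` and `read_athena_csv` are hypothetical helper names; the clients are passed in so the function does not create them itself.

```python
import io
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError('not an S3 URI: {!r}'.format(uri))
    return parsed.netloc, parsed.path.lstrip('/')


def read_athena_csv(athena_client, s3_client, query_execution_id):
    """Load the CSV that Athena wrote for a finished query into a DataFrame."""
    import pandas as pd  # lazy import; only needed when actually loading data

    execution = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
    # For a finished query, OutputLocation points at the result file itself
    location = execution['QueryExecution']['ResultConfiguration']['OutputLocation']
    bucket, key = split_s3_uri(location)
    body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
    return pd.read_csv(io.BytesIO(body))
```

A call might look like `read_athena_csv(boto3.client('athena'), boto3.client('s3'), queryExecutionId)`. Because `pd.read_csv` infers column types and reads the file in one pass, this tends to be simpler than paginating 1000 rows at a time. The AWS SDK for pandas (`awswrangler`) wraps a similar flow in `awswrangler.athena.read_sql_query`, which may be worth considering if adding a dependency to the Lambda function is acceptable.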
