
Unable to catch exceptions when loading BigQuery from JSON data

How can I catch exceptions when loading BigQuery from JSON data?

I have a file with JSON data as follows:

{
    "magic": "atMSG",
    "type": "DT",
    "headers": null,
    "messageschemaid": null,
    "messageschema": null,
    "message": {
        "data": {
            "ID": "123456",
            "NAME": "ABCXYZ",
            "SALARY": "10"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "20200822230048000000000017887787417",
            "timestamp": "2020-08-22T23:00:48.000",
            "transactionId": "some_id"
        }
    }
}

{
    "magic": "atMSG",
    "message": {
        "data": {
            "ID": "QWERTYT",
            "transactionId": "some_id"
        }
    }
}
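
For context, here is a minimal sketch (not part of the original pipeline, assuming pandas >= 1.0 for json_normalize) of what the flattening step in the parser below produces for a trimmed version of the first record:

import json

from pandas import json_normalize

record = json.loads(
    '{"magic": "atMSG", "type": "DT", "message": {"data": {"ID": "123456", "NAME": "ABCXYZ", "SALARY": "10"}}}'
)

# max_level=1 flattens only one level of nesting, so the payload survives intact
# as a dict under the single column "message.data".
norm = json_normalize(record, max_level=1)
print(norm["message.data"].iloc[0])
# {'ID': '123456', 'NAME': 'ABCXYZ', 'SALARY': '10'}

The second record yields the same "message.data" column but without NAME and SALARY, which is the kind of record the pipeline needs to divert.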

I have the following code, which parses the JSON into dictionaries that can be loaded into BigQuery. However, if there are any problems with my JSON data, I want to catch those records and load them into a separate BQ table / file.

import json
from datetime import datetime

import apache_beam as beam
from apache_beam.io import ReadFromText
from pandas import json_normalize


class custom_json_parser(beam.DoFn):
    def process(self, element):
        # Flatten one level of nesting so the payload is exposed as "message.data";
        # returning the Series lets Beam emit its dict element(s).
        norm = json_normalize(element, max_level=1)
        l = norm["message.data"]
        return l

class Audit(beam.DoFn):
    # Pass-through DoFn: emits each element unchanged.
    def process(self, element):
        yield element

table_schema = 'ID:INTEGER,NAME:STRING,SALARY:FLOAT'
audit_table_schema = 'ID:INTEGER,SALARY:FLOAT'

p = beam.Pipeline(options = pipeline_options)

data_from_source = (p
                    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/file_with_json_data")
                    | "PARSE JSON" >> beam.Map(json.loads)
                    | "CUSTOM JOSN PARSE" >> beam.ParDo(custom_json_parser())
                   )

Load_col = data_from_source
Audit_col = (data_from_source 
             | beam.Map(lambda x: (x['ID'], x['NAME'], datetime.now()))
            | beam.Map(lambda x:{'ID':x[0],'NAME':x[1],'LAST_UPDT':x[2]})
           )
try:
    #Load_col | "WriteToBigQuery" >> beam.ParDo(Printer())
    load_data = (Load_col
                 | "WriteDataToBigQuery" >> beam.io.WriteToBigQuery(
                     "{0}:{1}.trial_data".format(PROJECT_ID, datasetId_Staging),
                     schema=table_schema,
                     #write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
                )

    audit_data = (Audit_col
                  | "WriteAuditDataToBigQuery" >> beam.io.WriteToBigQuery(
                      "{0}:{1}.audit_data".format(PROJECT_ID, datasetId_Staging),
                      schema=audit_table_schema,
                      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                      #write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
                 )
except:
    (load_data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
     | "Bad lines" >> beam.io.textio.WriteToText("gs://ti-project-1/error-files/data_error_log.txt")
    )

results = p.run()

In the except block, I basically want to write the bad data (ID: "QWERTYT", NAME: "ABCXYZ", SALARY: "10") to a file, and I would also like to write it to BigQuery.
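
For reference, here is a minimal sketch of one possible approach, Beam's tagged-output (dead-letter) pattern, which routes records that fail parsing or validation to a separate output inside the DoFn instead of relying on try/except around pipeline construction. The tag name, step labels, error path, and field checks below are illustrative assumptions, not the original code:

import json

import apache_beam as beam
from apache_beam import pvalue


class ParseOrDeadLetter(beam.DoFn):
    """Emit good records on the main output; send bad lines to a 'dead_letter' tag."""

    def process(self, line):
        try:
            data = json.loads(line)["message"]["data"]
            # Validate/convert the fields the main table expects.
            yield {"ID": int(data["ID"]), "NAME": data["NAME"], "SALARY": float(data["SALARY"])}
        except Exception as err:
            # Anything that fails parsing or validation goes to the side output.
            yield pvalue.TaggedOutput("dead_letter", {"raw": line, "error": str(err)})


parsed = (p
          | "Read" >> beam.io.ReadFromText("gs://ti-project-1/input/file_with_json_data")
          | "ParseOrDivert" >> beam.ParDo(ParseOrDeadLetter()).with_outputs("dead_letter", main="good"))

# parsed.good can feed the existing WriteToBigQuery step;
# parsed.dead_letter can go to an error file and/or an error table.
_ = (parsed.dead_letter
     | "Bad lines to text" >> beam.Map(json.dumps)
     | beam.io.WriteToText("gs://ti-project-1/error-files/data_error_log"))

The dead-letter output could also be written to a separate BigQuery table with another WriteToBigQuery step.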

I would also like to write the bad data to the audit table along with the file name, but that is a separate question.
