How to catch exceptions when loading JSON data into BigQuery
{
  "magic": "atMSG", "type": "DT", "headers": null, "messageschemaid": null, "messageschema": null,
  "message": {
    "data": {
      "ID": "123456", "NAME": "ABCXYZ", "SALARY": "10"
    },
    "beforeData": null,
    "headers": {
      "operation": "INSERT", "changeSequence": "20200822230048000000000017887787417", "timestamp": "2020-08-22T23:00:48.000", "transactionId": "some_id"
    }
  }
}
{
  "magic": "atMSG",
  "message": {
    "data": {
      "ID": "QWERTYT", "transactionId": "some_id"
    }
  }
}
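The second record is the problematic one: its ID is not numeric, while the table schema used further down declares ID:INTEGER. As a minimal sketch of how that mismatch could be detected before a row reaches BigQuery, the snippet below casts a `message.data` dictionary against a `NAME:TYPE` schema string. The helper `coerce_row` and the `CASTS` table are hypothetical names, not part of the original pipeline, and only the three types that appear in this post are handled.

```python
# Hypothetical helper, not part of the original pipeline.
# Maps the BigQuery type names used in this post to Python casts.
CASTS = {"INTEGER": int, "FLOAT": float, "STRING": str}


def coerce_row(row, schema):
    """Cast every field listed in a 'NAME:TYPE,...' schema string.

    Raises ValueError if a field is missing or cannot be cast.
    """
    out = {}
    for field in schema.split(","):
        name, type_name = field.split(":")
        if name not in row:
            raise ValueError(f"missing field {name!r}")
        try:
            out[name] = CASTS[type_name](row[name])
        except (TypeError, ValueError) as exc:
            raise ValueError(
                f"field {name!r}: {row[name]!r} is not {type_name}") from exc
    return out


schema = "ID:INTEGER,NAME:STRING,SALARY:FLOAT"
good = {"ID": "123456", "NAME": "ABCXYZ", "SALARY": "10"}
bad = {"ID": "QWERTYT", "NAME": "ABCXYZ", "SALARY": "10"}

print(coerce_row(good, schema))
try:
    coerce_row(bad, schema)
except ValueError as err:
    print("rejected:", err)
```

The first record is cast cleanly, while the second is rejected because of its ID value.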
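I have the following code, which parses the JSON data into dictionaries that can be loaded into BigQuery. However, if anything is wrong with my JSON data, I want to catch the bad records and load them into a different BigQuery table or a file.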
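import json
from datetime import datetime

import apache_beam as beam
from apache_beam.io import ReadFromText
from pandas import json_normalize

class custom_json_parser(beam.DoFn):
    def process(self, element):
        # Flatten one level so the nested payload is available as "message.data"
        norm = json_normalize(element, max_level=1)
        l = norm["message.data"]
        return l

class Audit(beam.DoFn):
    def process(self, element):
        yield element

table_schema = 'ID:INTEGER,NAME:STRING,SALARY:FLOAT'
audit_table_schema = 'ID:INTEGER,SALARY:FLOAT'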
p = beam.Pipeline(options=pipeline_options)

data_from_source = (p
    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/file_with_json_data")
    | "PARSE JSON" >> beam.Map(json.loads)
    | "CUSTOM JSON PARSE" >> beam.ParDo(custom_json_parser())
)

Load_col = data_from_source

Audit_col = (data_from_source
    | beam.Map(lambda x: (x['ID'], x['NAME'], datetime.now()))
    | beam.Map(lambda x: {'ID': x[0], 'NAME': x[1], 'LAST_UPDT': x[2]})
)
try:
    #Load_col | "WriteToBigQuery" >> beam.ParDo(Printer())
    load_data = (Load_col
        | "WriteDataToBigQuery" >> beam.io.WriteToBigQuery(
            "{0}:{1}.trial_data".format(PROJECT_ID, datasetId_Staging),
            schema=table_schema,
            #write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )
    audit_data = (Audit_col
        | "WriteAuditDataToBigQuery" >> beam.io.WriteToBigQuery(
            # The dataset argument is missing in the post as published;
            # datasetId_Staging is assumed here.
            "{0}:{1}.audit_data".format(PROJECT_ID, datasetId_Staging),
            schema=audit_table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            #write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )
except:
    (load_data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
        | "Bad lines" >> beam.io.textio.WriteToText("gs://ti-project-1/error-files/data_error_log.txt")
    )

results = p.run()
In the except block, I basically want to write the bad data ("ID": "QWERTYT", "NAME": "ABCXYZ", "SALARY": "10") to a file, and I also want to write it to BigQuery.
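One possible way to get this behaviour is to do the checking inside the pipeline instead of around it. A `try`/`except` wrapped around the `|` expressions only covers pipeline construction, because the transforms themselves run later, once `p.run()` is called. The sketch below is an assumption about how this could look, not a tested fix for the pipeline above: it catches errors per record and sends bad records to a tagged side output. The names `parse_record`, `split_good_and_bad` and `BAD_TAG` are hypothetical, and the field casts are hard-coded to match `table_schema`.

```python
import json

BAD_TAG = "bad_rows"  # hypothetical tag name for the side output


def parse_record(line):
    """Return ("ok", row) for a loadable record, else ("bad", details)."""
    try:
        data = json.loads(line)["message"]["data"]
        row = {"ID": int(data["ID"]),
               "NAME": str(data["NAME"]),
               "SALARY": float(data["SALARY"])}
        return "ok", row
    except (ValueError, KeyError, TypeError) as exc:
        # Keep the raw line and the reason so the record can be audited later.
        return "bad", {"raw": line, "error": repr(exc)}


def split_good_and_bad(lines):
    """Split a PCollection of JSON lines into (good_rows, bad_rows)."""
    # Imported here so parse_record can be tried without Beam installed.
    import apache_beam as beam

    def route(line):
        status, payload = parse_record(line)
        if status == "ok":
            yield payload
        else:
            yield beam.pvalue.TaggedOutput(BAD_TAG, payload)

    outputs = (lines
               | "ParseAndRoute" >> beam.FlatMap(route).with_outputs(
                   BAD_TAG, main="good"))
    return outputs.good, outputs[BAD_TAG]
```

Under these assumptions, `split_good_and_bad` would take the output of the "READ FROM JSON" step and replace the two parse steps. The good rows could then go to `beam.io.WriteToBigQuery` as before, while the bad rows could be serialized with `beam.Map(json.dumps)` and passed to `beam.io.WriteToText`, or written to a separate BigQuery table with a schema such as `raw:STRING,error:STRING` (an illustrative schema, not one from the post).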
I would also like to write the bad data to the audit table along with the file name, but that is a separate question.