如何解决 Apache Beam ReadFromSpanner 的解码问题
我正在尝试在 GCP Dataflow 管道中运行以下脚本。
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from typing import NamedTuple,Optional
from apache_beam.io.gcp.spanner import *
from past.builtins import unicode
import logging
class ItemRow(NamedTuple):
    """Row type for the results read from Spanner.

    Used as ``row_type`` for ReadFromSpanner and registered with ``RowCoder``,
    so its field names and annotations define the Beam row schema that the
    query results are decoded into.
    """

    # The builtin ``str`` is used instead of ``past.builtins.unicode``. On
    # Python 3 that shim is merely an alias of ``str``, so the resulting
    # schema is identical; only the Python 2 compatibility layer is dropped.
    item_id: str
class LogResults(beam.DoFn):
    """Pass-through DoFn that writes every element to the log.

    Each input element is logged at INFO level and then re-emitted
    unchanged, so the step can be chained without altering the data.
    """

    def process(self, element):
        # %-style args let the logging module skip formatting when INFO is off.
        logging.info("row: %s", element)
        yield element
class SpannerToSpannerAndBigQueryPipelineOptions(PipelineOptions):
    """Command-line options describing the source Spanner database and query.

    ``--SOURCE_QUERY`` is mandatory. The Spanner coordinates have defaults
    that look like placeholders and presumably must be overridden for a
    real run.

    NOTE(review): these are registered with plain ``add_argument``, not
    ``add_value_provider_argument``, so their values are fixed when the
    pipeline graph is built. They cannot be supplied later at classic
    template execution time. Confirm this is the intended deployment mode.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--SOURCE_SPANNER_PROJECT_ID',
            type=str,
            help='Source Spanner project ID',
            default='project_id')
        # Spanner organises tables into databases, not datasets. The flag name
        # is kept for backward compatibility, but its value is used as the
        # Spanner database ID, so the help text says so.
        parser.add_argument(
            '--SOURCE_SPANNER_DATASET_ID',
            help='Source Spanner database ID',
            default='dataset_id')
        parser.add_argument(
            '--SOURCE_SPANNER_INSTANCE_ID',
            help='Source Spanner instance ID',
            default='instance_id')
        parser.add_argument(
            '--SOURCE_QUERY',
            help='SQL to run in Source Spanner Instance',
            required=True)
# Setup pipeline
def run(argv=None):
    """Build and run the pipeline that reads rows from Spanner and logs them.

    Rows are read with ``ReadFromSpanner``, decoded as ``ItemRow``, and passed
    through ``LogResults``. The SQL comes from the required ``--SOURCE_QUERY``
    option. Its result columns presumably need to match ``ItemRow``'s fields
    (currently just ``item_id``); TODO confirm against the transform's docs.

    Args:
        argv: Optional list of command-line flags for ``PipelineOptions``.
            When ``None`` (the default), ``PipelineOptions`` parses
            ``sys.argv``, exactly as before.
    """
    # Make Beam encode/decode ItemRow elements as schema rows derived from
    # the NamedTuple's annotations.
    beam.coders.registry.register_coder(ItemRow, beam.coders.RowCoder)

    pipeline_options = PipelineOptions(argv)
    importer_options = pipeline_options.view_as(
        SpannerToSpannerAndBigQueryPipelineOptions)

    # Leaving the ``with`` block runs the pipeline and waits for completion,
    # equivalent to ``p.run().wait_until_finish()``.
    with beam.Pipeline(options=pipeline_options) as p:
        rows = (
            p
            | "Read from source Spanner" >> ReadFromSpanner(
                project_id=importer_options.SOURCE_SPANNER_PROJECT_ID,
                instance_id=importer_options.SOURCE_SPANNER_INSTANCE_ID,
                # The (misnamed) DATASET_ID flag carries the Spanner database ID.
                database_id=importer_options.SOURCE_SPANNER_DATASET_ID,
                row_type=ItemRow,
                # Honour the required --SOURCE_QUERY option. Previously it was
                # parsed but ignored in favour of this hard-coded statement:
                #   Select item_id from Items WHERE created_ts BETWEEN
                #   TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 5 SECOND)
                #   AND CURRENT_TIMESTAMP()
                sql=importer_options.SOURCE_QUERY,
                # NOTE(review): a MAX_STALENESS bound of 3 hours lets Spanner
                # serve a snapshot up to 3 hours old. A query that filters on
                # rows created in the last few seconds may therefore see no
                # rows. Confirm this bound is intended.
                timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS,
                staleness=3,
                time_unit=TimeUnit.HOURS,
            ).with_output_types(ItemRow)
        )
        rows | 'Log results' >> beam.ParDo(LogResults())
if __name__ == "__main__":
    # The root logger defaults to WARNING, which would hide the INFO records
    # emitted by LogResults when the pipeline runs locally.
    logging.getLogger().setLevel(logging.INFO)
    run()
但是，我在解码从 Spanner 读取到的结果时遇到了问题。以下是我的 Dataflow 作业的输出日志：
"An exception was raised when trying to execute the workitem 6665479626992209510 : Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py",line 649,in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py",line 179,in execute
op.start()
File "dataflow_worker/native_operations.py",line 38,in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py",line 39,line 44,line 48,in dataflow_worker.native_operations.NativeReadOperation.start
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/inmemory.py",line 108,in __iter__
yield self._source.coder.decode(value)
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py",line 468,in decode
return self.get_impl().decode(encoded)
File "apache_beam/coders/coder_impl.py",line 226,in apache_beam.coders.coder_impl.StreamCoderImpl.decode
File "apache_beam/coders/coder_impl.py",line 228,line 123,in apache_beam.coders.coder_impl.CoderImpl.decode_from_stream
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/row_coder.py",line 215,in decode_from_stream
is_null in zip(self.components,nulls)))
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/row_coder.py",in <genexpr>
is_null in zip(self.components,nulls)))
File "apache_beam/coders/coder_impl.py",line 259,in apache_beam.coders.coder_impl.CallbackCoderImpl.decode_from_stream
File "apache_beam/coders/coder_impl.py",line 261,in apache_beam.coders.coder_impl.CallbackCoderImpl.decode_from_stream
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py",line 414,in decode
return value.decode('utf-8')
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x83 in position 9: invalid start byte
"
我不确定该如何解决这个问题。我以这个示例 https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.gcp.spanner.html?highlight=spanner#module-apache_beam.io.gcp.spanner 作为起点。问题似乎出在对 Spanner 返回结果的解码上，而关于如何为要查询的 Spanner 表指定 schema（架构），几乎找不到相关文档。
此外，还有一个用于 Spanner 的实验性 IO 模块，它不依赖 Java 扩展服务（expansion service）。是否建议改用这个实验性模块？
谢谢
解决方法
我始终无法使用 apache_beam.io.gcp.spanner 模块成功运行管道，因此最终改用了 apache_beam.io.gcp.experimental.spannerio 模块。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。