如何解决用于Foundry转换的Python单元测试?
我想对转换为Foundry的转换进行测试,传递测试输入并检查输出是否为预期的输出。是否可以使用伪数据集(存储库中的.csv文件)调用转换?还是应该在转换内部创建要由测试调用的函数(以代码创建的数据)?
解决方法
如果您在Code Repositories
-> Python Transforms
-> Python Unit Tests
下查看平台文档,将会发现很多有用的资源。
您正在寻找有关编写和运行测试的部分。
//开始文档
编写测试
完整文档可在https://docs.pytest.org
中找到Pytest在以test_开头的任何Python文件中查找测试。 建议将所有测试放入项目src目录下的测试包中。 测试只是简单的Python函数,也使用test_前缀命名,并且断言是使用Python的assert语句进行的。 PyTest还将运行使用Python的内置unittest模块编写的测试。 例如,在transforms-python / src / test / test_increment.py中,一个简单的测试将如下所示:
def increment(num):
return num + 1
def test_increment():
assert increment(3) == 5
运行此测试将导致检查失败,并显示以下消息:
============================= test session starts =============================
collected 1 item
test_increment.py F [100%]
================================== FAILURES ===================================
_______________________________ test_increment ________________________________
def test_increment():
> assert increment(3) == 5
E assert 4 == 5
E + where 4 = increment(3)
test_increment.py:5: AssertionError
========================== 1 failed in 0.08 seconds ===========================
使用PySpark测试
PyTest固定装置是一项强大的功能,只需添加相同名称的参数即可将值注入测试功能。此功能用于提供spark_session固定装置,供您在测试功能中使用。例如:
def test_dataframe(spark_session):
df = spark_session.createDataFrame([['a',1],['b',2]],['letter','number'])
assert df.schema.names == ['letter','number']
//结束文档
如果您不想在代码中指定架构,也可以按照How To
-> Read file in Python repository
//开始文档
在Python存储库中读取文件
您可以将其他文件从存储库读取到转换上下文中。这可能在设置参数供转换代码参考时很有用。
首先,请在您的python存储库中编辑setup.py:
setup(
name=os.environ['PKG_NAME'],# ...
package_data={
'': ['*.yaml','*.csv']
}
)
这告诉python将yaml和csv文件捆绑到包中。然后在您的python转换旁边放置一个配置文件(例如config.yaml,但也可以是csv或txt)(例如read_yml.py参见下文):
- name: tbl1
primaryKey:
- col1
- col2
update:
- column: col3
with: 'XXX'
您可以使用以下代码在转换read_yml.py
中阅读它:
from transforms.api import transform_df,Input,Output
from pkg_resources import resource_stream
import yaml
import json
@transform_df(
Output("/Demo/read_yml")
)
def my_compute_function(ctx):
stream = resource_stream(__name__,"config.yaml")
docs = yaml.load(stream)
return ctx.spark_session.createDataFrame([{'result': json.dumps(docs)}])
所以您的项目结构将是:
- some_folder
- config.yaml
- read_yml.py
这将在您的数据集中输出一行,其中“结果”一列的内容为:
[{"primaryKey": ["col1","col2"],"update": [{"column": "col3","with": "XXX"}],"name": "tbl1"}]
//结束文档
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。