微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

测试用例完成后打开遥测 InMemorySpanExporter 未重置

如何解决测试用例完成后打开遥测 InMemorySpanExporter 未重置

打开遥测 InMemorySpanExporter 未在测试类完成后重置。这导致第二个测试类的 span_list = self.memory_exporter.get_finished_spans() 为空。

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from contextlib import contextmanager
import unittest

tracer = trace.get_tracer(__name__)


@contextmanager
def method(name):
    with tracer.start_as_current_span(name) as span:
        try:
            yield span
        except Exception as error:
            span.record_exception(error)
            raise
          
          
class OpenTelemetryBase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.tracer_provider = TracerProvider()
        cls.memory_exporter = InMemorySpanExporter()
        cls.span_processor = SimpleSpanProcessor(cls.memory_exporter)
        cls.tracer_provider.add_span_processor(cls.span_processor)
        trace.set_tracer_provider(cls.tracer_provider)

    def tearDown(self):
        self.memory_exporter.clear()

class TestTracing(OpenTelemetryBase):
    def test_method_1(self):
        with self.assertRaises(Exception):
            with method("TestTracing1") as span:
                raise Exception("Failed trace 1")
        span_list = self.memory_exporter.get_finished_spans()
        self.assertEqual(len(span_list),1)
        self.assertEqual(span_list[0].status.description,"Exception: Failed trace 1")
        self.assertEqual(span_list[0].name,"TestTracing1")


class TestTracingB(OpenTelemetryBase):
    def test_method_2(self):
        with self.assertRaises(Exception):
            with method("TestTracingB1") as span:
                raise Exception("Failed traceB 1")
        span_list = self.memory_exporter.get_finished_spans()
        self.assertEqual(len(span_list),"Exception: Failed traceB 1")
        self.assertEqual(span_list[0].name,"TestTracingB1")

仅运行测试类 1 的命令:py.test tracing.py::TestTracing:成功执行。

仅运行测试类 2 的命令:py.test tracing.py::TestTracingB:成功执行。

但是同时运行两个测试类的命令:py.test tracing.py

输出tracing.py .F 第一次成功,第二次测试失败。

Failed tracing.py::TestTracingB::test_method_2 - AssertionError: 0 != 1

即使我在每次测试中都使用了 tera down 的 memory_exporter.clear()

对于 setUpClass 方法,如果我将其更改为 setup,则同一类中的多个测试将开始失败,只有第一个通过其余测试将失败。

解决方法

您的描述不清楚,但我可以分享您遇到 AssertionError 的原因。发生这种情况是因为一旦设置了全局跟踪器提供程序,我们就不允许设置它; link to code which does that。只能有一个全局跟踪器提供程序。因此,当在第二次测试中调用 trace.set_tracer_provider 时,它会在不执行任何操作的情况下记录警告,因此您第二次尝试设置管道失败,即第二个导出器从未收到跨度。

,

Srikanth Chekuri 击败了我:这里的基本问题是对 set_tracer_provider 的第二次调用没有做任何事情,并且您最终使用了与第一次设置调用相同的导出器。

不幸的是,跟踪器提供程序是全局的,只配置一次,并且库没有提供一种简单的方法来交换提供程序或跨处理器以进行此类测试。

就像暴力产生暴力一样,全局变量产生全局变量。解决此问题的一种(丑陋)方法是使您的测试导出器也成为全局对象,并且仅在您初始化测试时将其附加到跟踪提供程序。这意味着 OpenTelemetryBase 测试的所有实例将共享同一个导出器,因此您需要在拆卸时清除已完成的跨度,并且测试可能无法并行运行。

from contextlib import contextmanager
import logging
import unittest

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter


logger = logging.getLogger(__name__)

# Only here for demonstration reasons
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

# Typically called somewhere else. If this isn't called anywhere we get the
# default ProxyTracerProvider provider,which doesn't support adding span
# processors. This is usually only the case if the OT SDK isn't installed.
trace.set_tracer_provider(TracerProvider())

_TEST_OT_EXPORTER = None
_TEST_OT_PROVIDER_INITIALIZED = False

def get_test_ot_exporter():
    global _TEST_OT_EXPORTER

    if _TEST_OT_EXPORTER is None:
        _TEST_OT_EXPORTER = InMemorySpanExporter()
    return _TEST_OT_EXPORTER

def use_test_ot_exporter():
    global _TEST_OT_PROVIDER_INITIALIZED

    if _TEST_OT_PROVIDER_INITIALIZED:
        logger.info("Skipping repeated call to use_test_ot_exporter")
        return

    provider = trace.get_tracer_provider()
    if not hasattr(provider,'add_span_processor'):
        logger.warn("OT TracerProvider has no add_span_processor. Is the OT "
                    "SDK installed?")
        return
    provider.add_span_processor(SimpleSpanProcessor(get_test_ot_exporter()))
    _TEST_OT_PROVIDER_INITIALIZED = True


@contextmanager
def method(name):
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span(name) as span:
        try:
            yield span
        except Exception as error:
            span.record_exception(error)
            raise


class OpenTelemetryBase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        use_test_ot_exporter()
        cls.ot_exporter = get_test_ot_exporter()

    def tearDown(self):
        self.ot_exporter.clear()


class TestTracing(OpenTelemetryBase):
    def test_method_1(self):
        with self.assertRaises(Exception):
            with method("TestTracing1") as span:
                raise Exception("Failed trace 1")

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list),1)
        self.assertEqual(
            span_list[0].status.description,"Exception: Failed trace 1"
        )
        self.assertEqual(span_list[0].name,"TestTracing1")


class TestTracingB(OpenTelemetryBase):
    def test_method_2(self):
        with self.assertRaises(Exception):
            with method("TestTracingB1") as span:
                raise Exception("Failed traceB 1")

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list),"Exception: Failed traceB 1"
        )
        self.assertEqual(span_list[0].name,"TestTracingB1")

在 OT python 库添加更好的方法来临时替换跟踪器提供程序,或者临时将跨度处理器或导出器添加到全局提供程序之前,这可能是执行此操作的侵入性最小的方法。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。