微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

用于 .NET 的 GraphQL - 设置订阅

如何解决用于 .NET 的 GraphQL - 设置订阅

我一直在关注使用订阅支持设置 GraphQL .NET 项目的示例:https://github.com/graphql-dotnet/server/tree/develop/samples/Samples.Schemas.Chat

我似乎已经按照 ChatSchema 中示例中概述的方式设置了所有内容,但我仍然无法使其正常工作。我从服务器得到的错误是:

Cannot subscribe as no result stream available

这是我在 UI Playground 中收到的错误,如果我向 /graphql 端点发送订阅查询

{
  "error": {
    "name": "FormatedError","message": "UnkNown error","originalError": {
      "data": null,"extensions": {
        "tracing": {
          "version": 1,"startTime": "2021-06-18T13:10:21.6759947Z","endTime": "2021-06-18T13:10:21.6769947Z","duration": 546000,"parsing": {
            "startOffset": 3200,"duration": 87300
          },"validation": {
            "startOffset": 92500,"duration": 386800
          },"execution": {
            "resolvers": []
          }
        }
      }
    }
  }
}

这些是我在本地运行的 GraphQL .NET 项目中看到的日志:

[23:10:21 DBG] Handle start: 1
[23:10:21 DBG] Executing operation: order_created query: subscription order_created($eventId: ID!) {
  order_subscription {
    order_created(eventId: $eventId) {
      eventId
      eventTimestamputc
    }
  }
}
[23:10:21 ERR] Cannot subscribe as no result stream available
[23:10:21 DBG] Handling message: 1 of type: stop
[23:10:21 DBG] Received message: Type: stop Id: 1 Payload: 
[23:10:21 DBG] Handle stop: 1
[23:10:21 DBG] Subscription: 1 unsubscribed

我的 Startup.cs 看起来像这样:

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddSingleton<OrderingPlatformSchema>();

            services.AddSingleton<IOrderEventStreamService,OrderEventStreamService>();

            services.AddSingleton<IDocumentExecuter,SubscriptionDocumentExecuter>();

            services.AddGraphQL((options,provider) =>
            {
                var logger = provider.GetrequiredService<ILogger<Startup>>();
                
                options.UnhandledExceptionDelegate = ctx => logger.LogError("{Error} occurred",ctx.OriginalException.Message);
            })
            .AddErrorInfoProvider((options) =>
            {
                options.ExposeExtensions = false;
                options.ExposeExceptionStackTrace = true;
            })
            .AddDefaultEndpointSelectorPolicy()
            .AddSystemTextJson(deserializerSettings => { },serializerSettings => { })
            .AddWebSockets()
            .AddDataLoader()
            .AddGraphTypes(typeof(OrderingPlatformSchema));
        }

        public void Configure(IApplicationBuilder app,IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
                app.UseDeveloperExceptionPage();

            app.UseWebSockets();

            app.UseGraphQLWebSockets<OrderingPlatformSchema>();
            app.UseGraphQL<OrderingPlatformSchema,GraphQLCustomLoggingMiddleware<OrderingPlatformSchema>>();

            if (env.IsDevelopment())
            {
                app.UseGraphQLPlayground(new PlaygroundOptions
                {
                    BetaUpdates = true,RequestCredentials = RequestCredentials.Omit,HideTracingResponse = false,EditorCursorShape = EditorCursorShape.Line,EditorTheme = EditorTheme.Light,EditorFontSize = 14,EditorReuseHeaders = true,EditorFontFamily = "Consolas",PrettierPrintWidth = 80,PrettierTabWidth = 2,PrettierUseTabs = true,SchemadisableComments = false,SchemaPollingEnabled = true,SchemaPollingEndpointFilter = "*localhost*",SchemaPollingInterval = 5000
                });
            }
        }

我的 Schema 对象如下所示:

    public class OrderingPlatformSchema : Schema
    {
        public OrderingPlatformSchema(IServiceProvider provider) : base(provider)
        {
            Query = new OrderingPlatformQuery();
            Subscription = new OrderingPlatformSubscription();
        }
    }

OrderingPlatformSubscription 对象:

    public class OrderingPlatformSubscription : ObjectGraphType<object>
    {
        public OrderingPlatformSubscription()
        {
            Name = nameof(OrderingPlatformSubscription);

            Field<OrderSubscription>("order_subscription",resolve: context => new {});
        }
    }

OrderSubscription 对象:

    internal class OrderSubscription : ObjectGraphType<object>
    {
        private readonly IOrderEventStreamService orderEventStreamService;

        public OrderSubscription(IOrderEventStreamService orderEventStreamService)
        {
            this.orderEventStreamService = orderEventStreamService;

            Name = nameof(OrderSubscription);

            AddField(new EventStreamFieldType
            {
                Name = "order_created",Arguments = new QueryArguments(
                    new QueryArgument<NonNullGraphType<IdGraphType>> { Name = "eventId" }
                ),Type = typeof(OrderCreatedGraphType),Resolver = new FuncFieldResolver<OrderCreatedEvent>(context => context.source as OrderCreatedEvent),Subscriber = new EventStreamResolver<OrderCreatedEvent>(SubscribeByEventId)
            });
        }

        private IObservable<OrderCreatedEvent> SubscribeByEventId(IResolveEventStreamContext context)
        {
            var eventId = context.GetArgument<Guid>("eventId");

            var events = orderEventStreamService.OrderCreatedEvents();

            return events.Where(x => x.EventId == eventId);
        }
    }

最后,OrderEventStreamService 看起来像这样:

    internal class OrderEventStreamService : IOrderEventStreamService
    {
        private readonly ISubject<OrderCreatedEvent> orderCreatedEventStream = new ReplaySubject<OrderCreatedEvent>(1);

        public IObservable<OrderCreatedEvent> OrderCreatedEvents()
        {
            return orderCreatedEventStream.AsObservable();
        }
    }

非常感谢任何帮助/建议。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。