Nest for .NET BulkAllRequest BufferToBulk 发送一个 Dictionary<string, object>

如何解决Nest for .NET BulkAllRequest BufferToBulk 发送一个 Dictionary<string, object>

我有一个围绕 nest 的辅助类来与 Elastic Search 进行交互。此方法采用 BlockingCollection 并用于执行批量操作。

一个 IndexDocument 看起来像:

    public class IndexDocument
    {
        public string ID { get; set; } // ID of the record in ElasticSearch
        public IndexFile File { get; set; } // Underlying file that this record came from
        public string Index { get; set; } // Index that should be posted to
        public string Document { get; set; } // JSON of the actual record
    }

首选批量操作,因为我们可以指定 1 个或多个索引。为了提高性能,我已将所有索引中的所有数据合并到一个操作中。问题是 nest 不喜欢原始 JSON。通常低级客户端就是你想要的。我不想重新发明这种批量操作的轮子,而是想使用它。

public static void Bulk(IEnumerable<IndexDocument> documents)
{
    var request = new BulkAllRequest<IndexDocument>(documents);

    Client.BulkAll(documents,func =>
    {
        return func.Index(null).BufferToBulk((descriptor,buffer) =>
        {
            foreach (var document in buffer)
            {
                descriptor.Index<Dictionary<string,object>>(operation =>
                {
                    var product = JsonConvert.DeserializeObject<Dictionary<string,object>>(document.Document);

                    return operation.Index(document.Index)
                                    .Document(product)
                                    .Id(document.ID);
                });
            }
        })
        .BackOffTime("10s")
        .Size(1)  // if I can't get one to work....
        .RefreshOnCompleted()
        .MaxDegreeOfParallelism(10)
        .BackOffRetries(2);
    }).Wait(TimeSpan.FromMinutes(5),next =>
    {

    });
}

我遇到的问题是如何将 RAW json 转换为 nest 可接受的对象?我的原始 JSON 包括

  • 一组核心字段。 (必填)
  • 一组常用字段。 (可选)
  • 一组属性。 (可选)
  • 一组实验田。 (可选)

这些字段今天表示为 Dictionary。这些数据是在我们的开发组之外管理的,所以我无法修改它。我无法创建一个具体的类,因为这些字段是动态的。我在字典之上创建了一个具体的类,但收到一条错误消息:

Elasticsearch.Net.UnexpectedElasticsearchClientException
  HResult=0x80131500
  Message=GenericArguments[0],'Newtonsoft.Json.Linq.JContainer',on 'Elasticsearch.Net.Utf8Json.Formatters.NonGenericListFormatter`1[T]' violates the constraint of type parameter 'T'.
  Source=nest
  StackTrace:
   at nest.BlockingSubscribeExtensions.WaitOnObservable[TObservable,TObserve,TObserver](TObservable observable,TimeSpan maximumRunTime,Func`3 factory)
   at nest.BlockingSubscribeExtensions.Wait[T](BulkAllObservable`1 observable,Action`1 onNext)
   at Ced.Search.Services.Indexing.Helpers.ElasticHelper.Bulk2(IEnumerable`1 documents) in 
[STACK]
Inner Exception 1:
TypeLoadException: GenericArguments[0],on 'Elasticsearch.Net.Utf8Json.Formatters.NonGenericListFormatter`1[T]' violates the constraint of type parameter 'T'.

我尝试使用 Newtonsoft.JObject,但它发布了一个无效的奇怪值:

[Request]
{"index":{"_id":"cpn_0007473CPN46","_index":"Trade_1"}}
[[[]],[[]],[[[]]]] // WTF IS THIS!!!!

[Response]
{"took":1,"errors":true,"items":[{"index":{"_index":"Trade_1","_type":"_doc","_id":"cpn_0007473CPN46","status":400,"error":{"type":"mapper_parsing_exception","reason":"Failed to parse","caused_by":{"type":"not_x_content_exception","reason":"not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}}}]}

这个例外是有道理的。这是垃圾邮件

我还能尝试什么?

解决方法

...我找到了答案。我偶然发现了此处描述的自定义 JSON 序列化程序功能:

https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/custom-serialization.html

问题的根源在于内置的序列化程序不知道如何转换通用字典甚至 JObject。所以我原来的 ElasticClient 看起来像:

private static ElasticClient Client = new ElasticClient(ServiceConfiguration.Instance.ElasticSearch.Uri);

现在看起来像:

private static ElasticClient Client = null;

static ElasticHelper()
{
    var pool = new SingleNodeConnectionPool(ServiceConfiguration.Instance.ElasticSearch.Uri);
    var connectionSettings = new ConnectionSettings(pool,sourceSerializer: (builtin,settings) =>
    {
        return new JsonNetSerializer(builtin,settings,() =>
        {
            return new JsonSerializerSettings { };
        },resolver => resolver.NamingStrategy = new DefaultNamingStrategy());
    });

    Client = new ElasticClient(connectionSettings);
}

现在我的批量操作按预期进行序列化。请务必将 JsonNetSerializer nuget 包添加到您的项目中。

using Nest.JsonNetSerializer;

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?