NEST ElasticClient C#: bulk inserting a collection

I am trying to bulk insert a collection of data into Elasticsearch using the NEST ElasticClient library.

    var pool = new SingleNodeConnectionPool(new Uri($"http://localhost:9200"));
    var settings = new ConnectionSettings(pool);

    var client = new ElasticClient(settings);

    var path = @"D:\data.xml";

    var serializer = new XmlSerializer(typeof(Entity));

    using (TextReader reader = new StreamReader(new FileStream(path,FileMode.Open))) {
        var collection = (Entity)serializer.Deserialize(reader);             

        var elasticSearchEntities = new List<ElasticSearchEntity>();
        
        for (int i = 0; i < collection.Entity.Length; i++)
        {
            var elasticSearchEntity = new ElasticSearchEntity
            {
                _index = "entity",
                _type = "entity",
                _id = collection.Entity[i].id.ToString(),
                Entity = collection.Entity[i],
            };
            elasticSearchEntities.Add(elasticSearchEntity);
     
        }
        var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));
    }

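I put a breakpoint on the line below, and elasticSearchEntities contains the objects with their data.

    var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));

But after running, no index is created.

If I use Index it works, but it inserts documents one by one, which is slow. I need to bulk insert if possible.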

Solution

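The input to the bulk API is not constructed correctly for the low-level client. Each bulk operation should consist of two objects:

  1. An object representing the bulk operation to perform, e.g. index, and its associated metadata
  2. An object representing the document

The example in the question appears to merge the two into a single object, which will probably result in an error; the bulk response will contain more details.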
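
The two-object structure described above could be sketched roughly as follows with the low-level client. This is a minimal sketch, not the asker's code: the Doc class, the BuildBulkBody helper, and the "entity" index name are assumptions for illustration, and _type is left out on the assumption that the target cluster is Elasticsearch 7.x, where mapping types are no longer used.

```csharp
using System;
using System.Collections.Generic;
using Elasticsearch.Net;
using Nest;

public static class BulkBodySketch
{
    // Hypothetical document type standing in for the question's entity.
    public class Doc
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

    // Builds the bulk body: for every document, one action/metadata object
    // followed by one object for the document itself.
    public static List<object> BuildBulkBody(IEnumerable<Doc> docs, string index)
    {
        var body = new List<object>();
        foreach (var doc in docs)
        {
            // 1. the operation to perform and its metadata
            body.Add(new { index = new { _index = index, _id = doc.Id.ToString() } });
            // 2. the document
            body.Add(doc);
        }
        return body;
    }

    public static void Main()
    {
        var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
        var client = new ElasticClient(new ConnectionSettings(pool));

        var docs = new[]
        {
            new Doc { Id = 1, Name = "first" },
            new Doc { Id = 2, Name = "second" }
        };
        var body = BuildBulkBody(docs, "entity");

        var response = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(body));

        // An HTTP 200 bulk response can still report per-item failures,
        // so print the body (or the debug information if the call itself failed).
        Console.WriteLine(response.Success ? response.Body : response.DebugInformation);
    }
}
```

Printing the response body is the quickest way to see why nothing was indexed, since each item in it carries its own status and error reason.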

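As asked in the comments, is there a particular reason you are using the low-level client? The high-level client has a bulk observable helper, BulkAll, that can help with indexing a large number of documents, which is useful when those documents come from another source such as a file or a database.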

For example, indexing all questions and answers from Stack Overflow's posts.xml archive:

public class Question : Post
{
    public string Title { get; set; }

    public CompletionField TitleSuggest { get; set; }

    public int? AcceptedAnswerId { get; set; }

    public int ViewCount { get; set; }

    public string LastEditorDisplayName { get; set; }

    public List<string> Tags { get; set; }

    public int AnswerCount { get; set; }

    public int FavoriteCount { get; set; }

    public DateTimeOffset? CommunityOwnedDate { get; set; }

    public override string Type => nameof(Question);
}

public class Answer : Post
{
    public override string Type => nameof(Answer);
}

public class Post
{
    public int Id { get; set; }

    public JoinField ParentId { get; set; }

    public DateTimeOffset CreationDate { get; set; }

    public int Score { get; set; }

    public string Body { get; set; }

    public int? OwnerUserId { get; set; }

    public string OwnerDisplayName { get; set; }

    public int? LastEditorUserId { get; set; }

    public DateTimeOffset? LastEditDate { get; set; }

    public DateTimeOffset? LastActivityDate { get; set; }

    public int CommentCount { get; set; }

    public virtual string Type { get; }
}

void Main()
{
    var indexName = "posts";
    var node = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var settings = new ConnectionSettings(node)
        .RequestTimeout(TimeSpan.FromMinutes(10))
        .DefaultMappingFor(new ClrTypeMapping[] {
            new ClrTypeMapping(typeof(Post)) { IndexName = indexName },
            new ClrTypeMapping(typeof(Question)) { IndexName = indexName, RelationName = "question" },
            new ClrTypeMapping(typeof(Answer)) { IndexName = indexName },
        })
        .OnRequestCompleted(response =>
        {        
            if (response.Success)
                Console.WriteLine($"Status: {response.HttpStatusCode}");
            else
                Console.WriteLine($"Error: {response.DebugInformation}");
        });


    var client = new ElasticClient(settings);  
    var characterFilterMappings = CreateCharacterFilterMappings();

    if (!client.Indices.Exists(indexName).Exists)
    {
        var createIndexResponse = client.Indices.Create(indexName,c => c
            .Settings(s => s
                .NumberOfShards(3)
                .NumberOfReplicas(0)
                .Analysis(a => a
                    .CharFilters(cf => cf
                        .Mapping("programming_language",mca => mca
                            .Mappings(characterFilterMappings)
                        )
                    )
                    .Analyzers(an => an
                        .Custom("html",ca => ca
                            .CharFilters("html_strip","programming_language")
                            .Tokenizer("standard")
                            .Filters("lowercase", "stop") // the no-op "standard" token filter was removed in Elasticsearch 7.0
                        )
                        .Custom("expand",ca => ca
                            .CharFilters("programming_language")
                            .Tokenizer("standard")
                            .Filters("stop") // the no-op "standard" token filter was removed in Elasticsearch 7.0
                        )
                    )
                )
            )
            .Map<Post>(u => u
                .RoutingField(r => r.Required())
                .AutoMap<Question>()
                .AutoMap<Answer>()
                .SourceField(s => s
                    .Excludes(new[] { "titleSuggest" })
                )
                .Properties<Question>(p => p
                    .Join(j => j
                        .Name(f => f.ParentId)
                        .Relations(r => r
                            .Join<Question,Answer>()
                        )
                    )
                    .Text(s => s
                        .Name(n => n.Title)
                        .Analyzer("expand")
                        .Norms(false)
                        .Fields(f => f
                            .Keyword(ss => ss
                                .Name("raw")
                            )
                        )
                    )
                    .Keyword(s => s
                        .Name(n => n.OwnerDisplayName)
                    )
                    .Keyword(s => s
                        .Name(n => n.LastEditorDisplayName)
                    )
                    .Keyword(s => s
                        .Name(n => n.Tags)
                    )
                    .Keyword(s => s
                        .Name(n => n.Type)
                    )
                    .Text(s => s
                        .Name(n => n.Body)
                        .Analyzer("html")
                        .SearchAnalyzer("expand")
                    )
                    .Completion(co => co
                        .Name(n => n.TitleSuggest)
                    )
                )
            )
        );

        if (!createIndexResponse.IsValid)
            Console.WriteLine($"invalid response creating index. {createIndexResponse.DebugInformation}");
    }
    
    var seenPages = 0;
    var handle = new ManualResetEvent(false);
    var size = 1000;

    var observableBulk = client.BulkAll<Post>(GetQuestionsAndAnswers(),f => f
        .MaxDegreeOfParallelism(16)
        .BackOffTime(TimeSpan.FromSeconds(10))
        .BackOffRetries(2)
        .Size(size)
        .BufferToBulk((bulk,posts) =>
        {
            foreach (var post in posts)
            {
                if (post is Question question)
                {
                    var item = new BulkIndexOperation<Question>(question);
                    bulk.AddOperation(item);
                }
                else
                {
                    var answer = (Answer)post;
                    var item = new BulkIndexOperation<Answer>(answer);
                    bulk.AddOperation(item);
                }
            }
        })
        .RefreshOnCompleted()
        .Index(indexName)
    );

    ExceptionDispatchInfo exception = null;

    var bulkObserver = new BulkAllObserver(
        onError: e => 
        { 
            exception = ExceptionDispatchInfo.Capture(e);
            handle.Set();
        },
        onCompleted: () => handle.Set(),
        onNext: b =>
        {
            Interlocked.Increment(ref seenPages);
            Console.WriteLine($"indexed {seenPages} pages");
        }
    );
    
    observableBulk.Subscribe(bulkObserver);   
    handle.WaitOne();
    
    if (exception != null) 
        exception.Throw();
}

public IEnumerable<Post> GetQuestionsAndAnswers()
{
    using (var stream = File.OpenRead(@"stackoverflow_data\Posts.xml"))
    using (var reader = XmlReader.Create(stream))
    {
        reader.ReadToDescendant("posts");
        reader.ReadToDescendant("row");
        do
        {
            var item = (XElement)XNode.ReadFrom(reader);
            var id = int.Parse(item.Attribute("Id").Value);
            var postTypeId = int.Parse(item.Attribute("PostTypeId").Value);
            var score = int.Parse(item.Attribute("Score").Value);
            var body = item.Attribute("Body")?.Value;
            var creationDate = DateTimeOffset.Parse(item.Attribute("CreationDate").Value);
            var commentCount = int.Parse(item.Attribute("CommentCount").Value);
            var ownerUserId = item.Attribute("OwnerUserId") != null
                ? int.Parse(item.Attribute("OwnerUserId").Value)
                : (int?)null;
            var ownerDisplayName = item.Attribute("OwnerDisplayName")?.Value;
            var lastEditorUserId = item.Attribute("LastEditorUserId") != null
                ? int.Parse(item.Attribute("LastEditorUserId").Value)
                : (int?)null;
            var lastEditDate = item.Attribute("LastEditDate") != null
                ? DateTimeOffset.Parse(item.Attribute("LastEditDate").Value)
                : (DateTimeOffset?)null;
            var lastActivityDate = item.Attribute("LastActivityDate") != null
                ? DateTimeOffset.Parse(item.Attribute("LastActivityDate").Value)
                : (DateTimeOffset?)null;
                
            switch (postTypeId)
            {
                case 1:
                    var title = item.Attribute("Title")?.Value;
                    
                    var question = new Question
                    {
                        Id = id,
                        ParentId = JoinField.Root<Question>(),
                        AcceptedAnswerId = item.Attribute("AcceptedAnswerId") != null
                            ? int.Parse(item.Attribute("AcceptedAnswerId").Value)
                            : (int?)null,
                        CreationDate = creationDate,
                        Score = score,
                        ViewCount = int.Parse(item.Attribute("ViewCount").Value),
                        Body = body,
                        OwnerUserId = ownerUserId,
                        OwnerDisplayName = ownerDisplayName,
                        LastEditorUserId = lastEditorUserId,
                        LastEditorDisplayName = item.Attribute("LastEditorDisplayName")?.Value,
                        LastEditDate = lastEditDate,
                        LastActivityDate = lastActivityDate,
                        Title = title,
                        TitleSuggest = new CompletionField
                        {
                            Input = new[] { title },
                            Weight = score < 0 ? 0 : score
                        },
                        Tags = item.Attribute("Tags") != null
                            ? item.Attribute("Tags").Value.Replace("<", string.Empty)
                                .Split(new[] { ">" }, StringSplitOptions.RemoveEmptyEntries)
                                .ToList()
                            : null,
                        AnswerCount = int.Parse(item.Attribute("AnswerCount").Value),
                        CommentCount = commentCount,
                        FavoriteCount = item.Attribute("FavoriteCount") != null
                            ? int.Parse(item.Attribute("FavoriteCount").Value)
                            : 0,
                        CommunityOwnedDate = item.Attribute("CommunityOwnedDate") != null
                            ? DateTimeOffset.Parse(item.Attribute("CommunityOwnedDate").Value)
                            : (DateTimeOffset?)null
                    };
                    
                    yield return question;
                    break;
                case 2:
                    var answer = new Answer
                    {
                        Id = id,
                        ParentId = JoinField.Link<Answer>(int.Parse(item.Attribute("ParentId").Value)),
                    };
                    
                    yield return answer;
                    break;
            }
        }
        while (reader.ReadToNextSibling("row"));
    }
}

/* 
* Simple char filter mappings to transform common
* programming languages in symbols to words
* e.g. c# => csharp,C++ => cplusplus
*/
private IList<string> CreateCharacterFilterMappings()
{
    var mappings = new List<string>();
    foreach (var c in new[] { "c","f","m","j","s","a","k","t" })
    {
        mappings.Add($"{c}# => {c}sharp");
        mappings.Add($"{c.ToUpper()}# => {c}sharp");
    }

    foreach (var c in new[] { "g","c","d" })
    {
        mappings.Add($"{c}++ => {c}plusplus");
        mappings.Add($"{c.ToUpper()}++ => {c}plusplus");
    }
    
    return mappings;
}

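IEnumerable<Post> GetQuestionsAndAnswers() yields questions and answers from the large posts.xml file (around 50GB, if I recall) and feeds them to BulkAll, which makes up to 16 concurrent bulk requests to Elasticsearch at a time, with each bulk request indexing 1000 documents. See this GitHub repository for a more comprehensive example.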
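
For a simpler case like the one in the question, where there is a single document type and no parent/child join, a stripped-down use of BulkAll might look like the sketch below. The Doc class, the LoadDocs helper, the "entity" index name, and the batch size and parallelism values are all assumptions for illustration; the real documents would come from the deserialized XML instead.

```csharp
using System;
using System.Collections.Generic;
using Elasticsearch.Net;
using Nest;

public static class BulkAllSketch
{
    // Hypothetical document type; the question's entity class would take its place.
    // NEST infers the document _id from the Id property.
    public class Doc
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

    // Hypothetical lazy source of documents, standing in for reading a file.
    public static IEnumerable<Doc> LoadDocs(int count)
    {
        for (var i = 1; i <= count; i++)
            yield return new Doc { Id = i, Name = $"doc {i}" };
    }

    public static void Main()
    {
        var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
        var client = new ElasticClient(new ConnectionSettings(pool));

        var bulkAll = client.BulkAll(LoadDocs(10000), b => b
            .Index("entity")
            .Size(1000)                             // documents per bulk request
            .MaxDegreeOfParallelism(4)              // concurrent bulk requests
            .BackOffRetries(2)
            .BackOffTime(TimeSpan.FromSeconds(10))
            .RefreshOnCompleted()
        );

        // Wait subscribes to the observable and blocks until every batch has been
        // sent or the maximum run time elapses; a failed bulk surfaces as an exception.
        bulkAll.Wait(TimeSpan.FromMinutes(15), next =>
            Console.WriteLine($"indexed page {next.Page}"));
    }
}
```

Wait is a convenience for the blocking case; the explicit BulkAllObserver with a ManualResetEvent shown earlier gives finer control over error handling.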
