如何解决NEST ElasticClient C# 批量插入集合
我正在尝试使用 NEST ElasticClient 库将一组数据批量插入到 Elasticsearch 中。
// Connect to a local single-node Elasticsearch cluster.
var pool = new SingleNodeConnectionPool(new Uri($"http://localhost:9200"));
var settings = new ConnectionSettings(pool);
// NOTE(review): typo — the NEST type is "ElasticClient", not "Elasticclient";
// as written this line would not compile.
var client = new Elasticclient(settings);
var path = @"D:\data.xml";
// Deserialize the whole XML file into an Entity object graph.
var serializer = new XmlSerializer(typeof(Entity));
using (TextReader reader = new StreamReader(new FileStream(path,FileMode.Open))) {
var collection = (Entity)serializer.Deserialize(reader);
var elasticSearchEntities = new List<ElasticSearchEntity>();
for (int i = 0; i < collection.Entity.Length; i++)
{
// NOTE(review): this merges the bulk action metadata (_index/_type/_id) and the
// document into ONE object. The bulk API expects each operation as TWO JSON
// objects — an action/metadata line followed by the document source line —
// which is why the request below creates no documents (see the answer text).
var elasticSearchEntity = new ElasticSearchEntity
{
_index = "entity",_type = "entity",_id = collection.Entity[i].id.ToString(),Entity = collection.Entity[i],};
elasticSearchEntities.Add(elasticSearchEntity);
}
// Sends the malformed payload; the bulk response body carries per-item errors.
var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));
}
我在下面这一行设置了断点,此时 elasticSearchEntities 集合中已经包含对象数据。
var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));
但运行后,没有创建索引。
如果我改用 Index 方法,它可以工作,但它会逐条插入文档,速度很慢。如果可能,我需要批量插入。
解决方法
批量 API 的输入的构造对于低级客户端看起来不正确。每个批量操作应该由两个对象组成
- 表示要执行的批量操作的对象,例如索引和相关元数据
- 代表文档的对象
看起来问题中的示例将这两者合并为一个对象,这可能会导致错误 - 批量响应将包含更多详细信息。
如评论中所问,您是否有理由特别使用低级客户端?高级客户端中有一个批量 observable 助手,可以帮助索引大量文档,如果这些文档来自文件或数据库等其他来源,这将非常有用。
例如,下面的代码演示了如何为 Stack Overflow 的 posts.xml 存档中的所有问题和答案建立索引:
/// <summary>
/// A Stack Overflow question post. Adds question-only fields on top of the
/// shared <see cref="Post"/> members; questions form the parent (root) side
/// of the question/answer join relation.
/// </summary>
public class Question : Post
{
/// <summary>Question title.</summary>
public string Title { get; set; }
/// <summary>Completion-suggester field built from the title (weighted by score).</summary>
public CompletionField TitleSuggest { get; set; }
/// <summary>Id of the accepted answer, if one exists.</summary>
public int? AcceptedAnswerId { get; set; }
/// <summary>Number of times the question has been viewed.</summary>
public int ViewCount { get; set; }
/// <summary>Display name of the last editor (only populated in the dump for some rows).</summary>
public string LastEditorDisplayName { get; set; }
/// <summary>Question tags, parsed from the dump's "&lt;tag1&gt;&lt;tag2&gt;" format.</summary>
public List<string> Tags { get; set; }
/// <summary>Number of answers posted to the question.</summary>
public int AnswerCount { get; set; }
/// <summary>Number of times the question was favorited.</summary>
public int FavoriteCount { get; set; }
/// <summary>When the question became community-owned, if ever.</summary>
public DateTimeOffset? CommunityOwnedDate { get; set; }
/// <summary>Discriminator value; always "Question".</summary>
public override string Type => nameof(Question);
}
/// <summary>
/// A Stack Overflow answer post. Carries only the shared <see cref="Post"/>
/// fields; answers form the child (link) side of the question/answer join.
/// </summary>
public class Answer : Post
{
/// <summary>Discriminator value; always "Answer".</summary>
public override string Type => nameof(Answer);
}
/// <summary>
/// Base type holding the fields shared by questions and answers in the
/// Stack Overflow posts dump.
/// </summary>
public class Post
{
/// <summary>Post id from the dump's "Id" attribute.</summary>
public int Id { get; set; }
/// <summary>Join field: root for questions, link to the parent question for answers.</summary>
public JoinField ParentId { get; set; }
/// <summary>When the post was created.</summary>
public DateTimeOffset CreationDate { get; set; }
/// <summary>Net vote score of the post.</summary>
public int Score { get; set; }
/// <summary>Post body (HTML), stripped at index time by the "html" analyzer.</summary>
public string Body { get; set; }
/// <summary>Owner's user id; null for wiki/deleted-user posts in the dump.</summary>
public int? OwnerUserId { get; set; }
/// <summary>Owner's display name, when present in the dump.</summary>
public string OwnerDisplayName { get; set; }
/// <summary>User id of the last editor, if the post was edited.</summary>
public int? LastEditorUserId { get; set; }
/// <summary>When the post was last edited, if ever.</summary>
public DateTimeOffset? LastEditDate { get; set; }
/// <summary>When the post last had activity, if recorded.</summary>
public DateTimeOffset? LastActivityDate { get; set; }
/// <summary>Number of comments on the post.</summary>
public int CommentCount { get; set; }
/// <summary>Discriminator name of the concrete post type.</summary>
public virtual string Type { get; }
}
// Entry point: creates the "posts" index (settings + mapping) if needed, then
// streams the Posts.xml dump into Elasticsearch with the BulkAll helper.
void Main()
{
var indexName = "posts";
// Single-node connection to a local Elasticsearch instance.
var node = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
var settings = new ConnectionSettings(node)
// Generous timeout: bulk requests over a large dump can take a while.
.RequestTimeout(TimeSpan.FromMinutes(10))
// Map all three CLR types to the same index; Question is the join relation name.
.DefaultMappingFor(new ClrTypeMapping[] {
new ClrTypeMapping(typeof(Post)) { IndexName = indexName },new ClrTypeMapping(typeof(Question)) { IndexName = indexName,RelationName = "question" },new ClrTypeMapping(typeof(Answer)) { IndexName = indexName },})
// Log the outcome of every request to the console for troubleshooting.
.OnRequestCompleted(response =>
{
if (response.Success)
Console.WriteLine($"Status: {response.HttpStatusCode}");
else
Console.WriteLine($"Error: {response.DebugInformation}");
});
var client = new ElasticClient(settings);
var characterFilterMappings = CreateCharacterFilterMappings();
// Create the index only if it does not already exist.
if (!client.Indices.Exists(indexName).Exists)
{
var createIndexResponse = client.Indices.Create(indexName,c => c
.Settings(s => s
.NumberOfShards(3)
.NumberOfReplicas(0)
.Analysis(a => a
// Char filter rewriting symbol language names, e.g. "C#" -> "csharp".
.CharFilters(cf => cf
.Mapping("programming_language",mca => mca
.Mappings(characterFilterMappings)
)
)
.Analyzers(an => an
// "html": strips HTML from post bodies before tokenizing.
.Custom("html",ca => ca
.CharFilters("html_strip","programming_language")
.Tokenizer("standard")
.Filters("standard","lowercase","stop")
)
// "expand": used at search time (and for titles) without HTML stripping.
.Custom("expand",ca => ca
.CharFilters("programming_language")
.Tokenizer("standard")
.Filters("standard","stop")
)
)
)
)
// Single mapping for Post; routing is required because answers are routed
// to their parent question via the join field.
.Map<Post>(u => u
.RoutingField(r => r.Required())
.AutoMap<Question>()
.AutoMap<Answer>()
// Don't store the (potentially large) suggest input in _source.
.SourceField(s => s
.Excludes(new[] { "titleSuggest" })
)
.Properties<Question>(p => p
// Parent/child join: question is the parent of answer.
.Join(j => j
.Name(f => f.ParentId)
.Relations(r => r
.Join<Question,Answer>()
)
)
.Text(s => s
.Name(n => n.Title)
.Analyzer("expand")
.Norms(false)
.Fields(f => f
.Keyword(ss => ss
.Name("raw")
)
)
)
.Keyword(s => s
.Name(n => n.OwnerDisplayName)
)
.Keyword(s => s
.Name(n => n.LastEditorDisplayName)
)
.Keyword(s => s
.Name(n => n.Tags)
)
.Keyword(s => s
.Name(n => n.Type)
)
.Text(s => s
.Name(n => n.Body)
.Analyzer("html")
.SearchAnalyzer("expand")
)
.Completion(co => co
.Name(n => n.TitleSuggest)
)
)
)
);
if (!createIndexResponse.IsValid)
Console.WriteLine($"invalid response creating index. {createIndexResponse.DebugInformation}");
}
var seenPages = 0;
// Signalled by the observer when indexing completes or fails.
var handle = new ManualResetEvent(false);
var size = 1000;
// BulkAll lazily pulls posts from the enumerable and issues up to 16
// concurrent bulk requests of 1000 documents each, with retry/back-off.
var observableBulk = client.BulkAll<Post>(GetQuestionsAndAnswers(),f => f
.MaxDegreeOfParallelism(16)
.BackOffTime(TimeSpan.FromSeconds(10))
.BackOffRetries(2)
.Size(size)
// Buffer each page into typed index operations so the correct concrete
// type (Question/Answer) is serialized for the join field.
.BufferToBulk((bulk,posts) =>
{
foreach (var post in posts)
{
if (post is Question question)
{
var item = new BulkIndexOperation<Question>(question);
bulk.AddOperation(item);
}
else
{
var answer = (Answer)post;
var item = new BulkIndexOperation<Answer>(answer);
bulk.AddOperation(item);
}
}
})
.RefreshOnCompleted()
.Index(indexName)
);
// Captured failure, rethrown on the main thread with its original stack trace.
ExceptionDispatchInfo exception = null;
var bulkObserver = new BulkAllObserver(
onError: e =>
{
exception = ExceptionDispatchInfo.Capture(e);
handle.Set();
},onCompleted: () => handle.Set(),onNext: b =>
{
Interlocked.Increment(ref seenPages);
Console.WriteLine($"indexed {seenPages} pages");
}
);
observableBulk.Subscribe(bulkObserver);
// Block until the observable reports completion or error.
handle.WaitOne();
if (exception != null)
exception.Throw();
}
/// <summary>
/// Streams questions and answers out of the Stack Overflow Posts.xml dump one
/// &lt;row&gt; element at a time, so the (potentially huge) file is never fully
/// loaded into memory.
/// </summary>
/// <returns>Lazily-yielded <see cref="Question"/> and <see cref="Answer"/> posts.</returns>
public IEnumerable<Post> GetQuestionsAndAnswers()
{
    using (var stream = File.OpenRead(@"stackoverflow_data\Posts.xml"))
    using (var reader = XmlReader.Create(stream))
    {
        reader.ReadToDescendant("posts");
        reader.ReadToDescendant("row");
        do
        {
            // Materialize only the current <row> element.
            var item = (XElement)XNode.ReadFrom(reader);

            // The dump stores machine-formatted numbers/dates, so parse with the
            // invariant culture (CA1305) rather than the culture-sensitive overloads.
            var invariant = System.Globalization.CultureInfo.InvariantCulture;
            int ParseInt(string name) => int.Parse(item.Attribute(name).Value, invariant);
            int? ParseOptionalInt(string name) =>
                item.Attribute(name) != null ? ParseInt(name) : (int?)null;
            DateTimeOffset ParseDate(string name) =>
                DateTimeOffset.Parse(item.Attribute(name).Value, invariant);
            DateTimeOffset? ParseOptionalDate(string name) =>
                item.Attribute(name) != null ? ParseDate(name) : (DateTimeOffset?)null;

            var id = ParseInt("Id");
            var postTypeId = ParseInt("PostTypeId");
            var score = ParseInt("Score");
            var body = item.Attribute("Body")?.Value;
            var creationDate = ParseDate("CreationDate");
            var commentCount = ParseInt("CommentCount");
            var ownerUserId = ParseOptionalInt("OwnerUserId");
            var ownerDisplayName = item.Attribute("OwnerDisplayName")?.Value;
            var lastEditorUserId = ParseOptionalInt("LastEditorUserId");
            var lastEditDate = ParseOptionalDate("LastEditDate");
            var lastActivityDate = ParseOptionalDate("LastActivityDate");

            switch (postTypeId)
            {
                case 1: // question
                    var title = item.Attribute("Title")?.Value;
                    yield return new Question
                    {
                        Id = id,
                        // Questions are the root of the question/answer join relation.
                        ParentId = JoinField.Root<Question>(),
                        AcceptedAnswerId = ParseOptionalInt("AcceptedAnswerId"),
                        CreationDate = creationDate,
                        Score = score,
                        ViewCount = ParseInt("ViewCount"),
                        Body = body,
                        OwnerUserId = ownerUserId,
                        OwnerDisplayName = ownerDisplayName,
                        LastEditorUserId = lastEditorUserId,
                        LastEditorDisplayName = item.Attribute("LastEditorDisplayName")?.Value,
                        LastEditDate = lastEditDate,
                        LastActivityDate = lastActivityDate,
                        Title = title,
                        TitleSuggest = new CompletionField
                        {
                            Input = new[] { title },
                            // Negative scores are invalid completion weights; clamp to 0.
                            Weight = score < 0 ? 0 : score
                        },
                        // Tags are stored as "<tag1><tag2>": drop '<' and split on '>'.
                        Tags = item.Attribute("Tags") != null
                            ? item.Attribute("Tags").Value.Replace("<", string.Empty)
                                .Split(new[] { ">" }, StringSplitOptions.RemoveEmptyEntries)
                                .ToList()
                            : null,
                        AnswerCount = ParseInt("AnswerCount"),
                        CommentCount = commentCount,
                        FavoriteCount = ParseOptionalInt("FavoriteCount") ?? 0,
                        CommunityOwnedDate = ParseOptionalDate("CommunityOwnedDate")
                    };
                    break;
                case 2: // answer
                    // Fix: the original discarded every parsed field except Id and
                    // ParentId, so answers were indexed without body, score, dates, etc.
                    yield return new Answer
                    {
                        Id = id,
                        // Link the answer to its parent question (also used for routing).
                        ParentId = JoinField.Link<Answer>(ParseInt("ParentId")),
                        CreationDate = creationDate,
                        Score = score,
                        Body = body,
                        OwnerUserId = ownerUserId,
                        OwnerDisplayName = ownerDisplayName,
                        LastEditorUserId = lastEditorUserId,
                        LastEditDate = lastEditDate,
                        LastActivityDate = lastActivityDate,
                        CommentCount = commentCount
                    };
                    break;
            }
        }
        while (reader.ReadToNextSibling("row"));
    }
}
/*
 * Builds the character-filter rewrite rules that turn programming-language
 * names containing symbols into plain words so they survive tokenization,
 * e.g. "c#" => "csharp" and "C++" => "cplusplus".
 */
private IList<string> CreateCharacterFilterMappings()
{
    var rules = new List<string>();

    // Adds the lower- and upper-case variants of one rewrite rule.
    void AddRule(string letter, string symbol, string word)
    {
        rules.Add($"{letter}{symbol} => {letter}{word}");
        rules.Add($"{letter.ToUpper()}{symbol} => {letter}{word}");
    }

    foreach (var letter in new[] { "c", "f", "m", "j", "s", "a", "k", "t" })
        AddRule(letter, "#", "sharp");

    foreach (var letter in new[] { "g", "c", "d" })
        AddRule(letter, "++", "plusplus");

    return rules;
}
IEnumerable<Post> GetQuestionsAndAnswers()
从很大的 posts.xml 文件(如果我没记错,大约 50GB)中逐条产出问题和答案,并将它们提供给 BulkAll
助手,后者一次最多同时向 Elasticsearch 发出 16 个批量请求,每个批量请求索引 1000 个文档。更完整的示例请参见这个 GitHub 仓库。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。