如何解决比 elasticsearch SQL Server 慢
我比较了 sql Server 和 ElasticSearch 的速度。我有 150 万条数据。但是 sql 查询比弹性搜索运行得更快。我不明白这个问题。为什么像查询这样的词在 sql 中完成得更快?
public static List<Sales> GetAllRecords(string itemType)
{
List<Sales> salesReports = new List<Sales>();
string sqlQuery = String.Format(@"SELECT * FROM dbo.Sales where Region like '%{0}%'",itemType);
using (sqlConnection connection = new sqlConnection(CONNECTION_STRING))
{
var result = connection.Query<Sales>(sqlQuery);
foreach (var item in result)
{
Sales global = new Sales()
{
Region = item.Region,Country = item.Country,Item_Type=item.Item_Type,Order_Date=item.Order_Date,Order_ID = item.Order_ID,Order_Priority=item.Order_Priority,Sales_Channel=item.Sales_Channel,Ship_Date = item.Ship_Date,Total_Cost=item.Total_Cost,Total_Profit=item.Total_Profit,Total_Revenue=item.Total_Revenue,Units_Sold=item.Units_Sold,Unit_Cost=item.Unit_Cost,Unit_Price = item.Unit_Price
};
salesReports.Add(global);
}
return result.ToList();
}
}
我的 ElasticSearch 代码。我在这里用 elasticsearch 搜索我之前索引的数据。
public static List<Sales> ConfigureES(string inputText)
{
List<Sales> salesReports = new List<Sales>();
// 1. Connection URL's elastic search
var listofUrls = new Uri[]
{
// here we can set multple connectionn URL's...
new Uri("http://localhost:9200/")
};
StaticConnectionPool connPool = new StaticConnectionPool(listofUrls);
ConnectionSettings connSett = new ConnectionSettings(connPool);
Elasticclient eClient = new Elasticclient(connSett);
// var see = eClient.DeleteIndex(INDEX_NAME);
// check the connection health
var checkClusterHealth = eClient.ClusterHealth();
if (checkClusterHealth.ApiCall.Success && checkClusterHealth.IsValid)
{
// 2. check the index exist or not
var checkResult = eClient.IndexExists(INDEX_NAME);
if (!checkResult.Exists)
{
// Raise error to Index not avaliable
}
// Search particular text field
var searchResponse = eClient.Search<Sales>(s =>
s.Index(INDEX_NAME).From(0).Size(5000).Scroll("10m")
.Query(q => q.Match(m => m.Field(f => f.Region).Query(inputText))));
//var results = eClient.Scroll<Salesreport>("10m",searchResponse.ScrollId);
while (searchResponse.Documents.Any())
{
var res = searchResponse.Documents;
var sds = res.Cast<Sales>();
salesReports.AddRange(sds);
searchResponse = eClient.Scroll<Sales>("10m",searchResponse.ScrollId);
}
}
else
{
// fail log the exception further use
var exception = checkClusterHealth.OriginalException.ToString();
var debugException = checkClusterHealth.Debuginformation.ToString();
}
return salesReports;
}
我将数据索引到elasticsearch。
public static string CONNECTION_STRING = string.Empty;
public static string INDEX_NAME = "elastic";
public static string INDEX_TYPE = "report4";
private static Elasticclient eClient;
static void Main(string[] args)
{
try
{
// read the config file ...
var configuration = new ConfigurationBuilder()
.SetBasePath(@"C:\Users\Celal\Desktop\ElasticSearch-master\ElasticSearch-master\ElasticSearchBGPJob\ElasticSearchBGPJob\ElasticSearchBGPJob")
.AddJsonFile("appsettings.json",false)
.Build();
CONNECTION_STRING = configuration.GetSection("DefaultConnection").Value;
if (string.IsNullOrEmpty(CONNECTION_STRING))
throw new ArgumentException("No connection string in appsettings.json");
// 1. Connection URL's elastic search
var listofUrls =
// here we can set multple connectionn URL's...
new Uri("http://localhost:9200/");
ConnectionSettings connSett = new ConnectionSettings(listofUrls);
eClient = new Elasticclient(connSett);
// var see = eClient.DeleteIndex(INDEX_NAME);
var createIndexDescriptor = new CreateIndexDescriptor(INDEX_NAME).Mappings(ms => ms.Map<Sales>(m => m.AutoMap()));
// check the connection health
var checkClusterHealth = eClient.ClusterHealth();
if (checkClusterHealth.ApiCall.Success && checkClusterHealth.IsValid)
{
// 2. check the index exist or not
var checkResult = eClient.IndexExists(INDEX_NAME);
if (!checkResult.Exists)
{
var createIndexResponse = eClient.CreateIndex(createIndexDescriptor);
if (createIndexResponse.ApiCall.Success && createIndexResponse.IsValid)
{
// index is created successfully....
}
else
{
// fail log the exception further use
var exception = createIndexResponse.OriginalException.ToString();
var debugException = createIndexResponse.Debuginformation.ToString();
}
}
// 3. get the last documet id of index
var lastRecordResponse = eClient.Search<Sales>(s => s
.Index(INDEX_NAME)
.Type(INDEX_TYPE)
.From(0)
.Size(1).sort(sr => sr.Descending(f => f.Order_ID)));
if (lastRecordResponse.ApiCall.Success && lastRecordResponse.IsValid)
{
Console.WriteLine("Start " + DateTime.Now);
long salesRecordId = 0;
var listofrecords = new List<Sales>();
if (lastRecordResponse.Documents.Count >= 1)
{
var obj = lastRecordResponse.Documents;
foreach (var item in obj)
{
salesRecordId = item.Order_ID;
}
listofrecords = GetAllRecords(salesRecordId);
}
else
{
listofrecords = GetAllRecords(salesRecordId);
}
Console.WriteLine("END " + DateTime.Now);
// Insert the data into document format corresponding index...
if (listofrecords.Count > 0)
{
Console.WriteLine("===== START========= " + DateTime.Now);
BulkInsertData(listofrecords,eClient).Wait();
Console.WriteLine("===== END========= " + DateTime.Now);
}
}
else
{
// fail log the exception further use
var exception = lastRecordResponse.OriginalException.ToString();
var debugException = lastRecordResponse.Debuginformation.ToString();
}
}
else
{
// fail log the exception further use
var exception = checkClusterHealth.OriginalException.ToString();
var debugException = checkClusterHealth.Debuginformation.ToString();
}
Console.WriteLine("Hello World!");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
Console.ReadLine();
}
}
public static List<Sales> GetAllRecords(long LastSalesId)
{
List<Sales> salesReports = new List<Sales>();
string sqlQuery = String.Format(@"SELECT * FROM dbo.Sales where Order_ID > {0} ",LastSalesId);
using (sqlConnection connection = new sqlConnection(CONNECTION_STRING))
{
connection.open();
using (sqlCommand command = new sqlCommand(sqlQuery,connection))
{
command.CommandTimeout = 1000;
using (sqlDataReader dataReader = command.ExecuteReader())
{
if (dataReader.HasRows)
{
while (dataReader.Read())
{
Sales global = new Sales()
{
Order_ID=Convert.ToInt32(dataReader["Order_ID"]),Region=Convert.ToString(dataReader["Region"]),Country = Convert.ToString(dataReader["Country"]),Total_Cost = (decimal)Convert.Todouble(dataReader["Total_Cost"]),Total_Revenue = Convert.ToString(dataReader["Total_Revenue"]),Item_Type = Convert.ToString(dataReader["Item_Type"])
};
salesReports.Add(global);
}
}
}
}
connection.Close();
}
return salesReports;
}
static async Task BulkInsertData(List<Sales> listofData,Elasticclient Eclient)
{
try
{
var splitTheLargeList = ChunkBy(listofData);
var test = splitTheLargeList.LastOrDefault();
foreach (var item in splitTheLargeList)
{
var bulkResponse = await Eclient.BulkAsync(b => b
.Index(INDEX_NAME)
// .Type(INDEX_TYPE)
.IndexMany(item));
if (bulkResponse.ApiCall.Success && bulkResponse.IsValid)
{
// success fully inserted...
}
else
{
// fail log the exception further use
var exception = bulkResponse.OriginalException.ToString();
var debugException = bulkResponse.Debuginformation.ToString();
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex.InnerException.ToString());
}
}
public static List<List<T>> ChunkBy<T>(List<T> source,int chunkSize = 1000)
{
return source
.Select((x,i) => new { Index = i,Value = x })
.GroupBy(x => x.Index / chunkSize)
.Select(x => x.Select(v => v.Value).ToList())
.ToList();
}
public static IEnumerable<List<T>> SplitList<T>(List<T> listofData,int listSize = 1000)
{
for (int i = 0; i < listofData.Count; i += listSize)
{
yield return listofData.GetRange(i,Math.Min(listSize,listofData.Count - i));
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。