如何解决如何处理只需要数据库中符号的“最后”值的高速数据流 C#
我有一个来自供应商的高速股票价格流......每秒可能有 5000 个。 (大约 8000 个不同的符号)
我的数据库中有一个表 (SymbolPrice),需要使用最新的最后价格进行更新。
我似乎无法保持数据库更新足够快以处理最新价格队列。
我使用的是 Azure sql Server 数据库,因此我能够将数据库升级到支持 In-Memory 表的高级版本,并使我的 SymbolPrice 表成为 In-Memory 表......但仍然不够好。
如果它最终跳过价格,这不是问题,只要最近的价格尽可能快地进入那里......所以如果我连续被 10 个炸毁......只有最后一个需要写...这听起来很简单,除了连续 10 个可能与其他符号混合。
因此,我当前的解决方案是使用 ConcurrentDictionary 仅保存最近的价格。并使用符号队列将更新推送到数据库(请参阅下面的代码)...但这仍然不够快。
解决这个问题的一种方法是简单地重复遍历整个字典……并用最新的价格更新数据库……但这有点浪费,因为我也会更新可能每隔几分钟更新一次的值与每秒更新多次的值相同。
关于如何做得更好的任何想法?
谢谢!
-
布莱恩
public ConcurrentDictionary<string,QuoddLastPriceCache.PriceData> _lastPrices = new ConcurrentDictionary<string,QuoddLastPriceCache.PriceData>(); public ConcurrentQueue<string> _lastPriceSymbolsToUpdate = new ConcurrentQueue<string>(); public void Start() { Task.Run(() => { UpdateLastPricesTask(services); }); lastPriceCache.PriceReceived += (symbol,priceData) => { _lastPrices.AddOrUpdate(symbol,priceData,(key,value) => { return priceData; }); _lastPriceSymbolsToUpdate.Enqueue(symbol); }; } private void UpdateLastPricesTask(IServiceProvider services) { _lastPriceUpdatesRunning = true; while (_lastPriceUpdatesRunning) { if (_lastPriceSymbolsToUpdate.TryDequeue(out string symbol)) { if (_lastPrices.TryRemove(symbol,out QuoddLastPriceCache.PriceData priceData)) { // write to database if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromSeconds(60 * 5) < DateTime.UtcNow) { if (_lastPriceScope != null) _lastPriceScope.dispose(); _lastPriceScope = services.CreateScope(); } var unitOfWork = _lastPriceScope.ServiceProvider.GetrequiredService<IUnitOfWork>(); unitOfWork.SymbolPrice.UpdateLastPrice(symbol,priceData.Price,priceData.Timestamp); } } else Thread.Sleep(1); } }
解决方法
您需要使用使您能够查询流的东西,SQL 不是最好的工具。搜索复杂事件处理和 Kafka/事件中心 + 流分析。
,我能做的最好的是以下方法......我将最后一个值保存在字典中并添加一个标志是否已写入数据库......然后通过数据并写入更新值到数据库...这样我只更新最近更新的值。效果很好……不过似乎应该有更好的方法。
public void Start()
{
Task.Run(() => { UpdateLastPricesTask(services); });
LastPriceCache.PriceReceived += (symbol,priceData) =>
{
_lastPrices.AddOrUpdate(symbol,priceData,(key,value) => { return priceData; });
};
}
public ConcurrentDictionary<string,PriceData> _lastPrices = new ConcurrentDictionary<string,PriceData>();
public bool _lastPriceUpdatesRunning;
public DateTime _lastScopeCreate = DateTime.MinValue;
public IServiceScope _lastPriceScope = null;
private void UpdateLastPricesTask(IServiceProvider services)
{
_lastPriceUpdatesRunning = true;
while (_lastPriceUpdatesRunning)
{
var processed = 0;
foreach (var symbol in _lastPrices.Keys)
{
if (_lastPrices.TryGetValue(symbol,out QuoddLastPriceCache.PriceData priceData))
{
if (priceData.WrittenToDatabase == false)
{
// create a new scope every 5 minutes
if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromSeconds(60 * 5) < DateTime.UtcNow)
{
if (_lastPriceScope != null)
_lastPriceScope.Dispose();
_lastPriceScope = services.CreateScope();
}
// write to database
var unitOfWork = _lastPriceScope.ServiceProvider.GetRequiredService<IUnitOfWork>();
unitOfWork.SymbolPrice.UpdateLastPrice(symbol,priceData.Price,priceData.Timestamp);
priceData.WrittenToDatabase = true;
processed++;
}
}
}
if (processed > 0)
Thread.Sleep(1);
else
Thread.Sleep(1000 * 1);
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。