如何解决我如何在 C# 和 CouchDB 中观察新数据?
我创建了一个函数,它从数据库(CouchDB)中获取新到达的数据,然后利用这些数据进行分析。一切正常,但现在我正在努力如何在没有更多数据的情况下保持该功能的活动。
我现在得到了什么:
public async Task<FsResponseModel.Root> AnalyzeImage()
{
HttpClient client = new HttpClient();
client.DefaultRequestHeaders.Add("Accept","application/json");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic",Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes($"user:pw" + $"")));
HttpResponseMessage httpResponse = await client.GetAsync("CouchDBUrl/_changes?Feed=continuous&filter=_view&view=list/images&include_docs=true&attachments=true");
httpResponse.EnsureSuccessstatusCode();
string json = await httpResponse.Content.ReadAsstringAsync();
//Somewhere to keep the parsed results
var results = new List<ViewResponseModel>();
Root root = null;
var serializer = new JsonSerializer();
using (var stringReader = new StringReader(json))
using (var jsonReader = new JsonTextReader(stringReader))
{
//Make sure the JsonReader kNows we have multiple documents
jsonReader.SupportMultipleContent = true;
while (jsonReader.Read())
{
//Read in the next document
var nextObject = JObject.ReadFrom(jsonReader);
//Determine if we are on the last item or not
if (nextObject["last_seq"] != null)
{
root = nextObject.ToObject<Root>();
}
else
{
results.Add(nextObject.ToObject<ViewResponseModel>());
}
}
}
if (results.Count >= 0)
{
while (results.Count >= 0)
{
string base64Image = results[0].doc._attachments.image.data;
string refNr = results[0].doc.creator;
string token = "myToken";
var httpWebRequest = (HttpWebRequest)WebRequest.Create("MyApi");
httpWebRequest.PreAuthenticate = true;
httpWebRequest.Headers.Add("Authorization","Bearer " + token);
httpWebRequest.ContentType = "application/json";
httpWebRequest.Accept = "application/json";
httpWebRequest.Method = "POST";
var obj = new FsRequestModel() { };
obj.Image = base64Image;
obj.Application = "MyApp";
obj.RefNr = "hzd." + refNr + results[0].doc._id;
string imageJson = JsonConvert.SerializeObject(obj);
using (var streamWriter = new StreamWriter(httpWebRequest.GetRequestStream()))
{
streamWriter.Write(imageJson);
}
var httpResponseApi = (HttpWebResponse)httpWebRequest.GetResponse();
var result = "";
using (var streamReader = new StreamReader(httpResponseApi.GetResponseStream()))
{
result = streamReader.ReadToEnd();
}
FsResponseModel.Root response = JsonConvert.DeserializeObject<FsResponseModel.Root>(result);
return response;
}
}
else
{
//w8 for new data
return null;
}
return null;
}
我需要一个解决方案来告诉代码它必须等待新数据,如果新数据可用,则重新开始该过程。
编辑:
我现在正在尝试处理传入的提要。任何想法如何消费这个?
public async Task ObserveDbAndTrigger()
{
var url = "http://localhost:5984/mydb/_changes?Feed=continuous&filter=_view&view=MyView&include_docs=true&attachments=true&heartbeat=1000";
using (var client = new HttpClient())
{
client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
client.DefaultRequestHeaders.Add("Accept","application/json");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic",Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes($"user:pw" + $"")));
string line = "";
var request = new HttpRequestMessage(HttpMethod.Get,url);
using (var response = await client.SendAsync(
request,HttpCompletionoption.ResponseHeadersRead))
{
using (var body = await response.Content.ReadAsstreamAsync())
using (var reader = new StreamReader(body))
while (!reader.EndOfStream)
line = reader.ReadLine();
if (line != "")
{
//get data and work with it
}
}
}
}
我有流,但我不知道如何处理传入的线路。 任何帮助都会很棒。
编辑 2:
Stream 无法工作,我已经对其进行了测试,但它无法识别新行...
我的下一个想法是尝试使用事件源而不是连续提要。
对这个想法有什么建议吗?
解决方法
我像这样改变了它,现在我可以处理数据了。
static void Main()
{
string db = "myDb";
string auth = "Basic " + Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(String.Join(":","user","password")));
string query = db + "/_changes?feed=eventsource&include_docs=true&attachments=true&since=0&heartbeat=5000";
ChangesPublisher pub = new ChangesPublisher();
pub.OnChange += (sender,e) => Console.WriteLine(e.Value);
pub.OnException += (sender,e) => Console.WriteLine(e.Value.ToString() + "\r\n\r\nPress a key to exit.");
// start publishing.
Task.Run(async () =>
{
await pub.Begin("localhost",5984,query,auth);
});
//Stop running
Console.ReadKey();
pub.Stop = true;
}
public class ChangesPublisher
{
public bool Stop { get; set; }
public class ChangeEvent : EventArgs
{
public string Value { get; set; }
public ChangeEvent(string value)
{
Value = value;
}
}
public class ExceptionEvent : EventArgs
{
public Exception Value { get; set; }
public ExceptionEvent(Exception value)
{
Value = value;
}
}
public event EventHandler<ChangeEvent> OnChange = delegate { };
public event EventHandler<ExceptionEvent> OnException = delegate { };
public async Task Begin(string serverAddr,int port,string query,string auth)
{
using (var client = new TcpClient())
{
string request = String.Join("\r\n",new List<string> {
String.Format("GET /{0} HTTP/1.1",query),"Authorization: " + auth,"Accept: application/json","Host: " + serverAddr,"Connection: keep-alive","\r\n"
});
try
{
await client.ConnectAsync(serverAddr,port);
using (NetworkStream stream = client.GetStream())
{
StreamWriter writer = new StreamWriter(stream);
await writer.WriteAsync(request);
await writer.FlushAsync();
StreamReader reader = new StreamReader(stream);
while (!Stop)
{
string data = await reader.ReadLineAsync();
OnChange(this,new ChangeEvent(data));
//work with data
}
}
}
catch (Exception e)
{
OnException(this,new ExceptionEvent(e));
}
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。