微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 RxPy group_by_until() 运算符?

如何解决如何使用 RxPy group_by_until() 运算符?

这是我尝试使用响应式编程(使用 RxPy)解决的问题:

我有一个 MIDI 事件流(现场钢琴演奏),每个事件对应一个按下或释放的键。对于给定的音符,第一个事件是“按下”的,而后面的事件始终是“释放”的。当然,可以在释放另一个键之前按下任何键。

我想转换这个事件流,只保留“发布”事件以及“按下”和“释放”之间经过的时间。

因此,我发现 RX 是实现这一目标的完美候选者,我的目标是:

enter image description here

为了简单起见,我没有提到“press”和“release”属性的交替,也没有提到应该添加到结果流中的持续时间信息。

我的第一个赌注是使用 group_by() 运算符,结果是:

enter image description here

到目前为止,我的代码是这样的:

import rx
from rx.operators import filter,group_by

def is_note_message(event):
    return event['type']==9  // MIDI "Note On" event

def is_note_pressed(event):
    return event['veLocity']>0

def is_note_released(event):
    return event['veLocity']==0

(...)

note_obs = midi_events.pipe(
    filter( is_note_message ),group_by( lambda evt: evt['note'])
)

问题是我只想对给定笔记的 2 个事件(“新闻”和“发布”)进行分组,因此我想通过使用 group_by_until() 来更进一步。请注意,我在这里做了两个假设,这可能不正确:

  • 我认为这样做会使以后处理持续时间计算的过程更容易。
  • 我认为 group_by_until() 能够根据某些事件属性“停止”分组。

enter image description here

这就是我被卡住的地方:

  • 如果 group_by_until() 为真,我找不到 is_note_released() 停止分组的正确语法
  • 我不确定如何处理每个分组事件以计算持续时间,然后合并(使用 flat_map() ?)它们(但这可能是另一个问题)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。