如何解决如何使用 RxPy group_by_until() 运算符?
这是我尝试使用响应式编程(使用 RxPy)解决的问题:
我有一个 MIDI 事件流(现场钢琴演奏),每个事件对应一个按下或释放的键。对于给定的音符,第一个事件是“按下”的,而后面的事件始终是“释放”的。当然,可以在释放另一个键之前按下任何键。
我想转换这个事件流,只保留“发布”事件以及“按下”和“释放”之间经过的时间。
因此,我发现 RX 是实现这一目标的完美候选者,我的目标是:
为了简单起见,我没有提到“press”和“release”属性的交替,也没有提到应该添加到结果流中的持续时间信息。
我的第一个赌注是使用 group_by()
运算符,结果是:
到目前为止,我的代码是这样的:
import rx
from rx.operators import filter,group_by
def is_note_message(event):
return event['type']==9 // MIDI "Note On" event
def is_note_pressed(event):
return event['veLocity']>0
def is_note_released(event):
return event['veLocity']==0
(...)
note_obs = midi_events.pipe(
filter( is_note_message ),group_by( lambda evt: evt['note'])
)
问题是我只想对给定笔记的 2 个事件(“新闻”和“发布”)进行分组,因此我想通过使用 group_by_until()
来更进一步。请注意,我在这里做了两个假设,这可能不正确:
- 我认为这样做会使以后处理持续时间计算的过程更容易。
- 我认为
group_by_until()
能够根据某些事件属性“停止”分组。
这就是我被卡住的地方:
- 如果
group_by_until()
为真,我找不到is_note_released()
停止分组的正确语法 - 我不确定如何处理每个分组事件以计算持续时间,然后合并(使用
flat_map()
?)它们(但这可能是另一个问题)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。