如何解决如何为 BTC 汇集 PairCreated 事件?
我有一个用于获取新 eth 对的循环代码。我使用了 web3.py 库。它看起来像:
import json
import time
import pymysql
import asyncio
from web3 import Web3
infura_url = "https://mainnet.infura.io/v3/..."
web3 = Web3(Web3.HTTPProvider(infura_url))
uniswap_factory = '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f'
uniswap_factory_abi = json.loads('[{"inputs":...]')
contract = web3.eth.contract(address=uniswap_factory,abi=uniswap_factory_abi)
def handle_event(event):
json_object = json.loads(Web3.toJSON(event))
token = json_object['args']['pair']
token0 = json_object['args']['token0']
token1 = json_object['args']['token1']
abi = '[{"constant":true,...]'
Contract = web3.eth.contract(address=json_object['args']['token0'],abi=abi)
if Contract.functions.name().call() == "Wrapped Ether":
Contract = web3.eth.contract(address=json_object['args']['token1'],abi=abi)
print(token)
async def log_loop(event_filter,poll_interval):
while True:
for PairCreated in event_filter.get_new_entries():
handle_event(PairCreated)
await asyncio.sleep(poll_interval)
def main():
event_filter = contract.events.PairCreated.createFilter(fromBlock='latest')
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
asyncio.gather(
log_loop(event_filter,2)))
finally:
loop.close()
if __name__ == "__main__":
main()
我可以使用此代码获取最新的对并将其添加到 mysql 数据库中。在这里它完美无缺,但现在我只想用 btc 做同样的事情。有没有可以制作的python库?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。