import lombok.extern.slf4j.Slf4j; import org.web3j.abi.EventEncoder; import org.web3j.abi.TypeReference; import org.web3j.abi.datatypes.Address; import org.web3j.abi.datatypes.Event; import org.web3j.abi.datatypes.Uint; import org.web3j.protocol.Web3j; import org.web3j.protocol.core.DefaultBlockParameter; import org.web3j.protocol.core.DefaultBlockParameterName; import org.web3j.protocol.core.methods.request.EthFilter; import org.web3j.protocol.core.methods.response.EthBlock; import org.web3j.protocol.http.HttpService; import org.web3j.utils.Convert; import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** * 区块链监听事件 */ @Slf4j public class Monitor { /** * 连接web3 节点 */ private final static Web3j web3j = Web3j.build(new HttpService("https://bsc-dataseed.binance.org/")); /*启动监听, 从startBlock区块开始监听token转账事件 代币监听会出现的问题: 如果启动区块距离当前区块稍远,非常可能的情况是中间出现的交易太多,监视代码内部出现空指针异常。 如果监听启动时接近当前区块问题出现概率小。 */ public static void startTransferListen_Token(BigInteger startBlock) { System.out.println("------------------------------要监听的合约事件"); // 要监听的合约事件 Event event = new Event("Transfer", Arrays.asList( new TypeReference<Address>() {}, new TypeReference<Address>() {}, new TypeReference<Uint>(){}) ); //过滤器 System.out.println("-------------------------进入过滤器"); //监听的合约地址,同时监听两个智能合约 List<String> list=new ArrayList<>(); list.add(""); list.add(""); EthFilter filter = new EthFilter( DefaultBlockParameter.valueOf(startBlock), DefaultBlockParameterName.LATEST, list); filter.addSingletopic(EventEncoder.encode(event)); System.out.println("-------------------------结束过滤器"); //注册监听,解析日志中的事件 AtomicInteger block_TokenSub = new AtomicInteger(startBlock.intValue()); System.out.println("-------------------------注册监听"); web3j.ethlogobservable(filter).subscribe(log -> { System.out.println("-------------------------进入监听"); block_TokenSub.set(log.getBlockNumber().intValue()); String token = log.getAddress(); //这是Token合约地址 String txHash = log.getTransactionHash(); List<String> topics = log.getTopics(); // 提取转账记录 String fromAddress = "0x"+topics.get(1).substring(26); String toAddress = "0x"+topics.get(2).substring(26); System.out.println(" ---token ="+token+", txHash ="+txHash+",fromAddress="+fromAddress+",toAddress="+toAddress); //检查发送地址、接收地址是否属于系统用户, 不是系统用户就不予处理 String usdtAddress=""; String bkAddress=""; if(usdtAddress.equalsIgnoreCase(fromAddress) ||usdtAddress.equalsIgnoreCase(toAddress)||bkAddress.equalsIgnoreCase(fromAddress) || bkAddress.equalsIgnoreCase(toAddress)
) { String value1 = log.getData(); BigInteger big = new BigInteger(value1.substring(2), 16); BigDecimal value = Convert.fromWei(big.toString(), Convert.Unit.ETHER); // System.out.println("value="+value); String timestamp = ""; try { EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(log.getBlockNumber()), false).send(); timestamp = String.valueOf(ethBlock.getBlock().getTimestamp()); } catch (IOException e) { System.out.println("Block timestamp get failure,block number is {}" + log.getBlockNumber()); System.out.println("Block timestamp get failure,{}"+ e.getMessage()); } //执行关键的回调函数,处理内部逻辑 callBack_Token(token,txHash,fromAddress,toAddress,value,timestamp); } }, error->{ System.out.println(" ### tokenSubscription error= "+ error); error.printstacktrace(); }); // System.out.println("tokenSubscription ="+tokenSubscription); // System.out.println(tokenSubscription.isUnsubscribed()); } public static void callBack_Token(String token, String txHash, String from, String to, BigDecimal value, String timestamp) { log.info("----callBack_Token:"); log.info("交易hash---------------------"+txHash); System.out.println("----callBack_Token:"); System.out.println(" token = "+token); //同时监听两个智能合约,通过token来判断是哪个合约的交易 System.out.println(" txHash = "+txHash); System.out.println(" from = "+from); System.out.println(" to = "+to); System.out.println(" value = "+value.doubleValue()); } //启动监听以太坊上的交易 public static void startTransactionListen_ETH() { //监听当前区块以后的交易 web3j.transactionObservable().subscribe(tx -> { //更新检查过的区块高度 int block_EthSub = tx.getBlockNumber().intValue(); System.out.println(" ---transactionFlowable block_EthSub = "+block_EthSub); String txHash = tx.getHash(); System.out.println("----------------------------txHssh"+txHash); String fromAddress = tx.getFrom(); System.out.println("--------------------------------fAddress"+fromAddress); String toAddress = tx.getTo(); System.out.println("--------------------------------tAddress"+toAddress); if("".equals(fromAddress) || "".equals(toAddress)) { //发现了指定地址上的交易 System.out.println("---------------------------------有该地址的交易信息了"); BigDecimal value = Convert.fromWei(tx.getValue().toString(), Convert.Unit.ETHER); String timestamp = ""; try { EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(tx.getBlockNumber()), false).send(); timestamp = String.valueOf(ethBlock.getBlock().getTimestamp()); } catch (IOException e) { System.out.println("Block timestamp get failure,block number is {}" + tx.getBlockNumber()); System.out.println("Block timestamp get failure,{}"+ e.getMessage()); } // 监听以太坊上是否有系统生成地址的交易 callBack_ETH(txHash,fromAddress,toAddress,value,timestamp); } }, error->{ System.out.println(" ### transactionFlowable error= "+ error); error.printstacktrace(); }); } public static void callBack_ETH(String token, String from, String to, BigDecimal value, String timestamp) { System.out.println("----callBack_Token:"); System.out.println(" token = "+token); System.out.println(" from = "+from); System.out.println(" to = "+to); System.out.println(" value = "+value.doubleValue()); } }
注意:如果未翻墙或未转发调用可能会失败,可以部署到国外服务器测试,转发后有可能会监听不上或中断,部署服务器启动监听未失败过(连续三次)。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。