微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

部署 MDB 以在 Wildfly 上侦听 Kafka 错误:WFLYEJB0383:在资源适配器中找不到类型 fish.pay....KafkaListener 的消息侦听器

如何解决部署 MDB 以在 Wildfly 上侦听 Kafka 错误:WFLYEJB0383:在资源适配器中找不到类型 fish.pay....KafkaListener 的消息侦听器

我正在尝试使用 [JCA 资源适配器][1] 来使用 MDB 连接到 kafka。 以下是 standalone-full.xml 中用于配置 kafka resoruce adapter 和相关 ejb-mdb 定义的条目:

请注意,我已将 mdb 部署为嵌入在 ejbModule 中的 ear 的一部分

在部署应用程序时出现以下错误

 22:43:53,073 ERROR [org.jboss.msc.service.fail] (MSC service thread 1-1) MSC000001: Failed to start service jboss.deployment.subunit."kafkatemplatereceiver.ear"."kafkatemplatereceiverEJB.jar".component.KafkaMDB.CREATE: org.jboss.msc.service.StartException in service jboss.deployment.subunit."kafkatemplatereceiver.ear"."kafkatemplatereceiverEJB.jar".component.KafkaMDB.CREATE: Failed to start service
        at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$startTask.execute(ServiceControllerImpl.java:1731)
        at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$ControllerTask.run(ServiceControllerImpl.java:1559)
        at org.jboss.threads@2.4.0.Final//org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads@2.4.0.Final//org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1990)
        at org.jboss.threads@2.4.0.Final//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1486)
        at org.jboss.threads@2.4.0.Final//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1363)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka
        at org.jboss.as.ejb3@21.0.2.Final//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.createActivationSpecs(MessageDrivenComponentCreateService.java:142)
        at org.jboss.as.ejb3@21.0.2.Final//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.createComponent(MessageDrivenComponentCreateService.java:105)
        at org.jboss.as.ee@21.0.2.Final//org.jboss.as.ee.component.BasicComponentCreateService.start(BasicComponentCreateService.java:86)
        at org.jboss.as.ejb3@21.0.2.Final//org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentCreateService.start(MessageDrivenComponentCreateService.java:93)
        at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$startTask.startService(ServiceControllerImpl.java:1739)
        at org.jboss.msc@1.4.12.Final//org.jboss.msc.service.ServiceControllerImpl$startTask.execute(ServiceControllerImpl.java:1701)
        ... 6 more
    
    22:43:53,076 ERROR [org.jboss.as.controller.management-operation] (External Management Request Threads -- 1) WFLYCTL0013: Operation ("add") Failed - address: ([("deployment" => "kafkatemplatereceiver.ear")]) - failure description: {"WFLYCTL0080: Failed services" => {"jboss.deployment.subunit.\"kafkatemplatereceiver.ear\".\"kafkatemplatereceiverEJB.jar\".component.KafkaMDB.CREATE" => "Failed to start service
        Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka"}}
    22:43:53,077 ERROR [org.jboss.as.server] (External Management Request Threads -- 1) WFLYSRV0021: Deploy of deployment "kafkatemplatereceiver.ear" was rolled back with the following failure message: 
    {"WFLYCTL0080: Failed services" => {"jboss.deployment.subunit.\"kafkatemplatereceiver.ear\".\"kafkatemplatereceiverEJB.jar\".component.KafkaMDB.CREATE" => "Failed to start service
        Caused by: java.lang.IllegalStateException: WFLYEJB0383: No message listener of type fish.payara.cloud.connectors.kafka.api.KafkaListener found in resource adapter kafka"}}

**standalone-full.xml**
 

     <subsystem xmlns="urn:jboss:domain:ejb3:8.0">
                <session-bean>
                    <stateless>
                        <bean-instance-pool-ref pool-name="slsb-strict-max-pool"/>
                    </stateless>
                    <stateful default-access-timeout="5000" cache-ref="simple" passivation-disabled-cache-ref="simple"/>
                    <singleton default-access-timeout="5000"/>
                </session-bean>
                <mdb>
        <!--<resource-adapter-ref resource-adapter-name="${ejb.resource-adapter-name:kafka-rar-0.6.0.rar}"/>-->
                    <resource-adapter-ref resource-adapter-name="kafka"/>
                    <bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
                </mdb> ...

standalone-full.xml 资源适配器配置

<subsystem xmlns="urn:jboss:domain:resource-adapters:6.0">
        <resource-adapters>
            <resource-adapter id="kafka">
                <archive>
                    kafka-rar-0.6.0.rar
                </archive>
                <transaction-support>XATransaction</transaction-support>
                <connection-deFinitions>
                    <connection-deFinition class-name="fish.payara.cloud.connectors.kafka.outbound.KafkaManagedConnectionFactory" jndi-name="java:/KafkaConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                        <xa-pool>
                            <min-pool-size>1</min-pool-size>
                            <initial-pool-size>1</initial-pool-size>
                            <max-pool-size>20</max-pool-size>
                            <prefill>false</prefill>
                            <is-same-rm-override>false</is-same-rm-override>
                        </xa-pool>
                    </connection-deFinition>
                </connection-deFinitions>
            </resource-adapter>
        </resource-adapters>
    </subsystem>

MDB 类:

package com.zhun.euon.service;

import java.util.Date;
import javax.ejb.MessageDriven;
import javax.ejb.ActivationConfigProperty;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jboss.ejb3.annotation.ResourceAdapter;

//import com.zhun.euon.service.serializer.Stringo;

import fish.payara.cloud.connectors.kafka.api.KafkaListener;
import fish.payara.cloud.connectors.kafka.api.OnRecord;

@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "clientId",propertyValue = "testClient"),@ActivationConfigProperty(propertyName = "groupIdConfig",propertyValue = "test-consumer-group"),@ActivationConfigProperty(propertyName = "topics",propertyValue = "zhun_core_data_service_topic"),@ActivationConfigProperty(propertyName = "bootstrapServersConfig",propertyValue = "192.168.0.105:9092"),@ActivationConfigProperty(propertyName = "autoCommitInterval",propertyValue = "100"),@ActivationConfigProperty(propertyName = "retryBackoff",propertyValue = "1000"),@ActivationConfigProperty(propertyName = "pollInterval",//  @ActivationConfigProperty(propertyName="resourceAdapterName",propertyValue="kafka-rar-0.6.0.rar"),//  @ActivationConfigProperty(propertyName="resourceAdapterMid",@ActivationConfigProperty(propertyName = "keyDeserializer",propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),@ActivationConfigProperty(propertyName = "valueDeserializer",})
@ResourceAdapter(value="kafka")
public class KafkaMDB implements KafkaListener {
     
    @OnRecord( topics={"zhun_core_data_service_topic"})
    public void getMessageTest(ConsumerRecord<String,String> record) {
        System.out.println("Got record on topic testing " + record + "at time"+ new Date());
    }
} 
 

我不确定我是否遗漏了什么,但是我无法弄清楚为什么服务器会在 rar 文件本身中而不是在我的 EJB- Jar 中寻找侦听器的接口实现与 rth [1]:https://github.com/payara/Cloud-Connectors/tree/master/Kafka/KafkaRAR

解决方法

我遇到了类似的问题。最终解决这个问题的是在 webapp/WEB-INF/ 下添加一个 jboss-deployment-structure.xml 文件。这指向了与 JBoss 服务器一起部署的 Kafka RAR:

<jboss-deployment-structure>
    <deployment>
        <dependencies>
            <module name="deployment.kafka-rar-0.6.1-SNAPSHOT.rar" export="TRUE"/>
        </dependencies>
    </deployment>
</jboss-deployment-structure>

这是我遵循的教程 -> http://www.mastertheboss.com/other/jboss-stuff/apache-kafka/getting-started-with-apache-kafka-and-wildfly

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?