微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

无法通过sasl.jaas.config获取testcontainers Kafka来测试ACL是否正常工作

如何解决无法通过sasl.jaas.config获取testcontainers Kafka来测试ACL是否正常工作

我正在尝试利用testcontainers在某些自动化单元测试中本地测试Kafka。我在测试授权时遇到问题。

我的目标是测试

(1)如果此测试容器中没有ACL,则不允许KafkaProducer写入(目前,即使没有正确配置生产者,只要创建了ACL,它都可以发送到主题-我认为将allow.everyone.if.no.acl.found的kafka env变量设置为false可以解决问题-但似乎并非如此)

(2)测试KafkaProducer是否使用了正确的sasl.jaas.config(即,错误的apiKey和pasword),即使设置了ACL,它也被拒绝访问测试主题对于所有校长。

下面是我的代码。我可以让它“工作”,但是测试我无法弄清楚的上述两种情况。我认为我可能实际上并不是在创建ACL,因为在创建ACL之后添加一行({adminClient.describeAcls(AclBindingFilter.ANY).values().get();时出现No Authorizer is configured on the broker错误)->查看与this类似的帖子认为这意味着实际上没有创建ACL绑定。

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.serialization.StringSerializer;

        String topicName = "this-is-a-topic";
        String confluentVersion = "5.5.1";
        network = Network.newNetwork();
        String jaastemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required %s=\"%s\" %s=\"%s\";";
        String jaasConfig = String.format(jaastemplate,"username","apiKey","password","apiPassword");
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + confluentVersion))
                .withNetwork(network)
                .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE","false")
                .withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND","false")
                .withEnv("KAFKA_SUPER_USERS","User:OnlySuperUser")
                .withEnv("KAFKA_SASL_MECHANISM","PLAIN")
                .withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM","http")
                .withEnv("KAFKA_SASL_JAAS_CONfig",jaasConfig);

        kafka.start();
        schemaRegistryContainer = new SchemaRegistryContainer(confluentVersion).withKafka(kafka);
        schemaRegistryContainer.start();

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,kafka.getBootstrapServers());
        properties.put("input.topic.name",topicName);
        properties.put("input.topic.partitions","1");
        adminClient = KafkaAdminClient.create(properties);
        AclBinding ACL = new AclBinding(new ResourcePattern(ResourceType.TOPIC,topicName,PatternType.LIteraL),new AccessControlEntry( "User:*","*",AclOperation.WRITE,AclPermissionType.ALLOW));
        var acls = adminClient.createAcls(List.of(ACL)).values();


        List<NewTopic> topics = new ArrayList<>();
        topics.add(
                new NewTopic(topicName,Integer.parseInt(properties.getProperty("input.topic.partitions")),Short.parseShort(properties.getProperty("input.topic.replication.factor")))
        );
        adminClient.createtopics(topics);

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringSerializer.class);

        props.put("input.topic.name",topicName);
        props.put("security.protocol","PLAINTEXT");
        props.put("input.topic.partitions","1");
        props.put("input.topic.replication.factor","1");
        props.put("Metadata.fetch.timeout.ms","10000");
        props.put("sasl.jaas.config",jaasConfig);

        producer = new KafkaProducer<>(props);

        String key = "testContainers";
        String value = "AreAwesome";
        ProducerRecord<String,String> record = new ProducerRecord<>(
                        (String) props.get("input.topic.name"),key,value);
        try {
             RecordMetadata o = (RecordMetadata) producer.send(record).get();
             System.out.println(o.toString());
        } catch (Exception e) {
             e.printstacktrace();
        }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。