如何解决无法通过sasl.jaas.config获取testcontainers Kafka来测试ACL是否正常工作
我正在尝试利用testcontainers在某些自动化单元测试中本地测试Kafka。我在测试授权时遇到问题。
我的目标是测试
(1)如果此测试容器中没有ACL,则不允许KafkaProducer
写入(目前,即使没有正确配置生产者,只要创建了ACL,它都可以发送到主题-我认为将allow.everyone.if.no.acl.found
的kafka env变量设置为false可以解决问题-但似乎并非如此)
(2)测试KafkaProducer
是否使用了正确的sasl.jaas.config
(即,错误的apiKey和pasword),即使设置了ACL,它也被拒绝访问测试主题对于所有校长。
下面是我的代码。我可以让它“工作”,但是测试我无法弄清楚的上述两种情况。我认为我可能实际上并不是在创建ACL,因为在创建ACL之后添加一行({adminClient.describeAcls(AclBindingFilter.ANY).values().get();
时出现No Authorizer is configured on the broker
错误)->查看与this类似的帖子认为这意味着实际上没有创建ACL绑定。
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.serialization.StringSerializer;
String topicName = "this-is-a-topic";
String confluentVersion = "5.5.1";
network = Network.newNetwork();
String jaastemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required %s=\"%s\" %s=\"%s\";";
String jaasConfig = String.format(jaastemplate,"username","apiKey","password","apiPassword");
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + confluentVersion))
.withNetwork(network)
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE","false")
.withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND","false")
.withEnv("KAFKA_SUPER_USERS","User:OnlySuperUser")
.withEnv("KAFKA_SASL_MECHANISM","PLAIN")
.withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM","http")
.withEnv("KAFKA_SASL_JAAS_CONfig",jaasConfig);
kafka.start();
schemaRegistryContainer = new SchemaRegistryContainer(confluentVersion).withKafka(kafka);
schemaRegistryContainer.start();
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,kafka.getBootstrapServers());
properties.put("input.topic.name",topicName);
properties.put("input.topic.partitions","1");
adminClient = KafkaAdminClient.create(properties);
AclBinding ACL = new AclBinding(new ResourcePattern(ResourceType.TOPIC,topicName,PatternType.LIteraL),new AccessControlEntry( "User:*","*",AclOperation.WRITE,AclPermissionType.ALLOW));
var acls = adminClient.createAcls(List.of(ACL)).values();
List<NewTopic> topics = new ArrayList<>();
topics.add(
new NewTopic(topicName,Integer.parseInt(properties.getProperty("input.topic.partitions")),Short.parseShort(properties.getProperty("input.topic.replication.factor")))
);
adminClient.createtopics(topics);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServer);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringSerializer.class);
props.put("input.topic.name",topicName);
props.put("security.protocol","PLAINTEXT");
props.put("input.topic.partitions","1");
props.put("input.topic.replication.factor","1");
props.put("Metadata.fetch.timeout.ms","10000");
props.put("sasl.jaas.config",jaasConfig);
producer = new KafkaProducer<>(props);
String key = "testContainers";
String value = "AreAwesome";
ProducerRecord<String,String> record = new ProducerRecord<>(
(String) props.get("input.topic.name"),key,value);
try {
RecordMetadata o = (RecordMetadata) producer.send(record).get();
System.out.println(o.toString());
} catch (Exception e) {
e.printstacktrace();
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。