如何解决如何使用 SASL_PLAIN 验证 kafka
使用 segmentio/kafka-go 实现 kafka 消费者生产者。 https://github.com/segmentio/kafka-go
- 创建主题
func CreateTopicname() {
fmt.Println("start ")
mechanism := plain.Mechanism{
Username: "****",Password: "***",}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,DualStack: true,SASLMechanism: mechanism,}
_,err = dialer.DialContext(context.Background(),"tcp","localhost:9093")
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println("created successfully")
}
- 主题列表
func ListofTopics() {
mechanism := plain.Mechanism{
Username: "***",Password: "****",}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,}
conn,err := dialer.DialContext(context.Background(),"localhost:9092")
if err != nil {
fmt.Println(err.Error())
}
defer conn.Close()
_,err = conn.ReadPartitions()
if err != nil {
fmt.Println(err.Error())
}
fmt.Println(" end of list")
}
在 Kafka/config 目录中创建 /kafka_server_jaas.conf 并添加以下行
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="***"
password="***"
user_admin="***";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="***"
password="***";
};
在 config/server.properties 文件中添加了以下值。
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
listeners=SASL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://:9092
然后运行这个命令 导出 KAFKA_OPTS="-Djava.security.auth.login.config=PATH_TO_JASS.CONF_FILE/kafka_server_jaas.conf"
然后开始
- sudo systemctl start zookeeper
- sudo systemctl start kafka kafka 服务器的状态是 active 并且在运行上面的命令后检查了状态。
但是当我执行 go run main.go
出现类似dial tcp 127.0.0.1:9092: connect: connection denied 或dial tcp 127.0.0.1:9092: connect: server misbehave 的错误。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。