-
Notifications
You must be signed in to change notification settings - Fork 282
Description
I'm connecting to Kafka to push messages, and there are no errors when creating the instance. I'm using Kafka settings that allow automatic topic creation, and I can see that the topic is created successfully. However, when sending messages to this topic, it keeps reporting the error: "closed". What could be the possible reason for this?
This is the code:
--- Smoke test: fetch metadata for topic "test" and synchronously send one
-- message to it through lua-resty-kafka, logging every intermediate result.
-- Returns nothing; all outcomes are reported through core.log.warn.
-- NOTE(review): the SASL credentials are hard-coded below and are also written
-- to the log when the broker list is dumped — consider moving them to
-- configuration and masking them in log output.
local function test()
    core.log.warn("test=================")

    local client = require "resty.kafka.client"
    local producer = require "resty.kafka.producer"

    local broker_list = {
        {
            host = "172.31.191.10",
            port = 9092,
            sasl_config = {
                mechanism = "PLAIN",
                user = "kaclient",
                password = "0x1jZTDWGtO10+gsxtNP7s1T1lZNaNmy72QYE9HVBEM=",
            },
        },
    }

    local key = "key"
    local message = "hello world"

    local cli = client:new(broker_list)

    -- fetch_metadata returns (brokers, partitions) on success and (nil, err)
    -- on failure, so on the failure path the second value is the error string.
    local brokers, partitions = cli:fetch_metadata("test")
    if not brokers then
        -- Log the second return value: `brokers` is nil here and carries no
        -- diagnostic information.
        core.log.warn("fetch metadata err:", partitions)
        return
    end

    core.log.warn("brokers=====:", core.json.encode(brokers))
    core.log.warn("partitions====", core.json.encode(partitions))

    local bp = producer:new(broker_list, { producer_type = "sync" })
    local ok, err = bp:send("test", key, message)
    core.log.warn("result=====", core.json.encode(ok))
    if not ok then
        core.log.warn("send err:", err)
        return
    end

    core.log.warn("send success")
end
This is the error:
2025/10/29 14:44:27 [warn] 49#49: 4767 [lua] customer-operate.lua:196: handler(): test***************************, client: 10.244.99.64, server: , request: "PUT /v1/customer/push-kafka HTTP/1.1", host: "172.31.197.177:31688"
2025/10/29 14:44:27 [warn] 49#49: *4767 [lua] customer-operate.lua:217: handler(): brokers=====:[{"host":"172.31.191.10","sasl_config":{"password":"0x1jZTDWGtO10+gsxtNP7s1T1lZNaNmy72QYE9HVBEM=","mechanism":"PLAIN","user":"kaclient"},"port":9092}], client: 10.244.99.64, server: , request: "PUT /v1/customer/push-kafka HTTP/1.1", host: "172.31.197.177:31688"
2025/10/29 14:44:27 [warn] 49#49: *4767 [lua] customer-operate.lua:218: handler(): partitions===={"0":{"replicas":[1],"isr":[1],"leader":1,"errcode":0,"id":0},"num":1,"errcode":0}, client: 10.244.99.64, server: , request: "PUT /v1/customer/push-kafka HTTP/1.1", host: "172.31.197.177:31688"
2025/10/29 14:44:28 [warn] 49#49: *4767 [lua] customer-operate.lua:226: handler(): reult=====null, client: 10.244.99.64, server: , request: "PUT /v1/customer/push-kafka HTTP/1.1", host: "172.31.197.177:31688"
2025/10/29 14:44:28 [warn] 49#49: *4767 [lua] customer-operate.lua:228: handler(): send err:closed, client: 10.244.99.64, server: , request: "PUT /v1/customer/push-kafka HTTP/1.1", host: "172.31.197.177:31688"