Skip to content

Commit ef59621

Browse files
committed
feat(kafka): add authentication and authorization support
1 parent f2b32ed commit ef59621

File tree

3 files changed

+150
-76
lines changed

3 files changed

+150
-76
lines changed

plugins/extractors/kafka/kafka.go

Lines changed: 59 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ type AuthConfig struct {
5858
// certificate authority file for TLS client authentication
5959
CAFile string `mapstructure:"ca_file"`
6060
} `mapstructure:"tls"`
61+
62+
SASL struct {
63+
Enabled bool `mapstructure:"enabled"`
64+
Mechanism string `mapstructure:"mechanism"`
65+
}
6166
}
6267

6368
var sampleConfig = `broker: "localhost:9092"`
@@ -74,7 +79,7 @@ var info = plugins.Info{
7479
type Extractor struct {
7580
plugins.BaseExtractor
7681
// internal states
77-
conn *kafka.Conn
82+
conn sarama.Consumer
7883
logger log.Logger
7984
config Config
8085
clientDurn metric.Int64Histogram
@@ -104,69 +109,91 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
104109
return err
105110
}
106111

107-
// create default dialer
108-
dialer := &kafka.Dialer{
109-
Timeout: 10 * time.Second,
110-
DualStack: true,
111-
}
112+
consumerConfig := sarama.NewConfig()
112113

113114
if e.config.Auth.TLS.Enabled {
114115
tlsConfig, err := e.createTLSConfig()
115116
if err != nil {
116117
return fmt.Errorf("create tls config: %w", err)
117118
}
118-
119-
dialer.TLS = tlsConfig
119+
consumerConfig.Net.TLS.Enable = true
120+
consumerConfig.Net.TLS.Config = tlsConfig
121+
122+
if e.config.Auth.SASL.Enabled {
123+
consumerConfig.Net.SASL.Enable = true
124+
if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth {
125+
consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
126+
consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider()
127+
}
120128
}
121129

122-
// create connection
123-
e.conn, err = dialer.DialContext(ctx, "tcp", e.config.Broker)
130+
consumer, err := sarama.NewConsumer([]string{e.config.Broker}, consumerConfig)
124131
if err != nil {
125-
return fmt.Errorf("create connection: %w", err)
132+
fmt.Printf("Error is here !! %s", err.Error())
133+
return fmt.Errorf("failed to create kafka consumer for brokers %s and config %+v. Error %s", e.config.Broker,
134+
consumerConfig, err.Error())
126135
}
127-
136+
e.conn = consumer
128137
return nil
129138
}
130139

131140
// Extract checks if the extractor is ready to extract
132141
// if so, then extracts metadata from the kafka broker
133-
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
142+
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {
134143
defer e.conn.Close()
135144

136-
partitions, err := e.readPartitions(ctx)
137-
if err != nil {
138-
return fmt.Errorf("fetch partitions: %w", err)
139-
}
145+
defer func(start time.Time) {
146+
attributes := []attribute.KeyValue{
147+
attribute.String("kafka.broker", e.config.Broker),
148+
attribute.Bool("success", err == nil),
149+
}
150+
if err != nil {
151+
errorCode := "UNKNOWN"
152+
var kErr kafka.Error
153+
if errors.As(err, &kErr) {
154+
errorCode = strings.ReplaceAll(
155+
strings.ToUpper(kErr.Title()), " ", "_",
156+
)
157+
}
158+
attributes = append(attributes, attribute.String("kafka.error_code", errorCode))
159+
}
140160

141-
// collect topic list from partition list
142-
topics := map[string]int{}
143-
for _, p := range partitions {
144-
topics[p.Topic]++
161+
e.clientDurn.Record(
162+
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...),
163+
)
164+
}(time.Now())
165+
topics, err := e.conn.Topics()
166+
if err != nil {
167+
return fmt.Errorf("fetch topics: %w", err)
145168
}
146169

147170
// build and push topics
148-
for topic, numOfPartitions := range topics {
171+
for _, topic := range topics {
149172
// skip if topic is a default topic
150173
_, isDefaultTopic := defaultTopics[topic]
151174
if isDefaultTopic {
152175
continue
153176
}
154177

155-
asset, err := e.buildAsset(topic, numOfPartitions)
178+
partitions, err := e.conn.Partitions(topic)
179+
if err != nil {
180+
e.logger.Error("failed to fetch partitions for topic", "err", err, "topic", topic)
181+
continue
182+
}
183+
asset, err := e.buildAsset(topic, len(partitions))
156184
if err != nil {
157185
e.logger.Error("failed to build asset", "err", err, "topic", topic)
158186
continue
159187
}
160188
emit(models.NewRecord(asset))
161189
}
162-
163190
return nil
164191
}
165192

166193
func (e *Extractor) createTLSConfig() (*tls.Config, error) {
167194
authConfig := e.config.Auth.TLS
168195

169-
if authConfig.CertFile == "" || authConfig.KeyFile == "" || authConfig.CAFile == "" {
196+
if authConfig.CAFile == "" {
170197
//nolint:gosec
171198
return &tls.Config{
172199
InsecureSkipVerify: e.config.Auth.TLS.InsecureSkipVerify,
@@ -178,9 +205,13 @@ func (e *Extractor) createTLSConfig() (*tls.Config, error) {
178205
return nil, fmt.Errorf("create cert: %w", err)
179206
}
180207

181-
caCert, err := os.ReadFile(authConfig.CAFile)
182-
if err != nil {
183-
return nil, fmt.Errorf("read ca cert file: %w", err)
208+
var cert tls.Certificate
209+
var err error
210+
if authConfig.CertFile != "" && authConfig.KeyFile != "" {
211+
cert, err = tls.LoadX509KeyPair(authConfig.CertFile, authConfig.KeyFile)
212+
if err != nil {
213+
return nil, fmt.Errorf("create cert: %w", err)
214+
}
184215
}
185216

186217
caCertPool := x509.NewCertPool()
@@ -215,31 +246,6 @@ func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (*v1beta2.
215246
}, nil
216247
}
217248

218-
func (e *Extractor) readPartitions(ctx context.Context) (partitions []kafka.Partition, err error) {
219-
defer func(start time.Time) {
220-
attributes := []attribute.KeyValue{
221-
attribute.String("kafka.broker", e.config.Broker),
222-
attribute.Bool("success", err == nil),
223-
}
224-
if err != nil {
225-
errorCode := "UNKNOWN"
226-
var kErr kafka.Error
227-
if errors.As(err, &kErr) {
228-
errorCode = strings.ReplaceAll(
229-
strings.ToUpper(kErr.Title()), " ", "_",
230-
)
231-
}
232-
attributes = append(attributes, attribute.String("kafka.error_code", errorCode))
233-
}
234-
235-
e.clientDurn.Record(
236-
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...),
237-
)
238-
}(time.Now())
239-
240-
return e.conn.ReadPartitions()
241-
}
242-
243249
func init() {
244250
if err := registry.Extractors.Register("kafka", func() plugins.Extractor {
245251
return New(plugins.GetLog())

plugins/extractors/kafka/kafka_test.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ package kafka_test
66
import (
77
"context"
88
"errors"
9+
"fmt"
910
"log"
10-
"net"
1111
"os"
12-
"strconv"
1312
"testing"
13+
"time"
14+
15+
kafkaLib "github.com/IBM/sarama"
1416

1517
"github.com/ory/dockertest/v3"
1618
"github.com/ory/dockertest/v3/docker"
@@ -27,13 +29,12 @@ import (
2729
)
2830

2931
var (
30-
brokerHost = "localhost:9093"
32+
brokerHost = "0.0.0.0:9093"
3133
urnScope = "test-kafka"
3234
)
3335

3436
func TestMain(m *testing.M) {
35-
var conn *kafkaLib.Conn
36-
var broker kafkaLib.Broker
37+
var broker *kafkaLib.Broker
3738

3839
// setup test
3940
opts := dockertest.RunOptions{
@@ -49,25 +50,23 @@ func TestMain(m *testing.M) {
4950
},
5051
},
5152
}
53+
5254
retryFn := func(resource *dockertest.Resource) (err error) {
53-
// create client
54-
conn, err = kafkaLib.Dial("tcp", brokerHost)
55+
time.Sleep(30 * time.Second)
56+
conn, err := kafkaLib.NewClient([]string{brokerHost}, nil)
5557
if err != nil {
5658
return
5759
}
5860

5961
// healthcheck
60-
brokerList, err := conn.Brokers()
61-
if err != nil {
62-
return
63-
}
64-
if len(brokerList) == 0 {
62+
if len(conn.Brokers()) == 0 {
6563
err = errors.New("not ready")
6664
return
6765
}
6866

6967
broker, err = conn.Controller()
7068
if err != nil {
69+
fmt.Printf("error fetching controller %s", err.Error())
7170
conn.Close()
7271
return
7372
}
@@ -163,7 +162,7 @@ func TestInit(t *testing.T) {
163162
},
164163
})
165164

166-
assert.ErrorContains(t, err, "create connection")
165+
assert.ErrorContains(t, err, "failed to create kafka consumer")
167166
})
168167
}
169168

@@ -226,24 +225,26 @@ func TestExtract(t *testing.T) {
226225
})
227226
}
228227

229-
func setup(broker kafkaLib.Broker) (err error) {
230-
// create broker connection to create topics
231-
var conn *kafkaLib.Conn
232-
conn, err = kafkaLib.Dial("tcp", net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port)))
228+
func setup(broker *kafkaLib.Broker) (err error) {
229+
// create client connection to create topics
230+
conn, err := kafkaLib.NewClient([]string{brokerHost}, nil)
233231
if err != nil {
232+
fmt.Printf("error creating client ")
234233
return
235234
}
236235
defer conn.Close()
237236

238237
// create topics
239-
topicConfigs := []kafkaLib.TopicConfig{
240-
{Topic: "meteor-test-topic-1", NumPartitions: 1, ReplicationFactor: 1},
241-
{Topic: "meteor-test-topic-2", NumPartitions: 1, ReplicationFactor: 1},
242-
{Topic: "meteor-test-topic-3", NumPartitions: 1, ReplicationFactor: 1},
243-
{Topic: "__consumer_offsets", NumPartitions: 1, ReplicationFactor: 1},
238+
topicConfigs := map[string]*kafkaLib.TopicDetail{
239+
"meteor-test-topic-1": {NumPartitions: 1, ReplicationFactor: 1},
240+
"meteor-test-topic-2": {NumPartitions: 1, ReplicationFactor: 1},
241+
"meteor-test-topic-3": {NumPartitions: 1, ReplicationFactor: 1},
242+
"__consumer_offsets": {NumPartitions: 1, ReplicationFactor: 1},
244243
}
245-
err = conn.CreateTopics(topicConfigs...)
244+
createTopicRequest := &kafkaLib.CreateTopicsRequest{TopicDetails: topicConfigs}
245+
_, err = broker.CreateTopics(createTopicRequest)
246246
if err != nil {
247+
fmt.Printf("error creating topics! %s", err.Error())
247248
return
248249
}
249250

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package kafka
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"strings"
7+
8+
"github.com/IBM/sarama"
9+
"github.com/rs/zerolog/log"
10+
)
11+
12+
// kubernetesServiceAccountTokenPath is the default location of the projected
// Kubernetes service account token file used for Kafka SASL/OAUTHBEARER auth.
const kubernetesServiceAccountTokenPath = "/var/run/secrets/kafka/serviceaccount/token"
15+
16+
// NewKubernetesTokenProvider creates a new TokenProvider that reads the token from kubernetes pod service account
17+
// token file. By default, the token file path for kafka is stored in `/var/run/secrets/kafka/serviceaccount/token`.
18+
// User need to make sure there a valid projected service account token on that path.
19+
func NewKubernetesTokenProvider(opts ...TokenProviderOption) *KubernetesTokenProvider {
20+
options := &TokenProviderOptions{
21+
FilePath: kubernetesServiceAccountTokenPath,
22+
}
23+
for _, o := range opts {
24+
o(options)
25+
}
26+
log.Info().Str("token_file_path", options.FilePath).Msg("token provider options")
27+
return &KubernetesTokenProvider{
28+
serviceAccountFilePath: options.FilePath,
29+
}
30+
}
31+
32+
// KubernetesTokenProvider supplies SASL/OAUTHBEARER access tokens read from a
// service account token file mounted into the pod.
type KubernetesTokenProvider struct {
	// serviceAccountFilePath is the path of the file the token is read from.
	serviceAccountFilePath string
}
35+
36+
// Token returns the token from the service account token file.
37+
func (tp *KubernetesTokenProvider) Token() (*sarama.AccessToken, error) {
38+
token, err := tp.readFile()
39+
if err != nil {
40+
log.Error().Err(err).Msg("failed to read token from service account token file")
41+
return nil, err
42+
}
43+
return &sarama.AccessToken{
44+
Token: token,
45+
}, nil
46+
}
47+
func (tp *KubernetesTokenProvider) readFile() (string, error) {
48+
token, err := os.ReadFile(tp.serviceAccountFilePath)
49+
if err != nil {
50+
return "", fmt.Errorf("failed to read files: %w", err)
51+
}
52+
tkn := strings.TrimSpace(string(token))
53+
return tkn, nil
54+
}
55+
56+
// TokenProviderOptions holds the configurable settings of a
// KubernetesTokenProvider.
type TokenProviderOptions struct {
	// FilePath is the path to the file containing the token.
	FilePath string
}

// TokenProviderOption mutates TokenProviderOptions; construct values with the
// With* helper functions.
type TokenProviderOption func(*TokenProviderOptions)

// WithTokenFilePath overrides the default token file location.
func WithTokenFilePath(path string) TokenProviderOption {
	return func(opts *TokenProviderOptions) { opts.FilePath = path }
}

0 commit comments

Comments
 (0)