Skip to content

Commit 3ed0af1

Browse files
committed
Adding changes for kafka tailer integration
1 parent ed28cb8 commit 3ed0af1

File tree

1,153 files changed

+334279
-7
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

1,153 files changed

+334279
-7
lines changed

CONFIG.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,37 @@ input:
200200
201201
This configuration example may be found in the examples directory [here](example/config_logstash_http_input_ipv6.yml).
202202
203+
### Kafka Input Type
204+
205+
The `grok_exporter` is also capable of consuming log entries from Kafka. Currently, only plain-text encoded messages are supported.
206+
207+
```yaml
208+
input:
209+
type: kafka
210+
# Version corresponding to the kafka cluster
211+
kafka_version: 2.1.0
212+
213+
# The list of the Kafka brokers part of the Kafka cluster. Please note that you need an instance of grok_exporter per Kafka cluster if you plan on consuming from topics from multiple clusters.
214+
kafka_brokers:
215+
- localhost:9092
216+
217+
# The list of Kafka topics to consume from.
218+
kafka_topics:
219+
- grok_exporter_test
220+
221+
# The assignor to use, which can be either range, roundrobin, sticky (range by default)
222+
kafka_partition_assignor: range
223+
224+
# The name of the consumer group to register as on the broker. If not specified, the default is 'grok_exporter'
225+
kafka_consumer_group_name: grok_exporter
226+
227+
# Indicates if the exporter should start consuming as of the most recent messages in the topic (true), or consume from the earliest messages in the topic (false).
228+
kafka_consume_from_oldest: false
229+
```
230+
231+
This configuration example may be found in the examples directory [here](example/config-kafka.yml).
232+
233+
203234
imports Section
204235
---------------
205236

config/v3/configV3.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,24 @@ package v3
1616

1717
import (
1818
"fmt"
19-
v2 "github.com/fstab/grok_exporter/config/v2"
20-
"github.com/fstab/grok_exporter/tailer/glob"
21-
"github.com/fstab/grok_exporter/template"
22-
"gopkg.in/yaml.v2"
2319
"os"
20+
"regexp"
2421
"strconv"
2522
"strings"
2623
"time"
24+
25+
v2 "github.com/fstab/grok_exporter/config/v2"
26+
"github.com/fstab/grok_exporter/tailer/glob"
27+
"github.com/fstab/grok_exporter/template"
28+
"gopkg.in/yaml.v2"
2729
)
2830

2931
const (
3032
defaultRetentionCheckInterval = 53 * time.Second
3133
inputTypeStdin = "stdin"
3234
inputTypeFile = "file"
3335
inputTypeWebhook = "webhook"
36+
inputTypeKafka = "kafka"
3437
importMetricsType = "metrics"
3538
importPatternsType = "grok_patterns"
3639
)
@@ -102,6 +105,12 @@ type InputConfig struct {
102105
WebhookFormat string `yaml:"webhook_format,omitempty"`
103106
WebhookJsonSelector string `yaml:"webhook_json_selector,omitempty"`
104107
WebhookTextBulkSeparator string `yaml:"webhook_text_bulk_separator,omitempty"`
108+
KafkaVersion string `yaml:"kafka_version,omitempty"`
109+
KafkaBrokers []string `yaml:"kafka_brokers,omitempty"`
110+
KafkaTopics []string `yaml:"kafka_topics,omitempty"`
111+
KafkaPartitionAssignor string `yaml:"kafka_partition_assignor,omitempty"`
112+
KafkaConsumerGroupName string `yaml:"kafka_consumer_group_name,omitempty"`
113+
KafkaConsumeFromOldest bool `yaml:"kafka_consume_from_oldest,omitempty"`
105114
}
106115

107116
type GrokPatternsConfig []string
@@ -268,6 +277,19 @@ func (c *InputConfig) addDefaults() {
268277
c.WebhookTextBulkSeparator = "\n\n"
269278
}
270279
}
280+
if c.Type == inputTypeKafka {
281+
c.KafkaConsumeFromOldest = false
282+
283+
if c.KafkaPartitionAssignor == "" {
284+
c.KafkaPartitionAssignor = "range"
285+
}
286+
if c.KafkaVersion == "" {
287+
c.KafkaVersion = "2.1.0"
288+
}
289+
if c.KafkaConsumerGroupName == "" {
290+
c.KafkaConsumerGroupName = "grok_exporter"
291+
}
292+
}
271293
}
272294

273295
func (c *GrokPatternsConfig) addDefaults() {}
@@ -403,6 +425,26 @@ func (c *InputConfig) validate() error {
403425
if c.WebhookFormat == "text_bulk" && c.WebhookTextBulkSeparator == "" {
404426
return fmt.Errorf("invalid input configuration: 'input.webhook_text_bulk_separator' is required for input type \"webhook\" and webhook_format \"text_bulk\"")
405427
}
428+
case c.Type == inputTypeKafka:
429+
if len(c.KafkaBrokers) == 0 {
430+
return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_brokers' cannot be empty")
431+
}
432+
if len(c.KafkaTopics) == 0 {
433+
return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_topics' cannot be empty")
434+
}
435+
436+
matched, _ := regexp.MatchString(`^[0-9]\.[0-9]\.[0-9]$`, c.KafkaVersion)
437+
if !matched {
438+
return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_version' must a valid semantic version X.Y.Z")
439+
}
440+
441+
versionParts := strings.Split(c.KafkaVersion, ".")
442+
vMajor, vMajorErr := strconv.Atoi(versionParts[0])
443+
vMinor, vMinorErr := strconv.Atoi(versionParts[1])
444+
if vMajorErr != nil && vMinorErr != nil && vMajor < 1 && vMinor < 8 {
445+
return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_version' must be >= 0.8.0")
446+
}
447+
406448
default:
407449
return fmt.Errorf("unsupported 'input.type': %v", c.Type)
408450
}

example/config-kafka.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
global:
2+
config_version: 3
3+
input:
4+
type: kafka
5+
kafka_version: 2.1.0
6+
kafka_brokers:
7+
- localhost:9092
8+
kafka_topics:
9+
- grok_exporter_test
10+
kafka_consumer_group_name: grok_exporter
11+
kafka_consume_from_oldest: true
12+
imports:
13+
- type: grok_patterns
14+
dir: ./logstash-patterns-core/patterns
15+
metrics:
16+
- type: counter
17+
name: test_strings
18+
help: Test string detected.
19+
match: 'test'
20+
labels:
21+
logfile: '{{base .logfile}}'
22+
server:
23+
protocol: http
24+
port: 9144
25+

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
module github.com/fstab/grok_exporter
22

33
require (
4+
github.com/Shopify/sarama v1.27.0
45
github.com/bitly/go-simplejson v0.5.0
56
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
7+
github.com/ofabry/go-callvis v0.6.1 // indirect
68
github.com/prometheus/client_golang v1.7.1
79
github.com/prometheus/client_model v0.2.0
810
github.com/prometheus/common v0.10.0
911
github.com/sirupsen/logrus v1.6.0
1012
golang.org/x/exp v0.0.0-20191227195350-da58074b4299
13+
golang.org/x/image v0.0.0-20200801110659-972c09e46d76 // indirect
14+
golang.org/x/tools v0.0.0-20200828161849-5deb26317202 // indirect
15+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
1116
gopkg.in/yaml.v2 v2.3.0
1217
)
1318

0 commit comments

Comments
 (0)