Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,37 @@ input:

This configuration example may be found in the examples directory [here](example/config_logstash_http_input_ipv6.yml).

### Kafka Input Type

The `grok_exporter` is also capable of consuming log entries from Kafka. Currently, only plain-text encoded messages are supported.

```yaml
input:
type: kafka
# Version corresponding to the kafka cluster
kafka_version: 2.1.0

  # The list of Kafka brokers that are part of the Kafka cluster. Note that you need one grok_exporter instance per Kafka cluster if you plan on consuming from topics in multiple clusters.
kafka_brokers:
- localhost:9092

# The list of Kafka topics to consume from.
kafka_topics:
- grok_exporter_test

  # The partition assignor to use: one of range, roundrobin, or sticky (range by default).
kafka_partition_assignor: range

# The name of the consumer group to register as on the broker. If not specified, the default is 'grok_exporter'
kafka_consumer_group_name: grok_exporter

  # Indicates if the exporter should start consuming from the earliest messages in the topic (true), or only from the most recent messages (false).
kafka_consume_from_oldest: false
```

This configuration example may be found in the examples directory [here](example/config-kafka.yml).


imports Section
---------------

Expand Down
56 changes: 56 additions & 0 deletions KAFKA_TESTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
## How to test the Kafka integration

1. Ensure you have docker setup on your system.

2. Create the following config and save it as `docker-compose-kafka-single.yml`
```
---
version: '2'

services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
hostname: zookeeper
ports:
- 32181:32181
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
extra_hosts:
- "moby:127.0.0.1"
      - "localhost:127.0.0.1"

kafka:
image: confluentinc/cp-kafka:latest
hostname: kafka
ports:
- 9092:9092
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
extra_hosts:
- "moby:127.0.0.1"
      - "localhost:127.0.0.1"
```

3. Create the kafka cluster:
```
docker-compose -f docker-compose-kafka-single.yml up
```

4. Create the necessary topic:
```
docker-compose -f docker-compose-kafka-single.yml exec kafka kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic grok_exporter_test
```

5. Publish a sample test message:
```
docker-compose -f docker-compose-kafka-single.yml exec kafka bash -c "echo 'this is a test' | kafka-console-producer --request-required-acks 1 --broker-list localhost:9092 --topic grok_exporter_test"
```

6. Given that the grok_exporter was properly configured and you're matching for a string in the message you've previously published to kafka, you should have matches that appear on the metrics page.
50 changes: 46 additions & 4 deletions config/v3/configV3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,24 @@ package v3

import (
"fmt"
v2 "github.com/fstab/grok_exporter/config/v2"
"github.com/fstab/grok_exporter/tailer/glob"
"github.com/fstab/grok_exporter/template"
"gopkg.in/yaml.v2"
"os"
"regexp"
"strconv"
"strings"
"time"

v2 "github.com/fstab/grok_exporter/config/v2"
"github.com/fstab/grok_exporter/tailer/glob"
"github.com/fstab/grok_exporter/template"
"gopkg.in/yaml.v2"
)

const (
defaultRetentionCheckInterval = 53 * time.Second
inputTypeStdin = "stdin"
inputTypeFile = "file"
inputTypeWebhook = "webhook"
inputTypeKafka = "kafka"
importMetricsType = "metrics"
importPatternsType = "grok_patterns"
)
Expand Down Expand Up @@ -102,6 +105,12 @@ type InputConfig struct {
WebhookFormat string `yaml:"webhook_format,omitempty"`
WebhookJsonSelector string `yaml:"webhook_json_selector,omitempty"`
WebhookTextBulkSeparator string `yaml:"webhook_text_bulk_separator,omitempty"`
KafkaVersion string `yaml:"kafka_version,omitempty"`
KafkaBrokers []string `yaml:"kafka_brokers,omitempty"`
KafkaTopics []string `yaml:"kafka_topics,omitempty"`
KafkaPartitionAssignor string `yaml:"kafka_partition_assignor,omitempty"`
KafkaConsumerGroupName string `yaml:"kafka_consumer_group_name,omitempty"`
KafkaConsumeFromOldest bool `yaml:"kafka_consume_from_oldest,omitempty"`
}

type GrokPatternsConfig []string
Expand Down Expand Up @@ -268,6 +277,19 @@ func (c *InputConfig) addDefaults() {
c.WebhookTextBulkSeparator = "\n\n"
}
}
if c.Type == inputTypeKafka {
	// Fill in defaults for any Kafka option the user left unset.
	// NOTE(review): do NOT assign KafkaConsumeFromOldest here — the bool zero
	// value is already false, and an unconditional `= false` would clobber a
	// user-supplied `kafka_consume_from_oldest: true` (see example/config-kafka.yml),
	// assuming addDefaults runs after the YAML unmarshal — TODO confirm call order.
	if c.KafkaPartitionAssignor == "" {
		c.KafkaPartitionAssignor = "range"
	}
	if c.KafkaVersion == "" {
		c.KafkaVersion = "2.1.0"
	}
	if c.KafkaConsumerGroupName == "" {
		c.KafkaConsumerGroupName = "grok_exporter"
	}
}
}

func (c *GrokPatternsConfig) addDefaults() {}
Expand Down Expand Up @@ -403,6 +425,26 @@ func (c *InputConfig) validate() error {
if c.WebhookFormat == "text_bulk" && c.WebhookTextBulkSeparator == "" {
return fmt.Errorf("invalid input configuration: 'input.webhook_text_bulk_separator' is required for input type \"webhook\" and webhook_format \"text_bulk\"")
}
case c.Type == inputTypeKafka:
	if len(c.KafkaBrokers) == 0 {
		return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_brokers' cannot be empty")
	}
	if len(c.KafkaTopics) == 0 {
		return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_topics' cannot be empty")
	}

	// Kafka version components may have more than one digit (e.g. 2.10.1), and
	// pre-1.0 releases used four components (e.g. 0.10.2.0), so accept 1+ digits
	// per part and an optional fourth part.
	matched, _ := regexp.MatchString(`^[0-9]+\.[0-9]+\.[0-9]+(\.[0-9]+)?$`, c.KafkaVersion)
	if !matched {
		return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_version' must be a valid version X.Y.Z")
	}

	// The regexp above guarantees both parts are plain digit runs, so the
	// Atoi errors cannot occur and are safe to ignore.
	versionParts := strings.Split(c.KafkaVersion, ".")
	vMajor, _ := strconv.Atoi(versionParts[0])
	vMinor, _ := strconv.Atoi(versionParts[1])
	// Versions before 0.8.0 are not supported by the Kafka client library.
	if vMajor == 0 && vMinor < 8 {
		return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_version' must be >= 0.8.0")
	}


default:
return fmt.Errorf("unsupported 'input.type': %v", c.Type)
}
Expand Down
25 changes: 25 additions & 0 deletions example/config-kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
global:
config_version: 3
input:
type: kafka
kafka_version: 2.1.0
kafka_brokers:
- localhost:9092
kafka_topics:
- grok_exporter_test
kafka_consumer_group_name: grok_exporter
kafka_consume_from_oldest: true
imports:
- type: grok_patterns
dir: ./logstash-patterns-core/patterns
metrics:
- type: counter
name: test_strings
help: Test string detected.
match: 'test'
labels:
logfile: '{{base .logfile}}'
server:
protocol: http
port: 9144

5 changes: 4 additions & 1 deletion exporter/grok.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ func verifyFieldName(metricName string, template template.Template, regex *onigu
return fmt.Errorf("%v: field name %v is ambigous, as this field is defined in the grok pattern but is also a global field provided by grok_exporter for the %v", metricName, grokFieldName, description)
}
} else {
if !regex.HasCaptureGroup(grokFieldName) {
numGroups := regex.NumberOfCaptureGroups(grokFieldName)
if numGroups == 0 {
return fmt.Errorf("%v: grok field %v not found in match pattern", metricName, grokFieldName)
} else if numGroups > 1 {
return fmt.Errorf("%v: grok field %v found %d times in match pattern: this is ambiguous, the pattern should define each grok field exactly once", metricName, grokFieldName, numGroups)
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
module github.com/fstab/grok_exporter

require (
github.com/Shopify/sarama v1.27.0
github.com/bitly/go-simplejson v0.5.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/ofabry/go-callvis v0.6.1 // indirect
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.10.0
github.com/sirupsen/logrus v1.6.0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299
golang.org/x/image v0.0.0-20200801110659-972c09e46d76 // indirect
golang.org/x/tools v0.0.0-20200828161849-5deb26317202 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/yaml.v2 v2.3.0
)

Expand Down
Loading