diff --git a/CONFIG.md b/CONFIG.md index 173de77a..7f34c6e6 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -51,7 +51,7 @@ The following table shows which `grok_exporter` version uses which `config_versi Input Section ------------- -We currently support two input types: `file` and `stdin`. The following two sections describe the `file` input type and the `stdin` input type: +We currently support two input types: `file`, `stdin` and `kafka`. The following sections describe `file`, `stdin` and `kafka` input types: ### File Input Type @@ -92,6 +92,30 @@ the exporter will terminate as soon as `sample.log` is processed, and we will not be able to access the result via HTTP(S) after that. Always use a command that keeps the output open (like `tail -f`) when testing the `grok_exporter` with the `stdin` input. +### Kafka Input Type +Grok exporter can subscribe to kafka topics and parse message text. If messages are in json format some fields can be extracted from message. + +```yaml +input: + type: kafka + brokers: 'broker1:9092,broker2:9092' + topics: 'topic1,topic2' + jsonfields: 'loglevel,message,tag' +``` +Brokers parameter is a comma separated list of Kafka brokers (mandatory for input type kafka). Topics parameter is comma separated list of topics to subscribe to (mandatory for input type kafka). +Jsonfields parameter is a comma separated list of JSON fields which values will be extracted from JSON message and concatinated to one line (space as separator). +Jsonfields is optional parameter for input type kafka. If omited kafka messages will be sent to parser as string. +I.e. having this message from kafka +```json +{message: "123", loglevel: "INFO", host: "myhost1", tag: "live"} +``` +Parser will receive the text +```INFO 123 live``` +So Grok regexp should be designed accordingly. + +Messages from all topics are processed in same way (same grok parser). + + Grok Section ------------ diff --git a/README.md b/README.md index ecbde91e..9b66ce02 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,14 @@ git submodule update --init --recursive The resulting `grok_exporter` binary will be dynamically linked to the Oniguruma library, i.e. it needs the Oniguruma library to run. The [releases] are statically linked with Oniguruma, i.e. the releases don't require Oniguruma as a run-time dependency. The releases are built with `release.sh`. +**Building grok_exporter docker image** +Building docker image requires docker installed. +```bash +go get get github.com/mbarzilovich/grok_exporter +cd $GOPATH/src/github.com/fstab/grok_exporter +./build_image.sh +``` + More Documentation ------------------ diff --git a/build_image.sh b/build_image.sh new file mode 100755 index 00000000..024f7697 --- /dev/null +++ b/build_image.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +set +e + +cd $GOPATH/src/github.com/fstab/grok_exporter +docker run --rm --net none -it -v $GOPATH/src/github.com/fstab/grok_exporter:/root/go/src/github.com/fstab/grok_exporter ubuntu:16.04 rm -rf /root/go/src/github.com/fstab/grok_exporter/dist +mkdir dist + +export VERSION=0.2.2.3 +export ARCH=linux-amd64 +export VERSION_FLAGS="\ + -X github.com/fstab/grok_exporter/exporter.Version=$VERSION \ + -X github.com/fstab/grok_exporter/exporter.BuildDate=$(date +%Y-%m-%d) \ + -X github.com/fstab/grok_exporter/exporter.Branch=$(git rev-parse --abbrev-ref HEAD) \ + -X github.com/fstab/grok_exporter/exporter.Revision=$(git rev-parse --short HEAD) \ +" + +#-------------------------------------------------------------- +# Make sure all tests run. +#-------------------------------------------------------------- + +go fmt $(go list ./... | grep -v /vendor/) +go test $(go list ./... | grep -v /vendor/) +cp -r docker/* dist +cp -a logstash-patterns-core/patterns dist +docker run -v $GOPATH/src/github.com/fstab/grok_exporter:/root/go/src/github.com/fstab/grok_exporter --net none --rm -ti fstab/grok_exporter-compiler compile-$ARCH.sh -ldflags "$VERSION_FLAGS" -o dist/grok_exporter + +cd dist +docker build -t grok_exporter:$VERSION -t grok_exporter:latest . + + diff --git a/config/v1/configV1_test.go b/config/v1/configV1_test.go index a67da6ea..969f63d5 100644 --- a/config/v1/configV1_test.go +++ b/config/v1/configV1_test.go @@ -65,6 +65,7 @@ metrics: prometheus_label: user server: protocol: https + host: 0.0.0.0 port: 1111 ` @@ -111,6 +112,7 @@ metrics: user: '{{.user}}' server: protocol: https + host: 0.0.0.0 port: 1111 ` diff --git a/config/v2/configV2.go b/config/v2/configV2.go index 0804081d..a4699482 100644 --- a/config/v2/configV2.go +++ b/config/v2/configV2.go @@ -42,13 +42,17 @@ func (cfg *Config) String() string { } type GlobalConfig struct { - ConfigVersion int `yaml:"config_version,omitempty"` + ConfigVersion int `yaml:"config_version,omitempty"` + Debug bool `yaml:",omitempty"` } type InputConfig struct { - Type string `yaml:",omitempty"` - Path string `yaml:",omitempty"` - Readall bool `yaml:",omitempty"` + Type string `yaml:",omitempty"` + Path string `yaml:",omitempty"` + Readall bool `yaml:",omitempty"` + Brokers string `yaml:",omitempty"` + Topics string `yaml:",omitempty"` + Jsonfields string `yaml:",omitempty"` } type GrokConfig struct { @@ -116,6 +120,7 @@ func (c *GlobalConfig) addDefaults() { if c.ConfigVersion == 0 { c.ConfigVersion = 2 } + c.Debug = false } func (c *InputConfig) addDefaults() { @@ -124,7 +129,11 @@ func (c *InputConfig) addDefaults() { } } -func (c *GrokConfig) addDefaults() {} +func (c *GrokConfig) addDefaults() { + if c.PatternsDir == "" { + c.PatternsDir = "/patterns" + } +} func (c *MetricsConfig) addDefaults() {} @@ -132,6 +141,9 @@ func (c *ServerConfig) addDefaults() { if c.Protocol == "" { c.Protocol = "http" } + if c.Host == "" { + c.Host = "0.0.0.0" + } if c.Port == 0 { c.Port = 9144 } @@ -167,6 +179,13 @@ func (c *InputConfig) validate() error { if c.Path == "" { return fmt.Errorf("Invalid input configuration: 'input.path' is required for input type \"file\".") } + case c.Type == "kafka": + if c.Brokers == "" { + return fmt.Errorf("Invalid input configuration: 'input.brokers' is required for input type \"kafka\".") + } + if c.Topics == "" { + return fmt.Errorf("Invalid input configuration: 'input.topics' is required for input type \"kafka\".") + } default: return fmt.Errorf("Unsupported 'input.type': %v", c.Type) } @@ -174,9 +193,6 @@ func (c *InputConfig) validate() error { } func (c *GrokConfig) validate() error { - if c.PatternsDir == "" && len(c.AdditionalPatterns) == 0 { - return fmt.Errorf("Invalid grok configuration: no patterns defined: one of 'grok.patterns_dir' and 'grok.additional_patterns' must be configured.") - } return nil } diff --git a/config/v2/configV2_test.go b/config/v2/configV2_test.go index 1cd139f9..2b7d21d5 100644 --- a/config/v2/configV2_test.go +++ b/config/v2/configV2_test.go @@ -38,6 +38,7 @@ metrics: label_b: '{{.some_grok_field_b}}' server: protocol: https + host: 0.0.0.0 port: 1111 ` @@ -77,6 +78,7 @@ metrics: buckets: $BUCKETS server: protocol: http + host: 0.0.0.0 port: 9144 ` @@ -96,6 +98,7 @@ metrics: quantiles: $QUANTILES server: protocol: http + host: 0.0.0.0 port: 9144 ` diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 00000000..bdd54edc --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:16.04 + +ADD patterns /patterns +ADD grok_exporter /grok_exporter + +VOLUME /config.yml + +EXPOSE 9144 + +CMD /grok_exporter -config /config.yml \ No newline at end of file diff --git a/grok_exporter.go b/grok_exporter.go index f32799d5..79673d25 100644 --- a/grok_exporter.go +++ b/grok_exporter.go @@ -31,6 +31,7 @@ var ( printVersion = flag.Bool("version", false, "Print the grok_exporter version.") configPath = flag.String("config", "", "Path to the config file. Try '-config ./example/config.yml' to get started.") showConfig = flag.Bool("showconfig", false, "Print the current configuration to the console. Example: 'grok_exporter -showconfig -config ./exemple/config.yml'") + debug = flag.Bool("debug", false, "Print extra logs") ) const ( @@ -46,6 +47,7 @@ func main() { } validateCommandLineOrExit() cfg, warn, err := config.LoadConfigFile(*configPath) + cfg.Global.Debug = *debug if len(warn) > 0 && !*showConfig { // warning is suppressed when '-showconfig' is used fmt.Fprintf(os.Stderr, "%v\n", warn) @@ -244,6 +246,8 @@ func startTailer(cfg *v2.Config) (tailer.Tailer, error) { tail = tailer.RunFileTailer(cfg.Input.Path, cfg.Input.Readall, nil) case cfg.Input.Type == "stdin": tail = tailer.RunStdinTailer() + case cfg.Input.Type == "kafka": + tail = tailer.RunKafkaTailer(cfg) default: return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type) } diff --git a/release.sh b/release.sh index f01e4dc2..74648a34 100755 --- a/release.sh +++ b/release.sh @@ -51,6 +51,6 @@ function make_release { cd .. } -make_release native darwin-amd64 +#make_release native darwin-amd64 make_release docker linux-amd64 -make_release docker windows-amd64 .exe +#make_release docker windows-amd64 .exe diff --git a/tailer/kafkaTailer.go b/tailer/kafkaTailer.go new file mode 100644 index 00000000..987af66a --- /dev/null +++ b/tailer/kafkaTailer.go @@ -0,0 +1,97 @@ +package tailer + +import ( + "encoding/json" + "github.com/fstab/grok_exporter/config/v2" + "github.com/optiopay/kafka" + "log" + "strings" +) + +type kafkaTailer struct { + lines chan string + errors chan error +} + +func (t *kafkaTailer) Lines() chan string { + return t.lines +} + +func (t *kafkaTailer) Errors() chan error { + return t.errors +} + +func (t *kafkaTailer) Close() { + // broker.Close() +} + +func RunConsumer(broker kafka.Client, lineChan chan string, errorChan chan error, topic string, partitions int32, cfg *v2.Config) { + fetchers := []kafka.Consumer{} + log.Printf("Creating consumer for topic %s", topic) + for partition := int32(0); partition < partitions; partition++ { + conf := kafka.NewConsumerConf(topic, partition) + conf.StartOffset = kafka.StartOffsetNewest + consumer, err := broker.Consumer(conf) + if err != nil { + log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err) + errorChan <- err + } + fetchers = append(fetchers, consumer) + } + mx := kafka.Merge(fetchers...) + defer mx.Close() + log.Printf("Consumer for topic %s is ready", topic) + for { + var data map[string]interface{} + msg, err := mx.Consume() + if err != nil { + if err != kafka.ErrMxClosed { + log.Printf("All consumers stopped. Cannot consume %q topic message: %s", topic, err) + } + errorChan <- err + return + } + s := []string{} + if cfg.Input.Jsonfields == "" { + s = append(s, string(msg.Value)) + } else { + if err := json.Unmarshal([]byte(msg.Value), &data); err != nil { + log.Fatalf("Cannot unmarshal JSON message %s because of the error: %s", string(msg.Value), err) + } + for _, field := range strings.Split(cfg.Input.Jsonfields, ",") { + if data[field] != nil { + s = append(s, data[field].(string)) + } + } + } + line := strings.Join(s, " ") + if cfg.Global.Debug { + log.Println("Sending line to parcer: " + line) + } + lineChan <- line + } +} + +func RunKafkaTailer(cfg *v2.Config) Tailer { + lineChan := make(chan string) + errorChan := make(chan error) + topics := strings.Split(cfg.Input.Topics, ",") + brokerConf := kafka.NewBrokerConf("grok-exporter-client") + broker, err := kafka.Dial(strings.Split(cfg.Input.Brokers, ","), brokerConf) + if err != nil { + log.Fatalf("cannot connect to kafka cluster: %s", err) + errorChan <- err + } + for _, topic := range topics { + partitions, err := broker.PartitionCount(topic) + if err != nil { + log.Fatalf("Unable to fetch partitions from broker for topis %s\n", topic) + errorChan <- err + } + go RunConsumer(broker, lineChan, errorChan, topic, partitions, cfg) + } + return &kafkaTailer{ + lines: lineChan, + errors: errorChan, + } +} diff --git a/vendor/github.com/golang/snappy b/vendor/github.com/golang/snappy new file mode 160000 index 00000000..553a6414 --- /dev/null +++ b/vendor/github.com/golang/snappy @@ -0,0 +1 @@ +Subproject commit 553a641470496b2327abcac10b36396bd98e45c9 diff --git a/vendor/github.com/optiopay/kafka b/vendor/github.com/optiopay/kafka new file mode 160000 index 00000000..7b7b6a0d --- /dev/null +++ b/vendor/github.com/optiopay/kafka @@ -0,0 +1 @@ +Subproject commit 7b7b6a0d49740e36cd39226913b38cecf9ef76cb diff --git a/vendor/update.sh b/vendor/update.sh index e26b35d0..dee542d7 100755 --- a/vendor/update.sh +++ b/vendor/update.sh @@ -110,6 +110,19 @@ git checkout 7be2ce36128ef1337a5348a7cb5a599830b42ac3 find . -type f | grep -v winfsnotify.go | xargs rm -f find . -type d -empty -delete +########################################################################### +# github.com/optiopay/kafka +########################################################################### + +cd $VENDOR +mkdir -p github.com/optiopay/ +cd github.com/optiopay +git clone https://github.com/optiopay/kafka.git +cd kafka +git checkout 7b7b6a0d49740e36cd39226913b38cecf9ef76cb +rm -rf .git .gitignore .travis.yml LICENSE README.md + + ########################################################################### find $VENDOR -type f -name '*_test.go' | xargs rm