Skip to content

Commit b8b25bd

Browse files
committed
#5: replace polling file tailer with multi file version
1 parent 3139f7f commit b8b25bd

File tree

8 files changed

+108
-55
lines changed

8 files changed

+108
-55
lines changed

grok_exporter.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"github.com/fstab/grok_exporter/exporter"
2323
"github.com/fstab/grok_exporter/oniguruma"
2424
"github.com/fstab/grok_exporter/tailer"
25+
"github.com/fstab/grok_exporter/tailer/fswatcher"
26+
"github.com/fstab/grok_exporter/tailer/glob"
2527
"github.com/prometheus/client_golang/prometheus"
2628
"github.com/sirupsen/logrus"
2729
"net/http"
@@ -87,21 +89,21 @@ func main() {
8789
matched := false
8890
for _, metric := range metrics {
8991
start := time.Now()
90-
match, err := metric.ProcessMatch(line)
92+
match, err := metric.ProcessMatch(line.Line)
9193
if err != nil {
9294
fmt.Fprintf(os.Stderr, "WARNING: skipping log line: %v\n", err.Error())
93-
fmt.Fprintf(os.Stderr, "%v\n", line)
95+
fmt.Fprintf(os.Stderr, "%v\n", line.Line)
9496
nErrorsByMetric.WithLabelValues(metric.Name()).Inc()
9597
}
9698
if match != nil {
9799
nMatchesByMetric.WithLabelValues(metric.Name()).Inc()
98100
procTimeMicrosecondsByMetric.WithLabelValues(metric.Name()).Add(float64(time.Since(start).Nanoseconds() / int64(1000)))
99101
matched = true
100102
}
101-
_, err = metric.ProcessDeleteMatch(line)
103+
_, err = metric.ProcessDeleteMatch(line.Line)
102104
if err != nil {
103105
fmt.Fprintf(os.Stderr, "WARNING: skipping log line: %v\n", err.Error())
104-
fmt.Fprintf(os.Stderr, "%v\n", line)
106+
fmt.Fprintf(os.Stderr, "%v\n", line.Line)
105107
nErrorsByMetric.WithLabelValues(metric.Name()).Inc()
106108
}
107109
// TODO: create metric to monitor number of matching delete_patterns
@@ -269,16 +271,20 @@ func startServer(cfg v2.ServerConfig, handler http.Handler) chan error {
269271
return serverErrors
270272
}
271273

272-
func startTailer(cfg *v2.Config) (tailer.Tailer, error) {
274+
func startTailer(cfg *v2.Config) (fswatcher.FileTailer, error) {
273275
logger := logrus.New()
274276
logger.Level = logrus.WarnLevel
275-
var tail tailer.Tailer
277+
var tail fswatcher.FileTailer
278+
g, err := glob.FromPath(cfg.Input.Path)
279+
if err != nil {
280+
return nil, err
281+
}
276282
switch {
277283
case cfg.Input.Type == "file":
278284
if cfg.Input.PollInterval == 0 {
279-
tail = tailer.RunFseventFileTailer(cfg.Input.Path, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, logger)
285+
tail, err = fswatcher.RunFileTailer([]glob.Glob{g}, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, logger)
280286
} else {
281-
tail = tailer.RunPollingFileTailer(cfg.Input.Path, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.PollInterval, logger)
287+
tail, err = fswatcher.RunPollingFileTailer([]glob.Glob{g}, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.PollInterval, logger)
282288
}
283289
case cfg.Input.Type == "stdin":
284290
tail = tailer.RunStdinTailer()

tailer/bufferedTailer.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,31 @@ package tailer
1616

1717
import (
1818
"container/list"
19+
"github.com/fstab/grok_exporter/tailer/fswatcher"
1920
"log"
2021
"sync"
2122
"time"
2223
)
2324

24-
// implements Tailer
25+
// implements fswatcher.FileTailer
2526
type bufferedTailer struct {
26-
out chan string
27-
orig Tailer
27+
out chan fswatcher.Line
28+
orig fswatcher.FileTailer
2829
}
2930

30-
func (b *bufferedTailer) Lines() chan string {
31+
func (b *bufferedTailer) Lines() chan fswatcher.Line {
3132
return b.out
3233
}
3334

34-
func (b *bufferedTailer) Errors() chan Error {
35+
func (b *bufferedTailer) Errors() chan fswatcher.Error {
3536
return b.orig.Errors()
3637
}
3738

3839
func (b *bufferedTailer) Close() {
3940
b.orig.Close()
4041
}
4142

42-
func BufferedTailer(orig Tailer) Tailer {
43+
func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
4344
return BufferedTailerWithMetrics(orig, &noopMetric{})
4445
}
4546

@@ -102,10 +103,10 @@ func BufferedTailer(orig Tailer) Tailer {
102103
//
103104
// To minimize the risk, use the buffered tailer to make sure file system events are handled
104105
// as quickly as possible without waiting for the grok patterns to be processed.
105-
func BufferedTailerWithMetrics(orig Tailer, bufferLoadMetric BufferLoadMetric) Tailer {
106+
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric) fswatcher.FileTailer {
106107
buffer := list.New()
107108
bufferSync := sync.NewCond(&sync.Mutex{}) // coordinate producer and consumer
108-
out := make(chan string)
109+
out := make(chan fswatcher.Line)
109110

110111
// producer
111112
go func() {
@@ -155,7 +156,7 @@ func BufferedTailerWithMetrics(orig Tailer, bufferLoadMetric BufferLoadMetric) T
155156
buffer.Remove(first)
156157
bufferSync.L.Unlock()
157158
switch line := first.Value.(type) {
158-
case string:
159+
case fswatcher.Line:
159160
out <- line
160161
default:
161162
// this cannot happen

tailer/bufferedTailer_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,22 @@ package tailer
1616

1717
import (
1818
"fmt"
19+
"github.com/fstab/grok_exporter/tailer/fswatcher"
1920
"math/rand"
2021
"sync"
2122
"testing"
2223
"time"
2324
)
2425

2526
type sourceTailer struct {
26-
lines chan string
27+
lines chan fswatcher.Line
2728
}
2829

29-
func (tail *sourceTailer) Lines() chan string {
30+
func (tail *sourceTailer) Lines() chan fswatcher.Line {
3031
return tail.lines
3132
}
3233

33-
func (tail *sourceTailer) Errors() chan Error {
34+
func (tail *sourceTailer) Errors() chan fswatcher.Error {
3435
return nil
3536
}
3637

@@ -40,15 +41,15 @@ func (tail *sourceTailer) Close() {
4041

4142
// First produce 10,000 lines, then consume 10,000 lines.
4243
func TestLineBufferSequential(t *testing.T) {
43-
src := &sourceTailer{lines: make(chan string)}
44+
src := &sourceTailer{lines: make(chan fswatcher.Line)}
4445
metric := &peakLoadMetric{}
4546
buffered := BufferedTailerWithMetrics(src, metric)
4647
for i := 1; i <= 10000; i++ {
47-
src.lines <- fmt.Sprintf("This is line number %v.", i)
48+
src.lines <- fswatcher.Line{Line: fmt.Sprintf("This is line number %v.", i)}
4849
}
4950
for i := 1; i <= 10000; i++ {
5051
line := <-buffered.Lines()
51-
if line != fmt.Sprintf("This is line number %v.", i) {
52+
if line.Line != fmt.Sprintf("This is line number %v.", i) {
5253
t.Errorf("Expected 'This is line number %v', but got '%v'.", i, line)
5354
}
5455
}
@@ -76,14 +77,14 @@ func TestLineBufferSequential(t *testing.T) {
7677

7778
// Produce and consume in parallel.
7879
func TestLineBufferParallel(t *testing.T) {
79-
src := &sourceTailer{lines: make(chan string)}
80+
src := &sourceTailer{lines: make(chan fswatcher.Line)}
8081
metric := &peakLoadMetric{}
8182
buffered := BufferedTailerWithMetrics(src, metric)
8283
var wg sync.WaitGroup
8384
go func() {
8485
start := time.Now()
8586
for i := 1; i <= 10000; i++ {
86-
src.lines <- fmt.Sprintf("This is line number %v.", i)
87+
src.lines <- fswatcher.Line{Line: fmt.Sprintf("This is line number %v.", i)}
8788
if rand.Int()%64 == 0 { // Sleep from time to time
8889
time.Sleep(10 * time.Millisecond)
8990
}
@@ -95,7 +96,7 @@ func TestLineBufferParallel(t *testing.T) {
9596
start := time.Now()
9697
for i := 1; i <= 10000; i++ {
9798
line := <-buffered.Lines()
98-
if line != fmt.Sprintf("This is line number %v.", i) {
99+
if line.Line != fmt.Sprintf("This is line number %v.", i) {
99100
t.Errorf("Expected 'This is line number %v', but got '%v'.", i, line)
100101
}
101102
if rand.Int()%64 == 0 { // Sleep from time to time

tailer/fileTailer_test.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package tailer
1616

1717
import (
1818
"fmt"
19+
"github.com/fstab/grok_exporter/tailer/fswatcher"
20+
"github.com/fstab/grok_exporter/tailer/glob"
1921
"github.com/sirupsen/logrus"
2022
"io/ioutil"
2123
"os"
@@ -166,7 +168,14 @@ func TestVisibleInOSXFinder(t *testing.T) {
166168
logFileWriter := newLogFileWriter(t, logfile, closeFileAfterEachLine)
167169
defer logFileWriter.close(t)
168170
logFileWriter.writeLine(t, log, "test line 1")
169-
tail := RunFseventFileTailer(logfile, false, true, log)
171+
g, err := glob.FromPath(logfile)
172+
if err != nil {
173+
t.Fatal(err)
174+
}
175+
tail, err := fswatcher.RunFileTailer([]glob.Glob{g}, false, true, log)
176+
if err != nil {
177+
t.Fatalf("failed to start file tailer: %v", err)
178+
}
170179
defer func() {
171180
tail.Close()
172181
// wait until closed
@@ -204,7 +213,14 @@ func TestFileMissingOnStartup(t *testing.T) {
204213
tmpDir := mkTmpDirOrFail(t)
205214
defer cleanUp(t, tmpDir)
206215
var logfile = fmt.Sprintf("%s%c%s", tmpDir, os.PathSeparator, logfileName)
207-
tail := RunFseventFileTailer(logfile, true, false, log)
216+
g, err := glob.FromPath(logfile)
217+
if err != nil {
218+
t.Fatal(err)
219+
}
220+
tail, err := fswatcher.RunFileTailer([]glob.Glob{g}, true, false, log)
221+
if err != nil {
222+
t.Fatalf("failed to start file system tailer: %v", err)
223+
}
208224
defer func() {
209225
tail.Close()
210226
// wait until closed
@@ -226,7 +242,7 @@ func TestFileMissingOnStartup(t *testing.T) {
226242
}()
227243

228244
// Double check that file does not exist yet
229-
_, err := os.Stat(logfile)
245+
_, err = os.Stat(logfile)
230246
if err == nil || !os.IsNotExist(err) {
231247
t.Fatalf("%v should not exist yet", logfile)
232248
}
@@ -265,12 +281,19 @@ func testLogrotate(t *testing.T, log logrus.FieldLogger, watcherOpt watcherType,
265281
logFileWriter.writeLine(t, log, "test line 1")
266282
logFileWriter.writeLine(t, log, "test line 2")
267283

268-
var tail Tailer
284+
var tail fswatcher.FileTailer
285+
g, err := glob.FromPath(logfile)
286+
if err != nil {
287+
t.Fatal(err)
288+
}
269289
switch watcherOpt {
270290
case fsevent:
271-
tail = RunFseventFileTailer(logfile, true, true, log)
291+
tail, err = fswatcher.RunFileTailer([]glob.Glob{g}, true, true, log)
272292
case polling:
273-
tail = RunPollingFileTailer(logfile, true, true, 10*time.Millisecond, log)
293+
tail, err = fswatcher.RunPollingFileTailer([]glob.Glob{g}, true, true, 10*time.Millisecond, log)
294+
}
295+
if err != nil {
296+
t.Fatalf("failed to start file system watcher: %v", err)
274297
}
275298
tail = BufferedTailer(tail)
276299
defer func() {
@@ -608,15 +631,15 @@ func truncateOrFail(t *testing.T, logfile string) {
608631
}
609632
}
610633

611-
func expect(t *testing.T, log logrus.FieldLogger, c chan string, line string, timeout time.Duration) {
634+
func expect(t *testing.T, log logrus.FieldLogger, lines chan fswatcher.Line, line string, timeout time.Duration) {
612635
timeoutChan := make(chan bool, 1)
613636
go func() {
614637
time.Sleep(timeout)
615638
close(timeoutChan)
616639
}()
617640
select {
618-
case result := <-c:
619-
if result != line {
641+
case result := <-lines:
642+
if result.Line != line {
620643
t.Fatalf("Expected '%v', but got '%v'.", line, result)
621644
} else {
622645
log.Debugf("Read expected line '%v'\n", line)

tailer/fswatcher/fswatcher_windows.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -150,16 +150,8 @@ func (w *watcher) processEvent(t *fileTailer, fsevent fsevent, log logrus.FieldL
150150
return nil
151151
}
152152

153-
func isTruncated(file *os.File) (bool, Error) {
154-
currentPos, err := file.Seek(0, io.SeekCurrent)
155-
if err != nil {
156-
return false, NewError(NotSpecified, os.NewSyscallError("seek", err), file.Name())
157-
}
158-
fileInfo, err := file.Stat()
159-
if err != nil {
160-
return false, NewError(NotSpecified, os.NewSyscallError("stat", err), file.Name())
161-
}
162-
return currentPos > fileInfo.Size(), nil
153+
func isTruncated(file *File) (bool, Error) {
154+
return file.CheckTruncated()
163155
}
164156

165157
func findSameFile(t *fileTailer, newFileInfo *fileInfo, path string) (*fileWithReader, Error) {

tailer/fswatcher/pollingwatcher.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package fswatcher
22

33
import (
44
"github.com/sirupsen/logrus"
5+
"io"
56
"time"
67
)
78

@@ -26,6 +27,23 @@ func (w *pollingWatcher) processEvent(t *fileTailer, fsevent fsevent, log logrus
2627
return err
2728
}
2829
}
30+
for _, file := range t.watchedFiles {
31+
truncated, err := isTruncated(file.file)
32+
if err != nil {
33+
return NewErrorf(NotSpecified, err, "%v: seek() or stat() failed", file.file.Name())
34+
}
35+
if truncated {
36+
_, err = file.file.Seek(0, io.SeekStart)
37+
if err != nil {
38+
return NewErrorf(NotSpecified, err, "%v: seek() failed", file.file.Name())
39+
}
40+
file.reader.Clear()
41+
}
42+
readErr := t.readNewLines(file, log)
43+
if readErr != nil {
44+
return readErr
45+
}
46+
}
2947
return nil
3048
}
3149

tailer/glob/glob.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,17 @@ func (g Glob) Match(path string) bool {
5151
return matched
5252
}
5353

54+
// The file tailer implementation switched from watching single paths to globs,
55+
// but the rest of grok_exporter just supports single files.
56+
// FromPath creates a Glob from a file path, so that we can use the new file
57+
// tailers but be sure only a single file is watched.
58+
func FromPath(path string) (Glob, error) {
59+
if containsWildcards(path) {
60+
return "", fmt.Errorf("%v: illegal file name", path)
61+
}
62+
return Parse(path)
63+
}
64+
5465
func containsWildcards(pattern string) bool {
5566
p := []rune(pattern)
5667
escaped := false // p[i] is escaped by '\\'

0 commit comments

Comments
 (0)