Skip to content

Commit 49d9be3

Browse files
committed
remove prometheus dependency from tailer package
1 parent 331ed69 commit 49d9be3

File tree

4 files changed

+97
-23
lines changed

4 files changed

+97
-23
lines changed

grok_exporter.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,5 +282,26 @@ func startTailer(cfg *v2.Config) (tailer.Tailer, error) {
282282
default:
283283
return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type)
284284
}
285-
return tailer.BufferedTailerWithMetrics(tail), nil
285+
return tailer.BufferedTailerWithMetrics(tail, &bufferLoadMetric{}), nil
286+
}
287+
288+
type bufferLoadMetric struct {
289+
bufferLoad prometheus.Summary
290+
}
291+
292+
func (m *bufferLoadMetric) Register() {
293+
bufferLoad := prometheus.NewSummary(prometheus.SummaryOpts{
294+
Name: "grok_exporter_line_buffer_peak_load",
295+
Help: "Number of lines that are read from the logfile and waiting to be processed. Peak value per second.",
296+
})
297+
prometheus.MustRegister(bufferLoad)
298+
m.bufferLoad = bufferLoad
299+
}
300+
301+
func (m *bufferLoadMetric) Unregister() {
302+
prometheus.Unregister(m.bufferLoad)
303+
}
304+
305+
func (m *bufferLoadMetric) Observe(currentLoad float64) {
306+
m.bufferLoad.Observe(currentLoad)
286307
}

tailer/bufferedTailer.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,33 @@ package tailer
1616

1717
import (
1818
"container/list"
19-
"github.com/prometheus/client_golang/prometheus"
2019
"log"
2120
"sync"
2221
"time"
2322
)
2423

2524
// implements Tailer
26-
type bufferedTailerWithMetrics struct {
25+
type bufferedTailer struct {
2726
out chan string
2827
orig Tailer
2928
}
3029

31-
func (b *bufferedTailerWithMetrics) Lines() chan string {
30+
func (b *bufferedTailer) Lines() chan string {
3231
return b.out
3332
}
3433

35-
func (b *bufferedTailerWithMetrics) Errors() chan Error {
34+
func (b *bufferedTailer) Errors() chan Error {
3635
return b.orig.Errors()
3736
}
3837

39-
func (b *bufferedTailerWithMetrics) Close() {
38+
func (b *bufferedTailer) Close() {
4039
b.orig.Close()
4140
}
4241

42+
func BufferedTailer(orig Tailer) Tailer {
43+
return BufferedTailerWithMetrics(orig, &noopMetric{})
44+
}
45+
4346
// Wrapper around a tailer that consumes the lines channel quickly.
4447
// The idea is that the original tailer can continue reading lines from the logfile,
4548
// and does not need to wait until the lines are processed.
@@ -99,42 +102,38 @@ func (b *bufferedTailerWithMetrics) Close() {
99102
//
100103
// To minimize the risk, use the buffered tailer to make sure file system events are handled
101104
// as quickly as possible without waiting for the grok patterns to be processed.
102-
func BufferedTailerWithMetrics(orig Tailer) Tailer {
105+
func BufferedTailerWithMetrics(orig Tailer, bufferLoadMetric BufferLoadMetric) Tailer {
103106
buffer := list.New()
104107
bufferSync := sync.NewCond(&sync.Mutex{}) // coordinate producer and consumer
105108
out := make(chan string)
106109

107110
// producer
108111
go func() {
109-
bufferLoad := prometheus.NewSummary(prometheus.SummaryOpts{
110-
Name: "grok_exporter_line_buffer_peak_load",
111-
Help: "Number of lines that are read from the logfile and waiting to be processed. Peak value per second.",
112-
})
113-
prometheus.MustRegister(bufferLoad)
112+
bufferLoadMetric.Register()
114113
bufferLoadPeakValue := 0
115114
tick := time.NewTicker(1 * time.Second)
116115
for {
117116
select {
118117
case line, ok := <-orig.Lines():
119118
if ok {
120119
bufferSync.L.Lock()
120+
buffer.PushBack(line)
121121
if buffer.Len() > bufferLoadPeakValue {
122122
bufferLoadPeakValue = buffer.Len()
123123
}
124-
buffer.PushBack(line)
125124
bufferSync.Signal()
126125
bufferSync.L.Unlock()
127126
} else {
128127
bufferSync.L.Lock()
129128
buffer = nil // make the consumer quit
130129
bufferSync.Signal()
131130
bufferSync.L.Unlock()
132-
prometheus.Unregister(bufferLoad)
131+
bufferLoadMetric.Unregister()
133132
tick.Stop()
134133
return
135134
}
136135
case <-tick.C:
137-
bufferLoad.Observe(float64(bufferLoadPeakValue))
136+
bufferLoadMetric.Observe(float64(bufferLoadPeakValue))
138137
bufferLoadPeakValue = 0
139138
}
140139
}
@@ -164,8 +163,20 @@ func BufferedTailerWithMetrics(orig Tailer) Tailer {
164163
}
165164
}
166165
}()
167-
return &bufferedTailerWithMetrics{
166+
return &bufferedTailer{
168167
out: out,
169168
orig: orig,
170169
}
171170
}
171+
172+
type BufferLoadMetric interface {
173+
Register()
174+
Observe(currentLoad float64)
175+
Unregister()
176+
}
177+
178+
type noopMetric struct{}
179+
180+
func (m *noopMetric) Register() {}
181+
func (m *noopMetric) Observe(currentLoad float64) {}
182+
func (m *noopMetric) Unregister() {}

tailer/bufferedTailer_test.go

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,19 @@ func (tail *sourceTailer) Close() {
4141
// First produce 10,000 lines, then consume 10,000 lines.
4242
func TestLineBufferSequential(t *testing.T) {
4343
src := &sourceTailer{lines: make(chan string)}
44-
buffered := BufferedTailerWithMetrics(src)
45-
for i := 0; i < 10000; i++ {
44+
metric := &peakLoadMetric{}
45+
buffered := BufferedTailerWithMetrics(src, metric)
46+
for i := 1; i <= 10000; i++ {
4647
src.lines <- fmt.Sprintf("This is line number %v.", i)
4748
}
48-
for i := 0; i < 10000; i++ {
49+
for i := 1; i <= 10000; i++ {
4950
line := <-buffered.Lines()
5051
if line != fmt.Sprintf("This is line number %v.", i) {
5152
t.Errorf("Expected 'This is line number %v', but got '%v'.", i, line)
5253
}
5354
}
55+
// wait until peak load is observed (buffered tailer observes the max of each 1 Sec interval)
56+
time.Sleep(1100 * time.Millisecond)
5457
buffered.Close()
5558
_, stillOpen := <-buffered.Lines()
5659
if stillOpen {
@@ -60,16 +63,26 @@ func TestLineBufferSequential(t *testing.T) {
6063
if stillOpen {
6164
t.Error("Source tailer was not closed.")
6265
}
66+
if !metric.registerCalled {
67+
t.Error("metric.Register() not called.")
68+
}
69+
if !metric.unregisterCalled {
70+
t.Error("metric.Unregister() not called.")
71+
}
72+
// The peak load should be 9999 or 9998, depending on how quick
73+
// the consumer loop started reading
74+
fmt.Printf("peak load: %v\n", metric.peakLoad)
6375
}
6476

6577
// Produce and consume in parallel.
6678
func TestLineBufferParallel(t *testing.T) {
6779
src := &sourceTailer{lines: make(chan string)}
68-
buffered := BufferedTailerWithMetrics(src)
80+
metric := &peakLoadMetric{}
81+
buffered := BufferedTailerWithMetrics(src, metric)
6982
var wg sync.WaitGroup
7083
go func() {
7184
start := time.Now()
72-
for i := 0; i < 10000; i++ {
85+
for i := 1; i <= 10000; i++ {
7386
src.lines <- fmt.Sprintf("This is line number %v.", i)
7487
if rand.Int()%64 == 0 { // Sleep from time to time
7588
time.Sleep(10 * time.Millisecond)
@@ -80,7 +93,7 @@ func TestLineBufferParallel(t *testing.T) {
8093
}()
8194
go func() {
8295
start := time.Now()
83-
for i := 0; i < 10000; i++ {
96+
for i := 1; i <= 10000; i++ {
8497
line := <-buffered.Lines()
8598
if line != fmt.Sprintf("This is line number %v.", i) {
8699
t.Errorf("Expected 'This is line number %v', but got '%v'.", i, line)
@@ -94,6 +107,8 @@ func TestLineBufferParallel(t *testing.T) {
94107
}()
95108
wg.Add(2)
96109
wg.Wait()
110+
// wait until peak load is observed (buffered tailer observes the max of each 1 Sec interval)
111+
time.Sleep(1100 * time.Millisecond)
97112
buffered.Close()
98113
_, stillOpen := <-buffered.Lines()
99114
if stillOpen {
@@ -103,4 +118,31 @@ func TestLineBufferParallel(t *testing.T) {
103118
if stillOpen {
104119
t.Error("Source tailer was not closed.")
105120
}
121+
if !metric.registerCalled {
122+
t.Error("metric.Register() not called.")
123+
}
124+
if !metric.unregisterCalled {
125+
t.Error("metric.Unregister() not called.")
126+
}
127+
// Should be much less than 10000, because consumer and producer work in parallel.
128+
fmt.Printf("peak load: %v\n", metric.peakLoad)
129+
}
130+
131+
type peakLoadMetric struct {
132+
registerCalled, unregisterCalled bool
133+
peakLoad float64
134+
}
135+
136+
func (m *peakLoadMetric) Register() {
137+
m.registerCalled = true
138+
}
139+
140+
func (m *peakLoadMetric) Observe(currentLoad float64) {
141+
if currentLoad > m.peakLoad {
142+
m.peakLoad = currentLoad
143+
}
144+
}
145+
146+
func (m *peakLoadMetric) Unregister() {
147+
m.unregisterCalled = true
106148
}

tailer/fileTailer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func testLogrotate(t *testing.T, log simpleLogger, watcherOpt watcherType, logro
245245
case polling:
246246
tail = RunPollingFileTailer(logfile, true, true, 10*time.Millisecond, log)
247247
}
248-
tail = BufferedTailerWithMetrics(tail)
248+
tail = BufferedTailer(tail)
249249
defer tail.Close()
250250

251251
// We don't expect errors. However, start a go-routine listening on

0 commit comments

Comments
 (0)