Skip to content

Commit 331ed69

Browse files
committed
use buffered tailer in fileTailer_test
1 parent 13df909 commit 331ed69

File tree

5 files changed

+64
-40
lines changed

5 files changed

+64
-40
lines changed

grok_exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,5 +282,5 @@ func startTailer(cfg *v2.Config) (tailer.Tailer, error) {
282282
default:
283283
return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type)
284284
}
285-
return exporter.BufferedTailerWithMetrics(tail), nil
285+
return tailer.BufferedTailerWithMetrics(tail), nil
286286
}
Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,27 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package exporter
15+
package tailer
1616

1717
import (
1818
"container/list"
19-
"github.com/fstab/grok_exporter/tailer"
2019
"github.com/prometheus/client_golang/prometheus"
2120
"log"
2221
"sync"
2322
"time"
2423
)
2524

26-
// implements tailer.Tailer
25+
// implements Tailer
2726
type bufferedTailerWithMetrics struct {
2827
out chan string
29-
orig tailer.Tailer
28+
orig Tailer
3029
}
3130

3231
func (b *bufferedTailerWithMetrics) Lines() chan string {
3332
return b.out
3433
}
3534

36-
func (b *bufferedTailerWithMetrics) Errors() chan tailer.Error {
35+
func (b *bufferedTailerWithMetrics) Errors() chan Error {
3736
return b.orig.Errors()
3837
}
3938

@@ -46,7 +45,61 @@ func (b *bufferedTailerWithMetrics) Close() {
4645
// and does not need to wait until the lines are processed.
4746
// The number of buffered lines is exposed as a Prometheus metric. If lines are constantly
4847
// produced faster than they are consumed, we will eventually run out of memory.
49-
func BufferedTailerWithMetrics(orig tailer.Tailer) tailer.Tailer {
48+
//
49+
// ---
50+
// The buffered tailer prevents the following error (this can be reproduced on Windows,
51+
// where we don't keep the logfile open):
52+
//
53+
// Example test actions
54+
// --------------------
55+
//
56+
// Sequence of actions simulated in fileTailer_test:
57+
//
58+
// 1) write line a
59+
// 2) write line b
60+
// 3) move the old logfile away and create a new logfile
61+
// 4) write line c
62+
//
63+
// Good case event processing
64+
// --------------------------
65+
//
66+
// How Events.Process() should process the file system events triggered by the actions above:
67+
//
68+
// 1) MODIFIED : Process() reads line a
69+
// 2) MODIFIED : Process() reads line b
70+
// 3) MOVED_FROM, CREATED : Process() resets the line reader and seeks the file to position 0
71+
// 4) MODIFIED : Process() reads line c
72+
//
73+
// Bad case event processing
74+
// -------------------------
75+
//
76+
// When Events.Process() receives a MODIFIED event, it does not know how many lines have been written.
77+
// Therefore, it reads all new lines until EOF is reached.
78+
// If line processing is slow (writing to the lines channel blocks until all grok patterns are processed),
79+
// we might read 'line b' while we are still processing the first MODIFIED event:
80+
//
81+
// 1) MODIFIED : Process() reads 'line a' and 'line b'
82+
//
83+
// Meanwhile, the test continues with steps 3 and 4, moving the logfile away, creating a new logfile,
84+
// and writing 'line c'. When the tailer receives the second MODIFIED event, it learns that the file
85+
// has been truncated, seeks to position 0, and reads 'line c'.
86+
//
87+
// 2) MODIFIED : Process() detects the truncated file, seeks to position 0, reads 'line c'
88+
//
89+
// The tailer now receives MOVED_FROM, which makes it close the logfile, and CREATED, which makes
90+
// it open the logfile and start reading from position 0:
91+
//
92+
// 3) MOVED_FROM, CREATED : seek to position 0, read line c again !!!
93+
//
94+
// When the last MODIFIED event is processed, there are no more changes in the file:
95+
//
96+
// 4) MODIFIED : no changes in file
97+
//
98+
// As a result, we read 'line c' two times.
99+
//
100+
// To minimize the risk, use the buffered tailer to make sure file system events are handled
101+
// as quickly as possible without waiting for the grok patterns to be processed.
102+
func BufferedTailerWithMetrics(orig Tailer) Tailer {
50103
buffer := list.New()
51104
bufferSync := sync.NewCond(&sync.Mutex{}) // coordinate producer and consumer
52105
out := make(chan string)
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package exporter
15+
package tailer
1616

1717
import (
1818
"fmt"
19-
"github.com/fstab/grok_exporter/tailer"
2019
"math/rand"
2120
"sync"
2221
"testing"
@@ -31,7 +30,7 @@ func (tail *sourceTailer) Lines() chan string {
3130
return tail.lines
3231
}
3332

34-
func (tail *sourceTailer) Errors() chan tailer.Error {
33+
func (tail *sourceTailer) Errors() chan Error {
3534
return nil
3635
}
3736

tailer/fileTailer.go

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -60,36 +60,7 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
6060
logger = &nilLogger{}
6161
}
6262

63-
// Why does the lines channel have a size of 10?
64-
// ---
65-
// In fileTailer_test, we read a line from the channel, then we run the next action
66-
// on the logfile (like writing the next line, simulating logrotate, etc.).
67-
//
68-
// Example:
69-
// 1) write line a
70-
// 2) write line b
71-
// 3) move the old logfile away and create a new logfile
72-
// 4) write line c
73-
//
74-
// With this example, we would expect 4 calls to Events.Process() with the following events:
75-
// 1) MODIFIED : Process() reads line a
76-
// 2) MODIFIED : Process() reads line b
77-
// 3) MOVED_FROM, CREATED : Process() resets the line reader and seeks the file to position 0
78-
// 4) MODIFIED : Process() reads line c
79-
//
80-
// However, there is a race condition: When the line reader is slow, it does not hit EOF
81-
// before line b is written. The reader keeps reading upon the 1st MODIFIED event.
82-
// 1) MODIFIED : Process() reads line a and line b
83-
// 2) MODIFIED : Process() detects the truncated file, seeks to position 0, reads line c
84-
// 3) MOVED_FROM, CREATED : seek to position 0, read line c again !!!
85-
// 4) MODIFIED : no changes in file
86-
//
87-
// As a result, we read 'line c' two times.
88-
// This problem occurs especially on Windows where we don't keep the file open.
89-
//
90-
// To minimize the risk, we give the lines channel size 10, so that the line reader can
91-
// continue reading the next few lines and does not need to wait until the lines are processed.
92-
lines := make(chan string, 10)
63+
lines := make(chan string)
9364
done := make(chan struct{})
9465
errors := make(chan Error)
9566

tailer/fileTailer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ func testLogrotate(t *testing.T, log simpleLogger, watcherOpt watcherType, logro
245245
case polling:
246246
tail = RunPollingFileTailer(logfile, true, true, 10*time.Millisecond, log)
247247
}
248+
tail = BufferedTailerWithMetrics(tail)
248249
defer tail.Close()
249250

250251
// We don't expect errors. However, start a goroutine listening on

0 commit comments

Comments
 (0)