Skip to content

Commit 13df909

Browse files
committed
fix race condition with line reader
1 parent 966ce3b commit 13df909

File tree

4 files changed

+60
-22
lines changed

4 files changed

+60
-22
lines changed

tailer/fileTailer.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,36 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
6060
logger = &nilLogger{}
6161
}
6262

63-
lines := make(chan string)
63+
// Why does the lines channel have a size of 10?
64+
// ---
65+
// In fileTailer_test, we read a line from the channel, then we run the next action
66+
// on the logfile (like writing the next line, simulating logrotate, etc.).
67+
//
68+
// Example:
69+
// 1) write line a
70+
// 2) write line b
71+
// 3) move the old logfile away and create a new logfile
72+
// 4) write line c
73+
//
74+
// With this example, we would expect 4 calls to Events.Process() with the following events:
75+
// 1) MODIFIED : Process() reads line a
76+
// 2) MODIFIED : Process() reads line b
77+
// 3) MOVED_FROM, CREATED : Process() resets the line reader and seeks the file to position 0
78+
// 4) MODIFIED : Process() reads line c
79+
//
80+
// However, there is a race condition: When the line reader is slow, it does not hit EOF
81+
// before line b is written. The reader keeps reading upon the 1st MODIFIED event.
82+
// 1) MODIFIED : Process() reads line a and line b
83+
// 2) MODIFIED : Process() detects the truncated file, seeks to position 0, reads line c
84+
// 3) MOVED_FROM, CREATED : seek to position 0, read line c again !!!
85+
// 4) MODIFIED : no changes in file
86+
//
87+
// As a result, we read 'line c' two times.
88+
// This problem occurs especially on Windows where we don't keep the file open.
89+
//
90+
// To minimize the risk, we give the lines channel size 10, so that the line reader can
91+
// continue reading the next few lines and does not need to wait until the lines are processed.
92+
lines := make(chan string, 10)
6493
done := make(chan struct{})
6594
errors := make(chan Error)
6695

tailer/fileTailer_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,11 @@ func (l *closeFileAfterEachLineLogFileWriter) writeLine(t *testing.T, log simple
305305
if err != nil {
306306
t.Fatalf("%v: Failed to open file for writing: %v", l.path, err.Error())
307307
}
308-
_, err = f.WriteString(fmt.Sprintf("%v\n", line))
308+
newline := "\n"
309+
if runtime.GOOS == "windows" {
310+
newline = "\r\n"
311+
}
312+
_, err = f.WriteString(fmt.Sprintf("%v%v", line, newline))
309313
if err != nil {
310314
t.Fatalf("%v: Failed to write to file: %v", l.path, err.Error())
311315
}

tailer/fileTailer_windows.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,19 @@ func (event *event) Process(fileBefore *File, reader *lineReader, abspath string
114114
)
115115
logger.Debug("File system watcher received %v.\n", event.String())
116116

117-
// WRITE or TRUNCATE
118-
if file != nil && norm(event.Name) == norm(abspath) && event.Mask&winfsnotify.FS_MODIFY == winfsnotify.FS_MODIFY {
119-
truncated, err = file.CheckTruncated()
117+
// MOVED_FROM or DELETE
118+
if file != nil && norm(event.Name) == norm(abspath) && (event.Mask&winfsnotify.FS_MOVED_FROM == winfsnotify.FS_MOVED_FROM || event.Mask&winfsnotify.FS_DELETE == winfsnotify.FS_DELETE) {
119+
file = nil
120+
reader.Clear()
121+
}
122+
123+
// MOVED_TO or CREATE
124+
if file == nil && norm(event.Name) == norm(abspath) && (event.Mask&winfsnotify.FS_MOVED_TO == winfsnotify.FS_MOVED_TO || event.Mask&winfsnotify.FS_CREATE == winfsnotify.FS_CREATE) {
125+
file, err = open(abspath)
120126
if err != nil {
121127
return
122128
}
123-
if truncated {
124-
_, err = file.Seek(0, io.SeekStart)
125-
if err != nil {
126-
return
127-
}
128-
reader.Clear()
129-
}
129+
reader.Clear()
130130
for {
131131
line, eof, err = reader.ReadLine(file)
132132
if err != nil {
@@ -139,19 +139,19 @@ func (event *event) Process(fileBefore *File, reader *lineReader, abspath string
139139
}
140140
}
141141

142-
// MOVED_FROM or DELETE
143-
if file != nil && norm(event.Name) == norm(abspath) && (event.Mask&winfsnotify.FS_MOVED_FROM == winfsnotify.FS_MOVED_FROM || event.Mask&winfsnotify.FS_DELETE == winfsnotify.FS_DELETE) {
144-
file = nil
145-
reader.Clear()
146-
}
147-
148-
// MOVED_TO or CREATE
149-
if file == nil && norm(event.Name) == norm(abspath) && (event.Mask&winfsnotify.FS_MOVED_TO == winfsnotify.FS_MOVED_TO || event.Mask&winfsnotify.FS_CREATE == winfsnotify.FS_CREATE) {
150-
file, err = open(abspath)
142+
// WRITE or TRUNCATE
143+
if file != nil && norm(event.Name) == norm(abspath) && event.Mask&winfsnotify.FS_MODIFY == winfsnotify.FS_MODIFY {
144+
truncated, err = file.CheckTruncated()
151145
if err != nil {
152146
return
153147
}
154-
reader.Clear()
148+
if truncated {
149+
_, err = file.Seek(0, io.SeekStart)
150+
if err != nil {
151+
return
152+
}
153+
reader.Clear()
154+
}
155155
for {
156156
line, eof, err = reader.ReadLine(file)
157157
if err != nil {
@@ -163,6 +163,7 @@ func (event *event) Process(fileBefore *File, reader *lineReader, abspath string
163163
lines = append(lines, line)
164164
}
165165
}
166+
166167
return
167168
}
168169

tailer/file_windows.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ func (f *File) Seek(offset int64, whence int) (int64, error) {
4848
return 0, err
4949
}
5050
defer file.Close()
51+
_, err = file.Seek(f.currentPos, io.SeekStart)
52+
if err != nil {
53+
return 0, err
54+
}
5155
result, resultErr := file.Seek(offset, whence)
5256
f.currentPos, err = file.Seek(0, io.SeekCurrent)
5357
if err != nil {

0 commit comments

Comments
 (0)