Skip to content

Commit 966ce3b

Browse files
committed
separate line reader and output channel
1 parent 261b831 commit 966ce3b

File tree

8 files changed

+290
-193
lines changed

8 files changed

+290
-193
lines changed

tailer/fileTailer.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,23 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
107107
if eventLoop != nil {
108108
defer eventLoop.Close()
109109
}
110-
reader := NewBufferedLineReader(lines, done)
110+
reader := NewLineReader()
111111
if file != nil {
112112
// process all pre-existing lines
113-
finished, err := reader.ReadAvailableLines(file)
114-
if err != nil {
115-
writeError(errors, done, err, "failed to initialize file system watcher for %v", path)
116-
return
117-
} else if finished {
118-
return
113+
for {
114+
line, eof, err := reader.ReadLine(file)
115+
if err != nil {
116+
writeError(errors, done, err, "failed to initialize file system watcher for %v", path)
117+
return
118+
}
119+
if eof {
120+
break
121+
}
122+
select {
123+
case <-done:
124+
return
125+
case lines <- line:
126+
}
119127
}
120128
}
121129

@@ -148,11 +156,19 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
148156
}
149157
return
150158
}
151-
file, err = evnts.Process(file, reader, abspath, logger)
159+
var freshLines []string
160+
file, freshLines, err = evnts.Process(file, reader, abspath, logger)
152161
if err != nil {
153162
writeError(errors, done, err, "failed to watch %v", abspath)
154163
return
155164
}
165+
for _, line := range freshLines {
166+
select {
167+
case <-done:
168+
return
169+
case lines <- line:
170+
}
171+
}
156172
}
157173
}
158174
}()

tailer/fileTailer_darwin.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,13 @@ func (l *eventLoop) Events() chan Events {
139139
return l.events
140140
}
141141

142-
func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) {
142+
func (events *eventList) Process(fileBefore *File, reader *lineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) {
143143
file = fileBefore
144-
var truncated bool
144+
lines = []string{}
145+
var (
146+
line string
147+
truncated, eof bool
148+
)
145149
logger.Debug("File system watcher received %v event(s):\n", len(events.events))
146150
for i, event := range events.events {
147151
logger.Debug("%v/%v: %v.\n", i+1, len(events.events), event2string(events.watcher.dir, file, event))
@@ -159,17 +163,23 @@ func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, a
159163
if err != nil {
160164
return
161165
}
166+
reader.Clear()
162167
}
163168
}
164169
}
165170

166171
// Handle write event.
167172
for _, event := range events.events {
168173
if file != nil && event.Ident == fdToInt(file.Fd()) && event.Fflags&syscall.NOTE_WRITE == syscall.NOTE_WRITE {
169-
var finished bool
170-
finished, err = reader.ReadAvailableLines(file)
171-
if finished || err != nil {
172-
return
174+
for {
175+
line, eof, err = reader.ReadLine(file)
176+
if err != nil {
177+
return
178+
}
179+
if eof {
180+
break
181+
}
182+
lines = append(lines, line)
173183
}
174184
}
175185
}
@@ -194,10 +204,15 @@ func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, a
194204
return
195205
}
196206
reader.Clear()
197-
var finished bool
198-
finished, err = reader.ReadAvailableLines(file)
199-
if finished || err != nil {
200-
return
207+
for {
208+
line, eof, err = reader.ReadLine(file)
209+
if err != nil {
210+
return
211+
}
212+
if eof {
213+
break
214+
}
215+
lines = append(lines, line)
201216
}
202217
} else {
203218
// If file could not be opened, the CREATE event was for another file, we ignore this.

tailer/fileTailer_linux.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,15 @@ func (l *eventLoop) Events() chan Events {
141141
return l.events
142142
}
143143

144-
func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) {
144+
func (events eventList) Process(fileBefore *File, reader *lineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) {
145145
file = fileBefore
146-
filename := filepath.Base(abspath)
147-
var truncated bool
146+
lines = []string{}
147+
var (
148+
filename = filepath.Base(abspath)
149+
truncated bool
150+
line string
151+
eof bool
152+
)
148153
for _, event := range events {
149154
logger.Debug("File system watcher received %v.\n", event2string(event))
150155
}
@@ -161,11 +166,17 @@ func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, ab
161166
if err != nil {
162167
return
163168
}
169+
reader.Clear()
164170
}
165-
var finished bool
166-
finished, err = reader.ReadAvailableLines(file)
167-
if finished || err != nil {
168-
return
171+
for {
172+
line, eof, err = reader.ReadLine(file)
173+
if err != nil {
174+
return
175+
}
176+
if eof {
177+
break
178+
}
179+
lines = append(lines, line)
169180
}
170181
}
171182
}
@@ -187,10 +198,15 @@ func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, ab
187198
return
188199
}
189200
reader.Clear()
190-
var finished bool
191-
finished, err = reader.ReadAvailableLines(file)
192-
if finished || err != nil {
193-
return
201+
for {
202+
line, eof, err = reader.ReadLine(file)
203+
if err != nil {
204+
return
205+
}
206+
if eof {
207+
break
208+
}
209+
lines = append(lines, line)
194210
}
195211
}
196212
}

tailer/fileTailer_windows.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,13 @@ func (l *eventLoop) Events() chan Events {
105105
return l.events
106106
}
107107

108-
func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) {
108+
func (event *event) Process(fileBefore *File, reader *lineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) {
109109
file = fileBefore
110-
var truncated bool
110+
lines = []string{}
111+
var (
112+
truncated, eof bool
113+
line string
114+
)
111115
logger.Debug("File system watcher received %v.\n", event.String())
112116

113117
// WRITE or TRUNCATE
@@ -121,11 +125,17 @@ func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspat
121125
if err != nil {
122126
return
123127
}
128+
reader.Clear()
124129
}
125-
var finished bool
126-
finished, err = reader.ReadAvailableLines(file)
127-
if finished || err != nil {
128-
return
130+
for {
131+
line, eof, err = reader.ReadLine(file)
132+
if err != nil {
133+
return
134+
}
135+
if eof {
136+
break
137+
}
138+
lines = append(lines, line)
129139
}
130140
}
131141

@@ -142,10 +152,15 @@ func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspat
142152
return
143153
}
144154
reader.Clear()
145-
var finished bool
146-
finished, err = reader.ReadAvailableLines(file)
147-
if finished || err != nil {
148-
return
155+
for {
156+
line, eof, err = reader.ReadLine(file)
157+
if err != nil {
158+
return
159+
}
160+
if eof {
161+
break
162+
}
163+
lines = append(lines, line)
149164
}
150165
}
151166
return

tailer/linereader.go

Lines changed: 41 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,72 +16,65 @@ package tailer
1616

1717
import (
1818
"bytes"
19-
"fmt"
2019
"io"
2120
)
2221

23-
type bufferedLineReader struct {
22+
type lineReader struct {
2423
remainingBytesFromLastRead []byte
25-
26-
// channels are used to stream the results out
27-
lines chan<- string
28-
done <-chan struct{}
2924
}
3025

31-
func NewBufferedLineReader(lines chan<- string, done <-chan struct{}) *bufferedLineReader {
32-
return &bufferedLineReader{
26+
func NewLineReader() *lineReader {
27+
return &lineReader{
3328
remainingBytesFromLastRead: []byte{},
34-
lines: lines,
35-
done: done,
3629
}
3730
}
3831

39-
func (r *bufferedLineReader) ReadAvailableLines(file io.Reader) (bool, error) {
40-
41-
// for each buffer, split lines and stream
42-
buf := make([]byte, 512)
43-
var done bool
44-
32+
// read the next line from the file.
33+
// return values are (line, eof, err).
34+
// * line is the line read.
35+
// * eof is a boolean indicating if the end of file was reached before getting to the next '\n'.
36+
// * err is set if an error other than io.EOF has occurred. err is never io.EOF.
37+
// if eof is true, line is always "" and err always is nil.
38+
// if eof is false and err is nil, an empty line means that there actually was an empty line in the file.
39+
func (r *lineReader) ReadLine(file io.Reader) (string, bool, error) {
40+
var (
41+
err error
42+
buf = make([]byte, 512)
43+
n = 0
44+
)
4545
for {
46-
n, err := file.Read(buf)
47-
if n > 0 {
48-
// Callers should always process the n > 0 bytes returned before considering the error err.
49-
result := append(r.remainingBytesFromLastRead, buf[0:n]...)
50-
done, r.remainingBytesFromLastRead = r.processLines(result)
51-
if done {
52-
return true, nil
53-
}
54-
}
55-
if err != nil {
46+
newlinePos := bytes.IndexByte(r.remainingBytesFromLastRead, '\n')
47+
if newlinePos >= 0 {
48+
l := len(r.remainingBytesFromLastRead)
49+
result := make([]byte, newlinePos)
50+
copy(result, r.remainingBytesFromLastRead[:newlinePos])
51+
copy(r.remainingBytesFromLastRead, r.remainingBytesFromLastRead[newlinePos+1:])
52+
r.remainingBytesFromLastRead = r.remainingBytesFromLastRead[:l-(newlinePos+1)]
53+
return string(stripWindowsLineEnding(result)), false, nil
54+
} else if err != nil {
5655
if err == io.EOF {
57-
return false, nil
56+
return "", true, nil
5857
} else {
59-
return false, fmt.Errorf("read error: %v", err.Error())
58+
return "", false, err
59+
}
60+
} else {
61+
n, err = file.Read(buf)
62+
if n > 0 {
63+
// io.Reader: Callers should always process the n > 0 bytes returned before considering the error err.
64+
r.remainingBytesFromLastRead = append(r.remainingBytesFromLastRead, buf[0:n]...)
6065
}
6166
}
6267
}
6368
}
6469

65-
func (r *bufferedLineReader) Clear() {
66-
r.remainingBytesFromLastRead = []byte{}
70+
func stripWindowsLineEnding(s []byte) []byte {
71+
if len(s) > 0 && s[len(s)-1] == '\r' {
72+
return s[:len(s)-1]
73+
} else {
74+
return s
75+
}
6776
}
6877

69-
func (r *bufferedLineReader) processLines(data []byte) (finished bool, remainingBytes []byte) {
70-
newline := []byte("\n")
71-
for _, line := range bytes.SplitAfter(data, newline) {
72-
if bytes.HasSuffix(line, newline) {
73-
line = bytes.TrimSuffix(line, newline)
74-
line = bytes.TrimSuffix(line, []byte("\r")) // Needed for CRLF line endings?
75-
select {
76-
case r.lines <- string(line):
77-
case <-r.done:
78-
finished = true
79-
return
80-
}
81-
} else {
82-
// This is the last (incomplete) line returned by SplitAfter(). We will exit the for loop here.
83-
remainingBytes = line
84-
}
85-
}
86-
return
78+
func (r *lineReader) Clear() {
79+
r.remainingBytesFromLastRead = r.remainingBytesFromLastRead[:0]
8780
}

0 commit comments

Comments
 (0)