Skip to content

Commit 261b831

Browse files
authored
Merge pull request #48 from virtuald/streaming
tailer: don't load entire file into memory, stream it out bits at a time
2 parents b8c601e + 3b055bf commit 261b831

File tree

8 files changed: 131 additions and 100 deletions.

tailer/fileTailer.go

Lines changed: 5 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -107,20 +107,15 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
107107
if eventLoop != nil {
108108
defer eventLoop.Close()
109109
}
110-
reader := NewBufferedLineReader()
110+
reader := NewBufferedLineReader(lines, done)
111111
if file != nil {
112112
// process all pre-existing lines
113-
freshLines, err := reader.ReadAvailableLines(file)
113+
finished, err := reader.ReadAvailableLines(file)
114114
if err != nil {
115115
writeError(errors, done, err, "failed to initialize file system watcher for %v", path)
116116
return
117-
}
118-
for _, line := range freshLines {
119-
select {
120-
case <-done:
121-
return
122-
case lines <- line:
123-
}
117+
} else if finished {
118+
return
124119
}
125120
}
126121

@@ -153,19 +148,11 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
153148
}
154149
return
155150
}
156-
var freshLines []string
157-
file, freshLines, err = evnts.Process(file, reader, abspath, logger)
151+
file, err = evnts.Process(file, reader, abspath, logger)
158152
if err != nil {
159153
writeError(errors, done, err, "failed to watch %v", abspath)
160154
return
161155
}
162-
for _, line := range freshLines {
163-
select {
164-
case <-done:
165-
return
166-
case lines <- line:
167-
}
168-
}
169156
}
170157
}
171158
}()

tailer/fileTailer_darwin.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,8 @@ func (l *eventLoop) Events() chan Events {
139139
return l.events
140140
}
141141

142-
func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) {
142+
func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) {
143143
file = fileBefore
144-
lines = []string{}
145144
var truncated bool
146145
logger.Debug("File system watcher received %v event(s):\n", len(events.events))
147146
for i, event := range events.events {
@@ -167,12 +166,11 @@ func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, a
167166
// Handle write event.
168167
for _, event := range events.events {
169168
if file != nil && event.Ident == fdToInt(file.Fd()) && event.Fflags&syscall.NOTE_WRITE == syscall.NOTE_WRITE {
170-
var freshLines []string
171-
freshLines, err = reader.ReadAvailableLines(file)
172-
if err != nil {
169+
var finished bool
170+
finished, err = reader.ReadAvailableLines(file)
171+
if finished || err != nil {
173172
return
174173
}
175-
lines = append(lines, freshLines...)
176174
}
177175
}
178176

@@ -196,12 +194,11 @@ func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, a
196194
return
197195
}
198196
reader.Clear()
199-
var freshLines []string
200-
freshLines, err = reader.ReadAvailableLines(file)
201-
if err != nil {
197+
var finished bool
198+
finished, err = reader.ReadAvailableLines(file)
199+
if finished || err != nil {
202200
return
203201
}
204-
lines = append(lines, freshLines...)
205202
} else {
206203
// If file could not be opened, the CREATE event was for another file, we ignore this.
207204
err = nil

tailer/fileTailer_linux.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,8 @@ func (l *eventLoop) Events() chan Events {
141141
return l.events
142142
}
143143

144-
func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) {
144+
func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) {
145145
file = fileBefore
146-
lines = []string{}
147146
filename := filepath.Base(abspath)
148147
var truncated bool
149148
for _, event := range events {
@@ -163,12 +162,11 @@ func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, ab
163162
return
164163
}
165164
}
166-
var freshLines []string
167-
freshLines, err = reader.ReadAvailableLines(file)
168-
if err != nil {
165+
var finished bool
166+
finished, err = reader.ReadAvailableLines(file)
167+
if finished || err != nil {
169168
return
170169
}
171-
lines = append(lines, freshLines...)
172170
}
173171
}
174172

@@ -189,12 +187,11 @@ func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, ab
189187
return
190188
}
191189
reader.Clear()
192-
var freshLines []string
193-
freshLines, err = reader.ReadAvailableLines(file)
194-
if err != nil {
190+
var finished bool
191+
finished, err = reader.ReadAvailableLines(file)
192+
if finished || err != nil {
195193
return
196194
}
197-
lines = append(lines, freshLines...)
198195
}
199196
}
200197
return

tailer/fileTailer_windows.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,8 @@ func (l *eventLoop) Events() chan Events {
105105
return l.events
106106
}
107107

108-
func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) {
108+
func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) {
109109
file = fileBefore
110-
lines = []string{}
111110
var truncated bool
112111
logger.Debug("File system watcher received %v.\n", event.String())
113112

@@ -123,12 +122,11 @@ func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspat
123122
return
124123
}
125124
}
126-
var freshLines []string
127-
freshLines, err = reader.ReadAvailableLines(file)
128-
if err != nil {
125+
var finished bool
126+
finished, err = reader.ReadAvailableLines(file)
127+
if finished || err != nil {
129128
return
130129
}
131-
lines = append(lines, freshLines...)
132130
}
133131

134132
// MOVED_FROM or DELETE
@@ -144,12 +142,11 @@ func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspat
144142
return
145143
}
146144
reader.Clear()
147-
var freshLines []string
148-
freshLines, err = reader.ReadAvailableLines(file)
149-
if err != nil {
145+
var finished bool
146+
finished, err = reader.ReadAvailableLines(file)
147+
if finished || err != nil {
150148
return
151149
}
152-
lines = append(lines, freshLines...)
153150
}
154151
return
155152
}

tailer/linereader.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,60 +22,66 @@ import (
2222

2323
type bufferedLineReader struct {
2424
remainingBytesFromLastRead []byte
25+
26+
// channels are used to stream the results out
27+
lines chan<- string
28+
done <-chan struct{}
2529
}
2630

27-
func NewBufferedLineReader() *bufferedLineReader {
31+
func NewBufferedLineReader(lines chan<- string, done <-chan struct{}) *bufferedLineReader {
2832
return &bufferedLineReader{
2933
remainingBytesFromLastRead: []byte{},
34+
lines: lines,
35+
done: done,
3036
}
3137
}
3238

33-
func (r *bufferedLineReader) ReadAvailableLines(file io.Reader) ([]string, error) {
34-
var lines []string
35-
newBytes, err := read2EOF(file)
36-
if err != nil {
37-
return nil, err
38-
}
39-
lines, r.remainingBytesFromLastRead = splitLines(append(r.remainingBytesFromLastRead, newBytes...))
40-
return lines, nil
41-
}
42-
43-
func (r *bufferedLineReader) Clear() {
44-
r.remainingBytesFromLastRead = []byte{}
45-
}
39+
func (r *bufferedLineReader) ReadAvailableLines(file io.Reader) (bool, error) {
4640

47-
func read2EOF(file io.Reader) ([]byte, error) {
48-
result := make([]byte, 0)
41+
// for each buffer, split lines and stream
4942
buf := make([]byte, 512)
43+
var done bool
44+
5045
for {
5146
n, err := file.Read(buf)
5247
if n > 0 {
5348
// Callers should always process the n > 0 bytes returned before considering the error err.
54-
result = append(result, buf[0:n]...)
49+
result := append(r.remainingBytesFromLastRead, buf[0:n]...)
50+
done, r.remainingBytesFromLastRead = r.processLines(result)
51+
if done {
52+
return true, nil
53+
}
5554
}
5655
if err != nil {
5756
if err == io.EOF {
58-
return result, nil
57+
return false, nil
5958
} else {
60-
return nil, fmt.Errorf("read error: %v", err.Error())
59+
return false, fmt.Errorf("read error: %v", err.Error())
6160
}
6261
}
6362
}
6463
}
6564

66-
func splitLines(data []byte) (lines []string, remainingBytes []byte) {
65+
func (r *bufferedLineReader) Clear() {
66+
r.remainingBytesFromLastRead = []byte{}
67+
}
68+
69+
func (r *bufferedLineReader) processLines(data []byte) (finished bool, remainingBytes []byte) {
6770
newline := []byte("\n")
68-
lines = make([]string, 0)
69-
remainingBytes = make([]byte, 0)
7071
for _, line := range bytes.SplitAfter(data, newline) {
7172
if bytes.HasSuffix(line, newline) {
7273
line = bytes.TrimSuffix(line, newline)
7374
line = bytes.TrimSuffix(line, []byte("\r")) // Needed for CRLF line endings?
74-
lines = append(lines, string(line))
75+
select {
76+
case r.lines <- string(line):
77+
case <-r.done:
78+
finished = true
79+
return
80+
}
7581
} else {
7682
// This is the last (incomplete) line returned by SplitAfter(). We will exit the for loop here.
7783
remainingBytes = line
7884
}
7985
}
80-
return lines, remainingBytes
86+
return
8187
}

tailer/linereader_test.go

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,26 +45,73 @@ func (f *mockFile) Read(p []byte) (int, error) {
4545
}
4646
}
4747

48+
func collectLines(linechan chan string) []string {
49+
lines := []string{}
50+
for {
51+
select {
52+
case line := <-linechan:
53+
lines = append(lines, line)
54+
default:
55+
return lines
56+
}
57+
}
58+
}
59+
4860
func TestLineReader(t *testing.T) {
49-
file := NewMockFile("This is l", "ine 1\n", "This is line two\nThis is line three\n", "This ", "is ", "line 4", "\n", "\n")
50-
reader := NewBufferedLineReader()
51-
52-
lines, err := reader.ReadAvailableLines(file)
53-
expectEmpty(t, lines, err)
54-
lines, err = reader.ReadAvailableLines(file)
55-
expectLines(t, lines, err, "This is line 1")
56-
lines, err = reader.ReadAvailableLines(file)
57-
expectLines(t, lines, err, "This is line two", "This is line three")
58-
lines, err = reader.ReadAvailableLines(file) // This
59-
expectEmpty(t, lines, err)
60-
lines, err = reader.ReadAvailableLines(file) // is
61-
expectEmpty(t, lines, err)
62-
lines, err = reader.ReadAvailableLines(file) // line 4
63-
expectEmpty(t, lines, err)
64-
lines, err = reader.ReadAvailableLines(file) // \n
65-
expectLines(t, lines, err, "This is line 4")
66-
lines, err = reader.ReadAvailableLines(file) // \n
67-
expectLines(t, lines, err, "")
61+
file := NewMockFile("This is l", "ine 1\n", "This is line two\nThis is line three\n", "This ", "is ", "line 4", "\n", "\n", "\n")
62+
63+
done := make(chan struct{})
64+
linechan := make(chan string, 20)
65+
66+
reader := NewBufferedLineReader(linechan, done)
67+
68+
finished, err := reader.ReadAvailableLines(file)
69+
expectEmpty(t, collectLines(linechan), err)
70+
expectNotFinished(t, finished)
71+
72+
finished, err = reader.ReadAvailableLines(file)
73+
expectLines(t, collectLines(linechan), err, "This is line 1")
74+
expectNotFinished(t, finished)
75+
76+
finished, err = reader.ReadAvailableLines(file)
77+
expectLines(t, collectLines(linechan), err, "This is line two", "This is line three")
78+
expectNotFinished(t, finished)
79+
80+
finished, err = reader.ReadAvailableLines(file) // This
81+
expectEmpty(t, collectLines(linechan), err)
82+
expectNotFinished(t, finished)
83+
84+
finished, err = reader.ReadAvailableLines(file) // is
85+
expectEmpty(t, collectLines(linechan), err)
86+
expectNotFinished(t, finished)
87+
88+
finished, err = reader.ReadAvailableLines(file) // line 4
89+
expectEmpty(t, collectLines(linechan), err)
90+
expectNotFinished(t, finished)
91+
92+
finished, err = reader.ReadAvailableLines(file) // \n
93+
expectLines(t, collectLines(linechan), err, "This is line 4")
94+
expectNotFinished(t, finished)
95+
96+
finished, err = reader.ReadAvailableLines(file) // \n
97+
expectLines(t, collectLines(linechan), err, "")
98+
expectNotFinished(t, finished)
99+
100+
close(done)
101+
finished, err = reader.ReadAvailableLines(file) // \n
102+
expectFinished(t, finished)
103+
}
104+
105+
func expectNotFinished(t *testing.T, finished bool) {
106+
if finished {
107+
t.Error("expected to be not finished, but finished")
108+
}
109+
}
110+
111+
func expectFinished(t *testing.T, finished bool) {
112+
if !finished {
113+
t.Error("expected to be finished, but not finished")
114+
}
68115
}
69116

70117
func expectEmpty(t *testing.T, lines []string, err error) {

0 commit comments

Comments
 (0)