Skip to content

Commit ae34a79

Browse files
committed
#5 prepare polling watcher with multiple logfile support
1 parent 9737fb7 commit ae34a79

File tree

8 files changed

+149
-44
lines changed

8 files changed

+149
-44
lines changed

tailer/fswatcher/errors.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,8 @@ const (
2323
DirectoryNotFound
2424
FileNotFound
2525

26-
// As we cannot keep files open on Windows (see comment in file_windows.go),
27-
// it might happen that a file is removed between two reads.
28-
// We signal this using the WinFileRemoved error and handle it by calling syncFilesInDir.
29-
// This error is never sent through the errors channel, it's only used internally.
30-
// TODO: Refactor error handling and make this an internal error.
26+
// The WinFileRemoved Error should never be seen, because it is handled internally in the FileTailer.
27+
// TODO: Refactor error handling such that this is not part of the public interface.
3128
WinFileRemoved
3229
)
3330

tailer/fswatcher/keventloop_darwin.go renamed to tailer/fswatcher/fseventProducerLoop_darwin.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
type keventloop struct {
2323
kq int
24-
events chan syscall.Kevent_t
24+
events chan fsevent
2525
errors chan Error
2626
done chan struct{}
2727
}
@@ -35,10 +35,18 @@ func (p *keventloop) Close() {
3535
close(p.done)
3636
}
3737

38+
func (p *keventloop) Events() chan fsevent {
39+
return p.events
40+
}
41+
42+
func (p *keventloop) Errors() chan Error {
43+
return p.errors
44+
}
45+
3846
func runKeventLoop(kq int) *keventloop {
3947
var result = &keventloop{
4048
kq: kq,
41-
events: make(chan syscall.Kevent_t),
49+
events: make(chan fsevent),
4250
errors: make(chan Error),
4351
done: make(chan struct{}),
4452
}

tailer/fswatcher/inotifyloop_linux.go renamed to tailer/fswatcher/fseventProducerLoop_linux.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323

2424
type inotifyloop struct {
2525
fd int
26-
events chan inotifyEvent
26+
events chan fsevent
2727
errors chan Error
2828
done chan struct{}
2929
}
@@ -33,6 +33,14 @@ type inotifyEvent struct {
3333
Name string
3434
}
3535

36+
func (l *inotifyloop) Events() chan fsevent {
37+
return l.events
38+
}
39+
40+
func (l *inotifyloop) Errors() chan Error {
41+
return l.errors
42+
}
43+
3644
// Terminate the inotify loop.
3745
// If the loop hangs in syscall.Read(), it will keep hanging there until the next event is read.
3846
// Therefore, after the consumer called Close(), it should generate an artificial IN_IGNORE event to
@@ -44,7 +52,7 @@ func (l *inotifyloop) Close() {
4452
func runInotifyLoop(fd int) *inotifyloop {
4553
var result = &inotifyloop{
4654
fd: fd,
47-
events: make(chan inotifyEvent),
55+
events: make(chan fsevent),
4856
errors: make(chan Error),
4957
done: make(chan struct{}),
5058
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2019 The grok_exporter Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fswatcher
16+
17+
import "golang.org/x/exp/winfsnotify"
18+
19+
type winwatcherloop struct {
20+
events chan fsevent
21+
errors chan Error
22+
done chan struct{}
23+
}
24+
25+
func (l *winwatcherloop) Events() chan fsevent {
26+
return l.events
27+
}
28+
29+
func (l *winwatcherloop) Errors() chan Error {
30+
return l.errors
31+
}
32+
33+
func (l *winwatcherloop) Close() {
34+
close(l.done)
35+
}
36+
37+
func runWinWatcherLoop(w *winfsnotify.Watcher) *winwatcherloop {
38+
var (
39+
events = make(chan fsevent)
40+
errors = make(chan Error)
41+
done = make(chan struct{})
42+
)
43+
go func() {
44+
for {
45+
select {
46+
case event := <-w.Event:
47+
events <- event
48+
case err := <-w.Error:
49+
errors <- NewError(NotSpecified, err, "")
50+
case <-done:
51+
w.Close()
52+
return
53+
}
54+
}
55+
}()
56+
return &winwatcherloop{
57+
events: events,
58+
errors: errors,
59+
done: done,
60+
}
61+
}

tailer/fswatcher/fswatcher.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2018 The grok_exporter Authors
1+
// Copyright 2018-2019 The grok_exporter Authors
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -49,12 +49,34 @@ type fileTailer struct {
4949
globs []glob.Glob
5050
watchedDirs []*Dir
5151
watchedFiles map[string]*fileWithReader // path -> fileWithReader
52-
osSpecific *watcher
52+
osSpecific fswatcher
5353
lines chan Line
5454
errors chan Error
5555
done chan struct{}
5656
}
5757

58+
type fswatcher interface {
59+
io.Closer
60+
runFseventProducerLoop() fseventProducerLoop
61+
processEvent(t *fileTailer, event fsevent, log logrus.FieldLogger) Error
62+
watchDir(path string) (*Dir, Error)
63+
unwatchDir(dir *Dir) error
64+
watchFile(file fileMeta) Error
65+
}
66+
67+
type fseventProducerLoop interface {
68+
Close()
69+
Events() chan fsevent
70+
Errors() chan Error
71+
}
72+
73+
type fsevent interface{}
74+
75+
type fileMeta interface {
76+
Fd() uintptr
77+
Name() string
78+
}
79+
5880
func (t *fileTailer) Lines() chan Line {
5981
return t.lines
6082
}
@@ -134,7 +156,7 @@ func RunFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, log
134156
select {
135157
case <-t.done:
136158
return
137-
case event := <-eventProducerLoop.events:
159+
case event := <-eventProducerLoop.Events():
138160
processEventError := t.osSpecific.processEvent(t, event, log)
139161
if processEventError != nil {
140162
select {
@@ -143,7 +165,7 @@ func RunFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, log
143165
}
144166
return
145167
}
146-
case err := <-eventProducerLoop.errors:
168+
case err := <-eventProducerLoop.Errors():
147169
select {
148170
case <-t.done:
149171
case t.errors <- NewError(NotSpecified, err, "error reading file system events"):
@@ -224,7 +246,7 @@ func (t *fileTailer) syncFilesInDir(dir *Dir, readall bool, log logrus.FieldLogg
224246
fileLogger.Debug("skipping, because it is a directory")
225247
continue
226248
}
227-
alreadyWatched, Err := t.osSpecific.findSameFile(t, fileInfo, filePath)
249+
alreadyWatched, Err := findSameFile(t, fileInfo, filePath)
228250
if Err != nil {
229251
return Err
230252
}
@@ -252,7 +274,7 @@ func (t *fileTailer) syncFilesInDir(dir *Dir, readall bool, log logrus.FieldLogg
252274
fileLogger = fileLogger.WithField("fd", newFile.Fd())
253275
fileLogger.Info("watching new file")
254276

255-
Err = t.osSpecific.watchNewFile(newFile)
277+
Err = t.osSpecific.watchFile(newFile)
256278
if Err != nil {
257279
newFile.Close()
258280
return Err

tailer/fswatcher/fswatcher_darwin.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ func (w *watcher) Close() error {
5151
}
5252
}
5353

54-
func (w *watcher) runFseventProducerLoop() *keventloop {
54+
func (w *watcher) runFseventProducerLoop() fseventProducerLoop {
5555
return runKeventLoop(w.kq)
5656
}
5757

58-
func initWatcher() (*watcher, Error) {
58+
func initWatcher() (fswatcher, Error) {
5959
kq, err := syscall.Kqueue()
6060
if err != nil {
6161
return nil, NewError(NotSpecified, err, "kqueue() failed")
@@ -81,7 +81,7 @@ func (w *watcher) watchDir(path string) (*Dir, Error) {
8181
return &Dir{dir}, nil
8282
}
8383

84-
func (w *watcher) watchNewFile(newFile *os.File) Error {
84+
func (w *watcher) watchFile(newFile fileMeta) Error {
8585
zeroTimeout := syscall.NsecToTimespec(0) // timeout zero means non-blocking kevent() call
8686
_, err := syscall.Kevent(w.kq, []syscall.Kevent_t{makeEvent(newFile)}, nil, &zeroTimeout)
8787
if err != nil {
@@ -90,12 +90,20 @@ func (w *watcher) watchNewFile(newFile *os.File) Error {
9090
return nil
9191
}
9292

93-
func (w *watcher) processEvent(t *fileTailer, kevent syscall.Kevent_t, log logrus.FieldLogger) Error {
93+
func (w *watcher) processEvent(t *fileTailer, event fsevent, log logrus.FieldLogger) Error {
9494
var (
9595
dir *Dir
9696
file *fileWithReader
9797
dirLogger, fileLogger logrus.FieldLogger
98+
kevent syscall.Kevent_t
99+
ok bool
98100
)
101+
102+
kevent, ok = event.(syscall.Kevent_t)
103+
if !ok {
104+
return NewErrorf(NotSpecified, nil, "received a file system event of unknown type %T", event)
105+
}
106+
99107
for _, dir = range t.watchedDirs {
100108
if kevent.Ident == fdToInt(dir.file.Fd()) {
101109
dirLogger = log.WithField("directory", dir.file.Name())
@@ -189,7 +197,7 @@ func isTruncated(file *os.File) (bool, error) {
189197
return currentPos > fileInfo.Size(), nil
190198
}
191199

192-
func (w *watcher) findSameFile(t *fileTailer, file os.FileInfo, _ string) (*fileWithReader, Error) {
200+
func findSameFile(t *fileTailer, file os.FileInfo, _ string) (*fileWithReader, Error) {
193201
var (
194202
fileInfo os.FileInfo
195203
err error
@@ -206,7 +214,11 @@ func (w *watcher) findSameFile(t *fileTailer, file os.FileInfo, _ string) (*file
206214
return nil, nil
207215
}
208216

209-
func makeEvent(file *os.File) syscall.Kevent_t {
217+
type withFd interface {
218+
Fd() uintptr
219+
}
220+
221+
func makeEvent(file withFd) syscall.Kevent_t {
210222

211223
// Note about the EV_CLEAR flag:
212224
//

tailer/fswatcher/fswatcher_linux.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ func unwatchDirByEvent(t *fileTailer, event inotifyEvent) {
6262
t.watchedDirs = watchedDirsAfter
6363
}
6464

65-
func (w *watcher) runFseventProducerLoop() *inotifyloop {
65+
func (w *watcher) runFseventProducerLoop() fseventProducerLoop {
6666
return runInotifyLoop(w.fd)
6767
}
6868

69-
func initWatcher() (*watcher, Error) {
69+
func initWatcher() (fswatcher, Error) {
7070
fd, err := syscall.InotifyInit1(syscall.IN_CLOEXEC)
7171
if err != nil {
7272
return nil, NewError(NotSpecified, err, "inotify_init1() failed")
@@ -86,7 +86,7 @@ func (w *watcher) watchDir(path string) (*Dir, Error) {
8686
return &Dir{wd: wd, path: path}, nil
8787
}
8888

89-
func (w *watcher) watchNewFile(newFile *os.File) Error {
89+
func (w *watcher) watchFile(_ fileMeta) Error {
9090
// nothing to do, because on Linux we watch the directory and don't need to watch individual files.
9191
return nil
9292
}
@@ -100,7 +100,11 @@ func findDir(t *fileTailer, event inotifyEvent) *Dir {
100100
return nil
101101
}
102102

103-
func (w *watcher) processEvent(t *fileTailer, event inotifyEvent, log logrus.FieldLogger) Error {
103+
func (w *watcher) processEvent(t *fileTailer, fsevent fsevent, log logrus.FieldLogger) Error {
104+
event, ok := fsevent.(inotifyEvent)
105+
if !ok {
106+
return NewErrorf(NotSpecified, nil, "received a file system event of unknown type %T", event)
107+
}
104108
dir := findDir(t, event)
105109
if dir == nil {
106110
return NewError(NotSpecified, nil, "watch list inconsistent: received a file system event for an unknown directory")
@@ -159,7 +163,7 @@ func isTruncated(file *os.File) (bool, error) {
159163
return currentPos > fileInfo.Size(), nil
160164
}
161165

162-
func (w *watcher) findSameFile(t *fileTailer, file os.FileInfo, _ string) (*fileWithReader, Error) {
166+
func findSameFile(t *fileTailer, file os.FileInfo, _ string) (*fileWithReader, Error) {
163167
var (
164168
fileInfo os.FileInfo
165169
err error

tailer/fswatcher/fswatcher_windows.go

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,11 @@ func (w *watcher) Close() error {
6060
}
6161
}
6262

63-
func (w *watcher) runFseventProducerLoop() *winwatcherloop {
64-
return &winwatcherloop{
65-
events: w.winWatcher.Event,
66-
errors: w.winWatcher.Error,
67-
}
68-
}
69-
70-
type winwatcherloop struct {
71-
events chan *winfsnotify.Event
72-
errors chan error
63+
func (w *watcher) runFseventProducerLoop() fseventProducerLoop {
64+
return runWinWatcherLoop(w.winWatcher)
7365
}
7466

75-
func (l *winwatcherloop) Close() {
76-
// noop, winwatcher.Close() called in shutdown()
77-
}
78-
79-
func initWatcher() (*watcher, Error) {
67+
func initWatcher() (fswatcher, Error) {
8068
winWatcher, err := winfsnotify.NewWatcher()
8169
if err != nil {
8270
return nil, NewError(NotSpecified, err, "failed to initialize file system watcher")
@@ -92,12 +80,17 @@ func (w *watcher) watchDir(path string) (*Dir, Error) {
9280
return &Dir{path: path}, nil
9381
}
9482

95-
func (w *watcher) watchNewFile(newFile *File) Error {
83+
func (w *watcher) watchFile(_ fileMeta) Error {
9684
// nothing to do, because on Windows we watch the directory and don't need to watch individual files.
9785
return nil
9886
}
9987

100-
func (w *watcher) processEvent(t *fileTailer, event *winfsnotify.Event, log logrus.FieldLogger) Error {
88+
func (w *watcher) processEvent(t *fileTailer, fsevent fsevent, log logrus.FieldLogger) Error {
89+
event, ok := fsevent.(*winfsnotify.Event)
90+
if !ok {
91+
return NewErrorf(NotSpecified, nil, "received a file system event of unknown type %T", event)
92+
}
93+
10194
dir, fileName := dirAndFile(t, event.Name)
10295
if dir == nil {
10396
return NewError(NotSpecified, nil, "watch list inconsistent: received a file system event for an unknown directory")
@@ -156,7 +149,7 @@ func isTruncated(file *os.File) (bool, Error) {
156149
return currentPos > fileInfo.Size(), nil
157150
}
158151

159-
func (w *watcher) findSameFile(t *fileTailer, newFileInfo *fileInfo, path string) (*fileWithReader, Error) {
152+
func findSameFile(t *fileTailer, newFileInfo *fileInfo, path string) (*fileWithReader, Error) {
160153
newFile, Err := open(path)
161154
if Err != nil {
162155
return nil, Err

0 commit comments

Comments
 (0)