Skip to content

Commit 3139f7f

Browse files
committed
#5 prepare polling watcher with multiple logfile support
1 parent ae34a79 commit 3139f7f

File tree

6 files changed

+156
-14
lines changed

6 files changed

+156
-14
lines changed

tailer/fswatcher/fswatcher.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"os"
2424
"path/filepath"
25+
"time"
2526
)
2627

2728
type FileTailer interface {
@@ -92,6 +93,17 @@ func (t *fileTailer) Close() {
9293
}
9394

9495
func RunFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, log logrus.FieldLogger) (FileTailer, error) {
96+
return runFileTailer(initWatcher, globs, readall, failOnMissingFile, log)
97+
}
98+
99+
func RunPollingFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, pollInterval time.Duration, log logrus.FieldLogger) (FileTailer, error) {
100+
initFunc := func() (fswatcher, Error) {
101+
return initPollingWatcher(pollInterval)
102+
}
103+
return runFileTailer(initFunc, globs, readall, failOnMissingFile, log)
104+
}
105+
106+
func runFileTailer(initFunc func() (fswatcher, Error), globs []glob.Glob, readall bool, failOnMissingFile bool, log logrus.FieldLogger) (FileTailer, error) {
95107

96108
var (
97109
t *fileTailer
@@ -106,7 +118,7 @@ func RunFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, log
106118
done: make(chan struct{}),
107119
}
108120

109-
t.osSpecific, Err = initWatcher()
121+
t.osSpecific, Err = initFunc()
110122
if Err != nil {
111123
return nil, Err
112124
}
@@ -127,10 +139,10 @@ func RunFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, log
127139
eventProducerLoop := t.osSpecific.runFseventProducerLoop()
128140
defer eventProducerLoop.Close()
129141

130-
for _, dirPath := range t.watchedDirs {
131-
dirLogger := log.WithField("directory", dirPath)
142+
for _, dir := range t.watchedDirs {
143+
dirLogger := log.WithField("directory", dir.Path())
132144
dirLogger.Debugf("initializing directory")
133-
Err = t.syncFilesInDir(dirPath, readall, dirLogger)
145+
Err = t.syncFilesInDir(dir, readall, dirLogger)
134146
if Err != nil {
135147
select {
136148
case <-t.done:

tailer/fswatcher/fswatcher_darwin.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,32 @@ func initWatcher() (fswatcher, Error) {
6565

6666
func (w *watcher) watchDir(path string) (*Dir, Error) {
6767
var (
68+
dir *Dir
6869
err error
69-
dir *os.File
70+
Err Error
7071
zeroTimeout = syscall.NsecToTimespec(0) // timeout zero means non-blocking kevent() call
7172
)
72-
dir, err = os.Open(path)
73-
if err != nil {
74-
return nil, NewErrorf(NotSpecified, err, "%v: open() failed", path)
73+
dir, Err = newDir(path)
74+
if Err != nil {
75+
return nil, Err
7576
}
76-
_, err = syscall.Kevent(w.kq, []syscall.Kevent_t{makeEvent(dir)}, nil, &zeroTimeout)
77+
_, err = syscall.Kevent(w.kq, []syscall.Kevent_t{makeEvent(dir.file)}, nil, &zeroTimeout)
7778
if err != nil {
78-
dir.Close()
79+
dir.file.Close()
7980
return nil, NewErrorf(NotSpecified, err, "%v: kevent() failed", path)
8081
}
82+
return dir, nil
83+
}
84+
85+
func newDir(path string) (*Dir, Error) {
86+
var (
87+
err error
88+
dir *os.File
89+
)
90+
dir, err = os.Open(path)
91+
if err != nil {
92+
return nil, NewErrorf(NotSpecified, err, "%v: open() failed", path)
93+
}
8194
return &Dir{dir}, nil
8295
}
8396

tailer/fswatcher/fswatcher_linux.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,23 @@ func initWatcher() (fswatcher, Error) {
7676

7777
func (w *watcher) watchDir(path string) (*Dir, Error) {
7878
var (
79-
wd int
79+
dir *Dir
8080
err error
81+
Err Error
8182
)
82-
wd, err = syscall.InotifyAddWatch(w.fd, path, syscall.IN_MODIFY|syscall.IN_MOVED_FROM|syscall.IN_MOVED_TO|syscall.IN_DELETE|syscall.IN_CREATE)
83+
dir, Err = newDir(path)
84+
if Err != nil {
85+
return nil, Err
86+
}
87+
dir.wd, err = syscall.InotifyAddWatch(w.fd, path, syscall.IN_MODIFY|syscall.IN_MOVED_FROM|syscall.IN_MOVED_TO|syscall.IN_DELETE|syscall.IN_CREATE)
8388
if err != nil {
8489
return nil, NewErrorf(NotSpecified, err, "%q: inotify_add_watch() failed", path)
8590
}
86-
return &Dir{wd: wd, path: path}, nil
91+
return dir, nil
92+
}
93+
94+
func newDir(path string) (*Dir, Error) {
95+
return &Dir{path: path}, nil
8796
}
8897

8998
func (w *watcher) watchFile(_ fileMeta) Error {

tailer/fswatcher/fswatcher_windows.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,23 @@ func initWatcher() (fswatcher, Error) {
7373
}
7474

7575
func (w *watcher) watchDir(path string) (*Dir, Error) {
76-
err := w.winWatcher.Watch(path)
76+
var (
77+
dir *Dir
78+
err error
79+
Err Error
80+
)
81+
dir, Err = newDir(path)
82+
if Err != nil {
83+
return nil, Err
84+
}
85+
err = w.winWatcher.Watch(path)
7786
if err != nil {
7887
return nil, NewErrorf(NotSpecified, err, "%v: failed to watch directory", path)
7988
}
89+
return dir, nil
90+
}
91+
92+
func newDir(path string) (*Dir, Error) {
8093
return &Dir{path: path}, nil
8194
}
8295

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package fswatcher
2+
3+
import "time"
4+
5+
type pollloop struct {
6+
events chan fsevent
7+
errors chan Error // unused
8+
done chan struct{}
9+
}
10+
11+
func (l *pollloop) Events() chan fsevent {
12+
return l.events
13+
}
14+
15+
func (l *pollloop) Errors() chan Error {
16+
return l.errors
17+
}
18+
19+
func (l *pollloop) Close() {
20+
close(l.done)
21+
}
22+
23+
func runPollLoop(pollInterval time.Duration) *pollloop {
24+
25+
events := make(chan fsevent)
26+
errors := make(chan Error) // unused
27+
done := make(chan struct{})
28+
29+
go func() {
30+
defer func() {
31+
close(events)
32+
close(errors)
33+
}()
34+
for {
35+
tick := time.After(pollInterval)
36+
select {
37+
case <-tick:
38+
events <- struct{}{}
39+
case <-done:
40+
return
41+
}
42+
}
43+
}()
44+
return &pollloop{
45+
events: events,
46+
errors: errors,
47+
done: done,
48+
}
49+
}

tailer/fswatcher/pollingwatcher.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package fswatcher
2+
3+
import (
4+
"github.com/sirupsen/logrus"
5+
"time"
6+
)
7+
8+
type pollingWatcher struct {
9+
pollInterval time.Duration
10+
}
11+
12+
func initPollingWatcher(pollInterval time.Duration) (fswatcher, Error) {
13+
return &pollingWatcher{
14+
pollInterval: pollInterval,
15+
}, nil
16+
}
17+
18+
func (w *pollingWatcher) runFseventProducerLoop() fseventProducerLoop {
19+
return runPollLoop(w.pollInterval)
20+
}
21+
22+
func (w *pollingWatcher) processEvent(t *fileTailer, fsevent fsevent, log logrus.FieldLogger) Error {
23+
for _, dir := range t.watchedDirs {
24+
err := t.syncFilesInDir(dir, true, log)
25+
if err != nil {
26+
return err
27+
}
28+
}
29+
return nil
30+
}
31+
32+
func (w *pollingWatcher) Close() error {
33+
return nil
34+
}
35+
36+
func (w *pollingWatcher) watchDir(path string) (*Dir, Error) {
37+
return newDir(path)
38+
}
39+
40+
func (w *pollingWatcher) unwatchDir(dir *Dir) error {
41+
return nil
42+
}
43+
44+
func (w *pollingWatcher) watchFile(file fileMeta) Error {
45+
return nil
46+
}

0 commit comments

Comments
 (0)