[fswatcher] File Reconciler w/ Resync Interval #149
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
// Copyright © 2023 Kaleido, Inc. | ||
// Copyright © 2024 Kaleido, Inc. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
|
@@ -20,6 +20,7 @@ import ( | |
"context" | ||
"os" | ||
"path" | ||
"time" | ||
|
||
"github.com/fsnotify/fsnotify" | ||
"github.com/hyperledger/firefly-common/pkg/fftypes" | ||
|
@@ -35,9 +36,19 @@ import ( | |
// - Only fires if the data in the file is different to the last notification | ||
// - Does not reload the config - that's the caller's responsibility | ||
func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) error { | ||
return sync(ctx, fullFilePath, onChange, onClose, nil, nil) | ||
} | ||
|
||
// Reconcile behaves the same as Watch, except it allows for running the onSync func on a provided | ||
// interval. The default re-sync interval is 1m. | ||
func Reconcile(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration) error { | ||
return sync(ctx, fullFilePath, onChange, onClose, onSync, resyncInterval) | ||
} | ||
|
||
func sync(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration) error { | ||
filePath := path.Dir(fullFilePath) | ||
fileName := path.Base(fullFilePath) | ||
log.L(ctx).Debugf("Starting file listener for '%s' in directory '%s'", fileName, filePath) | ||
log.L(ctx).Debugf("Starting file reconciler for '%s' in directory '%s'", fileName, filePath) | ||
|
||
watcher, err := fsnotify.NewWatcher() | ||
if err == nil { | ||
|
@@ -46,7 +57,7 @@ func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) e | |
if onClose != nil { | ||
onClose() | ||
} | ||
}, watcher.Events, watcher.Errors) | ||
}, onSync, resyncInterval, watcher.Events, watcher.Errors) | ||
err = watcher.Add(filePath) | ||
} | ||
if err != nil { | ||
|
@@ -56,9 +67,18 @@ func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) e | |
return nil | ||
} | ||
|
||
func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose func(), events chan fsnotify.Event, errors chan error) { | ||
func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration, events chan fsnotify.Event, errors chan error) { | ||
defer onClose() | ||
|
||
timeout := resyncInterval | ||
if timeout == nil { | ||
timeout = func() *time.Duration { | ||
defaultTimeout := time.Minute | ||
return &defaultTimeout | ||
}() | ||
} | ||
log.L(ctx).Debugf("re-sync interval set to '%s'", *timeout) | ||
|
||
var lastHash *fftypes.Bytes32 | ||
for { | ||
select { | ||
|
@@ -83,6 +103,15 @@ func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose | |
lastHash = dataHash | ||
} | ||
} | ||
case <-time.After(*timeout): | ||
if onSync != nil { | ||
data, err := os.ReadFile(fullFilePath) | ||
if err == nil { | ||
dataHash := fftypes.HashString(string(data)) | ||
log.L(ctx).Infof("Config file re-sync. Event=Resync Name=%s Size=%d Hash=%s", fullFilePath, len(data), dataHash) | ||
onSync() | ||
} | ||
Review thread: "I'm confused to see the …" "But that's the thing - …" "Yeah, we can have a follow-up issue to actually get the contents of the file back." |
||
} | ||
case err, ok := <-errors: | ||
if ok { | ||
log.L(ctx).Errorf("FSEvent error: %s", err) | ||
|
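
The nil check on `resyncInterval` in the diff above falls back to a one-minute default. As a minimal sketch of the same defaulting rule, a small helper could return a plain `time.Duration` instead of building a pointer to the default. The names `resolveInterval` and `defaultResyncInterval` are hypothetical and do not appear in the PR.

```go
package main

import (
	"fmt"
	"time"
)

// defaultResyncInterval mirrors the 1m default described in the Reconcile doc comment.
const defaultResyncInterval = time.Minute

// resolveInterval is a hypothetical helper (not part of the PR): it returns
// the caller's interval, or the default when no interval was supplied.
func resolveInterval(d *time.Duration) time.Duration {
	if d == nil {
		return defaultResyncInterval
	}
	return *d
}

func main() {
	fmt.Println(resolveInterval(nil)) // prints 1m0s

	custom := 5 * time.Second
	fmt.Println(resolveInterval(&custom)) // prints 5s
}
```

This sketch does not validate the value. A non-positive interval would make `time.After` fire immediately, so a caller-facing API might want to reject or clamp it.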
An alternative to this approach, which we use in a few places in the FireFly codebases, is to wrap the context.

Generally it's a good idea for all `for { select ... }` loops to include `<-ctx.Done()` so they will end if the context is terminated. This loop doesn't have that, which isn't caused by you, but does seem like an omission.
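
A minimal sketch of the wrapped-context idea, assuming a plain string channel in place of the fsnotify event channel. The names `listenLoop` and `demoWrappedContext` are hypothetical, and this is not the code used elsewhere in FireFly. Each iteration derives a short-lived context from the parent, so a single `Done()` case covers both the re-sync timeout and parent cancellation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// listenLoop is a hypothetical stand-in for the listener loop. It returns the
// number of re-syncs it performed before the parent context ended.
func listenLoop(ctx context.Context, interval time.Duration, events <-chan string) int {
	syncs := 0
	for {
		// Wrap the parent context with a timeout for this iteration only.
		loopCtx, cancel := context.WithTimeout(ctx, interval)
		select {
		case ev, ok := <-events:
			cancel()
			if !ok {
				return syncs
			}
			fmt.Println("event:", ev)
		case <-loopCtx.Done():
			cancel()
			if ctx.Err() != nil {
				// The parent context ended, so stop the loop.
				return syncs
			}
			// Only the per-iteration timeout expired: time to re-sync.
			syncs++
		}
	}
}

// demoWrappedContext runs the loop briefly with no events and reports how
// many re-syncs happened before the parent context timed out.
func demoWrappedContext() int {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	return listenLoop(ctx, 20*time.Millisecond, make(chan string))
}

func main() {
	fmt.Println("re-syncs before shutdown:", demoWrappedContext())
}
```

Because a new `loopCtx` is created on every pass, handling an event also restarts the countdown to the next re-sync.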

I'd like to suggest a different approach here: using `time.Ticker`. The implementation would look like this:

(You should call `ticker.Stop()` only if you want to reconcile the FS before exiting.)
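
The snippet that accompanied the `time.Ticker` suggestion is not reproduced in this thread. As a rough sketch of the idea (not the reviewer's actual code), the loop below uses a single ticker alongside `<-ctx.Done()`, with a plain string channel standing in for fsnotify events. The names `tickerLoop` and `demoTicker` are hypothetical, and the sketch simply defers `ticker.Stop()` to release the ticker on exit.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// tickerLoop is a hypothetical stand-in for the listener loop. It returns the
// number of re-syncs performed before the context ended.
// Note: time.NewTicker panics if interval is not positive.
func tickerLoop(ctx context.Context, interval time.Duration, events <-chan string) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker when the loop exits

	syncs := 0
	for {
		select {
		case <-ctx.Done():
			return syncs
		case ev, ok := <-events:
			if !ok {
				return syncs
			}
			fmt.Println("event:", ev)
			// An event already acts as a sync, so restart the countdown.
			ticker.Reset(interval)
		case <-ticker.C:
			syncs++
		}
	}
}

// demoTicker runs the loop briefly with no events and reports the re-sync count.
func demoTicker() int {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	return tickerLoop(ctx, 20*time.Millisecond, make(chan string))
}

func main() {
	fmt.Println("re-syncs before shutdown:", demoTicker())
}
```

Dropping the `ticker.Reset(interval)` call would keep re-syncs on a fixed schedule regardless of file events. Which behavior is wanted is a design choice the thread leaves open.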

How does either of these solutions handle the situation where another `case` statement is waiting for events indicating that the file has been renamed, written to, etc.? This also acts as a "sync". I think the currently proposed code has an issue in that the timer is not actually reset in this case, if we want it to be.

In the first approach, you would just cancel the context when handling this case and go around the loop again with the timer reset. In the second approach, there is a `ticker.Reset(d duration)`, so both approaches work.
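
For reference on the reset question: in Go, the channel expression in a `case` is evaluated each time the `select` statement is entered, so `case <-time.After(d)` inside a `for { select }` loop creates a new timer on every iteration. The small experiment below, independent of the PR code, counts how often the timeout case fires while events keep arriving. The names `timeoutsWhileBusy`, `demoBusy`, and `demoQuiet` are hypothetical, and an integer channel stands in for fsnotify events.

```go
package main

import (
	"fmt"
	"time"
)

// timeoutsWhileBusy feeds n events, spaced gap apart, into a loop that uses
// `case <-time.After(interval)`. It returns how many times the timeout case
// fired before the event channel was closed.
func timeoutsWhileBusy(n int, gap, interval time.Duration) int {
	events := make(chan int)
	go func() {
		defer close(events)
		for i := 0; i < n; i++ {
			time.Sleep(gap)
			events <- i
		}
	}()

	timeouts := 0
	for {
		select {
		case _, ok := <-events:
			if !ok {
				return timeouts
			}
		// time.After is called again each time the select is entered,
		// so every iteration starts a fresh countdown.
		case <-time.After(interval):
			timeouts++
		}
	}
}

// demoBusy: events arrive far more often than the interval.
func demoBusy() int {
	return timeoutsWhileBusy(10, 10*time.Millisecond, 500*time.Millisecond)
}

// demoQuiet: events are sparse compared with the interval.
func demoQuiet() int {
	return timeoutsWhileBusy(2, 150*time.Millisecond, 20*time.Millisecond)
}

func main() {
	fmt.Println("timeouts with frequent events:", demoBusy())
	fmt.Println("timeouts with sparse events:", demoQuiet())
}
```

On a machine that is not heavily loaded, the frequent-event run should report no timeouts, because each event restarts the countdown. One consequence is that a steady stream of events arriving faster than the interval can postpone the re-sync for as long as the stream continues.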

Thanks Enrique - yes, it's really important we continue to sync the file over time in the case of `Reconcile`. The reason for these proposed alternatives is that the `case <-time.After` I've proposed is essentially optional in the `Watch` use case. And so we'd occasionally do unnecessary CPU work when the timer pops, and go back around the loop to wait for the FS events, errors, or ctx to finish.

In a frequently changing FS, that could incur a performance hit. However, I don't believe this `Watch` is optimized for such rapid changes in the first place. A user would likely prefer to implement their own watcher in that case.

For the main case of a file which may change on the scale of seconds or more, I don't think an extra `case` for this timeout would be detrimental, and I think the simplicity/readability of the code is preferable to a wrapper context/nested select statement, which could have new bugs in it; as Enrique pointed out, we have to reset the `loopCtx`, etc.

So if it's alright with the maintainers, I'd prefer to keep it as is.

OK - so just marking where we're landing with this PR:

… `time.Ticker()`, but we're using a `time.After()`.

All the comments are around code style opinions, and (as per my original comment) I'm not laying down on the tracks for one particular style. So happy to merge.