Make sure catalogs get updated periodically on the executor processes #6272
When a new catalog file is fetched, the catalogs can end up differing across executor processes, despite there being a single source of truth on disk at `~/.sky/catalogs/*`.

TL;DR: this is because `read_catalog` returns a `LazyDataFrame`, which gets assigned to a global variable for each unique (cloud, catalog) pair, and `LazyDataFrame` caches the DataFrame in memory for the duration of the process's lifetime: it only calls `update_func` if `self._df` is `None`, which is only the case on the first call.

This PR changes the behaviour of
`LazyDataFrame`: `update_if_stale_func` now returns a bool indicating whether an update was done, and it is called on every read (`_load_df`), so that a long-running executor process notices when it is time to refresh the catalog and saves the new one to disk.

To account for the performance penalty of calling the update-if-stale function on every read, we add two optimizations:

- In `_update_catalog` of `read_catalog`, add a fast path by calling `_need_update` before trying to acquire the file lock, to prevent lock contention. We still need the check after acquiring the lock, to ensure only one process gets to write to disk, so that part is left as is.
- Add `@annotations.lru_cache(scope='request')` to `_load_df` in `LazyDataFrame`. A single request could read a catalog multiple times, so this helps avoid unnecessary calls to `update_if_stale_func`.

I added a unit test in
`test_catalog.py`.

Tested (run the relevant ones):

- [ ] `bash format.sh`
- [ ] `/smoke-test` (CI) or `pytest tests/test_smoke.py` (local)
- [ ] `/smoke-test -k test_name` (CI) or `pytest tests/test_smoke.py::test_name` (local)
- [ ] `/quicktest-core` (CI) or `pytest tests/smoke_tests/test_backward_compat.py` (local)
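For reviewers, here is a minimal sketch of the new read path. The class and method names mirror the PR description, but the bodies are illustrative: the DataFrame is replaced by an opaque value, and `load_func`/`update_if_stale_func` are hypothetical stand-ins for the real loader and staleness check.

```python
from typing import Any, Callable, Optional


class LazyDataFrame:
    """Sketch: lazily loads a catalog and re-checks staleness on every read."""

    def __init__(self, load_func: Callable[[], Any],
                 update_if_stale_func: Callable[[], bool]):
        self._df: Optional[Any] = None
        self._load_func = load_func
        self._update_if_stale_func = update_if_stale_func

    def _load_df(self) -> Any:
        # Old behavior: load only when self._df is None, so a long-running
        # process never noticed a refreshed catalog on disk.
        # New behavior: call update_if_stale_func on every read; if it
        # reports that it rewrote the catalog file, drop the cached copy
        # and reload from disk.
        if self._update_if_stale_func() or self._df is None:
            self._df = self._load_func()
        return self._df
```

In the real code, `_load_df` is additionally wrapped with `@annotations.lru_cache(scope='request')`, so repeated reads within a single request skip the staleness check entirely.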
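The fast-path optimization in `_update_catalog` is a double-checked pattern. A self-contained sketch, with a `threading.Lock` standing in for the real per-catalog file lock and hypothetical `need_update`/`do_update` callables:

```python
import threading

# Stand-in for the per-catalog file lock used by the real code.
_catalog_lock = threading.Lock()


def update_catalog_if_stale(need_update, do_update) -> bool:
    """Return True iff this caller refreshed the catalog.

    Fast path: check need_update() before acquiring the lock, so readers
    whose catalog is already fresh never contend for it. Slow path:
    re-check after acquiring the lock, so that when many processes race,
    only the first one actually rewrites the file on disk.
    """
    if not need_update():  # fast path, lock-free
        return False
    with _catalog_lock:  # slow path
        if not need_update():  # another process updated it meanwhile
            return False
        do_update()
        return True
```

The second check under the lock is what makes the fast path safe to add: skipping it would let two processes both decide to update before either acquired the lock, and then write the file twice.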