Skip to content

Commit

Permalink
Add support for syncing more complex paths
Browse files Browse the repository at this point in the history
It is currently difficult to sync on a small portion of a model if that
model includes a list element. For example /node1/node2/*/node3/node4.
The current method would be to select node1/node2/* then exclude the
unwanted fields. This however can cause substantial processing.

This change allows paths of type /node1/node2/*/node3/node4 to be
entered. The new code uses apteryx_query and apteryx_watch_tree
to retrieve the database information to synchronize.

Additionally exclude paths are now only applied to the model they were
added with.
  • Loading branch information
gcampbell512 authored and sparlane committed Oct 14, 2024
1 parent 2d07ec7 commit d630574
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 78 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ All files in this directory are read in.
```
/path/to/sync/*
```
* Any path that will work with the CLI "apterxy -q path" can be used. e.g.
```
/path/to/sync/*/specific/information
```
* To exclude a particular node in a synced path, add it as a new line preceded by an '!'. e.g.
```
!/path/to/sync/excluded_node
Expand Down
250 changes: 172 additions & 78 deletions syncer.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ typedef struct sync_partner_s
bool new_joiner;
} sync_partner;

typedef struct _sync_entry
{
GList *paths;
GList *excluded_paths;
} sync_entry;

typedef struct _sync_params
{
uint64_t ts;
GNode *root;
sync_entry *entry;
} sync_params;

/* keep a list of the partners we are syncing paths to */
GList *partners = NULL;
pthread_rwlock_t partners_lock = PTHREAD_RWLOCK_INITIALIZER;
Expand All @@ -33,7 +46,8 @@ pthread_rwlock_t partners_lock = PTHREAD_RWLOCK_INITIALIZER;
static GNode *pending = NULL;
static guint pending_timer = 0;
static uint64_t oldest_pending = 0;
#define PENDING_HOLD_OFF 1000 /* 1 second */
#define PENDING_HOLD_OFF 800 /* 800 ms */
#define WATCH_TREE_HOLD_OFF 200 /* 200 ms */

/* periodic sync timer */
static guint sync_timer = 0;
Expand Down Expand Up @@ -222,13 +236,14 @@ sync_path_check (const char *path)
}

bool
sync_path_excluded (const char *path)
sync_path_excluded (sync_entry *entry, const char *path)
{
pthread_rwlock_rdlock (&paths_lock);

for (GList *iter = excluded_paths; iter; iter = iter->next)
for (GList *iter = entry->excluded_paths; iter; iter = iter->next)
{
char *exclude_path = (char *) iter->data;
char *star;
size_t len = strlen (exclude_path);
int ret;

Expand All @@ -239,8 +254,25 @@ sync_path_excluded (const char *path)
}
else
{
/* Match exact path */
ret = strcmp (path, exclude_path);
/* Match an exclusion path with a starred list field */
if ((star = strchr (exclude_path, '*')))
{
int star_offset = star - exclude_path;
ret = strncmp (path, exclude_path, star_offset);
if (ret == 0)
{
char *ptr = strchr (path + star_offset, '/');
if (!ptr || strstr (ptr, star + 1) != ptr)
{
ret = -1;
}
}
}
else
{
/* Match exact path */
ret = g_strcmp0 (path, exclude_path);
}
}

if (ret == 0)
Expand Down Expand Up @@ -292,77 +324,76 @@ apteryx_set_sp (sync_partner *sp, const char *path, const char *value)
return res;
}

bool
sync_recursive (GNode *root, uint64_t timestamp, const char *path)
static gboolean
sync_tree_process (GNode *node, gpointer arg)
{
if (sync_path_excluded (path))
{
/* Skip excluded paths */
return true;
}
uint64_t ts;
sync_params *params = (sync_params *) arg;
char *path = apteryx_node_path (node);

uint64_t ts = apteryx_timestamp (path);
if (ts <= timestamp || ts == 0)
if (path)
{
/* Skip anything that hasn't changed since last sync */
return true;
if (!sync_path_excluded (params->entry, path))
{
/* The timestamp is stored on the parent of the value. */
char *ptr = strrchr (path, '/');
if (ptr)
{
*ptr = '\0';
}
ts = apteryx_timestamp (path);
if (ts && ts > params->ts)
{
APTERYX_LEAF (params->root, strdup (path), (char *) node->data);
node->data = NULL;
}
}
g_free (path);
}

/* make sure the path doesn't end in '/' for the get */
char *get_path = strdup (path);
if (get_path[strlen (get_path) - 1] == '/')
{
get_path[strlen (get_path) - 1] = '\0';
}
return FALSE;
}

bool
sync_recursive (GNode *root, uint64_t timestamp, sync_entry *entry, const char *path)
{
GNode *tree;
GNode *query = g_node_new (g_strdup (path));

/* now make sure the path ends in '/' for the search */
char *search_path = NULL;
if (path[strlen (path) - 1] != '/')
if (query)
{
if (asprintf (&search_path, "%s/", path) == -1)
tree = apteryx_query (query);
if (tree)
{
ERROR ("SYNC couldn't allocate search path!\n");
search_path = NULL;
free (get_path);
return false;
sync_params params;
params.ts = timestamp;
params.root = root;
params.entry = entry;
g_node_traverse (tree, G_IN_ORDER, G_TRAVERSE_LEAVES, -1, sync_tree_process,
(gpointer) &params);
apteryx_free_tree (tree);
}
}
else
{
search_path = strdup (path);
apteryx_free_tree (query);
}

/* Update this node */
char *value = apteryx_get (get_path);
if (value)
{
/* only sync non-null values or you'll inadvertently prune */
APTERYX_LEAF (root, strdup (get_path + 1), value);
}
free (get_path);

/* Update all children */
GList *sync_paths = apteryx_search (search_path);
free (search_path);
for (GList * iter = sync_paths; iter; iter = iter->next)
{
sync_recursive (root, timestamp, iter->data);
}
g_list_free_full (sync_paths, free);
// TODO: Update remote children that weren't already covered above
// Specifically, look for local deletes that haven't propagated
return true;
}

void
sync_gather (GNode *root, uint64_t timestamp)
{
GList *iter = NULL;
GList *list = NULL;
sync_entry *entry;

pthread_rwlock_rdlock (&paths_lock);
for (iter = paths; iter; iter = iter->next)
{
sync_recursive (root, timestamp, (char *) iter->data);
entry = iter->data;
for (list = entry->paths; list; list = list->next)
{
sync_recursive (root, timestamp, entry, (char *) list->data);
}
}
pthread_rwlock_unlock (&paths_lock);
}
Expand Down Expand Up @@ -443,7 +474,7 @@ periodic_syncer_thread (void *ign)
{
g_source_remove (sync_timer);
sync_timer =
g_timeout_add (PENDING_HOLD_OFF * 1.1, (GSourceFunc) periodic_syncer_thread,
g_timeout_add (PENDING_HOLD_OFF * 1.4, (GSourceFunc) periodic_syncer_thread,
NULL);
}
else
Expand All @@ -457,16 +488,77 @@ periodic_syncer_thread (void *ign)
return false;
}

sync_entry *
sync_find_sync_entry (GNode *node)
{
sync_entry *entry;
char *path;
GList *list;

if (node->children && g_strcmp0 ((char *) node->data , "/") == 0)
{
node = node->children;
}

for (GList *iter = paths; iter; iter = iter->next)
{
entry = iter->data;
list = entry->paths;
if (list)
{
path = list->data;
if (strncmp (path + 1, (char *) node->data, strlen ((char *) node->data)) == 0)
{
return entry;
}
}
}

return NULL;
}

static gboolean
new_change_process (GNode *node, gpointer arg)
{
char *path = apteryx_node_path (node);
char *value = NULL;
sync_entry *entry = (sync_entry *) arg;
if (path)
{
char *ptr = strrchr (path, '/');
if (ptr)
{
*ptr = '\0';
}

if (!sync_path_excluded (entry, path))
{
if (node->data && ((char *) node->data)[0] != '\0')
{
value = node->data;
}

DEBUG ("Pushing NEW_CHANGE on path %s, value %s to cache\n", path, value);
add_data_point (path, value);
}
g_free (path);
}

return FALSE;
}

bool
new_change (const char *path, const char *value)
new_change (GNode *tree)
{
if (sync_path_excluded (path))
if (partners)
{
DEBUG ("Ignoring EXCLUDED_CHANGE on path %s, value %s\n", path, value);
return false;
sync_entry *entry = sync_find_sync_entry (tree);
if (entry)
{
g_node_traverse (tree, G_IN_ORDER, G_TRAVERSE_LEAVES, -1, new_change_process, entry);
}
}
DEBUG ("Pushing NEW_CHANGE on path %s, value %s to cache\n", path, value);
add_data_point (path, value);
apteryx_free_tree (tree);
return true;
}

Expand Down Expand Up @@ -498,25 +590,16 @@ register_existing_partners (void)
}

bool
add_path_to_sync (const char *path)
add_path_to_sync (sync_entry *entry, const char *path)
{
/* Note: it is required that the path in the file ends with "/*" */
/* Note: Any path the works with apteryx -q 'path' is acceptable */
if (sync_path_check (path))
{
DEBUG ("SYNC INIT: about to watch path: %s\n", path);
apteryx_watch (path, new_change);
/* Lastly, add the path to our list for the resyncer thread.
/* note: because we need to do a few things to this later,
* remove the trailing '/*'
*/
apteryx_watch_tree_full (path, new_change, WATCH_F_MASK_MYSELF, WATCH_TREE_HOLD_OFF);
char *new_path = strdup (path);
char *end_ptr = NULL;
if ((end_ptr = strstr (new_path, "/*")) != NULL)
{
end_ptr[0] = '\0';
}
pthread_rwlock_wrlock (&paths_lock);
paths = g_list_append (paths, new_path);
entry->paths = g_list_append (entry->paths, new_path);
pthread_rwlock_unlock (&paths_lock);
}
else
Expand All @@ -527,14 +610,14 @@ add_path_to_sync (const char *path)
}

bool
add_excluded_path (const char *path)
add_excluded_path (sync_entry *entry, const char *path)
{
if (sync_path_check (path))
{
DEBUG ("SYNC INIT: Adding exclusion for: %s\n", path);
char *new_path = strdup (path);
pthread_rwlock_wrlock (&paths_lock);
excluded_paths = g_list_append (excluded_paths, new_path);
entry->excluded_paths = g_list_append (entry->excluded_paths, new_path);
pthread_rwlock_unlock (&paths_lock);
}
return true;
Expand All @@ -547,6 +630,7 @@ parse_config_files (const char* config_dir)
struct dirent *config_file;
DIR *dp = NULL;
char *config_file_name = NULL;
sync_entry *entry;

/* open the sync config dir and read all the files in it to get sync paths */
dp = opendir (config_dir);
Expand Down Expand Up @@ -579,6 +663,8 @@ parse_config_files (const char* config_dir)
char *sync_path = NULL;
char *newline = NULL;
size_t n = 0;
entry = g_malloc0 (sizeof (sync_entry));

while (getline (&sync_path, &n, fp) != -1)
{
/* ignore empty lines or lines starting with '#' */
Expand All @@ -596,17 +682,25 @@ parse_config_files (const char* config_dir)
if (sync_path[0] == '!')
{
// Add an exclusion to syncing
add_excluded_path (sync_path + 1);
add_excluded_path (entry, sync_path + 1);
}
else
{
add_path_to_sync (sync_path);
add_path_to_sync (entry, sync_path);
}

free (sync_path);
sync_path = NULL;
}
fclose (fp);
if (!entry->paths && !entry->excluded_paths)
{
g_free (entry);
}
else
{
paths = g_list_append (paths, entry);
}
}
free (config_file_name);
}
Expand Down

0 comments on commit d630574

Please sign in to comment.