Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce number of concurrent DNS lookups in Machine Tracker #2917

Merged
merged 5 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/+2669.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix Machine Tracker DNS search crashing from exhausting all available file descriptors
58 changes: 39 additions & 19 deletions python/nav/asyncdns.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from IPy import IP
from twisted.names import dns
from twisted.names import client
from twisted.internet import defer
from twisted.internet import defer, task

# pylint: disable=E1101
from twisted.internet import reactor
Expand All @@ -46,6 +46,9 @@
from twisted.names.error import DNSNotImplementedError, DNSQueryRefusedError


BATCH_SIZE = 100


def reverse_lookup(addresses):
"""Runs parallel reverse DNS lookups for addresses.

Expand Down Expand Up @@ -75,21 +78,31 @@
)
self.results = defaultdict(list)
self._finished = False
self._errors = []

def resolve(self, names):
"""Resolves DNS names in parallel"""
self._finished = False
self.results = defaultdict(list)

deferred_list = []
for name in names:
for deferred in self.lookup(name):
deferred.addCallback(self._extract_records, name)
deferred.addErrback(self._errback, name)
deferred_list.append(deferred)

deferred_list = defer.DeferredList(deferred_list)
deferred_list.addCallback(self._parse_result)
self._finished = False
self._errors = []

def lookup_names():
for name in names:
for deferred in self.lookup(name):
deferred.addCallback(self._extract_records, name)
deferred.addErrback(self._errback, name)
deferred.addCallback(self._save_result)
yield deferred

# Limits the number of parallel requests to BATCH_SIZE
coop = task.Cooperator()
work = lookup_names()
deferred_list = defer.DeferredList(
[
coop.coiterate(work).addErrback(self._save_error)
for _ in range(BATCH_SIZE)
]
)
deferred_list.addCallback(self._finish)

while not self._finished:
Expand All @@ -98,6 +111,10 @@
# iteration to ensure the resolver library closes its UDP sockets
reactor.iterate()

# raise first error if any occurred
for error in self._errors:
raise error

return dict(self.results)

def lookup(self, name):
Expand All @@ -108,19 +125,22 @@
def _extract_records(result, name):
    """Extract the record values for *name* from a raw lookup *result*.

    Abstract hook: subclasses must override this to turn the response of
    their particular lookup type into the values that are then passed on
    (as a ``(name, records)`` pair) down the callback chain.

    :raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

def _parse_result(self, result):
"""Parses the result to the correct format"""
for _success, (name, response) in result:
if isinstance(response, Exception):
self.results[name] = response
else:
self.results[name].extend(response)
def _save_result(self, result):
name, response = result
if isinstance(response, Exception):
self.results[name] = response
else:
self.results[name].extend(response)

Check warning on line 133 in python/nav/asyncdns.py

View check run for this annotation

Codecov / codecov/patch

python/nav/asyncdns.py#L133

Added line #L133 was not covered by tests

@staticmethod
def _errback(failure, host):
"""Errback"""
return host, failure.value

def _save_error(self, failure):
"""Errback for coiterator. Saves error so it can be raised later"""
self._errors.append(failure.value)

def _finish(self, _):
    """Callback fired when the whole DeferredList has completed.

    Sets the ``_finished`` flag so the ``reactor.iterate()`` wait loop in
    ``resolve()`` can terminate.  The DeferredList result argument is
    ignored.
    """
    self._finished = True

Expand Down
Loading