Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix the resuming of MongoDB change streams in log-based replication mode #31

Draft
wants to merge 43 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a0280ce
Start scaffolding things to test change stream handling more specific…
menzenski Nov 17, 2023
6ea1f0c
some more refactoring, start adding resume strategy setting and tests
menzenski Nov 24, 2023
8d61b87
more tests, start splitting up the giant method
menzenski Nov 24, 2023
4f0b291
add mypy to precommit checks
menzenski Feb 15, 2024
48e9610
add debugging logging config file
menzenski Feb 15, 2024
d3aacb9
update sdk version
menzenski Feb 15, 2024
154b62e
did this fix it?
menzenski Feb 15, 2024
fa6ad4c
delete logging config
menzenski Feb 15, 2024
32f6a2a
delete mongodb testing things, they didn't work with change streams
menzenski Feb 15, 2024
2f9c015
remove start at operation time
menzenski Feb 16, 2024
650ec30
rm seed utils
menzenski Feb 16, 2024
c4335e5
bump version
menzenski Feb 16, 2024
b0e253a
remove backports cached property if we're on 3.8+ now
menzenski Feb 16, 2024
cf875b8
Fix change stream handling in MongoDB log-based replication mode
menzenski Nov 17, 2023
aaf7dd7
Merge branch 'change-stream-improvements' of github.com:menzenski/tap…
menzenski Feb 16, 2024
2a3c032
remove loguru, not used
menzenski Feb 16, 2024
8fac23a
rm seed command
menzenski Feb 16, 2024
96d6811
test with python 3.12
menzenski Feb 16, 2024
83fa85e
more python version updates
menzenski Feb 16, 2024
f2ac545
rm generic reference to re.match
menzenski Feb 16, 2024
b1f293b
rm conftest
menzenski Feb 16, 2024
0648fff
more strategy cleanup
menzenski Feb 16, 2024
2451443
add remaining change event fields
menzenski Feb 19, 2024
316daa4
Update streams.py
menzenski Feb 20, 2024
794c2da
add a whole bunch of mongo-replica-set-in-docker experiments
menzenski Feb 20, 2024
d7a6250
let's see if that gets a mongodb replica set stood up in github actions
menzenski Feb 20, 2024
a030376
Merge branch 'change-stream-improvements' of github.com:menzenski/tap…
menzenski Feb 20, 2024
9ec4ba4
quote version numbers
menzenski Feb 20, 2024
9f3d774
support 3.12 patch versions
menzenski Feb 21, 2024
a1b8257
don't always throw an exception
menzenski Feb 23, 2024
43272cd
re-lock plugins
menzenski Apr 23, 2024
4ff5b89
re-lock poetry dependencies, update local python version
menzenski Apr 23, 2024
4310b48
first pass at tests that use testcontainers-mongodb
menzenski Apr 23, 2024
a85f1dd
Lint and docstrings
menzenski Apr 23, 2024
981523d
add a real test case of the tap in incremental mode
menzenski Apr 23, 2024
1952c29
rm that block
menzenski Apr 23, 2024
b161e20
make integration_tests a package so we can import from it
menzenski Apr 23, 2024
f481319
try getting replica set to work
menzenski Apr 23, 2024
25f69ba
rm replica-set attempts, didn't work
menzenski Apr 23, 2024
d42a173
use loguru for all logging because I understand it
menzenski Apr 23, 2024
0207c76
various lint and typing fixes
menzenski Apr 23, 2024
95a88b3
add collection name to log messages and more explicitly handle the du…
menzenski Apr 23, 2024
909078d
set metadata on a dummy record
menzenski Apr 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ jobs:
- 3.9
- '3.10'
- 3.11
- 3.12
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
steps:
- uses: actions/checkout@v3
- name: Install poetry
Expand Down
20 changes: 15 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,54 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
rev: v4.5.0
hooks:
- id: check-added-large-files
- id: check-toml
- id: check-vcs-permalinks
- id: detect-private-key
- id: end-of-file-fixer
exclude: mongodb-replicaset.key
- id: name-tests-test
exclude: tests/utilities/
- id: no-commit-to-branch
args:
- --branch
- main
- id: pretty-format-json
args:
- --autofix
- --indent
- '2'
- id: trailing-whitespace
- repo: https://github.com/lyz-code/yamlfix.git
rev: 1.9.0
rev: 1.16.0
hooks:
- id: yamlfix
- repo: https://github.com/adrienverge/yamllint.git
rev: v1.28.0
rev: v1.35.0
hooks:
- id: yamllint
args:
- --format
- parsable
- --strict
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.252
rev: v0.2.1
hooks:
- id: ruff
args:
- --fix
- --exit-non-zero-on-fix
- repo: https://github.com/psf/black
rev: 23.1.0
rev: 24.2.0
hooks:
- id: black
language_version: python3
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.8.0
hooks:
- id: mypy
- repo: local
hooks:
- id: pylint
Expand Down
12 changes: 12 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ plugins:
- insert
- replace
- update
- name: change_stream_resume_strategy
kind: string
description: |
Only used when tap is run in log-based replication mode. This setting specifies how the tap creates a
new change stream on runs after the first. The default is `resume_after` (see
https://www.mongodb.com/docs/manual/changeStreams/#resumeafter-for-change-streams), which was added
in MongoDB 3.6 when the ChangeStream API was introduced. The `start_after` setting (see
https://www.mongodb.com/docs/manual/changeStreams/#startafter-for-change-streams) requires MongoDB
version 4.2 or greater. You may switch back and forth between `resume_after` and `start_after`
settings (provided the MongoDB version is at least 4.2) freely. If the value provided to this setting
is not compatible with the MongoDB version in use, this setting defaults to `resume_after`.
value: resume_after
- name: stream_maps
kind: object
description: |
Expand Down
2,058 changes: 1,060 additions & 998 deletions poetry.lock

Large diffs are not rendered by default.

19 changes: 11 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tap-mongodb"
version = "2.5.0"
version = "2.6.0"
description = "`tap-mongodb` is a Singer tap for MongoDB and AWS DocumentDB, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Matt Menzenski"]
Expand All @@ -14,11 +14,12 @@ packages = [
]

[tool.poetry.dependencies]
python = "<3.12,>=3.7.2"
singer-sdk = { version = "^0.31.1" }
python = "<3.12,>=3.8"
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
singer-sdk = { version = "^0.35.0" }
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
fs-s3fs = { version = "^1.1.1", optional = true }
pymongo = "^4.4.1"
"backports.cached-property" = { version = "^1.0.2" }
strenum = "^0.4.15"
typing-extensions = {version = "^4.9.0", python = "<3.10"}

[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
Expand All @@ -28,7 +29,7 @@ black = "^23.1.0"
pyupgrade = "^3.3.1"
mypy = "^1.0.0"
isort = "^5.11.5"
singer-sdk = { version = "^0.31.1", extras = ["testing"] }
singer-sdk = { version = "^0.35.0", extras = ["testing"] }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes #27

pylint = "^3.0.0a6"

[tool.poetry.extras]
Expand Down Expand Up @@ -63,6 +64,10 @@ quote_basic_values = false
quote_keys_and_basic_values = false

[tool.ruff]
line-length = 120
target-version = "py311"

[tool.ruff.lint]
select = [
"E",
"F",
Expand All @@ -74,10 +79,8 @@ exclude = [
".secrets",
"output",
]
line-length = 120
target-version = "py311"

[tool.ruff.isort]
[tool.ruff.lint.isort]
known-third-party = [
"pymongo",
"singer_sdk",
Expand Down
16 changes: 4 additions & 12 deletions tap_mongodb/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,22 @@

import sys
from logging import Logger, getLogger
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional

from pymongo import MongoClient
from pymongo.database import Database
from pymongo.errors import PyMongoError
from singer_sdk._singerlib.catalog import CatalogEntry, MetadataMapping, Schema

from tap_mongodb.schema import SCHEMA
from tap_mongodb.types import MongoVersion

if sys.version_info[:2] < (3, 8):
from backports.cached_property import cached_property
else:
from functools import cached_property
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're dropping support for Python 3.7 I think this can be simplified



try:
from typing import TypeAlias # pylint: disable=ungrouped-imports

MongoVersion: TypeAlias = Tuple[int, int]
except ImportError:
TypeAlias = None
MongoVersion = Tuple[int, int]


class MongoDBConnector:
"""MongoDB/DocumentDB connector class"""

Expand All @@ -43,7 +35,7 @@ def __init__( # pylint: disable=too-many-arguments
self._datetime_conversion: str = datetime_conversion.upper()
self._prefix: Optional[str] = prefix
self._logger: Logger = getLogger(__name__)
self._version: Optional[MongoVersion] = None
self._version: MongoVersion
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might make sense to move this annotation to the class level, ie

class MongoDBConnector:

    _version: MongoVersion


@cached_property
def mongo_client(self) -> MongoClient:
Expand All @@ -66,7 +58,7 @@ def database(self) -> Database:
return self.mongo_client[self._db_name]

@property
def version(self) -> Optional[MongoVersion]:
def version(self) -> MongoVersion:
"""Returns the MongoVersion that is being used."""
return self._version

Expand Down
Loading
Loading