Skip to content

Commit

Permalink
Add Python stubs for rosbag2_py
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Sokolkov <[email protected]>
  • Loading branch information
r7vme committed Feb 26, 2024
1 parent 1564687 commit 3b0a5f4
Show file tree
Hide file tree
Showing 11 changed files with 416 additions and 0 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ jobs:
source /opt/ros/rolling/setup.sh && colcon test --mixin linters-skip --packages-select ${rosbag2_packages} --packages-skip rosbag2_performance_benchmarking --event-handlers console_cohesion+ --return-code-on-test-failure --ctest-args "-L xfail" --pytest-args "-m xfail"
working-directory: ${{ steps.action-ros-ci.outputs.ros-workspace-directory-name }}
shell: bash
- name: Is regeneration of Python stubs required?
run: |
rosbag2_path=$(colcon list -p --packages-select rosbag2)/..
sudo pip uninstall -y mypy
sudo apt update && sudo apt -y install mypy
source install/setup.sh
stubgen -p rosbag2_py -o ${rosbag2_path}/rosbag2_py
cd ${rosbag2_path}
git diff --exit-code
working-directory: ${{ steps.action-ros-ci.outputs.ros-workspace-directory-name }}
shell: bash
- uses: actions/upload-artifact@v1
with:
name: colcon-logs
Expand Down
20 changes: 20 additions & 0 deletions rosbag2_py/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# rosbag2_py

## Regenerating Python stub files (.pyi)

Python stub files allow to supply type-hinting information for binary Python modules (e.g. pybind-based).

In rosbag2_py stub files are generated with utility called `stubgen`.

To regenerate stub files
```
cd <workspace>
colcon build --packages-up-to rosbag2_py
source install/setup.sh
# Make sure rosbag2_py can be imported
python3 -c 'import rosbag2_py'
sudo apt update && sudo apt install mypy
cd <rosbag2 git repo>
stubgen -p rosbag2_py -o rosbag2_py
```
7 changes: 7 additions & 0 deletions rosbag2_py/rosbag2_py/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from rosbag2_py._compression_options import CompressionMode as CompressionMode, CompressionOptions as CompressionOptions, compression_mode_from_string as compression_mode_from_string, compression_mode_to_string as compression_mode_to_string
from rosbag2_py._info import Info as Info
from rosbag2_py._reader import SequentialCompressionReader as SequentialCompressionReader, SequentialReader as SequentialReader, get_registered_readers as get_registered_readers
from rosbag2_py._reindexer import Reindexer as Reindexer
from rosbag2_py._storage import BagMetadata as BagMetadata, ConverterOptions as ConverterOptions, FileInformation as FileInformation, MessageDefinition as MessageDefinition, MetadataIo as MetadataIo, ReadOrder as ReadOrder, ReadOrderSortBy as ReadOrderSortBy, StorageFilter as StorageFilter, StorageOptions as StorageOptions, TopicInformation as TopicInformation, TopicMetadata as TopicMetadata, get_default_storage_id as get_default_storage_id
from rosbag2_py._transport import PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, bag_rewrite as bag_rewrite
from rosbag2_py._writer import SequentialCompressionWriter as SequentialCompressionWriter, SequentialWriter as SequentialWriter, get_registered_compressors as get_registered_compressors, get_registered_serializers as get_registered_serializers, get_registered_writers as get_registered_writers
35 changes: 35 additions & 0 deletions rosbag2_py/rosbag2_py/_compression_options.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import ClassVar

FILE: CompressionMode
MESSAGE: CompressionMode
NONE: CompressionMode

class CompressionMode:
__doc__: ClassVar[str] = ... # read-only
__members__: ClassVar[dict] = ... # read-only
FILE: ClassVar[CompressionMode] = ...
MESSAGE: ClassVar[CompressionMode] = ...
NONE: ClassVar[CompressionMode] = ...
__entries: ClassVar[dict] = ...
def __init__(self, value: int) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __getstate__(self) -> int: ...
def __hash__(self) -> int: ...
def __index__(self) -> int: ...
def __int__(self) -> int: ...
def __ne__(self, other: object) -> bool: ...
def __setstate__(self, state: int) -> None: ...
@property
def name(self) -> str: ...
@property
def value(self) -> int: ...

class CompressionOptions:
compression_format: str
compression_mode: CompressionMode
compression_queue_size: int
compression_threads: int
def __init__(self, compression_format: str = ..., compression_mode: CompressionMode = ..., compression_queue_size: int = ..., compression_threads: int = ...) -> None: ...

def compression_mode_from_string(arg0: str) -> CompressionMode: ...
def compression_mode_to_string(arg0: CompressionMode) -> str: ...
6 changes: 6 additions & 0 deletions rosbag2_py/rosbag2_py/_info.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import rosbag2_py._storage

class Info:
def __init__(self) -> None: ...
def read_metadata(self, arg0: str, arg1: str) -> rosbag2_py._storage.BagMetadata: ...
def read_metadata_and_output_service_verbose(self, arg0: str, arg1: str) -> None: ...
31 changes: 31 additions & 0 deletions rosbag2_py/rosbag2_py/_reader.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Any

class SequentialCompressionReader:
def __init__(self) -> None: ...
def get_all_message_definitions(self, *args, **kwargs) -> Any: ...
def get_all_topics_and_types(self, *args, **kwargs) -> Any: ...
def get_metadata(self, *args, **kwargs) -> Any: ...
def has_next(self) -> bool: ...
def open(self, arg0, arg1) -> None: ...
def open_uri(self, arg0: str) -> None: ...
def read_next(self) -> tuple: ...
def reset_filter(self) -> None: ...
def seek(self, arg0: int) -> None: ...
def set_filter(self, arg0) -> None: ...
def set_read_order(self, arg0) -> bool: ...

class SequentialReader:
def __init__(self) -> None: ...
def get_all_message_definitions(self, *args, **kwargs) -> Any: ...
def get_all_topics_and_types(self, *args, **kwargs) -> Any: ...
def get_metadata(self, *args, **kwargs) -> Any: ...
def has_next(self) -> bool: ...
def open(self, arg0, arg1) -> None: ...
def open_uri(self, arg0: str) -> None: ...
def read_next(self) -> tuple: ...
def reset_filter(self) -> None: ...
def seek(self, arg0: int) -> None: ...
def set_filter(self, arg0) -> None: ...
def set_read_order(self, arg0) -> bool: ...

def get_registered_readers() -> Set[str]: ...
5 changes: 5 additions & 0 deletions rosbag2_py/rosbag2_py/_reindexer.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import rosbag2_py._storage

class Reindexer:
def __init__(self) -> None: ...
def reindex(self, arg0: rosbag2_py._storage.StorageOptions) -> None: ...
212 changes: 212 additions & 0 deletions rosbag2_py/rosbag2_py/_storage.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
from typing import ClassVar, Dict, List

import datetime

class BagMetadata:
bag_size: int
compression_format: str
compression_mode: str
custom_data: Dict[str,str]
duration: object
files: List[FileInformation]
message_count: int
relative_file_paths: List[str]
ros_distro: str
starting_time: object
storage_identifier: str
topics_with_message_count: List[TopicInformation]
version: int
def __init__(self, version: int = ..., bag_size: int = ..., storage_identifier: str = ..., relative_file_paths: List[str] = ..., files: List[FileInformation] = ..., duration: object = ..., starting_time: object = ..., message_count: int = ..., topics_with_message_count: List[TopicInformation] = ..., compression_format: str = ..., compression_mode: str = ..., custom_data: Dict[str,str] = ..., ros_distro: str = ...) -> None: ...

class ConverterOptions:
input_serialization_format: str
output_serialization_format: str
def __init__(self, input_serialization_format: str = ..., output_serialization_format: str = ...) -> None: ...

class Duration:
def __init__(self, seconds: int, nanoseconds: int) -> None: ...

class FileInformation:
duration: datetime.timedelta
message_count: int
path: str
starting_time: object
def __init__(self, path: str, starting_time: object, duration: object, message_count: int) -> None: ...

class MessageDefinition:
encoded_message_definition: str
encoding: str
topic_type: str
type_hash: str
def __init__(self, topic_type: str, encoding: str, encoded_message_definition: str, type_hash: str) -> None: ...

class MetadataIo:
def __init__(self) -> None: ...
def deserialize_metadata(self, arg0: str) -> BagMetadata: ...
def metadata_file_exists(self, arg0: str) -> bool: ...
def read_metadata(self, arg0: str) -> BagMetadata: ...
def serialize_metadata(self, arg0: BagMetadata) -> str: ...
def write_metadata(self, arg0: str, arg1: BagMetadata) -> None: ...

class QoS:
def __init__(self, history_depth: int) -> None: ...
def avoid_ros_namespace_conventions(self, arg0: bool) -> QoS: ...
def best_effort(self) -> QoS: ...
def deadline(self, arg0: Duration) -> QoS: ...
def durability(self, arg0: rmw_qos_durability_policy_t) -> QoS: ...
def durability_volatile(self) -> QoS: ...
def history(self, arg0: rmw_qos_history_policy_t) -> QoS: ...
def keep_all(self) -> QoS: ...
def keep_last(self, arg0: int) -> QoS: ...
def lifespan(self, arg0: Duration) -> QoS: ...
def liveliness(self, arg0: rmw_qos_liveliness_policy_t) -> QoS: ...
def liveliness_lease_duration(self, arg0: Duration) -> QoS: ...
def reliability(self, arg0: rmw_qos_reliability_policy_t) -> QoS: ...
def reliable(self) -> QoS: ...
def transient_local(self) -> QoS: ...

class ReadOrder:
reverse: bool
sort_by: ReadOrderSortBy
def __init__(self, sort_by: ReadOrderSortBy = ..., reverse: bool = ...) -> None: ...

class ReadOrderSortBy:
__doc__: ClassVar[str] = ... # read-only
__members__: ClassVar[dict] = ... # read-only
File: ClassVar[ReadOrderSortBy] = ...
PublishedTimestamp: ClassVar[ReadOrderSortBy] = ...
ReceivedTimestamp: ClassVar[ReadOrderSortBy] = ...
__entries: ClassVar[dict] = ...
def __init__(self, value: int) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __getstate__(self) -> int: ...
def __hash__(self) -> int: ...
def __index__(self) -> int: ...
def __int__(self) -> int: ...
def __ne__(self, other: object) -> bool: ...
def __setstate__(self, state: int) -> None: ...
@property
def name(self) -> str: ...
@property
def value(self) -> int: ...

class StorageFilter:
topics: List[str]
topics_regex: str
topics_regex_to_exclude: str
def __init__(self, topics: List[str] = ..., topics_regex: str = ..., topics_regex_to_exclude: str = ...) -> None: ...

class StorageOptions:
custom_data: Dict[str,str]
end_time_ns: int
max_bagfile_duration: int
max_bagfile_size: int
max_cache_size: int
snapshot_mode: bool
start_time_ns: int
storage_config_uri: str
storage_id: str
storage_preset_profile: str
uri: str
def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str,str] = ...) -> None: ...

class TopicInformation:
message_count: int
topic_metadata: TopicMetadata
def __init__(self, topic_metadata: TopicMetadata, message_count: int) -> None: ...

class TopicMetadata:
id: int
name: str
offered_qos_profiles: List[QoS]
serialization_format: str
type: str
type_description_hash: str
def __init__(self, id: int, name: str, type: str, serialization_format: str, offered_qos_profiles: List[QoS] = ..., type_description_hash: str = ...) -> None: ...
def equals(self, arg0: TopicMetadata) -> bool: ...

class rmw_qos_durability_policy_t:
__doc__: ClassVar[str] = ... # read-only
__members__: ClassVar[dict] = ... # read-only
RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: ClassVar[rmw_qos_durability_policy_t] = ...
RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: ClassVar[rmw_qos_durability_policy_t] = ...
RMW_QOS_POLICY_DURABILITY_UNKNOWN: ClassVar[rmw_qos_durability_policy_t] = ...
RMW_QOS_POLICY_DURABILITY_VOLATILE: ClassVar[rmw_qos_durability_policy_t] = ...
__entries: ClassVar[dict] = ...
def __init__(self, value: int) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __getstate__(self) -> int: ...
def __hash__(self) -> int: ...
def __index__(self) -> int: ...
def __int__(self) -> int: ...
def __ne__(self, other: object) -> bool: ...
def __setstate__(self, state: int) -> None: ...
@property
def name(self) -> str: ...
@property
def value(self) -> int: ...

class rmw_qos_history_policy_t:
__doc__: ClassVar[str] = ... # read-only
__members__: ClassVar[dict] = ... # read-only
RMW_QOS_POLICY_HISTORY_KEEP_ALL: ClassVar[rmw_qos_history_policy_t] = ...
RMW_QOS_POLICY_HISTORY_KEEP_LAST: ClassVar[rmw_qos_history_policy_t] = ...
RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: ClassVar[rmw_qos_history_policy_t] = ...
RMW_QOS_POLICY_HISTORY_UNKNOWN: ClassVar[rmw_qos_history_policy_t] = ...
__entries: ClassVar[dict] = ...
def __init__(self, value: int) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __getstate__(self) -> int: ...
def __hash__(self) -> int: ...
def __index__(self) -> int: ...
def __int__(self) -> int: ...
def __ne__(self, other: object) -> bool: ...
def __setstate__(self, state: int) -> None: ...
@property
def name(self) -> str: ...
@property
def value(self) -> int: ...

class rmw_qos_liveliness_policy_t:
__doc__: ClassVar[str] = ... # read-only
__members__: ClassVar[dict] = ... # read-only
RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: ClassVar[rmw_qos_liveliness_policy_t] = ...
RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: ClassVar[rmw_qos_liveliness_policy_t] = ...
RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT: ClassVar[rmw_qos_liveliness_policy_t] = ...
RMW_QOS_POLICY_LIVELINESS_UNKNOWN: ClassVar[rmw_qos_liveliness_policy_t] = ...
__entries: ClassVar[dict] = ...
def __init__(self, value: int) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __getstate__(self) -> int: ...
def __hash__(self) -> int: ...
def __index__(self) -> int: ...
def __int__(self) -> int: ...
def __ne__(self, other: object) -> bool: ...
def __setstate__(self, state: int) -> None: ...
@property
def name(self) -> str: ...
@property
def value(self) -> int: ...

class rmw_qos_reliability_policy_t:
__doc__: ClassVar[str] = ... # read-only
__members__: ClassVar[dict] = ... # read-only
RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT: ClassVar[rmw_qos_reliability_policy_t] = ...
RMW_QOS_POLICY_RELIABILITY_RELIABLE: ClassVar[rmw_qos_reliability_policy_t] = ...
RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: ClassVar[rmw_qos_reliability_policy_t] = ...
RMW_QOS_POLICY_RELIABILITY_UNKNOWN: ClassVar[rmw_qos_reliability_policy_t] = ...
__entries: ClassVar[dict] = ...
def __init__(self, value: int) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __getstate__(self) -> int: ...
def __hash__(self) -> int: ...
def __index__(self) -> int: ...
def __int__(self) -> int: ...
def __ne__(self, other: object) -> bool: ...
def __setstate__(self, state: int) -> None: ...
@property
def name(self) -> str: ...
@property
def value(self) -> int: ...

def get_default_storage_id() -> str: ...
Loading

0 comments on commit 3b0a5f4

Please sign in to comment.