Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

High lvl api bugfix: #11

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
my_test_file.py

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/
33 changes: 29 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Built on top of the low level wrapper is a more usable api to perform common USB
def list_usb_devices(self, **kwargs):

# vid and pid must be str, returns True if device was correctly initialized and False otherwise
def init_winusb_device(self, vid, pid):
def init_winusb_device(self, vid, pid):

# Returns True if device was correctly closed and False otherwise.
def close_winusb_device(self):
Expand All @@ -82,7 +82,7 @@ def query_pipe(self, pipe_index):
# it returns a dict with the response and with the buffer under the keywords 'result' and 'buffer'
def control_transfer(self, setup_packet, buff=None):

#Send Bulk data to the Usb device, write_buffer must be a str buffer
#Send Bulk data to the Usb device, write_buffer must be of type "bytes"
def write(self, pipe_id, write_buffer):

#Read Bulk data from the Usb device, Returns of a buffer not greater than length_buffer length
Expand All @@ -99,8 +99,33 @@ pid = "pid_device"
api = WinUsbPy()
result = api.list_usb_devices(deviceinterface=True, present=True)
if result:
if api.init_winusb_device(pl2303_vid, pl2303_pid):
api.write(0x02, "hello")
if api.init_winusb_device(vid, pid):

# print device interface 0 descriptors
interface_descriptor = api.query_interface_settings(0)
if interface_descriptor != None:
print("bLength: " + str(interface_descriptor.b_length))
print("bDescriptorType: " + str(interface_descriptor.b_descriptor_type))
print("bInterfaceNumber: " + str(interface_descriptor.b_interface_number))
print("bAlternateSetting: " + str(interface_descriptor.b_alternate_setting))
print("bNumEndpoints " + str(interface_descriptor.b_num_endpoints))
print("bInterfaceClass " + str(interface_descriptor.b_interface_class))
print("bInterfaceSubClass: " + str(interface_descriptor.b_interface_sub_class))
print("bInterfaceProtocol: " + str(interface_descriptor.b_interface_protocol))
print("iInterface: " + str(interface_descriptor.i_interface))

# print device endpoint descriptors
pipe_info_list = map(api.query_pipe, range(interface_descriptor.b_num_endpoints))
for item in pipe_info_list:
print("PipeType: " + str(item.pipe_type))
print("PipeId: " + str(item.pipe_id))
print("MaximumPacketSize: " + str(item.maximum_packet_size))
print("Interval: " + str(item.interval))

api.write(0x02, b"hello") # send bulk packet on OUT endpoint 2

# close
api.close_winusb_device()
```

Real examples
Expand Down
42 changes: 27 additions & 15 deletions winusbpy/winusbpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,29 @@ def list_usb_devices(self, **kwargs):
return self.device_paths

def find_device(self, path):
return is_device(self._name, self._vid, self._pid, path)
return is_device(self._vid, self._pid, path, self._name)

def init_winusb_device(self, name, vid, pid):
def extract_device_from_vid_pid(self, vid, pid, dev_list):
name = None
path = None

for dev in dev_list:
dev_path = dev_list[dev]
if( dev_path.find("vid_{:04x}&pid_{:04x}".format( int(str(vid), 16), int(str(pid), 16)) ) != -1 ):
# dev found
name = dev
path = dev_path
break

return (name, path)

def init_winusb_device(self, vid, pid):
self._vid = vid
self._pid = pid
self._name = name
try:
ftr = filter(self.find_device, self.device_paths)
paths = [p for p in ftr]
path = self.device_paths[paths[0]]
except IndexError:

# extract device (name, path) from device_list
_ , path = self.extract_device_from_vid_pid(vid, pid, self.device_paths)
if(path == None):
return False

self.handle_file = self.api.exec_function_kernel32(CreateFile, path, GENERIC_WRITE | GENERIC_READ,
Expand Down Expand Up @@ -207,14 +219,14 @@ def control_transfer(self, setup_packet, buff=None):
def write(self, pipe_id, write_buffer):
write_buffer = create_string_buffer(write_buffer)
written = c_ulong(0)
self.api.exec_function_winusb(WinUsb_WritePipe, self.handle_winusb[self._index], c_ubyte(pipe_id), write_buffer,
self.api.exec_function_winusb(WinUsb_WritePipe, self.handle_winusb, c_ubyte(pipe_id), write_buffer,
c_ulong(len(write_buffer) - 1), byref(written), None)
return written.value

def read(self, pipe_id, length_buffer):
read_buffer = create_string_buffer(length_buffer)
read = c_ulong(0)
result = self.api.exec_function_winusb(WinUsb_ReadPipe, self.handle_winusb[self._index], c_ubyte(pipe_id),
result = self.api.exec_function_winusb(WinUsb_ReadPipe, self.handle_winusb, c_ubyte(pipe_id),
read_buffer, c_ulong(length_buffer), byref(read), None)
if result != 0:
if read.value != length_buffer:
Expand All @@ -237,12 +249,12 @@ class POLICY_TYPE:
policy_type = c_ulong(POLICY_TYPE.PIPE_TRANSFER_TIMEOUT)
value_length = c_ulong(4)
value = c_ulong(int(timeout * 1000)) # in ms
result = self.api.exec_function_winusb(WinUsb_SetPipePolicy, self.handle_winusb[self._index], c_ubyte(pipe_id),
result = self.api.exec_function_winusb(WinUsb_SetPipePolicy, self.handle_winusb, c_ubyte(pipe_id),
policy_type, value_length, byref(value))
return result

def flush(self, pipe_id):
result = self.api.exec_function_winusb(WinUsb_FlushPipe, self.handle_winusb[self._index], c_ubyte(pipe_id))
result = self.api.exec_function_winusb(WinUsb_FlushPipe, self.handle_winusb, c_ubyte(pipe_id))
return result

def _overlapped_read_do(self,pipe_id):
Expand All @@ -251,14 +263,14 @@ def _overlapped_read_do(self,pipe_id):
self.olread_ol.Offset = 0
self.olread_ol.OffsetHigh = 0
self.olread_ol.Pointer = 0
self.olread_ol.hEvent = 0
result = self.api.exec_function_winusb(WinUsb_ReadPipe, self.handle_winusb, c_ubyte(pipe_id), self.olread_buf,
self.olread_ol.hEvent = 0
result = self.api.exec_function_winusb(WinUsb_ReadPipe, self.handle_winusb, c_ubyte(pipe_id), self.olread_buf,
c_ulong(self.olread_buflen), byref(c_ulong(0)), byref(self.olread_ol))
if result != 0:
return True
else:
return False

def overlapped_read_init(self, pipe_id, length_buffer):
self.olread_ol = Overlapped()
self.olread_buf = create_string_buffer(length_buffer)
Expand Down
10 changes: 7 additions & 3 deletions winusbpy/winusbutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def get_winusb_functions(windll):
# BOOL __stdcall WinUsb_ControlTransfer(_In_ WINUSB_INTERFACE_HANDLE InterfaceHandle,_In_ WINUSB_SETUP_PACKET SetupPacket, _Out_ PUCHAR Buffer,_In_ ULONG BufferLength,_Out_opt_ PULONG LengthTransferred,_In_opt_ LPOVERLAPPED Overlapped);
winusb_functions[WinUsb_ControlTransfer] = windll.WinUsb_ControlTransfer
# winusb_restypes[WinUsb_ControlTransfer] = BOOL
# winusb_argtypes[WinUsb_ControlTransfer] = [c_void_p, UsbSetupPacket, POINTER(c_ubyte), c_ulong, POINTER(c_ulong), LpOverlapped]
# winusb_argtypes[WinUsb_ControlTransfer] = [c_void_p, UsbSetupPacket, POINTER(c_ubyte), c_ulong, POINTER(c_ulong), LpOverlapped]

# BOOL __stdcall WinUsb_GetDescriptor(_In_ WINUSB_INTERFACE_HANDLE InterfaceHandle,_In_ UCHAR DescriptorType,_In_ UCHAR Index,_In_ USHORT LanguageID,_Out_ PUCHAR Buffer,_In_ ULONG BufferLength,_Out_ PULONG LengthTransferred);
winusb_functions[WinUsb_GetDescriptor] = windll.WinUsb_GetDescriptor
Expand Down Expand Up @@ -108,12 +108,12 @@ def get_winusb_functions(windll):
winusb_restypes[WinUsb_GetAssociatedInterface] = BOOL
winusb_argtypes[WinUsb_GetAssociatedInterface] = [c_void_p, c_ubyte, POINTER(c_void_p)]

winusb_dict["functions"] = winusb_functions
winusb_dict["functions"] = winusb_functions
winusb_dict["restypes"] = winusb_restypes
winusb_dict["argtypes"] = winusb_argtypes
return winusb_dict


def get_kernel32_functions(kernel32):
kernel32_dict = {}
kernel32_functions = {}
Expand Down Expand Up @@ -209,6 +209,10 @@ def get_setupapi_functions(setupapi):


def is_device(vid, pid, path, name=None):
# this is not working
# not used
# replaced by extract_device_from_vid_pid

if name and name.lower() == path.lower():
return True
if vid and pid:
Expand Down