Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I want to publish multiple service instances in the entry array of the same packet. What should I do? #25

Open
liuzhong789456 opened this issue Nov 28, 2024 · 0 comments

Comments

@liuzhong789456
Copy link

This is my code logic, but it doesn't do what I want:

class TimeEvgrp(SimpleEventgroup):
    """Eventgroup whose values are periodically refreshed from its service.

    On construction a background task is spawned that walks the keys of
    ``extend_data`` and stores ``service.set_payload(key)`` into
    ``self.values[key]``, pausing five seconds after each key.

    Args:
        service: The owning ``Prot`` service; its ``set_payload`` supplies
            the bytes for every key.
        _id: Eventgroup identifier, forwarded to the base class as ``id``.
        extend_data: Mapping whose keys are the entries to publish.
    """

    # ``Prot`` is defined further down in the file, so the annotation is a
    # string: a bare name would be evaluated when this ``def`` executes.
    def __init__(self, service: "Prot", _id, extend_data):
        # interval=1 is passed straight to the base class
        # (presumably the cyclic notification period - TODO confirm).
        super().__init__(service, id=_id, interval=1)
        self.service: "Prot"
        self.extend_data = extend_data
        # asyncio.create_task requires a running event loop, so instances
        # must be created from inside a coroutine or loop callback.
        self.update_task = asyncio.create_task(self.update())

    async def update(self):
        """Refresh ``self.values`` from the service forever, one key per 5 s.

        ``self.values`` is not created in this class; it is assumed to be
        provided by the base class - TODO confirm.
        """
        while True:
            if not self.extend_data:
                # Nothing to publish: still yield to the event loop, otherwise
                # this ``while True`` would spin without ever awaiting.
                await asyncio.sleep(5)
                continue
            for key in self.extend_data:
                self.values[key] = self.service.set_payload(key)
                await asyncio.sleep(5)

class Prot(SimpleService):
    """Service whose identity and eventgroups are configured at runtime.

    Args:
        instance_id: Instance identifier, forwarded to the base class.
        extend_data: Mapping from a key to an iterable of byte values; used
            by ``set_payload`` and shared with every ``TimeEvgrp`` created.
        main_info: Dict with the keys ``service_id``, ``version_major``,
            ``version_minor`` and ``groups_id`` (iterable of eventgroup ids).
    """

    # Class-level defaults; every instance overrides them from ``main_info``.
    service_id = 0x1080
    version_major = 1
    version_minor = 1

    def __init__(self, instance_id, extend_data, main_info):
        super().__init__(instance_id)
        self.extend_data = extend_data
        self.payload = b""
        self.service_id = main_info['service_id']
        self.version_major = main_info['version_major']
        self.version_minor = main_info['version_minor']

        # Bind the attribute up front so it exists (as None) even when
        # ``groups_id`` is empty; after the loop it refers to the eventgroup
        # registered last.
        self.evgrp_obj = None
        for group_id in main_info['groups_id']:
            self.evgrp_obj = TimeEvgrp(self, group_id, self.extend_data)
            self.register_eventgroup(self.evgrp_obj)

    def set_payload(self, methid) -> bytes:
        """Return the bytes configured for ``methid`` in ``extend_data``.

        Despite the name, nothing is stored: ``self.payload`` is untouched.

        Raises:
            KeyError: ``methid`` is not a key of ``extend_data``.
            ValueError: an element is outside ``range(256)``.
            TypeError: the entry is not an iterable of integers.
        """
        return bytes(list(self.extend_data[methid]))

    def get_payload(self):
        """Return the current ``self.payload`` (``b""`` unless changed elsewhere)."""
        return self.payload

    def method_get_time(
            self, someip_message: SOMEIPHeader, addr: _T_SOCKNAME
    ) -> typing.Optional[bytes]:
        """Handle a "get" request by answering with the stored payload.

        The request payload is printed and otherwise ignored; no check for
        an empty request is performed.
        """
        print(someip_message.payload)
        return self.get_payload()

    def method_set_time(self, v) -> typing.Optional[bytes]:
        """Handle a "set" request for key ``v`` and reply with an empty body.

        NOTE(review): the bytes produced by ``set_payload`` are discarded and
        ``self.payload`` is never updated here - confirm that is intended.

        Raises:
            MalformedMessageError: ``set_payload`` raised ``ValueError``.
        """
        try:
            self.set_payload(v)
            return b""
        except ValueError as exc:
            raise MalformedMessageError from exc

# Singleton manager for the service discovery protocol; without it, publishing multiple services causes port conflicts.

class ServiceDiscoveryManager:
    """Process-wide singleton that owns the service-discovery endpoints.

    Every instantiation returns the same object, so the discovery endpoints
    are created once and shared by all callers.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.created = False
        return cls._instance

    async def create_endpoints(self, local_addr, multicast_addr, sd_port, multicast_port):
        """Create the discovery endpoints once and return them.

        Later calls return the cached objects and ignore their arguments,
        unless a cached transport reports that it is closing or closed; in
        that case the endpoints are created again from the new arguments.

        Returns:
            Tuple ``(sd_trsp_u, sd_trsp_m, sd_prot)`` exactly as produced by
            ``ServiceDiscoveryProtocol.create_endpoints``.
        """
        if self.created and self._endpoints_closed():
            # The cached transports were shut down by a previous user;
            # handing them out again would give the caller dead endpoints.
            self.created = False

        if not self.created:
            self.sd_trsp_u, self.sd_trsp_m, self.sd_prot = await ServiceDiscoveryProtocol.create_endpoints(
                family=socket.AF_INET,
                local_addr=local_addr,
                multicast_addr=multicast_addr,
                local_port=sd_port,
                multicast_port=multicast_port,
                ttl=1
            )
            self.created = True
        return self.sd_trsp_u, self.sd_trsp_m, self.sd_prot

    def _endpoints_closed(self):
        """Return True when a cached transport reports it is closing/closed."""
        for transport in (self.sd_trsp_u, self.sd_trsp_m):
            # Looked up defensively: the objects are assumed to be asyncio
            # transports, but nothing here guarantees ``is_closing`` exists.
            is_closing = getattr(transport, "is_closing", None)
            if callable(is_closing) and is_closing():
                return True
        return False

def setup_log(fmt="", **kwargs):
    """Configure logging, using ``coloredlogs`` when it is installed.

    Args:
        fmt: Format string appended after the timestamp prefix.
        **kwargs: Forwarded unchanged to ``coloredlogs.install`` or, when
            that package is missing, to ``logging.basicConfig``.
    """
    # Only the optional import is guarded, so an error raised inside
    # ``coloredlogs.install`` is not mistaken for "package not installed".
    try:
        import coloredlogs  # type: ignore[import]
    except ModuleNotFoundError:
        logging.basicConfig(format="%(asctime)s " + fmt, **kwargs)
        logging.info("install coloredlogs for colored logs :-)")
    else:
        coloredlogs.install(fmt="%(asctime)s,%(msecs)03d " + fmt, **kwargs)

def eth_service_release(**kwargs):
    """Run the ``run`` coroutine to completion on a fresh event loop.

    ``run`` is not defined in this part of the file; all keyword arguments
    are passed to it unchanged. Blocks until the coroutine finishes.
    """
    asyncio.run(run(**kwargs))

class AsyncServiceThread(QThread):
    """Qt thread that runs the configured services on its own event loop.

    Keyword arguments are stored as given; ``run_async`` reads:
        public: dict with ``local_addr``, ``multicast_addr``, ``sd_port``,
            ``multicast_port`` and ``local_port``.
        service_data: mapping ``service_id -> info`` where ``info`` holds
            ``instance_id``, ``version_major``, ``version_minor``,
            ``groups_id`` and ``extend_data``.
    """

    finished = pyqtSignal()
    prot_signal = pyqtSignal(object)  # carries a started service endpoint (prot)
    stop_requested = False  # set to True to make run_async leave its wait loop

    def __init__(self, **kwargs):
        super().__init__()
        self.kwargs = kwargs

    async def run_async(self):
        """Start discovery and every configured service, then wait for stop.

        Everything that was started is shut down again on exit, including
        when starting one of the services fails part-way through.
        """
        sd_manager = ServiceDiscoveryManager()
        public_data = self.kwargs['public']
        services_data = dict(self.kwargs['service_data'])
        sd_trsp_u, sd_trsp_m, sd_prot = await sd_manager.create_endpoints(
            local_addr=public_data['local_addr'],
            multicast_addr=public_data['multicast_addr'],
            sd_port=public_data['sd_port'],
            multicast_port=public_data['multicast_port']
        )

        started_prots = []  # every service endpoint started so far
        sd_started = False
        try:
            sd_prot.timings.CYCLIC_OFFER_DELAY = 1

            for service_id, service_info in services_data.items():
                main_info = {
                    'service_id': service_id,
                    'version_major': service_info['version_major'],
                    'version_minor': service_info['version_minor'],
                    'groups_id': service_info['groups_id'],
                }

                prot = await Prot.start_datagram_endpoint(
                    instance_id=service_info['instance_id'],
                    announcer=sd_prot.announcer,
                    local_addr=(public_data['local_addr'], public_data['local_port']),
                    extend_data=service_info['extend_data'],
                    main_info=main_info
                )
                started_prots.append(prot)

            # Start the service discovery protocol once, after all services
            # have been handed to its announcer.
            sd_prot.start()
            sd_started = True

            # As before, listeners receive the most recently started service;
            # nothing is emitted when no service was configured.
            if started_prots:
                self.prot_signal.emit(started_prots[-1])

            try:
                while not self.stop_requested:  # loop until the flag is set
                    await asyncio.sleep(1)
            except asyncio.CancelledError:
                pass
        finally:
            if sd_started:
                sd_prot.stop()  # stop the service discovery protocol
            sd_trsp_u.close()  # assumes the transport has a close method
            sd_trsp_m.close()  # assumes the transport has a close method
            for prot in started_prots:
                prot.stop()  # assumes prot has a stop method
            # The shared endpoints are closed now; clear the flag so the
            # singleton creates new ones on the next request.
            sd_manager.created = False

    def run(self):
        """Thread entry point: run ``run_async`` to completion, then signal."""
        asyncio.run(self.run_async())
        self.finished.emit()  # emit the completion signal
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant