安装
pip install rpyc
服务端代码
from rpyc import Service
from rpyc import ThreadedServer
class TestService(Service):
# 函数名必须以exposed_开头
def exposed_test(self, filename):
return filename
if __name__ == '__main__':
config={"allow_public_attrs":True, "sync_request_timeout": 10}
s = ThreadedServer(TestService, port=33132, protocol_config=protocol_config)
s.start()
客户端代码
import rpyc
# 默认超时时间30秒,可设置更长超时时间
config={"allow_public_attrs":True, "sync_request_timeout": 10}
c = rpyc.connect("127.0.0.1", 33132, config=config)
# 调用服务端函数时无需exposed_
filename = c.root.test('test.jpg')
# 调用后手动关闭连接
c.close()