Skip to content

Commit

Permalink
client, server, core: Add StreamService interface and TerminalService…
Browse files Browse the repository at this point in the history
… device (#1171)

* wip

* clean up shell on disconnect

* fix null reference

* remove debug logs

* use async queue in buffered buffer, add max buffer size before connection teardown

* Revert "use async queue in buffered buffer, add max buffer size before connection teardown"

This reverts commit 1b3c283.

* reimplement per feedback

* feedback
  • Loading branch information
bjia56 authored Nov 13, 2023
1 parent 1b15453 commit 8cb2e15
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 42 deletions.
4 changes: 2 additions & 2 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { DataChannelDebouncer } from "../../../plugins/webrtc/src/datachannel-de
import type { IOSocket } from '../../../server/src/io';
import { MediaObject } from '../../../server/src/plugin/mediaobject';
import { attachPluginRemote } from '../../../server/src/plugin/plugin-remote';
import type { ClusterObject, ConnectRPCObject } from '../../../server/src/plugin/connect-rpc-object';
import type { ClusterObject, ConnectRPCObject } from '../../../server/src/cluster/connect-rpc-object';
import { RpcPeer } from '../../../server/src/rpc';
import { createRpcDuplexSerializer, createRpcSerializer } from '../../../server/src/rpc-serializer';
import packageJson from '../package.json';
Expand Down Expand Up @@ -744,7 +744,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
const serializer = createRpcDuplexSerializer({
write: data => clusterPeerSocket.send(data),
});
clusterPeerSocket.on('message', data => serializer.onData(data));
clusterPeerSocket.on('message', data => serializer.onData(Buffer.from(data)));

const clusterPeer = new RpcPeer(clientName || 'engine.io-client', "cluster-proxy", (message, reject, serializationContext) => {
try {
Expand Down
1 change: 1 addition & 0 deletions plugins/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"@scrypted/common": "file:../../common",
"@scrypted/sdk": "file:../../sdk",
"mime": "^3.0.0",
"node-pty-prebuilt-multiarch": "^0.10.1-pre.5",
"router": "^1.3.6",
"typescript": "^4.5.5"
},
Expand Down
14 changes: 14 additions & 0 deletions plugins/core/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { LauncherMixin } from './launcher-mixin';
import { MediaCore } from './media-core';
import { ScriptCore, ScriptCoreNativeId } from './script-core';
import { UsersCore, UsersNativeId } from './user';
import { TerminalService, TerminalServiceNativeId } from './terminal-service';

const { systemManager, deviceManager, endpointManager } = sdk;

Expand Down Expand Up @@ -39,6 +40,7 @@ class ScryptedCore extends ScryptedDeviceBase implements HttpRequestHandler, Eng
aggregateCore: AggregateCore;
automationCore: AutomationCore;
users: UsersCore;
terminalService: TerminalService;
localAddresses: string[];
storageSettings = new StorageSettings(this, {
localAddresses: {
Expand Down Expand Up @@ -83,6 +85,16 @@ class ScryptedCore extends ScryptedDeviceBase implements HttpRequestHandler, Eng
},
);
})();
(async () => {
await deviceManager.onDeviceDiscovered(
{
name: 'Terminal Service',
nativeId: TerminalServiceNativeId,
interfaces: [ScryptedInterface.StreamService],
type: ScryptedDeviceType.Builtin,
},
);
})();

(async () => {
await deviceManager.onDeviceDiscovered(
Expand Down Expand Up @@ -157,6 +169,8 @@ class ScryptedCore extends ScryptedDeviceBase implements HttpRequestHandler, Eng
return this.aggregateCore ||= new AggregateCore();
if (nativeId === UsersNativeId)
return this.users ||= new UsersCore();
if (nativeId === TerminalServiceNativeId)
return this.terminalService ||= new TerminalService();
}

async releaseDevice(id: string, nativeId: string): Promise<void> {
Expand Down
76 changes: 76 additions & 0 deletions plugins/core/src/terminal-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { ScryptedDeviceBase, StreamService } from "@scrypted/sdk";
import type { IPty, spawn as ptySpawn } from 'node-pty-prebuilt-multiarch';
import { createAsyncQueue } from '@scrypted/common/src/async-queue'

export const TerminalServiceNativeId = 'terminalservice';

export class TerminalService extends ScryptedDeviceBase implements StreamService {
async connectStream(input: AsyncGenerator<any, void>): Promise<AsyncGenerator<any, void>> {
const spawn = require('node-pty-prebuilt-multiarch').spawn as typeof ptySpawn;
const cp: IPty = spawn(process.env.SHELL as string, [], {});
const queue = createAsyncQueue<Buffer>();
cp.onExit(() => queue.end());

let bufferedLength = 0;
const MAX_BUFFERED_LENGTH = 64000;
cp.onData(async data => {
const buffer = Buffer.from(data);
bufferedLength += buffer.length;
const promise = queue.enqueue(buffer).then(() => bufferedLength -= buffer.length);
if (bufferedLength >= MAX_BUFFERED_LENGTH) {
cp.pause();
await promise;
if (bufferedLength < MAX_BUFFERED_LENGTH)
cp.resume();
}
});

async function* generator() {
try {
while (true) {
const buffers = queue.clear();
if (buffers.length) {
yield Buffer.concat(buffers);
continue;
}

yield await queue.dequeue();
}
}
finally {
cp.kill();
}
}

(async () => {
try {
for await (const message of input) {
if (!message) {
cp.kill();
return;
}
if (Buffer.isBuffer(message)) {
cp.write(message.toString());
continue;
}
try {
const parsed = JSON.parse(message.toString());
if (parsed.dim) {
cp.resize(parsed.dim.cols, parsed.dim.rows);
}
} catch {
cp.write(message.toString());
}
}
}
catch (e) {
this.console.log(e);
}
finally {
cp.kill();
}
})();

return generator();
}
}
14 changes: 7 additions & 7 deletions plugins/core/ui/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugins/core/ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
"sass-loader": "^10.2.0",
"stylus": "^0.54.8",
"stylus-loader": "^3.0.1",
"typescript": "^4.8.2",
"typescript": "^5.2.2",
"vue-cli-plugin-vuetify": "^2.4.2",
"vue-cli-plugin-webpack-bundle-analyzer": "~4.0.0",
"vue-template-compiler": "^2.7.14",
Expand Down
50 changes: 21 additions & 29 deletions plugins/core/ui/src/components/builtin/ShellComponent.vue
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
<script>
import { Terminal } from "xterm";
import { FitAddon } from "xterm-addon-fit";
import eio from "engine.io-client";
import { getCurrentBaseUrl } from "../../../../../../packages/client/src";
import { createAsyncQueue } from "@scrypted/common/src/async-queue";
export default {
socket: null,
mounted() {
const term = new Terminal({
theme: this.$vuetify.theme.dark
Expand All @@ -28,36 +26,30 @@ export default {
term.open(this.$refs.terminal);
fitAddon.fit();
const baseUrl = getCurrentBaseUrl();
const eioPath = `engine.io/shell`;
const eioEndpoint = baseUrl ? new URL(eioPath, baseUrl).pathname : '/' + eioPath;
const options = {
path: eioEndpoint,
};
const rootLocation = `${window.location.protocol}//${window.location.host}`;
this.socket = eio(rootLocation, options);
this.setupShell(term);
},
methods: {
async setupShell(term) {
const termSvc = await this.$scrypted.systemManager.getDeviceByName("@scrypted/core").getDevice("terminalservice");
const termSvcDirect = await this.$scrypted.connectRPCObject(termSvc);
const queue = createAsyncQueue();
this.socket.send(JSON.stringify({ dim: { cols: term.cols, rows: term.rows } }));
queue.enqueue(JSON.stringify({ dim: { cols: term.cols, rows: term.rows } }));
this.socket.on("message", (data) => {
term.write(new Uint8Array(Buffer.from(data)));
});
term.onData(data => queue.enqueue(Buffer.from(data, 'utf8')));
term.onBinary(data => queue.enqueue(Buffer.from(data, 'binary')));
term.onResize(dim => queue.enqueue(JSON.stringify({ dim })));
term.onData((data) => {
this.socket.send(Buffer.from(data, 'utf8'));
});
const localGenerator = queue.queue;
const remoteGenerator = await termSvcDirect.connectStream(localGenerator);
term.onBinary((data) => {
// https://github.com/xtermjs/xterm.js/blob/2e02c37e528c1abc200ce401f49d0d7eae330e63/typings/xterm.d.ts#L859-L868
this.socket.send(Buffer.from(data, 'binary'));
});
term.on('resize', dim => {
this.socket.send(JSON.stringify({ dim }));
});
},
destroyed() {
this.socket?.close();
for await (const message of remoteGenerator) {
if (!message) {
break;
}
term.write(new Uint8Array(Buffer.from(message)));
}
}
},
};
</script>
1 change: 1 addition & 0 deletions sdk/polyfill/nodeptyprebuiltmultiarch.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions sdk/types/scrypted_python/scrypted_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class ScryptedInterface(str, Enum):
SecuritySystem = "SecuritySystem"
Settings = "Settings"
StartStop = "StartStop"
StreamService = "StreamService"
TamperSensor = "TamperSensor"
TemperatureSetting = "TemperatureSetting"
Thermometer = "Thermometer"
Expand Down Expand Up @@ -1343,6 +1344,13 @@ async def stop(self) -> None:
pass


class StreamService:
"""Generic bidirectional stream connection."""

async def connectStream(self, input: Any) -> Any:
pass


class TamperSensor:

tampered: TamperState
Expand Down Expand Up @@ -1810,6 +1818,7 @@ class ScryptedInterfaceMethods(str, Enum):
createRTCSignalingSession = "createRTCSignalingSession"
getScryptedUserAccessControl = "getScryptedUserAccessControl"
generateVideoFrames = "generateVideoFrames"
connectStream = "connectStream"

class DeviceState:

Expand Down Expand Up @@ -3004,6 +3013,13 @@ def applicationInfo(self, value: LauncherApplicationInfo):
"generateVideoFrames"
],
"properties": []
},
"StreamService": {
"name": "StreamService",
"methods": [
"connectStream"
],
"properties": []
}
}

Expand Down
7 changes: 7 additions & 0 deletions sdk/types/src/types.input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,12 @@ export interface VideoFrameGeneratorOptions extends ImageOptions {
export interface VideoFrameGenerator {
generateVideoFrames(mediaObject: MediaObject, options?: VideoFrameGeneratorOptions): Promise<AsyncGenerator<VideoFrame, void>>;
}
/**
* Generic bidirectional stream connection.
*/
export interface StreamService {
connectStream(input: AsyncGenerator<any, void>): Promise<AsyncGenerator<any, void>>;
}
/**
* Logger is exposed via log.* to allow writing to the Scrypted log.
*/
Expand Down Expand Up @@ -2022,6 +2028,7 @@ export enum ScryptedInterface {
LauncherApplication = "LauncherApplication",
ScryptedUser = "ScryptedUser",
VideoFrameGenerator = 'VideoFrameGenerator',
StreamService = 'StreamService',
}

/**
Expand Down
5 changes: 2 additions & 3 deletions server/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,8 @@ export class ScryptedRuntime extends PluginHttp<HttpPluginData> {
* Handle incoming connections that will be
* proxied to a connectRPCObject socket.
*
* It is the responsibility of the caller of
* this function to verify the signature of
* clusterObject using the clusterSecret.
* Note that the clusterObject hash must be
* verified before connecting to the target port.
*/
this.connectRPCObjectIO.on('connection', connection => {
try {
Expand Down

0 comments on commit 8cb2e15

Please sign in to comment.