8. RPC-Manager
The RPC-Manager is used to establish a request-response pattern in NoPE.
Description
The rpcManager is essentially a service registry.
Service Registry
A service registry is a tool used to store and manage information about available services in a distributed system. It is an important component of microservices architectures, where applications are divided into smaller, independent services that communicate over the network.
A service registry serves as a central repository for metadata about each service, including its address, port number, protocol, and API version. When a service is started, it registers with the service registry, and when it is stopped, it is removed from it.
Other services in the architecture can then query the Service Registry to find out which services are available and how they can communicate. This reduces the complexity of managing distributed systems and improves scalability and flexibility.
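The register, remove, and query cycle described above can be pictured with a minimal in-memory sketch. The class name `ServiceRegistry`, its methods, and the metadata fields are assumptions chosen for illustration; this is not NoPE's implementation.

```javascript
// Hypothetical in-memory service registry (illustration only, not NoPE code).
class ServiceRegistry {
    constructor() {
        // Maps a service id to the metadata of every provider offering it.
        this.providers = new Map();
    }

    // Called by a service when it starts.
    register(id, metadata) {
        const entries = this.providers.get(id) ?? [];
        entries.push(metadata);
        this.providers.set(id, entries);
    }

    // Called by a service when it stops.
    unregister(id, address) {
        const remaining = (this.providers.get(id) ?? []).filter(
            (entry) => entry.address !== address
        );
        if (remaining.length > 0) {
            this.providers.set(id, remaining);
        } else {
            this.providers.delete(id);
        }
    }

    // Used by other services to discover the available providers.
    lookup(id) {
        return this.providers.get(id) ?? [];
    }
}

const registry = new ServiceRegistry();
registry.register("greeter", { address: "10.0.0.5", port: 8080, protocol: "http", version: "1.0" });
registry.register("greeter", { address: "10.0.0.6", port: 8080, protocol: "http", version: "1.0" });
registry.unregister("greeter", "10.0.0.5");
console.log(registry.lookup("greeter"));
```

After the first provider is removed, the lookup returns only the entry with the address 10.0.0.6.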
Service Broker
A broker in the services world refers to a software tool or mechanism that acts as an intermediary between different services or applications. A broker is typically used in a service-oriented architecture (SOA) to facilitate and manage interaction and communication between different services.
A broker provides various functions, such as message routing and transformation, monitoring, and security management. The broker can also perform tasks such as caching messages and routing requests to the most appropriate service.
In an SOA environment, applications or services may communicate using different protocols and transports, and the broker acts as an intermediary to ensure that messages are exchanged correctly and reliably between the different systems. The broker can also help improve the scalability and flexibility of services by providing centralized control and management of service interactions.
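As a rough sketch of the intermediary role, the following broker routes a request to a registered handler and counts the routed requests as a simple form of monitoring. The names `Broker`, `provide`, `request`, and `stats` are hypothetical and only serve this example.

```javascript
// Hypothetical broker (illustration only): callers never talk to a provider directly.
class Broker {
    constructor() {
        this.providers = new Map(); // service id -> async handler
        this.stats = new Map();     // service id -> number of routed requests
    }

    // A provider announces that it can handle requests for `id`.
    provide(id, handler) {
        this.providers.set(id, handler);
    }

    // Route a request to the matching provider and record it.
    async request(id, payload) {
        const handler = this.providers.get(id);
        if (!handler) {
            throw new Error(`No provider for service "${id}"`);
        }
        this.stats.set(id, (this.stats.get(id) ?? 0) + 1);
        return handler(payload);
    }
}

const broker = new Broker();
broker.provide("uppercase", async (text) => text.toUpperCase());

const brokerDemo = broker.request("uppercase", "hello");
brokerDemo.then(console.log);
```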
Implementation of a service registry and broker in NoPE by the rpcManager
A service in NoPE is defined by an id. This usually corresponds to a name with which the service is to be addressed.
In order to implement the required functionalities of a service registry, the rpcManager has the following methods and attributes:
- registerService: This can be used to register services. These are then made available to all participants of the NoPE network.
- unregisterService: This can be used to remove services from the network.
- The services property provides an overview of which services are available (including how often each one is provided, its parameters, and its description).
- The serviceExists method can be used to test whether a service is available.
- performCall: executes a service. All relevant communication is handled by the rpcManager. The user does not know which runtime provides the service.
  - The execution leads to a so-called task, which can be aborted by cancelTask. This leads to an exception at the calling unit.
  - If several service providers (NoPE runtimes) are able to execute the service, the provider can be selected via a callback. For this purpose there are predefined selectors:
    - master (see connectivityManager): the master must execute the process
    - first: any provider executes the service (the first in the list)
    - dispatcher: a specific dispatcher must run the service (defined by its id)
    - host: a dispatcher on the defined host
    - cpu-usage: the dispatcher with the least CPU usage
    - free-ram: the dispatcher with the lowest RAM usage
  - Services with callbacks can also be hosted via a plugin.
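The predefined selectors listed above can be pictured as functions that pick one provider from a list of candidates. The sketch below is an assumption-based illustration: the record fields (`id`, `host`, `cpuUsage`, `ramUsage`, `isMaster`) and the function signatures are hypothetical and do not reflect NoPE's internal data structures.

```javascript
// Hypothetical provider records; the field names are assumptions.
const providers = [
    { id: "local", host: "pc-1", cpuUsage: 0.7, ramUsage: 0.4, isMaster: true },
    { id: "remote", host: "pc-2", cpuUsage: 0.2, ramUsage: 0.6, isMaster: false },
];

// Return the entry with the smallest value of the given numeric field.
const minBy = (list, key) =>
    list.reduce((best, item) => (item[key] < best[key] ? item : best));

// One selection function per strategy name from the list above.
const selectors = {
    "master": (list) => list.find((p) => p.isMaster),
    "first": (list) => list[0],
    "dispatcher": (list, id) => list.find((p) => p.id === id),
    "host": (list, host) => list.find((p) => p.host === host),
    "cpu-usage": (list) => minBy(list, "cpuUsage"),
    "free-ram": (list) => minBy(list, "ramUsage"),
};

console.log(selectors["cpu-usage"](providers).id); // "remote"
console.log(selectors["free-ram"](providers).id);  // "local"
console.log(selectors["host"](providers, "pc-2").id); // "remote"
```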
Connecting the rpcManager in the NoPE network
The registered services of an rpcManager are distributed in the network by the bridge. This is done on every registration or deregistration of a service.
In order to distribute the RPC requests within the network, the rpcManager uses the bridge to send messages. All rpcManagers react to these messages and identify whether they have been addressed (for this purpose, such a message can address a specific dispatcher). If so, the addressed rpcManager creates a task and runs the service in a parallel thread. As soon as the service finishes or an error occurs, the task is considered finished. If an error occurred, it is transmitted in the response message, so the error must be handled by the calling element. Otherwise, the result is stored and returned to the calling element.
Since communication overhead arises during the execution of services, all services need to be implemented async.
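The message flow described above can be sketched with Node's built-in EventEmitter standing in for the bridge. The message fields (`taskId`, `serviceId`, `params`, `result`, `error`) and the helper names `provide` and `call` are assumptions for this example; the sketch also runs everything in one process and does not use parallel threads.

```javascript
const { EventEmitter } = require("events");

// Stand-in for the bridge: all participants share one emitter.
const bridge = new EventEmitter();

// Provider side: answer only those requests that address our service.
function provide(id, service) {
    bridge.on("request", async ({ taskId, serviceId, params }) => {
        if (serviceId !== id) return; // not addressed to us
        try {
            const result = await service(...params);
            bridge.emit("response", { taskId, result });
        } catch (error) {
            // Errors travel back inside the response message.
            bridge.emit("response", { taskId, error: String(error) });
        }
    });
}

// Caller side: send a request and wait for the response with the same task id.
let nextTaskId = 0;
function call(serviceId, params) {
    const taskId = nextTaskId++;
    return new Promise((resolve, reject) => {
        const onResponse = (msg) => {
            if (msg.taskId !== taskId) return;
            bridge.off("response", onResponse);
            if ("error" in msg) reject(new Error(msg.error));
            else resolve(msg.result);
        };
        bridge.on("response", onResponse);
        bridge.emit("request", { taskId, serviceId, params });
    });
}

provide("add", async (a, b) => a + b);
provide("fail", async () => { throw new Error("boom"); });

const rpcDemo = Promise.all([
    call("add", [1, 2]),
    call("fail", []).catch((error) => error.message),
]);
rpcDemo.then(console.log);
```

Because the caller only learns the outcome through a later response message, `call` has to return a promise, which is why the services themselves are written as async functions.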
Interact with the RPC-Manager
First, we have to create a dispatcher, which we use to access the rpcManager:
// First, install nope using npm, then import it:
import * as nope from "nope-js-node";
// Create a communicator:
// We will use the event layer (which just runs internally)
const communicator = nope.getLayer("event");
// Let's create our dispatcher
// 1. Dispatcher simulates our local system
const localDispatcher = nope.dispatcher.getDispatcher({
    communicator,
    id: "local"
}, {
    singleton: false,
    useBaseServices: false
});
// Let's wait for our element to be ready.
await localDispatcher.ready.waitFor();
// Create a shortcut:
const rpcManager = localDispatcher.rpcManager;
We are now able to interact with the RPC-Manager.
Create a Service
To show the functionality of the RPC-Manager, we first create a helloworld service. It must be implemented async.
const helloworld = async (greetings) => {
    return "Hello " + greetings + "!";
};
We now register that service at our RPC-Manager. For this, a service requires a unique id. Additionally, we must share a JSON-Schema describing the interface of the service.
await rpcManager.registerService(helloworld, {
    id: "helloworld",
    schema: {
        "description": "Hello World Service",
        "type": "function",
        // To describe the inputs of a function or service, we added the field "inputs" to the schema.
        // It contains a list of all required inputs.
        "inputs": [
            {
                // The description of the parameter
                "description": "The name which should receive a Greeting",
                // The name used in the function (see the arguments of the function)
                "name": "greetings",
                // The schema follows a default JSON-Schema
                "schema": {
                    "type": "string"
                }
            }
        ],
        // To describe the return value of a function, we added the field "outputs". It contains a
        // JSON-Schema object.
        "outputs": {
            "type": "string",
            // We provide some extra info for the other users.
            "description": "The greeting Message!"
        }
    },
});
We can now check which services are present:
// Receive only the ids
rpcManager.services.extractedKey;
This will return:
[ 'helloworld' ]
Receive the services with their definitions:
// Receive the services with their definitions:
console.log(
    // Use JSON for pretty printing:
    JSON.stringify(
        rpcManager.services.data.getContent(),
        undefined,
        4
    )
);
This will return:
[
    {
        "id": "helloworld",
        "schema": {
            "description": "Hello World Service",
            "type": "function",
            "inputs": [
                {
                    "description": "The name which should receive a Greeting",
                    "name": "greetings",
                    "schema": {
                        "type": "string"
                    }
                }
            ],
            "outputs": {
                "type": "string",
                "description": "The greeting Message!"
            }
        }
    }
]
Additionally we can ask the manager, whether a service with an id exists:
rpcManager.serviceExists("helloworld")
This will return:
true
Or we can ask how many times the service is provided (currently only once, because we have only one dispatcher; we will change that later):
rpcManager.services.amountOf.get("helloworld")
Returns:
1
Or we could ask which dispatcher is providing the service:
rpcManager.services.keyMappingReverse.get("helloworld")
Returns:
Set(1) { 'local' }
Because the services attribute is a mergedData element, we can subscribe to updates or changes.
Calling services
Now that we know that a helloworld service exists, we can use the RPC-Manager to call it.
rpcManager.performCall("helloworld", ["first Parameter"]).then(console.log);
NopePromise [Promise] { <pending> }
Hello first Parameter!
To simplify the call, we can use the methodInterface of the RPC-Manager:
rpcManager.methodInterface.helloworld("first Parameter").then(console.log);
NopePromise [Promise] { <pending> }
Hello first Parameter!
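One way to picture how such a method interface can forward calls is a JavaScript Proxy that turns every property access into a call by service id. This is only a sketch of the idea: the local `services` table and the stand-in `performCall` are assumptions, and NoPE's actual methodInterface may be built differently.

```javascript
// Stand-in service table and call function (assumptions, not NoPE code).
const services = {
    helloworld: async (greetings) => `Hello ${greetings}!`,
};

const performCall = async (id, params) => {
    if (!(id in services)) {
        throw new Error(`Unknown service "${id}"`);
    }
    return services[id](...params);
};

// Every property access yields a function that forwards to performCall,
// using the property name as the service id.
const methodInterface = new Proxy({}, {
    get: (_target, id) => (...params) => performCall(id, params),
});

const proxyDemo = methodInterface.helloworld("first Parameter");
proxyDemo.then(console.log);
```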
Timeouts
Sometimes it is necessary to call services with a defined timeout. For this, we can pass additional options. If the timeout is reached, an error is thrown.
To show that behavior, we define a slow function:
const slowFunction = async (greetings) => {
    // Delay the function
    await nope.sleep(2500); // wait 2.5 s
    return "Hello " + greetings + "!";
};
// Register the function without a schema <- Bad practice!
await rpcManager.registerService(slowFunction, {
    id: "slow",
    schema: {},
});
Now let's call the function again with a time limit:
rpcManager.performCall(
    "slow",
    [
        "first Parameter"
    ], {
        timeout: 0.5 // [ms]
    }
).then(console.log).catch(console.error);
Error: TIMEOUT. The Service allowed execution time of 0.5[ms] has been excided
at Timeout._onTimeout (C:\Users\m.karkowski\Documents\00-Repos\NoPE-JS\dist-nodejs\dispatcher\RpcManager\NopeRpcManager.js:699:50)
at listOnTimeout (node:internal/timers:559:17)
at processTimers (node:internal/timers:502:7)
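The general technique behind such a time limit can be sketched in plain JavaScript with Promise.race. The helper `withTimeout` and the local `sleep` function are hypothetical names for this example; the sketch does not show how the RPC-Manager implements its timeout option.

```javascript
// Reject if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms) {
    let timer;
    const timeout = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error(`TIMEOUT after ${ms} ms`)), ms);
    });
    // Whichever settles first wins; the timer is always cleared afterwards.
    return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// A slow stand-in service that needs 200 ms.
const slow = async (name) => {
    await sleep(200);
    return `Hello ${name}!`;
};

// The limit of 20 ms is reached before the service finishes.
const timeoutDemo = withTimeout(slow("World"), 20).catch((error) => error.message);
timeoutDemo.then(console.log);
```

Note that in this sketch the slow function keeps running after the timeout; only the caller stops waiting for it.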
Error Propagation
If a service throws an exception, the exception is forwarded to the caller. Let's see this behavior by defining a function that always throws an error.
const errorFunction = async (greetings) => {
    // Always throw an error
    throw new Error("Some internal Exception");
};
// Register the function without a schema <- Bad practice!
await rpcManager.registerService(errorFunction, {
    id: "error",
    schema: {},
});
Now, if we call the service, we receive an error:
rpcManager.performCall(
    "error",
    [
        "first Parameter"
    ]
).then(console.log).catch(console.error);
{
error: Error: Some internal Exception
at errorFunction (evalmachine.<anonymous>:4:11)
at NopeRpcManager._handleExternalRequest (C:\Users\m.karkowski\Documents\00-Repos\NoPE-JS\dist-nodejs\dispatcher\RpcManager\NopeRpcManager.js:151:40)
at EventEmitter.<anonymous> (C:\Users\m.karkowski\Documents\00-Repos\NoPE-JS\dist-nodejs\dispatcher\RpcManager\NopeRpcManager.js:295:19)
at EventEmitter.emit (node:events:527:28)
at Bridge._emit (C:\Users\m.karkowski\Documents\00-Repos\NoPE-JS\dist-nodejs\communication\bridge.js:139:35)
at Bridge.emit (C:\Users\m.karkowski\Documents\00-Repos\NoPE-JS\dist-nodejs\communication\bridge.js:47:21)
at C:\Users\m.karkowski\Documents\00-Repos\NoPE-JS\dist-nodejs\dispatcher\RpcManager\NopeRpcManager.js:691:43
at new Promise (<anonymous>)
at new NopePromise (C:\Users\m.karkowski\Documents\00-Repos\NoPE-JS\dist-nodejs\promise\nopePromise.js:40:9)
at NopeRpcManager._performCall (C:\Users\m.karkowski\Documents\00-Repos\NoPE-JS\dist-nodejs\dispatcher\RpcManager\NopeRpcManager.js:628:21),
msg: 'Error: Some internal Exception'
}
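The rejection value printed above is an object with an `error` and a `msg` field rather than a bare Error. A caller that prefers to work with Error instances could normalize it as sketched below. The helper `toError` is hypothetical, and `failingCall` is a stand-in that merely rejects with the same shape as the output above.

```javascript
// Hypothetical helper: turn any rejection value into an Error instance.
function toError(reason) {
    if (reason instanceof Error) return reason;
    if (reason && reason.error instanceof Error) return reason.error;
    if (reason && typeof reason.msg === "string") return new Error(reason.msg);
    return new Error(String(reason));
}

// Stand-in for a failing remote call (assumption: same shape as printed above).
const failingCall = () =>
    Promise.reject({
        error: new Error("Some internal Exception"),
        msg: "Error: Some internal Exception",
    });

// Handle the forwarded error with async/await and try/catch.
async function main() {
    try {
        await failingCall();
        return "ok";
    } catch (reason) {
        return toError(reason).message;
    }
}

const errorDemo = main();
errorDemo.then(console.log);
```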
Cancel Services
We can define services that can be canceled. Let's see how it works:
const cancelFunction = function(parameter) {
    // We must use a nope.NopePromise
    const promise = new nope.NopePromise(
        (resolve, reject) => {
            // We resolve our promise in 5000 ms
            setTimeout(resolve, 5000);
        }
    );
    promise.cancel = (reason) => {
        console.log("Function canceled. Reason = ", reason);
    };
    return promise;
};
// Register the function without a schema <- Bad practice!
await rpcManager.registerService(cancelFunction, {
    id: "cancel",
    schema: {},
});
We can always use the promise returned by performCall to cancel an RPC request.
const promise = rpcManager.performCall(
    "cancel",
    [
        "first Parameter"
    ]
);
promise.catch(console.error);
promise.then(console.log);
const timeout = setTimeout(_ => {
    promise.cancel("Demo");
}, 100);
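This cancels the function. Because promise.then(console.log) starts a second chain without its own catch handler, Node.js additionally reports an unhandled rejection: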
Function canceled. Reason = Demo
Demo
UnhandledPromiseRejection: This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). The promise rejected with the reason "Demo".
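The cancel pattern itself can be sketched without NoPE by attaching a `cancel` method to a plain promise. The factory `cancelableDelay` is a hypothetical name; unlike the service above, this sketch also clears the pending timer and rejects the promise itself, and it attaches both handlers to the same chain so that no unhandled rejection is reported.

```javascript
// Create a promise that resolves after `ms` unless `cancel` is called first.
function cancelableDelay(ms, value) {
    let timer;
    let rejectFn;
    const promise = new Promise((resolve, reject) => {
        rejectFn = reject;
        timer = setTimeout(() => resolve(value), ms);
    });
    promise.cancel = (reason) => {
        clearTimeout(timer); // stop the pending work
        rejectFn(reason);    // inform whoever is waiting
    };
    return promise;
}

const task = cancelableDelay(5000, "done");

// Both outcomes are handled on the same chain.
const cancelDemo = task.then(
    (result) => `finished: ${result}`,
    (reason) => `canceled: ${reason}`
);

setTimeout(() => task.cancel("Demo"), 10);
cancelDemo.then(console.log);
```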
Connect to RPC-Managers
The main idea is to keep the interface the same. It does not matter whether a service is running in a local dispatcher or in a remote dispatcher. This hides the distribution and unifies the interface.
Let's create an additional RPC-Manager simulating an external manager.
// 2. Dispatcher simulates our remote system
const remoteDispatcher = nope.dispatcher.getDispatcher({
    communicator,
    id: "remote"
}, {
    singleton: false,
    useBaseServices: false
});
// Let's wait for our element to be ready.
await remoteDispatcher.ready.waitFor();
Now we can check which services are available to the second dispatcher.
remoteDispatcher.rpcManager.services.data.getContent();
As we can see, all services are included:
[
  {
    id: 'helloworld',
    schema: {
      description: 'Hello World Service',
      type: 'function',
      inputs: [Array],
      outputs: [Object]
    }
  },
  { id: 'slow', schema: {} },
  { id: 'error', schema: {} },
  { id: 'cancel', schema: {} }
]
Now let's use the second RPC-Manager to perform a call:
remoteDispatcher.rpcManager.methodInterface.helloworld("first Parameter").then(console.log);
Hello first Parameter!
As you can see, it makes no difference whether a service is running locally or externally.
Multiple Service Providers
But you may ask what happens if two providers host services with the same id. Let's register our helloworld service on the second dispatcher to discuss that situation.
const helloworldRemote = async (greetings) => {
    return "Hello " + greetings + " from Remote!";
};
// Register the function with the same schema
await remoteDispatcher.rpcManager.registerService(helloworldRemote, {
    id: "helloworld",
    schema: {
        "description": "Hello World Service",
        "type": "function",
        // To describe the inputs of a function or service, we added the field "inputs" to the schema.
        // It contains a list of all required inputs.
        "inputs": [
            {
                // The description of the parameter
                "description": "The name which should receive a Greeting",
                // The name used in the function (see the arguments of the function)
                "name": "greetings",
                // The schema follows a default JSON-Schema
                "schema": {
                    "type": "string"
                }
            }
        ],
        // To describe the return value of a function, we added the field "outputs". It contains a
        // JSON-Schema object.
        "outputs": {
            "type": "string",
            // We provide some extra info for the other users.
            "description": "The greeting Message!"
        }
    },
});
If we now check our providers, we should see that there are two providers:
remoteDispatcher.rpcManager.services.amountOf.get("helloworld")
This will return:
2
Now we can ask which dispatchers are providing the service:
rpcManager.services.keyMappingReverse.get("helloworld")
Set(2) { 'local', 'remote' }
In our case, as desired, both dispatchers are able to execute the service. Now the question arises: what happens if we call the service?
remoteDispatcher.rpcManager.methodInterface.helloworld("first Parameter").then(console.log);
Hello first Parameter!
The service is executed. But how do we control which provider executes the request?
By default, a predefined strategy is used, which picks the first dispatcher it finds that provides the service. There are also other predefined strategies:
- master (see connectivityManager): the master must execute the process
- first: any provider executes the service (the first in the list)
- dispatcher: a specific dispatcher must run the service (defined by its id)
- host: a dispatcher on the defined host
- cpu-usage: the dispatcher with the least CPU usage
- free-ram: the dispatcher with the lowest RAM usage
So let's now use the dispatcher strategy and force the execution on our second dispatcher:
remoteDispatcher.rpcManager.methodInterfaceWithOptions.helloworld(
    {
        // We create an async callback which must return a dispatcher id
        selector: async () => {
            return "remote";
        }
    },
    "first Parameter"
).then(console.log);
This leads to a call on the second dispatcher:
Hello first Parameter from Remote!
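Since the selector in the example above is simply an async callback that returns a dispatcher id, custom strategies can be written in the same shape. The following sketch builds a rotating selector; the factory `roundRobin` is a hypothetical helper, and the sketch assumes that a selector needs nothing beyond returning an id. Which arguments NoPE passes to a selector is not covered here.

```javascript
// Hypothetical factory: returns an async selector that cycles through the given ids.
function roundRobin(dispatcherIds) {
    let index = 0;
    return async () => {
        const id = dispatcherIds[index % dispatcherIds.length];
        index += 1;
        return id;
    };
}

const selector = roundRobin(["local", "remote"]);

// Call the selector three times to see the rotation.
const selectorDemo = (async () => [
    await selector(),
    await selector(),
    await selector(),
])();
selectorDemo.then(console.log);
```

Under that assumption, such a function could be passed as the selector option in place of the inline callback shown above.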
FAQ
- I provided a service multiple times, but it is listed only once. What's the problem?
  - Make sure the JSON schemas match. If there is a difference, the RPC-Manager will list those services as conflicts, because there are different descriptions for the service.
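Before registering the same service on several dispatchers, the schemas can be compared locally. The sketch below uses `isDeepStrictEqual` from Node's util module; the helper `schemasMatch` is a hypothetical name, and the comparison rule is an assumption about what counts as a difference, not a description of NoPE's own conflict check.

```javascript
const { isDeepStrictEqual } = require("util");

const schemaA = {
    description: "Hello World Service",
    type: "function",
    inputs: [{ name: "greetings", schema: { type: "string" } }],
    outputs: { type: "string" },
};

// An independent copy with identical content.
const schemaCopy = JSON.parse(JSON.stringify(schemaA));

// A variant that differs only in its description.
const schemaB = { ...schemaA, description: "Hello World Service (remote)" };

// Hypothetical helper: two schemas match if they are deeply equal.
const schemasMatch = (a, b) => isDeepStrictEqual(a, b);

console.log(schemasMatch(schemaA, schemaCopy)); // true
console.log(schemasMatch(schemaA, schemaB));    // false
```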