Nodejs Asynchronous programming is an essential part of building efficient and user-friendly Node.js applications. However, properly handling asynchronous code in a Node addon for a Node.js application can be challenging. This blog will talk about how to call a Nodejs asynchronous function in the C++ Native code

Let’s introduce the case we have encountered first. We are attempting to use a Pulsar consumer message listener from C++ native code to call a Node.js function when triggered events occur in the Pulsar consumer. The interface of this message listener is part of the Pulsar C client library, and we have set up a C++ function to hook this listener. The user function, which is a Node.js function, will be called in that C++ function. Here is a simplified workflow for this use case:

  1. Pulsar consumer receives the trigger event
  2. Pulsar consumer notifies the message listener written in C++ native code
  3. The message listener calls the JS function
  4. The user JS function processes the event asynchronously
  5. The message listener waits for the completion of the JS function

Everything works as expected when the user JS function is synchronous. However, when we try to use asynchronous functions like the following:

listener: async (message, messageConsumer) => {
          await new Promise((resolve) => setTimeout(resolve, 10));
          consumer1Recv += 1;
          await messageConsumer.acknowledge(message);
        },

The message listener does not wait for the completion of the user JS function. This is because the asynchronous JS function returns a promise object and executes the logic asynchronously. As a result, step 5 is actually waiting for the promise object instead of the completion of the user JS function.

This blog post discusses how to resolve this issue. We need to first determine when the user JS function has completed and then figure out how to set a guard in the C++ native code to wait until the user JS function has fully completed. And we will also talk about how to handle errors properly.

The Workflow for Invoking the JS Function

Let’s start by taking a look at the overview of the workflow for invoking the JS function. We added a configure method to allow users to pass their JS callback function to the C++ native code:

Napi::Function jsFunction;
Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New(
        consumerConfig.Env(), jsFunction, "Listener Callback", 1,
        1, (void *)NULL, FinalizeListenerCallback, listener);

Assuming that jsFunction is passed from the user JS code, we create a ThreadSafeFunction for this jsFunction. The Napi::ThreadSafeFunction type provides APIs for threads to communicate with the Nodejs’s main thread to invoke JavaScript functions on their behalf. [0] For more documentation on Napi::ThreadSafeFunction::New, check here.

When a triggered event occurs, we invoke the callback that we created above in our implementation of the message listener:

callback.BlockingCall(dataPtr, MessageListenerProxy);
callback.Release();

When calling the BlockingCall, the current thread will be blocked until the actual invoking of the JS function becomes available in the queue inside the Node.js thread. [1] The MessageListenerProxy is a pointer to a function that we will call the internal JS function.

Here is the simplified implementation of our MessageListenerProxy:

void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
  Napi::Object msg = Message::NewInstance({}, data->cMessage);
  jsCallback.Call({ msg }});
}

The above is an overview of the workflow. The issue arises from jsCallback.Call({ msg }});. When the jsCallback is an asynchronous function, the jsCallback.Call({ msg }}) will return immediately instead of waiting for the user function to complete. [2]

Use Nodejs Promise to reach the point where the user function is finished

Fortunately, the jsCallback.Call will return a Node.js Promise object for us to handle if the function is asynchronous. We can handle the Promise the same way we did in JavaScript. There is an inner method in Promise called then. We can use it to register a callback for the Promise.

Here is how to achieve it:

Napi::Value ret = jsCallback.Call({msg});
if (ret.IsPromise()) {
  Napi::Promise promise = ret.As<Napi::Promise>();
  Napi::Value thenValue = promise.Get("then");
  Napi::Function then = thenValue.As<Napi::Function>();
  Napi::Function callback =
      Napi::Function::New(env, [](const Napi::CallbackInfo &info) { 
          // the point where the user function is finished
        });
  then.Call(promise, {callback});
}

We need to check if the return value is a Promise or not to also handle the case of a synchronous function. We create a new Napi::Function as a callback. Then we register it to Promise.then().

Use std::promise to Wait for the Result

Now, we can use std::promise to set up a guard for it. Here is a simple sample code to set up a guard:

std::promise<void> promise;
std::future<void> future = promise.get_future();
Napi::Function callback =
      Napi::Function::New(env, [&promise](const Napi::CallbackInfo &info) { 
          promise.set_value();
        });
then.Call(promise, {callback});

// Will be blocked until the jsCallback is finished.
future.wait();

It looks like we can now wait for the result of the asynchronous JS function. However, this code is also incorrect. It will block the entire main thread of the Node.js! Running this code will block your entire program.

Don’t Wait in the Node.js Thread

To fix the above problem, we need to understand the thread context changes throughout the workflow.

The message listener is running on the thread coming from the listener thread pool inside the Pulsar C client library. Then we use ThreadSafeFunction to change the calling context into a more thread-safe context: the Node.js main thread context. We are calling the inner JS function on the Node.js main thread. Therefore, we need to wait for the std::future outside the Node.js main thread.

We need to move the guard that we set above to the message listener implementation.

std::promise<void> promise;
std::future<void> future = promise.get_future();
MessageListenerProxyData *dataPtr =
    new MessageListenerProxyData(cMessage, [&promise]() { promise.set_value(); });
listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
listenerCallback->callback.Release();

future.wait();
delete dataPtr;

We use MessageListenerProxyData to pass the lambda callback to MessageListenerProxy. And in MessageListenerProxy, we can simply call this callback:

void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
  Napi::Object msg = Message::NewInstance({}, data->cMessage);
  Napi::Value ret = jsCallback.Call({msg, consumer->Value()});
  if (ret.IsPromise()) {
    Napi::Promise promise = ret.As<Napi::Promise>();
    Napi::Value thenValue = promise.Get("then");
    if (thenValue.IsFunction()) {
      Napi::Function then = thenValue.As<Napi::Function>();
      Napi::Function callback =
          Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
      then.Call(promise, {callback});
      return;
    }
  }
  data->callback();
}

Note that we also set the guard even if the jsCallback is a synchronous function. Because we can’t determine if it’s synchronous or not before executing the BlockingCall. So we still need to execute data->callback() when it’s synchronous.

Error handling

Similarly, we can also use promise.catch() to handle the error that is thrown from the Nodejs code asynchronously.

Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>();
ret = catchFunc.Call(promise, {Napi::Function::New(env, [](const Napi::CallbackInfo &info) {
                             Napi::Error error = info[0].As<Napi::Error>();
                             LOG_INFO(error.what())
                             data->callback();
                           })});

Here, we use info[0].As<Napi::Error>() to get the first parameter of promise.catch() which is of type Error. error.what() will return the message of the error.

Finally, we can optimize our code by moving the data->callback to the promise.finally() to avoid calling the data->callback twice:

promise = ret.As<Napi::Promise>();
Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>();

finallyFunc.Call(
    promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })});

In conclusion, properly handling asynchronous code in a C++ addon for a Node.js application can be tricky. It requires understanding the thread context changes and properly managing Promise objects and guards. By using ThreadSafeFunction and std::promise, we can ensure that our addon is thread-safe and does not block the Node.js main thread. Properly handling asynchronous code in a C++ addon is essential for creating efficient and reliable Node.js applications.