Using Business Logic as proxy for SSE

Hey everyone,

I’m trying to use server-sent events (SSE) to stream data from the OpenAI completion endpoint. I stumbled across this library for SSE. It works fine in the UI Builder using the custom code block. However, I can’t put my API key on the public client side, so I’m trying to achieve the same output with business logic. Unfortunately, I’m totally puzzled about how to do this. The above-mentioned library works by creating custom events, and the business logic throws an error regarding those custom events.

This is the code I’m using:

var source = new SSE(
      "https://api.openai.com/v1/engines/curie/completions",
      {
        headers: {
          "Content-Type": "application/json",
          Authorization: "Bearer " + <api-key>,
        },
        method: "POST",
        payload: JSON.stringify({
          prompt: "Say this is a test",
          max_tokens: 1000,
          stream: true,
        }),
      }
    );

source.addEventListener("message", function (e) {
      if(e.data=='[Done]'){
        //OpenAI answered with Done Token
        console.log("DONE!");
      } else {
          try{
            var payload = JSON.parse(e.data);
            console.log(payload.choices[0].text);
          } catch (error) {
            console.log("Error: "+error)
          }
      }
      

    });
    
source.stream();

I’m wondering if anyone knows how to achieve this, or if there’s a neat workaround to stream data from an endpoint and deliver it to the UI?

Best,
Anton

Hello @Anton_Henkelmann!

Could you please clarify what specific error you are encountering?

You might find this guide helpful:

How to use NPM modules in Codeless logic in API Services, Event Handlers and Timers

Regards,
Alexander

Hey @Alexander_Pavelko

Prior to the code shown above I’m running the following code. This is the actual code from the library mentioned before. I took it out of the library as I wasn’t sure on how to import it. Thank you for the helpful link :wink: However the problem lies not in installing the library. When I invoke the method I receive “CustomEvent is not defined”.

/**
 * Minimal Server-Sent Events (SSE) client built on XMLHttpRequest.
 *
 * Unlike the browser's EventSource, the request method, headers and body are
 * configurable. May be called with or without `new`.
 *
 * Environment: `stream()` requires a global `XMLHttpRequest`. A global
 * `CustomEvent` is used for events when it exists; otherwise plain objects
 * carrying `type` and `defaultPrevented` are dispatched instead.
 *
 * @param {string} url - Endpoint to open the stream against.
 * @param {Object} [options]
 * @param {Object} [options.headers] - Request headers, name -> value.
 * @param {string} [options.payload] - Request body; defaults to ''.
 * @param {string} [options.method] - HTTP method; defaults to 'POST' when a
 *     non-empty payload is given, 'GET' otherwise.
 * @param {boolean} [options.withCredentials] - Copied onto the XHR.
 */
var SSE = function (url, options) {
  if (!(this instanceof SSE)) {
    return new SSE(url, options);
  }

  // An event ends at a blank line, i.e. two consecutive line terminators
  // (CRLF, lone CR, or LF). The lookahead stops a single CRLF from being
  // counted as "CR followed by LF", which would end the event after one line.
  var EVENT_SEPARATOR = /(?:\r\n|\r(?!\n)|\n){2}/;
  var hasOwn = Object.prototype.hasOwnProperty;

  this.INITIALIZING = -1;
  this.CONNECTING = 0;
  this.OPEN = 1;
  this.CLOSED = 2;

  this.url = url;

  options = options || {};
  this.headers = options.headers || {};
  this.payload = options.payload !== undefined ? options.payload : '';
  this.method = options.method || (this.payload && 'POST' || 'GET');
  this.withCredentials = !!options.withCredentials;

  this.FIELD_SEPARATOR = ':';
  this.listeners = {};

  this.xhr = null;
  this.readyState = this.INITIALIZING;
  // Number of characters of xhr.responseText already consumed.
  this.progress = 0;
  // Received text that does not yet form a complete event.
  this.chunk = '';

  /**
   * Register `listener` for events of `type`. Registering the same function
   * twice for one type has no additional effect.
   */
  this.addEventListener = function(type, listener) {
    if (this.listeners[type] === undefined) {
      this.listeners[type] = [];
    }

    if (this.listeners[type].indexOf(listener) === -1) {
      this.listeners[type].push(listener);
    }
  };

  /**
   * Remove `listener` for events of `type`; drops the type entry entirely once
   * no listeners remain.
   */
  this.removeEventListener = function(type, listener) {
    if (this.listeners[type] === undefined) {
      return;
    }

    var filtered = [];
    this.listeners[type].forEach(function(element) {
      if (element !== listener) {
        filtered.push(element);
      }
    });
    if (filtered.length === 0) {
      delete this.listeners[type];
    } else {
      this.listeners[type] = filtered;
    }
  };

  /**
   * Deliver `e` to the own `on<type>` handler (if set on this instance) and
   * then to the registered listeners, stopping as soon as `defaultPrevented`
   * is set on the event.
   *
   * @returns {boolean} false if delivery stopped because the event was
   *     default-prevented, true otherwise (including for a falsy `e`).
   */
  this.dispatchEvent = function(e) {
    if (!e) {
      return true;
    }

    e.source = this;

    var onHandler = 'on' + e.type;
    if (hasOwn.call(this, onHandler)) {
      this[onHandler].call(this, e);
      if (e.defaultPrevented) {
        return false;
      }
    }

    if (this.listeners[e.type]) {
      return this.listeners[e.type].every(function(callback) {
        callback(e);
        return !e.defaultPrevented;
      });
    }

    return true;
  };

  /**
   * Create an event object of the given type. Falls back to a plain object
   * when the runtime has no global CustomEvent, so event construction itself
   * does not throw a ReferenceError there.
   */
  this._createEvent = function(type) {
    if (typeof CustomEvent === 'function') {
      return new CustomEvent(type);
    }
    return {type: type, defaultPrevented: false};
  };

  /** Store the new state and announce it with a 'readystatechange' event. */
  this._setReadyState = function(state) {
    var event = this._createEvent('readystatechange');
    event.readyState = state;
    this.readyState = state;
    this.dispatchEvent(event);
  };

  /** Dispatch an 'error' event carrying the XHR response, then close. */
  this._onStreamFailure = function(e) {
    var event = this._createEvent('error');
    event.data = e.currentTarget.response;
    this.dispatchEvent(event);
    this.close();
  }

  /** Dispatch an 'abort' event, then close. */
  this._onStreamAbort = function(e) {
    this.dispatchEvent(this._createEvent('abort'));
    this.close();
  }

  /**
   * Consume the response text received since the last call and dispatch every
   * event that is now complete. Unterminated trailing text stays buffered in
   * `this.chunk`, so an event (or its blank-line terminator) that is split
   * across two progress callbacks is still parsed as a single event.
   */
  this._onStreamProgress = function(e) {
    if (!this.xhr) {
      return;
    }

    if (this.xhr.status !== 200) {
      this._onStreamFailure(e);
      return;
    }

    if (this.readyState === this.CONNECTING) {
      this.dispatchEvent(this._createEvent('open'));
      this._setReadyState(this.OPEN);
      // A listener may have closed the stream while handling the events above.
      if (!this.xhr) {
        return;
      }
    }

    var data = this.xhr.responseText.substring(this.progress);
    this.progress += data.length;

    var parts = (this.chunk + data).split(EVENT_SEPARATOR);
    // The last part has no terminating blank line yet; keep it for later.
    this.chunk = parts.pop();
    parts.forEach(function(part) {
      this.dispatchEvent(this._parseEventChunk(part.trim()));
    }.bind(this));
  };

  /** Handle end of response: consume remaining text, then flush the buffer. */
  this._onStreamLoaded = function(e) {
    this._onStreamProgress(e);

    // Parse the last chunk.
    this.dispatchEvent(this._parseEventChunk(this.chunk.trim()));
    this.chunk = '';
  };

  /**
   * Parse a received SSE event chunk into a constructed event object.
   *
   * Recognised fields are `id`, `retry`, `data` and `event`; other fields,
   * empty lines and lines starting with the separator are ignored. Several
   * `data` lines are joined with '\n'.
   *
   * @returns {Object|null} the event (with `data` and `id` set), or null for
   *     an empty chunk.
   */
  this._parseEventChunk = function(chunk) {
    if (!chunk || chunk.length === 0) {
      return null;
    }

    var e = {'id': null, 'retry': null, 'data': '', 'event': 'message'};
    var dataLines = [];
    chunk.split(/\n|\r\n|\r/).forEach(function(line) {
      line = line.trimRight();
      var index = line.indexOf(this.FIELD_SEPARATOR);
      if (index <= 0) {
        // Line was either empty, or started with a separator and is a comment.
        // Either way, ignore.
        return;
      }

      var field = line.substring(0, index);
      // Own-property check so inherited names (e.g. "toString") are not
      // mistaken for known fields.
      if (!hasOwn.call(e, field)) {
        return;
      }

      var value = line.substring(index + 1).trimLeft();
      if (field === 'data') {
        dataLines.push(value);
      } else {
        e[field] = value;
      }
    }.bind(this));
    e.data = dataLines.join('\n');

    var event = this._createEvent(e.event);
    event.data = e.data;
    event.id = e.id;
    return event;
  };

  /** Move to CLOSED once the underlying XHR reports DONE. */
  this._checkStreamClosed = function() {
    if (!this.xhr) {
      return;
    }

    if (this.xhr.readyState === XMLHttpRequest.DONE) {
      this._setReadyState(this.CLOSED);
    }
  };

  /** Create the XHR, wire up its events and send the request. */
  this.stream = function() {
    this._setReadyState(this.CONNECTING);

    this.xhr = new XMLHttpRequest();
    this.xhr.addEventListener('progress', this._onStreamProgress.bind(this));
    this.xhr.addEventListener('load', this._onStreamLoaded.bind(this));
    this.xhr.addEventListener('readystatechange', this._checkStreamClosed.bind(this));
    this.xhr.addEventListener('error', this._onStreamFailure.bind(this));
    this.xhr.addEventListener('abort', this._onStreamAbort.bind(this));
    this.xhr.open(this.method, this.url);
    for (var header in this.headers) {
      this.xhr.setRequestHeader(header, this.headers[header]);
    }
    this.xhr.withCredentials = this.withCredentials;
    this.xhr.send(this.payload);
  };

  /**
   * Abort the request (if one was started) and move to CLOSED. Does nothing
   * when already closed; safe to call before stream().
   */
  this.close = function() {
    if (this.readyState === this.CLOSED) {
      return;
    }

    if (this.xhr) {
      this.xhr.abort();
      this.xhr = null;
    }
    this._setReadyState(this.CLOSED);
  };
};

// Export our SSE module for npm.js
if (typeof exports !== 'undefined') {
  exports.SSE = SSE;
}

Hi @Anton_Henkelmann ,

I apologize, but under our standard support procedures we do not review custom code that you build with some external system, let alone debug it - it is clearly outside of the Backendless platform support we provide. If you believe the problem is with Backendless, try to minimize the issue and any related code that demonstrates it, and share it again.

Regards,
Mark

Hey Mark,

no need to apologize - I totally understand that it is out of scope to review or debug custom user scripts. Luckily, I didn’t ask you to do so. I apologize for the confusion and will try to rephrase my question so that it is easier to understand:

Originally I tried to find a way to integrate server-sent events (SSE) into business logic. My goal is to communicate partial responses prior to the full response. Let’s assume my method runs for 60s. I probably don’t want my users to wait this long until they receive a response. To solve this I can send small updates in between. Using return would end the whole call, so I turned to SSE, which appears to be quite common.

To achieve this goal I need this to be two sided.

  1. Receive SSEs from an external service in business logic
  2. Push SSEs from said business logic to my own codeless UI

Currently I’m working on the first topic.
I wrote my JS script, which I placed inside the custom code block in the UI builder. Everything is working as expected. I receive partial responses prior to the full response. But it stops working as soon as I take the same script and copy it in a custom code block in a business logic api method.

I narrowed the code down to where the error occurs:

  // Records the new connection state on the instance and notifies listeners
  // through a 'readystatechange' event that also carries the state.
  // `CustomEvent` is looked up as a global here; this is the statement that
  // fails with "CustomEvent is not defined" when that global does not exist.
  this._setReadyState = function(state) {
    var event = new CustomEvent('readystatechange');
    event.readyState = state;
    this.readyState = state;
    this.dispatchEvent(event);
  };

The error thrown is “CustomEvent is not defined”, which leads me to believe the “new CustomEvent” command is not available in cloud business logic.

So my questions are:

  1. Am I right in assuming that code that works in UI Builder does not necessarily work in business logic?
  2. Is there a known (and maybe even better) way to receive and push SSEs?

Best,
Anton

Hi @Anton_Henkelmann,

the reason why your code does not work in business logic is that it uses classes (like CustomEvent or XMLHttpRequest) that are available only in client-side JavaScript (in the browser environment), which UI-Builder is. Business logic runs on the server side, where Node.js is used, so it does not have some classes from the browser environment and works a little differently in general (since it is a server environment). It seems the library you’re using is not meant to be used in Node.js.

Regards,
Stanislaw

Awesome. Thank you.