Pub/Sub messaging

Hello, I’m trying to implement pub/sub messaging for chat in my Android app.

I subscribed like this:

https://cannypancake.backendless.app/api/messaging/liveChat/subscribe

with this body

{
  "selector": {
    "discussionId": "36"
  }
}

and I get this response:

{
  "subscriptionId": "FBE6B578-3A3E-4717-FF76-7100F43F5400"
}

Then, I get the messages with this

https://cannypancake.backendless.app/api/messaging/liveChat/FBE6B578-3A3E-4717-FF76-7100F43F5400

with a Kotlin flow:

private fun detectNewMessagesForChat(subscriptionId: String, discussionId: Long) {
    viewModelScope.launch {
        notificationRepository.getChatMessagesPubSub(
            subscriptionId = subscriptionId
        ).collect { networkResult ->
            when (networkResult.status) {
                NetworkStatus.SUCCESS -> {
                    // We got messages: refresh the UI
                    Logger.d(tag = "debaga") { "in detect message for discussion: $discussionId" }
                    // TODO debago: refresh UI
                    getChatMessages()
                }

                NetworkStatus.ERROR -> {
                }

                NetworkStatus.LOADING -> {
                }
            }
        }
    }
}

The first time, I receive this, which is expected:

{
  "messages": []
}

Then I publish a message on this channel like this:

POST https://cannypancake.backendless.app/api/messaging/liveChat

with this body

{
  "message": "ok",
  "headers": {
    "discussionId": "36"
  }
}

and also discussionId:36 in the header of the call (because I’m not sure whether it belongs only in the body or also in the header).

I receive this:

{
  "errorMessage": null,
  "messageId": "message:349DAC1E-B9A5-44AC-A1C0-4CABBC61E7E0",
  "status": "scheduled"
}

The status is scheduled, and I no longer receive anything in my detectNewMessagesForChat.

Why is the status scheduled?

When I send the message from Backendless directly, I don’t receive anything either. In my Kotlin code, once I launch the detectNewMessagesForChat flow, should I receive every published message for as long as my ViewModel is active?

Thank you for the clarification.

Hi @Indevers ,

Regarding your subscription attempt: it looks like you specified “selector” in the wrong format. According to the documentation, the value of the “selector” field should be a valid SQL-92 string. In your case the selector should be discussionId='36'.

Regarding publishing: it is enough to send the data used in the “selector” in the “headers” field of the body, as described in the documentation.

The status is scheduled, and I no longer receive anything in my detectNewMessagesForChat.
Why is the status scheduled?

This is expected, since messages are not sent at publish time but are queued for sending in the background.
You can check the message status using this endpoint.

In your case you do not receive messages because the “selector” is invalid. Could you please correct the selector and try again to test your logic?

Regards, Andriy

Hi @Andriy_Konoz, OK, so I changed the logic to use a valid SQL-92 string format. I have this in my request body:

{
  "selector": {
    "discussionId": '39'
  }
}

but now I get this error message:

{
  "code": 8002,
  "message": "Could not parse request with message: Error decoding json body: com.fasterxml.jackson.core.JsonParseException: Unexpected character (''' (code 39)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (akka.util.ByteIterator$ByteArrayIterator$$anon$1); line: 3, column: 22], status code 400, headers POST /FACE4E98-...F/messaging/liveChat/subscribe",
  "errorData": {}
}

@Indevers ,

You need to pass a string to the “selector” field.
Your body should look like this:

{
  "selector": "discussionId='39'"
}

Could you try to use this body instead?
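
The same idea can be sketched in Kotlin for the subscribe and publish bodies. This is only an illustration: buildSelector, subscribeBody, publishBody and jsonEscape are hypothetical helper names, not part of any Backendless SDK, and doubling single quotes is the standard SQL convention rather than something confirmed for the Backendless selector parser.

```kotlin
// Hypothetical helpers, not part of any Backendless SDK.

// Minimal JSON string escaping (backslash and double quote only).
fun jsonEscape(s: String): String =
    s.replace("\\", "\\\\").replace("\"", "\\\"")

// Builds an SQL-92 style selector such as: discussionId = '36'
// Single quotes inside the value are doubled (standard SQL convention;
// assumed, not verified, to be accepted by the selector parser).
fun buildSelector(header: String, value: String): String =
    "$header = '${value.replace("'", "''")}'"

// JSON body for the subscribe call.
// The selector is sent as one JSON string, not as a nested object.
fun subscribeBody(selector: String): String =
    """{"selector": "${jsonEscape(selector)}"}"""

// JSON body for the publish call.
// The header name must be the field that the selector filters on.
fun publishBody(message: String, header: String, value: String): String =
    """{"message": "${jsonEscape(message)}", "headers": {"${jsonEscape(header)}": "${jsonEscape(value)}"}}"""
```

The point of the sketch is that the selector is a single string on the subscribe side, while the publish side sends the matching field as a plain key/value pair under “headers”.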

Regards, Andriy

@Andriy_Konoz

OK, so I did this for subscribe (in the body):

{
  "selector": "discussionId = '41'"
}

I receive the subscription response. Then I call

https://cannypancake.backendless.app/api/messaging/liveChat/9CFB84B5-3F8A-C88A-FF8D-179619FB5E00

to listen to the pub/sub.
The first time, I receive this, which is expected:

{
  "messages": []
}

Then I publish something on this channel

{
  "message": "ok",
  "headers": {
    "discussionId": "41"
  }
}

but I receive scheduled again:

{
  "errorMessage": null,
  "messageId": "message:CC172543-A8C4-4118-8036-9248F065311F",
  "status": "scheduled"
}

I use Postman to get the message status and I receive this:

{
    "sendingTimeInMillis": 0,
    "failedSendsAmount": 1,
    "errorMessage": "[ deviceId: \"null\", message: \"No devices for push notification\"]\n",
    "messageId": "message:CC172543-A8C4-4118-8036-9248F065311F",
    "successfulSendsAmount": 0,
    "status": "failed"
}

So maybe I didn’t understand the principle of pub/sub messaging. I thought it let me receive a signal when a user publishes on the channel, so that I can refresh the UI inside my app. But it seems to work like a push notification, where a device has to be registered. Is that right?

@Indevers ,

I thought it let me receive a signal when a user publishes on the channel, so that I can refresh the UI inside my app.

You understood the principle correctly. It looks like the status endpoint doesn’t work properly for pub/sub. That endpoint is mostly used when you send push notifications via the Backendless Messaging Service. I will create an internal ticket to fix that problem.

In your case you can skip the status check.

As I said before, you simply need to build the selector value correctly and then send the field used in the selector in the message “headers” field.

Here is an example from my test app (domain omitted) using the REST API:

  1. Publish a message to the channel:
curl --location --request POST '<app domain>/api/messaging/demo' \
--header 'Content-Type: application/json' \
--data-raw '{
    "message": "test message",
    "headers": {
        "field": "1q"
    }
}'
  2. Subscribe with a selector:
curl --location --request POST '<app domain>/api/messaging/demo/subscribe' \
--header 'Content-Type: application/json' \
--data-raw '{
    "selector": "field='\''q'\''"
}'
  3. Read messages using the subscription ID from step #2:
curl --location --request GET '<app domain>/api/messaging/demo/<subscription ID>'

You can also use our Java Client SDK to simplify interaction with the Backendless API.

Regards, Andriy

OK @Andriy_Konoz, one last question about this.
If I do this in my code
curl --location --request GET '<app domain>/api/messaging/demo/<subscription ID>'

with a Kotlin flow, for example, am I supposed to receive all publish events on this channel? So if I am chatting with someone for 5 minutes, and the other user sends me a message and I publish the message on the channel, would I receive an update (like with a websocket) in my Kotlin flow so I can update my chat UI? I don’t understand whether I receive an update each time a message is published on the channel I subscribed to, or how I receive that message. Thank you.

@Indevers ,

I am not sure about the case with Kotlin Flows. If you want to receive a message on the client as soon as it is published, the simplest way is to use the Backendless Java Client SDK, which has a socket-based method for this.
With plain REST you need to call the GET <app domain>/api/messaging/demo/<subscription ID> endpoint repeatedly to obtain new messages.
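
For the plain REST route, the repeated GET could be wrapped in a Kotlin flow roughly as follows. This is only a sketch, assuming kotlinx.coroutines is available; pollSubscription and fetchMessages are hypothetical names, and fetchMessages stands in for whatever code performs the GET and parses the "messages" array.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Hypothetical helper: `fetchMessages` stands in for the GET call to
// <app domain>/api/messaging/<channel>/<subscription ID> and returns the
// parsed "messages" array (empty when nothing new was published).
fun <T> pollSubscription(
    intervalMillis: Long = 2_000L,
    fetchMessages: suspend () -> List<T>
): Flow<T> = flow {
    while (true) {
        // Emit each new message; an empty list simply emits nothing.
        for (message in fetchMessages()) emit(message)
        // Wait before the next request; delay() also keeps the loop cancellable.
        delay(intervalMillis)
    }
}
```

Collected inside viewModelScope.launch, such a flow keeps polling until the scope is cancelled, so updates arrive at most one polling interval late rather than instantly as with a socket.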

Regards, Andriy


Hello @Andriy_Konoz, one last question about this, because I am seeing something very weird.

I implemented the SDK real-time messaging like this:

actual fun subscribeChat(discussionId: Long,update:()->Unit){

    val channel = Backendless.Messaging.subscribe(LiveChatMessagingChannelName)
    Logger.d(tag = "debaga") { "in subscribe chat, discussion Id is: $discussionId" }
    val selector = "discussionId = '$discussionId'"
    channel.addMessageListener(selector, object : AsyncCallback<String> {
        override fun handleResponse(stringMessage: String?) {
            Logger.d(tag = "debaga") { "Received string message $stringMessage" }
            update()
        }

        override fun handleFault(fault: BackendlessFault?) {
            Logger.d(tag = "debaga") { "Error processing a message $fault" }
        }
    })
}

actual fun publishInChatChannel(discussionId: Long){

    val publishOptions = PublishOptions().apply {
        putHeader("discussionId", "$discussionId")
    }

    Backendless.Messaging.publish(BackendlessMessageRequest, publishOptions, object : AsyncCallback<MessageStatus> {
        override fun handleResponse(response: MessageStatus?) {
            Logger.d(tag = "debaga") { "message has been published" }
        }

        override fun handleFault(fault: BackendlessFault?) {
            Logger.d(tag = "debaga") { "Server reported an error $fault" }
        }
    })

}

I launched the app on two devices running the same codebase. Both subscribe successfully. In my code, the update: () -> Unit callback refreshes my UI.
Device A always succeeds (when device B sends something, it refreshes).
Device B never succeeds (when device A sends something, it never refreshes).
Both have the same codebase.

I don’t understand. Could the device itself make a difference here?

@Indevers ,

Do you see any errors related to the “subscribe”/“addMessageListener” operations on device B, or to the “publish” operation on device A?

Since the code is identical on both devices, the problem is probably somewhere near those method calls.

Regards, Andriy

No @Andriy_Konoz, I always get the above when I publish.

But as for the
Logger.d(tag = "debaga") { "Received string message $stringMessage" }

I never reach it, on either device A or B. But it works on device A.

@Indevers ,

Could you please try to subscribe to the channel without a selector and check whether messages start to arrive on device B?

Regards, Andriy

@Andriy_Konoz

Okay, I actually figured out why one of the devices works. My device A is registered in the devices and receives push notifications (every time a message is sent to it). Device B doesn’t have notifications enabled, so it doesn’t receive the push. I have an internal mechanism: when I receive a message while I’m in the chat, it refreshes the chat, and when I’m not in the chat I get a push. In fact, the pub/sub doesn’t work at all, but the push I send at the same time does. So I never get the “Received string message” log from the pub/sub, but the UI refreshes because of my internal mechanism.

So I don’t understand why the pub/sub doesn’t work. I don’t have a trust certificate, so I removed the selector to try. The publish works fine, but I never receive anything.

@Indevers ,

Did you initialize the Java SDK with a custom domain, or with the combination of application ID and API key?
It looks like there is a problem with the RT socket when the SDK is initialized with a custom domain. I stumbled on it accidentally while trying to figure out why pub/sub does not work for you.

If you initialize your Backendless SDK like this:

Backendless.initApp( "custom or generated domain" );

could you please try to initialize it like this instead?

Backendless.initApp( "app ID", "API key" );

Regards, Andriy

Hello @Andriy_Konoz, yes, I already use

       Backendless.initApp(this,GospayMePublicConfig.BACKENDLESS_APP_ID,GospayMePublicConfig.BACKENDLESS_API_KEY)

@Indevers ,

Are you running your logic on real devices or emulators?
Do you have access to logs from the com.backendless.rt package?

They look like this:

Aug 01, 2024 5:33:52 PM com.backendless.rt.RTClientSocketIO subscribe
INFO: try to subscribe RTSubscription{id='2510eeee-91a0-4b7d-9259-ad71e7a00e53', callback=com.backendless.rt.ConnectListener$1@e6dec2e, subscriptionName=PUB_SUB_CONNECT, options={channel=demo}}
Aug 01, 2024 5:33:52 PM com.backendless.rt.SocketIOConnectionManager get
INFO: Socket not connected. Try to get lock
Aug 01, 2024 5:33:52 PM com.backendless.rt.SocketIOConnectionManager get
INFO: Got lock
Aug 01, 2024 5:33:54 PM com.backendless.rt.SocketIOConnectionManager get
INFO: Looked up for server https://rt-cloud-us.backendless.com:3001/<app ID>
Aug 01, 2024 5:33:54 PM com.backendless.rt.SocketIOConnectionManager get
INFO: try to connect with to host with query: apiKey=<API key>&clientId=<client ID>&binary=true
Aug 01, 2024 5:33:54 PM com.backendless.rt.SocketIOConnectionManager get
INFO: Socket object created
Aug 01, 2024 5:33:56 PM com.backendless.rt.SocketIOConnectionManager$7 call
...

I believe that in your case there is a problem creating the RT socket, but the error is suppressed somewhere inside the SDK.

If you can access these logs, please check them for errors and, if there are any, post the relevant part of the logs here. Before posting, remove any sensitive data such as the app API key.

Regards, Andriy

@Andriy_Konoz

I only have this

2024-08-01 20:50:49.433 20240-20763 RTClient                package                I  try to subscribe RTSubscription{id='app', callback=com.backendless.rt.ConnectListener$1@4350f59, subscriptionName=PUB_SUB_CONNECT, options={channel=liveChat}}
2024-08-01 20:50:51.680 20240-20802 RTClient                package                I  try to subscribe RTSubscription{id='app', callback=com.backendless.rt.messaging.ChannelImpl$3@3956ec9, subscriptionName=PUB_SUB_MESSAGES, options={channel=liveChat}}

@Indevers ,

It looks like it hangs on that step, since the other logs have the same level but are not present.
I will ask our QA to try to reproduce it.
Which Android version do you use on your devices?
Do you use the latest version of the Java Client SDK that I mentioned earlier, or did you go with the Android version of the SDK?

Regards, Andriy

@Andriy_Konoz
Sorry, I was using the wrong version: I had 5.7.0. I changed to 7.0.9 and now I get this error message:

try to subscribe RTSubscription{id='9ee327bb-57c0-4e0b-acae-39b090e093da', callback=com.backendless.rt.ConnectListener$1@ba63603, subscriptionName=PUB_SUB_CONNECT, options={channel=liveChat}}
2024-08-02 14:25:51.064 19356-19709 RTLookupService         package                E  Lookup failed BackendlessException{ code: 'Internal client exception', message: 'Hostname api.backendless.com not verified:
                                                                                                        certificate: sha1/xFx7NbVhsj1+2CcE3L7xsnsOtf4=
                                                                                                        DN: CN=*.backendless.com
                                                                                                        subjectAltNames: [*.backendless.com, backendless.com]', detail: 'javax.net.ssl.SSLPeerUnverifiedException: Hostname api.backendless.com not verified:
                                                                                                        certificate: sha1/xFx7NbVhsj1+2CcE3L7xsnsOtf4=
                                                                                                        DN: CN=*.backendless.com
                                                                                                        subjectAltNames: [*.backendless.com, backendless.com]
                                                                                                    	at com.android.okhttp.internal.io.RealConnection.connectTls(RealConnection.java:205)
                                                                                                    	at com.android.okhttp.internal.io.RealConnection.connectSocket(RealConnection.java:153)
                                                                                                    	at com.android.okhttp.internal.io.RealConnection.connect(RealConnection.java:116)
                                                                                                    	at com.android.okhttp.internal.http.StreamAllocation.findConnection(StreamAllocation.java:186)
                                                                                                    	at com.android.okhttp.internal.http.StreamAllocation.findHealthyConnection(StreamAllocation.java:128)
                                                                                                    	at com.android.okhttp.internal.http.StreamAllocation.newStream(StreamAllocation.java:97)
                                                                                                    	at com.android.okhttp.internal.http.HttpEngine.connect(HttpEngine.java:289)
                                                                                                    	at com.android.okhttp.internal.http.HttpEngine.sendRequest(HttpEngine.java:232)
                                                                                                    	at com.android.okhttp.internal.huc.HttpURLConnectionImpl.execute(HttpURLConnectionImpl.java:465)
                                                                                                    	at com.android.okhttp.internal.huc.HttpURLConnectionImpl.connect(HttpURLConnectionImpl.java:131)
                                                                                                    	at com.android.okhttp.internal.huc.HttpURLConnectionImpl.getOutputStream(HttpURLConnectionImpl.java:262)
                                                                                                    	at com.android.okhttp.internal.huc.DelegatingHttpsURLConnection.getOutputStream(DelegatingHttpsURLConnection.java:219)
                                                                                                    	at com.android.okhttp.internal.huc.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:30)
                                                                                                    	at weborb.client.ioEngine.HttpIOEngine.send(HttpIOEngine.java:195)
                                                                                                    	at weborb.client.ioEngine.HttpIOEngine.invoke(HttpIOEngine.java:140)
                                                                                                    	at weborb.client.WeborbClient.invoke(WeborbClient.java:138)
                                                                                                    	at com.backendless.Invoker.invokeSync(Invoker.java:102)
                                                                                                    	at com.backendless.Invoker.invokeSync(Invoker.java:114)
                                                                                                    	at com.backendless.rt.RTLookupService.lookup(RTLookupService.java:27)
                                                                                                    	at com.backendless.rt.SocketIOConnectionManager.get(SocketIOConnectionManager.java:68)
                                                                                                    	at com.backendless.rt.RTClientSocketIO.subscribe(RTClientSocketIO.java:139)
                                                                                                    	at com.backendless.rt.AsynRTClient$1.run(AsynRTClient.java:26)
                                                                                                    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
                                                                                                    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:644)
                                                                                                    	at java.lang.Thread.run(Thread.java:1012)
                                                                                                    ', extendedData: '{}', httpStatusCode: '400' }

I used an SSL certificate in my app for Backendless, but I have now deleted all of that and I still get the same error message. Is this what causes the error?

@Indevers ,

Do you call Backendless.setUrl(...) in your code during SDK configuration? If yes, please provide the code snippet with that call. I am interested in the value you pass to the method.

I also suggest checking that your setup follows the steps described here: Client-side Setup - Backendless SDK for Android/Java API Documentation

Regards, Andriy