Does the new support for Application events cover sending messages between layers?

Hi
I’m referring to https://youtrack.cuba-platform.com/issue/PL-9731 planned for 6.7 release.

Specifically, I’m asking for the ability to publish an event from a middleware block in a cluster node and receive it on a client block running in another process/host, then update some UI controls even when not inside a client request pipeline.

This scenario is what is needed to implement “live” refresh of the UI on server-sent events.

Thx
Paolo

Hello, Paolo!
Unfortunately, you cannot organize interaction between layers using application events: they are only propagated within the block where they were published. Maybe it will be implemented someday.
Best regards,
Daniil.

Thx Daniil, that’s ok. I was hoping there was something more “under the hood”, but the changeset was too small for that to be possible :wink:

Regarding this functionality, this is something I’d really like to have, because I’m coming from other setups where this was supported pretty well…

Have you considered an underlying technology for exchanging data between blocks/nodes, that is, do you already have some preference in mind (rabbitmq, hazelcast, …)?

To start with, a simple message bus transport could fit well, for example see this (supported by spring boot OOB): https://reflectoring.io/event-messaging-with-spring-boot-and-rabbitmq/

I ask this in the event I find some spare time for experimenting with this feature…

Bye,
Paolo

At the moment, we don’t provide built-in support for messaging between layers. You can use any library/tool with a CUBA application; for instance, Hazelcast just works. In the future, we are planning to introduce something like this using our default communication mechanism, JGroups: https://youtrack.cuba-platform.com/issue/PL-9024


Hi Paolo,
I developed a proof of concept that does what I think you want and the source is attached. As this is a prototype, please take the code with a grain of salt. My original idea was to make this a component that could be added to any project and be as you say “Under the hood” but there are some pieces missing from the framework to allow it to be a component … at least back when I did this.
To summarize, this prototype allows you to instrument an entity so that the web tier can be notified about changes to a specific entity or collection of entities. Many of my systems require what I call “reactive screen updates”. These screens may be collections, such as “Dispatch Boards” that show the status of equipment and deliveries, but they may also be detail screens, such as a “Product Inventory” screen where I want to show product inventory and other metrics in real time.
In most cases this means a real-time (not polled), loosely coupled (non-persistent) event with a lightweight payload. I therefore do not require an MQ, as all I care about is the event along with some payload that is only relevant at the time it was sent.
It uses Atmosphere to communicate between layers and then VAADIN push to update the browser. Note that Atmosphere is also used by VAADIN push, so there are some versioning issues that need to be considered. It was done some time ago, so it is based on the 6.5 release of the framework.
The solution starts with adding an Atmosphere Servlet to app-core to handle the endpoint http://hostname/changes/*:

<servlet>
    <servlet-name>AtmosphereServlet</servlet-name>
    <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
    <load-on-startup>2</load-on-startup>
    <async-supported>true</async-supported>
</servlet>

<servlet-mapping>
    <servlet-name>AtmosphereServlet</servlet-name>
    <url-pattern>/changes/*</url-pattern>
</servlet-mapping>

I then add an Atmosphere ManagedService as a hub to handle the broadcasting of the changes to subscribers:

@Singleton
@ManagedService(path = "/changes/{entity}")
public class EntityChangedManagedService {
    @Message(encoders = {EntityChangedEncodeDecoder.class}, decoders = {EntityChangedEncodeDecoder.class})
    public final EntityChanged onMessage(final EntityChanged message) throws IOException {
        return message;
    }
}
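
The EntityChangedEncodeDecoder referenced above is not shown in the post. As a minimal sketch of what such a codec might do, the following plain-Java class converts a change to and from a single line of text. The class names, the fields and the `name|TYPE|id` wire format are all assumptions made for illustration; the real class lives in the attached project and presumably plugs into Atmosphere's encoder/decoder mechanism instead of exposing static methods.

```java
import java.util.Objects;

/** Hypothetical stand-in for the EntityChanged payload (not the class from the attached project). */
final class EntityChangedSketch {
    enum Type { INSERT, UPDATE, DELETE }

    final String name; // simple entity name, e.g. "Main"
    final String id;   // entity id rendered as a string
    final Type type;   // kind of change

    EntityChangedSketch(String name, String id, Type type) {
        this.name = Objects.requireNonNull(name);
        this.id = Objects.requireNonNull(id);
        this.type = Objects.requireNonNull(type);
    }
}

/** Assumed wire format: "name|TYPE|id". */
final class EntityChangedCodecSketch {
    private EntityChangedCodecSketch() {
    }

    /** Serializes a change into one line of text. */
    static String encode(EntityChangedSketch change) {
        return change.name + "|" + change.type.name() + "|" + change.id;
    }

    /** Parses a line produced by encode(); rejects anything without three fields. */
    static EntityChangedSketch decode(String wire) {
        String[] parts = wire.split("\\|", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Malformed change message: " + wire);
        }
        return new EntityChangedSketch(parts[0], parts[2], EntityChangedSketch.Type.valueOf(parts[1]));
    }

    public static void main(String[] args) {
        EntityChangedSketch change =
                new EntityChangedSketch("Main", "42", EntityChangedSketch.Type.UPDATE);
        System.out.println(encode(change)); // prints "Main|UPDATE|42"
    }
}
```

A text format this small fits the "lightweight payload" goal: the subscriber only learns which entity changed and how, and reloads the data itself.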

Subscribers can subscribe to changes in a single table at http://hostname/changes/MyTable or to a specific row at http://hostname/changes/MyTable/MyTableRowId. An EntityChangedService handles the actual broadcasting. It is injected into an EntityListener, which is instrumented like this:

@Component("testmodules_MainEntityListener")
public class MainEntityListener implements AfterDeleteEntityListener<BaseUuidEntity>, AfterInsertEntityListener<BaseUuidEntity>, AfterUpdateEntityListener<BaseUuidEntity> {
    @Inject
    private EntityChangedService changeService;

    @Override
    public void onAfterDelete(BaseUuidEntity entity, Connection connection) {
        changeService.publish(entity.getClass(), entity.getId().toString(), EntityChanged.Types.DELETE);
    }

    @Override
    public void onAfterInsert(BaseUuidEntity entity, Connection connection) {
        changeService.publish(entity.getClass(), entity.getId().toString(), EntityChanged.Types.INSERT);
    }

    @Override
    public void onAfterUpdate(BaseUuidEntity entity, Connection connection) {
        changeService.publish(entity.getClass(), entity.getId().toString(), EntityChanged.Types.UPDATE);
    }
}

The EntityChangedService looks like:

@Service(EntityChangedService.NAME)
public class EntityChangedServiceBean implements EntityChangedService {
    @Override
    public void publish(Class entityType, String id, EntityChanged.Types type) {
        ServletContext servletContext = ServletContextFactory.getDefault().getServletContext();
        BroadcasterFactory broadcasterFactory = (BroadcasterFactory) servletContext.getAttribute(BroadcasterFactory.class.getName());
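        // Use the last segment of the fully qualified class name, e.g. "Main"
        String[] nameParts = entityType.getName().split("\\.");
        String name = nameParts[nameParts.length - 1];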
        String filter = "/changes/" + name;

        Broadcaster changes = broadcasterFactory.lookup(filter);
        if (changes == null) {
            return;
        }

        filter = filter + "/" + id;
        Broadcaster changesForId = broadcasterFactory.lookup(filter);

        EntityChanged change = new EntityChanged();
        change.setId(id);
        change.setName(name);
        change.setType(type);

        changes.broadcast(change);
        if (changesForId != null) {
            changesForId.broadcast(change);
        }
    }
}
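
The two-level lookup in the service (one broadcaster per table, one per row) can be illustrated without Atmosphere. The sketch below is an in-memory stand-in, not the code from the project: `ChangeHubSketch`, its methods and the plain string message are hypothetical names chosen for the example. Unlike the service above, it still notifies row-level subscribers when nobody is watching the whole table.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory hub that mimics the "/changes/Table" and "/changes/Table/id" topics. */
final class ChangeHubSketch {
    private final Map<String, List<Consumer<String>>> topics = new ConcurrentHashMap<>();

    /** Registers a listener for a topic such as "/changes/Main" or "/changes/Main/42". */
    void subscribe(String topic, Consumer<String> listener) {
        topics.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /** Sends one message to the table topic and to the row topic; returns how many listeners were notified. */
    int publish(Class<?> entityType, String id, String changeType) {
        String name = entityType.getSimpleName();
        String tableTopic = "/changes/" + name;
        String rowTopic = tableTopic + "/" + id;
        String message = name + " " + changeType + " " + id;
        return deliver(tableTopic, message) + deliver(rowTopic, message);
    }

    private int deliver(String topic, String message) {
        List<Consumer<String>> listeners =
                topics.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
        return listeners.size();
    }

    public static void main(String[] args) {
        ChangeHubSketch hub = new ChangeHubSketch();
        hub.subscribe("/changes/String", msg -> System.out.println("table: " + msg));
        hub.subscribe("/changes/String/7", msg -> System.out.println("row: " + msg));
        hub.publish(String.class, "7", "UPDATE"); // reaches both listeners
        hub.publish(String.class, "8", "DELETE"); // reaches only the table listener
    }
}
```

Events are fire-and-forget: a subscriber that is not connected when `publish` runs simply misses the change, which matches the non-persistent semantics described earlier.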

On the web side, I have created an EntityChangedWatcher so that you can subscribe to changes to an entity. The screen is instrumented like this:

class MainBrowse extends AbstractLookup {
    @Inject
    private GroupTable mainsTable

    @Inject
    private EntityChangedWatcher watcher

    @Override
    void init(Map<String, Object> params) {
        super.init(params)

        watcher.init(this, "Main", { change ->
            Notification.show("Changed: ${change.name} ${change.type} ${change.id}", Notification.Type.TRAY_NOTIFICATION)
            mainsTable.datasource.refresh()
        })
    }
}

In this case, I am monitoring a table called Main, and when it changes, I display a notification and refresh the datasource for the table. Note that the watcher handles thread marshaling so that VAADIN push can be used to update the browser. If you open the Main browse screen in two browsers and try adding, deleting and changing records in one browser, you will see the corresponding changes in the other browser.
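
The thread marshaling mentioned above is the part that makes updates outside a request safe: the change arrives on a network thread, but the callback has to run on the thread that owns the UI. The watcher's implementation is not shown in the post, so the following is only a sketch of the idea in plain Java. A single-threaded executor stands in for the UI thread, and `UiMarshalSketch` and `dispatch` are hypothetical names; in a Vaadin application the hand-off would normally go through `UI.access(Runnable)` instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical stand-in for the marshaling done by the watcher; not the code from the project. */
final class UiMarshalSketch {
    // One dedicated thread plays the role of the UI thread in this sketch.
    private final ExecutorService uiThread = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "ui-thread");
        thread.setDaemon(true);
        return thread;
    });

    /** Runs the handler on the UI thread, regardless of which thread delivers the change. */
    <T> CompletableFuture<Void> dispatch(T change, Consumer<T> handler) {
        return CompletableFuture.runAsync(() -> handler.accept(change), uiThread);
    }

    /** Stops the stand-in UI thread once no more changes are expected. */
    void shutdown() {
        uiThread.shutdown();
    }

    public static void main(String[] args) {
        UiMarshalSketch ui = new UiMarshalSketch();
        // The caller is the main thread, but the handler reports the UI thread's name.
        ui.dispatch("Main UPDATE 42",
                change -> System.out.println(Thread.currentThread().getName() + " got " + change))
          .join();
        ui.shutdown();
    }
}
```

Because every callback is funnelled through one thread, handlers such as the datasource refresh in the screen never run concurrently with each other.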
The implementation is not polled (it uses a WebSocket), carries a minimal payload, and makes it quite simple to instrument a screen to monitor changes to an entity or collection of entities.
Good luck!

The code was not uploaded. Here is a link to a zip file with the CUBA project: https://1drv.ms/u/s!ArXzPwEK-0K_i7ZmnNc6gc7jggFeGA