Tuesday, October 19, 2021

OSGi Services with gRPC - Let's be reactive

ECF has just introduced an upgrade to the grpc distribution provider.   Previously, this distribution provider used ReaxtiveX java version 2 only.  With this release, ReactiveX java version 3 is also supported.

As many know, gRPC allows services (both traditional call/response [aka unary] and streaming services) to be defined by a 'proto3' file.  For example, here is a simple service with four methods, one unary (check) and 3 streaming (server streaming, client streaming, and bi-directional streaming)
syntax = "proto3";

package grpc.health.v1;

option java_multiple_files = true;
option java_outer_classname = "HealthProto";
option java_package = "io.grpc.health.v1.rx3";

message HealthCheckRequest {
  string message = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;  // Used only by the Watch method.
  }
  ServingStatus status = 1;
}

service HealthCheck {
  // Unary method
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  // Server streaming method
  rpc WatchServer(HealthCheckRequest) returns (stream HealthCheckResponse);
  // Client streaming method
  rpc WatchClient(stream HealthCheckRequest) returns (HealthCheckResponse);
  // bidi streaming method
  rpc WatchBidi(stream HealthCheckRequest) returns (stream HealthCheckResponse);
}
The gRPC project provides a plugin so that when protoc is run, java code (or other language code) is generated that can then be used on the server and/or clients.

With some additional plugins, the classes generated by protoc can use the ReactiveX API for generating code.   So, for example, here is the java code generated by running protoc, grpc, reactive-grpc, and the osgi-generator plugins on the above HealthCheck service definition.  

Note in particular the HealthCheckService interface generated by the osgi-generator protoc plugin:
package io.grpc.health.v1.rx3;

import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.Flowable;

@javax.annotation.Generated(
value = "by grpc-osgi-generator (REACTIVEX) - A protoc plugin for ECF's grpc remote services distribution provider at https://github.com/ECF/grpc-RemoteServiceSProvider ",
comments = "Source: health.proto.  ")
public interface HealthCheckService {
    /**
     * <pre>
     *  Unary method
     * </pre>
     */
    default Single<io.grpc.health.v1.rx3.HealthCheckResponse> check(Single<io.grpc.health.v1.rx3.HealthCheckRequest> requests)  {
        return null;
    }
    /**
     * <pre>
     *  Server streaming method
     * </pre>
     */
    default Flowable<io.grpc.health.v1.rx3.HealthCheckResponse> watchServer(Single<io.grpc.health.v1.rx3.HealthCheckRequest> requests)  {
        return null;
    }
    /**
     * <pre>
     *  Client streaming method
     * </pre>
     */
    default Single<io.grpc.health.v1.rx3.HealthCheckResponse> watchClient(Flowable<io.grpc.health.v1.rx3.HealthCheckRequest> requests)  {
        return null;
    }
    /**
     * <pre>
     *  bidi streaming method
     * </pre>
     */
    default Flowable<io.grpc.health.v1.rx3.HealthCheckResponse> watchBidi(Flowable<io.grpc.health.v1.rx3.HealthCheckRequest> requests)  {
        return null;
    }
}

Note that it uses the two ReactiveX 3 classes: io.reactivex.rxjava3.core.Single, and io.reactivex.rxjava3.core.Flowable. These two classes provide api for event-driven/reactive sending and receiving of unary (Single) and streaming (Flowable) arguments and return values.

The ReactiveX API...particularly Flowable...makes it very easy to implement both consumers and implementers of the streaming API, while maintaining ordered delivery and non-blocking communication.

For example, this is a simple implementation of the HealthCheckService. Note how the Single and flowable methods are able to express the implementation logic through methods such as Flowable.map.
Here is a simple implementation of a consumer of the HealthCheckService.

The use of the ReactiveX API simplifies both the implementation and the consumer use of both unary and streaming services. As an added bonus: the reactive-grpc library used in the ECF Distribution provider provides *flow-control* using backpressure.

In next article I'll describe how OSGi Remote Services can be easily used to export, publish, discover, and import remote services with full support for service versioning, security, and dynamics. I'll also describe one can use tools like maven or bndtools+eclipse to generate source code (as above) from a proto3 file and easily run a generated service as an OSGi Remote Service.

No comments: