As many know, gRPC allows services, both traditional call/response (aka unary) and streaming, to be defined in a 'proto3' file. For example, here is a simple service with four methods: one unary (Check) and three streaming (server streaming, client streaming, and bidirectional streaming):
syntax = "proto3";

package grpc.health.v1;

option java_multiple_files = true;
option java_outer_classname = "HealthProto";
option java_package = "io.grpc.health.v1.rx3";

message HealthCheckRequest {
  string message = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;  // Used only by the Watch method.
  }
  ServingStatus status = 1;
}

service HealthCheck {
  // Unary method
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  // Server streaming method
  rpc WatchServer(HealthCheckRequest) returns (stream HealthCheckResponse);
  // Client streaming method
  rpc WatchClient(stream HealthCheckRequest) returns (HealthCheckResponse);
  // bidi streaming method
  rpc WatchBidi(stream HealthCheckRequest) returns (stream HealthCheckResponse);
}
From this file, the grpc-osgi-generator protoc plugin generates the following Java interface:

package io.grpc.health.v1.rx3;

import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.Flowable;

@javax.annotation.Generated(
    value = "by grpc-osgi-generator (REACTIVEX) - A protoc plugin for ECF's grpc remote services distribution provider at https://github.com/ECF/grpc-RemoteServiceSProvider ",
    comments = "Source: health.proto. ")
public interface HealthCheckService {

    /**
     * <pre>
     * Unary method
     * </pre>
     */
    default Single<io.grpc.health.v1.rx3.HealthCheckResponse> check(Single<io.grpc.health.v1.rx3.HealthCheckRequest> requests) {
        return null;
    }

    /**
     * <pre>
     * Server streaming method
     * </pre>
     */
    default Flowable<io.grpc.health.v1.rx3.HealthCheckResponse> watchServer(Single<io.grpc.health.v1.rx3.HealthCheckRequest> requests) {
        return null;
    }

    /**
     * <pre>
     * Client streaming method
     * </pre>
     */
    default Single<io.grpc.health.v1.rx3.HealthCheckResponse> watchClient(Flowable<io.grpc.health.v1.rx3.HealthCheckRequest> requests) {
        return null;
    }

    /**
     * <pre>
     * bidi streaming method
     * </pre>
     */
    default Flowable<io.grpc.health.v1.rx3.HealthCheckResponse> watchBidi(Flowable<io.grpc.health.v1.rx3.HealthCheckRequest> requests) {
        return null;
    }
}
Note that the interface uses two ReactiveX 3 classes: io.reactivex.rxjava3.core.Single and io.reactivex.rxjava3.core.Flowable. These two classes provide an API for event-driven, reactive sending and receiving of unary (Single) and streaming (Flowable) arguments and return values.
The ReactiveX API, particularly Flowable, makes it very easy to write both consumers and implementers of the streaming API while maintaining ordered delivery and non-blocking communication.
For example, this is a simple implementation of the HealthCheckService. Note how the Single and Flowable methods express the implementation logic through operators such as Flowable.map.
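A minimal sketch of such an implementation might look like the following. It is not the ECF example code: the nested message classes and the HealthCheckService interface are hand-written stand-ins for the protoc-generated types so that the sketch compiles with only RxJava 3 on the classpath, and the class names HealthCheckImplSketch and HealthCheckServiceImpl, the respond helper, and the "empty message means NOT_SERVING" rule are all assumptions made for illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;

// Sketch only: the nested types below stand in for the generated classes.
class HealthCheckImplSketch {

    enum ServingStatus { UNKNOWN, SERVING, NOT_SERVING, SERVICE_UNKNOWN }

    static final class HealthCheckRequest {
        final String message;
        HealthCheckRequest(String message) { this.message = message; }
    }

    static final class HealthCheckResponse {
        final ServingStatus status;
        HealthCheckResponse(ServingStatus status) { this.status = status; }
    }

    // Same four method shapes as the generated HealthCheckService interface.
    interface HealthCheckService {
        Single<HealthCheckResponse> check(Single<HealthCheckRequest> requests);
        Flowable<HealthCheckResponse> watchServer(Single<HealthCheckRequest> requests);
        Single<HealthCheckResponse> watchClient(Flowable<HealthCheckRequest> requests);
        Flowable<HealthCheckResponse> watchBidi(Flowable<HealthCheckRequest> requests);
    }

    static final class HealthCheckServiceImpl implements HealthCheckService {

        // Hypothetical rule for illustration: an empty message counts as NOT_SERVING.
        private static HealthCheckResponse respond(HealthCheckRequest request) {
            return new HealthCheckResponse(request.message.isEmpty()
                    ? ServingStatus.NOT_SERVING
                    : ServingStatus.SERVING);
        }

        @Override
        public Single<HealthCheckResponse> check(Single<HealthCheckRequest> requests) {
            // Unary: map the single request to a single response.
            return requests.map(HealthCheckServiceImpl::respond);
        }

        @Override
        public Flowable<HealthCheckResponse> watchServer(Single<HealthCheckRequest> requests) {
            // Server streaming: one request fans out into a stream (three copies here).
            return requests.flatMapPublisher(r -> Flowable.just(respond(r)).repeat(3));
        }

        @Override
        public Single<HealthCheckResponse> watchClient(Flowable<HealthCheckRequest> requests) {
            // Client streaming: answer once, based on the last request received;
            // an empty request stream yields UNKNOWN.
            return requests.map(HealthCheckServiceImpl::respond)
                    .last(new HealthCheckResponse(ServingStatus.UNKNOWN));
        }

        @Override
        public Flowable<HealthCheckResponse> watchBidi(Flowable<HealthCheckRequest> requests) {
            // Bidirectional: one response per request, in arrival order.
            return requests.map(HealthCheckServiceImpl::respond);
        }
    }
}
```

Each method body is a single operator chain. In a real bundle the class would implement the generated io.grpc.health.v1.rx3.HealthCheckService instead of the stand-in interface.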
Here is a simple implementation of a consumer of the HealthCheckService.
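A minimal sketch of such a consumer, under the same caveats: the nested message classes are stand-ins for the generated types, the interface is trimmed to the two methods the consumer calls, and HealthCheckConsumer, checkOnce, watch, and the in-process alwaysServing stub are hypothetical names introduced only so the example runs with RxJava 3 alone and no remote service.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.util.List;

// Sketch only: the nested types below stand in for the generated classes.
class HealthCheckConsumerSketch {

    enum ServingStatus { UNKNOWN, SERVING, NOT_SERVING, SERVICE_UNKNOWN }

    static final class HealthCheckRequest {
        final String message;
        HealthCheckRequest(String message) { this.message = message; }
    }

    static final class HealthCheckResponse {
        final ServingStatus status;
        HealthCheckResponse(ServingStatus status) { this.status = status; }
    }

    // Trimmed stand-in for the generated interface: only the methods used below.
    interface HealthCheckService {
        Single<HealthCheckResponse> check(Single<HealthCheckRequest> requests);
        Flowable<HealthCheckResponse> watchBidi(Flowable<HealthCheckRequest> requests);
    }

    static final class HealthCheckConsumer {
        private final HealthCheckService service;

        HealthCheckConsumer(HealthCheckService service) { this.service = service; }

        // Unary call: wrap the request in a Single and unwrap the status.
        Single<ServingStatus> checkOnce(String message) {
            return service.check(Single.just(new HealthCheckRequest(message)))
                    .map(response -> response.status);
        }

        // Bidi call: turn the messages into a request stream and collect
        // the returned statuses in the order they arrive.
        Single<List<ServingStatus>> watch(String... messages) {
            Flowable<HealthCheckRequest> requests =
                    Flowable.fromArray(messages).map(HealthCheckRequest::new);
            return service.watchBidi(requests)
                    .map(response -> response.status)
                    .toList();
        }
    }

    // In-process stub so the sketch runs without a remote service.
    static HealthCheckService alwaysServing() {
        return new HealthCheckService() {
            @Override
            public Single<HealthCheckResponse> check(Single<HealthCheckRequest> requests) {
                return requests.map(r -> new HealthCheckResponse(ServingStatus.SERVING));
            }

            @Override
            public Flowable<HealthCheckResponse> watchBidi(Flowable<HealthCheckRequest> requests) {
                return requests.map(r -> new HealthCheckResponse(ServingStatus.SERVING));
            }
        };
    }

    public static void main(String[] args) {
        HealthCheckConsumer consumer = new HealthCheckConsumer(alwaysServing());
        consumer.checkOnce("ping")
                .subscribe(status -> System.out.println("check: " + status));
        consumer.watch("a", "b", "c")
                .subscribe(statuses -> System.out.println("watch: " + statuses));
    }
}
```

The consumer only depends on the service interface, so the same code would work whether the HealthCheckService instance is a local object or a proxy for a remote service.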
The ReactiveX API simplifies both implementing and consuming unary and streaming services. As an added bonus, the reactive-grpc library used in the ECF distribution provider provides *flow control* using backpressure.
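To make the idea of backpressure concrete, here is a small sketch of the demand-driven protocol behind it. It deliberately uses the JDK's own java.util.concurrent.Flow API rather than RxJava or reactive-grpc, so that it runs without any dependencies; it says nothing about how reactive-grpc is implemented. Flowable follows the same Reactive Streams contract, in which a subscriber signals how many items it is ready for via request(n). The class and method names (BackpressureSketch, OneAtATimeSubscriber, publish) and the buffer size of 2 are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BackpressureSketch {

    // Subscriber that signals demand for exactly one item at a time.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count and returns what the subscriber received.
    static List<Integer> publish(int count) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        // Small buffer: submit() blocks the producer whenever the buffer is
        // full, so the producer cannot outrun the subscriber's demand.
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(executor, 2)) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once buffered items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publish(10));
    }
}
```

The point of the sketch is that the consumer, not the producer, sets the pace: items are delivered in order, and the producer is held back rather than allowed to fill an unbounded queue.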
In the next article I'll describe how OSGi Remote Services can be used to export, publish, discover, and import remote services with full support for service versioning, security, and dynamics. I'll also describe how one can use tools like Maven or Bndtools+Eclipse to generate source code (as above) from a proto3 file and easily run a generated service as an OSGi Remote Service.