이전 글에서 gRPC를 이용하여 Unary RPC를 구현했던 것에 이어서, 이번에는 Server Streaming, Client Streaming 및 Bidirectional Streaming RPC를 Java로 구현해보도록 하겠습니다.
Streaming 패턴 구현에 앞서 gRPC에서 제공하는 클라이언트 스텁 3가지에 대해서 먼저 살펴보겠습니다.
gRPC 클라이언트 스텁: BlockingStub, AsyncStub, FutureStub
gRPC의 클라이언트 스텁은 다음과 같이 3가지의 객체로 제공됩니다. 사용하는 스텁에 따라 클라이언트의 수신 처리 방식이 달라집니다.
- BlockingStub: 동기적으로 통신하는 방법으로, 서버로부터 응답이 올 때까지 대기합니다. 이 스텁은 Unary RPC와 Server Streaming RPC에서만 사용할 수 있습니다.
- AsyncStub (Stub): 비동기적으로 통신하는 방법으로, 서버로부터 오는 응답을 StreamObserver 객체가 대신 받아서 처리합니다. 이 스텁은 모든 방식의 RPC에서 사용할 수 있습니다.
- FutureStub: 비동기적으로 통신하는 방법으로, 서버로부터의 응답 도달에 상관 없이 일단 ListenableFuture로 래핑된 객체를 반환합니다. 서버로부터 오는 응답이 오면 ListenableFuture 객체를 통해 전달받은 메시지를 언래핑할 수 있습니다. 이 스텁은 Unary RPC에서만 사용할 수 있습니다. (ListenableFuture와 Future 인터페이스에 대해서는 Guava 라이브러리를 참고하시기 바랍니다.)
즉, 정리하면 gRPC는 4가지의 통신 방식을 지원하며, 클라이언트의 메시지 수신 처리 방식에 따라 총 7가지의 구현 방법이 존재한다고 할 수 있습니다.
이전 글에서 구현한 Unary RPC는 BlockingStub을 사용했지만, AsyncStub이나 FutureStub을 이용하여 수신을 처리하도록 구성할 수도 있습니다. 여기서는 Server Streaming RPC를 BlockingStub으로 수신하고, Client Streaming과 Bidirectional Streaming RPC는 AsyncStub으로 수신하도록 클라이언트를 구현합니다.
세 가지의 Streaming RPC 구현하기 (Java)
우선 UserService.proto를 다음과 같이 수정합니다. 기존의 Unary RPC 메서드 아래에 각각 Client Streaming, Server Streaming, Bidirectional Streaming을 담당하는 3개의 메서드를 추가로 정의합니다. 서비스 메서드의 파라미터 혹은 반환 메시지에 stream을 추가하면 연속적으로 메시지를 전달받거나, 연속적으로 메시지를 전송할 수 있습니다. 또한, UserIdx 메서드의 idx 필드를 repeated로 변경합니다.
- [Client Streaming RPC] setUsers: 클라이언트가 등록할 사용자 정보가 포함된 메시지(User)를 연속적으로 전달하면, 서버는 등록이 처리된 사용자 식별번호들이 포함된 메시지(UserIdx)를 한 번에 반환합니다.
- [Server Streaming RPC] getUsers: 클라이언트가 조회할 사용자 식별번호들이 포함된 메시지(UserIdx)를 한 번에 전달하면, 서버는 해당 사용자 정보가 포함된 메시지(User)를 연속적으로 반환합니다.
- [Bidirectional Streaming RPC] getUsersRealtime: 클라이언트가 조회할 사용자 식별번호들이 포함된 메시지(UserIdx)를 연속적으로 전달하면, 서버는 해당 사용자 정보가 포함된 메시지(User)를 연속적으로 반환합니다.
// UserService.proto syntax = "proto3"; package grpc; option java_multiple_files = true; option java_package = "com.lattechiffon.grpc"; option java_outer_classname = "UserServiceOuterClass"; service UserService { rpc setUser(User) returns (UserIdx); rpc getUser(UserIdx) returns (User); rpc setUsers(stream User) returns (UserIdx); // Client Streaming RPC rpc getUsers(UserIdx) returns (stream User); // Server Streaming RPC rpc getUsersRealtime(stream UserIdx) returns(stream User); // Bidirectional Streaming RPC } message User { int64 idx = 1; string username = 2; string email = 3; repeated string roles = 4; } message UserIdx { repeated int64 idx = 1; }
Protocol Buffer 코드의 작성이 완료되면, Gradle 빌드 시스템을 이용하는 경우에는 ‘GenerateProto’를 수행하도록 합니다. Protocol Buffer Compiler를 이용한다면 Java 언어로 컴파일하여 소스코드를 생성합니다.
스텁 코드들의 생성이 완료되면, 이제 UserServiceImpl.java를 수정할 순서입니다. 이 클래스는 서버에서 각 메서드마다 수행해야 할 비즈니스 로직을 가지고 있습니다. 다음과 같이 새로 추가한 3개의 메서드에 대해 구현을 추가합니다.
// UserServiceImpl.java package com.lattechiffon.grpc; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.stub.StreamObserver; import java.util.HashMap; import java.util.Map; public class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase { private final Map<Long, User> userMap = new HashMap<>(); private long idxCounter = 1; @Override public void setUser(User request, StreamObserver<UserIdx> responseObserver) { // Unary RPC 생략 } @Override public void getUser(UserIdx request, StreamObserver<User> responseObserver) { // Unary RPC 생략 } @Override public StreamObserver<User> setUsers(final StreamObserver<UserIdx> responseObserver) { return new StreamObserver<User>() { final UserIdx.Builder responseBuilder = UserIdx.newBuilder(); @Override public void onNext(User user) { user = user.toBuilder().setIdx(idxCounter++).build(); userMap.put(user.getIdx(), user); responseBuilder.addIdx(user.getIdx()); } @Override public void onError(Throwable t) { responseObserver.onError(t); } @Override public void onCompleted() { responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); } }; } @Override public void getUsers(UserIdx request, StreamObserver<User> responseObserver) { for (long idx : request.getIdxList()) { if (userMap.containsKey(idx)) { responseObserver.onNext(userMap.get(idx)); } else { responseObserver.onError(new StatusException(Status.NOT_FOUND)); } } responseObserver.onCompleted(); } @Override public StreamObserver<UserIdx> getUsersRealtime(StreamObserver<User> responseObserver) { return new StreamObserver<UserIdx>() { @Override public void onNext(UserIdx userIdx) { for (long idx : userIdx.getIdxList()) { if (userMap.containsKey(idx)) { responseObserver.onNext(userMap.get(idx)); } else { responseObserver.onError(new StatusException(Status.NOT_FOUND)); } } } @Override public void onError(Throwable t) { responseObserver.onError(t); } @Override public void onCompleted() { responseObserver.onCompleted(); } }; } }
마지막으로 GrpcApplication.java의 메인 함수에 다음과 같이 클라이언트 요청을 구현합니다. Client Streaming RPC와 Bidirectional RPC는 Unary RPC, Server Streaming RPC에 비해 구현이 복잡합니다. 이는 AsyncStub을 이용한 메시지 수신 처리에 기인하며, 비동기 처리를 위해 서버로부터 응답을 받을 StreamObserver 객체를 생성하여 클라이언트 스텁 메서드의 파라미터로 전달합니다.
// GrpcApplication.java (src/main/java/com.lattechiffon.com/) package com.lattechiffon.grpc; import io.grpc.*; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class GrpcApplication { public static void main(String[] args) { // Initialize gRPC Server ... // Initialize gRPC Client ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build(); UserServiceGrpc.UserServiceBlockingStub stub = UserServiceGrpc.newBlockingStub(channel); UserServiceGrpc.UserServiceStub asyncStub = UserServiceGrpc.newStub(channel); UserServiceGrpc.UserServiceFutureStub futureStub = UserServiceGrpc.newFutureStub(channel); // Client: Unary RPC, 3 times System.out.println("(1) Unary RPC"); UserIdx setUserResult = stub.setUser(User.newBuilder().setUsername("GO YONGGUK") .setEmail("lattechiffon@gmail.com").addRoles("USER").addRoles("ADMIN").build()); System.out.println("Client: " + setUserResult.getIdx(0)); setUserResult = stub.setUser(User.newBuilder().setUsername("KIM MINSU") .setEmail("minsu@test.com").addRoles("USER").build()); System.out.println("Client: " + setUserResult.getIdx(0)); User getUserResult = stub.getUser(setUserResult); System.out.println(getUserResult.toString()); // Client: Client-side Streaming RPC System.out.println("(2) Client-side Streaming RPC"); final CountDownLatch finishLatch = new CountDownLatch(1); StreamObserver<UserIdx> responseObserver = new StreamObserver<UserIdx>() { @Override public void onNext(UserIdx userIdx) { for (long idx : userIdx.getIdxList()) { System.out.println("Client: " + idx); } } @Override public void onError(Throwable t) { finishLatch.countDown(); } @Override public void onCompleted() { finishLatch.countDown(); } }; StreamObserver<User> requestObserver = asyncStub.setUsers(responseObserver); try { for (int i = 0; i < 5; i++) { requestObserver.onNext(User.newBuilder().setUsername("NEW USER - " + i) .setEmail("test@test.com").addRoles("USER").build()); Thread.sleep(500); } } catch (StatusRuntimeException|InterruptedException ignored) { } requestObserver.onCompleted(); try { finishLatch.await(1, TimeUnit.MINUTES); } catch (InterruptedException ignored) { } // Client: Server-side Streaming RPC System.out.println("(3) Server-side Streaming RPC"); try { Iterator<User> getUsersResult = stub.getUsers(UserIdx.newBuilder().addIdx(1).addIdx(2).build()); while (getUsersResult.hasNext()) { System.out.println(getUsersResult.next().toString()); } } catch (StatusRuntimeException ignored) { } // Client: Bidirectional Streaming RPC System.out.println("(4) Bidirectional Streaming RPC"); final CountDownLatch finishLatch2 = new CountDownLatch(1); StreamObserver<User> responseObserver2 = new StreamObserver<User>() { @Override public void onNext(User user) { System.out.println(user.toString()); } @Override public void onError(Throwable t) { finishLatch2.countDown(); } @Override public void onCompleted() { finishLatch2.countDown(); } }; StreamObserver<UserIdx> requestObserver2 = asyncStub.getUsersRealtime(responseObserver2); try { for (int i = 1; i <= 5; i++) { requestObserver2.onNext(UserIdx.newBuilder().addIdx(i).build()); Thread.sleep(1000); } requestObserver2.onNext(UserIdx.newBuilder().addIdx(6).addIdx(7).build()); } catch (StatusRuntimeException|InterruptedException ignored) { } requestObserver2.onCompleted(); try { finishLatch2.await(1, TimeUnit.MINUTES); } catch (InterruptedException ignored) { } // Release channel.shutdown(); Runtime.getRuntime().exit(0); } }
이제 Gradle로 프로젝트 전체를 빌드하고 실행하면 정상적으로 gRPC가 동작하여 메시지가 출력되는 것을 확인할 수 있습니다.
이 예제의 전체 소스코드는 아래의 GitHub 저장소에서 확인할 수 있습니다.
- GitHub Repository: https://github.com/lattechiffon/grpc/tree/2bc8fd69a6fa0da0d55579c4f24b33ec811d880b
- (참고) UserService.proto의 UserIdx 메시지를 수정했기 때문에 Unary RPC의 일부 코드를 수정해야 합니다. 글에서는 언급하지 않았으나 확인하려면 위의 GitHub 저장소를 참고하면 됩니다.
- (참고) JDK 9부터는 JAVA EE 모듈이 제거되었기 때문에 gRPC 컴파일 코드를 포함한 프로젝트 전체를 빌드하기 위해서는 다음과 같이 javax.annotation 의존성을 추가해야 합니다.
// build.gradle dependencies { ... implementation 'javax.annotation:javax.annotation-api:1.3.2' ... }
답글 남기기