Add `CloseHandler` to support graceful connection termination.

- Extend `test.proto` with `CloseCommand` and `CloseResponse` messages.
- Introduce `CloseHandler` to process "close" actions and respond with connection termination notice.
- Update `DelegateActionManager` to register `CloseHandler`.
- Refactor `IPCManager` to handle "close" response headers and terminate client connections gracefully.
This commit is contained in:
Gustavo Henrique Santos Souza de Miranda 2025-11-22 23:27:04 -03:00
parent 6576b54057
commit 269780d8cf
6 changed files with 70 additions and 11 deletions

View File

@ -2,6 +2,7 @@ package com.mediamanager.service.delegate;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.mediamanager.protocol.TransportProtocol; import com.mediamanager.protocol.TransportProtocol;
import com.mediamanager.service.delegate.handler.CloseHandler;
import com.mediamanager.service.delegate.handler.EchoHandler; import com.mediamanager.service.delegate.handler.EchoHandler;
import com.mediamanager.service.delegate.handler.HeartbeatHandler; import com.mediamanager.service.delegate.handler.HeartbeatHandler;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -25,6 +26,7 @@ public class DelegateActionManager {
private void registerHandlers() { private void registerHandlers() {
handlerRegistry.put("echo",new EchoHandler()); handlerRegistry.put("echo",new EchoHandler());
handlerRegistry.put("heartbeat",new HeartbeatHandler()); handlerRegistry.put("heartbeat",new HeartbeatHandler());
handlerRegistry.put("close", new CloseHandler());
} }
public void start(){ public void start(){

View File

@ -0,0 +1,32 @@
package com.mediamanager.service.delegate.handler;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mediamanager.protocol.TestProtocol.CloseCommand;
import com.mediamanager.protocol.TestProtocol.CloseResponse;
import com.mediamanager.protocol.TransportProtocol;
import com.mediamanager.service.delegate.ActionHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class CloseHandler implements ActionHandler {
private static final Logger logger = LogManager.getLogger(CloseHandler.class);
@Override
public TransportProtocol.Response.Builder handle(ByteString requestPayload)
throws InvalidProtocolBufferException {
CloseCommand.parseFrom(requestPayload); // Valida
logger.info("Close command received - connection will close");
CloseResponse response = CloseResponse.newBuilder()
.setMessage("Connection closing. Goodbye!")
.build();
return TransportProtocol.Response.newBuilder()
.setPayload(ByteString.copyFrom(response.toByteArray()))
.setStatusCode(200)
.putHeaders("Connection", "close"); // Marca para fechar
}
}

View File

@ -33,7 +33,7 @@ public class EchoHandler implements ActionHandler {
// 4. Retorna Response // 4. Retorna Response
return TransportProtocol.Response.newBuilder() return TransportProtocol.Response.newBuilder()
.setPayload(responsePayload) .setPayload(responsePayload)
.setStatusCode(200) .setStatusCode(200);
.putHeaders("Content-Type", "application/x-protobuf");
} }
} }

View File

@ -30,7 +30,7 @@ public class HeartbeatHandler implements ActionHandler {
return TransportProtocol.Response.newBuilder() return TransportProtocol.Response.newBuilder()
.setPayload(ByteString.copyFrom(response.toByteArray())) .setPayload(ByteString.copyFrom(response.toByteArray()))
.setStatusCode(200) .setStatusCode(200);
.putHeaders("Content-Type", "application/x-protobuf");
} }
} }

View File

@ -223,9 +223,16 @@ public class IPCManager {
logger.debug("Client {} handler thread started", clientId); logger.debug("Client {} handler thread started", clientId);
try { try {
TransportProtocol.Request request = readRequest(channel); // LOOP: processa múltiplas requests na mesma conexão
while (channel.isOpen()) {
TransportProtocol.Request request = readRequest(channel);
if (request == null) {
// Cliente desconectou gracefully
logger.info("Client {} disconnected (end of stream)", clientId);
break;
}
if (request != null) {
logger.info("Client {} sent request {}", clientId, request.getRequestId()); logger.info("Client {} sent request {}", clientId, request.getRequestId());
// Processa usando o DelegateActionManager // Processa usando o DelegateActionManager
@ -233,17 +240,29 @@ public class IPCManager {
// Envia resposta de volta // Envia resposta de volta
writeResponse(channel, response); writeResponse(channel, response);
logger.info("Client {} response sent", clientId); logger.info("Client {} response sent", clientId);
} else {
logger.warn("Client {} sent null message", clientId); // Verifica se é comando CLOSE
String connectionHeader = response.getHeadersOrDefault("Connection", "");
if ("close".equals(connectionHeader)) {
logger.info("Client {} requested connection close", clientId);
break; // Sai do loop e fecha
}
} }
} catch (IOException e) {
if (channel.isOpen()) {
logger.error("IO error handling client {}", clientId, e);
} else {
logger.debug("Client {} connection closed by peer", clientId);
}
} catch (Exception e) { } catch (Exception e) {
logger.error("Error handling client {}", clientId, e); logger.error("Unexpected error handling client {}", clientId, e);
} finally { } finally {
try { try {
channel.close(); if (channel.isOpen()) {
channel.close();
}
logger.debug("Client {} channel closed", clientId); logger.debug("Client {} channel closed", clientId);
} catch (IOException e) { } catch (IOException e) {
logger.error("Error closing client {} channel", clientId, e); logger.error("Error closing client {} channel", clientId, e);

View File

@ -21,4 +21,10 @@ message HeartbeatCommand {
message HeartbeatResponse { message HeartbeatResponse {
int64 client_timestamp = 1; int64 client_timestamp = 1;
int64 server_timestamp = 2; int64 server_timestamp = 2;
}
message CloseCommand {
// Vazio - apenas sinaliza fechamento
}
message CloseResponse {
string message = 1;
} }