From a0ad10b1bce99ae9168a942a9f31a50caa47905c Mon Sep 17 00:00:00 2001 From: Gustavo Henrique Santos Souza de Miranda Date: Mon, 17 Nov 2025 07:21:52 -0300 Subject: [PATCH] Integrate `DelegateActionManager` for request handling and refactor IPC: - Replaced `SimpleProtocol` with `TransportProtocol` in `IPCManager`. - Added `DelegateActionManager` to route requests to handlers, starting with `EchoHandler`. - Updated `MediaManagerApplication` to initialize and manage `DelegateActionManager`. - Extended Protocol Buffers schema with `Request` and `Response` definitions. - Enhanced logging for request processing and redesigned response flow. --- .../mediamanager/MediaManagerApplication.java | 10 +++- .../service/delegate/ActionHandler.java | 11 ++++ .../delegate/DelegateActionManager.java | 60 +++++++++++++++++++ .../service/delegate/handler/EchoHandler.java | 17 ++++++ .../mediamanager/service/ipc/IPCManager.java | 54 ++++++++--------- src/main/proto/messages.proto | 16 ++++- 6 files changed, 133 insertions(+), 35 deletions(-) create mode 100644 src/main/java/com/mediamanager/service/delegate/ActionHandler.java create mode 100644 src/main/java/com/mediamanager/service/delegate/DelegateActionManager.java create mode 100644 src/main/java/com/mediamanager/service/delegate/handler/EchoHandler.java diff --git a/src/main/java/com/mediamanager/MediaManagerApplication.java b/src/main/java/com/mediamanager/MediaManagerApplication.java index d483da3..bc04be1 100644 --- a/src/main/java/com/mediamanager/MediaManagerApplication.java +++ b/src/main/java/com/mediamanager/MediaManagerApplication.java @@ -1,5 +1,6 @@ package com.mediamanager; +import com.mediamanager.service.delegate.DelegateActionManager; import com.mediamanager.service.ipc.IPCManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -14,6 +15,7 @@ public class MediaManagerApplication { private static final Logger logger = LogManager.getLogger(MediaManagerApplication.class); private static Properties config; private static DatabaseManager databaseManager; + private static DelegateActionManager actionManager; private static IPCManager ipcManager; public static void main(String[] args) { @@ -24,8 +26,9 @@ public class MediaManagerApplication { loadConfiguration(); databaseManager = new DatabaseManager(config); databaseManager.init(); - - ipcManager = new IPCManager(config); + actionManager = new DelegateActionManager(); + actionManager.start(); + ipcManager = new IPCManager(config,actionManager); ipcManager.init(); // TODO: Start application services @@ -51,6 +54,9 @@ public class MediaManagerApplication { } } + if (actionManager != null) { + actionManager.stop(); + } logger.info("MediaManager Core shutdown successfully"); logger.info("Goodbye!"); diff --git a/src/main/java/com/mediamanager/service/delegate/ActionHandler.java b/src/main/java/com/mediamanager/service/delegate/ActionHandler.java new file mode 100644 index 0000000..27228fc --- /dev/null +++ b/src/main/java/com/mediamanager/service/delegate/ActionHandler.java @@ -0,0 +1,11 @@ +package com.mediamanager.service.delegate; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.mediamanager.protocol.TransportProtocol; + +@FunctionalInterface +public interface ActionHandler { + TransportProtocol.Response.Builder handle(ByteString requestPayload) throws InvalidProtocolBufferException; +} + diff --git a/src/main/java/com/mediamanager/service/delegate/DelegateActionManager.java b/src/main/java/com/mediamanager/service/delegate/DelegateActionManager.java new file mode 100644 index 0000000..6293947 --- /dev/null +++ b/src/main/java/com/mediamanager/service/delegate/DelegateActionManager.java @@ -0,0 +1,60 @@ +package com.mediamanager.service.delegate; + +import com.google.protobuf.ByteString; +import com.mediamanager.protocol.TransportProtocol; +import com.mediamanager.service.delegate.handler.EchoHandler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.Map; + +public class DelegateActionManager { + private static final Logger logger = LogManager.getLogger(DelegateActionManager.class); + + private final Map handlerRegistry; + + public DelegateActionManager() { + + logger.debug("DelegateActionManager created"); + this.handlerRegistry = new HashMap<>(); + registerHandlers(); + } + + private void registerHandlers() { + handlerRegistry.put("echo",new EchoHandler()); + } + + public void start(){ + logger.info("DelegateActionManager started"); + } + + public void stop(){ + logger.info("DelegateActionManager stopped"); + } + + public TransportProtocol.Response ProcessedRequest(TransportProtocol.Request request){ + String requestId = request.getRequestId(); + logger.info("Processing request: {}", requestId); + String action = request.getHeadersMap().getOrDefault("action", "unknown"); + ActionHandler handler = handlerRegistry.get(action); + TransportProtocol.Response.Builder responseBuilder; + if (handler == null) { + logger.warn("No handler found for action: {}", action); + responseBuilder = TransportProtocol.Response.newBuilder() + .setStatusCode(404) // 404 Not Found + .setPayload(ByteString.copyFromUtf8("Error: Action '" + action + "' not found.")); + } else{ + try { + logger.debug("Delegating action '{}' to handler...", action); + responseBuilder = handler.handle(request.getPayload()); + }catch (Exception e) { + logger.error("Handler for action '{}' threw an exception:", action, e); + responseBuilder = TransportProtocol.Response.newBuilder() + .setStatusCode(500) // 500 Internal Server Error + .setPayload(ByteString.copyFromUtf8("Error: " + e.getMessage())); + } + } + return responseBuilder.setRequestId(requestId).build(); + } +} diff --git a/src/main/java/com/mediamanager/service/delegate/handler/EchoHandler.java b/src/main/java/com/mediamanager/service/delegate/handler/EchoHandler.java new file mode 100644 index 0000000..23463e6 --- /dev/null +++ b/src/main/java/com/mediamanager/service/delegate/handler/EchoHandler.java @@ -0,0 +1,17 @@ +package com.mediamanager.service.delegate.handler; + +import com.google.protobuf.ByteString; +import com.mediamanager.protocol.TransportProtocol; +import com.mediamanager.service.delegate.ActionHandler; + +public class EchoHandler implements ActionHandler { + @Override + public TransportProtocol.Response.Builder handle(ByteString requestPayload) { + String payloadText = requestPayload.toStringUtf8(); + String responseText = "Server received: " + payloadText; + + return TransportProtocol.Response.newBuilder() + .setPayload(ByteString.copyFromUtf8(responseText)) + .setStatusCode(200); + } +} diff --git a/src/main/java/com/mediamanager/service/ipc/IPCManager.java b/src/main/java/com/mediamanager/service/ipc/IPCManager.java index 50d2e50..17ff8b1 100644 --- a/src/main/java/com/mediamanager/service/ipc/IPCManager.java +++ b/src/main/java/com/mediamanager/service/ipc/IPCManager.java @@ -1,12 +1,11 @@ package com.mediamanager.service.ipc; - -import com.mediamanager.protocol.SimpleProtocol; +import com.mediamanager.protocol.TransportProtocol; +import com.mediamanager.service.delegate.DelegateActionManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.nio.ByteBuffer; -import java.io.File; + import java.io.IOException; import java.net.StandardProtocolFamily; import java.net.UnixDomainSocketAddress; @@ -28,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger; public class IPCManager { private final Properties configuration; private static final Logger logger = LogManager.getLogger(IPCManager.class); + private final DelegateActionManager actionManager; + private Path socketPath; private UnixDomainSocketAddress socketAddress; private ServerSocketChannel serverChannel; @@ -37,8 +38,9 @@ public class IPCManager { private final AtomicInteger clientIdCounter = new AtomicInteger(0); - public IPCManager(Properties config){ + public IPCManager(Properties config, DelegateActionManager actionManager){ configuration = config; + this.actionManager = actionManager; logger.debug("IPCManager created with configuration:"); @@ -221,29 +223,25 @@ public class IPCManager { logger.debug("Client {} handler thread started", clientId); try { - SimpleProtocol.TextMessage request = readMessage(channel); + TransportProtocol.Request request = readRequest(channel); + if (request != null) { - logger.info("Client {} sent: '{}'", clientId, request.getContent()); - String response = "Server Received: " + request.getContent(); + logger.info("Client {} sent request {}", clientId, request.getRequestId()); - SimpleProtocol.TextMessage responseMsg = SimpleProtocol.TextMessage.newBuilder() - .setContent(response).build(); + // Processa usando o DelegateActionManager + TransportProtocol.Response response = actionManager.ProcessedRequest(request); - writeMessage(channel,responseMsg); - logger.info("Client {} response sent: '{}'", clientId, response); + // Envia resposta de volta + writeResponse(channel, response); - }else { + logger.info("Client {} response sent", clientId); + } else { logger.warn("Client {} sent null message", clientId); } - Thread.sleep(100); - - logger.debug("Client {} processing complete", clientId); } catch (Exception e) { logger.error("Error handling client {}", clientId, e); } finally { - // SEMPRE fecha o canal quando terminar - // O finally garante que isso acontece mesmo se houver exceção try { channel.close(); logger.debug("Client {} channel closed", clientId); @@ -252,8 +250,7 @@ public class IPCManager { } } } - - private SimpleProtocol.TextMessage readMessage(SocketChannel channel) throws IOException { + private TransportProtocol.Request readRequest(SocketChannel channel) throws IOException { // Primeiro, lê o tamanho da mensagem (4 bytes = int32) java.nio.ByteBuffer sizeBuffer = java.nio.ByteBuffer.allocate(4); int bytesRead = 0; @@ -294,28 +291,25 @@ public class IPCManager { byte[] messageBytes = new byte[messageSize]; messageBuffer.get(messageBytes); - return SimpleProtocol.TextMessage.parseFrom(messageBytes); + return TransportProtocol.Request.parseFrom(messageBytes); } - private void writeMessage(SocketChannel channel, SimpleProtocol.TextMessage message) throws IOException { - // Serializa a mensagem - byte[] messageBytes = message.toByteArray(); + private void writeResponse(SocketChannel channel, TransportProtocol.Response response) throws IOException { + byte[] messageBytes = response.toByteArray(); int messageSize = messageBytes.length; - logger.debug("Writing message of {} bytes", messageSize); + logger.debug("Writing response of {} bytes", messageSize); - // Cria buffer com tamanho + mensagem java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate(4 + messageSize); - buffer.putInt(messageSize); // 4 bytes de tamanho - buffer.put(messageBytes); // N bytes da mensagem + buffer.putInt(messageSize); + buffer.put(messageBytes); buffer.flip(); - // Escreve tudo no canal while (buffer.hasRemaining()) { channel.write(buffer); } - logger.debug("Message written successfully"); + logger.debug("Response written successfully"); } } } diff --git a/src/main/proto/messages.proto b/src/main/proto/messages.proto index 23b6925..106e59e 100644 --- a/src/main/proto/messages.proto +++ b/src/main/proto/messages.proto @@ -1,9 +1,19 @@ syntax = "proto3"; option java_package = "com.mediamanager.protocol"; -option java_outer_classname = "SimpleProtocol"; +option java_outer_classname = "TransportProtocol"; + package mediamanager; -message TextMessage { - string content = 1; // O texto da mensagem +message Request { + string request_id = 1; + bytes payload = 2; + map headers = 3; +} + +message Response { + string request_id = 1; + int32 status_code = 2; + bytes payload = 3; + map headers = 4; } \ No newline at end of file