diff --git a/pom.xml b/pom.xml index 3ea8e09..b0ed9b5 100644 --- a/pom.xml +++ b/pom.xml @@ -78,9 +78,22 @@ ${junit.version} test + + com.google.protobuf + protobuf-java + 4.32.0 + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + org.apache.maven.plugins @@ -91,12 +104,54 @@ 17 + org.apache.maven.plugins maven-surefire-plugin 3.2.5 + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + /usr/bin/protoc + ${project.basedir}/src/main/proto + ${project.build.directory}/generated-sources/protobuf/java + false + + + + + compile + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.5.0 + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/protobuf/java + + + + + + org.apache.maven.plugins diff --git a/src/main/java/com/mediamanager/MediaManagerApplication.java b/src/main/java/com/mediamanager/MediaManagerApplication.java index d483da3..bc04be1 100644 --- a/src/main/java/com/mediamanager/MediaManagerApplication.java +++ b/src/main/java/com/mediamanager/MediaManagerApplication.java @@ -1,5 +1,6 @@ package com.mediamanager; +import com.mediamanager.service.delegate.DelegateActionManager; import com.mediamanager.service.ipc.IPCManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -14,6 +15,7 @@ public class MediaManagerApplication { private static final Logger logger = LogManager.getLogger(MediaManagerApplication.class); private static Properties config; private static DatabaseManager databaseManager; + private static DelegateActionManager actionManager; private static IPCManager ipcManager; public static void main(String[] args) { @@ -24,8 +26,9 @@ public class MediaManagerApplication { loadConfiguration(); databaseManager = new DatabaseManager(config); databaseManager.init(); - - ipcManager = new IPCManager(config); + actionManager = new DelegateActionManager(); + actionManager.start(); + ipcManager = new IPCManager(config,actionManager); ipcManager.init(); // TODO: Start application services @@ -51,6 +54,9 @@ public class MediaManagerApplication { } } + if (actionManager != null) { + actionManager.stop(); + } logger.info("MediaManager Core shutdown successfully"); logger.info("Goodbye!"); diff --git a/src/main/java/com/mediamanager/service/delegate/ActionHandler.java b/src/main/java/com/mediamanager/service/delegate/ActionHandler.java new file mode 100644 index 0000000..27228fc --- /dev/null +++ b/src/main/java/com/mediamanager/service/delegate/ActionHandler.java @@ -0,0 +1,11 @@ +package com.mediamanager.service.delegate; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.mediamanager.protocol.TransportProtocol; + +@FunctionalInterface +public interface ActionHandler { + TransportProtocol.Response.Builder handle(ByteString requestPayload) throws InvalidProtocolBufferException; +} + diff --git a/src/main/java/com/mediamanager/service/delegate/DelegateActionManager.java b/src/main/java/com/mediamanager/service/delegate/DelegateActionManager.java new file mode 100644 index 0000000..6293947 --- /dev/null +++ b/src/main/java/com/mediamanager/service/delegate/DelegateActionManager.java @@ -0,0 +1,60 @@ +package com.mediamanager.service.delegate; + +import com.google.protobuf.ByteString; +import com.mediamanager.protocol.TransportProtocol; +import com.mediamanager.service.delegate.handler.EchoHandler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.Map; + +public class DelegateActionManager { + private static final Logger logger = LogManager.getLogger(DelegateActionManager.class); + + private final Map handlerRegistry; + + public DelegateActionManager() { + + logger.debug("DelegateActionManager created"); + this.handlerRegistry = new HashMap<>(); + registerHandlers(); + } + + private void registerHandlers() { + handlerRegistry.put("echo",new EchoHandler()); + } + + public void start(){ + logger.info("DelegateActionManager started"); + } + + public void stop(){ + logger.info("DelegateActionManager stopped"); + } + + public TransportProtocol.Response ProcessedRequest(TransportProtocol.Request request){ + String requestId = request.getRequestId(); + logger.info("Processing request: {}", requestId); + String action = request.getHeadersMap().getOrDefault("action", "unknown"); + ActionHandler handler = handlerRegistry.get(action); + TransportProtocol.Response.Builder responseBuilder; + if (handler == null) { + logger.warn("No handler found for action: {}", action); + responseBuilder = TransportProtocol.Response.newBuilder() + .setStatusCode(404) // 404 Not Found + .setPayload(ByteString.copyFromUtf8("Error: Action '" + action + "' not found.")); + } else{ + try { + logger.debug("Delegating action '{}' to handler...", action); + responseBuilder = handler.handle(request.getPayload()); + }catch (Exception e) { + logger.error("Handler for action '{}' threw an exception:", action, e); + responseBuilder = TransportProtocol.Response.newBuilder() + .setStatusCode(500) // 500 Internal Server Error + .setPayload(ByteString.copyFromUtf8("Error: " + e.getMessage())); + } + } + return responseBuilder.setRequestId(requestId).build(); + } +} diff --git a/src/main/java/com/mediamanager/service/delegate/handler/EchoHandler.java b/src/main/java/com/mediamanager/service/delegate/handler/EchoHandler.java new file mode 100644 index 0000000..23463e6 --- /dev/null +++ b/src/main/java/com/mediamanager/service/delegate/handler/EchoHandler.java @@ -0,0 +1,17 @@ +package com.mediamanager.service.delegate.handler; + +import com.google.protobuf.ByteString; +import com.mediamanager.protocol.TransportProtocol; +import com.mediamanager.service.delegate.ActionHandler; + +public class EchoHandler implements ActionHandler { + @Override + public TransportProtocol.Response.Builder handle(ByteString requestPayload) { + String payloadText = requestPayload.toStringUtf8(); + String responseText = "Server received: " + payloadText; + + return TransportProtocol.Response.newBuilder() + .setPayload(ByteString.copyFromUtf8(responseText)) + .setStatusCode(200); + } +} diff --git a/src/main/java/com/mediamanager/service/ipc/IPCManager.java b/src/main/java/com/mediamanager/service/ipc/IPCManager.java index 19537c5..17ff8b1 100644 --- a/src/main/java/com/mediamanager/service/ipc/IPCManager.java +++ b/src/main/java/com/mediamanager/service/ipc/IPCManager.java @@ -1,10 +1,11 @@ package com.mediamanager.service.ipc; - +import com.mediamanager.protocol.TransportProtocol; +import com.mediamanager.service.delegate.DelegateActionManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.File; + import java.io.IOException; import java.net.StandardProtocolFamily; import java.net.UnixDomainSocketAddress; @@ -26,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger; public class IPCManager { private final Properties configuration; private static final Logger logger = LogManager.getLogger(IPCManager.class); + private final DelegateActionManager actionManager; + private Path socketPath; private UnixDomainSocketAddress socketAddress; private ServerSocketChannel serverChannel; @@ -35,8 +38,9 @@ public class IPCManager { private final AtomicInteger clientIdCounter = new AtomicInteger(0); - public IPCManager(Properties config){ + public IPCManager(Properties config, DelegateActionManager actionManager){ configuration = config; + this.actionManager = actionManager; logger.debug("IPCManager created with configuration:"); @@ -219,24 +223,25 @@ public class IPCManager { logger.debug("Client {} handler thread started", clientId); try { - // TODO: No próximo passo, vamos implementar: - // 1. Ler mensagens JSON do SocketChannel - // 2. Parsear o JSON para objetos Java - // 3. Processar a requisição - // 4. Criar uma resposta JSON - // 5. Escrever a resposta de volta no SocketChannel + TransportProtocol.Request request = readRequest(channel); - // Por enquanto, apenas mantém a conexão aberta brevemente - // para testar que o sistema de aceitação e threads está funcionando - Thread.sleep(100); + if (request != null) { + logger.info("Client {} sent request {}", clientId, request.getRequestId()); - logger.debug("Client {} processing complete", clientId); + // Processa usando o DelegateActionManager + TransportProtocol.Response response = actionManager.ProcessedRequest(request); + + // Envia resposta de volta + writeResponse(channel, response); + + logger.info("Client {} response sent", clientId); + } else { + logger.warn("Client {} sent null message", clientId); + } } catch (Exception e) { logger.error("Error handling client {}", clientId, e); } finally { - // SEMPRE fecha o canal quando terminar - // O finally garante que isso acontece mesmo se houver exceção try { channel.close(); logger.debug("Client {} channel closed", clientId); @@ -245,6 +250,67 @@ public class IPCManager { } } } + private TransportProtocol.Request readRequest(SocketChannel channel) throws IOException { + // Primeiro, lê o tamanho da mensagem (4 bytes = int32) + java.nio.ByteBuffer sizeBuffer = java.nio.ByteBuffer.allocate(4); + int bytesRead = 0; + + while (bytesRead < 4) { + int read = channel.read(sizeBuffer); + if (read == -1) { + logger.debug("Client disconnected before sending size"); + return null; + } + bytesRead += read; + } + + sizeBuffer.flip(); + int messageSize = sizeBuffer.getInt(); + logger.debug("Expecting message of {} bytes", messageSize); + + // Validação básica de segurança + if (messageSize <= 0 || messageSize > 1024 * 1024) { // Max 1MB + throw new IOException("Invalid message size: " + messageSize); + } + + // Agora lê a mensagem completa + java.nio.ByteBuffer messageBuffer = java.nio.ByteBuffer.allocate(messageSize); + bytesRead = 0; + + while (bytesRead < messageSize) { + int read = channel.read(messageBuffer); + if (read == -1) { + throw new IOException("Client disconnected while reading message"); + } + bytesRead += read; + } + + messageBuffer.flip(); + + // Deserializa o Protocol Buffers + byte[] messageBytes = new byte[messageSize]; + messageBuffer.get(messageBytes); + + return TransportProtocol.Request.parseFrom(messageBytes); + } + + private void writeResponse(SocketChannel channel, TransportProtocol.Response response) throws IOException { + byte[] messageBytes = response.toByteArray(); + int messageSize = messageBytes.length; + + logger.debug("Writing response of {} bytes", messageSize); + + java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate(4 + messageSize); + buffer.putInt(messageSize); + buffer.put(messageBytes); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + + logger.debug("Response written successfully"); + } } } diff --git a/src/main/proto/messages.proto b/src/main/proto/messages.proto new file mode 100644 index 0000000..106e59e --- /dev/null +++ b/src/main/proto/messages.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option java_package = "com.mediamanager.protocol"; +option java_outer_classname = "TransportProtocol"; + +package mediamanager; + +message Request { + string request_id = 1; + bytes payload = 2; + map headers = 3; +} + +message Response { + string request_id = 1; + int32 status_code = 2; + bytes payload = 3; + map headers = 4; +} \ No newline at end of file