Integrate `DelegateActionManager` for request handling and refactor IPC:
- Replaced `SimpleProtocol` with `TransportProtocol` in `IPCManager`. - Added `DelegateActionManager` to route requests to handlers, starting with `EchoHandler`. - Updated `MediaManagerApplication` to initialize and manage `DelegateActionManager`. - Extended Protocol Buffers schema with `Request` and `Response` definitions. - Enhanced logging for request processing and redesigned response flow.
This commit is contained in:
parent
23b6d54674
commit
a0ad10b1bc
|
|
@ -1,5 +1,6 @@
|
|||
package com.mediamanager;
|
||||
|
||||
import com.mediamanager.service.delegate.DelegateActionManager;
|
||||
import com.mediamanager.service.ipc.IPCManager;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
|
@ -14,6 +15,7 @@ public class MediaManagerApplication {
|
|||
private static final Logger logger = LogManager.getLogger(MediaManagerApplication.class);
|
||||
private static Properties config;
|
||||
private static DatabaseManager databaseManager;
|
||||
private static DelegateActionManager actionManager;
|
||||
private static IPCManager ipcManager;
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
|
@ -24,8 +26,9 @@ public class MediaManagerApplication {
|
|||
loadConfiguration();
|
||||
databaseManager = new DatabaseManager(config);
|
||||
databaseManager.init();
|
||||
|
||||
ipcManager = new IPCManager(config);
|
||||
actionManager = new DelegateActionManager();
|
||||
actionManager.start();
|
||||
ipcManager = new IPCManager(config,actionManager);
|
||||
ipcManager.init();
|
||||
|
||||
// TODO: Start application services
|
||||
|
|
@ -51,6 +54,9 @@ public class MediaManagerApplication {
|
|||
}
|
||||
}
|
||||
|
||||
if (actionManager != null) {
|
||||
actionManager.stop();
|
||||
}
|
||||
|
||||
logger.info("MediaManager Core shutdown successfully");
|
||||
logger.info("Goodbye!");
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
package com.mediamanager.service.delegate;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.mediamanager.protocol.TransportProtocol;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface ActionHandler {
|
||||
TransportProtocol.Response.Builder handle(ByteString requestPayload) throws InvalidProtocolBufferException;
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
package com.mediamanager.service.delegate;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.mediamanager.protocol.TransportProtocol;
|
||||
import com.mediamanager.service.delegate.handler.EchoHandler;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class DelegateActionManager {
|
||||
private static final Logger logger = LogManager.getLogger(DelegateActionManager.class);
|
||||
|
||||
private final Map<String, ActionHandler> handlerRegistry;
|
||||
|
||||
public DelegateActionManager() {
|
||||
|
||||
logger.debug("DelegateActionManager created");
|
||||
this.handlerRegistry = new HashMap<>();
|
||||
registerHandlers();
|
||||
}
|
||||
|
||||
private void registerHandlers() {
|
||||
handlerRegistry.put("echo",new EchoHandler());
|
||||
}
|
||||
|
||||
public void start(){
|
||||
logger.info("DelegateActionManager started");
|
||||
}
|
||||
|
||||
public void stop(){
|
||||
logger.info("DelegateActionManager stopped");
|
||||
}
|
||||
|
||||
public TransportProtocol.Response ProcessedRequest(TransportProtocol.Request request){
|
||||
String requestId = request.getRequestId();
|
||||
logger.info("Processing request: {}", requestId);
|
||||
String action = request.getHeadersMap().getOrDefault("action", "unknown");
|
||||
ActionHandler handler = handlerRegistry.get(action);
|
||||
TransportProtocol.Response.Builder responseBuilder;
|
||||
if (handler == null) {
|
||||
logger.warn("No handler found for action: {}", action);
|
||||
responseBuilder = TransportProtocol.Response.newBuilder()
|
||||
.setStatusCode(404) // 404 Not Found
|
||||
.setPayload(ByteString.copyFromUtf8("Error: Action '" + action + "' not found."));
|
||||
} else{
|
||||
try {
|
||||
logger.debug("Delegating action '{}' to handler...", action);
|
||||
responseBuilder = handler.handle(request.getPayload());
|
||||
}catch (Exception e) {
|
||||
logger.error("Handler for action '{}' threw an exception:", action, e);
|
||||
responseBuilder = TransportProtocol.Response.newBuilder()
|
||||
.setStatusCode(500) // 500 Internal Server Error
|
||||
.setPayload(ByteString.copyFromUtf8("Error: " + e.getMessage()));
|
||||
}
|
||||
}
|
||||
return responseBuilder.setRequestId(requestId).build();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
package com.mediamanager.service.delegate.handler;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.mediamanager.protocol.TransportProtocol;
|
||||
import com.mediamanager.service.delegate.ActionHandler;
|
||||
|
||||
public class EchoHandler implements ActionHandler {
|
||||
@Override
|
||||
public TransportProtocol.Response.Builder handle(ByteString requestPayload) {
|
||||
String payloadText = requestPayload.toStringUtf8();
|
||||
String responseText = "Server received: " + payloadText;
|
||||
|
||||
return TransportProtocol.Response.newBuilder()
|
||||
.setPayload(ByteString.copyFromUtf8(responseText))
|
||||
.setStatusCode(200);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,12 +1,11 @@
|
|||
package com.mediamanager.service.ipc;
|
||||
|
||||
|
||||
import com.mediamanager.protocol.SimpleProtocol;
|
||||
import com.mediamanager.protocol.TransportProtocol;
|
||||
import com.mediamanager.service.delegate.DelegateActionManager;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.io.File;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.StandardProtocolFamily;
|
||||
import java.net.UnixDomainSocketAddress;
|
||||
|
|
@ -28,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
public class IPCManager {
|
||||
private final Properties configuration;
|
||||
private static final Logger logger = LogManager.getLogger(IPCManager.class);
|
||||
private final DelegateActionManager actionManager;
|
||||
|
||||
private Path socketPath;
|
||||
private UnixDomainSocketAddress socketAddress;
|
||||
private ServerSocketChannel serverChannel;
|
||||
|
|
@ -37,8 +38,9 @@ public class IPCManager {
|
|||
private final AtomicInteger clientIdCounter = new AtomicInteger(0);
|
||||
|
||||
|
||||
public IPCManager(Properties config){
|
||||
public IPCManager(Properties config, DelegateActionManager actionManager){
|
||||
configuration = config;
|
||||
this.actionManager = actionManager;
|
||||
logger.debug("IPCManager created with configuration:");
|
||||
|
||||
|
||||
|
|
@ -221,29 +223,25 @@ public class IPCManager {
|
|||
logger.debug("Client {} handler thread started", clientId);
|
||||
|
||||
try {
|
||||
SimpleProtocol.TextMessage request = readMessage(channel);
|
||||
TransportProtocol.Request request = readRequest(channel);
|
||||
|
||||
if (request != null) {
|
||||
logger.info("Client {} sent: '{}'", clientId, request.getContent());
|
||||
String response = "Server Received: " + request.getContent();
|
||||
logger.info("Client {} sent request {}", clientId, request.getRequestId());
|
||||
|
||||
SimpleProtocol.TextMessage responseMsg = SimpleProtocol.TextMessage.newBuilder()
|
||||
.setContent(response).build();
|
||||
// Processa usando o DelegateActionManager
|
||||
TransportProtocol.Response response = actionManager.ProcessedRequest(request);
|
||||
|
||||
writeMessage(channel,responseMsg);
|
||||
logger.info("Client {} response sent: '{}'", clientId, response);
|
||||
// Envia resposta de volta
|
||||
writeResponse(channel, response);
|
||||
|
||||
}else {
|
||||
logger.info("Client {} response sent", clientId);
|
||||
} else {
|
||||
logger.warn("Client {} sent null message", clientId);
|
||||
}
|
||||
Thread.sleep(100);
|
||||
|
||||
logger.debug("Client {} processing complete", clientId);
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Error handling client {}", clientId, e);
|
||||
} finally {
|
||||
// SEMPRE fecha o canal quando terminar
|
||||
// O finally garante que isso acontece mesmo se houver exceção
|
||||
try {
|
||||
channel.close();
|
||||
logger.debug("Client {} channel closed", clientId);
|
||||
|
|
@ -252,8 +250,7 @@ public class IPCManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SimpleProtocol.TextMessage readMessage(SocketChannel channel) throws IOException {
|
||||
private TransportProtocol.Request readRequest(SocketChannel channel) throws IOException {
|
||||
// Primeiro, lê o tamanho da mensagem (4 bytes = int32)
|
||||
java.nio.ByteBuffer sizeBuffer = java.nio.ByteBuffer.allocate(4);
|
||||
int bytesRead = 0;
|
||||
|
|
@ -294,28 +291,25 @@ public class IPCManager {
|
|||
byte[] messageBytes = new byte[messageSize];
|
||||
messageBuffer.get(messageBytes);
|
||||
|
||||
return SimpleProtocol.TextMessage.parseFrom(messageBytes);
|
||||
return TransportProtocol.Request.parseFrom(messageBytes);
|
||||
}
|
||||
|
||||
private void writeMessage(SocketChannel channel, SimpleProtocol.TextMessage message) throws IOException {
|
||||
// Serializa a mensagem
|
||||
byte[] messageBytes = message.toByteArray();
|
||||
private void writeResponse(SocketChannel channel, TransportProtocol.Response response) throws IOException {
|
||||
byte[] messageBytes = response.toByteArray();
|
||||
int messageSize = messageBytes.length;
|
||||
|
||||
logger.debug("Writing message of {} bytes", messageSize);
|
||||
logger.debug("Writing response of {} bytes", messageSize);
|
||||
|
||||
// Cria buffer com tamanho + mensagem
|
||||
java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate(4 + messageSize);
|
||||
buffer.putInt(messageSize); // 4 bytes de tamanho
|
||||
buffer.put(messageBytes); // N bytes da mensagem
|
||||
buffer.putInt(messageSize);
|
||||
buffer.put(messageBytes);
|
||||
buffer.flip();
|
||||
|
||||
// Escreve tudo no canal
|
||||
while (buffer.hasRemaining()) {
|
||||
channel.write(buffer);
|
||||
}
|
||||
|
||||
logger.debug("Message written successfully");
|
||||
logger.debug("Response written successfully");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,19 @@
|
|||
syntax = "proto3";
|
||||
|
||||
option java_package = "com.mediamanager.protocol";
|
||||
option java_outer_classname = "SimpleProtocol";
|
||||
option java_outer_classname = "TransportProtocol";
|
||||
|
||||
package mediamanager;
|
||||
|
||||
message TextMessage {
|
||||
string content = 1; // O texto da mensagem
|
||||
message Request {
|
||||
string request_id = 1;
|
||||
bytes payload = 2;
|
||||
map<string, string> headers = 3;
|
||||
}
|
||||
|
||||
message Response {
|
||||
string request_id = 1;
|
||||
int32 status_code = 2;
|
||||
bytes payload = 3;
|
||||
map<string, string> headers = 4;
|
||||
}
|
||||
Loading…
Reference in New Issue