序
本文主要研究一下sentinel的SimpleHttpCommandCenter
SimpleHttpCommandCenter
sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/SimpleHttpCommandCenter.java
/*** * The simple command center provides service to exchange information. * * @author youji.zj */public class SimpleHttpCommandCenter implements CommandCenter { private static final int PORT_UNINITIALIZED = -1; private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000; private static final int DEFAULT_PORT = 8719; private static final MaphandlerMap = new HashMap (); private ExecutorService executor = Executors.newSingleThreadExecutor( new NamedThreadFactory("sentinel-command-center-executor")); private ExecutorService bizExecutor; private ServerSocket socketReference; @Override public void beforeStart() throws Exception { // Register handlers Map handlers = CommandHandlerProvider.getInstance().namedHandlers(); registerCommands(handlers); } @Override public void start() throws Exception { int nThreads = Runtime.getRuntime().availableProcessors(); this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue (10), new NamedThreadFactory("sentinel-command-center-service-executor"), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { CommandCenterLog.info("EventTask rejected"); throw new RejectedExecutionException(); } }); Runnable serverInitTask = new Runnable() { int port; { try { port = Integer.parseInt(TransportConfig.getPort()); } catch (Exception e) { port = DEFAULT_PORT; } } @Override public void run() { int repeat = 0; int tmpPort = port; boolean success = false; while (true) { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(tmpPort); } catch (IOException ex) { CommandCenterLog.info( String.format("IO error occurs, port: %d, repeat times: %d", tmpPort, repeat), ex); tmpPort = adjustPort(repeat); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e1) { break; } repeat++; } if (serverSocket != null) { CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort()); socketReference = serverSocket; executor.submit(new ServerThread(serverSocket)); success = true; break; } } if (!success) { tmpPort = PORT_UNINITIALIZED; } TransportConfig.setRuntimePort(tmpPort); executor.shutdown(); } /** * Adjust the port to settle down. */ private int adjustPort(int repeat) { int mod = repeat / 5; return port + mod; } }; new Thread(serverInitTask).start(); } @Override public void stop() throws Exception { if (socketReference != null) { try { socketReference.close(); } catch (IOException e) { CommandCenterLog.warn("Error when releasing the server socket", e); } } bizExecutor.shutdownNow(); executor.shutdownNow(); TransportConfig.setRuntimePort(PORT_UNINITIALIZED); handlerMap.clear(); } /** * Get the name set of all registered commands. */ public static Set getCommands() { return handlerMap.keySet(); } public static CommandHandler getHandler(String commandName) { return handlerMap.get(commandName); } public static void registerCommands(Map handlerMap) { if (handlerMap != null) { for (Entry e : handlerMap.entrySet()) { registerCommand(e.getKey(), e.getValue()); } } } public static void registerCommand(String commandName, CommandHandler handler) { if (StringUtil.isEmpty(commandName)) { return; } if (handlerMap.containsKey(commandName)) { CommandCenterLog.info("Register failed (duplicate command): " + commandName); return; } handlerMap.put(commandName, handler); } /** * Avoid server thread hang, 3 seconds timeout by default. */ private void setSocketSoTimeout(Socket socket) throws SocketException { if (socket != null) { socket.setSoTimeout(DEFAULT_SERVER_SO_TIMEOUT); } }}复制代码
- 这里直接使用的是java的ServerSocket(
bio
)构建的tcp服务 - 启动之前先调用CommandHandlerProvider.getInstance().namedHandlers()获取支持的命令及handler,然后调用registerCommands进行注册
- 之后创建bizExecutor以及异步serverInitTask线程,里头再异步启动ServerThread
ServerThread
class ServerThread extends Thread { private ServerSocket serverSocket; ServerThread(ServerSocket s) { this.serverSocket = s; setName("sentinel-courier-server-accept-thread"); } @Override public void run() { while (true) { Socket socket = null; try { socket = this.serverSocket.accept(); setSocketSoTimeout(socket); HttpEventTask eventTask = new HttpEventTask(socket); bizExecutor.submit(eventTask); } catch (Exception e) { CommandCenterLog.info("Server error", e); if (socket != null) { try { socket.close(); } catch (Exception e1) { CommandCenterLog.info("Error when closing an opened socket", e1); } } try { // In case of infinite log. Thread.sleep(10); } catch (InterruptedException e1) { // Indicates the task should stop. break; } } } } }复制代码
- ServerThread采取的是worker线程池模式,接收一个请求之后,包装为HttpEventTask,然后提交给bizExecutor
HttpEventTask
sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/http/HttpEventTask.java
public class HttpEventTask implements Runnable { private final Socket socket; private boolean writtenHead = false; public HttpEventTask(Socket socket) { this.socket = socket; } public void close() throws Exception { socket.close(); } @Override public void run() { if (socket == null) { return; } BufferedReader in = null; PrintWriter printWriter = null; try { long start = System.currentTimeMillis(); in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset())); OutputStream outputStream = socket.getOutputStream(); printWriter = new PrintWriter( new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset()))); String line = in.readLine(); CommandCenterLog.info("[CommandCenter] socket income:" + line + "," + socket.getInetAddress()); CommandRequest request = parseRequest(line); // Validate the target command. String commandName = HttpCommandUtils.getTarget(request); if (StringUtil.isBlank(commandName)) { badRequest(printWriter, "Invalid command"); return; } // Find the matching command handler. CommandHandler commandHandler = SimpleHttpCommandCenter.getHandler(commandName); if (commandHandler != null) { CommandResponse response = commandHandler.handle(request); handleResponse(response, printWriter, outputStream); } else { // No matching command handler. badRequest(printWriter, "Unknown command `" + commandName + '`'); } printWriter.flush(); long cost = System.currentTimeMillis() - start; CommandCenterLog.info("[CommandCenter] deal a socket task:" + line + "," + socket.getInetAddress() + ", time cost=" + cost + " ms"); } catch (Throwable e) { CommandCenterLog.info("CommandCenter error", e); try { if (printWriter != null) { String errorMessage = SERVER_ERROR_MESSAGE; if (!writtenHead) { internalError(printWriter, errorMessage); } else { printWriter.println(errorMessage); } printWriter.flush(); } } catch (Exception e1) { CommandCenterLog.info("CommandCenter close serverSocket failed", e); } } finally { closeResource(in); closeResource(printWriter); closeResource(socket); } } private void closeResource(Closeable closeable) { if (closeable != null) { try { closeable.close(); } catch (Exception e) { CommandCenterLog.info("CommandCenter close resource failed", e); } } } private void handleResponse(CommandResponse response, /*@NonNull*/ final PrintWriter printWriter, /*@NonNull*/ final OutputStream rawOutputStream) throws Exception { if (response.isSuccess()) { if (response.getResult() == null) { writeOkStatusLine(printWriter); return; } // Write 200 OK status line. writeOkStatusLine(printWriter); // Here we directly use `toString` to encode the result to plain text. byte[] buffer = response.getResult().toString().getBytes(SentinelConfig.charset()); rawOutputStream.write(buffer); rawOutputStream.flush(); } else { String msg = SERVER_ERROR_MESSAGE; if (response.getException() != null) { msg = response.getException().getMessage(); } badRequest(printWriter, msg); } } /** * Write `400 Bad Request` HTTP response status line and message body, then flush. */ private void badRequest(/*@NonNull*/ final PrintWriter out, String message) { out.print("HTTP/1.1 400 Bad Request\r\n" + "Connection: close\r\n\r\n"); out.print(message); out.flush(); writtenHead = true; } /** * Write `500 Internal Server Error` HTTP response status line and message body, then flush. */ private void internalError(/*@NonNull*/ final PrintWriter out, String message) { out.print("HTTP/1.1 500 Internal Server Error\r\n" + "Connection: close\r\n\r\n"); out.print(message); out.flush(); writtenHead = true; } /** * Write `200 OK` HTTP response status line and flush. */ private void writeOkStatusLine(/*@NonNull*/ final PrintWriter out) { out.print("HTTP/1.1 200 OK\r\n" + "Connection: close\r\n\r\n"); out.flush(); writtenHead = true; } /** * Parse raw HTTP request line to a {@link CommandRequest}. * * @param line HTTP request line * @return parsed command request */ private CommandRequest parseRequest(String line) { CommandRequest request = new CommandRequest(); if (StringUtil.isBlank(line)) { return request; } int start = line.indexOf('/'); int ask = line.indexOf('?') == -1 ? line.lastIndexOf(' ') : line.indexOf('?'); int space = line.lastIndexOf(' '); String target = line.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : line.length()); request.addMetadata(HttpCommandUtils.REQUEST_TARGET, target); if (ask == -1 || ask == space) { return request; } String parameterStr = line.substring(ask != -1 ? ask + 1 : 0, space != -1 ? space : line.length()); for (String parameter : parameterStr.split("&")) { if (StringUtil.isBlank(parameter)) { continue; } String[] keyValue = parameter.split("="); if (keyValue.length != 2) { continue; } String value = StringUtil.trim(keyValue[1]); try { value = URLDecoder.decode(value, SentinelConfig.charset()); } catch (UnsupportedEncodingException e) { } request.addParam(StringUtil.trim(keyValue[0]), value); } return request; } private static final String SERVER_ERROR_MESSAGE = "Command server error";}复制代码
- 这里直接readLine,然后解析为CommandRequest,然后调用SimpleHttpCommandCenter.getHandler获取处理程序,执行返回结果
- 可以看到这里是手工解析http协议
CommandHandlerProvider
sentinel-transport-common-0.1.1-sources.jar!/com/alibaba/csp/sentinel/command/CommandHandlerProvider.java
/** * Provides and filters command handlers registered via SPI. * * @author Eric Zhao */public class CommandHandlerProvider implements Iterable{ private final ServiceLoader serviceLoader = ServiceLoader.load(CommandHandler.class); /** * Get all command handlers annotated with {@link CommandMapping} with command name. * * @return list of all named command handlers */ public Map namedHandlers() { Map map = new HashMap (); for (CommandHandler handler : serviceLoader) { String name = parseCommandName(handler); if (!StringUtil.isEmpty(name)) { map.put(name, handler); } } return map; } private String parseCommandName(CommandHandler handler) { CommandMapping commandMapping = handler.getClass().getAnnotation(CommandMapping.class); if (commandMapping != null) { return commandMapping.name(); } else { return null; } } @Override public Iterator iterator() { return serviceLoader.iterator(); } private static final CommandHandlerProvider INSTANCE = new CommandHandlerProvider(); public static CommandHandlerProvider getInstance() { return INSTANCE; }}复制代码
- 这个类采用单例模式,以及SPI机制,来动态加载CommandHandler的实现,然后namedHandlers方法提供一个命令名及handler的map
com.alibaba.csp.sentinel.command.CommandHandler
sentinel-transport-common-0.1.1.jar!/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler
com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandlercom.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandlercom.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandlercom.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandlercom.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandlercom.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandlercom.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandlercom.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandlercom.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandlercom.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandlercom.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandlercom.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandlercom.alibaba.csp.sentinel.command.handler.SendMetricCommandHandlercom.alibaba.csp.sentinel.command.handler.VersionCommandHandler复制代码
- 这里在META-INF的services目录,使用SPI机制,在com.alibaba.csp.sentinel.command.CommandHandler文件下记录了各种实现
小结
- sentinel的transport部分,有一个common基础包,然后就是simple-http以及netty的实现。
- simple-http采用的是bio外加工作线程池模式,来一个请求,往线程池丢一个HttpEventTask。
- HttpEventTask解析请求的命令然后获取相应的handler执行返回
- commandHandler采用的是SPI的模式进行动态加载