A look at Sentinel's SimpleHttpCommandCenter
Published: 2019-06-22


This post walks through Sentinel's SimpleHttpCommandCenter.

SimpleHttpCommandCenter

sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/SimpleHttpCommandCenter.java

/***
 * The simple command center provides service to exchange information.
 *
 * @author youji.zj
 */
public class SimpleHttpCommandCenter implements CommandCenter {

    private static final int PORT_UNINITIALIZED = -1;

    private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
    private static final int DEFAULT_PORT = 8719;

    private static final Map<String, CommandHandler> handlerMap = new HashMap<String, CommandHandler>();

    private ExecutorService executor = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-command-center-executor"));
    private ExecutorService bizExecutor;

    private ServerSocket socketReference;

    @Override
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        registerCommands(handlers);
    }

    @Override
    public void start() throws Exception {
        int nThreads = Runtime.getRuntime().availableProcessors();
        this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            new NamedThreadFactory("sentinel-command-center-service-executor"),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    CommandCenterLog.info("EventTask rejected");
                    throw new RejectedExecutionException();
                }
            });

        Runnable serverInitTask = new Runnable() {
            int port;

            {
                try {
                    port = Integer.parseInt(TransportConfig.getPort());
                } catch (Exception e) {
                    port = DEFAULT_PORT;
                }
            }

            @Override
            public void run() {
                int repeat = 0;

                int tmpPort = port;
                boolean success = false;
                while (true) {
                    ServerSocket serverSocket = null;
                    try {
                        serverSocket = new ServerSocket(tmpPort);
                    } catch (IOException ex) {
                        CommandCenterLog.info(
                            String.format("IO error occurs, port: %d, repeat times: %d", tmpPort, repeat), ex);

                        tmpPort = adjustPort(repeat);
                        try {
                            TimeUnit.SECONDS.sleep(5);
                        } catch (InterruptedException e1) {
                            break;
                        }
                        repeat++;
                    }

                    if (serverSocket != null) {
                        CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                        socketReference = serverSocket;
                        executor.submit(new ServerThread(serverSocket));
                        success = true;
                        break;
                    }
                }

                if (!success) {
                    tmpPort = PORT_UNINITIALIZED;
                }
                TransportConfig.setRuntimePort(tmpPort);
                executor.shutdown();
            }

            /**
             * Adjust the port to settle down.
             */
            private int adjustPort(int repeat) {
                int mod = repeat / 5;
                return port + mod;
            }
        };

        new Thread(serverInitTask).start();
    }

    @Override
    public void stop() throws Exception {
        if (socketReference != null) {
            try {
                socketReference.close();
            } catch (IOException e) {
                CommandCenterLog.warn("Error when releasing the server socket", e);
            }
        }
        bizExecutor.shutdownNow();
        executor.shutdownNow();
        TransportConfig.setRuntimePort(PORT_UNINITIALIZED);
        handlerMap.clear();
    }

    /**
     * Get the name set of all registered commands.
     */
    public static Set<String> getCommands() {
        return handlerMap.keySet();
    }

    public static CommandHandler getHandler(String commandName) {
        return handlerMap.get(commandName);
    }

    public static void registerCommands(Map<String, CommandHandler> handlerMap) {
        if (handlerMap != null) {
            for (Entry<String, CommandHandler> e : handlerMap.entrySet()) {
                registerCommand(e.getKey(), e.getValue());
            }
        }
    }

    public static void registerCommand(String commandName, CommandHandler handler) {
        if (StringUtil.isEmpty(commandName)) {
            return;
        }

        if (handlerMap.containsKey(commandName)) {
            CommandCenterLog.info("Register failed (duplicate command): " + commandName);
            return;
        }

        handlerMap.put(commandName, handler);
    }

    /**
     * Avoid server thread hang, 3 seconds timeout by default.
     */
    private void setSocketSoTimeout(Socket socket) throws SocketException {
        if (socket != null) {
            socket.setSoTimeout(DEFAULT_SERVER_SO_TIMEOUT);
        }
    }
}
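  • The TCP server here is built directly on Java's blocking ServerSocket (BIO).
  • Before startup, beforeStart calls CommandHandlerProvider.getInstance().namedHandlers() to obtain the supported commands and their handlers, then registers them through registerCommands.
  • start then creates bizExecutor and launches serverInitTask on its own thread. Once the port is bound, that task submits a ServerThread to the single-threaded executor, so the accept loop also runs asynchronously. If binding fails, it sleeps 5 seconds and retries, moving to the next port after every 5 retries.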
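
The port fallback in serverInitTask is easy to misread, so here is a minimal standalone sketch of just that arithmetic. It assumes every bind attempt fails and leaves out the 5-second sleep; the class name PortRetrySketch and the method candidatePorts are hypothetical and not part of Sentinel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; it mirrors the retry arithmetic
// of serverInitTask without opening any sockets.
class PortRetrySketch {

    // Same formula as adjustPort in the listing: one port higher per 5 retries.
    static int adjustPort(int basePort, int repeat) {
        return basePort + repeat / 5;
    }

    // Ports that would be tried on successive attempts if every bind failed.
    static List<Integer> candidatePorts(int basePort, int attempts) {
        List<Integer> ports = new ArrayList<>();
        int tmpPort = basePort;
        int repeat = 0;
        for (int i = 0; i < attempts; i++) {
            ports.add(tmpPort);                     // new ServerSocket(tmpPort) would go here
            tmpPort = adjustPort(basePort, repeat); // computed before repeat is incremented
            repeat++;
        }
        return ports;
    }

    public static void main(String[] args) {
        System.out.println(candidatePorts(8719, 8));
    }
}
```

Because adjustPort is evaluated before repeat is incremented, the configured port is attempted six times in this sketch (the initial attempt plus five retries) before 8720 is tried; each later port gets five attempts.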

ServerThread

    class ServerThread extends Thread {

        private ServerSocket serverSocket;

        ServerThread(ServerSocket s) {
            this.serverSocket = s;
            setName("sentinel-courier-server-accept-thread");
        }

        @Override
        public void run() {
            while (true) {
                Socket socket = null;
                try {
                    socket = this.serverSocket.accept();
                    setSocketSoTimeout(socket);
                    HttpEventTask eventTask = new HttpEventTask(socket);
                    bizExecutor.submit(eventTask);
                } catch (Exception e) {
                    CommandCenterLog.info("Server error", e);
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (Exception e1) {
                            CommandCenterLog.info("Error when closing an opened socket", e1);
                        }
                    }
                    try {
                        // In case of infinite log.
                        Thread.sleep(10);
                    } catch (InterruptedException e1) {
                        // Indicates the task should stop.
                        break;
                    }
                }
            }
        }
    }
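  • ServerThread follows a worker-thread-pool pattern: it accepts a connection, sets the 3-second socket timeout, wraps the socket in an HttpEventTask, and submits that task to bizExecutor. If the submission is rejected, the exception is caught and the socket is closed.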
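
To make the back-pressure of this pattern concrete, the sketch below builds a pool with the same shape as bizExecutor (fixed thread count, bounded ArrayBlockingQueue) and counts how many submissions are rejected while all workers are busy. It is an illustration under assumptions: BoundedPoolSketch and countRejected are hypothetical names, and the JDK's default AbortPolicy stands in for Sentinel's custom handler, which likewise throws RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Sentinel.
class BoundedPoolSketch {

    // Submits `tasks` blocking tasks and returns how many were rejected.
    static int countRejected(int threads, int queueSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(queueSize)); // default handler: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.submit(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // the accept loop in ServerThread would close the socket here
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(2, 3, 10));
    }
}
```

With 2 threads and a queue of 3, only 5 tasks can be held at once, so the remaining 5 of 10 submissions are rejected. In the Sentinel listing the capacity is the number of available processors plus a queue of 10.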

HttpEventTask

sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/http/HttpEventTask.java

public class HttpEventTask implements Runnable {

    private final Socket socket;

    private boolean writtenHead = false;

    public HttpEventTask(Socket socket) {
        this.socket = socket;
    }

    public void close() throws Exception {
        socket.close();
    }

    @Override
    public void run() {
        if (socket == null) {
            return;
        }

        BufferedReader in = null;
        PrintWriter printWriter = null;
        try {
            long start = System.currentTimeMillis();
            in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset()));
            OutputStream outputStream = socket.getOutputStream();

            printWriter = new PrintWriter(
                new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

            String line = in.readLine();
            CommandCenterLog.info("[CommandCenter] socket income:" + line
                + "," + socket.getInetAddress());
            CommandRequest request = parseRequest(line);

            // Validate the target command.
            String commandName = HttpCommandUtils.getTarget(request);
            if (StringUtil.isBlank(commandName)) {
                badRequest(printWriter, "Invalid command");
                return;
            }

            // Find the matching command handler.
            CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
            if (commandHandler != null) {
                CommandResponse<?> response = commandHandler.handle(request);
                handleResponse(response, printWriter, outputStream);
            } else {
                // No matching command handler.
                badRequest(printWriter, "Unknown command `" + commandName + '`');
            }
            printWriter.flush();

            long cost = System.currentTimeMillis() - start;
            CommandCenterLog.info("[CommandCenter] deal a socket task:" + line
                + "," + socket.getInetAddress() + ", time cost=" + cost + " ms");
        } catch (Throwable e) {
            CommandCenterLog.info("CommandCenter error", e);
            try {
                if (printWriter != null) {
                    String errorMessage = SERVER_ERROR_MESSAGE;
                    if (!writtenHead) {
                        internalError(printWriter, errorMessage);
                    } else {
                        printWriter.println(errorMessage);
                    }
                    printWriter.flush();
                }
            } catch (Exception e1) {
                CommandCenterLog.info("CommandCenter close serverSocket failed", e);
            }
        } finally {
            closeResource(in);
            closeResource(printWriter);
            closeResource(socket);
        }
    }

    private void closeResource(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
                CommandCenterLog.info("CommandCenter close resource failed", e);
            }
        }
    }

    private void handleResponse(CommandResponse response, /*@NonNull*/ final PrintWriter printWriter,
        /*@NonNull*/ final OutputStream rawOutputStream) throws Exception {
        if (response.isSuccess()) {
            if (response.getResult() == null) {
                writeOkStatusLine(printWriter);
                return;
            }
            // Write 200 OK status line.
            writeOkStatusLine(printWriter);
            // Here we directly use `toString` to encode the result to plain text.
            byte[] buffer = response.getResult().toString().getBytes(SentinelConfig.charset());
            rawOutputStream.write(buffer);
            rawOutputStream.flush();
        } else {
            String msg = SERVER_ERROR_MESSAGE;
            if (response.getException() != null) {
                msg = response.getException().getMessage();
            }
            badRequest(printWriter, msg);
        }
    }

    /**
     * Write `400 Bad Request` HTTP response status line and message body, then flush.
     */
    private void badRequest(/*@NonNull*/ final PrintWriter out, String message) {
        out.print("HTTP/1.1 400 Bad Request\r\n"
            + "Connection: close\r\n\r\n");
        out.print(message);
        out.flush();
        writtenHead = true;
    }

    /**
     * Write `500 Internal Server Error` HTTP response status line and message body, then flush.
     */
    private void internalError(/*@NonNull*/ final PrintWriter out, String message) {
        out.print("HTTP/1.1 500 Internal Server Error\r\n"
            + "Connection: close\r\n\r\n");
        out.print(message);
        out.flush();
        writtenHead = true;
    }

    /**
     * Write `200 OK` HTTP response status line and flush.
     */
    private void writeOkStatusLine(/*@NonNull*/ final PrintWriter out) {
        out.print("HTTP/1.1 200 OK\r\n"
            + "Connection: close\r\n\r\n");
        out.flush();
        writtenHead = true;
    }

    /**
     * Parse raw HTTP request line to a {@link CommandRequest}.
     *
     * @param line HTTP request line
     * @return parsed command request
     */
    private CommandRequest parseRequest(String line) {
        CommandRequest request = new CommandRequest();
        if (StringUtil.isBlank(line)) {
            return request;
        }
        int start = line.indexOf('/');
        int ask = line.indexOf('?') == -1 ? line.lastIndexOf(' ') : line.indexOf('?');
        int space = line.lastIndexOf(' ');
        String target = line.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : line.length());
        request.addMetadata(HttpCommandUtils.REQUEST_TARGET, target);
        if (ask == -1 || ask == space) {
            return request;
        }
        String parameterStr = line.substring(ask != -1 ? ask + 1 : 0, space != -1 ? space : line.length());
        for (String parameter : parameterStr.split("&")) {
            if (StringUtil.isBlank(parameter)) {
                continue;
            }

            String[] keyValue = parameter.split("=");
            if (keyValue.length != 2) {
                continue;
            }

            String value = StringUtil.trim(keyValue[1]);
            try {
                value = URLDecoder.decode(value, SentinelConfig.charset());
            } catch (UnsupportedEncodingException e) {
            }

            request.addParam(StringUtil.trim(keyValue[0]), value);
        }
        return request;
    }

    private static final String SERVER_ERROR_MESSAGE = "Command server error";
}
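  • The task reads a single line with readLine, parses it into a CommandRequest, looks up the handler through SimpleHttpCommandCenter.getHandler, executes it, and writes the result back.
  • As the code shows, the HTTP protocol is parsed by hand: only the request line is read, the command name is taken from the path, and the parameters come from the query string.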
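
The hand-rolled parsing is easier to follow on a concrete request line. Below is a minimal standalone sketch that follows the same index arithmetic as parseRequest, but returns plain values instead of a CommandRequest. The class RequestLineSketch, its method names, the example command names, and the hard-coded UTF-8 charset are assumptions for illustration; the listing itself uses SentinelConfig.charset().

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Sentinel.
class RequestLineSketch {

    // Command name: text between the first '/' and the '?' (or the last space).
    static String target(String line) {
        int start = line.indexOf('/');
        int ask = line.indexOf('?') == -1 ? line.lastIndexOf(' ') : line.indexOf('?');
        return line.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : line.length());
    }

    // Query parameters: text between '?' and the last space, split on '&' and '='.
    static Map<String, String> params(String line) {
        Map<String, String> result = new LinkedHashMap<>();
        int ask = line.indexOf('?');
        int space = line.lastIndexOf(' ');
        if (ask == -1) {
            return result;
        }
        String query = line.substring(ask + 1, space > ask ? space : line.length());
        for (String parameter : query.split("&")) {
            String[] keyValue = parameter.split("=");
            if (keyValue.length != 2) {
                continue; // same rule as the listing: skip malformed pairs
            }
            try {
                result.put(keyValue[0].trim(), URLDecoder.decode(keyValue[1].trim(), "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e); // UTF-8 is always available
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String line = "GET /getRules?type=flow HTTP/1.1";
        System.out.println(target(line) + " " + params(line));
    }
}
```

For the line in main, the target is getRules and the only parameter is type=flow. Note that a parameter without a value (for example `a=`) is silently dropped, because split("=") then yields a single element.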

CommandHandlerProvider

sentinel-transport-common-0.1.1-sources.jar!/com/alibaba/csp/sentinel/command/CommandHandlerProvider.java

/**
 * Provides and filters command handlers registered via SPI.
 *
 * @author Eric Zhao
 */
public class CommandHandlerProvider implements Iterable<CommandHandler> {

    private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoader.load(CommandHandler.class);

    /**
     * Get all command handlers annotated with {@link CommandMapping} with command name.
     *
     * @return list of all named command handlers
     */
    public Map<String, CommandHandler> namedHandlers() {
        Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
        for (CommandHandler handler : serviceLoader) {
            String name = parseCommandName(handler);
            if (!StringUtil.isEmpty(name)) {
                map.put(name, handler);
            }
        }
        return map;
    }

    private String parseCommandName(CommandHandler handler) {
        CommandMapping commandMapping = handler.getClass().getAnnotation(CommandMapping.class);
        if (commandMapping != null) {
            return commandMapping.name();
        } else {
            return null;
        }
    }

    @Override
    public Iterator<CommandHandler> iterator() {
        return serviceLoader.iterator();
    }

    private static final CommandHandlerProvider INSTANCE = new CommandHandlerProvider();

    public static CommandHandlerProvider getInstance() {
        return INSTANCE;
    }
}
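  • This class is a singleton that uses the SPI mechanism (ServiceLoader) to load CommandHandler implementations dynamically. Its namedHandlers method returns a map from command name to handler, where the name is read from each handler's CommandMapping annotation; handlers without a name are skipped.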
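
The name lookup can be tried out without a META-INF/services file. The sketch below is a simplified stand-in under assumptions: Mapping, Handler, VersionHandler and UnnamedHandler are hypothetical types defined only for this example, and the handlers are passed in as an Iterable instead of being discovered by ServiceLoader.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical types for illustration only; they stand in for CommandMapping and CommandHandler.
class NamedHandlerSketch {

    @Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
    @Target(ElementType.TYPE)
    @interface Mapping {
        String name();
    }

    interface Handler {
        String handle();
    }

    @Mapping(name = "version")
    static class VersionHandler implements Handler {
        public String handle() {
            return "demo";
        }
    }

    // No annotation, so it is left out of the map.
    static class UnnamedHandler implements Handler {
        public String handle() {
            return "ignored";
        }
    }

    // Same idea as namedHandlers: read the name from the class-level annotation.
    static Map<String, Handler> namedHandlers(Iterable<Handler> handlers) {
        Map<String, Handler> map = new HashMap<>();
        for (Handler handler : handlers) {
            Mapping mapping = handler.getClass().getAnnotation(Mapping.class);
            if (mapping != null && !mapping.name().isEmpty()) {
                map.put(mapping.name(), handler);
            }
        }
        return map;
    }

    public static void main(String[] args) {
        List<Handler> handlers = new ArrayList<>();
        handlers.add(new VersionHandler());
        handlers.add(new UnnamedHandler());
        System.out.println(namedHandlers(handlers).keySet());
    }
}
```

In the real provider, the Iterable is the ServiceLoader itself, so adding a command means shipping an annotated implementation and listing its class name in the services file shown in the next section.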

com.alibaba.csp.sentinel.command.CommandHandler

sentinel-transport-common-0.1.1.jar!/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler

com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler
com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.VersionCommandHandler
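  • Following the SPI convention, the file named com.alibaba.csp.sentinel.command.CommandHandler under META-INF/services lists the implementation classes, one fully qualified class name per line.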

Summary

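  • Sentinel's transport module consists of a common base package plus two implementations, simple-http and netty.
  • simple-http uses BIO with a worker thread pool: each incoming request is wrapped in an HttpEventTask and handed to the pool.
  • HttpEventTask parses the command from the request, looks up the matching handler, executes it, and returns the result.
  • CommandHandler implementations are loaded dynamically through SPI.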

Reposted from: http://ivxsa.baihongyu.com/
