Netty集成HTTP的GET和POST通讯

netty,集成,http,get,post,通讯 · 浏览次数 : 11

小编点评

This code provides methods for extracting and generating JSON and form data from and for HTTP requests. **Extracting JSON data:** * `getPostRequestParams` parses the query string and returns a JSON object. **Generating JSON data:** * `generatePostRequestParams` creates a `FormRequestDecoder` instance and extracts data from the request body. * `generateFormRequestParams` parses the request URI and returns a map of string to string values. **Extracting form data:** * `getFormRequestParams` uses an `HttpPostRequestDecoder` to parse the request body and returns a map of string to string values. **Additional Notes:** * The code uses the `org.apache.commons.codec.binary.Base64` class to encode and decode base64-encoded data. * The `org.apache.commons.io.StringWriter` and `org.apache.commons.io.StringBuilder` classes are used for generating JSON and form data in a compact and efficient format. * The code assumes that the request method is `POST` and extracts form data from the request body. * The `CONTENT_TYPE` header is used to determine the content type of the request body. * The code uses the `query string decoder` and `post request decoder` classes to handle query string and form data, respectively.

正文

核心就是ChannelInitializer的实现使用http 消息解码器

package com.coremain.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;

/**
 * @description: Netty服务端回调处理  HTTP
 * @author: GuoTong
 * @createTime: 2023-05-16 22:05
 * @since JDK 1.8 OR 11
 **/
public class NettyServerHTTPHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        socketChannel.pipeline()
                // http 消息解码器
                .addLast(new HttpServerCodec())
                // http 消息合并(2048 为消息最大长度)
                .addLast(new HttpObjectAggregator(2048))
                // 自定义消息处理业务类
                .addLast(new HttpMessageHandler());
        // 此处还可以进行添加其他处理(例如ssl)等
    }
}

核心2就是ChannelInboundHandlerAdapter的实现

package com.coremain.handler;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.coremain.netty.init.ServiceInit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.MemoryAttribute;
import io.netty.util.CharsetUtil;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @description: HTTP消息处理器: 通道入站处理适配器。
 * 整个的IO处理操作环节包括:
 * 从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端,
 * 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到 ChannelInboundHandler入站处理器。
 * 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器 到Netty的内部(如通道)。
 * 按照这种方向来分,前面数据包解码、业务处理两个环节——属于入站处理器 的工作;
 * 后面目标数据编码、把数据包写到通道中两个环节——属于出站处理器的 工作
 * @author: GuoTong
 * @createTime: 2023-05-16 22:06
 * @since JDK 1.8 OR 11
 **/
@SuppressWarnings("all")
public class HttpMessageHandler extends ChannelInboundHandlerAdapter {

    private static final String CONTENT_TYPE = "Content-Type";
    private static final String CONTENT_LENGTH = "Content-Length";
    private static final String CONNECTION = "Connection";
    private static final String KEEP_ALIVE = "keep-alive";

    private static final String FORM_CONTENT_TYPE = "application/x-www-form-urlencoded";
    private static final String JSON_CONTENT_TYPE = "application/json";
    private static final String MULTIPART_CONTENT_TYPE = "multipart/form-data";
    private static final String TEXT_CONTENT_TYPE = "text/xml";

    /**
     * Description: 当通道注册完成后,Netty会调用fireChannelRegistered, 触发通道注册事件。
     * 通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法,会被调用到
     *
     * @param ctx
     * @author: GuoTong
     * @date: 2023-05-17 19:53:18
     * @return:void
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

        super.channelRegistered(ctx);
    }

    /**
     * Description: 通道未注册
     *
     * @param ctx
     * @author: GuoTong
     * @date: 2023-05-17 19:55:48
     * @return:void
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
    }

    /**
     * Description: 当通道激活完成后,Netty会调用fireChannelActive, 触发通道激活事件。通道会启动该入站操作的流水线处理,
     * 在通道注册过的入站处理器Handler的channelActive方法,会被调用到。
     *
     * @param ctx
     * @author: GuoTong
     * @date: 2023-05-17 19:56:37
     * @return:void
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * Description: 当连接被断开或者不可用,Netty会调用fireChannelInactive, 触发连接不可用事 件。通道会启动对应的流水线处理,
     * 在通道注册过的入站处理器Handler的channelInactive方法,会被调用到。
     *
     * @param ctx
     * @author: GuoTong
     * @date: 2023-05-17 19:57:06
     * @return:void
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        super.channelInactive(ctx);
    }

    /**
     * Description: 当通道缓冲区读完,Netty会调用fireChannelReadComplete, 触发通道读完事 件。通道会启动该入站操作的流水线处理
     * ,在通道注册过的入站处理器Handler的channelReadComplete方法,会被调用到。
     *
     * @param ctx
     * @author: GuoTong
     * @date: 2023-05-17 19:57:31
     * @return:void
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        ctx.flush();
    }

    /**
     * Description: 当通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件。通道会启动异常捕获的流水线处理,
     * 在通道注册过的处理器Handler的 exceptionCaught方法,会被调用到。
     * 注意,这个方法是在通道处理器中ChannelHandler定义的方法,入站处理器、出站处理器接口都继承到了该方法。
     *
     * @param ctx
     * @param cause
     * @author: GuoTong
     * @date: 2023-05-17 19:58:31
     * @return:void
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Description: 当通道缓冲区可读,Netty会调用fireChannelRead, 触发通道可读事件。通道会启动该入站操作的流水线处理,
     * 在通道注册过的入站处理器Handler的channelRead方法,会被调用到。
     *
     * @param ctx
     * @param msg
     * @author: GuoTong
     * @date: 2023-05-17 19:58:09
     * @return:void
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 仅处理http请求
        if (msg instanceof FullHttpRequest) {
            //客户端的请求对象
            FullHttpRequest req = (FullHttpRequest) msg;
            //新建一个返回消息的Json对象
            String responseJson = "";
            // 获取请求方式
            HttpMethod method = req.method();
            // 判断请求方式是GET还是POST
            boolean isPost = method.equals(HttpMethod.POST);
            //把客户端的请求数据格式化为Json对象
            JSONObject requestJson = null;
            try {
                if (isPost) {
                    requestJson = getPostRequestParams(req);
                } else {
                    requestJson = getUrlRequestParams(req);
                }
            } catch (Exception e) {
                ResponseJson(ctx, req, "参数解析失败:" + e.getMessage());
                return;
            }
            // 此处不进行请求方式区分,分局请求url 进行区分(url格式:/类标识/方法标识)
            // 维护两个map, a: Map<类标识, Class(类对应的class)> b: Map<方法名, Class(方法参数对应的class)>
            //获取客户端的URL
            String uri = req.getUri();
            String[] uris;
            if (uri.contains("?")) {
                uris = uri.substring(1, uri.indexOf("?")).split("/");
            } else {
                uris = uri.substring(1).split("/");
            }
            if (uris.length == 3) {
                try {
                    Object result = dealService(requestJson, uris, isPost);
                    if (result instanceof String) {
                        responseJson = (String) result;
                    } else {
                        responseJson = JSON.toJSONString(result);
                    }
                } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                    ResponseJson(ctx, req, "业务调用异常" + e.getMessage());
                } catch (Exception e) {
                    ResponseJson(ctx, req, "业务调用异常" + e.getMessage());
                }
            } else {
                ResponseJson(ctx, req, "访问路径不合法");
            }
            //向客户端发送结果
            ResponseJson(ctx, req, responseJson);
        }
    }

    /**
     * 处理真正业务
     *
     * @param requestJson 请求参数
     * @param uris        ([项目标识,类标识,方法标识])
     * @return
     */
    private Object dealService(JSONObject requestJson, String[] uris, boolean isPost) throws Exception {
        // 获取项目名
        String project_identity = uris[0];
        // 获取类标识
        String class_identity = uris[1];
        // 获取方法标识
        String method_identity = uris[2];
        // 获取类class 以及 方法参数class
        Map<Class, Map<String, Class>> classClassMap = ServiceInit.CLASS_MAPPING.get(class_identity).get(method_identity);
        List<Map.Entry<Class, Map<String, Class>>> collect = classClassMap.entrySet().stream().collect(Collectors.toList());
        Map.Entry<Class, Map<String, Class>> classEntry = collect.get(0);
        // 业务类class
        Class serviceClass = classEntry.getKey();
        // 参数class
        Map<String, Class> paramClasses = classEntry.getValue();
        // 封装参数的值
        Object[] params = new Object[paramClasses.size()];
        // 反射
        Class[] classes = new Class[params.length];
        if (isPost && paramClasses.size() == 1) {
            Class c = paramClasses.entrySet().iterator().next().getValue();
            params[0] = JSON.parseObject(requestJson.toJSONString(), c);
            classes[0] = c;
        } else {
            Object[] webSendParams = requestJson.values().toArray();
            if (paramClasses.size() != webSendParams.length) {
                throw new Exception("参数不匹配");
            }
            // 获取参数类型,解析拼装参数的值
            int i = 0;
            for (Map.Entry<String, Class> entry : paramClasses.entrySet()) {
                String key = entry.getKey();
                Class pClass = entry.getValue();
                // 以此拼接传入参数
                Object value = webSendParams[i];
                // 判断参数类型,强转
                if (pClass.equals(String.class)) {
                    params[i] = (String) value;
                } else {
                    // 默认不是String就是用Integer接收
                    params[i] = Integer.parseInt(value.toString());
                }
                classes[i] = pClass;
                i++;
            }
        }
        // 业务类对象
        Object service = serviceClass.getDeclaredConstructor().newInstance();
        // 根据方法标识反射调用该方法
        Method method = serviceClass.getDeclaredMethod(method_identity, classes);
        // 这样设置完之后,在外部也是可以访问private的。
        method.setAccessible(true);
        Object result = method.invoke(service, params);
        // 返回结果
        return result;
    }

    /**
     * 响应HTTP的请求
     *
     * @param ctx
     * @param req
     * @param jsonStr
     */
    private void ResponseJson(ChannelHandlerContext ctx, FullHttpRequest req, String jsonStr) {
        byte[] jsonByteByte = (jsonStr + "\r\n").getBytes();
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(jsonByteByte));
        String contentType = req.headers().get(CONTENT_TYPE);
        switch (contentType) {
            case FORM_CONTENT_TYPE:
                // 最常见的 POST 提交数据的方式了。浏览器的原生 表单
                response.headers().set(CONTENT_TYPE, FORM_CONTENT_TYPE);
                break;
            case JSON_CONTENT_TYPE:
                // 告诉服务端消息主体是序列化后的 JSON 字符串
                response.headers().set(CONTENT_TYPE, JSON_CONTENT_TYPE);
                break;
            case MULTIPART_CONTENT_TYPE:
                // 常见的 POST 数据提交的方式。我们使用表单上传文件时,必须让 表单的 enctype 等于 multipart/form-data。
                response.headers().set(CONTENT_TYPE, MULTIPART_CONTENT_TYPE);
                break;
            case TEXT_CONTENT_TYPE:
                // 它是一种使用 HTTP 作为传输协议,XML 作为编码方式的远程调用规范
                response.headers().set(CONTENT_TYPE, TEXT_CONTENT_TYPE);
                break;
            default:
                response.headers().set(CONTENT_TYPE, "text/json");
        }
        response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
        response.headers().set(CONNECTION, KEEP_ALIVE);
        ctx.write(response);
        ctx.flush();
        ctx.close();
    }


    /**
     * 获取post请求的内容
     *
     * @param request
     * @return
     */
    private JSONObject getPostRequestParams(FullHttpRequest request) {
        // 获取请求的 Content-Type 类型
        String contentType = request.headers().get(CONTENT_TYPE);
        // 获取报文信息
        ByteBuf jsonBuf = request.content();
        String jsonStr = jsonBuf.toString(CharsetUtil.UTF_8);
        JSONObject json = JSONObject.of();
        switch (contentType) {
            case FORM_CONTENT_TYPE:
                // 最常见的 POST 提交数据的方式了。浏览器的原生 表单
                String[] keyvalues = jsonStr.split("&");
                for (int i = 0; i < keyvalues.length; i++) {
                    // 放入键值对
                    json.put(keyvalues[i], keyvalues[i + 1]);
                    // 指针前进一格
                    i++;
                }
                break;
            case JSON_CONTENT_TYPE:
                // 告诉服务端消息主体是序列化后的 JSON 字符串
                json = JSON.parseObject(jsonStr);
                break;
            case MULTIPART_CONTENT_TYPE:
                // 常见的 POST 数据提交的方式。我们使用表单上传文件时,必须让 表单的 enctype 等于 multipart/form-data。
                Map<String, String> form = getFormRequestParams(request);
                json = (JSONObject) JSON.toJSON(form);
                break;
            case TEXT_CONTENT_TYPE:
                // 它是一种使用 HTTP 作为传输协议,XML 作为编码方式的远程调用规范
                break;
        }
        return json;
    }

    /**
     * 获取url参数
     *
     * @param request
     * @return
     */
    private JSONObject getUrlRequestParams(FullHttpRequest request) {
        QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());
        Map<String, List<String>> parameters = decoder.parameters();
        JSONObject jsonObject = new JSONObject();
        Iterator<Map.Entry<String, List<String>>> iterator = parameters.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, List<String>> next = iterator.next();
            List<String> value = next.getValue();
            jsonObject.put(next.getKey(), value.size() > 1 ? value : next.getValue().get(0));
        }
        return jsonObject;
    }


    /**
     * 获取表单参数
     *
     * @param request
     * @return
     */
    private Map<String, String> getFormRequestParams(FullHttpRequest request) {
        HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), request);
        List<InterfaceHttpData> httpPostData = decoder.getBodyHttpDatas();
        Map<String, String> params = new HashMap<>();
        for (InterfaceHttpData data : httpPostData) {
            if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
                MemoryAttribute attribute = (MemoryAttribute) data;
                params.put(attribute.getName(), attribute.getValue());
            }
        }
        return params;
    }


}

与Netty集成HTTP的GET和POST通讯相似的内容:

Netty集成HTTP的GET和POST通讯

Netty+POST+GET

Springboot集成Netty实现TCP通讯

Netty

美团面试:说说Netty的零拷贝技术?

零拷贝技术(Zero-Copy)是一个大家耳熟能详的技术名词了,它主要用于提升 IO(Input & Output)的传输性能。 那么问题来了,为什么零拷贝技术能提升 IO 性能? 1.零拷贝技术和性能 在传统的 IO 操作中,当我们需要读取并传输数据时,我们需要在用户态(用户空间)和内核态(内核空

面试官:说说Netty对象池的实现原理?

Netty 作为一个高性能的网络通讯框架,它内置了很多恰夺天工的设计,目的都是为了将网络通讯的性能做到极致,其中「对象池技术」也是实现这一目标的重要技术。 1.什么是对象池技术? 对象池技术是一种重用对象以减少对象创建和销毁带来的开销的方法。在对象池中,只有第一次访问时会创建对象,并将其维护在内存中

拼多多面试:Netty如何解决粘包问题?

粘包和拆包问题也叫做粘包和半包问题,它是指在数据传输时,接收方未能正常读取到一条完整数据的情况(只读取了部分数据,或多读取到了另一条数据的情况)就叫做粘包或拆包问题。 从严格意义上来说,粘包问题和拆包问题属于两个不同的问题,接下来我们分别来看。 1.粘包问题 粘包问题是指在网络通信中,发送方连续发送

滴滴面试:谈谈你对Netty线程模型的理解?

Netty 线程模型是指 Netty 框架为了提供高性能、高并发的网络通信,而设计的管理和利用线程的策略和机制。 Netty 线程模型被称为 Reactor(响应式)模型/模式,它是基于 NIO 多路复用模型的一种升级,它的核心思想是将 IO 事件和业务处理进行分离,使用一个或多个线程来执行任务的一

面试官:说说Netty的核心组件?

Netty 核心组件是指 Netty 在执行过程中所涉及到的重要概念,这些核心组件共同组成了 Netty 框架,使 Netty 框架能够正常的运行。 Netty 核心组件包含以下内容: 启动器 Bootstrap/ServerBootstrap 事件循环器 EventLoopGroup/EventL

抖音面试:说说延迟任务的调度算法?

Netty 框架是以性能著称的框架,因此在它的框架中使用了大量提升性能的机制,例如 Netty 用于实现延迟队列的时间轮调度算法就是一个典型的例子。使用时间轮调度算法可以实现海量任务新增和取消任务的时间度为 O(1),那么什么是时间轮调度算法呢?接下来我们一起来看。 1.延迟任务实现 在 Netty

netty系列之: 在netty中使用 tls 协议请求 DNS 服务器

简介 在前面的文章中我们讲过了如何在netty中构造客户端分别使用tcp和udp协议向DNS服务器请求消息。在请求的过程中并没有进行消息的加密,所以这种请求是不安全的。 那么有同学会问了,就是请求解析一个域名的IP地址而已,还需要安全通讯吗? 事实上,不加密的DNS查询消息是很危险的,如果你在访问一

netty系列之:来,手把手教你使用netty搭建一个DNS tcp服务器

简介 在前面的文章中,我们提到了使用netty构建tcp和udp的客户端向已经公布的DNS服务器进行域名请求服务。基本的流程是借助于netty本身的NIO通道,将要查询的信息封装成为DNSMessage,通过netty搭建的channel发送到服务器端,然后从服务器端接受返回数据,将其编码为DNSR