正文
核心就是ChannelInitializer的实现使用http 消息解码器
package com.coremain.handler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
/**
* @description: Netty服务端回调处理 HTTP
* @author: GuoTong
* @createTime: 2023-05-16 22:05
* @since JDK 1.8 OR 11
**/
public class NettyServerHTTPHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// http 消息解码器
.addLast(new HttpServerCodec())
// http 消息合并(2048 为消息最大长度)
.addLast(new HttpObjectAggregator(2048))
// 自定义消息处理业务类
.addLast(new HttpMessageHandler());
// 此处还可以进行添加其他处理(例如ssl)等
}
}
核心2就是ChannelInboundHandlerAdapter的实现
package com.coremain.handler;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.coremain.netty.init.ServiceInit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.MemoryAttribute;
import io.netty.util.CharsetUtil;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @description: HTTP消息处理器: 通道入站处理适配器。
* 整个的IO处理操作环节包括:
* 从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端,
* 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到 ChannelInboundHandler入站处理器。
* 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器 到Netty的内部(如通道)。
* 按照这种方向来分,前面数据包解码、业务处理两个环节——属于入站处理器 的工作;
* 后面目标数据编码、把数据包写到通道中两个环节——属于出站处理器的 工作
* @author: GuoTong
* @createTime: 2023-05-16 22:06
* @since JDK 1.8 OR 11
**/
@SuppressWarnings("all")
public class HttpMessageHandler extends ChannelInboundHandlerAdapter {
private static final String CONTENT_TYPE = "Content-Type";
private static final String CONTENT_LENGTH = "Content-Length";
private static final String CONNECTION = "Connection";
private static final String KEEP_ALIVE = "keep-alive";
private static final String FORM_CONTENT_TYPE = "application/x-www-form-urlencoded";
private static final String JSON_CONTENT_TYPE = "application/json";
private static final String MULTIPART_CONTENT_TYPE = "multipart/form-data";
private static final String TEXT_CONTENT_TYPE = "text/xml";
/**
* Description: 当通道注册完成后,Netty会调用fireChannelRegistered, 触发通道注册事件。
* 通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法,会被调用到
*
* @param ctx
* @author: GuoTong
* @date: 2023-05-17 19:53:18
* @return:void
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
/**
* Description: 通道未注册
*
* @param ctx
* @author: GuoTong
* @date: 2023-05-17 19:55:48
* @return:void
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
}
/**
* Description: 当通道激活完成后,Netty会调用fireChannelActive, 触发通道激活事件。通道会启动该入站操作的流水线处理,
* 在通道注册过的入站处理器Handler的channelActive方法,会被调用到。
*
* @param ctx
* @author: GuoTong
* @date: 2023-05-17 19:56:37
* @return:void
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
/**
* Description: 当连接被断开或者不可用,Netty会调用fireChannelInactive, 触发连接不可用事 件。通道会启动对应的流水线处理,
* 在通道注册过的入站处理器Handler的channelInactive方法,会被调用到。
*
* @param ctx
* @author: GuoTong
* @date: 2023-05-17 19:57:06
* @return:void
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
/**
* Description: 当通道缓冲区读完,Netty会调用fireChannelReadComplete, 触发通道读完事 件。通道会启动该入站操作的流水线处理
* ,在通道注册过的入站处理器Handler的channelReadComplete方法,会被调用到。
*
* @param ctx
* @author: GuoTong
* @date: 2023-05-17 19:57:31
* @return:void
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* Description: 当通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件。通道会启动异常捕获的流水线处理,
* 在通道注册过的处理器Handler的 exceptionCaught方法,会被调用到。
* 注意,这个方法是在通道处理器中ChannelHandler定义的方法,入站处理器、出站处理器接口都继承到了该方法。
*
* @param ctx
* @param cause
* @author: GuoTong
* @date: 2023-05-17 19:58:31
* @return:void
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* Description: 当通道缓冲区可读,Netty会调用fireChannelRead, 触发通道可读事件。通道会启动该入站操作的流水线处理,
* 在通道注册过的入站处理器Handler的channelRead方法,会被调用到。
*
* @param ctx
* @param msg
* @author: GuoTong
* @date: 2023-05-17 19:58:09
* @return:void
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 仅处理http请求
if (msg instanceof FullHttpRequest) {
//客户端的请求对象
FullHttpRequest req = (FullHttpRequest) msg;
//新建一个返回消息的Json对象
String responseJson = "";
// 获取请求方式
HttpMethod method = req.method();
// 判断请求方式是GET还是POST
boolean isPost = method.equals(HttpMethod.POST);
//把客户端的请求数据格式化为Json对象
JSONObject requestJson = null;
try {
if (isPost) {
requestJson = getPostRequestParams(req);
} else {
requestJson = getUrlRequestParams(req);
}
} catch (Exception e) {
ResponseJson(ctx, req, "参数解析失败:" + e.getMessage());
return;
}
// 此处不进行请求方式区分,分局请求url 进行区分(url格式:/类标识/方法标识)
// 维护两个map, a: Map<类标识, Class(类对应的class)> b: Map<方法名, Class(方法参数对应的class)>
//获取客户端的URL
String uri = req.getUri();
String[] uris;
if (uri.contains("?")) {
uris = uri.substring(1, uri.indexOf("?")).split("/");
} else {
uris = uri.substring(1).split("/");
}
if (uris.length == 3) {
try {
Object result = dealService(requestJson, uris, isPost);
if (result instanceof String) {
responseJson = (String) result;
} else {
responseJson = JSON.toJSONString(result);
}
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
ResponseJson(ctx, req, "业务调用异常" + e.getMessage());
} catch (Exception e) {
ResponseJson(ctx, req, "业务调用异常" + e.getMessage());
}
} else {
ResponseJson(ctx, req, "访问路径不合法");
}
//向客户端发送结果
ResponseJson(ctx, req, responseJson);
}
}
/**
* 处理真正业务
*
* @param requestJson 请求参数
* @param uris ([项目标识,类标识,方法标识])
* @return
*/
private Object dealService(JSONObject requestJson, String[] uris, boolean isPost) throws Exception {
// 获取项目名
String project_identity = uris[0];
// 获取类标识
String class_identity = uris[1];
// 获取方法标识
String method_identity = uris[2];
// 获取类class 以及 方法参数class
Map<Class, Map<String, Class>> classClassMap = ServiceInit.CLASS_MAPPING.get(class_identity).get(method_identity);
List<Map.Entry<Class, Map<String, Class>>> collect = classClassMap.entrySet().stream().collect(Collectors.toList());
Map.Entry<Class, Map<String, Class>> classEntry = collect.get(0);
// 业务类class
Class serviceClass = classEntry.getKey();
// 参数class
Map<String, Class> paramClasses = classEntry.getValue();
// 封装参数的值
Object[] params = new Object[paramClasses.size()];
// 反射
Class[] classes = new Class[params.length];
if (isPost && paramClasses.size() == 1) {
Class c = paramClasses.entrySet().iterator().next().getValue();
params[0] = JSON.parseObject(requestJson.toJSONString(), c);
classes[0] = c;
} else {
Object[] webSendParams = requestJson.values().toArray();
if (paramClasses.size() != webSendParams.length) {
throw new Exception("参数不匹配");
}
// 获取参数类型,解析拼装参数的值
int i = 0;
for (Map.Entry<String, Class> entry : paramClasses.entrySet()) {
String key = entry.getKey();
Class pClass = entry.getValue();
// 以此拼接传入参数
Object value = webSendParams[i];
// 判断参数类型,强转
if (pClass.equals(String.class)) {
params[i] = (String) value;
} else {
// 默认不是String就是用Integer接收
params[i] = Integer.parseInt(value.toString());
}
classes[i] = pClass;
i++;
}
}
// 业务类对象
Object service = serviceClass.getDeclaredConstructor().newInstance();
// 根据方法标识反射调用该方法
Method method = serviceClass.getDeclaredMethod(method_identity, classes);
// 这样设置完之后,在外部也是可以访问private的。
method.setAccessible(true);
Object result = method.invoke(service, params);
// 返回结果
return result;
}
/**
* 响应HTTP的请求
*
* @param ctx
* @param req
* @param jsonStr
*/
private void ResponseJson(ChannelHandlerContext ctx, FullHttpRequest req, String jsonStr) {
byte[] jsonByteByte = (jsonStr + "\r\n").getBytes();
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(jsonByteByte));
String contentType = req.headers().get(CONTENT_TYPE);
switch (contentType) {
case FORM_CONTENT_TYPE:
// 最常见的 POST 提交数据的方式了。浏览器的原生 表单
response.headers().set(CONTENT_TYPE, FORM_CONTENT_TYPE);
break;
case JSON_CONTENT_TYPE:
// 告诉服务端消息主体是序列化后的 JSON 字符串
response.headers().set(CONTENT_TYPE, JSON_CONTENT_TYPE);
break;
case MULTIPART_CONTENT_TYPE:
// 常见的 POST 数据提交的方式。我们使用表单上传文件时,必须让 表单的 enctype 等于 multipart/form-data。
response.headers().set(CONTENT_TYPE, MULTIPART_CONTENT_TYPE);
break;
case TEXT_CONTENT_TYPE:
// 它是一种使用 HTTP 作为传输协议,XML 作为编码方式的远程调用规范
response.headers().set(CONTENT_TYPE, TEXT_CONTENT_TYPE);
break;
default:
response.headers().set(CONTENT_TYPE, "text/json");
}
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
response.headers().set(CONNECTION, KEEP_ALIVE);
ctx.write(response);
ctx.flush();
ctx.close();
}
/**
* 获取post请求的内容
*
* @param request
* @return
*/
private JSONObject getPostRequestParams(FullHttpRequest request) {
// 获取请求的 Content-Type 类型
String contentType = request.headers().get(CONTENT_TYPE);
// 获取报文信息
ByteBuf jsonBuf = request.content();
String jsonStr = jsonBuf.toString(CharsetUtil.UTF_8);
JSONObject json = JSONObject.of();
switch (contentType) {
case FORM_CONTENT_TYPE:
// 最常见的 POST 提交数据的方式了。浏览器的原生 表单
String[] keyvalues = jsonStr.split("&");
for (int i = 0; i < keyvalues.length; i++) {
// 放入键值对
json.put(keyvalues[i], keyvalues[i + 1]);
// 指针前进一格
i++;
}
break;
case JSON_CONTENT_TYPE:
// 告诉服务端消息主体是序列化后的 JSON 字符串
json = JSON.parseObject(jsonStr);
break;
case MULTIPART_CONTENT_TYPE:
// 常见的 POST 数据提交的方式。我们使用表单上传文件时,必须让 表单的 enctype 等于 multipart/form-data。
Map<String, String> form = getFormRequestParams(request);
json = (JSONObject) JSON.toJSON(form);
break;
case TEXT_CONTENT_TYPE:
// 它是一种使用 HTTP 作为传输协议,XML 作为编码方式的远程调用规范
break;
}
return json;
}
/**
* 获取url参数
*
* @param request
* @return
*/
private JSONObject getUrlRequestParams(FullHttpRequest request) {
QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());
Map<String, List<String>> parameters = decoder.parameters();
JSONObject jsonObject = new JSONObject();
Iterator<Map.Entry<String, List<String>>> iterator = parameters.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, List<String>> next = iterator.next();
List<String> value = next.getValue();
jsonObject.put(next.getKey(), value.size() > 1 ? value : next.getValue().get(0));
}
return jsonObject;
}
/**
* 获取表单参数
*
* @param request
* @return
*/
private Map<String, String> getFormRequestParams(FullHttpRequest request) {
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), request);
List<InterfaceHttpData> httpPostData = decoder.getBodyHttpDatas();
Map<String, String> params = new HashMap<>();
for (InterfaceHttpData data : httpPostData) {
if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
MemoryAttribute attribute = (MemoryAttribute) data;
params.put(attribute.getName(), attribute.getValue());
}
}
return params;
}
}