前言
了解下http2.0
- 浏览器为了防止一些网站频繁发送请求,造成一家独大、其他网站无法使用,或者所有网站都频繁发送请求造成用户体验差等等问题,会限制每个url同时并发数量
- 提高请求的响应速度。只有一个连接,只有一次tcp三次握手或者tls的7次握手。一个http1.1请求所用的时间,http2.0可以处理三到四个请求。
- 提高服务端与客户端的性能(尤其是大型互联网公司流量很大,如果使用http2.0,可以减少一半的http服务器)
协商
原因 一
原因二
解决
https 1.1 与 https 2.0 的协商
http1.1与http2.0的协商
请求头示例:
GET / HTTP/1.1
Host: example.com
Connection: Upgrade, HTTP2-Settings
Upgrade: h2c
HTTP2-Settings: <base64url encoding of HTTP/2 SETTINGS payload>
如果服务端不支持 HTTP/2,它会忽略 Upgrade 字段,直接返回 HTTP/1.1 响应,例如:
HTTP/1.1 200 OK
Content-Length: 243
Content-Type: text/html
如果服务端支持 HTTP/2,那就可以回应 101 状态码及对应头部:
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Upgrade: h2c
小结
- https 1.1 与 https 2.0 的协商 与 http1.1 与 http2.0 的协商 是两套设计方案。https 1.1 与 https 2.0 的协商 TLS 帮你做了。http1.1 与 http2.0 的协商需要自己做。
- 现在的趋势,客户端与服务端都需要同时支持http1.1,http2.0,https1.1,https2.0。真是一件很麻烦的事情
netty http2
- http2核心模块
- http1.1与http2协商模块
- http2帧处理模块
- http2协议转http1协议模块
http2核心模块
Http2ConnectionHandler
负责http2模块的组合
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
this.initialSettings = checkNotNull(initialSettings, "initialSettings");
this.decoder = checkNotNull(decoder, "decoder");
this.encoder = checkNotNull(encoder, "encoder");
if (encoder.connection() != decoder.connection()) {
throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
}
}
负责http2协议下handler生命周期处理
/**
 * Flushes the channel after asking the encoder's flow controller to write its pending bytes.
 * Any failure is routed to {@code onError} as an outbound error; failures that are not already
 * an {@code Http2Exception} are wrapped in an INTERNAL_ERROR connection error first.
 */
@Override
public void flush(ChannelHandlerContext ctx) {
    try {
        // Let the flow controller emit whatever it has queued before the actual flush.
        encoder.flowController().writePendingBytes();
        ctx.flush();
    } catch (Throwable failure) {
        final Throwable reported = failure instanceof Http2Exception
                ? failure
                : connectionError(INTERNAL_ERROR, failure, "Error flushing");
        onError(ctx, true, reported);
    }
}
/**
 * Initializes the encoder, decoder, both flow controllers and the byte decoder
 * when this handler is added to a pipeline.
 */
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    // This handler acts as the lifecycle manager for both directions.
    encoder.lifecycleManager(this);
    decoder.lifecycleManager(this);

    // Hand the context to the flow controllers of both directions.
    encoder.flowController().channelHandlerContext(ctx);
    decoder.flowController().channelHandlerContext(ctx);

    // Decoding starts in the preface stage.
    byteDecoder = new PrefaceDecoder(ctx);
}
/**
 * Notifies the current byte decoder (if any) that the handler was removed and then drops it.
 */
@Override
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
    if (byteDecoder == null) {
        // Nothing to clean up.
        return;
    }
    byteDecoder.handlerRemoved(ctx);
    byteDecoder = null;
}
/**
 * Ensures a byte decoder exists, lets it react to the channel becoming active,
 * and then forwards the event to the superclass.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Create the PrefaceDecoder lazily if none exists yet when the connection becomes active.
    if (byteDecoder == null) {
        byteDecoder = new PrefaceDecoder(ctx);
    }

    byteDecoder.channelActive(ctx);
    super.channelActive(ctx);
}
/**
 * Forwards the inactive event to the superclass first, then notifies and discards
 * the byte decoder if one is still present.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // The superclass goes first because its handling may still invoke decode.
    super.channelInactive(ctx);

    if (byteDecoder == null) {
        return;
    }
    byteDecoder.channelInactive(ctx);
    byteDecoder = null;
}
/**
 * Flushes when the channel has become writable, informs the encoder's flow controller of the
 * writability change, and always forwards the event to the superclass afterwards.
 */
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    // Writability is expected to flip while writing; this event must not re-enter the
    // allocation/write loop, otherwise over-allocation or illegal allocation could result.
    try {
        final boolean writable = ctx.channel().isWritable();
        if (writable) {
            flush(ctx);
        }
        encoder.flowController().channelWritabilityChanged();
    } finally {
        // Propagate regardless of whether the steps above succeeded.
        super.channelWritabilityChanged(ctx);
    }
}
/**
 * Delegates decoding of inbound bytes to whichever byte decoder is currently installed.
 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    // The active stage (e.g. the PrefaceDecoder created in handlerAdded) does the real work.
    byteDecoder.decode(ctx, in, out);
}
负责http2 Lifecycle Manager(http2生命周期的管理)
/**
 * Operations for managing the lifecycle of HTTP/2 streams and of the connection
 * (closing streams, resetting a stream, sending go-away).
 * NOTE(review): this excerpt shows no closing brace, so the declaration appears to be
 * truncated here; further members may exist in the full source.
 */
public interface Http2LifecycleManager {
// Presumably closes the local side of the given stream; the future is supplied by the caller -- TODO confirm semantics.
void closeStreamLocal(Http2Stream stream, ChannelFuture future);
// Presumably closes the remote side of the given stream -- TODO confirm semantics.
void closeStreamRemote(Http2Stream stream, ChannelFuture future);
// Presumably closes the given stream entirely -- TODO confirm semantics.
void closeStream(Http2Stream stream, ChannelFuture future);
// Resets the stream identified by streamId with the given error code; returns a future for the operation.
ChannelFuture resetStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise);
// Sends go-away with the last stream id, an error code and debug data; returns a future for the operation.
ChannelFuture goAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData, ChannelPromise promise);
closeStreamLocal
closeStreamRemote
closeStream
resetStream
负责校验协议行为
/**
 * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2 by validating the connection
 * state and creating the local upgrade stream.
 *
 * @throws Http2Exception if this endpoint is a server, if the preface has not been sent yet,
 *                        or if the remote preface has already been received
 */
public void onHttpClientUpgrade() throws Http2Exception {
    // Only a client endpoint may take this path.
    if (connection().isServer()) {
        throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
    }

    // A preface that was not sent yet most likely means the handler was not added to the
    // pipeline before this method was called.
    if (!prefaceSent()) {
        throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
    }

    // The upgrade has to happen before the peer's HTTP/2 preface arrives.
    if (decoder.prefaceReceived()) {
        throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
    }

    // Local stream used for the HTTP cleartext upgrade.
    connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
}
/**
 * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2: validates the connection
 * state, applies the remote settings and creates the remote upgrade stream.
 *
 * @param settings the settings for the remote endpoint.
 * @throws Http2Exception if this endpoint is a client, if the preface has not been sent yet,
 *                        or if the remote preface has already been received
 */
public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
    // Only a server endpoint may take this path.
    if (!connection().isServer()) {
        throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
    }

    // A preface that was not sent yet most likely means the handler was not added to the
    // pipeline before this method was called.
    if (!prefaceSent()) {
        throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
    }

    // The upgrade has to happen before the peer's HTTP/2 preface arrives.
    if (decoder.prefaceReceived()) {
        throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
    }

    // The settings are applied, but no ACK is necessary for them.
    encoder.remoteSettings(settings);

    // The upgrade stream is created in the half-closed state.
    connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
}
管理http2协议中的连接前言
客户端发送前言
/**
 * Sends the connection preface to the remote endpoint as soon as the channel becomes active.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // The channel has just become active, so the preface can go out now.
    sendPreface(ctx);
}
/**
 * Sends the connection preface at most once, and only while the channel is active.
 * A client first writes the preface string; both sides then write their initial settings;
 * a client finally fires the "preface and settings written" user event through this handler.
 */
private void sendPreface(ChannelHandlerContext ctx) throws Exception {
    if (prefaceSent) {
        // Already done.
        return;
    }
    if (!ctx.channel().isActive()) {
        // Too early: nothing can be written yet.
        return;
    }
    prefaceSent = true;

    final boolean clientSide = !connection().isServer();
    if (clientSide) {
        // Clients must send the preface string as the first bytes on the connection.
        ctx.write(connectionPrefaceBuf())
           .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    // Both client and server must send their initial settings.
    encoder.writeSettings(ctx, initialSettings, ctx.newPromise())
           .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

    if (clientSide) {
        // Firing the event directly from this context would hide it from a user subclass of
        // this handler, so it is delivered starting with this handler instead.
        userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
    }
}
服务端校验连接前言
/**
 * Preface-stage decoding: once the channel is active, the client preface string has been fully
 * read and the first frame is verified to be SETTINGS, the byte decoder is replaced with a
 * {@code FrameDecoder}, which immediately continues decoding the same input.
 * Any throwable is reported through {@code onError} as an inbound error.
 */
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    try {
        if (!ctx.channel().isActive()) {
            return;
        }
        if (!readClientPrefaceString(in)) {
            // Preface not complete yet.
            return;
        }
        if (!verifyFirstFrameIsSettings(in)) {
            // Not enough bytes yet to inspect the first frame.
            return;
        }

        // Preface handled: hand control over to the post-initialization decoder.
        byteDecoder = new FrameDecoder();
        byteDecoder.decode(ctx, in, out);
    } catch (Throwable e) {
        onError(ctx, false, e);
    }
}
/**
 * Consumes as much of the expected client preface string from {@code in} as is available.
 *
 * @param in the inbound buffer; matched bytes are skipped
 * @return {@code true} if no preface is (any longer) expected or the whole preface has now been
 *         read; {@code false} if more preface bytes are still needed
 * @throws Http2Exception a PROTOCOL_ERROR connection error if no bytes are readable or the
 *         readable bytes do not match the expected preface
 */
private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
// A null expectation means there is nothing (left) to verify; it is set to null below once fully read.
if (clientPrefaceString == null) {
return true;
}
int prefaceRemaining = clientPrefaceString.readableBytes();
// Compare only what both buffers can supply right now.
int bytesRead = min(in.readableBytes(), prefaceRemaining);
// If the input so far doesn't match the preface, break the connection.
if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
clientPrefaceString, clientPrefaceString.readerIndex(),
bytesRead)) {
int maxSearch = 1024; // picked because 512 is too little, and 2048 too much
// Look for the HTTP/1.x marker in the first maxSearch bytes to give a more helpful error message.
int http1Index =
ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
if (http1Index != -1) {
String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
}
// Otherwise report a hex dump of at most as many bytes as the remaining preface.
String receivedBytes = hexDump(in, in.readerIndex(),
min(in.readableBytes(), clientPrefaceString.readableBytes()));
throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
"Hex dump for received bytes: %s", receivedBytes);
}
// The compared bytes matched: advance both buffers by the same amount.
in.skipBytes(bytesRead);
clientPrefaceString.skipBytes(bytesRead);
if (!clientPrefaceString.isReadable()) {
// Entire preface has been read.
clientPrefaceString.release();
clientPrefaceString = null;
return true;
}
// Part of the preface is still outstanding.
return false;
}
/**
 * Peeks (without consuming) at the header of the first frame and checks that it is a SETTINGS
 * frame without the ACK flag.
 *
 * @param in the inbound buffer positioned at the start of the first frame
 * @return {@code false} if fewer than 5 bytes are readable, {@code true} if the check passed
 * @throws Http2Exception a PROTOCOL_ERROR connection error if the first frame is not a
 *         non-ACK SETTINGS frame
 */
private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
    // The frame type sits at offset 3 and the flags at offset 4, so 5 bytes are required.
    if (in.readableBytes() < 5) {
        return false;
    }

    final int start = in.readerIndex();
    final short type = in.getUnsignedByte(start + 3);
    final short frameFlags = in.getUnsignedByte(start + 4);

    final boolean plainSettings = type == SETTINGS && (frameFlags & Http2Flags.ACK) == 0;
    if (!plainSettings) {
        throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
                "Hex dump for first 5 bytes: %s",
                hexDump(in, start, 5));
    }
    return true;
}
http2链接管理 , streamId的管理,流量管理,权重管理
http2的编码与解码
goAway与resetStream帧的处理方式,是交由lifecycleManager(http2生命周期管理器)负责的
/**
 * Writes a go-away by delegating to the lifecycle manager and returning its future.
 */
@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
                                 ChannelPromise promise) {
    // The lifecycle manager owns the go-away handling.
    final ChannelFuture result = lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
    return result;
}
/**
 * Writes a stream reset by delegating to the lifecycle manager and returning its future.
 */
@Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
                                    ChannelPromise promise) {
    // Delegating keeps the connection state properly updated by the lifecycle manager.
    final ChannelFuture result = lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
    return result;
}
重点:FlowControlledHeaders与FlowControlledData主要是用于流量控制与权重使用,不属于编码范畴
- Handler(Http2ConnectionHandler) 负责接收buf数据
- PrefaceDecoder负责校验连接前言
- FrameDecoder 负责执行Http2ConnectionDecoder。Http2ConnectionDecoder实现类为 DefaultHttp2ConnectionDecoder
- frameReader 负责解析协议。 Http2FrameReader 实现类为DefaultHttp2FrameReader
- FrameListener负责对帧进行处理。FrameListener的实现类有很多,主要分FrameReadListener与其他FrameListener。FrameReadListener负责帧进行http2特性维护,维护成功调用其他FrameListener。由其他FrameListener进行帧处理。
帧的解析与处理
frameReader 负责解析协议。 Http2FrameReader 实现类为DefaultHttp2FrameReader
/**
 * Reads as many complete frames from {@code input} as are available, alternating between the
 * frame-header state and the frame-payload state, and dispatches each frame to {@code listener}.
 * Once a non-stream error has occurred, all subsequent input is discarded.
 */
@Override
public void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener)
throws Http2Exception {
// After a previous read error, silently drop everything that arrives.
if (readError) {
input.skipBytes(input.readableBytes());
return;
}
try {
do {
if (readingHeaders) {
processHeaderState(input);// parse the frame header
if (readingHeaders) {
// Wait until the entire header has arrived.
return;
}
}
processPayloadState(ctx, input, listener);// process the frame payload
if (!readingHeaders) {
// Wait until the entire payload has arrived.
return;
}
} while (input.isReadable());
} catch (Http2Exception e) {
// Only errors that are not stream errors put the reader into the error state.
readError = !Http2Exception.isStreamError(e);
throw e;
} catch (RuntimeException e) {
readError = true;
throw e;
} catch (Throwable cause) {
readError = true;
PlatformDependent.throwException(cause);
}
}
/**
 * Reads one frame header from {@code in} once at least FRAME_HEADER_LENGTH bytes are readable,
 * stores its fields (payload length, type, flags, stream id), switches to the payload state and
 * runs the verification routine that matches the frame type.
 *
 * @throws Http2Exception a FRAME_SIZE_ERROR connection error if the payload length exceeds
 *         {@code maxFrameSize}, or whatever the per-type verification throws
 */
private void processHeaderState(ByteBuf in) throws Http2Exception {
if (in.readableBytes() < FRAME_HEADER_LENGTH) {
// Wait until the entire frame header has been read.
return;
}
// Read the header and prepare the unmarshaller to read the frame.
payloadLength = in.readUnsignedMedium();// frame payload length
if (payloadLength > maxFrameSize) {
throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength,
maxFrameSize);
}
frameType = in.readByte();// frame type
flags = new Http2Flags(in.readUnsignedByte());
streamId = readUnsignedInt(in);// read the streamId; the 9-byte frame header is now fully consumed
// We have consumed the data, next time we read we will be expecting to read the frame payload.
readingHeaders = false;
switch (frameType) {// validate according to the frame type
case DATA:
verifyDataFrame();
break;
case HEADERS:
verifyHeadersFrame();
break;
case PRIORITY:
verifyPriorityFrame();
break;
case RST_STREAM:
verifyRstStreamFrame();
break;
case SETTINGS:
verifySettingsFrame();
break;
case PUSH_PROMISE:
verifyPushPromiseFrame();
break;
case PING:
verifyPingFrame();
break;
case GO_AWAY:
verifyGoAwayFrame();
break;
case WINDOW_UPDATE:
verifyWindowUpdateFrame();
break;
case CONTINUATION:
verifyContinuationFrame();
break;
default:
// Unknown frame type, could be an extension.
verifyUnknownFrame();
break;
}
}
/**
 * Reads one frame payload of {@code payloadLength} bytes from {@code in} once it is fully
 * available, switches back to the header state and dispatches the payload to the reader that
 * matches the current frame type.
 */
private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)
        throws Http2Exception {
    final boolean payloadComplete = in.readableBytes() >= payloadLength;
    if (!payloadComplete) {
        // The rest of the payload has not arrived yet.
        return;
    }

    // A view over exactly the payload bytes; this also advances the reader index of `in`.
    final ByteBuf framePayload = in.readSlice(payloadLength);

    // The payload is consumed, so the next read expects a frame header again.
    readingHeaders = true;

    // Fire the frame event that corresponds to the frame type.
    switch (frameType) {
        case DATA:
            readDataFrame(ctx, framePayload, listener);
            break;
        case HEADERS:
            readHeadersFrame(ctx, framePayload, listener);
            break;
        case PRIORITY:
            readPriorityFrame(ctx, framePayload, listener);
            break;
        case RST_STREAM:
            readRstStreamFrame(ctx, framePayload, listener);
            break;
        case SETTINGS:
            readSettingsFrame(ctx, framePayload, listener);
            break;
        case PUSH_PROMISE:
            readPushPromiseFrame(ctx, framePayload, listener);
            break;
        case PING:
            // PING passes the payload on as a single long value.
            readPingFrame(ctx, framePayload.readLong(), listener);
            break;
        case GO_AWAY:
            readGoAwayFrame(ctx, framePayload, listener);
            break;
        case WINDOW_UPDATE:
            readWindowUpdateFrame(ctx, framePayload, listener);
            break;
        case CONTINUATION:
            readContinuationFrame(framePayload, listener);
            break;
        default:
            readUnknownFrame(ctx, framePayload, listener);
            break;
    }
}
http2请求头子模块
http1.1与http2协商模块
/**
 * Configures the pipeline for TLS: an SSL handler created from {@code sslCtx}, followed by the
 * handler that chooses between HTTP/2 and HTTP/1.1 after protocol negotiation.
 */
private void configureSsl(SocketChannel ch) {
    ch.pipeline().addLast(
            sslCtx.newHandler(ch.alloc()),
            new Http2OrHttpHandler());
}
https 1.1 与 https 2.0的协商
/**
 * Configures the pipeline for either HTTP/2 or HTTP/1.1 depending on the negotiated
 * application protocol.
 */
public class Http2OrHttpHandler extends ApplicationProtocolNegotiationHandler {

    /** Upper bound handed to the HTTP/1.1 object aggregator (100 KiB). */
    private static final int MAX_CONTENT_LENGTH = 1024 * 100;

    /**
     * Passes HTTP/1.1 to the superclass constructor.
     * NOTE(review): presumably this is the fallback protocol when negotiation yields nothing -- confirm.
     */
    protected Http2OrHttpHandler() {
        super(ApplicationProtocolNames.HTTP_1_1);
    }

    /**
     * Appends the handlers that match {@code protocol}.
     *
     * @throws IllegalStateException if the protocol is neither HTTP/2 nor HTTP/1.1
     */
    @Override
    protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
        if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
            // HTTP/2: HTTP-to-HTTP/2 connection handler in front of the application handler.
            ctx.pipeline().addLast(
                    new HttpToHttp2ConnectionHandler(),
                    new HelloWorldHttp1Handler("ALPN Negotiation"));
        } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
            // HTTP/1.1: server codec + aggregator in front of the application handler.
            ctx.pipeline().addLast(
                    new HttpServerCodec(),
                    new HttpObjectAggregator(MAX_CONTENT_LENGTH),
                    new HelloWorldHttp1Handler("ALPN Negotiation"));
        } else {
            throw new IllegalStateException("unknown protocol: " + protocol);
        }
    }
}
http 1.1 与 http 2.0的协商
- http客户端与服务端都要支持http1.1协议
- http客户端与服务端都要支持Upgrade机制
- 进行协商
- http客户端与服务端都要删除http1.1协议与Upgrade机制的支持
- http客户端与服务端都要加入http2.0协议的支持
服务端如何支持Upgrade机制
/**
 * Factory that supplies an HTTP/2 server upgrade codec when the requested protocol equals
 * {@code Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME}, and {@code null} for any other protocol.
 */
private static final UpgradeCodecFactory upgradeCodecFactory = new UpgradeCodecFactory() {
    @Override
    public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
        final boolean supported =
                AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol);
        if (!supported) {
            // Unsupported protocol: no codec.
            return null;
        }
        return new Http2ServerUpgradeCodec(new HttpToHttp2ConnectionHandler());
    }
};
// HTTP/1.1 server codec; it is also given to the upgrade handler as the codec to upgrade from.
HttpServerCodec sourceCodec = new HttpServerCodec();
// Upgrade handler that consults upgradeCodecFactory for the protocols a client requests.
HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory);
// Register both handlers. (Fixed: the original passed the undefined identifier `ourceCodec`.)
cp.addLast(sourceCodec, upgradeHandler);
HttpServerUpgradeHandler Upgrade机制处理handler
/**
 * Attempts to upgrade the connection to one of the protocols listed in the request's Upgrade header.
 *
 * @param ctx     the handler context
 * @param request the full HTTP request that may carry the upgrade headers
 * @return {@code true} if the upgrade was carried out; {@code false} if no requested protocol is
 *         supported, required headers are missing, or the codec declined the upgrade
 */
private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {
// Read the protocols listed in the request's Upgrade header.
final List<CharSequence> requestedProtocols = splitHeader(request.headers().get(HttpHeaderNames.UPGRADE));
final int numRequestedProtocols = requestedProtocols.size();
UpgradeCodec upgradeCodec = null;
CharSequence upgradeProtocol = null;
// Pick the first requested protocol for which the factory returns a codec.
for (int i = 0; i < numRequestedProtocols; i ++) {
final CharSequence p = requestedProtocols.get(i);// candidate protocol
final UpgradeCodec c = upgradeCodecFactory.newUpgradeCodec(p);// ask the factory whether this protocol is supported
if (c != null) {
upgradeProtocol = p;
upgradeCodec = c;
break;
}
}
// No upgradeCodec was found: none of the protocols sent by the client is supported.
if (upgradeCodec == null) {
return false;
}
// A missing Connection header means the upgrade request is incomplete.
CharSequence connectionHeader = request.headers().get(HttpHeaderNames.CONNECTION);
if (connectionHeader == null) {
return false;
}
// The Connection header must name "upgrade" as well as every header the codec requires.
Collection<CharSequence> requiredHeaders = upgradeCodec.requiredUpgradeHeaders();
List<CharSequence> values = splitHeader(connectionHeader);
if (!containsContentEqualsIgnoreCase(values, HttpHeaderNames.UPGRADE) ||
!containsAllContentEqualsIgnoreCase(values, requiredHeaders)) {
return false;
}
// Every required header must actually be present in the request.
for (CharSequence requiredHeader : requiredHeaders) {
if (!request.headers().contains(requiredHeader)) {
return false;
}
}
final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeProtocol);// build the response that carries the negotiated protocol
// prepareUpgradeResponse parses the HTTP2-Settings header.
if (!upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse.headers())) {
return false;
}
// Create the event; whichever handler is interested can process it.
final UpgradeEvent event = new UpgradeEvent(upgradeProtocol, request);
try {
// Write the response. Beware: this is a major pitfall.
final ChannelFuture writeComplete = ctx.writeAndFlush(upgradeResponse);
// Remove the HTTP/1.1 handler.
sourceCodec.upgradeFrom(ctx);
// Add the HTTP/2 handler; this step is aware of the pitfall mentioned above.
upgradeCodec.upgradeTo(ctx, request);
// Remove this handler (HttpServerUpgradeHandler). The pipeline should then only hold the HTTP/2 handler and the business handlers.
ctx.pipeline().remove(HttpServerUpgradeHandler.this);
// Propagate the event (retained here, because the finally block releases it).
ctx.fireUserEventTriggered(event.retain());
writeComplete.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} finally {
event.release();
}
return true;
}
Http2ServerUpgradeCodec Upgrade协议解析器
/**
 * Decodes the value of the settings header into an {@code Http2Settings} object: the text is
 * encoded to bytes (UTF-8), base64-decoded with the URL-safe dialect, wrapped into a SETTINGS
 * frame and then decoded as such.
 *
 * @param ctx            supplies the buffer allocator
 * @param settingsHeader the raw header value
 * @return the decoded settings
 * @throws Http2Exception as declared; thrown by the decoding steps if they fail
 */
private Http2Settings decodeSettingsHeader(ChannelHandlerContext ctx, CharSequence settingsHeader)
        throws Http2Exception {
    final ByteBuf encoded =
            ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(settingsHeader), CharsetUtil.UTF_8);
    try {
        // Base64 (URL-safe) decode of the header bytes.
        final ByteBuf decoded = Base64.decode(encoded, URL_SAFE);
        final ByteBuf settingsFrame = createSettingsFrame(ctx, decoded);
        return decodeSettings(ctx, settingsFrame);
    } finally {
        // The buffer holding the header text is always released here.
        encoded.release();
    }
}
/**
 * Assembles an HTTP/2 SETTINGS frame from the decoded payload: a frame header (stream id 0,
 * empty flags) followed by the payload bytes. The payload buffer is released afterwards.
 *
 * @param ctx     supplies the buffer allocator
 * @param payload the decoded settings payload; released by this method
 * @return a newly allocated buffer containing the complete frame
 */
private static ByteBuf createSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload) {
    final int payloadSize = payload.readableBytes();
    final ByteBuf settingsFrame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payloadSize);

    writeFrameHeader(settingsFrame, payloadSize, SETTINGS, new Http2Flags(), 0);
    settingsFrame.writeBytes(payload);

    // The payload has been copied into the frame and is no longer needed.
    payload.release();
    return settingsFrame;
}
/**
 * Applies the settings received from the remote endpoint. Each setting is optional; only the
 * ones that are present (non-null) are applied. The code below shows what every value in
 * {@code Http2Settings} is used for.
 *
 * @param settings the remote endpoint's settings
 * @throws Http2Exception a PROTOCOL_ERROR connection error if a client receives push enabled
 */
public void remoteSettings(Http2Settings settings) throws Http2Exception {
    final Boolean enablePush = settings.pushEnabled();
    final Http2FrameWriter.Configuration writerConfig = configuration();
    final Http2HeadersEncoder.Configuration headersConfig = writerConfig.headersConfiguration();
    final Http2FrameSizePolicy frameSizePolicy = writerConfig.frameSizePolicy();

    // ENABLE_PUSH: a client must not be told that push is enabled.
    if (enablePush != null) {
        if (!connection.isServer() && enablePush) {
            throw connectionError(PROTOCOL_ERROR,
                    "Client received a value of ENABLE_PUSH specified to other than 0");
        }
        connection.remote().allowPushTo(enablePush);
    }

    // MAX_CONCURRENT_STREAMS: caps the local endpoint's active streams (clamped to MAX_VALUE).
    final Long concurrentStreams = settings.maxConcurrentStreams();
    if (concurrentStreams != null) {
        connection.local().maxActiveStreams((int) min(concurrentStreams, MAX_VALUE));
    }

    // HEADER_TABLE_SIZE: maximum header table size for outbound headers (clamped to MAX_VALUE).
    final Long tableSize = settings.headerTableSize();
    if (tableSize != null) {
        headersConfig.maxHeaderTableSize((int) min(tableSize, MAX_VALUE));
    }

    // MAX_HEADER_LIST_SIZE: maximum header list size for outbound headers.
    final Long headerListSize = settings.maxHeaderListSize();
    if (headerListSize != null) {
        headersConfig.maxHeaderListSize(headerListSize);
    }

    // MAX_FRAME_SIZE: maximum size of outbound frames.
    final Integer frameSize = settings.maxFrameSize();
    if (frameSize != null) {
        frameSizePolicy.maxFrameSize(frameSize);
    }

    // INITIAL_WINDOW_SIZE: initial window size of the flow controller.
    final Integer windowSize = settings.initialWindowSize();
    if (windowSize != null) {
        flowController().initialWindowSize(windowSize);
    }
}
客户端如何支持并且触发Upgrade机制
一个简单的客户端代码
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
HttpClientCodec sourceCodec = new HttpClientCodec();
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(Http2Handler.newHandler(false));
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536);
pipeline.addLast(
sourceCodec,
upgradeHandler,
new UpgradeRequestHandler(),
}
/**
 * One-shot handler: when the channel becomes active it sends a plain {@code GET /} HTTP/1.1
 * request, forwards the active event, and then removes itself from the pipeline.
 */
private final class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // The request that is written first on the freshly active channel.
        final DefaultFullHttpRequest initialRequest =
                new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
        ctx.writeAndFlush(initialRequest);

        // Let the following handlers see the active event as well.
        ctx.fireChannelActive();

        // This handler's job is done.
        ctx.pipeline().remove(this);
    }
}
- 什么时候发送请求
- 谁来发送请求
- 谁来在请求里面加入请求头
HttpClientUpgradeHandler 负责拦截请求并添加请求头
/**
 * Adds the upgrade-related headers to an outgoing request: sets the UPGRADE header to the codec's
 * protocol, lets the codec add its own protocol-specific headers, and adds a CONNECTION header
 * listing those header names followed by the "upgrade" token.
 */
private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) {
    // UPGRADE header: the protocol this codec upgrades to.
    request.headers().set(HttpHeaderNames.UPGRADE, upgradeCodec.protocol());

    // The codec adds its protocol-specific headers (such as the HTTP2-Settings header) and
    // reports their names; insertion order is kept.
    final Set<CharSequence> protocolHeaders = new LinkedHashSet<CharSequence>(2);
    protocolHeaders.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));

    // CONNECTION header value: "<name>,<name>,...,<upgrade token>".
    final StringBuilder connectionValue = new StringBuilder();
    for (CharSequence headerName : protocolHeaders) {
        connectionValue.append(headerName).append(',');
    }
    connectionValue.append(HttpHeaderValues.UPGRADE);

    request.headers().add(HttpHeaderNames.CONNECTION, connectionValue.toString());
}
HttpClientUpgradeHandler 只会对第一次的请求进行Upgrade操作,之后的请求全部会报异常
http2协议处理模块
以下是每种帧对应一个封装好的实体类。
Http2FrameCodec 内部Http2FrameListener的实现
/**
 * Frame listener that wraps each frame it is notified about into the matching
 * {@code DefaultHttp2*Frame} object and hands that object to {@code onHttp2Frame}.
 * Priority, settings-ack and push-promise notifications are currently ignored.
 */
private final class FrameListener implements Http2FrameListener {
// Unknown frame: wrapped together with its stream and retained before being passed on.
@Override
public void onUnknownFrame(
ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
.stream(requireStream(streamId)).retain());
}
// SETTINGS frame: wrapped and passed on.
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
}
// PING frame: wrapped with ack = false.
@Override
public void onPingRead(ChannelHandlerContext ctx, long data) {
onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
}
// PING ack frame: wrapped with ack = true.
@Override
public void onPingAckRead(ChannelHandlerContext ctx, long data) {
onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
}
// RST_STREAM frame: wrapped together with its stream.
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
}
// WINDOW_UPDATE frame: only stream-level updates are passed on.
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
if (streamId == 0) {
// Ignore connection window updates.
return;
}
onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
}
// HEADERS frame with priority information: the priority fields are dropped and the simpler overload is used.
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight, boolean
exclusive, int padding, boolean endStream) {
onHeadersRead(ctx, streamId, headers, padding, endStream);
}
// HEADERS frame: wrapped together with its stream.
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) {
onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
.stream(requireStream(streamId)));
}
// DATA frame: wrapped together with its stream and retained; reports 0 bytes as processed here.
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) {
onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
.stream(requireStream(streamId)).retain());
// We return the bytes in consumeBytes() once the stream channel consumed the bytes.
return 0;
}
// GOAWAY frame: wrapped and retained before being passed on.
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain());
}
@Override
public void onPriorityRead(
ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {
// TODO: Maybe handle me
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) {
// TODO: Maybe handle me
}
@Override
public void onPushPromiseRead(
ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) {
// TODO: Maybe handle me
}
// Looks up the Http2FrameStream stored under streamKey on the connection's stream with the given id;
// throws IllegalStateException if that property is null.
// NOTE(review): if connection().stream(streamId) itself can return null this would fail with a
// NullPointerException before the check -- confirm against callers.
private Http2FrameStream requireStream(int streamId) {
Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
if (stream == null) {
throw new IllegalStateException("Stream object required for identifier: " + streamId);
}
return stream;
}
}
http2协议转http1协议模块
- InboundHttp2ToHttpAdapter(实现转换的类)以及InboundHttp2ToHttpAdapterBuilder
- HttpToHttp2ConnectionHandler(发http1.1的对象解析成http2协议发送出去)以及HttpToHttp2ConnectionHandlerBuilder
下面代码简单描述了http1.1的对象如何解析成http2协议并且发送出去
/**
 * Translates outbound HTTP/1.x objects into HTTP/2 frames. Messages that are neither
 * {@code HttpMessage} nor {@code HttpContent} are passed through unchanged. An {@code HttpMessage}
 * is written as a headers frame; an {@code HttpContent} is written as a data frame, followed by a
 * trailing headers frame when the last content carries trailers.
 *
 * @param ctx     the handler context
 * @param msg     the outbound message
 * @param promise completed through the promise aggregator once all resulting writes are done
 */
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// Anything that is not an HTTP object is forwarded untouched.
if (!(msg instanceof HttpMessage || msg instanceof HttpContent)) {
ctx.write(msg, promise);
return;
}
// Whether msg must be released in the finally block; cleared just before the content is handed to writeData.
boolean release = true;
// Aggregates the promises of the individual frame writes into the caller's promise.
SimpleChannelPromiseAggregator promiseAggregator =
new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor());
try {
Http2ConnectionEncoder encoder = encoder();
boolean endStream = false;
if (msg instanceof HttpMessage) {
final HttpMessage httpMsg = (HttpMessage) msg;
// Provide the user the opportunity to specify the streamId
currentStreamId = getStreamId(httpMsg.headers());
// Convert and write the headers.
Http2Headers http2Headers = HttpConversionUtil.toHttp2Headers(httpMsg, validateHeaders);
// A full message without readable content ends the stream with the headers frame.
endStream = msg instanceof FullHttpMessage && !((FullHttpMessage) msg).content().isReadable();
writeHeaders(ctx, encoder, currentStreamId, httpMsg.headers(), http2Headers,
endStream, promiseAggregator);
}
// Content is only written if the headers frame did not already end the stream.
if (!endStream && msg instanceof HttpContent) {
boolean isLastContent = false;
HttpHeaders trailers = EmptyHttpHeaders.INSTANCE;
Http2Headers http2Trailers = EmptyHttp2Headers.INSTANCE;
if (msg instanceof LastHttpContent) {
isLastContent = true;
// Convert any trailing headers.
final LastHttpContent lastContent = (LastHttpContent) msg;
trailers = lastContent.trailingHeaders();
http2Trailers = HttpConversionUtil.toHttp2Headers(trailers, validateHeaders);
}
// Write the data
final ByteBuf content = ((HttpContent) msg).content();
// The data frame ends the stream only when this is the last content and there are no trailers.
endStream = isLastContent && trailers.isEmpty();
// From here on the content is passed to writeData, so msg is no longer released in finally.
release = false;
encoder.writeData(ctx, currentStreamId, content, 0, endStream, promiseAggregator.newPromise());
if (!trailers.isEmpty()) {
// Write trailing headers.
writeHeaders(ctx, encoder, currentStreamId, trailers, http2Trailers, true, promiseAggregator);
}
}
} catch (Throwable t) {
// Report as an outbound error and fail the aggregated promise.
onError(ctx, true, t);
promiseAggregator.setFailure(t);
} finally {
if (release) {
ReferenceCountUtil.release(msg);
}
// No further promises will be requested from the aggregator.
promiseAggregator.doneAllocatingPromises();
}
}
- Stream转http1.1对象。 请看Http2StreamFrameToHttpObjectCodec。感觉这个类比较鸡肋。
参考资料
HTTP/2 协议规范(很全面文档) https://blog.csdn.net/u010129119/article/details/79361949#1-%E7%AE%80%E4%BB%8B
关于HTTP2和HTTPS,这些你必须要知道 https://cloud.tencent.com/info/b91491d04eae06df8fd4b055545f10ae.html
谈谈 HTTP/2 的协议协商机制 http://web.jobbole.com/85646/