# fpush **Repository Path**: ignore-anonymous/fpush ## Basic Information - **Project Name**: fpush - **Description**: 即时消息推送服务(即时通讯),基于Netty+protobuf-- Instant Messaging push service based on Netty+protobuf - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 3 - **Created**: 2020-03-11 - **Last Updated**: 2023-05-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## **Java开发推送服务fpush** ## 1.概述 基于SDK的消息推送服务。旨在做一个类似于极光推送、各推、小米推送之类的Java程序开源实现。开发平台是Java。
用于Java服务端推送实时和离线消息到Android/iOS客户端。 适用于推送消息和App端的即时通讯im场景。


fpush监控后台
👉 [http://fpush-admin.appjishu.com](http://fpush-admin.appjishu.com)
## 2.技术选型 要满足大量的连接数、同时支持双全工通信,并且性能也得有保障。我对Java语言比较擅长,于是选择了Java来实现。在 Java 技术栈中进行选型
首先自然是排除掉了传统IO,那就只有选NIO了,在这个层面其实选择也不多,考虑到社区、资料维护等方面最终选择了Netty。 序列化框架,选择了大名鼎鼎的
protobuf。具体的版本号如下 ``` 1. JDK1.8 2. Netty-4.1.31.Final 3. protobuf-java 3.6.1 ```
## 3.系统架构 ### 3.1系统部署架构图如下:

![](doc/arch-1.png)


### 3.2移动客户端鉴定权限原理

![](doc/client-passport.png)


### 3.3 server端推送消息到client端的原理
tcp通信图如下:

![](doc/tcp.png)
## 4.代码实现 ### 4.1协议定制 HTTP协议和XMPP协议的字节数太多,带宽消耗大,不适合推送服务。我们的推送服务需要自己定制的二进制协议。 下面是编写的protobuf配置文件 [fmessage.proto](https://github.com/liushaoming/fpush/blob/master/fpush-core/src/main/java/fmessage.proto) 进入本地的目录,例如D:\git\flylib\fpush\fpush-core\src\main\java执行 ```shell protoc --java_out=. fmessage.proto ``` 就可以生成FMessage之类的protobuf实体类,例如 com.appjishu.fpush.core.proto.FMessage ### 4.2应用服务器的注册 每个应用服务器需要在fpush-server上注册一个账号, 得到相应的三个参数APP_ID, APP_KEY, APP_SECRET_KEY
应用服务器应该请求fpush-server的这个http接口http://serverHost:10200/app/registerAccount
### 4.3长连接注册鉴权 Android和iOS客户端通过访问http接口http://serverHost:10200/app/keyToken 获取到clientToken 参数是long appId, String appKey 。 这里就是上面提到的APP_ID, APP_KEY Android端代码如下: ```java public synchronized NettyClient connect() throws Exception { OkHttpClient httpClient = new OkHttpClient.Builder().build(); FormBody formBody = new FormBody.Builder().add("appId", appId + "") .add("appKey", appKey).build(); String urlStr = "http://" + NetConstant.PUSH_HOST + ":" + NetConstant.API_PORT + NetConstant.KEY_TOKEN_URL; Request request = new Request.Builder().url(urlStr) .post(formBody) .build(); Call call = httpClient.newCall(request); call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.d(MY_TAG, "http failed."); } @Override public void onResponse(Call call, Response response) throws IOException { if (response.code() == 200) { String responseStr = response.body().string(); // ResponseData responseData = JSON.parseObject(responseStr, ResponseData.class); ResponseData responseData = gson.fromJson(responseStr, ResponseData.class); if (responseData.getCode() == 0) { String clientToken = (String) responseData.getData(); try { // MainActivity mainActivity = MainActivity.getInstance(); // if (mainActivity != null) { // Toast.makeText(MainActivity.getInstance(), "连接成功!", Toast.LENGTH_SHORT).show(); // } doStart(appId, clientToken); } catch (Exception e) { e.printStackTrace(); } } } } }); return this; } ```
Android使用自己的APP_ID, clientToken, MsgUser.alias(一般用应用系统里的userId作为alias标识一个移动设备)注册到fpush-server代码RegisterRequestHandler.java ```java package com.appjishu.fpush_demo.handler; import android.util.Log; import com.appjishu.fpush.core.constant.FMessageType; import com.appjishu.fpush.core.constant.RegisterState; import com.appjishu.fpush.core.model.MsgUser; import com.appjishu.fpush.core.proto.FBody; import com.appjishu.fpush.core.proto.FHeader; import com.appjishu.fpush.core.proto.FMessage; import com.appjishu.fpush_demo.singleton.CurrentUser; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class RegisterRequestHandler extends ChannelInboundHandlerAdapter { private static final String MY_TAG = "RegisterHandler"; public RegisterRequestHandler() { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buildRegisterRequest()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FMessage) { FMessage receivedMsg = (FMessage) msg; FHeader receivedHeader = receivedMsg.getHeader(); if (receivedHeader!=null && receivedHeader.getType() == FMessageType.REGISTER_RESP.value()) { if (RegisterState.SUCCESS.equals(receivedHeader.getResultCode())) { Log.d(MY_TAG, "---client:register_SUCCESS---"); ctx.fireChannelRead(msg); return; } else { Log.d(MY_TAG, "---client:register_FAILED---DisconnectIt!!!--"); ctx.close(); } } else { ctx.fireChannelRead(msg); } } else { ctx.fireChannelRead(msg); } } private FMessage buildRegisterRequest() { FMessage.Builder builder = FMessage.newBuilder(); FHeader.Builder headerBuilder = FHeader.newBuilder(); headerBuilder.setAlias(CurrentUser.getInfo().getAlias()); headerBuilder.setAccount(CurrentUser.getInfo().getAccount()); headerBuilder.setType(FMessageType.REGISTER_REQ.value()); headerBuilder.setSessionId(1234); headerBuilder.setPriority(9); headerBuilder.setAppId(CurrentUser.getInfo().getAppId()); headerBuilder.setClientToken(CurrentUser.getInfo().getAppToken()); builder.setHeader(headerBuilder.build()); FMessage fMessage = builder.build(); return fMessage; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } } ``` fpush-server的RegisterResponseHandler会进行后台鉴权处理, 原理图

![](doc/client-passport.png)


### 4.4应用服务器鉴权 应用服务端的http连接的鉴权
应用服务端(即app server)发送appId + appSecretKey到http接口http://serverHost:10200/app/secretToken ,经后台鉴定权限通过后,获取到appToken 应用服务端每次调用fpush-server的http api都需要带上appId+appToken ### 4.5心跳机制与TCP超时处理 tcp通信图如下:

![](doc/tcp.png)
客户端使用的是HeartBeatRequestHandler并定期发送Heartbeat ping,fpush-server使用HeartBeatResponseHandler
发送hearbeat pong进行回应
### 4.6保存fpush-server端的通道映射关系 RegisterResponseHandler中channelRead()里,如果客户端注册成功则把channel对象保存到NettyChannelMap这个Map里去 ```java String clientId = header.getAlias(); ctx.channel().attr(ChannelAttrKey.KEY_CLIENT_ID).set(clientId); NettyChannelMap.put(clientId, ctx.channel()); ``` 保存了通道的映射关系后,就可以在收到应用服务器发给fpush-server的消息后,通过Netty框架来实时把消息推送到Android客户端
channel.writeAndFlush(message)
### 4.7推送过程 应用服务器通过访问 http接口 http://localhost:10200/api/**
例如 http://localhost:10200/app/push?receiverAlias=lsm001&title=系统提醒&desc=消息内容001&data=abc
就是发送一条测试消息,描述为消息内容001,标题为系统提醒,发送给Android/iOS客户端,
接收者的alias为lsm001
后台ApiController把消息的内容写入缓存ToSendMap.aliasMap中去,如 ToSendMap.aliasMap.put(alias, list);
定时任务com.appjishu.fpush.server.boot.SendTask#scan每隔一定的时间间隔,会扫描ToSendMap.aliasMap
里的待发送的消息. 遍历后,会通过NettyChannelMap.get(alias)获取到Channel,然后 channel.writeAndFlush(message)
发送出去 ```java @Scheduled(fixedRate = 5000) public void scan() { for (Map.Entry> entry: ToSendMap.aliasMap.entrySet()) { String alias = entry.getKey(); List msgList = entry.getValue(); if (StringUtils.isNotEmpty(alias) && msgList != null && msgList.size() > 0) { pushService.doPush(msgList, alias); } } } ``` ```java public void doPush(List msgList, String alias) { FMessage fMessage = buildPushMessage(msgList, alias); if (fMessage != null) { log.info("---TringToDoPush()--->"); Channel channel = NettyChannelMap.get(alias); if (channel == null) { log.info("------channelIsNull---"); } else if (!channel.isWritable()) { log.info("------channelIsNotWritable---"); } else { ChannelFuture future = channel.writeAndFlush(fMessage); log.info("------msgWriten!!!---"); future.addListener(new ChannelFutureListener() { public void operationComplete(final ChannelFuture future) throws Exception { if (msgList.size() > 0) { msgList.remove(0); log.info("------removeAreadySentMsg!!!---"); } } }); } } } ```