您的位置:首页 > 移动互联

开源项目:用环信MQTT实现“世界频道”只需5分钟【附源码】

发布时间:2022-07-14 12:09:38  来源:互联网     背景:

  说到“世界频道”想必大家都不陌生,常见的如王者荣耀的世界广播摇人组队以及最近兴起的Discord社区交友等等。究其目的就是在应用内让海量用户可以实时互动。有些开发者为了实现这种场景会选择聊天室方案来实现,但是这种方式存在一定的局限性,比如聊天室人数上限、海量消息处理等各种情况。

  当然如果有钱有颜,可以直接选择云厂商产品(比如环信的聊天室方案和超级社区),如果有才有time,也可以选择平替版MQTT实现方案。今天小猿将介绍用环信MQTT消息云实现应用内的世界频道,满满干货,不要错过~~

  使用MQTT实现世界频道-Demo效果演示

  协议优势:

  在介绍具体方案之前,我们先唠一唠为啥选择MQTT协议。

  轻量级:MQTT本身是物联网的连接协议,专为受限设备和低带宽场景使用。所以其代码占用空间较小,同样适用于注重SDK大小的移动应用领域(比如:游戏领域)。

  易集成:MQTT作为标准开放的消息协议,经过多年演进,已支持30多种开发语言,10余种SDK,无论何种开发环境,都可以快速找到开源SDK。

  高并发:MQTT是轻量级的消息传输协议,2字节心跳报文,最小化传输和连接成本,云厂商broker产品都可支持千万级并发接入,适用于高并发连接场景。

  低成本:MQTT是基于客户端-服务器的订阅/发布模型,通过服务器中间件实现消息分发,减少消息复制成本,快速实现一对多在线推送。

  灵活性:MQTT协议支持多种消息特性,包括:topic主题层级、消息分级(QoS0,1,2)、遗嘱消息、保留消息等,可以灵活实现多种业务场景。

  衍生功能:随着MQTT云服务的发展,部分服务器厂商已支持消息存储、获取在线设备列表、查看历史消息等衍生功能,降低开发工作量与消息存储成本。

  实现方案:

  言归正传,上干货。本次技术实现方案包含:移动客户端(Android)、后端服务(Java)以及MQTT服务器。这里提一下,MQTT服务器使用环信MQTT消息云,使用三方云服务比较省心,既节省开发时间,产品性能也不需要担心,现在注册可以直接使用环信MQTT消息云超高额度的免费版:每月100并发连接、300万消息,完全满足功能开发使用。

  客户端实现:

  客户端实现主要包含以下两部分:

  底层MQTT业务集成:包含引入SDK、MQTT方法封装、业务交互(消息收发)。

  APP上层交互:在APP首页提供世界频道入口,实现心情弹幕飘窗(接收)和发送。

  接下来上底层MQTT业务集成代码。

  引入SDK:

  这一步环信官方文档比较明确,就是根据自己的平台引入相应的mqtt客户端sdk,这里简单贴一下AndroidStudio的引入配置

  1// 在根目录 build.gradle repositories 下加入配置

  2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }

  3...

  4// 然后加入 MQTT 依赖

  5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk

  6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'

  7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

  方法封装

  这里贴一下对mqtt相关方法的简单封装,代码在vmmqtt模块儿的MQTTHelper类下:

  1 /**

  2 * Create by lzan13 on 2022/3/22

  3 * 描述:MQTT 帮助类

  4 */

  5 object MQTTHelper {

  6

  7 private var mqttClient: MqttAndroidClient? = null

  8

  9 // 缓存主题集合

  10 private val topicList = mutableListOf()

  11

  12 /**

  13 * 链接MQTT

  14 * @param id 用户 Id

  15 * @param token 用户链接 MQTT 的 Token

  16 * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅

  17 */

  18 fun connect(id: String, token: String, topic: String = "") {

  19 // 处理订阅主题

  20 if (topic.isNotEmpty()) topicList.add(topic)

  21

  22 // 拼接链接地址

  23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

  24 // 拼接 clientId

  25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"

  26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

  27

  28 //连接参数

  29 val options = MqttConnectOptions()

  30 options.isAutomaticReconnect = true //设置自动重连

  31 options.isCleanSession = true // 缓存

  32 options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒

  33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒

  34 options.userName = id // 用户名

  35 options.password = token.toCharArray() // 密码

  36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

  37 // 设置MQTT监听

  38 mqttClient?.setCallback(object : MqttCallback {

  39 override fun connectionLost(t: Throwable) {

  40 // 通知链接断开

  41 VMLog.d("MQTT 链接断开 $t")

  42 }

  43

  44 @Throws(Exception::class)

  45 override fun messageArrived(topic: String, message: MqttMessage) {

  46 // 通知收到消息

  47 VMLog.d("MQTT 收到消息:$message")

  48 // 如果未订阅则直接丢弃

  49 if (!topicList.contains(topic)) return

  50 notifyEvent(topic, String(message.payload))

  51 }

  52

  53 override fun deliveryComplete(token: IMqttDeliveryToken) {}

  54 })

  55 //进行连接

  56 mqttClient?.connect(options, null, object : IMqttActionListener {

  57 override fun onSuccess(token: IMqttToken) {

  58 VMLog.d("MQTT 链接成功")

  59 // 链接成功,循环订阅缓存的主题

  60 topicList.forEach { subscribe(it) }

  61 }

  62

  63 override fun onFailure(token: IMqttToken, t: Throwable) {

  64 VMLog.d("MQTT 链接失败 $t")

  65 }

microsoft visual studio2020

  66 })

  67 }

  68

  69 /**

  70 * 订阅主题

5g商用以来各手机厂商纷纷推出5g智能手机

  71 * @param topic 主题

华为没有创新

  72 */

  73 fun subscribe(topic: String) {

  74 if (!topicList.contains(topic)) {

  75 topicList.add(topic)

  76 }

  77 try {

  78 //连接成功后订阅主题

  79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

  80 override fun onSuccess(token: IMqttToken) {

  81 VMLog.d("MQTT 订阅成功 $topic")

  82 }

  83

  84 override fun onFailure(token: IMqttToken, t: Throwable) {

  85 VMLog.d("MQTT 订阅失败 $topic $t")

  86 }

  87 })

  88 } catch (e: MqttException) {

  89 e.printStackTrace()

  90 }

  91 }

  92

  93 /**

  94 * 取消订阅

  95 * @param topic 主题

  96 */

  97 fun unsubscribe(topic: String) {

  98 if (topicList.contains(topic)) {

  99 topicList.remove(topic)

  100 }

  101 try {

  102 mqttClient?.unsubscribe(topic)

  103 } catch (e: MqttException) {

  104 e.printStackTrace()

  105 }

  106 }

  107

  108 /**

  109 * 发送 MQTT 消息

  110 * @param topic 主题

  111 * @param content 内容

  112 */

  113 fun sendMsg(topic: String, content: String) {

  114 val msg = MqttMessage()

  115 msg.payload = content.encodeToByteArray() // 设置消息内容

  116 msg.qos = 0 //设置消息发送质量,可为0,1,2.

  117 // 设置消息的topic,并发送。

  118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

  119 override fun onSuccess(asyncActionToken: IMqttToken) {

  120 VMLog.d("MQTT 消息发送成功")

  121 }

  122

  123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

  124 VMLog.d("MQTT 消息发送失败 ${exception.message}")

  125 }

  126 })

  127 }

  128

  129 /**

分析我国油气田智能技术体系及其建设现状

  130 * 通知 MQTT 事件

  131 */

  132 private fun notifyEvent(topic: String, data: String) {

  133 LDEventBus.post(topic, data)

  134 }

  135 }

  业务交互

  和业务相关的就是在启动APP后,使用后端服务器返回的鉴权token信息及连接封装接口登录环信通MQTT服务器,登录成功后订阅主题并监听消息。

  1// 请求 token 成功后,调用MQTTHelper.connect()链接 MQTT 服务器,这里会同时传递监听的主题

  2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)

  3

  4/**

  5 * 发送匹配信息

  6 */

  7private fun sendMatchInfo() {

  8 if (selfMatch.user.nickname.isEmpty()) return

  9 // 提交自己的匹配信息到服务器

  10 mViewModel.submitMatch(selfMatch)

  11 val json = JSONObject()

  12 json.put("content", selfMatch.content)

  13 json.put("emotion", selfMatch.emotion)

  14 json.put("gender", selfMatch.gender)

  15 json.put("type", selfMatch.type)

  16 val jsonUser = JSONObject()

  17 jsonUser.put("avatar", mUser.avatar)

  18 jsonUser.put("id", mUser.id)

  19 jsonUser.put("nickname", mUser.nickname)

  20 jsonUser.put("username", mUser.username)

  21 json.put("user", jsonUser)

2020华为云互联网5G创新峰会

  22 MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())

  23}

  24

  25// 监听消息这里使用了一个事件总线进行通知,在上边封装 MQTTHelper 发送消息也使用了这个,

  26// 订阅 MQTT 事件

  27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {

  28 val match = JsonUtils.fromJson(it, Match::class.java)

  29 // 这里收到匹配信息之后就增加一条弹幕

  30 addBarrage(match)

  31}

  后端服务实现

  接下来介绍后端服务实现,主要包含以下两部分:

  配置连接信息:配置环信MQTT消息云连接信息。

  获取鉴权信息:获取客户端连接需要的鉴权信息。

  配置连接信息

  配置部分只需要按照环信后台配置信息进行替换就好,配置在config目录下的config.xxx.json文件内

  1/**

  2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService

  3 */

  4config.mqtt = {

  5 host: 'mqtt host', // MQTT 链接地址

  6 appId: 'appId', // MQTT AppId

  7 port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)

  8 restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服务 API 地址

  9 clientId: 'client id', // 替换环信后台 clientId

  10 clientSecret: 'client secret', // 替换环信后台 clientSecret

  11};

  获取鉴权信息

  这里主要是获取客户端连接所需要的鉴权信息token,为了安全token肯定是要放在服务器端生成的,废话不多说,上代码:

超市门店运营

  1/**

  2 * Create by lzan13 on 2022/3/22

  3 * 描述:MQTT 帮助类

  4 */

  5object MQTTHelper {

  6

  7 private var mqttClient: MqttAndroidClient? = null

  8

  9 // 缓存主题集合

  10 private val topicList = mutableListOf()

  11

  12 /**

  13 * 链接MQTT

  14 * @param id 用户 Id

  15 * @param token 用户链接 MQTT 的 Token

  16 * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅

  17 */

  18 fun connect(id: String, token: String, topic: String = "") {

  19 // 处理订阅主题

  20 if (topic.isNotEmpty()) topicList.add(topic)

  21

  22 // 拼接链接地址

  23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

  24 // 拼接 clientId

  25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"

  26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

  27

  28 //连接参数

  29 val options = MqttConnectOptions()

  30 options.isAutomaticReconnect = true //设置自动重连

  31 options.isCleanSession = true // 缓存

  32 options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒

  33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒

  34 options.userName = id // 用户名

  35 options.password = token.toCharArray() // 密码

  36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

  37 // 设置MQTT监听

  38 mqttClient?.setCallback(object : MqttCallback {

  39 override fun connectionLost(t: Throwable) {

  40 // 通知链接断开

  41 VMLog.d("MQTT 链接断开 $t")

  42 }

  43

  44 @Throws(Exception::class)

  45 override fun messageArrived(topic: String, message: MqttMessage) {

  46 // 通知收到消息

  47 VMLog.d("MQTT 收到消息:$message")

  48 // 如果未订阅则直接丢弃

  49 if (!topicList.contains(topic)) return

  50 notifyEvent(topic, String(message.payload))

  51 }

  52

  53 override fun deliveryComplete(token: IMqttDeliveryToken) {}

  54 })

  55 //进行连接

  56 mqttClient?.connect(options, null, object : IMqttActionListener {

  57 override fun onSuccess(token: IMqttToken) {

  58 VMLog.d("MQTT 链接成功")

  59 // 链接成功,循环订阅缓存的主题

  60 topicList.forEach { subscribe(it) }

  61 }

  62

  63 override fun onFailure(token: IMqttToken, t: Throwable) {

  64 VMLog.d("MQTT 链接失败 $t")

  65 }

  66 })

  67 }

  68

  69 /**

  70 * 订阅主题

  71 * @param topic 主题

  72 */

  73 fun subscribe(topic: String) {

  74 if (!topicList.contains(topic)) {

  75 topicList.add(topic)

  76 }

  77 try {

  78 //连接成功后订阅主题

  79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

  80 override fun onSuccess(token: IMqttToken) {

  81 VMLog.d("MQTT 订阅成功 $topic")

  82 }

  83

  84 override fun onFailure(token: IMqttToken, t: Throwable) {

  85 VMLog.d("MQTT 订阅失败 $topic $t")

  86 }

  87 })

  88 } catch (e: MqttException) {

  89 e.printStackTrace()

  90 }

  91 }

  92

  93 /**

  94 * 取消订阅

  95 * @param topic 主题

  96 */

  97 fun unsubscribe(topic: String) {

  98 if (topicList.contains(topic)) {

  99 topicList.remove(topic)

  100 }

  101 try {

  102 mqttClient?.unsubscribe(topic)

  103 } catch (e: MqttException) {

  104 e.printStackTrace()

  105 }

  106 }

  107

  108 /**

  109 * 发送 MQTT 消息

  110 * @param topic 主题

  111 * @param content 内容

  112 */

  113 fun sendMsg(topic: String, content: String) {

  114 val msg = MqttMessage()

  115 msg.payload = content.encodeToByteArray() // 设置消息内容

  116 msg.qos = 0 //设置消息发送质量,可为0,1,2.

  117 // 设置消息的topic,并发送。

  118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

  119 override fun onSuccess(asyncActionToken: IMqttToken) {

  120 VMLog.d("MQTT 消息发送成功")

  121 }

  122

  123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

  124 VMLog.d("MQTT 消息发送失败 ${exception.message}")

  125 }

  126 })

  127 }

  128

  129 /**

  130 * 通知 MQTT 事件

  131 */

  132 private fun notifyEvent(topic: String, data: String) {

  133 LDEventBus.post(topic, data)

  134 }

  135}

  源码地址

  核心代码就这么多,不超过500行,这里没有直接调用环信历史消息接口获取消息存储记录,后续可以在进行改良,简化实现流程。源码链接附上,配合使用效果更佳。

  服务端github源码:

  https://github.com/lzan13/vmtemplateserver

  客户端github源码:

  https://gitee.com/lzan13/VMTemplateAndroid

  写在最后

  MQTT协议资源占用小,并发连接高,集成简单,特别适用于高频数据交互场景,比如:游戏的世界广场、视频平台弹幕等等等等,欢迎各位小伙伴集思广益,基于MQTT服务实现更多的业务场景,享受技术带来的便利与快乐。


返回网站首页

本文评论
Enhanced SWAP内存管理 OpenHarmony构建新的内存管理优化方案——ESWAP
  OpenHarmony是面向全场景泛终端设备的操作系统,终端设备内存性能的强弱会直接影响用户的体验。终端设备的内存差异很大,对于内存比较小的终端设备,内存优化方案无疑是增强...
日期:06-03
三星手机S2_三星Galaxy S22系列:以创新科技精准贴合Z世代年轻人需求
  随着消费场景的不断升级,对于追求特立独行、注重生活品质、乐于展示自我的Z世代年轻人来说,想要挑选出一款与自己需求和个性相匹配的智能手机并非易事。三星Galaxy S22系...
日期:07-14
和合期货坚持与客户同肩并立,实践共同价值
  金融服务机构的健康发展是金融体系稳定的重要保障。在国内大循环为主体、国内国际“双循环”相互促进的新发展格局下,金融作为服务实体经济的血液,正处于一个历史的跃迁期...
日期:07-27
年末礼物甄选 用三星Galaxy Watch5系列守护全家健康
  结束了一年的辛劳,年底的元旦如约而至。在这样一个非常特殊的日子,为自己和家人选购礼物成为约定俗成,而如何挑选礼物则值得探讨。尤其是随着近来疫情政策的变化,随时了解自...
日期:12-26
一站式多件换新至多补2040元 享服季京东手机服务节让你省心又舒心
  手机作为大多数消费者最常用的电子产品,其使用体验与商家提供的服务息息相关,甚至可以说,想要买得放心用得省心,完善的售前售中售后服务必不可少。为进一步提升消费者的使用...
日期:07-28
松下LUMIX S5挂机头?当然是LUMIX S 20-60mm_松下LUMIX DC-S5
  松下LUMIX S5已经发布有一年的时间了,但你会发现,在同价位中,好像S5依旧是综合能力最能打的那个。而跟随S5一起发布的松下20-60mm镜头也成为目前很多入手松下L卡口相机后会...
日期:07-28
harmonyos 2.0要不要更新_HarmonyOS 2版本更新!两个小技巧让你告别隐私泄露烦恼
  如今,手机作为我们的“贴身之物”,储存了太多个人隐私,从照片到聊天记录,从工作文件到身体健康信息。如何保护隐私安全,成为用户和厂商不容忽视的问题。华为一直把用户隐私安...
日期:04-26
再传喜讯,鸿雁中标杭州地铁3号线配电工程「杭州地铁三号线中标单位」
  备受关注的杭州地铁3号线一期车站(含区间)设备安装及装修工程3标段、8标段项目于近日落下帷幕。鸿雁配电力克强敌,以优质的产品质量和专业化的服务成功中标,顺利获得配电...
日期:07-26
小米企业的品牌定位_新logo、新定位,小米集团旗下通信服务品牌全面升级
  1月17日,小米集团旗下通信服务品牌:小米移动,官宣品牌升级。希望在新的发展阶段,以全新的品牌形象、定位和更优质的服务连接广大通信用户。持续以“用户中心”思维打造...
日期:04-05
Shure耳机_Shure 携手反光镜乐队推出全新真无线耳机AONIC FREE
  “求真我,一起红”,音乐才是我们的解药华为显示器mateview gt尺寸  “只有音乐才是我的解药”,伴随这句反光镜乐队的经典歌词,Shure全新真无线隔音耳机AONIC FREE真我登场...
日期:07-26
文武兼备的高性能台式电脑! 全新华硕天选X京东火爆热销中「华硕天选桌」
  随着双十二的到来,年底最后一轮商家大促也轰轰烈烈地展开,对于有电脑购买需求的人群来说,此刻是最好的入手时机。如何在琳琅满目的推荐中找到合适自己的台式电脑呢?如果你...
日期:07-25
内外全能 体验三星Galaxy Z Flip3 5G双屏提供的乐趣_三星Galaxy Z Flip3 5G折叠屏手机
  屏幕作为智能手机中最为重要的组成部分,是一款产品最能影响用户使用体验的地方。今年三星发布的折叠屏手机Galaxy Z Flip3 5G,内外拥有两块素质极高的屏幕,加上这款产品潮...
日期:07-25
云鲸发布最新一代产品J3小鲸灵,引领扫地机品类智能化新风向「小云鲸扫地机器人」
  8月31日,云鲸发布了最新一代产品的宣传片,正式推出第三代扫拖一体机器人——J3小鲸灵。据介绍,云鲸新品J3小鲸灵最大的创新点在于首创“DirtSense污水识别系统”,可识别...
日期:09-02
天下谁人不识“金” — SONY NW-WM1ZM2 索尼金砖二代测评_索尼wm1z金砖搭配什么耳机
  这世界上有两种人, 一种是SONY用户另一种是非SONY用户。电视、相机、PlayStation、手机、耳机、播放器,可以说想找到一个使用SONY旗下产品的人,比较简单。  这世界上有...
日期:09-09
易鑫:汽车金融风控进入下一站_易鑫金融车贷人工客服
  2022年上半年,在中国乘用车(包括新车及二手车)总销量同比下跌1.5%的背景下,国内专业的汽车金融交易平台易鑫集团(02858.HK)汽车融资交易量同比上升17%,收入同比增长73%。逆...
日期:10-12
galaxy watch 4 lte_使用场景更多元 三星Galaxy Watch5 LTE版让你时刻“在线”
  随着智能手表成为大众潮流,我们在各个生活场景中都能见到它的影子。而在户外跑步或短程出行时,我们常需要轻装上阵,这时就需要一款可在类似情况下代替智能手机的随身智能设...
日期:10-12
猿辅导教育经:孩子10岁前,千万别偷这3个懒_孩子学猿辅导有用吗
  养育孩子的成功与否,如何判断?猿辅导认为,孩子长大成人后,能够自食其力、品性出色,身为父母就已经成功了。  有人说:“养孩子的有效期是10年。”无数父母认同,真的是这样,错...
日期:10-21
老乡鸡的产品理念_老乡鸡认准一个品类深耕到底,用20年时间打磨爆品
  老乡鸡凭借快餐的高频消费+互联网深度黏性,构建了一个全新的品牌矩阵,线下场景与线上人设的互相配合,成为一个值得信赖又有趣好玩的生活方式品牌。小鹏汽车美国上市最新进...
日期:02-02
2022年青少年社媒使用报告:TikTok和YouTube正在“占领”年轻人
  社交媒体的格局始终在变化,而处于这一领域前沿的青少年用户,更是其中尤其“善变”的人群,可能这就是所谓的没有人永远年轻,但永远有人年轻。  8月10日,皮尤研究中心发...
日期:09-09
对话张新发:315为什么下调主力产品价格
  近日,张新发的调价政策引起了社会各界的议论,打爆了张新发的热线电话。  张新发为什么要下调价格?针对社会各界的质疑,张新发公司常务副总经理张俊做出回应:“张新发早在2...
日期:07-29