正文
一、RabbitMQHelper
/// <summary>
/// RabbitMQHelper 的摘要说明
/// </summary>
public class RabbitMQHelper
{
//主机
private readonly static string host = ConfigurationManager.AppSettings["MqHost"].ToString().Trim();
//端口
private readonly static string port = ConfigurationManager.AppSettings["MqPort"].ToString().Trim();
//用户
private readonly static string userName = ConfigurationManager.AppSettings["MqUser"].ToString().Trim();
//密码
private readonly static string passWord = ConfigurationManager.AppSettings["MqPwd"].ToString().Trim();
//交换机
private readonly static string exchangeName = ConfigurationManager.AppSettings["MqExchange"].ToString().Trim();
//队列
//private readonly static string queueName = ConfigurationManager.AppSettings["MqQueue"].ToString().Trim();
//队列
private readonly static string queueName1 = "Can_Gather";
private readonly static string queueName2 = "Can_Tire";
private readonly static string queueName3 = "Can_Fault";
private readonly static string queueName4 = "Can_Alarm";
private readonly static string queueName5 = "Can_Behavior";
private readonly static string queueName6 = "Can_Violate";
/// <summary>
/// 获取RabbitMQ连接
/// </summary>
/// <returns></returns>
public static IConnection GetConnection()
{
//实例化链接工厂
var factory = new ConnectionFactory
{
HostName = host, //ip
Port = 5672, // 端口
UserName = userName, // 账户
Password = passWord, // 密码
VirtualHost = "/" , // 虚拟主机
AutomaticRecoveryEnabled = true,//断开默认重连
TopologyRecoveryEnabled = true
};
return factory.CreateConnection();
}
/// <summary>
/// 建立链接
/// </summary>
/// <returns></returns>
public static IModel CreateConnection()
{
var connection = RabbitMQHelper.GetConnection();
var channel = connection.CreateModel();
// 声明Direct交换机
channel.ExchangeDeclare(exchangeName, "direct");
// 声明创建队列
channel.QueueDeclare(queueName1, false, false, false, null);
channel.QueueDeclare(queueName2, false, false, false, null);
channel.QueueDeclare(queueName3, false, false, false, null);
channel.QueueDeclare(queueName4, false, false, false, null);
channel.QueueDeclare(queueName5, false, false, false, null);
channel.QueueDeclare(queueName6, false, false, false, null);
// 绑定到交互机 指定routingKey
channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "gather");//完成
channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "tire");
channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "fault");//完成
channel.QueueBind(queue: queueName4, exchange: exchangeName, routingKey: "alarm");//完成
channel.QueueBind(queue: queueName5, exchange: exchangeName, routingKey: "behavior");//完成
channel.QueueBind(queue: queueName6, exchange: exchangeName, routingKey: "violate");
return channel;
}
}
二、MqModel
/// <summary>
/// MqModel 的摘要说明
/// </summary>
public class MqModel
{
/// <summary>
/// 消息类型,1:gather,2:behavior,3:alarm,4:fault,5:tire, 6:violate
/// </summary>
public int type { get; set; }
/// <summary>
/// 消息实体
/// </summary>
public object content { get; set; }
}
三、PublishMsg
/// <summary>
/// 根据routingKey发布消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="type"></param>
/// <param name="routingKey"></param>
/// <param name="et"></param>
private void PublishMsg<T>(int type,string routingKey, T et)
{
#region 发布MQ消息
//按照消费端格式定义
MqModel publishModel = new MqModel()
{
type = 1,
content = et
};
//转换
string jsonStr = JsonConvert.SerializeObject(publishModel);
var body = Encoding.UTF8.GetBytes(jsonStr);//消息以二进制形式传输
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, null, body);
#endregion
}