MqttReactiveObjectMapper: Abstracción reactiva y orientada a objetos para MQTT en C#

Introducción

En el desarrollo de aplicaciones IoT, la integración con brokers MQTT suele implicar una gestión detallada y propensa a errores manuales de tópicos, serialización y lógica de suscripción/publicación. Para aliviar esta complejidad, presentamos MqttReactiveObjectMapper, un framework en C# que permite definir tópicos fuertemente tipados con capacidades reactivas y configuración basada en atributos.

Este framework abstrae y automatiza gran parte del manejo de MQTT, convirtiendo los tópicos en objetos observables sobre los que se puede suscribir, filtrar o publicar, de manera limpia y coherente.

Ver en Github


Objetivos del Framework

  • Simplificar el acceso a MQTT con objetos C# tipados.
  • Habilitar la programación reactiva sobre tópicos.
  • Centralizar la configuración en atributos.
  • Automatizar la inicialización de tópicos desde contextos.
  • Reducir código repetitivo y errores comunes (casting, serialización, plantillas de tópicos, etc).

Comparativa: Sin MqttReactiveObjectMapper

Enfoque tradicional (MQTTnet puro):

var mqttFactory = new MqttFactory();
var mqttClient = mqttFactory.CreateMqttClient();

var options = new MqttClientOptionsBuilder()
    .WithClientId("sensor-client")
    .WithTcpServer("localhost", 1883)
    .Build();

await mqttClient.ConnectAsync(options);

mqttClient.ApplicationMessageReceivedAsync += e =>
{
    var json = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
    var message = JsonSerializer.Deserialize<SensorMessage>(json);
    Console.WriteLine($"[{message.DeviceId}] Temp: {message.Value}");
};

await mqttClient.SubscribeAsync("sensor/sensor-01/temperature");

var msg = new SensorMessage { DeviceId = "sensor-01", Value = 22.5 };
var payload = JsonSerializer.Serialize(msg);

await mqttClient.PublishAsync("sensor/sensor-01/temperature", Encoding.UTF8.GetBytes(payload));

Problemas:

  • Mucho código repetitivo.
  • Serialización/deserialización manual.
  • Suscripciones ligadas a strings.
  • Mayor riesgo de errores.
  • No es reactivo (Rx).

Con MqttReactiveObjectMapper

Declaración del modelo y contexto:

public class SensorMessage
{
    public string DeviceId { get; set; }
    public double Value { get; set; }
}

public class SensorContext : MqttBaseContext
{
    [Topic("sensor/@/temperature")] // @ se reemplaza con el nombre de la propiedad
    public TopicSet<SensorMessage> Temperature { get; set; }

    [Topic("sensor/device-001/temperature")] // Tipos primitivos también funcionan
    public TopicSet<double> Device001Temp { get; set; }
}

Uso reactivo y publicación:

var context = new SensorContext("localhost");

// Subscribirse al stream de objetos SensorMessage
context.Temperature.Subscribe(msg =>
{
    Console.WriteLine($"[{msg.DeviceId}] Temp: {msg.Value}");
});

// Publicar un mensaje
context.Temperature.Publish(new SensorMessage
{
    DeviceId = "sensor-01",
    Value = 22.7
});

Beneficios:

✅ Código más limpio y expresivo
✅ Tópicos y tipos unificados en un solo lugar
✅ Rx.NET nativo: Where, Select, Subscribe, etc.
✅ Serialización/deserialización automática
✅ Ideal para entornos con múltiples tópicos parametrizados


Casos de uso avanzados

Filtrado reactivo por valor:

context.Temperature
    .Where(msg => msg.Value > 30)
    .Subscribe(msg => Console.WriteLine($"⚠️ ALERTA: Alta temperatura en {msg.DeviceId} ({msg.Value} °C)"));

Uso con tipos primitivos:

context.Device001Temp.Subscribe(value =>
{
    Console.WriteLine($"[device-001] Nueva temperatura: {value} °C");
});

context.Device001Temp.Publish(19.6);

Arquitectura Interna

  • MqttBaseContext: clase base para definir un contexto de tópicos.
  • TopicSet<T>: clase genérica que representa un tópico tipado.
  • TopicAttribute: atributo que define la plantilla del tópico.
  • MqttBus: capa que encapsula conexión, publicación y observación.
  • MqttSerializer: serializador basado en System.Text.Json.

Instalación y requisitos

  • .NET 6 o superior.
  • Paquete MQTTnet.
  • Rx.NET (System.Reactive).

Próximamente el paquete estará disponible en NuGet con el nombre MqttReactiveObjectMapper.


Conclusión

MqttReactiveObjectMapper ofrece una solución robusta y moderna para trabajar con MQTT en aplicaciones .NET, permitiendo a los desarrolladores centrarse en la lógica de negocio en lugar de la infraestructura de mensajería. Su enfoque declarativo y tipado lo convierte en una herramienta ideal para aplicaciones IoT, domótica, sensorización industrial y más.


¿Quieres que este artículo esté en formato Markdown o HTML para documentación o publicación en GitHub Pages o tu blog? También puedo generar un README listo para usar.