تجزیه و تحلیل داده های اینترنت اشیا با استفاده از کافکا، جریان های کافکا و Thingsconnect

موتور قوانین ThingsBoard قادر است تحلیل ساده‌ای را بر روی داده‌های تلمتری ورودی انجام دهد، به عنوان مثال، عبور از آستانه. ایده‌ای که پشت موتور قوانین وجود دارد، ارائه قابلیت برای مسیریابی داده‌ها از دستگاه‌های IoT به افزونه‌های مختلف است، بر اساس ویژگی‌های دستگاه یا خود داده.

با این حال، بیشتر موارد استفاده در زندگی واقعی نیاز به پشتیبانی از تجزیه و تحلیل پیشرفته دارند: یادگیری ماشین، تجزیه و تحلیل پیش‌بینی و غیره.

این آموزش نشان خواهد داد که چگونه می‌توانید:

  • داده‌های دستگاه تلمتری را از ThingsBoard به موضوع Kafka با استفاده از قابلیت‌های داخلی موتور قوانین (کاربردی برای ThingsBoard CE و PE) هدایت کنید.
  •  داده‌ها را از چندین دستگاه با استفاده از یک برنامه ساده Kafka Streams تجمیع کنید.
  •  نتایج تجزیه و تحلیل را به ThingsBoard برای ثبت و تجسم با استفاده از ادغام Kafka ThingsBoard PE پیش بینی کنید.

البته، تجزیه و تحلیل در این آموزش به طور معمول بسیار ساده است، اما هدف ما برجسته کردن مراحل ادغام است.

////img

فرض کنید ما یک تعداد زیادی پنل خورشیدی داریم که شامل تعدادی ماژول خورشیدی است. ThingsBoard برای جمع‌آوری، ذخیره و تجسم داده‌های تلمتری ناهنجار از این ماژول‌های خورشیدی در هر پنل استفاده می‌شود.

ما این ناهنجار را با مقایسه مقدار تولیدشده توسط یک ماژول خورشیدی با مقدار میانگین تولیدشده توسط تمام ماژول‌های همان پنل و انحراف معیار همان مقدار محاسبه کرده‌ایم.

////img

ما قصد داریم از کاربرد Kafka Streams با پنجره‌ای به طول 30 ثانیه (قابل پیکربندی) برای تحلیل داده‌های بلادرنگ از دستگاه‌های مختلف استفاده کنیم.

برای ذخیره و تجسم نتایج تجزیه و تحلیل، قصد داریم سه دستگاه ماژول خورشیدی مجازی را برای هر پنل خورشیدی ایجاد کنیم.

پیش نیازها

سرویس‌های زیر باید در حال اجرا باشند:

  •  نسخه ThingsBoard PE v2.4.2+
  •  سرور Kafka

مرحله 1. پیکربندی زنجیره قوانین

در این مرحله، سه گره تولید کننده را پیکربندی خواهیم کرد که در حین توسعه، داده‌های شبیه‌سازی شده را برای آزمایش تولید خواهند کرد. به طور معمول، شما در محیط تولید نیازی به آن‌ها ندارید، اما برای اشکال‌زدایی بسیار مفید هستند. ما برای 3 ماژول و یک پنل، داده‌ها را تولید خواهیم کرد. دو ماژول از همان مقدار تولیدی استفاده می‌کنند و یک ماژول مقدار کمتری تولید می‌کند. البته، شما باید این قسمت را با داده‌های واقعی تولید شده توسط دستگاه‌های واقعی جایگزین کنید. این فقط یک مثال است.

بیایید سه دستگاه با نوع “ماژول خورشیدی” ایجاد کنیم. اگر از ThingsBoard PE استفاده می‌کنید، آن‌ها را در گروه جدید “ماژول‌های خورشیدی” قرار دهید.

////img

حالا، بیایید سه شبیه‌ساز دستگاه ایجاد کنیم تا داده‌ها را مستقیماً به کارخانه Kafka محلی‌مان ارسال کنیم. داده‌های شبیه‌سازی شده به گره قوانین Kafka ارسال می‌شود که مسئول ارسال داده به موضوع Kafka است. ابتدا گره قوانین Kafka را پیکربندی کنیم. ما از سرور Kafka محلی (localhost:9092) و موضوع “solar-module-raw” استفاده خواهیم کرد.

////img

حالا، بیایید گره “تولیدکننده” را برای اولین ماژول اضافه کنیم. ما گره تولیدکننده را به طور مداوم برای “تولید” 5 وات پیکربندی خواهیم کرد.

////img

اکنون، گره “تولیدکننده” را برای ماژول دوم اضافه کنیم. ما گره تولیدکننده را برای “تولید” مداوم 5 وات نیز پیکربندی خواهیم کرد.

سپس، گره “تولیدکننده” را برای ماژول سوم اضافه کنیم. ما گره تولیدکننده را برای “تولید” مداوم 3.5 وات که تخریب ماژول را شبیه‌سازی می‌کند، پیکربندی خواهیم کرد.

////img

نمونه زنجیره قوانین نتیجه باید شبیه به این شکل باشد:

////img

شما همچنین می‌توانید فایل JSON زنجیره قوانین را دانلود کرده و آن را به پروژه خود وارد کنید.

با وارد کردن زنجیره قوانین، باید خروجی اشکال زدایی گره Kafka را بررسی کنید. اگر کافکا شما در localhost در حال اجرا است، باید پیام‌های اشکال زدایی مشابهی را مشاهده کنید. توجه کنید که در ورودی روزنامه اشکال هیچ خطا وجود ندارد.

////img

مرحله ۲. راه‌اندازی برنامه Kafka Streams.

در این مرحله، ما یک برنامه نمونه را برای تجزیه و تحلیل داده‌های خام از “solar-module-raw” دانلود و راه‌اندازی می‌کنیم. برنامه نمونه مجموع مقدار انرژی تولید شده توسط هر ماژول در پنل را در یک بازه زمانی (قابل تنظیم) محاسبه می‌کند. سپس برنامه میانگین توان تولیدی هر ماژول برای هر پنل و انحراف آن در همان بازه زمانی را محاسبه می‌کند. پس از انجام این کار، برنامه مقادیر هر ماژول را با مقدار میانگین مقایسه می‌کند و اگر تفاوت بزرگتر از انحراف باشد، آن را به عنوان ناهنجاری در نظر می‌گیرد.

نتایج محاسبات ناهنجاری به “anomalies-topic” ارسال می‌شوند. ThingsBoard با استفاده از اتصال Kafka، به این موضوع مشترک می‌شود و هشدارها را تولید کرده و ناهنجاری‌ها را در پایگاه داده ذخیره می‌کند.

دانلود برنامه نمونه

با آزادی کد را از مخزن ThingsBoard دریافت کرده و پروژه را با استفاده از Maven بسازید.

mvn clean install

از IDE مورد علاقه خود، این پروژه Maven را اضافه کنید.

بازبینی وابستگی‌ها

اصلیترین وابستگی‌هایی که در پروژه استفاده می‌شوند:

<dependencies>
...
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version>
    </dependency>
...
</dependencies>

بازبینی کد منبع

منطق برنامه Kafka Streams اصلی در کلاس SolarConsumer تمرکز شده است.

private static Properties getProperties() {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return props;
}
private static final String IN_TOPIC = "solar-module-raw";

private static final TopicNameExtractor<String, SolarModuleAggregatorJoiner> OUT_TOPIC =
        new StaticTopicNameExtractor<>("solar-module-anomalies");

// Time for windowing
private static final Duration DURATION = Duration.ofSeconds(30);

private static final TimeWindows TIME_WINDOWS = TimeWindows.of(DURATION);

private static final JoinWindows JOIN_WINDOWS = JoinWindows.of(DURATION);

private static final StreamsBuilder builder = new StreamsBuilder();


// serde - Serializer/Deserializer
// for custom classes should be custom Serializer/Deserializer
private static final Serde<SolarModuleData> SOLAR_MODULE_DATA_SERDE =
    Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleData.class));

private static final Serde<SolarModuleAggregator> SOLAR_MODULE_AGGREGATOR_SERDE =
    Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleAggregator.class));

private static final Serde<SolarPanelAggregator> SOLAR_PANEL_AGGREGATOR_SERDE =
    Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarPanelAggregator.class));

private static final Serde<SolarModuleKey> SOLAR_MODULE_KEY_SERDE =
    Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleKey.class));

private static final Serde<SolarPanelAggregatorJoiner> SOLAR_PANEL_AGGREGATOR_JOINER_SERDE =
    Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarPanelAggregatorJoiner.class));

private static final Serde<SolarModuleAggregatorJoiner> SOLAR_MODULE_AGGREGATOR_JOINER_SERDE =
    Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleAggregatorJoiner.class));

private static final Serde<String> STRING_SERDE = Serdes.String();

private static final Serde<Windowed<String>> WINDOWED_STRING_SERDE = Serdes.serdeFrom(
    new TimeWindowedSerializer<>(STRING_SERDE.serializer()),
    new TimeWindowedDeserializer<>(STRING_SERDE.deserializer(), TIME_WINDOWS.size()));

// 1 - sigma
private static final double Z = 1;
// source stream from kafka
final KStream<SolarModuleKey, SolarModuleData> source =
    builder
        .stream(IN_TOPIC, Consumed.with(STRING_SERDE, SOLAR_MODULE_DATA_SERDE))
        .map((k, v) -> KeyValue.pair(new SolarModuleKey(v.getPanel(), v.getName()), v));


// calculating sum power and average power for modules
final KStream<Windowed<SolarModuleKey>, SolarModuleAggregator> aggPowerPerSolarModuleStream =
     source
        .groupByKey(Grouped.with(SOLAR_MODULE_KEY_SERDE, SOLAR_MODULE_DATA_SERDE))
        .windowedBy(TIME_WINDOWS)
        .aggregate(SolarModuleAggregator::new,
            (modelKey, value, aggregation) -> aggregation.updateFrom(value),
            Materialized.with(SOLAR_MODULE_KEY_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE))
        .suppress(Suppressed.untilTimeLimit(DURATION, Suppressed.BufferConfig.unbounded()))
        .toStream();


// calculating sum power and average power for panels
final KStream<Windowed<String>, SolarPanelAggregator> aggPowerPerSolarPanelStream =
    aggPowerPerSolarModuleStream
        .map((k, v) -> KeyValue.pair(new Windowed<>(k.key().getPanelName(), k.window()), v))
        .groupByKey(Grouped.with(WINDOWED_STRING_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE))
        .aggregate(SolarPanelAggregator::new,
            (panelKey, value, aggregation) -> aggregation.updateFrom(value),
            Materialized.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE))
        .suppress(Suppressed.untilTimeLimit(DURATION, Suppressed.BufferConfig.unbounded()))
        .toStream();


 // if used for join more than once, the exception "TopologyException: Invalid topology:" will be thrown
final KStream<Windowed<String>, SolarModuleAggregator> aggPowerPerSolarModuleForJoinStream =
    aggPowerPerSolarModuleStream
        .map((k, v) -> KeyValue.pair(new Windowed<>(k.key().getPanelName(), k.window()), v));


// joining aggregated panels with aggregated modules
// need for calculating sumSquare and deviance
final KStream<Windowed<String>, SolarPanelAggregatorJoiner> joinedAggPanelWithAggModule =
    aggPowerPerSolarPanelStream
        .join(
            aggPowerPerSolarModuleForJoinStream,
            SolarPanelAggregatorJoiner::new, JOIN_WINDOWS,
            Joined.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE));


//calculating sumSquare and deviance
final KStream<Windowed<String>, SolarPanelAggregator> aggPowerPerSolarPanelFinalStream =
    joinedAggPanelWithAggModule
        .groupByKey(Grouped.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_JOINER_SERDE))
        .aggregate(SolarPanelAggregator::new,
            (key, value, aggregation) -> aggregation.updateFrom(value),
            Materialized.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE))
        .suppress(Suppressed.untilTimeLimit(DURATION, Suppressed.BufferConfig.unbounded()))
        .toStream();


// joining aggregated modules with aggregated panels in which calculated sumSquare and deviance
// need for check modules with anomaly power value
final KStream<Windowed<String>, SolarModuleAggregatorJoiner> joinedAggModuleWithAggPanel =
    aggPowerPerSolarModuleStream
        .map((k, v) -> KeyValue.pair(new Windowed<>(k.key().getPanelName(), k.window()), v))
        .join(
            aggPowerPerSolarPanelFinalStream,
            SolarModuleAggregatorJoiner::new, JOIN_WINDOWS,
            Joined.with(WINDOWED_STRING_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE));


// streaming result data (modules with anomaly power value)
joinedAggModuleWithAggPanel
    .filter((k, v) -> isAnomalyModule(v))
    .map((k, v) -> KeyValue.pair(k.key(), v))
    .to(OUT_TOPIC, Produced.valueSerde(SOLAR_MODULE_AGGREGATOR_JOINER_SERDE));


// starting streams
final KafkaStreams streams = new KafkaStreams(builder.build(), getProperties());
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

محاسبه داده‌های ناهنجار

////img

private static boolean isAnomalyModule(SolarModuleAggregatorJoiner module) {
    double currentZ = Math.abs(module.getSumPower() - module.getSolarPanelAggregator().getAvgPower()) / module.getSolarPanelAggregator().getDeviance();
    return currentZ > Z;
}

نمونه خروجی برنامه

...SolarConsumer - PerSolarModule: [1572447720|Panel 1|Module 1]: 30.0:6
...SolarConsumer - PerSolarModule: [1572447720|Panel 1|Module 2]: 30.0:6
...SolarConsumer - PerSolarModule: [1572447720|Panel 1|Module 3]: 21.0:6
...SolarConsumer - PerSolarPanel: [1572447690|Panel 1]: 81.0:3
...SolarConsumer - PerSolarPanelFinal: [1572447660|Panel 1]: power:81.0 count:3 squareSum:54.0 variance:18.0 deviance:4.2
...SolarConsumer - ANOMALY module: [1572447660|Panel 1|Module 3]: sumPower:21.0 panelAvg:27.0 deviance:4.2

مرحله 3. پیکربندی ادغام Kafka.

بیایید ThingsBoard را طوری پیکربندی کنیم که به موضوع “solar-module-anomalies” مشترک شود و هشدارها را ایجاد کند. ما از ادغام Kafka که از ThingsBoard نسخه 2.4.2 به بعد در دسترس است استفاده خواهیم کرد.

پیکربندی تبدیل کننده Uplink

قبل از راه اندازی ادغام Kafka ، باید تبدیل کننده داده Uplink را ایجاد کنید. تبدیل کننده داده Uplink مسئول تجزیه داده‌های ناهنجار ورودی است.

نمونه‌ای از پیام ورودی که توسط برنامه Kafka Streams ما تولید می‌شود:

{
    "moduleName": "Module 3",
    "panelName": "Panel 1",
    "count": 6,
    "sumPower": 21.0,
    "avgPower": 3.5,
    "solarPanelAggregator": {
        "panelName": "Panel 1",
        "count": 3,
        "sumPower": 81.0,
        "avgPower": 27.0,
        "squaresSum": 54.0,
        "variance": 18.0,
        "deviance": 4.2
    }
}

لطفاً اسکریپت مورد نظر را در بخش تابع Decoder مشاهده کنید.

// Decode an uplink message from a buffer
// payload - array of bytes
// metadata - key/value object

/** Decoder **/

// decode payload to string
var msg = decodeToJson(payload);

// decode payload to JSON
// var data = decodeToJson(payload);

var deviceName = msg.moduleName;
var deviceType = 'module';

// Result object with device attributes/telemetry data
var result = {
   deviceName: deviceName,
   deviceType: deviceType,
   attributes: {
       panel: msg.panelName
   },
   telemetry: {
       avgPower: msg.avgPower,
       sumPower: msg.sumPower,
       avgPowerFromPanel: msg.solarPanelAggregator.avgPower,
       deviance: msg.solarPanelAggregator.deviance
   }
};

/** Helper functions **/

function decodeToString(payload) {
   return String.fromCharCode.apply(String, payload);
}

function decodeToJson(payload) {
   // covert payload to string.
   var str = decodeToString(payload);

   // parse string to JSON
   var data = JSON.parse(str);
   return data;
}

return result;

هدف تابع Decoder، تجزیه داده و متادیتای ورودی به یک فرمت قابل استفاده توسط ThingsBoard است. deviceName و deviceType لازم است، در حالی که attributes و telemetry اختیاری هستند. attributes و telemetry اشیاء کلید-مقدار تک بعدی هستند و اشیاء تو در تو پشتیبانی نمی‌شوند.

////img

پیکربندی ادغام Kafka

بیایید ادغام Kafka را ایجاد کنیم که به موضوع “solar-module-anomalies” مشترک شود.

////img

مرحله 4. پیکربندی موتور قوانین برای ایجاد هشدارها.

در این مرحله، بر اساس پرچم بولین “anomaly” در تلمتری ورودی، از راهنمای “ایجاد و پاکسازی هشدارها” برای ایجاد هشدار استفاده کنید و از راهنمای “ارسال ایمیل در هشدار” برای ارسال اطلاعیه‌های ایمیل استفاده کنید. برای یادگیری بیشتر، راهنماهای دیگر را بررسی کنید.

مرحله 5. حذف پیام‌های روزانه اشکال‌زدایی

اگرچه حالت اشکال‌زدایی بسیار مفید برای توسعه و رفع مشکلات است، اما باقی گذاشتن آن در حالت تولید ممکن است باعث افزایش قابل توجهی در فضای دیسک مورد استفاده توسط پایگاه داده شود زیرا تمام داده‌های اشکال‌زدایی در آنجا ذخیره می‌شوند. توصیه می‌شود که پس از انجام اشکال‌زدایی حالت اشکال‌زدایی را خاموش کنید.

عناوین هر بخش