تجزیه و تحلیل داده های اینترنت اشیا با استفاده از کافکا، جریان های کافکا و Thingsconnect
موتور قوانین ThingsBoard قادر است تحلیل سادهای را بر روی دادههای تلمتری ورودی انجام دهد، به عنوان مثال، عبور از آستانه. ایدهای که پشت موتور قوانین وجود دارد، ارائه قابلیت برای مسیریابی دادهها از دستگاههای IoT به افزونههای مختلف است، بر اساس ویژگیهای دستگاه یا خود داده.
با این حال، بیشتر موارد استفاده در زندگی واقعی نیاز به پشتیبانی از تجزیه و تحلیل پیشرفته دارند: یادگیری ماشین، تجزیه و تحلیل پیشبینی و غیره.
این آموزش نشان خواهد داد که چگونه میتوانید:
- دادههای دستگاه تلمتری را از ThingsBoard به موضوع Kafka با استفاده از قابلیتهای داخلی موتور قوانین (کاربردی برای ThingsBoard CE و PE) هدایت کنید.
- دادهها را از چندین دستگاه با استفاده از یک برنامه ساده Kafka Streams تجمیع کنید.
- نتایج تجزیه و تحلیل را به ThingsBoard برای ثبت و تجسم با استفاده از ادغام Kafka ThingsBoard PE پیش بینی کنید.
البته، تجزیه و تحلیل در این آموزش به طور معمول بسیار ساده است، اما هدف ما برجسته کردن مراحل ادغام است.
////img
فرض کنید ما یک تعداد زیادی پنل خورشیدی داریم که شامل تعدادی ماژول خورشیدی است. ThingsBoard برای جمعآوری، ذخیره و تجسم دادههای تلمتری ناهنجار از این ماژولهای خورشیدی در هر پنل استفاده میشود.
ما این ناهنجار را با مقایسه مقدار تولیدشده توسط یک ماژول خورشیدی با مقدار میانگین تولیدشده توسط تمام ماژولهای همان پنل و انحراف معیار همان مقدار محاسبه کردهایم.
////img
ما قصد داریم از کاربرد Kafka Streams با پنجرهای به طول 30 ثانیه (قابل پیکربندی) برای تحلیل دادههای بلادرنگ از دستگاههای مختلف استفاده کنیم.
برای ذخیره و تجسم نتایج تجزیه و تحلیل، قصد داریم سه دستگاه ماژول خورشیدی مجازی را برای هر پنل خورشیدی ایجاد کنیم.
پیش نیازها
سرویسهای زیر باید در حال اجرا باشند:
- نسخه ThingsBoard PE v2.4.2+
- سرور Kafka
مرحله 1. پیکربندی زنجیره قوانین
در این مرحله، سه گره تولید کننده را پیکربندی خواهیم کرد که در حین توسعه، دادههای شبیهسازی شده را برای آزمایش تولید خواهند کرد. به طور معمول، شما در محیط تولید نیازی به آنها ندارید، اما برای اشکالزدایی بسیار مفید هستند. ما برای 3 ماژول و یک پنل، دادهها را تولید خواهیم کرد. دو ماژول از همان مقدار تولیدی استفاده میکنند و یک ماژول مقدار کمتری تولید میکند. البته، شما باید این قسمت را با دادههای واقعی تولید شده توسط دستگاههای واقعی جایگزین کنید. این فقط یک مثال است.
بیایید سه دستگاه با نوع “ماژول خورشیدی” ایجاد کنیم. اگر از ThingsBoard PE استفاده میکنید، آنها را در گروه جدید “ماژولهای خورشیدی” قرار دهید.
////img
حالا، بیایید سه شبیهساز دستگاه ایجاد کنیم تا دادهها را مستقیماً به کارخانه Kafka محلیمان ارسال کنیم. دادههای شبیهسازی شده به گره قوانین Kafka ارسال میشود که مسئول ارسال داده به موضوع Kafka است. ابتدا گره قوانین Kafka را پیکربندی کنیم. ما از سرور Kafka محلی (localhost:9092) و موضوع “solar-module-raw” استفاده خواهیم کرد.
////img
حالا، بیایید گره “تولیدکننده” را برای اولین ماژول اضافه کنیم. ما گره تولیدکننده را به طور مداوم برای “تولید” 5 وات پیکربندی خواهیم کرد.
////img
اکنون، گره “تولیدکننده” را برای ماژول دوم اضافه کنیم. ما گره تولیدکننده را برای “تولید” مداوم 5 وات نیز پیکربندی خواهیم کرد.
سپس، گره “تولیدکننده” را برای ماژول سوم اضافه کنیم. ما گره تولیدکننده را برای “تولید” مداوم 3.5 وات که تخریب ماژول را شبیهسازی میکند، پیکربندی خواهیم کرد.
////img
نمونه زنجیره قوانین نتیجه باید شبیه به این شکل باشد:
////img
شما همچنین میتوانید فایل JSON زنجیره قوانین را دانلود کرده و آن را به پروژه خود وارد کنید.
با وارد کردن زنجیره قوانین، باید خروجی اشکال زدایی گره Kafka را بررسی کنید. اگر کافکا شما در localhost در حال اجرا است، باید پیامهای اشکال زدایی مشابهی را مشاهده کنید. توجه کنید که در ورودی روزنامه اشکال هیچ خطا وجود ندارد.
////img
مرحله ۲. راهاندازی برنامه Kafka Streams.
در این مرحله، ما یک برنامه نمونه را برای تجزیه و تحلیل دادههای خام از “solar-module-raw” دانلود و راهاندازی میکنیم. برنامه نمونه مجموع مقدار انرژی تولید شده توسط هر ماژول در پنل را در یک بازه زمانی (قابل تنظیم) محاسبه میکند. سپس برنامه میانگین توان تولیدی هر ماژول برای هر پنل و انحراف آن در همان بازه زمانی را محاسبه میکند. پس از انجام این کار، برنامه مقادیر هر ماژول را با مقدار میانگین مقایسه میکند و اگر تفاوت بزرگتر از انحراف باشد، آن را به عنوان ناهنجاری در نظر میگیرد.
نتایج محاسبات ناهنجاری به “anomalies-topic” ارسال میشوند. ThingsBoard با استفاده از اتصال Kafka، به این موضوع مشترک میشود و هشدارها را تولید کرده و ناهنجاریها را در پایگاه داده ذخیره میکند.
دانلود برنامه نمونه
با آزادی کد را از مخزن ThingsBoard دریافت کرده و پروژه را با استفاده از Maven بسازید.
mvn clean install
از IDE مورد علاقه خود، این پروژه Maven را اضافه کنید.
بازبینی وابستگیها
اصلیترین وابستگیهایی که در پروژه استفاده میشوند:
<dependencies> ... <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>${kafka.version}</version> </dependency> ... </dependencies>
بازبینی کد منبع
منطق برنامه Kafka Streams اصلی در کلاس SolarConsumer تمرکز شده است.
private static Properties getProperties() { final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return props; }
private static final String IN_TOPIC = "solar-module-raw"; private static final TopicNameExtractor<String, SolarModuleAggregatorJoiner> OUT_TOPIC = new StaticTopicNameExtractor<>("solar-module-anomalies"); // Time for windowing private static final Duration DURATION = Duration.ofSeconds(30); private static final TimeWindows TIME_WINDOWS = TimeWindows.of(DURATION); private static final JoinWindows JOIN_WINDOWS = JoinWindows.of(DURATION); private static final StreamsBuilder builder = new StreamsBuilder(); // serde - Serializer/Deserializer // for custom classes should be custom Serializer/Deserializer private static final Serde<SolarModuleData> SOLAR_MODULE_DATA_SERDE = Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleData.class)); private static final Serde<SolarModuleAggregator> SOLAR_MODULE_AGGREGATOR_SERDE = Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleAggregator.class)); private static final Serde<SolarPanelAggregator> SOLAR_PANEL_AGGREGATOR_SERDE = Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarPanelAggregator.class)); private static final Serde<SolarModuleKey> SOLAR_MODULE_KEY_SERDE = Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleKey.class)); private static final Serde<SolarPanelAggregatorJoiner> SOLAR_PANEL_AGGREGATOR_JOINER_SERDE = Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarPanelAggregatorJoiner.class)); private static final Serde<SolarModuleAggregatorJoiner> SOLAR_MODULE_AGGREGATOR_JOINER_SERDE = Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleAggregatorJoiner.class)); private static final Serde<String> STRING_SERDE = Serdes.String(); private static final Serde<Windowed<String>> WINDOWED_STRING_SERDE = Serdes.serdeFrom( new TimeWindowedSerializer<>(STRING_SERDE.serializer()), new TimeWindowedDeserializer<>(STRING_SERDE.deserializer(), TIME_WINDOWS.size())); // 1 - sigma private static final double Z = 1;
// source stream from kafka final KStream<SolarModuleKey, SolarModuleData> source = builder .stream(IN_TOPIC, Consumed.with(STRING_SERDE, SOLAR_MODULE_DATA_SERDE)) .map((k, v) -> KeyValue.pair(new SolarModuleKey(v.getPanel(), v.getName()), v)); // calculating sum power and average power for modules final KStream<Windowed<SolarModuleKey>, SolarModuleAggregator> aggPowerPerSolarModuleStream = source .groupByKey(Grouped.with(SOLAR_MODULE_KEY_SERDE, SOLAR_MODULE_DATA_SERDE)) .windowedBy(TIME_WINDOWS) .aggregate(SolarModuleAggregator::new, (modelKey, value, aggregation) -> aggregation.updateFrom(value), Materialized.with(SOLAR_MODULE_KEY_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE)) .suppress(Suppressed.untilTimeLimit(DURATION, Suppressed.BufferConfig.unbounded())) .toStream(); // calculating sum power and average power for panels final KStream<Windowed<String>, SolarPanelAggregator> aggPowerPerSolarPanelStream = aggPowerPerSolarModuleStream .map((k, v) -> KeyValue.pair(new Windowed<>(k.key().getPanelName(), k.window()), v)) .groupByKey(Grouped.with(WINDOWED_STRING_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE)) .aggregate(SolarPanelAggregator::new, (panelKey, value, aggregation) -> aggregation.updateFrom(value), Materialized.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE)) .suppress(Suppressed.untilTimeLimit(DURATION, Suppressed.BufferConfig.unbounded())) .toStream(); // if used for join more than once, the exception "TopologyException: Invalid topology:" will be thrown final KStream<Windowed<String>, SolarModuleAggregator> aggPowerPerSolarModuleForJoinStream = aggPowerPerSolarModuleStream .map((k, v) -> KeyValue.pair(new Windowed<>(k.key().getPanelName(), k.window()), v)); // joining aggregated panels with aggregated modules // need for calculating sumSquare and deviance final KStream<Windowed<String>, SolarPanelAggregatorJoiner> joinedAggPanelWithAggModule = aggPowerPerSolarPanelStream .join( aggPowerPerSolarModuleForJoinStream, SolarPanelAggregatorJoiner::new, JOIN_WINDOWS, Joined.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE)); //calculating sumSquare and deviance final KStream<Windowed<String>, SolarPanelAggregator> aggPowerPerSolarPanelFinalStream = joinedAggPanelWithAggModule .groupByKey(Grouped.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_JOINER_SERDE)) .aggregate(SolarPanelAggregator::new, (key, value, aggregation) -> aggregation.updateFrom(value), Materialized.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE)) .suppress(Suppressed.untilTimeLimit(DURATION, Suppressed.BufferConfig.unbounded())) .toStream(); // joining aggregated modules with aggregated panels in which calculated sumSquare and deviance // need for check modules with anomaly power value final KStream<Windowed<String>, SolarModuleAggregatorJoiner> joinedAggModuleWithAggPanel = aggPowerPerSolarModuleStream .map((k, v) -> KeyValue.pair(new Windowed<>(k.key().getPanelName(), k.window()), v)) .join( aggPowerPerSolarPanelFinalStream, SolarModuleAggregatorJoiner::new, JOIN_WINDOWS, Joined.with(WINDOWED_STRING_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE)); // streaming result data (modules with anomaly power value) joinedAggModuleWithAggPanel .filter((k, v) -> isAnomalyModule(v)) .map((k, v) -> KeyValue.pair(k.key(), v)) .to(OUT_TOPIC, Produced.valueSerde(SOLAR_MODULE_AGGREGATOR_JOINER_SERDE)); // starting streams final KafkaStreams streams = new KafkaStreams(builder.build(), getProperties()); streams.cleanUp(); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
محاسبه دادههای ناهنجار
////img
private static boolean isAnomalyModule(SolarModuleAggregatorJoiner module) { double currentZ = Math.abs(module.getSumPower() - module.getSolarPanelAggregator().getAvgPower()) / module.getSolarPanelAggregator().getDeviance(); return currentZ > Z; }
نمونه خروجی برنامه
...SolarConsumer - PerSolarModule: [1572447720|Panel 1|Module 1]: 30.0:6 ...SolarConsumer - PerSolarModule: [1572447720|Panel 1|Module 2]: 30.0:6 ...SolarConsumer - PerSolarModule: [1572447720|Panel 1|Module 3]: 21.0:6 ...SolarConsumer - PerSolarPanel: [1572447690|Panel 1]: 81.0:3 ...SolarConsumer - PerSolarPanelFinal: [1572447660|Panel 1]: power:81.0 count:3 squareSum:54.0 variance:18.0 deviance:4.2 ...SolarConsumer - ANOMALY module: [1572447660|Panel 1|Module 3]: sumPower:21.0 panelAvg:27.0 deviance:4.2
مرحله 3. پیکربندی ادغام Kafka.
بیایید ThingsBoard را طوری پیکربندی کنیم که به موضوع “solar-module-anomalies” مشترک شود و هشدارها را ایجاد کند. ما از ادغام Kafka که از ThingsBoard نسخه 2.4.2 به بعد در دسترس است استفاده خواهیم کرد.
پیکربندی تبدیل کننده Uplink
قبل از راه اندازی ادغام Kafka ، باید تبدیل کننده داده Uplink را ایجاد کنید. تبدیل کننده داده Uplink مسئول تجزیه دادههای ناهنجار ورودی است.
نمونهای از پیام ورودی که توسط برنامه Kafka Streams ما تولید میشود:
{ "moduleName": "Module 3", "panelName": "Panel 1", "count": 6, "sumPower": 21.0, "avgPower": 3.5, "solarPanelAggregator": { "panelName": "Panel 1", "count": 3, "sumPower": 81.0, "avgPower": 27.0, "squaresSum": 54.0, "variance": 18.0, "deviance": 4.2 } }
لطفاً اسکریپت مورد نظر را در بخش تابع Decoder مشاهده کنید.
// Decode an uplink message from a buffer // payload - array of bytes // metadata - key/value object /** Decoder **/ // decode payload to string var msg = decodeToJson(payload); // decode payload to JSON // var data = decodeToJson(payload); var deviceName = msg.moduleName; var deviceType = 'module'; // Result object with device attributes/telemetry data var result = { deviceName: deviceName, deviceType: deviceType, attributes: { panel: msg.panelName }, telemetry: { avgPower: msg.avgPower, sumPower: msg.sumPower, avgPowerFromPanel: msg.solarPanelAggregator.avgPower, deviance: msg.solarPanelAggregator.deviance } }; /** Helper functions **/ function decodeToString(payload) { return String.fromCharCode.apply(String, payload); } function decodeToJson(payload) { // covert payload to string. var str = decodeToString(payload); // parse string to JSON var data = JSON.parse(str); return data; } return result;
هدف تابع Decoder، تجزیه داده و متادیتای ورودی به یک فرمت قابل استفاده توسط ThingsBoard است. deviceName و deviceType لازم است، در حالی که attributes و telemetry اختیاری هستند. attributes و telemetry اشیاء کلید-مقدار تک بعدی هستند و اشیاء تو در تو پشتیبانی نمیشوند.
////img
پیکربندی ادغام Kafka
بیایید ادغام Kafka را ایجاد کنیم که به موضوع “solar-module-anomalies” مشترک شود.
////img
مرحله 4. پیکربندی موتور قوانین برای ایجاد هشدارها.
در این مرحله، بر اساس پرچم بولین “anomaly” در تلمتری ورودی، از راهنمای “ایجاد و پاکسازی هشدارها” برای ایجاد هشدار استفاده کنید و از راهنمای “ارسال ایمیل در هشدار” برای ارسال اطلاعیههای ایمیل استفاده کنید. برای یادگیری بیشتر، راهنماهای دیگر را بررسی کنید.
مرحله 5. حذف پیامهای روزانه اشکالزدایی
اگرچه حالت اشکالزدایی بسیار مفید برای توسعه و رفع مشکلات است، اما باقی گذاشتن آن در حالت تولید ممکن است باعث افزایش قابل توجهی در فضای دیسک مورد استفاده توسط پایگاه داده شود زیرا تمام دادههای اشکالزدایی در آنجا ذخیره میشوند. توصیه میشود که پس از انجام اشکالزدایی حالت اشکالزدایی را خاموش کنید.