استفاده از صف‌ها برای پردازش مجدد پیام‌ها

مورد استفاده

فرض کنید دستگاه شما داده‌های دما و رطوبت را به ThingsConnect ارسال می‌کند. ما ارسال پیام‌ها به یک سرور خارجی را با استفاده از گره فراخوانی Rest API شبیه‌سازی خواهیم کرد.

در این آموزش، ما موتور قوانین ThingsConnect را پیکربندی خواهیم کرد تا از صفی با استراتژی پردازش پیام‌های ناموفق و زمان‌توقف استفاده کنیم. هرچند این سناریو خیالی است، شما یاد خواهید گرفت چگونه با صف کار کنید تا در صورت بروز خطا یا زمان‌توقف پردازش، پیام‌ها را مجدداً پردازش کنید و از این دانش در برنامه‌های واقعی استفاده کنید.

پیش‌نیازها

ما فرض می‌کنیم که راهنماهای زیر را تکمیل کرده‌اید و مقالات ذکر شده را مرور کرده‌اید:

  • راهنمای شروع سریع
  • مرور کلی موتور قوانین

علاوه بر این، شما نیاز به داشتن حداقل یک دستگاه در محیط خود دارید.

گام ۱: ایجاد زنجیره قوانین

image

ما یک گره “تولیدکننده” اضافه خواهیم کرد تا شش پیام را با تأخیر ۱ ثانیه شبیه‌سازی کنیم.

image

تمام پیام‌ها در صفی با نام “HighPriority” قرار خواهند گرفت. این صف از استراتژی پردازش پیام به نام “RETRY_FAILED_AND_TIMED_OUT” استفاده می‌کند (لطفاً به راهنمای پیکربندی مراجعه کنید برای جزئیات بیشتر)، که به این معنی است که پیام‌های ناموفق یا زمان‌توقف‌شده دوباره پردازش خواهند شد.

image

در نهایت، پیام‌ها به سرور خارجی ارسال خواهند شد.

image

گام ۲: پیکربندی سرور خارجی

فرض کنید ما یک سرور آماده داریم تا پیام‌ها را دریافت کند. ما یک کنترلر ساده در برنامه Spring Boot برای این منظور ایجاد کرده‌ایم. علاوه بر این، هر پیام سوم را شبیه‌سازی کرده‌ایم تا ناموفق باشد.

				
					@RestController
@RequestMapping("/api/v1/test")
@Slf4j
public class Controller {

    private AtomicLong atomicLong = new AtomicLong(0);

    @RequestMapping(value = {"/"}, headers = "Content-Type=application/json", method = {RequestMethod.POST})
    @ResponseStatus(value = HttpStatus.OK)
    public DeferredResult<ResponseEntity> processRequest(@RequestBody JsonNode msg) {
        DeferredResult<ResponseEntity> deferredResult = new DeferredResult<>();

        log.info("Received message: {}", msg);

        long counter = atomicLong.incrementAndGet();
        if (counter % 3 == 0) {
            log.warn("Bad request: {}", msg);
            deferredResult.setResult(new ResponseEntity<>("Bad Request", HttpStatus.BAD_REQUEST));
        } else {
            log.info("Success: {}", msg);
            deferredResult.setResult(new ResponseEntity<>("Ok", HttpStatus.OK));
        }

        return deferredResult;
    }
}
				
			

گام ۳: اعتبارسنجی منطق زنجیره قوانین

بیایید بررسی کنیم که آیا منطق ما درست است یا خیر، با ذخیره کردن زنجیره قوانین و راه‌اندازی سرور خارجی. تولیدکننده شروع به تولید پیام‌ها خواهد کرد:

گره "Checkpoint" شش پیام را دریافت کرده است:

image

ما مشاهده می‌کنیم که گره فراخوانی Rest API بعدی، “Send Request”، هشت پیام را پردازش کرده است.

image

هر پیام سوم (دو پیام از شش پیام اولیه) ناموفق بوده است.

image

دو پیام آخر، پیام‌هایی هستند که نیاز به پردازش مجدد داشتند (پیام‌های ناموفق). این به این معنی است که منطق ما به درستی کار می‌کند.

TL;DR

فایل JSON پیوست‌شده را با زنجیره قوانین دانلود و وارد کنید. فراموش نکنید که گره‌های تولیدکننده را با دستگاه خاص خود پر کنید.

مراحل بعدی

  • راهنمای شروع سریع: این راهنماها نمای کلی از ویژگی‌های اصلی Thingsconnect ارائه می‌دهند و برای تکمیل آن‌ها ۱۵-۳۰ دقیقه زمان نیاز است.
  • اتصال دستگاه خود: یاد بگیرید چگونه دستگاه‌ها را بر اساس فناوری یا راه‌حل ارتباطی خود متصل کنید.
  • تصویرسازی داده‌ها: این راهنماها شامل دستورالعمل‌هایی برای پیکربندی داشبوردهای پیچیده Thingsconnect هستند.
  • تحلیل داده‌های IoT: یاد بگیرید چگونه از موتور قوانین برای انجام وظایف تحلیل پایه استفاده کنید.
  • نمونه‌های سخت‌افزاری: یاد بگیرید چگونه پلتفرم‌های سخت‌افزاری مختلف را به Thingsconnect متصل کنید.
  • ویژگی‌های پیشرفته: با ویژگی‌های پیشرفته Thingsconnect آشنا شوید.
  • مشارکت و توسعه: با مشارکت و توسعه در Thingsconnect آشنا شوید.

عناوین هر بخش