src/Centralstage/DeviceBundle/EventSubscriber/RawStoreMongoSubscriber.php line 20

Open in your IDE?
  1. <?php
  2. namespace App\Centralstage\DeviceBundle\EventSubscriber;
  3. use MongoDB\BSON\UTCDateTime;
  4. use App\Centralstage\CoreBundle\Utils\DateTimeHelper;
  5. use App\Centralstage\DeviceBundle\Models\DeviceDataModel;
  6. use App\Centralstage\DeviceBundle\Event\RawStoreMongoEvent;
  7. use Symfony\Component\DependencyInjection\ContainerInterface;
  8. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  9. class RawStoreMongoSubscriber implements EventSubscriberInterface
  10. {
  11.     private $primaryDocumentManager;
  12.     public function __construct(ContainerInterface $container)
  13.     {
  14.         $this->primaryDocumentManager   $container->get('doctrine_mongodb.odm.document_manager');
  15.     }
  16.     public function onProcessDeviceRawData(RawStoreMongoEvent $RawObject) : void
  17.     {   
  18.         try {
  19.             $context $RawObject->getData();
  20.             list($deviceChannelID$data$orgKey$currentDate) = array_values($context);
  21.             $currentDT         DateTimeHelper::parseDateTime($currentDate'Y-m-d'DateTimeHelper::ServerTimeZone);
  22.             $conDetails        DeviceDataModel::getDeviceRawDBCollection($orgKey$deviceChannelID$currentDT);
  23.             $dataBase          $conDetails["database"];
  24.             $collectionName    $conDetails["collection"];
  25.             $data              json_decode($datatrue);
  26.             $deviceTimestamp   $data['timestamp'];
  27.             $deviceTimestamp   floor(strlen($deviceTimestamp) == 10 $deviceTimestamp*1000 $deviceTimestamp);
  28.             $timestamp         = new UTCDateTime($deviceTimestamp); // Convert to milliseconds
  29.             $data['timeseries'] = $timestamp;
  30.             $data['timestamp'] = $deviceTimestamp;
  31.             $data['createdAt'] = new UTCDateTime();
  32.         
  33.             $database $this->primaryDocumentManager->getClient()->selectDatabase($dataBase);
  34.         
  35.             // Check if collection exists and create if not
  36.             $collections $database->listCollections(['filter' => ['name' => $collectionName]]);
  37.             if (iterator_count($collections) === 0) {
  38.                 $database->command([
  39.                     'create' => $collectionName,
  40.                     'timeseries' => [
  41.                         'timeField' => 'timeseries',
  42.                     ],
  43.                 ]);
  44.                 // Create index on the timestamp field
  45.                 $database->selectCollection($collectionName)->createIndex(['timestamp' => 1,'timeseries' => 1]);
  46.             }
  47.         
  48.             // Insert data into the collection
  49.             $database->selectCollection($collectionName)->insertOne($data);
  50.         } catch (\Exception $exception) {
  51.             $error $exception->getMessage();
  52.         }
  53.     }
  54.     public static function getSubscribedEvents(): array
  55.     {
  56.         return [
  57.             'device.raw.data' => 'onProcessDeviceRawData'
  58.         ];
  59.     }
  60.     public function __destruct(){}
  61. }