Skip to content

Commit

Permalink
added documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
rliebi committed Oct 30, 2024
1 parent 20c3f41 commit 920bc72
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 13 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The only job of the bundle is to store Pimcore elements (assets, documents, data

1. `composer require valantic/pimcore-elastica-bridge`
1. Edit `config/bundles.php` and add `\Valantic\ElasticaBridgeBundle\ValanticElasticaBridgeBundle::class => ['all' => true],`
1. Configure the connection to your Elasticsearch cluster as seen in [`example/app/config/config.yaml`](example/app/config/config.yaml)
1. Configure the connection to your Elasticsearch cluster as seen in [`example/app/config/config.yaml`](/docs/example/config/config.yaml)
1. Don't forget to register your newly created services (implementing `IndexInterface` etc.) in your `services.yaml`
```yml
App\Elasticsearch\:
Expand All @@ -22,7 +22,7 @@ The only job of the bundle is to store Pimcore elements (assets, documents, data

## Usage

Please see the [`docs/example/`](docs/example/) folder for a complete example. The following steps link to the corresponding section in the example and explain in a bit more detail what they are doing.
Please see the [`docs/example/`](/docs/example) folder for a complete example. The following steps link to the corresponding section in the example and explain in a bit more detail what they are doing.

### Define an index

Expand Down Expand Up @@ -67,6 +67,10 @@ valantic_elastica_bridge:
# If true, when a document fails to be indexed, it will be skipped and indexing continue with the next document. If false, indexing that index will be aborted.
should_skip_failing_documents: false
```

### Async Configuration
This bundle supports utilizing the message queue for indexing. To enable this feature, you can find the necessary configuration in the [async](async.md) documentation.

## Events

This project uses Symfony's event dispatcher. Here are the events that you can listen to:
Expand Down
56 changes: 56 additions & 0 deletions async.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Async Settings
## Basic Configuration
The package is capable of running indexing jobs asynchronously. This is done by using the Symfony Messenger component.
To enable this feature, you need to configure the following settings:

```yaml
# config.yaml
framework:
messenger:
transports:
elastica_bridge_populate: '%env(MESSENGER_TRANSPORT_DSN)%'
```
```dotenv
# .env
MESSENGER_TRANSPORT_DSN='doctrine://default?queue_name=elastica_bridge_populate'
```

This configuration will send all relevant messages to the `elastica_bridge_populate` transport. The `elastica_bridge_populate` transport is
configured to use the `doctrine` transport, which stores the messages in the database. The `queue_name` parameter
specifies the name of the queue where the messages are stored.

## Event Listeners
To take full advantage you will need to configure some event listeners.
See [PopulateListener.php](/docs/example/src/EventListener/PopulateListener.php) and [PopulateService.php](/docs/example/src/Service/PopulateService.php) for a full working example.

The package provides the following events:

| Event Description | Possible Use Cases | Event Name | Event Class |
|-------------------------------|-----------------------------------------------------------------------------------------------------------------|------------------------------|---------------------------|
| Before the index is populated | <ul><li>Determine the source of the message and possibly clear previous errors</li></ul> | `PRE_EXECUTE` | `PreExecuteEvent` |
| Before the index is populated | <ul><li>Set expected message count</li></ul> | `PRE_PROCESS_MESSAGES_EVENT` | `PreProcessMessagesEvent` |
| Before a document is created | <ul><li>Stop document creation if execution is locked</li><li>give the remaining messages for logging</li></ul> | `PRE_DOCUMENT_CREATE` | `PreDocumentCreateEvent` |
| After a document is created | <ul><li>Decrement remaining messages</li><li>lock execution if document creation failed</li></ul> | `POST_DOCUMENT_CREATE` | `PostDocumentCreateEvent` |
| Before a index is switched | <ul><li>Skip switch if execution is locked</li><li>update the remaining messages</li></ul> | `PRE_SWITCH_INDEX` | `PreSwitchIndexEvent` |
| Before a index is switched | <ul><li>Check if all messages are consumed</li><li>update the remaining messages</li></ul> | `WAIT_FOR_COMPLETION_EVENT` | `WaitForCompletionEvent` |
| After a index is switched | <ul><li>Log</li><li>Send Notifications</li></ul> | `POST_SWITCH_INDEX` | `PostSwitchIndexEvent` |



## Workers
Workers are preferably setup using a supervisor configuration. The following is an example configuration for a worker:

### Queue Worker
To process the messages, you need to set up a worker. This can be done by running the following command:

```shell
$ bin/console messenger:consume elastica_bridge_populate
```

### Scheduler Worker
To process the messages in a scheduled manner, you can use the following command:

```shell
$ bin/console messenger:consume scheduler_populate_index
```
10 changes: 10 additions & 0 deletions docs/example/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
valantic_elastica_bridge:
events:
auto_save:
document: false
asset: false
data_object: false
client:
should_add_sentry_breadcrumbs: true
dsn: 'http://localhost:9200'
indexing:
lock_timeout: 3600 # lock timeout for indexing
interval: 600 # interval in which the scheduler is executed
cooldown: 3600 # cooldown between two scheduler runs for each index
103 changes: 103 additions & 0 deletions docs/example/src/EventListener/PopulateListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php

declare(strict_types=1);

namespace App\EventListener;

use App\Service\PopulateService;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Valantic\ElasticaBridgeBundle\Model\Event\ElasticaBridgeEvents;
use Valantic\ElasticaBridgeBundle\Model\Event\PostDocumentCreateEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\PreDocumentCreateEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\PreExecuteEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\PreProcessMessagesEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\PreSwitchIndexEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\WaitForCompletionEvent;

class PopulateListener implements EventSubscriberInterface
{
public function __construct(
private readonly PopulateService $populateService,
) {}

public function onPostDocumentCreate(PostDocumentCreateEvent $event): void
{
if ($event->success) {
$this->populateService->decrementRemainingMessages($event->index->getName());

return;
}

if ($event->willRetry || $event->skipped) {
return;
}

$this->populateService->lockExecution($event->index->getName());
}

public function onPostSwitchIndex(): void {}

public function onPreDocumentCreate(PreDocumentCreateEvent $event): void
{
if ($this->populateService->isExecutionLocked($event->index->getName())) {
$event->stopExecution();

return;
}

$event->setCurrentCount($this->populateService->getRemainingMessages($event->index->getName()));
}

public function onPrePopulateIndex(PreExecuteEvent $prePopulateEvent): void
{
if ($prePopulateEvent->source === PreExecuteEvent::SOURCE_CLI) {
$this->populateService->unlockExecution($prePopulateEvent->index->getName());
}
}

public function onPreSwitchIndex(PreSwitchIndexEvent $event): void
{
if ($this->populateService->isExecutionLocked($event->index->getName())) {
$event->skipSwitch();
$event->initiateCooldown = false;
}

$event->setRemainingMessages($this->populateService->getRemainingMessages($event->index->getName()));
}

public function onWaitForCompletion(WaitForCompletionEvent $event): void
{
if ($this->populateService->isExecutionLocked($event->index->getName())) {
$event->skipSwitch();

return;
}

$retryCount = $event->retries;
$event->setRemainingMessages($this->populateService->getRemainingMessages($event->index->getName()));

if ($retryCount > $event->maximumRetries - 1) {
$remainingMessages = $this->populateService->getActualMessageCount($event->index->getName());
$event->setRemainingMessages($remainingMessages);
$this->populateService->setExpectedMessages($event->index->getName(), $remainingMessages);
}
}

public function preProcessMessagesEvent(PreProcessMessagesEvent $messageQueueInitializedEvent): void
{
$this->populateService->setExpectedMessages($messageQueueInitializedEvent->index->getName(), $messageQueueInitializedEvent->expectedMessages);
}

public static function getSubscribedEvents(): array
{
return [
ElasticaBridgeEvents::PRE_EXECUTE => 'onPrePopulateIndex',
ElasticaBridgeEvents::PRE_PROCESS_MESSAGES_EVENT => 'preProcessMessagesEvent',
ElasticaBridgeEvents::PRE_DOCUMENT_CREATE => 'onPreDocumentCreate',
ElasticaBridgeEvents::POST_DOCUMENT_CREATE => 'onPostDocumentCreate',
ElasticaBridgeEvents::PRE_SWITCH_INDEX => 'onPreSwitchIndex',
ElasticaBridgeEvents::WAIT_FOR_COMPLETION_EVENT => 'onWaitForCompletion',
ElasticaBridgeEvents::POST_SWITCH_INDEX => 'onPostSwitchIndex',
];
}
}
101 changes: 101 additions & 0 deletions docs/example/src/Service/PopulateService.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

declare(strict_types=1);

namespace App\Service;

use Doctrine\DBAL\Connection;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;

class PopulateService
{
public const KEY_NAME_FAILURE = 'failure';
private const KEY_PREFIX = 'elasticsearch_populate';
private const REMAINING_MESSAGES = 'remaining_messages';

public function __construct(
#[Autowire(service: 'cache.default_redis_provider')]
private readonly \Redis $redis,
private readonly Connection $connection,
private readonly ConsoleOutputInterface $consoleOutput,
) {}

public function decrementRemainingMessages(string $indexName): void
{
$this->redis->decr($this->getKeyName($indexName, self::REMAINING_MESSAGES));
}

public function incrementRemainingMessages(string $indexName): void
{
$this->redis->incr($this->getKeyName($indexName, self::REMAINING_MESSAGES));
}

public function getRemainingMessages(string $indexName): int
{
return (int) $this->redis->get($this->getKeyName($indexName, self::REMAINING_MESSAGES));
}

public function setExpectedMessages(string $indexName, int $expectedMessages): void
{
$this->redis->set($this->getKeyName($indexName, self::REMAINING_MESSAGES), $expectedMessages);
}

public function getActualMessageCount(string $indexName): int
{
$query = "SELECT
COUNT(mm.id) AS remaining_messages
FROM messenger_messages mm
WHERE mm.queue_name = 'elastica_bridge_populate'
AND mm.body LIKE CONCAT('%\\\\\\\\\"', :indexName, '\\\\\\\\\"%')
AND mm.delivered_at IS NULL
AND mm.body LIKE '%CreateDocument%'";

$count = $this->connection->executeQuery($query, ['indexName' => $indexName, 'indexNameLength' => strlen($indexName)])->fetchOne();

return (int) $count;
}

public function lockExecution(string $document): string
{
$key = $this->getKeyName($document, self::KEY_NAME_FAILURE);

if ($this->isExecutionLocked($document)) {
return $key;
}

$this->redis->set($key, 1, ['NX', 'EX' => 1200]);
$this->consoleOutput->writeln(sprintf('Locking execution for %s (%s)', $document, $key), ConsoleOutputInterface::VERBOSITY_VERBOSE);

return $key;
}

public function unlockExecution(string $document): void
{
$key = $this->getKeyName($document, self::KEY_NAME_FAILURE);

if ($this->redis->exists($key) === 0) {
return;
}

$this->redis->del($key);
$this->consoleOutput->writeln(sprintf('Unlocking execution for %s (%s)', $document, hash('sha256', $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);
}

public function isExecutionLocked(string $document): bool
{
$key = $this->getKeyName($document, self::KEY_NAME_FAILURE);
$exists = $this->redis->exists($key);

if (is_int($exists)) {
return $exists > 0;
}

return false;
}

public function getKeyName(string $document, string $type): string
{
return sprintf('%s_%s_%s', self::KEY_PREFIX, $document, $type);
}
}
8 changes: 0 additions & 8 deletions src/Constant/CommandConstants.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,10 @@

interface CommandConstants
{
public const OPTION_CONFIG = 'config';
public const OPTION_INDEX = 'index';
public const OPTION_BATCH_NUMBER = 'batch-number';
public const OPTION_LISTING_COUNT = 'listing-count';
public const OPTION_DOCUMENT = 'document';

public const COMMAND_NAMESPACE = 'valantic:elastica-bridge:';

public const COMMAND_INDEX = self::COMMAND_NAMESPACE . 'index';
public const COMMAND_CLEANUP = self::COMMAND_NAMESPACE . 'cleanup';
public const COMMAND_REFRESH = self::COMMAND_NAMESPACE . 'refresh';
public const COMMAND_STATUS = self::COMMAND_NAMESPACE . 'status';
public const COMMAND_POPULATE_INDEX = self::COMMAND_NAMESPACE . 'populate-index';
public const COMMAND_DO_POPULATE_INDEX = self::COMMAND_NAMESPACE . 'do-populate-index';
}
3 changes: 0 additions & 3 deletions src/Resources/config/pimcore/messenger.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
framework:
messenger:
# failure_bridge: elastica_bridge_failed
enabled: true
buses:
messenger.bus.elasticaBridge:
Expand All @@ -12,10 +11,8 @@ framework:
elastica_bridge_sync: 'sync://'
scheduler: 'doctrine://default?queue_name=scheduler'
elastica_bridge_populate:
# dsn: 'doctrine://default?queue_name=elastica_bridge_populate'
dsn: 'sync://'
failure_transport: 'elastica_bridge_failed'
# elastica_bridge_populate: 'sync://'
elastica_bridge_failed: 'doctrine://default?queue_name=elastica_bridge_failed'
routing:
Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement: elastica_bridge_index
Expand Down

0 comments on commit 920bc72

Please sign in to comment.