API Events
@nx/searchlà một CDC consumer. Nó subscribe 18 Debezium topic và chỉ produce tới một dead-letter queue. Không có business event outbound, không WebSocket, không BullMQ. Đặt tên topic theo Debezium{prefix}.{schema}.{table}với prefixnx.seller.
1. Inbound — Kafka (CDC)
Tất cả topic định nghĩa trong SearchCollections.CDC (src/common/kafka-topics.ts); consumer subscribe ALL_CDC_TOPICS. Handler là CDCService.handleBatch() cho mọi topic.
Direct document sources (map trong TableToCollectionMap)
| Topic | Schema.Table | → Collection |
|---|---|---|
nx.seller.public.Organizer | public.Organizer | organizers |
nx.seller.public.Merchant | public.Merchant | merchants |
nx.seller.public.Category | public.Category | categories |
nx.seller.public.Device | public.Device | devices |
nx.seller.public.SaleChannel | public.SaleChannel | sale-channels |
nx.seller.public.Product | public.Product | products |
nx.seller.public.ProductInfo | public.ProductInfo | products (i18n partial) |
nx.seller.public.ProductVariant | public.ProductVariant | product-variants |
nx.seller.inventory.InventoryStock | inventory.InventoryStock | inventories |
Cascade-only sources (không phải direct doc source — fan out qua CDCCascadeService)
| Topic | Schema.Table | Tính lại |
|---|---|---|
nx.seller.public.ProductCategory | public.ProductCategory | products + product-variants categoryIds |
nx.seller.public.MetaLink | public.MetaLink | product → variant productMetaLinks; variant metaLinks |
nx.seller.pricing.FareSet | pricing.FareSet | variant fareSet + defaultPrice |
nx.seller.pricing.Fare | pricing.Fare | variant fareSet + defaultPrice |
nx.seller.public.ProductBundler | public.ProductBundler | variant comboItems |
nx.seller.inventory.InventoryItem | inventory.InventoryItem | nhóm item inventories |
nx.seller.inventory.InventoryLocation | inventory.InventoryLocation | nhóm location inventories |
nx.seller.inventory.InventoryIdentifier | inventory.InventoryIdentifier | inventories identifiers[] |
nx.seller.inventory.Material | inventory.Material | item inventories (kiểu Material) |
Op code Debezium
| Op | Ý nghĩa | Action |
|---|---|---|
c | create | upsert doc |
u | update | upsert doc |
d | delete | delete doc |
r | snapshot read | upsert doc (initial load) |
Bất kỳ op nào mà mapper trả về
null(soft-deleted) trở thành một Typesense delete.
2. Outbound — Kafka
| Topic | Trigger | Consumers | Payload |
|---|---|---|---|
nx.seller.cdc.dlq (mặc định, override qua APP_ENV_CDC_DLQ_TOPIC) | Một message CDC fail xử lý sau retry | Ops / tooling replay thủ công | Message CDC gốc + metadata lỗi |
3. Inbound — BullMQ
N/A — không có BullMQ consumer. (Thiết kế embedding-queue trong selection report là một tùy chọn tương lai, chưa implement.)
4. Outbound — BullMQ
N/A — không có BullMQ producer.
5. WebSocket Emissions
N/A — thư viện không emit WebSocket event nào. Cập nhật UI real-time là mối quan tâm của host service.
6. Payload Schemas
Payload CDC là Debezium envelope, giải mã với avsc. Search không sở hữu các schema này — chúng phản chiếu bảng nguồn. Shape (rút gọn):
// Debezium change event (Avro-decoded)
interface DebeziumEnvelope<TBefore, TAfter> {
op: 'c' | 'u' | 'd' | 'r';
before: TBefore | null;
after: TAfter | null;
source: {
schema: string; // public | pricing | inventory
table: string; // vd "Product"
lsn: number; // → guard replay sai thứ tự (SearchVersionFields.SOURCE_LSN)
ts_ms: number;
};
}Tham số query search (hướng consumer, ISearchParams):
interface ISearchParams {
q?: string; // mặc định '*'
limit?: number; // mặc định 10, max 250
offset?: number;
where?: unknown; // kiểu Ignis → filter_by
order?: string | string[]; // "createdAt DESC" → sort_by
include?: IIncludeSpec[]; // hydration quan hệ / native join
useCache?: boolean;
cacheTtl?: number;
disableSemanticSearch?: boolean; // bỏ embedding khỏi query_by (keyword nghiêm ngặt)
}7. Idempotency & Ordering
| Topic class | Delivery | Ordering | Recovery |
|---|---|---|---|
| Tất cả topic CDC | at-least-once (auto-commit off; commit sau batch) | per-key qua Debezium; guard LSN loại bỏ event cũ | đọc lại offset chưa-commit sau restart / circuit-breaker close |
| DLQ | fire-and-forget | — | replay thủ công |
| Guard | Field | Hiệu ứng |
|---|---|---|
| LSN | source_lsn (SearchVersionFields.SOURCE_LSN) | Skip event nếu LSN của nó ≤ LSN doc đã lưu |
| Tombstone | deleted_at (SearchVersionFields.DELETED_AT) | So sánh để giữ delete khỏi bị hoàn tác bởi upsert cũ |