Poor management of duplicate insertion with unique indexes #3360

Open
pommepommee opened this issue Dec 20, 2024 · 0 comments

Describe the bug
When I perform a bulk write (inserting or saving a list) and the list contains duplicates of documents already in the database (with a unique index on several attributes of the class), the elements of the list before the duplicate are inserted, but those after it are not.

I've thought about a workaround, but it inevitably leads to data loss, especially since it is impossible to know which elements were inserted and which were not: insert and save systematically return the same list that was passed in.

For example, I first try to insert (or save), catch the BulkWriteError, parse the message to identify which data was not inserted, filter those elements out of the original list, and try to insert again. But with this method, the elements before the duplicate were already inserted the first time, so on the second attempt they become duplicates themselves.

I also tested looping over the list and inserting the data one by one, but that is about five times slower on a list of 10k elements.
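For reference, the underlying MongoDB driver can already do a "log and continue" insert at the collection level: an unordered insertMany attempts every document and only reports the duplicates, instead of stopping at the first one. A minimal sketch against the raw driver (it bypasses Morphia's mapping, so the collection and Document usage below are only for illustration):

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.InsertManyOptions;
import org.bson.Document;

import java.util.List;

public final class UnorderedInsertSketch {

  /** Inserts every non-duplicate document; duplicates are reported, not fatal. */
  public static void insertSkippingDuplicates(MongoCollection<Document> collection,
      List<Document> docs) {
    try {
      // ordered(false): the server attempts every insert and collects
      // duplicate-key errors instead of aborting at the first one.
      collection.insertMany(docs, new InsertManyOptions().ordered(false));
    } catch (MongoBulkWriteException we) {
      // At this point all non-duplicate documents have been inserted;
      // only the duplicates appear in the write errors.
      System.err.println(we.getWriteErrors().size() + " duplicates skipped");
    }
  }
}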

To Reproduce
Steps to reproduce the behavior:

  1. Have a model with multiple attributes like name, category, etc.
  2. Create a unique index on these attributes.
  3. Insert a document.
  4. Try to save/insert a list containing the document already in the database; you can place it at different positions in the list.

Expected behavior
Multiple choices here, assuming there is a unique index on the collection:

  • log the E11000 duplicate key error but continue inserting the other elements of the list, and return the list of elements that could not be inserted (or of those that were);
  • or return the list of elements that could not be inserted (that means the duplicates and those after them) in order to retry, as in the sketch after this list;
  • something else?
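Whichever option is chosen, the exception already identifies the failing documents without message parsing: each BulkWriteError carries getIndex(), the position of the offending document in the submitted batch. A sketch of recovering the non-inserted tail from that (BulkRetryHelper is a made-up name; this assumes an ordered write, where the server stops at the first duplicate):

import com.mongodb.MongoBulkWriteException;
import com.mongodb.bulk.BulkWriteError;

import java.util.ArrayList;
import java.util.List;

public final class BulkRetryHelper {

  /**
   * Returns the part of the batch that was not inserted. For an ordered
   * bulk write, everything from the first failing index onward (the
   * duplicate itself included) was skipped by the server.
   */
  public static <T> List<T> notInserted(List<T> batch, MongoBulkWriteException we) {
    int firstFailed = batch.size();
    for (BulkWriteError error : we.getWriteErrors()) {
      // getIndex() is the position of the failing document in the batch.
      firstFailed = Math.min(firstFailed, error.getIndex());
    }
    return new ArrayList<>(batch.subList(firstFailed, batch.size()));
  }
}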

Please complete the following information:

  • Server Version: mongo 7.0.5
  • Driver Version: mongodb-driver-sync:4.11.1
  • Morphia Version: morphia-core:2.4.11

Additional context

Model

@Entity("tmPackets")
@Indexes({@Index(fields = @Field(value = "receptionTime", type = IndexType.DESC)),
    @Index(fields = @Field(value = "onBoardTime", type = IndexType.DESC)),
    @Index(fields = @Field(value = "packetType", type = IndexType.DESC)),
    @Index(
        fields = {@Field(value = "onBoardTime", type = IndexType.DESC),
            @Field(value = "sourceSeqCount", type = IndexType.DESC),
            @Field(value = "apid", type = IndexType.DESC)},
        options = @IndexOptions(unique = true))})
public class TmPacketToStore {
  /**
   * ID of the packet. This field is only used after storing.
   */
  @Id
  @JsonSerialize(using = ObjectIdSerializer.class)
  @JsonDeserialize(using = ObjectIdDeserializer.class)
  private ObjectId id;

  /**
   * Raw binary values of the packet.
   */
  private byte[] rawPacket;

  /**
   * The ground reception time.
   */
  private Instant receptionTime;

  /**
   * The onboard time.
   */
  private Instant onBoardTime;

  /**
   * The type of the packet.
   */
  private String packetType;

  /**
   * The APID of the packet.
   */
  private long apid;

  /**
   * The source sequence counter.
   */
  private long sourceSeqCount;

  // Getters and setters omitted.
}

DAO

/**
 * Insert telemetry packets in database.
 *
 * @param tmPackets TM packets to insert
 */
public List<ObjectId> insertBatchTmPackets(List<TmPacketToStore> tmPackets) {
  log.debug("Inserting TM packets by batch {}",
      tmPackets.stream().map(TmPacketToStore::getPacketType).toList());
  List<ObjectId> ids;
  List<TmPacketToStore> tmStored = new ArrayList<>();

  try {
    tmStored = this.dataStore.save(tmPackets);
  } catch (MongoBulkWriteException we) {
    List<BulkWriteError> errors = we.getWriteErrors();
    for (BulkWriteError error : errors) {
      String errorMsg = error.getMessage();
      // findAssociatedTmPacket (not shown) parses the E11000 message to
      // find the offending packet in the batch.
      TmPacketToStore duplicateTmFound = findAssociatedTmPacket(tmPackets, errorMsg);
      if (duplicateTmFound != null) {
        log.warn(errorMsg);
        log.warn("Cannot insert duplicate TmPacket {} OnBoardTime({}) SrcSeqCount({}) APID({})",
            duplicateTmFound.getPacketType(), duplicateTmFound.getOnBoardTime(),
            duplicateTmFound.getSourceSeqCount(), duplicateTmFound.getApid());
      }
    }
  } catch (Exception e) {
    log.error("Unhandled exception {}", e.toString());
    throw e;
  }

  ids = tmStored.stream().map(TmPacketToStore::getId).toList();
  log.debug("Inserted {} TM packets {}", tmStored.size(), tmStored);
  return ids;
}
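If Morphia exposes the driver's ordered flag (dev.morphia.InsertManyOptions seems to mirror the driver's options in 2.x, but I have not verified this on 2.4.11), the DAO above could already get the "log and continue" behavior by using insert instead of save. A sketch under that assumption:

import com.mongodb.MongoBulkWriteException;
import dev.morphia.InsertManyOptions;

// Sketch, assuming dev.morphia.InsertManyOptions.ordered(boolean)
// exists on Morphia 2.4.11 (to be verified):
try {
  dataStore.insert(tmPackets, new InsertManyOptions().ordered(false));
} catch (MongoBulkWriteException we) {
  // With ordered(false), every non-duplicate packet is inserted;
  // only the duplicates are reported in the write errors.
  log.warn("Skipped {} duplicate TM packets", we.getWriteErrors().size());
}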

Test

@Test
void insertDuplicatesTm() throws InterruptedException {
  final int packetsCount = 10;
  final int expectedPacketsCount = 9;
  final byte[] redacted = REDACTED_BYTE_ARRAY;

  List<TmPacketToStore> packetsToStore = new ArrayList<>();
  Instant onBoardTime = Instant.now();

  for (int i = 0; i < packetsCount; i++) {
    TmPacketToStore packet = new TmPacketToStore();
    packet.setPacketType("REDACTED");
    packet.setRawPacket(redacted);
    packet.setSourceSeqCount(i);
    packet.setReceptionTime(Instant.now());
    packet.setOnBoardTime(onBoardTime.plusSeconds(i * 10));
    packetsToStore.add(packet);
    System.out.println(packet);
  }

  // Create a duplicate: copy the unique-index fields of packet 1 onto packet 0.
  TmPacketToStore toDup = packetsToStore.get(1);
  TmPacketToStore fromDup = packetsToStore.get(0);
  fromDup.setOnBoardTime(toDup.getOnBoardTime());
  fromDup.setReceptionTime(toDup.getReceptionTime());
  fromDup.setSourceSeqCount(toDup.getSourceSeqCount());
  fromDup.setApid(toDup.getApid());

  tmDao.insertBatchTmPackets(packetsToStore);

  int sleepCount = 0;
  // Short sleeps in a loop give the write time to land without one large Thread.sleep.
  while (sleepCount != 100) {
    Thread.sleep(1); // NOSONAR
    sleepCount++;
  }

  List<TmPacketToStore> selectResult = tmDao.getLastTmPacketsByReceptionTime(10);

  System.out.println("count inserted " + selectResult.size());
  for (TmPacketToStore packet : selectResult) {
    System.out.print(packet);
  }
  assertEquals(expectedPacketsCount, selectResult.size());
}