/**
 * Integration test in which several threads compete to lock and update the same job row.
 *
 * <p>The test makes no assertions; the outcome of each worker is reported through the log.
 */
@Slf4j
public class JobRepositoryIntegrationTest {

    @Autowired
    private JobRepositoryTestService service;

    /**
     * Launches five worker threads ("worker0" .. "worker4") against job id 3 and waits until
     * every one of them has finished.
     *
     * @throws Exception if the calling thread is interrupted while joining a worker
     */
    @Test
    @Rollback(false)
    public void updateWorker2() throws Exception {
        final long targetId = 3L;
        final int workerCount = 5;

        // Build every thread first so that all of them are started back to back.
        final Thread[] workers = new Thread[workerCount];
        for (int index = 0; index < workerCount; index++) {
            final String workerName = "worker" + index;
            workers[index] = new Thread(() -> updateWorkerValue2(targetId, workerName));
        }

        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }

    /**
     * Body of one worker thread: obtains a lockable job for {@code id}, writes the worker name
     * through the service while the job is held, and logs each step.
     *
     * <p>Nothing is propagated to the caller: a {@code LockFailureException} is logged at info
     * level, any other exception at error level.
     *
     * @param id     id of the job to lock and update
     * @param worker name of the worker, written as the job's value and used in log lines
     */
    private void updateWorkerValue2(Long id, String worker) {
        log.info("+++ {}: STARTED", worker);

        try (val job = getLockableJob(id, service)) {
            log.info("+++ {}: GOT LOCK", worker);
            try {
                service.updateWorkerValue(id, worker);
                log.info("+++ {}: VALUE UPDATED", worker);
            } catch (Exception updateFailure) {
                // Flag the job as failed before the resource is closed, then let the
                // outer handlers deal with the exception.
                job.error();
                throw updateFailure;
            }
        } catch (LockFailureException lockFailure) {
            log.info(lockFailure.getMessage(), lockFailure);
        } catch (Exception unexpected) {
            log.error(unexpected.getMessage(), unexpected);
        }

        log.info("+++ {}: DONE", worker);
    }
    // NOTE(review): the closing brace of JobRepositoryIntegrationTest is not present in the
    // original line and is therefore not added here -- confirm against the full file.
public class LockableJob implements AutoCloseable { private Long id; private JobStatus closeStatus = DONE; private JobRepositoryTestService dbupdater; private LockableJob(Long id, JobRepositoryTestService dbupdater) throws LockFailureException { this.id = id; this.dbupdater = dbupdater; updateStatus(from(WAIT), to(LOCK)); } static LockableJob getLockableJob(Long id, JobRepositoryTestService dbupdater) throws LockFailureException { return new LockableJob(id, dbupdater); } public void error() { this.closeStatus = ERROR; } @Override public void close() throws Exception { updateStatus(from(LOCK), to(closeStatus)); } private void updateStatus(List<JobStatus> from, JobStatus to) throws LockFailureException { val affected = dbupdater.updateStatus(id, from, to); if (affected == 1) return; if (to == LOCK) throw new LockFailureException(id); throw new RuntimeException("error while updating status. id=" + id + ", from=" + from + ", toStatus=" + to); } @AllArgsConstructor static class LockFailureException extends Exception { private Long id; public String getMessage() { return firstNonNull(super.getMessage(), "") + " id=" + id; } } }
@Service @Transactional(readOnly = true) @Slf4j public class JobRepositoryTestService { @Autowired private JobRepository repo; @Transactional public void updateWorkerValue(Long id, String worker) { // TODO biz repo.updateValueOfLockedJob(id, worker); // if (id != null) // throw new RuntimeException("for test"); } @Transactional public int updateStatus(Long id, List<JobStatus> from, JobStatus to) { return repo.updateStatus(id, from, to); } }
/**
 * Update queries on {@code JobItem} rows.
 *
 * <p>NOTE(review): no Spring Data base interface is extended in the visible code -- confirm
 * how this repository is registered.
 */
@Repository
public interface JobRepository {

    /**
     * Sets the status of the job with the given id to {@code newStatus} and refreshes its
     * {@code modifiedAt}, but only when the current status is contained in {@code statuses}.
     *
     * @param id        id of the job to update
     * @param statuses  statuses the job must currently be in for the update to apply
     * @param newStatus status to set
     * @return the number of rows updated
     */
    @Modifying
    @Query("update JobItem i set i.status=:newStatus, i.modifiedAt=now() where i.id=:id and i.status in :statuses")
    int updateStatus(
            @Param("id") Long id,
            @Param("statuses") List<JobStatus> statuses,
            @Param("newStatus") JobStatus newStatus);

    /**
     * Sets {@code stringUnitId} of the job with the given id to {@code value} and refreshes its
     * {@code modifiedAt}, but only when the job's status is {@code 'LOCK'}.
     *
     * @param id    id of the job to update
     * @param value value to store in {@code stringUnitId}
     * @return the number of rows updated
     */
    @Modifying
    @Query("update JobItem i set i.stringUnitId=:value, i.modifiedAt=now() where i.id=:id and i.status='LOCK'")
    int updateValueOfLockedJob(
            @Param("id") Long id,
            @Param("value") String value);
}