@Slf4j
public class JobRepositoryIntegrationTest {

	@Autowired
	private JobRepositoryTestService service;
	
	@Test
	@Rollback(false)
	public void updateWorker2() throws Exception {
		val targetId = 3L;
		Thread thread0 = new Thread(() -> updateWorkerValue2(targetId, "worker0"));
		Thread thread1 = new Thread(() -> updateWorkerValue2(targetId, "worker1"));
		Thread thread2 = new Thread(() -> updateWorkerValue2(targetId, "worker2"));
		Thread thread3 = new Thread(() -> updateWorkerValue2(targetId, "worker3"));
		Thread thread4 = new Thread(() -> updateWorkerValue2(targetId, "worker4"));
		thread0.start();
		thread1.start();
		thread2.start();
		thread3.start();
		thread4.start();
		thread0.join();
		thread1.join();
		thread2.join();
		thread3.join();
		thread4.join();
	}

	private void updateWorkerValue2(Long id, String worker) {

		log.info("+++ {}: STARTED", worker);

		try (val job = getLockableJob(id, service)) {
			log.info("+++ {}: GOT LOCK", worker);

			try {
				service.updateWorkerValue(id, worker);
				log.info("+++ {}: VALUE UPDATED", worker);
			} catch (Exception e) {
				job.error();
				throw e;
			}
		} catch (LockFailureException e) {
			log.info(e.getMessage(), e);
		} catch (Exception e) {
			log.error(e.getMessage(), e);
		}

		log.info("+++ {}: DONE", worker);
	}

public class LockableJob implements AutoCloseable {
	private Long id;
	private JobStatus closeStatus = DONE;
	private JobRepositoryTestService dbupdater;

	private LockableJob(Long id, JobRepositoryTestService dbupdater) throws LockFailureException {
		this.id = id;
		this.dbupdater = dbupdater;
		updateStatus(from(WAIT), to(LOCK));
	}

	static LockableJob getLockableJob(Long id, JobRepositoryTestService dbupdater) throws LockFailureException {
		return new LockableJob(id, dbupdater);
	}

	public void error() {
		this.closeStatus = ERROR;
	}

	@Override
	public void close() throws Exception {
		updateStatus(from(LOCK), to(closeStatus));
	}

	private void updateStatus(List<JobStatus> from, JobStatus to) throws LockFailureException {
		val affected = dbupdater.updateStatus(id, from, to);
		if (affected == 1)
			return;
		if (to == LOCK)
			throw new LockFailureException(id);
		throw new RuntimeException("error while updating status. id=" + id + ", from=" + from + ", toStatus=" + to);
	}

	@AllArgsConstructor
	static class LockFailureException extends Exception {
		private Long id;
		public String getMessage() {
			return firstNonNull(super.getMessage(), "") + " id=" + id;
		}
	}
}

@Service
@Transactional(readOnly = true)
@Slf4j
public class JobRepositoryTestService {

	@Autowired
	private JobRepository repo;

	@Transactional
	public void updateWorkerValue(Long id, String worker) {
		// TODO biz
		repo.updateValueOfLockedJob(id, worker);

		// if (id != null)
		//	throw new RuntimeException("for test");
	}

	@Transactional
	public int updateStatus(Long id, List<JobStatus> from, JobStatus to) {
		return repo.updateStatus(id, from, to);
	}
}

@Repository
public interface JobRepository {

	@Modifying
	@Query("update JobItem i set i.status=:newStatus, i.modifiedAt=now() where i.id=:id and i.status in :statuses")
	int updateStatus(
			@Param("id") Long id,
			@Param("statuses") List<JobStatus> statuses,
			@Param("newStatus") JobStatus newStatus);

	@Modifying
	@Query("update JobItem i set i.stringUnitId=:value, i.modifiedAt=now() where i.id=:id and i.status='LOCK'")
	int updateValueOfLockedJob(
			@Param("id") Long id,
			@Param("value") String value);
}
Valid XHTML 1.0! Valid CSS! powered by MoniWiki
last modified 2019-03-09 23:16:58
Processing time 0.0051 sec