programing

클러스터 환경에서 실행 중인 스프링 스케줄링된 작업

minimums 2023. 3. 9. 21:56
반응형

클러스터 환경에서 실행 중인 스프링 스케줄링된 작업

60초마다 실행되는 cron 작업이 있는 어플리케이션을 쓰고 있습니다.애플리케이션은 필요에 따라 여러 인스턴스로 확장되도록 구성되어 있습니다.60초마다 1개의 인스턴스(임의의 노드)에서만 작업을 수행합니다.즉시 해결 방법을 찾을 수 없고 이전에 여러 번 물어보지 않은 것이 놀랍습니다.Spring 4.1.6을 사용하고 있습니다.

    <task:scheduled-tasks>
        <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>

바로 이 목적을 위한 ShutLock 프로젝트가 있습니다.실행할 때 잠가야 할 작업에 주석을 달기만 하면 됩니다.

@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
   // do something
}

스프링 및 LockProvider 구성

@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
class MySpringConfiguration {
    ...
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
       return new JdbcTemplateLockProvider(dataSource);
    }
    ...
}

이를 위해서는 JDBC-JobStore와 함께 Quartz Clustering을 사용해야 한다고 생각합니다.

는 클러스터에서 작업을 안전하게 실행하는 또 다른 단순하고 견고한 방법입니다.노드가 클러스터의 "리더"인 경우에만 데이터베이스를 기반으로 태스크를 실행할 수 있습니다.

또한 클러스터에서 노드에 장애가 발생하거나 종료된 경우 다른 노드가 리더가 됩니다.

"리더 선거" 메커니즘을 만들고 매번 자신이 리더인지 확인하는 것 밖에 없습니다.

@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

다음의 순서에 따릅니다.

1. 클러스터 내의 노드당 1개의 엔트리를 유지하는 오브젝트와 테이블을 정의합니다.

@Entity(name = "SYS_NODE")
public class SystemNode {

/** The id. */
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

/** The name. */
@Column(name = "TIMESTAMP")
private String timestamp;

/** The ip. */
@Column(name = "IP")
private String ip;

/** The last ping. */
@Column(name = "LAST_PING")
private Date lastPing;

/** The last ping. */
@Column(name = "CREATED_AT")
private Date createdAt = new Date();

/** The last ping. */
@Column(name = "IS_LEADER")
private Boolean isLeader = Boolean.FALSE;

public Long getId() {
    return id;
}

public void setId(final Long id) {
    this.id = id;
}

public String getTimestamp() {
    return timestamp;
}

public void setTimestamp(final String timestamp) {
    this.timestamp = timestamp;
}

public String getIp() {
    return ip;
}

public void setIp(final String ip) {
    this.ip = ip;
}

public Date getLastPing() {
    return lastPing;
}

public void setLastPing(final Date lastPing) {
    this.lastPing = lastPing;
}

public Date getCreatedAt() {
    return createdAt;
}

public void setCreatedAt(final Date createdAt) {
    this.createdAt = createdAt;
}

public Boolean getIsLeader() {
    return isLeader;
}

public void setIsLeader(final Boolean isLeader) {
    this.isLeader = isLeader;
}

@Override
public String toString() {
    return "SystemNode{" +
            "id=" + id +
            ", timestamp='" + timestamp + '\'' +
            ", ip='" + ip + '\'' +
            ", lastPing=" + lastPing +
            ", createdAt=" + createdAt +
            ", isLeader=" + isLeader +
            '}';
}

}

2. a) 데이터베이스에 노드를 삽입하고 b) 리더를 확인하는 서비스를 만듭니다.

@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService,    ApplicationListener {

/** The logger. */
private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);

/** The constant NO_ALIVE_NODES. */
private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";

/** The ip. */
private String ip;

/** The system service. */
private SystemService systemService;

/** The system node repository. */
private SystemNodeRepository systemNodeRepository;

@Autowired
public void setSystemService(final SystemService systemService) {
    this.systemService = systemService;
}

@Autowired
public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
    this.systemNodeRepository = systemNodeRepository;
}

@Override
public void pingNode() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    if (node == null) {
        createNode();
    } else {
        updateNode(node);
    }
}

@Override
public void checkLeaderShip() {
    final List<SystemNode> allList = systemNodeRepository.findAll();
    final List<SystemNode> aliveList = filterAliveNodes(allList);

    SystemNode leader = findLeader(allList);
    if (leader != null && aliveList.contains(leader)) {
        setLeaderFlag(allList, Boolean.FALSE);
        leader.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    } else {
        final SystemNode node = findMinNode(aliveList);

        setLeaderFlag(allList, Boolean.FALSE);
        node.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    }
}

/**
 * Returns the leaded
 * @param list
 *          the list
 * @return  the leader
 */
private SystemNode findLeader(final List<SystemNode> list) {
    for (SystemNode systemNode : list) {
        if (systemNode.getIsLeader()) {
            return systemNode;
        }
    }
    return null;
}

@Override
public boolean isLeader() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    return node != null && node.getIsLeader();
}

@Override
public void onApplicationEvent(final ApplicationEvent applicationEvent) {
    try {
        ip = InetAddress.getLocalHost().getHostAddress();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    if (applicationEvent instanceof ContextRefreshedEvent) {
        pingNode();
    }
}

/**
 * Creates the node
 */
private void createNode() {
    final SystemNode node = new SystemNode();
    node.setIp(ip);
    node.setTimestamp(String.valueOf(System.currentTimeMillis()));
    node.setCreatedAt(new Date());
    node.setLastPing(new Date());
    node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
    systemNodeRepository.save(node);
}

/**
 * Updates the node
 */
private void updateNode(final SystemNode node) {
    node.setLastPing(new Date());
    systemNodeRepository.save(node);
}

/**
 * Returns the alive nodes.
 *
 * @param list
 *         the list
 * @return the alive nodes
 */
private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
    int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
    final List<SystemNode> finalList = new LinkedList<>();
    for (SystemNode systemNode : list) {
        if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
            finalList.add(systemNode);
        }
    }
    if (CollectionUtils.isEmpty(finalList)) {
        LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
        throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
    }
    return finalList;
}

/**
 * Finds the min name node.
 *
 * @param list
 *         the list
 * @return the min node
 */
private SystemNode findMinNode(final List<SystemNode> list) {
    SystemNode min = list.get(0);
    for (SystemNode systemNode : list) {
        if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {
            min = systemNode;
        }
    }
    return min;
}

/**
 * Sets the leader flag.
 *
 * @param list
 *         the list
 * @param value
 *         the value
 */
private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
    for (SystemNode systemNode : list) {
        systemNode.setIsLeader(value);
    }
}

}

3. 데이터베이스를 ping하여 가 동작하고 있음을 송신한다.

@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
    systemNodeService.pingNode();
}

@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
    systemNodeService.checkLeaderShip();
}

4. 준비 완료!작업을 수행하기 전에 자신이 리더인지 확인하십시오.

@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

배치 및 스케줄링된 작업은 일반적으로 고객용 앱이 아닌 자체 독립 실행형 서버에서 실행되므로 클러스터에서 실행될 것으로 예상되는 애플리케이션에 작업을 포함할 필요가 없습니다.또한 클러스터 환경의 작업은 일반적으로 동일한 작업의 다른 인스턴스가 병렬로 실행되는 것을 걱정할 필요가 없기 때문에 작업 인스턴스를 분리할 필요가 없는 또 다른 이유가 있습니다.

간단한 해결책은 스프링 프로파일 내에서 작업을 구성하는 것입니다.예를 들어, 현재 설정이 다음과 같은 경우:

<beans>
  <bean id="someBean" .../>

  <task:scheduled-tasks>
    <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
  </task:scheduled-tasks>
</beans>

다음으로 변경합니다.

<beans>
  <beans profile="scheduled">
    <bean id="someBean" .../>

    <task:scheduled-tasks>
      <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
  </beans>
</beans>

다음 '을 '어플리케이션'을 '어플리케이션'으로 한 .scheduledactivated(프로파일 활성화)-Dspring.profiles.active=scheduled를 참조해 주세요.

어떤 이유로 프라이머리 서버를 사용할 수 없게 되었을 경우는, 프로파일을 유효하게 한 상태로 다른 서버를 기동하기만 하면, 정상적으로 동작합니다.


작업에 대한 자동 장애 조치를 원하는 경우에도 상황이 달라집니다.그런 다음 모든 서버에서 작업을 계속 실행하고 데이터베이스 테이블, 클러스터 캐시, JMX 변수 등의 공통 리소스를 통해 동기화를 확인해야 합니다.

데이터베이스 테이블을 사용하여 잠금을 수행하고 있습니다.테이블에 삽입할 수 있는 작업은 한 번에 1개뿐입니다.다른 한쪽은 DuplicateKey를 받습니다.예외.삽입 및 삭제 로직은 @Scheduled 주석 주위의 측면에 의해 처리됩니다.Spring Boot 2.0을 사용하고 있습니다.

@Component
@Aspect
public class SchedulerLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;  

    @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))")
    public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable {

        String jobSignature = joinPoint.getSignature().toString();
        try {
            jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()});

            Object proceed = joinPoint.proceed();

            jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature});
            return proceed;

        }catch (DuplicateKeyException e) {
            LOGGER.warn("Job is currently locked: "+jobSignature);
            return null;
        }
    }
}


@Component
public class EveryTenSecondJob {

    @Scheduled(cron = "0/10 * * * * *")
    public void taskExecution() {
        System.out.println("Hello World");
    }
}


CREATE TABLE scheduler_lock(
    signature varchar(255) NOT NULL,
    date datetime DEFAULT NULL,
    PRIMARY KEY(signature)
);

dlock은 데이터베이스 인덱스와 제약을 사용하여 태스크를 한 번만 실행하도록 설계되었습니다.당신은 다음과 같은 것을 간단히 할 수 있습니다.

@Scheduled(cron = "30 30 3 * * *")
@TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES)
public void execute() {

}

사용 방법에 대한 문서를 참조하십시오.

여기서 Zookeeper를 사용하여 마스터 인스턴스를 선택할 수 있으며 마스터 인스턴스는 예약된 작업만 수행합니다.

Aspect 및 Apache 큐레이터를 사용한 구현이 있습니다.

@SpringBootApplication
@EnableScheduling
public class Application {

    private static final int PORT = 2181;

    @Bean
    public CuratorFramework curatorFramework() {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:" + PORT, new ExponentialBackoffRetry(1000, 3));
        client.start();
        return client;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

애스펙트 클래스

 @Aspect
@Component
public class LeaderAspect implements LeaderLatchListener{

    private static final Logger log = LoggerFactory.getLogger(LeaderAspect.class);
    private static final String ELECTION_ROOT = "/election";

    private volatile boolean isLeader = false;

    @Autowired
    public LeaderAspect(CuratorFramework client) throws Exception {
        LeaderLatch ll = new LeaderLatch(client , ELECTION_ROOT);
        ll.addListener(this);
        ll.start();
    }


    @Override
    public void isLeader() {
        isLeader = true;
        log.info("Leadership granted.");
    }

    @Override
    public void notLeader() {
        isLeader = false;
        log.info("Leadership revoked.");
    }


    @Around("@annotation(com.example.apache.curator.annotation.LeaderOnly)")
    public void onlyExecuteForLeader(ProceedingJoinPoint joinPoint) {
        if (!isLeader) {
            log.debug("I'm not leader, skip leader-only tasks.");
            return;
        }

        try {
            log.debug("I'm leader, execute leader-only tasks.");
            joinPoint.proceed();
        } catch (Throwable ex) {
            log.error(ex.getMessage());
        }
    }

}

리더 전용 주석

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LeaderOnly {
}

스케줄링된 작업

@Component
public class HelloWorld {

    private static final Logger log = LoggerFactory.getLogger(HelloWorld.class);


    @LeaderOnly
    @Scheduled(fixedRate = 1000L)
    public void sayHello() {
        log.info("Hello, world!");
    }
}

노드 간 잠금을 관리하기 위해 데이터베이스를 설정할 필요 없이 다른 방법을 사용하고 있습니다.

그 성분은 헤이즐캐스트에 의해 불려지고 제공된다.

다른 노드가 일정과 연동되지 않도록 하기 위해 사용하고 있지만 일정의 노드 간 잠금 공유에도 사용할 수 있습니다.

이를 위해 서로 다른 잠금 이름을 만들 수 있는 두 가지 기능 도우미를 설정하기만 하면 됩니다.

@Scheduled(cron = "${cron.expression}")
public void executeMyScheduler(){
   
   // This can also be a member of the class.
   HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();

   Lock lock = hazelcastInstance.getCPSubsystem().getLock("mySchedulerName");

   lock.lock();
   try {
      // do your schedule tasks here

   } finally {
      // don't forget to release lock whatever happens: end of task or any exceptions.
      lock.unlock();
   }
}

또는 지연 후 자동으로 잠금을 해제할 수도 있습니다. 예를 들어 cron 작업이 1시간마다 실행되고 있다고 가정하면 다음과 같이 50분 후에 자동 릴리스를 설정할 수 있습니다.

@Scheduled(cron = "${cron.expression}")
public void executeMyScheduler(){
   
   // This can also be a member of the class.
   HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();

   Lock lock = hazelcastInstance.getCPSubsystem().getLock("mySchedulerName");

   if ( lock.tryLock ( 50, TimeUnit.MINUTES ) ) {
      try {
         // do your schedule tasks here
      } finally {
         // don't forget to release lock whatever happens: end of task or any exceptions.
         lock.unlock();
      }
   } else {
     // warning: lock has been released by timeout!
   }
}

이 Hazelcast 구성 요소는 클라우드 기반 환경(예: k8s 클러스터)에서 매우 잘 작동하며 추가 데이터베이스 비용을 지불할 필요가 없습니다.

설정할 필요가 있는 것은 다음과 같습니다.

// We need to specify the name otherwise it can conflict with internal Hazelcast beans
@Bean("hazelcastInstance")
public HazelcastInstance hazelcastInstance() {
    Config config = new Config();
    config.setClusterName(hazelcastProperties.getGroup().getName());
    NetworkConfig networkConfig = config.getNetworkConfig();

    networkConfig.setPortAutoIncrement(false);
    networkConfig.getJoin().getKubernetesConfig().setEnabled(hazelcastProperties.isNetworkEnabled())
            .setProperty("service-dns", hazelcastProperties.getServiceDNS())
            .setProperty("service-port", hazelcastProperties.getServicePort().toString());
    config.setProperty("hazelcast.metrics.enabled", "false");

    networkConfig.getJoin().getMulticastConfig().setEnabled(false);

    return Hazelcast.newHazelcastInstance(config);
}

은 헤이즐캐스트 입니다.ConfigurationProperties오브젝트가 속성으로 매핑됩니다.

로컬 테스트의 경우 로컬프로파일의 속성을 사용하여 네트워크 설정을 디세블로 할 수 있습니다.

hazelcast:
  network-enabled: false
  service-port: 5701
  group:
    name: your-hazelcast-group-name

이를 위해서는 db-scheduler와 같은 임베디드 가능한 스케줄러를 사용할 수 있습니다.또한 지속적인 실행이 가능하며 단일 노드에 의한 실행을 보장하기 위해 단순하고 낙관적인 잠금 메커니즘을 사용합니다.

유스케이스의 실현 방법의 코드 예:

   RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60)))
    .execute((taskInstance, executionContext) -> {
        System.out.println("Executing " + taskInstance.getTaskAndInstance());
    });

   final Scheduler scheduler = Scheduler
          .create(dataSource)
          .startTasks(recurring1)
          .build();

   scheduler.start();

kJob-Manager라는 무료 HTTP 서비스를 사용하고 있습니다.https://kjob-manager.ciesielski-systems.de/

장점은 데이터베이스에 새 테이블을 만들지 않고 HTTP 요청이기 때문에 데이터베이스 연결이 필요하지 않다는 것입니다.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import org.apache.tomcat.util.json.JSONParser;
import org.apache.tomcat.util.json.ParseException;
import org.junit.jupiter.api.Test;

public class KJobManagerTest {

    @Test
    public void example() throws IOException, ParseException {

        String data = "{\"token\":\"<API-Token>\"}";
        URL url = new URL("https://kjob-manager.ciesielski-systems.de/api/ticket/<JOB-ID>");

        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));

        JSONParser jsonParser = new JSONParser(connection.getInputStream());
        LinkedHashMap<String, LinkedHashMap<String, Object>> result = (LinkedHashMap<String, LinkedHashMap<String, Object>>) jsonParser.parse();

        if ((boolean) result.get("ticket").get("open")) {
            System.out.println("This replica could run the cronjob!");
        } else {
            System.out.println("This replica has nothing to do!");
        }

    }

}

스프링 컨텍스트는 클러스터되지 않으므로 분산 애플리케이션의 작업을 관리하는 것이 다소 어려우며 jgroup을 지원하는 시스템을 사용하여 상태를 동기화하고 작업이 우선 순위를 할당하도록 해야 합니다.또는 ejb 컨텍스트를 사용하여 jboss ha environment https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd와 같은 클러스터화된 ha 싱글톤 서비스를 관리할 수 있습니다. 또는 서비스와 첫 번째 서비스 간에 클러스터된 캐시와 액세스 잠금 리소스를 사용할 수 있습니다. 잠금을 적용하면 작업을 수행하거나 사용자가 소유한 jgroup을 구현하여 통신할 수 있습니다.1노드1개의 조작을 실행할 수단은 1노드입니다.

언급URL : https://stackoverflow.com/questions/31288810/spring-scheduled-task-running-in-clustered-environment

반응형