📜  Spring Boot-批处理服务

📅  最后修改于: 2020-11-11 05:52:28             🧑  作者: Mango


您可以创建一个可执行的JAR文件,并使用Maven或Gradle命令运行Spring Boot应用程序,如下所示-

对于Maven,您可以使用下面给出的命令-

mvn clean install

在“ BUILD SUCCESS”之后,您可以在目标目录下找到JAR文件。

对于Gradle,您可以使用如下所示的命令-

gradle clean build

在“ BUILD SUCCESSFUL”之后,您可以在build / libs目录下找到JAR文件。

使用此处给定的命令运行JAR文件-

java –jar 

现在,该应用程序已在Tomcat端口8080上启动,如图所示。

批处理服务应用程序在Tomcat端口上启动

现在,在Web浏览器中单击URL http:// localhost:8080 /并连接Web套接字并发送问候语并接收消息。

Web套接字发送和接收消息

批处理服务是在单个任务中执行多个命令的过程。在本章中,您将学习如何在Spring Boot应用程序中创建批处理服务。

让我们考虑一个将CSV文件内容保存到HSQLDB中的示例。

要创建批处理服务程序,我们需要在构建配置文件中添加Spring Boot Starter Batch依赖性和HSQLDB依赖性。

Maven用户可以在pom.xml文件中添加以下依赖项。


   org.springframework.boot
   spring-boot-starter-batch


   org.hsqldb
   hsqldb

Gradle用户可以在build.gradle文件中添加以下依赖项。

compile("org.springframework.boot:spring-boot-starter-batch")
compile("org.hsqldb:hsqldb")

现在,在类路径资源– src / main / resources下添加简单的CSV数据文件,并将文件命名为file.csv,如下所示-

William,John
Mike, Sebastian
Lawarance, Lime

接下来,为HSQLDB编写一个SQL脚本-在类路径资源目录下-request_fail_hystrix_timeout

DROP TABLE USERS IF EXISTS;
CREATE TABLE USERS  (
   user_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
   first_name VARCHAR(20),
   last_name VARCHAR(20)
);

为USERS模型创建POJO类,如下所示-

package com.tutorialspoint.batchservicedemo;
public class User {
   private String lastName;
   private String firstName;

   public User() {
   }
   public User(String firstName, String lastName) {
      this.firstName = firstName;
      this.lastName = lastName;
   }
   public void setFirstName(String firstName) {
      this.firstName = firstName;
   }
   public String getFirstName() {
      return firstName;
   }
   public String getLastName() {
      return lastName;
   }
   public void setLastName(String lastName) {
      this.lastName = lastName;
   }

   @Override
   public String toString() {
      return "firstName: " + firstName + ", lastName: " + lastName;
   }   
}

现在,创建一个中间处理器以在从CSV文件读取数据之后并将数据写入SQL之前执行操作。

package com.tutorialspoint.batchservicedemo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

public class UserItemProcessor implements ItemProcessor {
   private static final Logger log = LoggerFactory.getLogger(UserItemProcessor.class);

   @Override
   public User process(final User user) throws Exception {
      final String firstName = user.getFirstName().toUpperCase();
      final String lastName = user.getLastName().toUpperCase();
      final User transformedPerson = new User(firstName, lastName);

      log.info("Converting (" + user + ") into (" + transformedPerson + ")");
      return transformedPerson;
   }
}

让我们创建一个批处理配置文件,以从CSV读取数据并写入SQL文件,如下所示。我们需要在配置类文件中添加@EnableBatchProcessing批注。 @EnableBatchProcessing批注用于为您的Spring Boot应用程序启用批处理操作。

package com.tutorialspoint.batchservicedemo;

import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
   @Autowired
   public JobBuilderFactory jobBuilderFactory;

   @Autowired
   public StepBuilderFactory stepBuilderFactory;

   @Autowired
   public DataSource dataSource;

   @Bean
   public FlatFileItemReader reader() {
      FlatFileItemReader reader = new FlatFileItemReader();
      reader.setResource(new ClassPathResource("file.csv"));
      reader.setLineMapper(new DefaultLineMapper() {
         {
            setLineTokenizer(new DelimitedLineTokenizer() {
               {
                  setNames(new String[] { "firstName", "lastName" });
               }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper() {
               {
                  setTargetType(User.class);
               }
            });
         }
      });
      return reader;
   }
   @Bean
   public UserItemProcessor processor() {
      return new UserItemProcessor();
   }
   @Bean
   public JdbcBatchItemWriter writer() {
      JdbcBatchItemWriter writer = new JdbcBatchItemWriter();
      writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
      writer.setSql("INSERT INTO USERS (first_name, last_name) VALUES (:firstName, :lastName)");
      writer.setDataSource(dataSource);
      return writer;
   }
   @Bean
   public Job importUserJob(JobCompletionNotificationListener listener) {
      return jobBuilderFactory.get("importUserJob").incrementer(
         new RunIdIncrementer()).listener(listener).flow(step1()).end().build();
   }
   @Bean
   public Step step1() {
      return stepBuilderFactory.get("step1").chunk(10).reader(reader()).processor(processor()).writer(writer()).build();
   }
}

reader()方法用于从CSV文件读取数据,而writer()方法用于将数据写入SQL。

接下来,我们将必须编写作业完成通知侦听器类,该类用于在作业完成后进行通知。

package com.tutorialspoint.batchservicedemo;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
   private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
   private final JdbcTemplate jdbcTemplate;

   @Autowired
   public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
      this.jdbcTemplate = jdbcTemplate;
   }
   @Override
   public void afterJob(JobExecution jobExecution) {
      if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
         log.info("!!! JOB FINISHED !! It's time to verify the results!!");

         List results = jdbcTemplate.query(
            "SELECT first_name, last_name FROM USERS", new RowMapper() {
            
            @Override
            public User mapRow(ResultSet rs, int row) throws SQLException {
               return new User(rs.getString(1), rs.getString(2));
            }
         });

         for (User person : results) {
            log.info("Found  in the database.");
         }
      }
   }
}

现在,创建一个可执行的JAR文件,并使用以下Maven或Gradle命令运行Spring Boot应用程序。

对于Maven,使用如下所示的命令-

mvn clean install

在“ BUILD SUCCESS”之后,您可以在目标目录下找到JAR文件。

对于Gradle,您可以使用如下所示的命令-

gradle clean build

在“ BUILD SUCCESSFUL”之后,您可以在build / libs目录下找到JAR文件。

使用此处给定的命令运行JAR文件-

java –jar 

您可以在控制台窗口中看到输出,如下所示:

控制台窗口中的批处理服务输出