Spring Boot批处理服务,您可以创建可执行 JAR 文件,并使用 Maven 或 Gradle 命令运行 Spring Boot 应用程序,如下所示 –

对于 Maven,您可以使用下面给出的命令 –

mvn clean install

“BUILD SUCCESS”后,您可以在目标目录下找到JAR文件。

对于 Gradle,您可以使用如下所示的命令 –

gradle clean build

“BUILD SUCCESSFUL”后,您可以在build/libs 目录下找到JAR 文件。

使用此处给出的命令运行 JAR 文件 –

java –jar <JARFILE>

现在,应用程序已在 Tomcat 端口 8080 上启动,如图所示。

batch service application started on tomcat port

现在,在 Web 浏览器中点击 URL http://localhost:8080/并连接 Web 套接字并发送问候语并接收消息。

web socket send receive message

批处理服务是在单个任务中执行多个命令的过程。在本章中,您将学习如何在 Spring Boot 应用程序中创建批处理服务。

让我们考虑一个示例,我们要将 CSV 文件内容保存到 HSQLDB 中。

要创建 Batch Service 程序,我们需要在构建配置文件中添加 Spring Boot Starter Batch 依赖项和 HSQLDB 依赖项。

Maven 用户可以在 pom.xml 文件中添加以下依赖项。

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
   <groupId>org.hsqldb</groupId>
   <artifactId>hsqldb</artifactId>
</dependency>

Gradle 用户可以在 build.gradle 文件中添加以下依赖项。

compile("org.springframework.boot:spring-boot-starter-batch")
compile("org.hsqldb:hsqldb")

现在,在类路径资源 – src/main/resources 下添加简单的 CSV 数据文件,并将文件命名为 file.csv,如下所示 –

William,John
Mike, Sebastian
Lawarance, Lime

接下来,为HSQLDB编写SQL脚本——classpath资源目录下——request_fail_hystrix_timeout

DROP TABLE USERS IF EXISTS;
CREATE TABLE USERS  (
   user_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
   first_name VARCHAR(20),
   last_name VARCHAR(20)
);

为 USERS 模型创建一个 POJO 类,如下所示 –

package net.zuze.batchservicedemo;
public class User {
   private String lastName;
   private String firstName;

   public User() {
   }
   public User(String firstName, String lastName) {
      this.firstName = firstName;
      this.lastName = lastName;
   }
   public void setFirstName(String firstName) {
      this.firstName = firstName;
   }
   public String getFirstName() {
      return firstName;
   }
   public String getLastName() {
      return lastName;
   }
   public void setLastName(String lastName) {
      this.lastName = lastName;
   }

   @Override
   public String toString() {
      return "firstName: " + firstName + ", lastName: " + lastName;
   }   
}

现在,创建一个中间处理器来执行从 CSV 文件读取数据之后和将数据写入 SQL 之前的操作。

package net.zuze.batchservicedemo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

public class UserItemProcessor implements ItemProcessor<User, User> {
   private static final Logger log = LoggerFactory.getLogger(UserItemProcessor.class);

   @Override
   public User process(final User user) throws Exception {
      final String firstName = user.getFirstName().toUpperCase();
      final String lastName = user.getLastName().toUpperCase();
      final User transformedPerson = new User(firstName, lastName);

      log.info("Converting (" + user + ") into (" + transformedPerson + ")");
      return transformedPerson;
   }
}

让我们创建一个 Batch 配置文件,从 CSV 读取数据并写入 SQL 文件,如下所示。我们需要在配置类文件中添加@EnableBatchProcessing注解。@EnableBatchProcessing 注解用于为 Spring Boot 应用程序启用批处理操作。

package net.zuze.batchservicedemo;

import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
   @Autowired
   public JobBuilderFactory jobBuilderFactory;

   @Autowired
   public StepBuilderFactory stepBuilderFactory;

   @Autowired
   public DataSource dataSource;

   @Bean
   public FlatFileItemReader<User> reader() {
      FlatFileItemReader<User> reader = new FlatFileItemReader<User>();
      reader.setResource(new ClassPathResource("file.csv"));
      reader.setLineMapper(new DefaultLineMapper<User>() {
         {
            setLineTokenizer(new DelimitedLineTokenizer() {
               {
                  setNames(new String[] { "firstName", "lastName" });
               }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<User>() {
               {
                  setTargetType(User.class);
               }
            });
         }
      });
      return reader;
   }
   @Bean
   public UserItemProcessor processor() {
      return new UserItemProcessor();
   }
   @Bean
   public JdbcBatchItemWriter<User> writer() {
      JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<User>();
      writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<User>());
      writer.setSql("INSERT INTO USERS (first_name, last_name) VALUES (:firstName, :lastName)");
      writer.setDataSource(dataSource);
      return writer;
   }
   @Bean
   public Job importUserJob(JobCompletionNotificationListener listener) {
      return jobBuilderFactory.get("importUserJob").incrementer(
         new RunIdIncrementer()).listener(listener).flow(step1()).end().build();
   }
   @Bean
   public Step step1() {
      return stepBuilderFactory.get("step1").<User, User>chunk(10).reader(reader()).processor(processor()).writer(writer()).build();
   }
}

reader ()方法用于从 CSV 文件读取数据,writer() 方法用于将数据写入 SQL。

接下来,我们必须编写一个作业完成通知监听器类 – 用于在作业完成后发出通知。

package net.zuze.batchservicedemo;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
   private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
   private final JdbcTemplate jdbcTemplate;

   @Autowired
   public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
      this.jdbcTemplate = jdbcTemplate;
   }
   @Override
   public void afterJob(JobExecution jobExecution) {
      if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
         log.info("!!! JOB FINISHED !! It's time to verify the results!!");

         List<User> results = jdbcTemplate.query(
            "SELECT first_name, last_name FROM USERS", new RowMapper<User>() {
            
            @Override
            public User mapRow(ResultSet rs, int row) throws SQLException {
               return new User(rs.getString(1), rs.getString(2));
            }
         });

         for (User person : results) {
            log.info("Found <" + person + "> in the database.");
         }
      }
   }
}

现在,创建一个可执行 JAR 文件,并使用以下 Maven 或 Gradle 命令运行 Spring Boot 应用程序。

对于 Maven,使用如下所示的命令 –

mvn clean install

“BUILD SUCCESS”后,您可以在目标目录下找到JAR文件。

对于 Gradle,您可以使用如下所示的命令 –

gradle clean build

“BUILD SUCCESSFUL”后,您可以在build/libs 目录下找到JAR 文件。

使用此处给出的命令运行 JAR 文件 –

java –jar <JARFILE>

您可以在控制台窗口中看到输出,如下所示 –

batch service output in console window

Spring Boot批处理服务 推荐

Git教程09 – Git存储操作

领券有优惠