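Advanced Usage Example
This guide demonstrates advanced GoBatch usage through a complete loan-processing example. The full example code is available in the gobatch-example/file_usage directory.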
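Job Overview
This example implements a loan-processing batch job made up of the following steps:
- Import loan trade records from a TSV file into a database table
- Generate repayment plans from the trade data
- Aggregate the repayment plans and write the statistics to a CSV file
- Upload the statistics file to an FTP server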
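Prerequisites
Before you begin, make sure all dependencies are in place, in particular a configured MySQL database.
Database Setup
- Create the required tables: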
-- Loan trade table
CREATE TABLE t_trade (
id bigint NOT NULL AUTO_INCREMENT,
trade_no varchar(64) NOT NULL,
account_no varchar(60) NOT NULL,
type varchar(20) NOT NULL,
amount decimal(10,2) NOT NULL,
terms int(11) NOT NULL,
interest_rate decimal(10,6) NOT NULL,
trade_time datetime NOT NULL,
status varchar(10) NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY uniq_trade_no (trade_no)
);
-- Repayment plan table
CREATE TABLE t_repay_plan (
id bigint NOT NULL AUTO_INCREMENT,
account_no varchar(60) NOT NULL,
loan_no varchar(64) NOT NULL,
term int(11) NOT NULL,
principal decimal(10,2) NOT NULL,
interest decimal(10,2) NOT NULL,
init_date date NOT NULL,
repay_date date NOT NULL,
repay_state varchar(10) NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY uniq_loan_no_term (loan_no, term)
);
Define the Data Models
- Trade model
type Trade struct {
TradeNo string `order:"0" header:"trade_no"`
AccountNo string `order:"1" header:"account_no"`
Type string `order:"2" header:"type"`
Amount float64 `order:"3" header:"amount"`
Terms int `order:"4" header:"terms"`
InterestRate float64 `order:"5" header:"interest_rate"`
Status string `order:"6" header:"status"`
TradeTime time.Time `order:"7" header:"trade_time"`
}
- Repayment plan model
type RepayPlan struct {
AccountNo string
LoanNo string
Term int
Principal float64
Interest float64
InitDate time.Time
RepayDate time.Time
RepayState string
}
- Statistics result model
type RepayPlanStats struct {
Term int
TotalPrincipal float64
TotalInterest float64
}
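The `order` tags on Trade are what let a TSV record be mapped onto the struct. This guide does not show how GoBatch performs that mapping, so the sketch below is only an illustration of the idea, not GoBatch's implementation: it splits a line on tabs and uses reflection to assign column `order` to each field. The helper name `parseTSVLine`, the `trade_time` layout `2006-01-02 15:04:05`, and the sample line are all assumptions.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
	"strings"
	"time"
)

// Trade repeats the model above; `order` is the column index in a TSV line.
type Trade struct {
	TradeNo      string    `order:"0" header:"trade_no"`
	AccountNo    string    `order:"1" header:"account_no"`
	Type         string    `order:"2" header:"type"`
	Amount       float64   `order:"3" header:"amount"`
	Terms        int       `order:"4" header:"terms"`
	InterestRate float64   `order:"5" header:"interest_rate"`
	Status       string    `order:"6" header:"status"`
	TradeTime    time.Time `order:"7" header:"trade_time"`
}

// tradeTimeLayout is an ASSUMED format for the trade_time column.
const tradeTimeLayout = "2006-01-02 15:04:05"

// parseTSVLine (hypothetical helper) fills the struct pointed to by dst from
// one tab-separated line, using each field's `order` tag as the column index.
func parseTSVLine(line string, dst interface{}) error {
	cols := strings.Split(line, "\t")
	v := reflect.ValueOf(dst).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		sf := t.Field(i)
		tag, ok := sf.Tag.Lookup("order")
		if !ok {
			continue // fields without an order tag are not mapped
		}
		idx, err := strconv.Atoi(tag)
		if err != nil || idx < 0 || idx >= len(cols) {
			return fmt.Errorf("field %s: invalid order tag %q for %d columns", sf.Name, tag, len(cols))
		}
		raw, f := cols[idx], v.Field(i)
		switch f.Interface().(type) {
		case string:
			f.SetString(raw)
		case int:
			n, err := strconv.Atoi(raw)
			if err != nil {
				return fmt.Errorf("field %s: %w", sf.Name, err)
			}
			f.SetInt(int64(n))
		case float64:
			x, err := strconv.ParseFloat(raw, 64)
			if err != nil {
				return fmt.Errorf("field %s: %w", sf.Name, err)
			}
			f.SetFloat(x)
		case time.Time:
			ts, err := time.Parse(tradeTimeLayout, raw)
			if err != nil {
				return fmt.Errorf("field %s: %w", sf.Name, err)
			}
			f.Set(reflect.ValueOf(ts))
		default:
			return fmt.Errorf("field %s: unsupported type %s", sf.Name, sf.Type)
		}
	}
	return nil
}

func main() {
	// Hypothetical sample record, columns in `order` sequence.
	line := "T0001\tA1001\tloan\t1200.00\t3\t0.12\tsuccess\t2024-05-01 10:30:00"
	var tr Trade
	if err := parseTSVLine(line, &tr); err != nil {
		panic(err)
	}
	fmt.Printf("%s %s %.2f %d %v\n", tr.TradeNo, tr.AccountNo, tr.Amount, tr.Terms, tr.TradeTime)
}
```

Because the column index comes from the tag rather than the field position, reordering the struct fields does not change how a line is read.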
File Processing Components
1. Configure the file models
var tradeFile = file.FileObjectModel{
FileStore: &file.LocalFileSystem{},
FileName: "res/trade.data",
Type: file.TSV,
Encoding: "utf-8",
Header: false,
ItemPrototype: &Trade{}, // each record in the file is mapped to the Trade model automatically
}
var statsFileExport = file.FileObjectModel{
FileStore: &file.LocalFileSystem{},
FileName: "res/{date,yyyyMMdd}/stats.csv",
Type: file.CSV,
Encoding: "utf-8",
Checksum: file.MD5,
ItemPrototype: &RepayPlanStats{}, // each RepayPlanStats item is written as one record in the file
}
var ftp = &file.FTPFileSystem{
Hort: "localhost",
Port: 21,
User: "yourftpuser",
Password: "yourftppassword",
ConnTimeout: time.Second * 10,
}
var copyFileToFtp = file.FileMove{
FromFileName: "res/{date,yyyyMMdd}/stats.csv",
FromFileStore: &file.LocalFileSystem{},
ToFileStore: ftp,
ToFileName: "stats/{date,yyyyMMdd}/stats.csv",
}
var copyChecksumFileToFtp = file.FileMove{
FromFileName: "res/{date,yyyyMMdd}/stats.csv.md5",
FromFileStore: &file.LocalFileSystem{},
ToFileStore: ftp,
ToFileName: "stats/{date,yyyyMMdd}/stats.csv.md5",
}
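The file names above contain a `{date,yyyyMMdd}` placeholder, and buildAndRunJob passes a `date` job parameter formatted as `2006-01-02`. The guide does not spell out how GoBatch resolves the placeholder; assuming it is filled from that `date` parameter, the resolution would look roughly like the sketch below. `expandDatePlaceholder` is a hypothetical helper that handles only this one token, not GoBatch's own resolver.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// datePlaceholder is the token used in the file names above.
const datePlaceholder = "{date,yyyyMMdd}"

// expandDatePlaceholder (hypothetical) replaces datePlaceholder in path with
// dateParam, which is expected in "2006-01-02" form, rendered as yyyyMMdd.
func expandDatePlaceholder(path, dateParam string) (string, error) {
	d, err := time.Parse("2006-01-02", dateParam)
	if err != nil {
		return "", fmt.Errorf("invalid date parameter %q: %w", dateParam, err)
	}
	return strings.ReplaceAll(path, datePlaceholder, d.Format("20060102")), nil
}

func main() {
	for _, p := range []string{
		"res/{date,yyyyMMdd}/stats.csv",
		"stats/{date,yyyyMMdd}/stats.csv.md5",
	} {
		resolved, err := expandDatePlaceholder(p, "2024-05-01")
		if err != nil {
			panic(err)
		}
		fmt.Println(resolved) // res/20240501/stats.csv, then stats/20240501/stats.csv.md5
	}
}
```

Keying the directory on the run date keeps each day's export and checksum in their own folder, both locally and on the FTP server.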
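2. Implement the data processors
1. Import trade data into the database
import (
"database/sql"
"github.com/chararch/gobatch"
)
// tradeImporter writes each chunk of Trade items read from the file into t_trade
type tradeImporter struct {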
db *sql.DB
}
func (p *tradeImporter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
for _, item := range items {
trade := item.(*Trade)
_, err := p.db.Exec("INSERT INTO t_trade(trade_no, account_no, type, amount, terms, interest_rate, trade_time, status) values (?,?,?,?,?,?,?,?)",
trade.TradeNo, trade.AccountNo, trade.Type, trade.Amount, trade.Terms, trade.InterestRate, trade.TradeTime, trade.Status)
if err != nil {
return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "insert t_trade failed", err)
}
}
return nil
}
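tradeImporter issues one INSERT per record. A common alternative is to send a whole chunk as a single multi-row INSERT, which saves a round trip per row and makes the chunk succeed or fail as one statement. The sketch below only builds such a statement; `buildBatchInsert` is a hypothetical helper, and inside Write you would pass its result to `p.db.Exec(query, args...)`. Table and column names are interpolated into the SQL, so they must be constants from your code, never input data.

```go
package main

import (
	"fmt"
	"strings"
)

// buildBatchInsert (hypothetical) builds one multi-row INSERT for a chunk and
// returns the query together with its flattened arguments, in row order.
// table and columns are interpolated and must be trusted constants.
func buildBatchInsert(table string, columns []string, rows [][]interface{}) (string, []interface{}) {
	// One "(?,?,...)" group per row, with one "?" per column.
	group := "(" + strings.TrimSuffix(strings.Repeat("?,", len(columns)), ",") + ")"
	groups := make([]string, len(rows))
	args := make([]interface{}, 0, len(rows)*len(columns))
	for i, row := range rows {
		groups[i] = group
		args = append(args, row...)
	}
	query := fmt.Sprintf("INSERT INTO %s(%s) VALUES %s",
		table, strings.Join(columns, ", "), strings.Join(groups, ", "))
	return query, args
}

func main() {
	query, args := buildBatchInsert("t_trade",
		[]string{"trade_no", "account_no"},
		[][]interface{}{{"T0001", "A1001"}, {"T0002", "A1002"}})
	fmt.Println(query)     // INSERT INTO t_trade(trade_no, account_no) VALUES (?,?), (?,?)
	fmt.Println(len(args)) // 4
}
```

Statement size grows with the chunk, so the chunk size effectively bounds how large each INSERT becomes.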
2. Generate repayment plans from the trade data
Read trade records from the t_trade table:
import (
"database/sql"
"fmt"
"github.com/chararch/gobatch"
)
type tradeReader struct {
db *sql.DB
}
func (h *tradeReader) Open(execution *gobatch.StepExecution) gobatch.BatchError {
return nil
}
func (h *tradeReader) Close(execution *gobatch.StepExecution) gobatch.BatchError {
return nil
}
func (h *tradeReader) ReadKeys() ([]interface{}, error) {
rows, err := h.db.Query("select id from t_trade")
if err != nil {
return nil, err
}
defer rows.Close()
var result []interface{}
var id int64
for rows.Next() {
err = rows.Scan(&id)
if err != nil {
return nil, err
}
result = append(result, id)
}
return result, nil
}
func (h *tradeReader) ReadItem(key interface{}) (interface{}, error) {
id := int64(0)
switch r := key.(type) {
case int64:
id = r
case float64:
id = int64(r)
default:
return nil, fmt.Errorf("key type error, type:%T, value:%v", key, key)
}
rows, err := h.db.Query("select trade_no, account_no, type, amount, terms, interest_rate, trade_time, status from t_trade where id = ?", id)
if err != nil {
return nil, err
}
defer rows.Close()
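trade := &Trade{}
if !rows.Next() {
return nil, fmt.Errorf("trade not found, id:%d", id)
}
// with go-sql-driver/mysql, scanning a DATETIME into time.Time requires parseTime=true in the DSN
err = rows.Scan(&trade.TradeNo, &trade.AccountNo, &trade.Type, &trade.Amount, &trade.Terms, &trade.InterestRate, &trade.TradeTime, &trade.Status)
if err != nil {
return nil, err
}
return trade, nil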
}
Compute and generate repayment plans from the trade data:
import (
"database/sql"
"github.com/chararch/gobatch"
"time"
)
type repayPlanHandler struct {
db *sql.DB
}
func (h *repayPlanHandler) Process(item interface{}, chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
trade := item.(*Trade)
plans := make([]*RepayPlan, 0)
restPrincipal := trade.Amount
for i := 1; i <= trade.Terms; i++ {
principal := restPrincipal / float64(trade.Terms-i+1)
interest := restPrincipal * trade.InterestRate / 12
repayPlan := &RepayPlan{
AccountNo: trade.AccountNo,
LoanNo: trade.TradeNo,
Term: i,
Principal: principal,
Interest: interest,
InitDate: time.Now(),
RepayDate: time.Now().AddDate(0, i, 0), // term i falls due i months after the plan is generated
RepayState: "",
}
plans = append(plans, repayPlan)
restPrincipal -= principal
}
return plans, nil
}
func (h *repayPlanHandler) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
for _, item := range items {
plans := item.([]*RepayPlan)
for _, plan := range plans {
_, err := h.db.Exec("INSERT INTO t_repay_plan(account_no, loan_no, term, principal, interest, init_date, repay_date, repay_state) values (?,?,?,?,?,?,?,?)",
plan.AccountNo, plan.LoanNo, plan.Term, plan.Principal, plan.Interest, plan.InitDate, plan.RepayDate, plan.RepayState)
if err != nil {
return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "insert t_repay_plan failed", err)
}
}
}
return nil
}
3. Aggregate the repayment plans into a CSV file
import (
"database/sql"
"fmt"
"github.com/chararch/gobatch"
)
type statsHandler struct {
db *sql.DB
}
func (h *statsHandler) Open(execution *gobatch.StepExecution) gobatch.BatchError {
return nil
}
func (h *statsHandler) Close(execution *gobatch.StepExecution) gobatch.BatchError {
return nil
}
func (h *statsHandler) ReadKeys() ([]interface{}, error) {
rows, err := h.db.Query("select distinct term from t_repay_plan")
if err != nil {
return nil, err
}
defer rows.Close()
var result []interface{}
var id int64
for rows.Next() {
err = rows.Scan(&id)
if err != nil {
return nil, err
}
result = append(result, id)
}
return result, nil
}
func (h *statsHandler) ReadItem(key interface{}) (interface{}, error) {
term := int64(0)
switch r := key.(type) {
case int64:
term = r
case float64:
term = int64(r)
default:
return nil, fmt.Errorf("key type error, type:%T, value:%v", key, key)
}
rows, err := h.db.Query("select term, sum(principal) as total_principal, sum(interest) as total_interest from t_repay_plan where term = ? group by term", term)
if err != nil {
return nil, err
}
defer rows.Close()
stats := &RepayPlanStats{}
if rows.Next() {
err = rows.Scan(&stats.Term, &stats.TotalPrincipal, &stats.TotalInterest)
if err != nil {
return nil, err
}
}
return stats, nil
}
func (ss *statsHandler) Process(item interface{}, chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
return item, nil
}
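Build and Run the Job
import (
"context"
"time"
"github.com/chararch/gobatch"
"github.com/chararch/gobatch/util"
)
func buildAndRunJob() {
// Initialize the database (openDB is assumed to be a helper defined elsewhere in the example that returns a *sql.DB)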
sqlDb := openDB()
gobatch.SetDB(sqlDb)
gobatch.SetTransactionManager(gobatch.NewTransactionManager(sqlDb))
// Build the steps
step1 := gobatch.NewStep("import_trade").
ReadFile(tradeFile).
Writer(&tradeImporter{sqlDb}).
Partitions(10).
Build()
step2 := gobatch.NewStep("gen_repay_plan").
Reader(&tradeReader{sqlDb}).
Handler(&repayPlanHandler{sqlDb}).
Partitions(10).
Build()
step3 := gobatch.NewStep("stats").
Reader(&statsHandler{sqlDb}).
Processor(&statsHandler{sqlDb}).
WriteFile(statsFileExport).
Partitions(2).
Build()
step4 := gobatch.NewStep("upload_file_to_ftp").
CopyFile(copyFileToFtp, copyChecksumFileToFtp).
Build()
// Build and register the job
job := gobatch.NewJob("accounting_job").
Step(step1, step2, step3, step4).
Build()
gobatch.Register(job)
// Start the job with its parameters
params, _ := util.JsonString(map[string]interface{}{
"date": time.Now().Format("2006-01-02"),
"rand": time.Now().Nanosecond(),
})
gobatch.Start(context.Background(), job.Name(), params)
}
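Steps 1 and 2 call Partitions(10). The guide does not describe how GoBatch splits the work, so the following is only an illustration under an assumption: for a key-based reader such as tradeReader, the keys returned by ReadKeys are cut into contiguous, near-equal slices that are processed concurrently. `splitKeys` is a hypothetical helper, not part of GoBatch, and the goroutine body merely counts items as a stand-in for ReadItem, Process, and Write.

```go
package main

import (
	"fmt"
	"sync"
)

// splitKeys (hypothetical) divides keys into at most n contiguous partitions
// whose sizes differ by at most one. It returns nil for n <= 0 or no keys.
func splitKeys(keys []interface{}, n int) [][]interface{} {
	if n > len(keys) {
		n = len(keys)
	}
	if n <= 0 {
		return nil
	}
	parts := make([][]interface{}, 0, n)
	start := 0
	for i := 0; i < n; i++ {
		size := len(keys) / n
		if i < len(keys)%n {
			size++ // the first len(keys)%n partitions take one extra key
		}
		parts = append(parts, keys[start:start+size])
		start += size
	}
	return parts
}

func main() {
	keys := make([]interface{}, 0, 25)
	for id := int64(1); id <= 25; id++ {
		keys = append(keys, id)
	}
	parts := splitKeys(keys, 10)
	counts := make([]int, len(parts)) // each goroutine writes only its own slot
	var wg sync.WaitGroup
	for i, part := range parts {
		wg.Add(1)
		go func(i int, part []interface{}) {
			defer wg.Done()
			for range part {
				counts[i]++ // stand-in for ReadItem + Process + Write
			}
		}(i, part)
	}
	wg.Wait()
	fmt.Println(counts) // [3 3 3 3 3 2 2 2 2 2]
}
```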
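Job Flow
Import trade data (step1)
- Read trade records from the TSV file
- Insert the records into the t_trade table
- Process in parallel across 10 partitions
Generate repayment plans (step2)
- Read trade records from the database
- Compute a repayment plan for each trade
- Save the plans to the t_repay_plan table
- Process in parallel across 10 partitions
Aggregate statistics and export to a file (step3)
- Sum principal and interest per term from t_repay_plan
- Produce one statistics record per term
- Write the results to a CSV file, together with an MD5 checksum file
Upload statistics to FTP (step4)
- Upload the statistics CSV file and its checksum file to the FTP server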