base_from_mysteel_chemical.go 21 KB


  1. package services
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "eta/eta_index_lib/logic"
  6. "eta/eta_index_lib/models"
  7. "eta/eta_index_lib/services/alarm_msg"
  8. "eta/eta_index_lib/utils"
  9. "fmt"
  10. "io"
  11. "net/http"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. func HandleMysteelIndex(req *models.HandleMysteelIndexResp) (err error) {
  17. for _, v := range req.List {
  18. if v.IndexName == "" || v.IndexCode == "" {
  19. continue
  20. }
  21. err = handleIndex(v)
  22. if err != nil {
  23. return
  24. }
  25. }
  26. _ = SetMysteelChemicalEdbInfoUpdateStat(false)
  27. _ = SetEdbSourceStat(false)
  28. return
  29. }
  30. func HandleApiMysteelIndex(req *models.HandleMysteelIndexResp) (errMsg string, err error) {
  31. addIndexCodeList := make([]string, 0)
  32. for _, v := range req.List {
  33. if v.IndexCode == "" {
  34. continue
  35. }
  36. addIndexCodeList = append(addIndexCodeList, v.IndexCode)
  37. }
  38. errMsg, err = HandleApiIndex(addIndexCodeList)
  39. if err != nil {
  40. return
  41. }
  42. _ = SetMysteelChemicalEdbInfoUpdateStat(false)
  43. _ = SetEdbSourceStat(false)
  44. return
  45. }
  46. func ApiCheck() (ok bool, err error) {
  47. item, err := getPageIndexInfoMap(1, 1, true)
  48. if err != nil {
  49. if err.Error() == "406" {
  50. return false, nil
  51. }
  52. if item != nil && item.Code == "100006" {
  53. return false, nil
  54. }
  55. return
  56. }
  57. if item != nil && item.Code == "100006" {
  58. return false, nil
  59. }
  60. return true, nil
  61. }
  62. func HandleApiIndex(indexCodes []string) (errMsg string, err error) {
  63. if len(indexCodes) == 0 {
  64. return
  65. }
  66. resp, err := GetEdbDataFromMySteelChemical(indexCodes, utils.GetEdbRefreshStartDate(""), utils.BASE_END_DATE, "desc")
  67. if err != nil {
  68. return
  69. }
  70. if !resp.Success {
  71. errMsg = "获取数据失败"
  72. err = errors.New(resp.Message)
  73. return
  74. }
  75. indexInfoMap, err := GetMySteelChemicalIndexNameMap()
  76. if err != nil {
  77. errMsg = "获取指标数据失败"
  78. return
  79. }
  80. indexObj := &models.BaseFromMysteelChemicalIndex{}
  81. existIndexs, err := indexObj.GetBatchIndexItem(indexCodes)
  82. if err != nil {
  83. errMsg = "获取指标数据失败"
  84. return
  85. }
  86. //获取已存在的所有数据
  87. existDataMap := make(map[string]*models.BaseFromMysteelChemicalData)
  88. existIndexMap := make(map[string]*models.BaseFromMysteelChemicalIndex)
  89. updateDataObj := new(models.BaseFromMysteelChemicalData)
  90. for _, v := range existIndexs {
  91. // 更新指标的名称,单位和频度等信息
  92. if info, ok := indexInfoMap[v.IndexCode]; ok {
  93. v.IndexName = info.IndexName
  94. v.Unit = info.UnitName
  95. v.Frequency = info.FrequencyName
  96. v.ModifyTime = time.Now()
  97. err = v.Update([]string{"index_name", "unit", "frequency", "modify_time"})
  98. if err != nil {
  99. errMsg = "更新指标失败"
  100. return
  101. }
  102. }
  103. if err != nil {
  104. errMsg = "添加指标失败"
  105. return
  106. }
  107. existIndexMap[v.IndexCode] = v
  108. exitDataList, er := updateDataObj.GetIndexDataList(v.IndexCode)
  109. if er != nil {
  110. errMsg = "获取指标数据失败"
  111. err = er
  112. return
  113. }
  114. fmt.Println("exitDataListLen:", len(exitDataList))
  115. for _, v := range exitDataList {
  116. dateStr := v.DataTime.Format(utils.FormatDate)
  117. existDataMap[dateStr] = v
  118. }
  119. }
  120. mysteelChemicalDatas, err := tranformData(resp)
  121. if err != nil {
  122. errMsg = "转换数据失败"
  123. return
  124. }
  125. var indexErr error
  126. var lErr error
  127. defer func() {
  128. if indexErr != nil {
  129. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error())
  130. alarm_msg.SendAlarmMsg(tips, 3)
  131. }
  132. if lErr != nil {
  133. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error())
  134. alarm_msg.SendAlarmMsg(tips, 3)
  135. }
  136. }()
  137. var hasUpdate bool
  138. dataObj := new(models.BaseFromMysteelChemicalData)
  139. for _, items := range mysteelChemicalDatas {
  140. addItems := make([]*models.BaseFromMysteelChemicalData, 0)
  141. for _, v := range items {
  142. dateStr := v.DataTime.Format(utils.FormatDate)
  143. if findData, ok := existDataMap[dateStr]; !ok {
  144. index, ok := existIndexMap[v.IndexCode]
  145. if !ok {
  146. continue
  147. }
  148. v.BaseFromMysteelChemicalIndexId = index.BaseFromMysteelChemicalIndexId
  149. addItems = append(addItems, v)
  150. } else {
  151. if findData != nil && findData.BaseFromMysteelChemicalDataId > 0 && findData.Value != v.Value {
  152. dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
  153. dataObj.Value = v.Value
  154. dataObj.ModifyTime = time.Now()
  155. err = dataObj.Update([]string{"value", "modify_time"})
  156. if err != nil {
  157. errMsg = "更新数据失败"
  158. return
  159. }
  160. hasUpdate = true
  161. }
  162. }
  163. }
  164. err = dataObj.AddV2(addItems)
  165. if err != nil {
  166. return
  167. }
  168. //修改最大最小日期
  169. if len(items) <= 0 {
  170. continue
  171. }
  172. mysteelIndexMaxItem, er := dataObj.GetMysteelIndexInfoMaxAndMinInfo(items[0].IndexCode)
  173. if er == nil && mysteelIndexMaxItem != nil && mysteelIndexMaxItem.IndexCode != "" {
  174. e := dataObj.ModifyMysteelIndexMaxAndMinInfo(items[0].IndexCode, mysteelIndexMaxItem)
  175. if e != nil {
  176. fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error())
  177. utils.FileLog.Info("修改钢联化工的最大最小日期失败,Err:" + e.Error())
  178. }
  179. }
  180. edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode)
  181. if e != nil && e.Error() != utils.ErrNoRow() {
  182. indexErr = e
  183. return
  184. }
  185. if edbInfo != nil && edbInfo.EdbInfoId > 0 {
  186. dataUpdateResult := 2
  187. dataUpdateFailedReason := "服务异常"
  188. _, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``)
  189. if logErr != nil {
  190. lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 0, 0)
  191. return
  192. }
  193. if hasUpdate {
  194. dataUpdateResult = 1
  195. dataUpdateFailedReason = ""
  196. } else {
  197. dataUpdateFailedReason = "未刷新到数据"
  198. }
  199. // 添加刷新成功日志
  200. lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 0, 0)
  201. if lErr != nil {
  202. return
  203. }
  204. }
  205. }
  206. return
  207. }
  208. func tranformData(dataResp *models.MySteelChemicalApiResp) (items [][]*models.BaseFromMysteelChemicalData, err error) {
  209. for _, v := range dataResp.Data {
  210. tmpNewDataMap := make(map[string]int64)
  211. tmpDateDataMap := make(map[string]*models.BaseFromMysteelChemicalData)
  212. tmpDataItems := make([]*models.BaseFromMysteelChemicalData, 0)
  213. for _, vv := range v.DataList {
  214. // 如果数据值为null那么直接忽略
  215. if !vv.DataValue.Valid {
  216. continue
  217. }
  218. tmpData := new(models.BaseFromMysteelChemicalData)
  219. tmpData.IndexCode = vv.IndexCode
  220. // 如果存在多条数据,则取发布时间最新的数据
  221. pub, ok := tmpNewDataMap[vv.DataDate]
  222. if !ok {
  223. tmpNewDataMap[vv.DataDate] = vv.PublishTime
  224. tmpData.Value = strconv.FormatFloat(vv.DataValue.Float64, 'f', -1, 64)
  225. } else {
  226. if pub < vv.PublishTime {
  227. tmpNewDataMap[vv.DataDate] = vv.PublishTime
  228. tmpData = tmpDateDataMap[vv.DataDate]
  229. tmpData.Value = strconv.FormatFloat(vv.DataValue.Float64, 'f', -1, 64)
  230. }
  231. continue
  232. }
  233. dataDate, er := time.Parse(utils.FormatDate, vv.DataDate)
  234. if er != nil {
  235. err = er
  236. return
  237. }
  238. tmpData.DataTime = dataDate
  239. tmpData.CreateTime = time.Now()
  240. tmpData.ModifyTime = time.Now()
  241. tmpDataItems = append(tmpDataItems, tmpData)
  242. tmpDateDataMap[vv.DataDate] = tmpData
  243. }
  244. items = append(items, tmpDataItems)
  245. }
  246. return
  247. }
  248. func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
  249. defer func() {
  250. if err != nil {
  251. // 添加刷新失败日志
  252. dataUpdateResult := 2
  253. dataUpdateFailedReason := "服务异常"
  254. edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexItem.IndexCode)
  255. if e == nil {
  256. //查询指标存在,才添加刷新日志
  257. _ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0)
  258. }
  259. }
  260. }()
  261. indexObj := new(models.BaseFromMysteelChemicalIndex)
  262. var indexId int64
  263. addDataList := make([]models.BaseFromMysteelChemicalData, 0)
  264. exitDataMap := make(map[string]*models.BaseFromMysteelChemicalData)
  265. //判断指标是否存在
  266. var isAdd int
  267. item, err := indexObj.GetIndexItem(indexItem.IndexCode)
  268. if err != nil {
  269. if err.Error() == utils.ErrNoRow() {
  270. isAdd = 1
  271. } else {
  272. isAdd = -1
  273. return
  274. }
  275. }
  276. nameChange := false
  277. if item != nil && item.BaseFromMysteelChemicalIndexId > 0 {
  278. isAdd = 2
  279. if item.IndexName != indexItem.IndexName {
  280. nameChange = true
  281. }
  282. } else {
  283. isAdd = 1
  284. }
  285. fmt.Println("isAdd:", isAdd)
  286. if !strings.Contains(indexItem.Frequency, "度") {
  287. indexItem.Frequency = indexItem.Frequency + "度"
  288. }
  289. if isAdd == 1 {
  290. indexObj.IndexCode = indexItem.IndexCode
  291. indexObj.IndexName = indexItem.IndexName
  292. indexObj.Unit = indexItem.Unit
  293. indexObj.Source = indexItem.Source
  294. indexObj.Describe = indexItem.Describe
  295. indexObj.StartDate, err = utils.DealExcelDate(indexItem.StartDate)
  296. if err != nil {
  297. fmt.Println("utils.DealExcelDate err:" + indexItem.StartDate + err.Error())
  298. return
  299. }
  300. indexObj.EndDate, err = utils.DealExcelDate(indexItem.EndDate)
  301. if err != nil {
  302. fmt.Println("utils.DealExcelDate err:" + indexItem.EndDate + err.Error())
  303. return
  304. }
  305. indexObj.Frequency = indexItem.Frequency
  306. //indexObj.CreateTime = time.Now().Local()
  307. //indexObj.ModifyTime = time.Now().Local()
  308. err = indexObj.Add()
  309. if err != nil {
  310. fmt.Println("add err:" + err.Error())
  311. return
  312. }
  313. indexId = indexObj.BaseFromMysteelChemicalIndexId
  314. } else if isAdd == 2 {
  315. indexObj.BaseFromMysteelChemicalIndexId = item.BaseFromMysteelChemicalIndexId
  316. indexObj.IndexCode = indexItem.IndexCode
  317. indexObj.IndexName = indexItem.IndexName
  318. indexObj.Unit = indexItem.Unit
  319. indexObj.Source = indexItem.Source
  320. indexObj.Describe = indexItem.Describe
  321. indexObj.StartDate, err = utils.DealExcelDate(indexItem.StartDate)
  322. if err != nil {
  323. fmt.Println("utils.DealExcelDate err:" + indexItem.StartDate + err.Error())
  324. return
  325. }
  326. indexObj.EndDate, err = utils.DealExcelDate(indexItem.EndDate)
  327. if err != nil {
  328. fmt.Println("utils.DealExcelDate err:" + indexItem.EndDate + err.Error())
  329. return
  330. }
  331. indexObj.Frequency = indexItem.Frequency
  332. indexObj.ModifyTime = time.Now()
  333. indexId = item.BaseFromMysteelChemicalIndexId
  334. indexObj.IsSupplierStop = item.IsSupplierStop
  335. var isStop int
  336. if strings.Contains(indexItem.IndexName, "停") {
  337. isStop = 1
  338. indexObj.IsSupplierStop = 1
  339. }
  340. indexObj.IsStop = isStop
  341. //修改数据
  342. updateColsArr := make([]string, 0)
  343. updateColsArr = append(updateColsArr, "index_name")
  344. updateColsArr = append(updateColsArr, "unit")
  345. updateColsArr = append(updateColsArr, "source")
  346. updateColsArr = append(updateColsArr, "frequency")
  347. updateColsArr = append(updateColsArr, "start_date")
  348. updateColsArr = append(updateColsArr, "end_date")
  349. updateColsArr = append(updateColsArr, "describe")
  350. updateColsArr = append(updateColsArr, "end_date")
  351. updateColsArr = append(updateColsArr, "is_stop")
  352. updateColsArr = append(updateColsArr, "is_supplier_stop")
  353. updateColsArr = append(updateColsArr, "modify_time")
  354. e := indexObj.Update(updateColsArr)
  355. if e != nil {
  356. fmt.Println("Index Update Err:" + e.Error())
  357. return
  358. }
  359. if item.IndexName != indexItem.IndexName {
  360. var changeRecord models.BaseFromMysteelChemicalRecord
  361. changeRecord.BaseFromMysteelChemicalIndexId = item.BaseFromMysteelChemicalIndexId
  362. changeRecord.OldIndexName = item.IndexName
  363. changeRecord.NewIndexName = indexItem.IndexName
  364. ctime := time.Now()
  365. changeRecord.CreateTime = ctime
  366. changeRecord.Timestamp = ctime.Unix()
  367. e = changeRecord.AddBaseFromMysteelChemicalRecord()
  368. if e != nil {
  369. fmt.Println("mysteel chemical changeRecord Add Err:" + e.Error())
  370. return
  371. }
  372. }
  373. dataObj := new(models.BaseFromMysteelChemicalData)
  374. //获取已存在的所有数据
  375. exitDataList, err := dataObj.GetIndexDataList(indexItem.IndexCode)
  376. if err != nil {
  377. fmt.Println("GetIndexDataList Err:" + err.Error())
  378. return err
  379. }
  380. fmt.Println("exitDataListLen:", len(exitDataList))
  381. for _, v := range exitDataList {
  382. dateStr := v.DataTime.Format(utils.FormatDate)
  383. exitDataMap[dateStr] = v
  384. }
  385. }
  386. dataObj := new(models.BaseFromMysteelChemicalData)
  387. var hasUpdate bool
  388. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  389. for date, value := range indexItem.ExcelDataMap {
  390. dateTime, err := utils.DealExcelDate(date)
  391. if err != nil {
  392. fmt.Println("time.ParseInLocation Err:" + err.Error())
  393. return err
  394. }
  395. date = dateTime.Format(utils.FormatDate)
  396. if findData, ok := exitDataMap[date]; !ok {
  397. if !strings.Contains(value, "#N/A") {
  398. dataItem := new(models.BaseFromMysteelChemicalData)
  399. dataItem.BaseFromMysteelChemicalIndexId = indexId
  400. dataItem.IndexCode = indexItem.IndexCode
  401. dataItem.DataTime = dateTime
  402. dataItem.Value = value
  403. dataItem.UpdateDate = indexItem.UpdateDate
  404. dataItem.CreateTime = time.Now()
  405. dataItem.ModifyTime = time.Now()
  406. addDataList = append(addDataList, *dataItem)
  407. }
  408. } else {
  409. if findData != nil && findData.BaseFromMysteelChemicalDataId > 0 && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
  410. dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
  411. dataObj.Value = value
  412. dataObj.ModifyTime = time.Now()
  413. updateDataColsArr := make([]string, 0)
  414. updateDataColsArr = append(updateDataColsArr, "value")
  415. updateDataColsArr = append(updateDataColsArr, "modify_time")
  416. dataObj.Update(updateDataColsArr)
  417. hasUpdate = true
  418. }
  419. }
  420. }
  421. if len(addDataList) > 0 {
  422. err = dataObj.Add(addDataList)
  423. if err != nil {
  424. fmt.Println("dataObj.Add() Err:" + err.Error())
  425. return
  426. }
  427. hasUpdate = true
  428. }
  429. //修改最大最小日期
  430. mysteelIndexMaxItem, err := dataObj.GetMysteelIndexInfoMaxAndMinInfo(indexItem.IndexCode)
  431. if err == nil && mysteelIndexMaxItem != nil && mysteelIndexMaxItem.IndexCode != `` {
  432. e := dataObj.ModifyMysteelIndexMaxAndMinInfo(indexItem.IndexCode, mysteelIndexMaxItem)
  433. if e != nil {
  434. fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error())
  435. }
  436. }
  437. // 同步刷新图库钢联的指标
  438. //go func() {
  439. var indexErr error
  440. var lErr error
  441. defer func() {
  442. if indexErr != nil {
  443. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error())
  444. alarm_msg.SendAlarmMsg(tips, 3)
  445. }
  446. if lErr != nil {
  447. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error())
  448. alarm_msg.SendAlarmMsg(tips, 3)
  449. }
  450. }()
  451. edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode)
  452. if e != nil && e.Error() != utils.ErrNoRow() {
  453. indexErr = e
  454. return
  455. }
  456. if edbInfo != nil && edbInfo.EdbInfoId > 0 {
  457. dataUpdateResult := 2
  458. dataUpdateFailedReason := "服务异常"
  459. _, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``)
  460. if logErr != nil {
  461. lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0)
  462. return
  463. }
  464. if hasUpdate {
  465. dataUpdateResult = 1
  466. dataUpdateFailedReason = ""
  467. } else {
  468. dataUpdateFailedReason = "未刷新到数据"
  469. }
  470. // 添加刷新成功日志
  471. lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 1, 0)
  472. if lErr != nil {
  473. return
  474. }
  475. //如果变更了指标名称,则添加指标信息变更日志
  476. if nameChange {
  477. edbInfo.SourceIndexName = indexItem.IndexName
  478. lErr = edbInfo.Update([]string{"SourceIndexName"})
  479. if lErr != nil {
  480. return
  481. }
  482. lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 0, "", 0, 1)
  483. }
  484. }
  485. //}()
  486. return
  487. }
  488. type MySteelChemicalApiDataBody struct {
  489. IndexCodes []string `json:"indexCodes"`
  490. StartTime string `json:"startTime"`
  491. EndTime string `json:"endTime"`
  492. Order string `json:"order"`
  493. }
  494. type MySteelChemicalApiInfoBody struct {
  495. PageNum int `json:"pageNum"`
  496. PageSize int `json:"pageSize"`
  497. IncludeInfo bool `json:"includeInfo"`
  498. }
  499. // GetEdbDataFromMySteelChemical 批量获得钢联化工的指标数据
  500. func GetEdbDataFromMySteelChemical(indexCodes []string, startTime, endTime, order string) (item *models.MySteelChemicalApiResp, err error) {
  501. if utils.MysteelChemicalApiToken == "" {
  502. err = errors.New("钢联接口token未配置")
  503. return
  504. }
  505. m := new(MySteelChemicalApiDataBody)
  506. m.IndexCodes = indexCodes
  507. m.StartTime = startTime
  508. m.EndTime = endTime
  509. m.Order = order
  510. postData, er := json.Marshal(m)
  511. if er != nil {
  512. err = er
  513. return
  514. }
  515. // postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA`
  516. postUrl := utils.MySteelChemicalApiUrl
  517. if postUrl == "" {
  518. err = errors.New("钢联化工接口url未配置")
  519. return
  520. }
  521. body, err := MySteelChemicalPost(postUrl, "data", postData)
  522. if err != nil {
  523. return
  524. }
  525. err = json.Unmarshal(body, &item)
  526. if err != nil {
  527. return
  528. }
  529. return
  530. }
  531. // GetMySteelChemicalIndexNameMap 获取钢联化工的所有指标的信息
  532. func GetMySteelChemicalIndexNameMap() (indexNameMap map[string]*models.MySteelChemicalApiInfoItem, err error) {
  533. if utils.MysteelChemicalApiToken == "" {
  534. err = errors.New("钢联接口token未配置")
  535. return
  536. }
  537. item, err := getPageIndexInfoMap(1, 200, true)
  538. if err != nil {
  539. return
  540. }
  541. indexNameMap = make(map[string]*models.MySteelChemicalApiInfoItem)
  542. for _, v := range item.Data.List {
  543. indexNameMap[v.IndexCode] = v
  544. }
  545. // 如果总条数大于200,则继续获取
  546. if item.Data.Total > 200 || item.Data.Pages > 1 {
  547. for i := 2; i <= item.Data.Pages; i++ {
  548. item, err = getPageIndexInfoMap(i, 200, true)
  549. if err != nil {
  550. return
  551. }
  552. for _, v := range item.Data.List {
  553. indexNameMap[v.IndexCode] = v
  554. }
  555. }
  556. return
  557. }
  558. return
  559. }
  560. func getPageIndexInfoMap(pageNum, pageSize int, includeInfo bool) (item *models.MySteelChemicalApiInfoResp, err error) {
  561. m := new(MySteelChemicalApiInfoBody)
  562. m.PageNum = pageNum
  563. m.PageSize = pageSize
  564. m.IncludeInfo = includeInfo
  565. postData, er := json.Marshal(m)
  566. if er != nil {
  567. err = er
  568. return
  569. }
  570. // postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA`
  571. postUrl := utils.MySteelChemicalApiUrl
  572. if postUrl == "" {
  573. err = errors.New("钢联化工接口url未配置")
  574. return
  575. }
  576. body, er := MySteelChemicalPost(postUrl, "info", postData)
  577. if er != nil {
  578. err = er
  579. return
  580. }
  581. err = json.Unmarshal(body, &item)
  582. if err != nil {
  583. return
  584. }
  585. if !item.Success {
  586. err = errors.New(item.Message)
  587. utils.FileLog.Info("code:" + item.Code + " message:" + item.Message)
  588. return
  589. }
  590. return
  591. }
  592. func MySteelChemicalPost(postUrl, hType string, postData []byte) (body []byte, err error) {
  593. req, er := http.NewRequest(`POST`, postUrl, strings.NewReader(string(postData)))
  594. if er != nil {
  595. err = er
  596. return
  597. }
  598. req.Header.Set(`Content-Type`, `application/json`)
  599. req.Header.Set(`accessTokenSign`, utils.MysteelChemicalApiToken)
  600. req.Header.Set(`infoOrData`, hType)
  601. client := &http.Client{}
  602. resp, er := client.Do(req)
  603. if er != nil {
  604. err = er
  605. return
  606. }
  607. defer resp.Body.Close()
  608. body, err = io.ReadAll(resp.Body)
  609. if err != nil {
  610. return
  611. }
  612. return
  613. }
  614. func RefreshDataFromMysteelChemical(edbCode, startDate, endDate string) (err error) {
  615. indexObj := &models.BaseFromMysteelChemicalIndex{}
  616. tmpIndex, err := indexObj.GetIndexItem(edbCode)
  617. if err != nil {
  618. return
  619. }
  620. terminal, err := GetTerminal(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, tmpIndex.TerminalCode)
  621. if err != nil {
  622. err = fmt.Errorf("获取钢联化工接口配置出错 Err: %s", err)
  623. return
  624. }
  625. if tmpIndex.TerminalCode == "" {
  626. // 设置指标与终端关系的缓存
  627. terminalCodeCacheKey := utils.CACHE_EDB_TERMINAL_CODE_URL + edbCode
  628. _ = utils.Rc.Put(terminalCodeCacheKey, terminal.TerminalCode, utils.GetTodayLastSecond())
  629. }
  630. // 如果配置了api的token, 那么就走api接口
  631. if utils.MysteelChemicalApiToken != "" {
  632. resp, er := GetEdbDataFromMySteelChemical([]string{edbCode}, startDate, endDate, "desc")
  633. if er != nil {
  634. err = er
  635. return
  636. }
  637. if !resp.Success {
  638. err = errors.New(resp.Message)
  639. return
  640. }
  641. dataObj := new(models.BaseFromMysteelChemicalData)
  642. exitDataList, er := dataObj.GetIndexDataList(edbCode)
  643. if er != nil {
  644. err = er
  645. return
  646. }
  647. existDataMap := make(map[string]*models.BaseFromMysteelChemicalData)
  648. for _, v := range exitDataList {
  649. dateStr := v.DataTime.Format(utils.FormatDate)
  650. existDataMap[dateStr] = v
  651. }
  652. mysteelChemicalDatas, er := tranformData(resp)
  653. if er != nil {
  654. err = er
  655. return
  656. }
  657. addItems := make([]*models.BaseFromMysteelChemicalData, 0)
  658. indexObj := &models.BaseFromMysteelChemicalIndex{}
  659. existIndex, er := indexObj.GetIndexItem(edbCode)
  660. if er != nil {
  661. err = er
  662. return
  663. }
  664. if len(mysteelChemicalDatas) == 0 {
  665. err = errors.New("没有获取到数据")
  666. return
  667. }
  668. // 因为只有一个指标,所以取第一个就可以了
  669. items := mysteelChemicalDatas[0]
  670. for _, v := range items {
  671. dateStr := v.DataTime.Format(utils.FormatDate)
  672. if findData, ok := existDataMap[dateStr]; !ok {
  673. v.BaseFromMysteelChemicalIndexId = existIndex.BaseFromMysteelChemicalIndexId
  674. addItems = append(addItems, v)
  675. } else {
  676. if findData != nil && findData.BaseFromMysteelChemicalDataId > 0 && findData.Value != v.Value {
  677. dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
  678. dataObj.Value = v.Value
  679. dataObj.ModifyTime = time.Now()
  680. err = dataObj.Update([]string{"value", "modify_time"})
  681. if err != nil {
  682. return
  683. }
  684. }
  685. }
  686. }
  687. err = dataObj.AddV2(addItems)
  688. if err != nil {
  689. return
  690. }
  691. return
  692. }
  693. return
  694. }