package binlog

import (
	dataSourceModel "eta/eta_api/models/data_source"
	edbmonitorSvr "eta/eta_api/services/edb_monitor"
	"eta/eta_api/utils"
	"fmt"
	"reflect"
	"runtime/debug"
	"time"

	"github.com/go-mysql-org/go-mysql/canal"
	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
	"github.com/go-mysql-org/go-mysql/schema"
)

// EdbEventHandler is the canal event handler used by this package. It reacts
// to row events (OnRow) and keeps track of the most recent binlog file name
// and position it has been told about.
type EdbEventHandler struct {
	// Embedding canal.DummyEventHandler promotes its methods, so only the
	// callbacks defined in this file are overridden.
	canal.DummyEventHandler
	// fileName is the binlog file name, set by OnPosSynced and SetBinlogFileName.
	fileName string
	// position is the offset inside fileName, set together with fileName.
	position uint32
}

// OnRow dispatches a binlog row event to Insert or Update according to
// e.Action and then stores the handler's current binlog file name and
// position in the cache.
//
// Events with any other action are ignored. Errors from Insert/Update are
// logged and swallowed, and a panic raised while handling the event is
// recovered and logged, so the method always returns nil.
func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
	// The handling below may panic; recover so the listener is not taken down.
	defer func() {
		if r := recover(); r != nil {
			utils.FileLog.Error("binlog OnRow panic: ", r)

			stackTrace := debug.Stack()
			utils.FileLog.Error("binlog OnRow panic stack: ", string(stackTrace))

			// The normal path always returns nil, so make sure a recovered
			// panic cannot hand a stale value of the named result back to the caller.
			err = nil
		}
	}()

	// Dispatch on the kind of row change. A local error variable is used so
	// the named result is never polluted by a handler failure.
	switch e.Action {
	case canal.InsertAction:
		if handleErr := h.Insert(e); handleErr != nil {
			utils.FileLog.Info("binlog insert error:", handleErr)
		}
	case canal.UpdateAction:
		if handleErr := h.Update(e); handleErr != nil {
			utils.FileLog.Info("binlog update error:", handleErr)
		}
	default:
		return nil
	}

	fmt.Println("fileName:", h.fileName, ";position:", h.position)

	// After every handled event, record the current position in the cache.
	utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
	utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)

	return nil
}

// OnPosSynced remembers the binlog file name and offset carried by p.
// The header, GTID set and flag arguments are not used. It always returns nil.
func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
	h.fileName, h.position = p.Name, p.Pos
	return nil
}

// SyncToRedis writes the handler's current binlog file name and position to
// the cache every three seconds, each with a 31-day expiry. It loops forever
// and never returns.
func (h *EdbEventHandler) SyncToRedis() {
	const (
		cacheTTL = 31 * 24 * time.Hour
		interval = 3 * time.Second
	)

	// The position must also reach the cache when the binlog is rotated, so
	// it is flushed periodically rather than only after row events.
	for ; ; time.Sleep(interval) {
		utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, cacheTTL)
		utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, cacheTTL)
	}
}

// String returns the fixed name of this handler.
func (h *EdbEventHandler) String() string {
	const handlerName = "EdbEventHandler"
	return handlerName
}

// Insert handles an INSERT row event.
//
// For the edb_info table every inserted row is mapped with MapRowToStruct and
// pushed onto EDB_MONITOR_HANDLE_LIST_CACHE when its id is known either to the
// local set or to the EDB_MONITOR_ID_SET_CACHE set. For any other table the
// matching data source is looked up by table name and every row is pushed
// onto CACHE_DATA_SOURCE_ES_HANDLE.
//
// It returns the first cache error encountered (remaining rows are then not
// processed), or an error when the table has no matching data source.
func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
	// NOTE(review): canal appears to pass a nil Header for rows it builds from
	// a mysqldump snapshot — confirm against the go-mysql version in use.
	// Guard the dereference so such rows are processed instead of panicking.
	var serverID uint32
	if e.Header != nil {
		serverID = e.Header.ServerID
	}
	fmt.Println(serverID, ";", e.Table.Schema, ".", e.Table.Name)

	// Indicator library.
	tableName := e.Table.Name
	if tableName == "edb_info" {
		// A batch insert carries more than one row, hence the loop.
		for _, row := range e.Rows {
			newEdbInfo := h.MapRowToStruct(e.Table.Columns, row)

			// Consult the local set first and fall back to the cache set.
			if !edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId) {
				ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo.EdbInfoId)
				if err != nil {
					return err
				}
				if !ok {
					continue
				}
			}
			if err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo); err != nil {
				return err
			}
		}
		return nil
	}

	// Data source.
	indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
	if indexOb == nil {
		return fmt.Errorf("数据表无对应数据源: %s", tableName)
	}
	for _, row := range e.Rows {
		indexItem := DataSourceMapRowToStruct(e.Table.Columns, row, indexOb)
		// Push to the queue; no de-duplication is needed here.
		if err := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); err != nil {
			return fmt.Errorf("写入redis队列失败, %w", err)
		}
	}
	return nil
}

// Update handles an UPDATE row event. e.Rows is consumed in pairs: the row at
// an even index is the image before the update and the following row is the
// image after it. A trailing row without a partner is ignored.
//
// For the edb_info table a pair is only acted upon when EndValue differs
// between the two images; the new image is then pushed onto
// EDB_MONITOR_HANDLE_LIST_CACHE if its id is known to the local set or to the
// EDB_MONITOR_ID_SET_CACHE set, and the first cache error is returned.
// For any other table the matching data source is looked up by table name and
// the after-image of every pair is pushed onto CACHE_DATA_SOURCE_ES_HANDLE;
// push failures there are logged and skipped. An error is returned when the
// table has no matching data source.
func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
	table := e.Table.Name
	total := len(e.Rows)

	if table != "edb_info" {
		// Data source.
		indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(table)
		if indexOb == nil {
			return fmt.Errorf("数据表无对应数据源: %s", table)
		}
		// Only the after-image of each pair is indexed.
		for after := 1; after < total; after += 2 {
			item := DataSourceMapRowToStruct(e.Table.Columns, e.Rows[after], indexOb)
			if pushErr := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, item); pushErr != nil {
				utils.FileLog.Info(fmt.Sprintf("binlog update data source lpush err: %v", pushErr))
			}
		}
		return nil
	}

	// Indicator library. An UPDATE touching several rows yields more than one
	// before/after pair, so walk all of them.
	for after := 1; after < total; after += 2 {
		previous := h.MapRowToStruct(e.Table.Columns, e.Rows[after-1])
		current := h.MapRowToStruct(e.Table.Columns, e.Rows[after])
		if previous.EndValue == current.EndValue {
			continue
		}

		// Consult the local set first and fall back to the cache set.
		if !edbmonitorSvr.EdbLocalSet.IsExist(current.EdbInfoId) {
			inSet, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, current.EdbInfoId)
			if err != nil {
				return err
			}
			if !inSet {
				continue
			}
		}
		if err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, current); err != nil {
			return err
		}
	}
	return nil
}

// MapRowToStruct converts one binlog row into an EdbInfoBingLog by matching
// column names against the known edb_info fields. Columns with other names
// and nil values are skipped.
//
// Numeric fields accept any signed, unsigned or floating-point kind as
// appropriate; a value of an unexpected kind is logged and the field keeps its
// zero value, so a single mismatching column no longer discards the whole row.
// Columns beyond the end of row are ignored. Should a panic still occur it is
// recovered and logged, and the zero value is returned.
func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
	defer func() {
		if r := recover(); r != nil {
			utils.FileLog.Info(fmt.Sprintf("binlog map edb_info row panic;columns:%v; err: %v", columns, r))
		}
	}()

	// toInt reads any integer kind; reflect's Int/Uint accessors panic when
	// called on the wrong kind, so the kind is checked first.
	toInt := func(name string, v reflect.Value) int {
		switch v.Kind() {
		case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
			return int(v.Int())
		case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
			return int(v.Uint())
		}
		utils.FileLog.Info(fmt.Sprintf("binlog edb_info column %s: unexpected kind %s for integer field", name, v.Kind()))
		return 0
	}
	// toFloat reads any floating-point or integer kind as a float64.
	toFloat := func(name string, v reflect.Value) float64 {
		switch v.Kind() {
		case reflect.Float32, reflect.Float64:
			return v.Float()
		case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
			return float64(v.Int())
		case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
			return float64(v.Uint())
		}
		utils.FileLog.Info(fmt.Sprintf("binlog edb_info column %s: unexpected kind %s for float field", name, v.Kind()))
		return 0
	}

	newEdbInfo := edbmonitorSvr.EdbInfoBingLog{}
	for i, column := range columns {
		// The row can be shorter than the column list; stop instead of
		// indexing out of range.
		if i >= len(row) {
			break
		}
		value := reflect.ValueOf(row[i])
		// A nil cell yields an invalid Value; skip it.
		if !value.IsValid() {
			continue
		}
		switch column.Name {
		case "edb_info_id":
			newEdbInfo.EdbInfoId = toInt(column.Name, value)
		case "edb_info_type":
			newEdbInfo.EdbInfoType = toInt(column.Name, value)
		case "source":
			newEdbInfo.Source = toInt(column.Name, value)
		case "edb_code":
			newEdbInfo.EdbCode = value.String()
		case "start_date":
			newEdbInfo.StartDate = value.String()
		case "end_date":
			newEdbInfo.EndDate = value.String()
		case "unique_code":
			newEdbInfo.UniqueCode = value.String()
		case "create_time":
			newEdbInfo.CreateTime = value.String()
		case "modify_time":
			newEdbInfo.ModifyTime = value.String()
		case "base_modify_time":
			newEdbInfo.BaseModifyTime = value.String()
		case "min_value":
			newEdbInfo.MinValue = toFloat(column.Name, value)
		case "max_value":
			newEdbInfo.MaxValue = toFloat(column.Name, value)
		case "latest_date":
			newEdbInfo.LatestDate = value.String()
		case "latest_value":
			newEdbInfo.LatestValue = toFloat(column.Name, value)
		case "end_value":
			newEdbInfo.EndValue = toFloat(column.Name, value)
		case "data_update_time":
			newEdbInfo.DataUpdateTime = value.String()
		case "er_data_update_date":
			newEdbInfo.ErDataUpdateDate = value.String()
		case "sub_source":
			newEdbInfo.SubSource = toInt(column.Name, value)
		default:
			continue
		}
	}
	return newEdbInfo
}

// SetBinlogFileName
// @Description: Sets the binlog file name and position currently held by the handler and prints them to stdout
// @author: Roc
// @receiver h
// @datetime 2024-02-29 18:09:36
// @param fileName string
// @param position uint32
func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
	h.fileName, h.position = fileName, position

	fmt.Println("init fileName:", fileName, ";position:", position)
}

// DataSourceMapRowToStruct converts one binlog row of a data-source table
// into the SearchDataSource structure that is queued for indexing.
//
// Source, SubSource and SourceName come from indexOb.SourceInfo(). The
// remaining fields are filled by matching each column name against the names
// reported by indexOb.EsCols(). Nil values, unmatched columns and columns
// beyond the end of row are skipped. Integer fields accept every signed and
// unsigned integer kind; LatestValue accepts strings, integers and floats and
// is stored as text. A value of any other kind leaves the field untouched.
func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, indexOb dataSourceModel.EsBaseFromIndex) dataSourceModel.SearchDataSource {
	item := dataSourceModel.SearchDataSource{}
	source, subSource, sourceName := indexOb.SourceInfo()
	item.Source = source
	item.SubSource = subSource
	item.SourceName = sourceName

	// toInt reads any integer kind; the boolean reports whether v was one.
	// reflect's Int/Uint accessors panic on the wrong kind, hence the check.
	toInt := func(v reflect.Value) (int, bool) {
		switch v.Kind() {
		case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
			return int(v.Int()), true
		case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
			return int(v.Uint()), true
		}
		return 0, false
	}

	// Column names differ per data source; indexCols maps them to fields.
	indexCols := indexOb.EsCols()
	for i, column := range columns {
		// The row can be shorter than the column list; stop instead of
		// indexing out of range.
		if i >= len(row) {
			break
		}
		// A nil cell yields an invalid Value; skip it.
		value := reflect.ValueOf(row[i])
		if !value.IsValid() {
			continue
		}

		// The case order matters: the names are runtime values, and the first
		// matching case wins if two of them happen to be equal.
		switch column.Name {
		case indexCols.PrimaryId:
			if n, ok := toInt(value); ok {
				item.PrimaryId = n
			}
		case indexCols.IndexCode:
			item.IndexCode = value.String()
		case indexCols.IndexName:
			item.IndexName = value.String()
		case indexCols.ClassifyId:
			if n, ok := toInt(value); ok {
				item.ClassifyId = n
			}
		case indexCols.Unit:
			item.Unit = value.String()
		case indexCols.Frequency:
			item.Frequency = value.String()
		case indexCols.StartDate:
			item.StartDate = value.String()
		case indexCols.EndDate:
			item.EndDate = value.String()
		case indexCols.LatestValue:
			switch value.Kind() {
			case reflect.String:
				item.LatestValue = value.String()
			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
				item.LatestValue = fmt.Sprint(value.Int())
			case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
				item.LatestValue = fmt.Sprint(value.Uint())
			case reflect.Float64:
				item.LatestValue = fmt.Sprint(value.Float())
			case reflect.Float32:
				// Format as float32 so the text is the shortest form of the
				// stored value rather than its widened float64 expansion.
				item.LatestValue = fmt.Sprint(float32(value.Float()))
			}
		case indexCols.CreateTime:
			item.CreateTime = value.String()
		case indexCols.ModifyTime:
			item.ModifyTime = value.String()
		default:
			continue
		}
	}
	return item
}