package main import ( "context" "database/sql" "fmt" "log" "net/url" "os" "strconv" "strings" "time" "github.com/brianvoe/gofakeit/v7" "github.com/go-sql-driver/mysql" "github.com/gofiber/fiber/v2" "github.com/mroth/weightedrand" "github.com/segmentio/kafka-go" ) func ByteCountSI(b int) string { const unit = 1000 if b < unit { return fmt.Sprintf("%dB", b) } div, exp := int(unit), 0 for n := b / unit; n >= unit; n /= unit { div *= unit exp++ } return fmt.Sprintf("%.1f%cB", float64(b)/float64(div), "kMGTPE"[exp]) } func main() { log.Println("Neptune Project v0.1") DATABASE_HOST, ok := os.LookupEnv("DATABASE_HOST") if !ok { log.Fatalln("DATABASE_HOST_NOT_FOUND") } DATABASE_PORT, ok := os.LookupEnv("DATABASE_PORT") if !ok { log.Fatalln("DATABASE_PORT_NOT_FOUND") } DATABASE_USER, ok := os.LookupEnv("DATABASE_USER") if !ok { log.Fatalln("DATABASE_USER_NOT_FOUND") } DATABASE_PASS, ok := os.LookupEnv("DATABASE_PASS") if !ok { log.Fatalln("DATABASE_PASS_NOT_FOUND") } DATABASE_SCHEMA, ok := os.LookupEnv("DATABASE_SCHEMA") if !ok { log.Fatalln("DATABASE_SCHEMA_NOT_FOUND") } cfg := mysql.Config{ User: DATABASE_USER, Passwd: DATABASE_PASS, Net: "tcp", Addr: DATABASE_HOST + ":" + DATABASE_PORT, DBName: DATABASE_SCHEMA, AllowNativePasswords: true, } db, err := sql.Open("mysql", cfg.FormatDSN()) if err != nil { log.Fatalln("DATABASE_OPEN_FAILED") } err = db.Ping() if err != nil { log.Fatalln("DATABASE_CONNECTION_FAILED") } stopConn := false log.Println("Creating large DB connection pool... Please wait...") for i := 0; i < 7000; i++ { db, err := sql.Open("mysql", cfg.FormatDSN()) if err != nil { log.Println("DATABASE_OPEN_FAILED") } _, err = db.Query("SELECT 1") if err != nil { stopConn = true } } if stopConn { log.Println("DATABASE_CONNECTION_FAILED") log.Println("Ctrl+C to terminate DB connection pool") <-make(chan int) } app := fiber.New(fiber.Config{ DisableStartupMessage: true, }) go func() { app.Post("/api/streams/persist", func(c *fiber.Ctx) error { data := struct { GeneratedUserId string `json:"generated_user_id"` }{} data.GeneratedUserId = gofakeit.UUID() stmt, err := db.Prepare("INSERT INTO user (user_id, preferred_category, created_at) VALUES (?, ?, ?)") if err != nil { log.Println("DATABASE_PREPARE_FAILED") } stmt.Exec(data.GeneratedUserId, "Test Category", time.Now()) stmt.Close() for j := 0; j < 10; j++ { stmt2, err := db.Prepare("INSERT INTO purchase (user_id, purchase_id, item_id, item_category, created_at) VALUES (?, ?, ?, ?, ?)") if err != nil { log.Println("DATABASE_PREPARE_FAILED") } stmt2.Exec(data.GeneratedUserId, gofakeit.UUID(), gofakeit.UUID(), "Test Category", time.Now()) stmt2.Close() } return c.JSON(data) }) for { preferred_categories, _ := weightedrand.NewChooser( weightedrand.Choice{Item: "Development", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Novel", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Cookbook", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Journal", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Art", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "History", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Science", Weight: gofakeit.UintN(10) + 1}, ) for i := 1; i <= 20*60; i++ { stmt, err := db.Prepare("INSERT INTO user (user_id, preferred_category, created_at) VALUES (?, ?, ?)") if err != nil { log.Println("DATABASE_PREPARE_FAILED") } user_id := gofakeit.UUID() preferred_category := preferred_categories.Pick().(string) stmt.Exec(user_id, preferred_category, time.Now()) stmt.Close() for j := 0; j < gofakeit.IntN(6); j++ { stmt2, err := db.Prepare("INSERT INTO purchase (user_id, purchase_id, item_id, item_category, created_at) VALUES (?, ?, ?, ?, ?)") if err != nil { log.Println("DATABASE_PREPARE_FAILED") } preferred_category2 := preferred_categories.Pick().(string) stmt2.Exec(user_id, gofakeit.UUID(), gofakeit.UUID(), preferred_category2, time.Now()) stmt2.Close() } time.Sleep(50 * time.Millisecond) } } }() go func() { KAFKA_BROKERS, ok := os.LookupEnv("KAFKA_BROKERS") if !ok { return } KAFKA_TOPIC, ok := os.LookupEnv("KAFKA_TOPIC") if !ok { log.Fatalln("KAFKA_TOPIC_NOT_FOUND") } writer := &kafka.Writer{ Addr: kafka.TCP(strings.Split(KAFKA_BROKERS, ",")...), Topic: KAFKA_TOPIC, Balancer: &kafka.LeastBytes{}, } app.Post("/api/streams/telemetry", func(c *fiber.Ctx) error { data := struct { GeneratedUserId string `json:"generated_user_id"` GeneratedCategory string `json:"generated_category"` GeneratedSessionId string `json:"generated_session_id"` }{} data.GeneratedUserId = gofakeit.UUID() data.GeneratedCategory = gofakeit.UUID() messages := []kafka.Message{} for i := 0; i < 10; i++ { messages = append(messages, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"login\",\"details\":{\"user_id\":\"%s\",\"user_name\":\"%s\",\"user_age\":%d},\"credentials\":{\"session_id\":\"ubks_%s\",\"session_issuer\":\"mainsvc__vNeptune_1\",\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", data.GeneratedUserId, gofakeit.Name(), gofakeit.IntRange(16, 65), "manual_generated", gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }) } for i := 0; i < 5; i++ { messages = append(messages, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"select\",\"details\":{\"element\":\"#%s\",\"value\":\"%s\"},\"credentials\":{\"session_id\":\"ubks_%s\",\"session_issuer\":\"mainsvc__vNeptune_1\",\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", "searchFilterSelector", data.GeneratedCategory, "HIDE_ME", gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }) } for i := 0; i < 5; i++ { messages = append(messages, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"select\",\"details\":{\"element\":\"#%s\",\"value\":\"%s\"},\"credentials\":{\"session_id\":\"ubks_%s\",\"session_issuer\":\"mainsvc__vNeptune_1\",\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", "categorySelector", data.GeneratedCategory, "HIDE_ME", gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }) } for i := 0; i < 5; i++ { messages = append(messages, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"select\",\"details\":{\"element\":\"#%s\",\"value\":\"%s\"},\"credentials\":{\"session_id\":null,\"session_issuer\":null,\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", "categorySelector", data.GeneratedCategory, gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }) } data.GeneratedSessionId = gofakeit.UUID() click_events1 := []string{} click_events2 := []string{} for j := 0; j < 5; j++ { click_events1 = append(click_events1, fmt.Sprintf("{\"type\":\"click\",\"details\":{\"element\":\".%s\",\"content\":{\"item_name\":\"%s\",\"item_category\":\"%s\",\"item_location\":\"%s\"}},\"credentials\":{\"session_id\":\"ubks_%s\",\"session_issuer\":\"mainsvc__vNeptune_1\",\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", "bookItem", gofakeit.Adjective()+" "+gofakeit.Noun(), data.GeneratedCategory, gofakeit.Street(), data.GeneratedSessionId, gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))) } for j := 0; j < 5; j++ { click_events2 = append(click_events2, fmt.Sprintf("{\"type\":\"click\",\"details\":{\"element\":\".%s\",\"content\":{\"item_name\":\"%s\",\"item_category\":\"%s\",\"item_location\":\"%s\"}},\"credentials\":{\"session_id\":\"ubks_%s\",\"session_issuer\":\"mainsvc__vNeptune_1\",\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", "bookItem", gofakeit.Adjective()+" "+gofakeit.Noun(), data.GeneratedCategory, gofakeit.Street(), data.GeneratedSessionId, gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))) } messages = append(messages, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"login\",\"details\":{\"user_id\":\"%s\",\"user_name\":\"%s\",\"user_age\":%d},\"credentials\":{\"session_id\":\"ubks_%s\",\"session_issuer\":\"mainsvc__vNeptune_1\",\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", gofakeit.Username(), gofakeit.Name(), gofakeit.IntRange(16, 65), data.GeneratedSessionId, gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }, kafka.Message{ Value: []byte("[" + strings.Join(click_events1, ",") + "]"), }, kafka.Message{ Value: []byte("[" + strings.Join(click_events2, ",") + "]"), }) writer.WriteMessages(context.Background(), messages..., ) return c.JSON(data) }) for { is_anon := gofakeit.IntN(10) > 3 preferred_categories, _ := weightedrand.NewChooser( weightedrand.Choice{Item: "Development", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Novel", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Cookbook", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Journal", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Art", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "History", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "Science", Weight: gofakeit.UintN(10) + 1}, ) for i := 1; i <= 100*60*5; i++ { click_events := []string{} click_event_count := gofakeit.IntRange(1, 5) if is_anon { for j := 0; j < click_event_count; j++ { click_events = append(click_events, fmt.Sprintf("{\"type\":\"click\",\"details\":{\"element\":\".%s\",\"content\":{\"item_name\":\"%s\",\"item_category\":\"%s\",\"item_location\":\"%s\"}},\"credentials\":{\"session_id\":null,\"session_issuer\":null,\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", gofakeit.RandomString([]string{"bookItem", "audioItem"}), gofakeit.Adjective()+" "+gofakeit.Noun(), preferred_categories.Pick().(string), gofakeit.Street(), gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))) } writer.WriteMessages(context.Background(), kafka.Message{ Value: []byte("[" + strings.Join(click_events, ",") + "]"), }, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"select\",\"details\":{\"element\":\"#%s\",\"value\":\"%s\"},\"credentials\":{\"session_id\":null,\"session_issuer\":null,\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", gofakeit.RandomString([]string{"categorySelector", "searchFilterSelector"}), preferred_categories.Pick(), gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"move\",\"details\":{\"page\":\"/%s/%s\"},\"event_at\":\"%s\"}", gofakeit.Adjective(), gofakeit.Noun(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"error\",\"details\":{\"page\":\"/%s/%s\",\"message\":\"%s\"},\"event_at\":\"%s\"}", gofakeit.Adjective(), gofakeit.Noun(), gofakeit.LoremIpsumSentence(100), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }, ) } else { session_id := gofakeit.UUID() for j := 0; j < click_event_count; j++ { click_events = append(click_events, fmt.Sprintf("{\"type\":\"click\",\"details\":{\"element\":\".%s\",\"content\":{\"item_name\":\"%s\",\"item_category\":\"%s\",\"item_location\":\"%s\"}},\"credentials\":{\"session_id\":\"ubks_%s\",\"session_issuer\":\"mainsvc__vNeptune_1\",\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", gofakeit.RandomString([]string{"bookItem", "audioItem"}), gofakeit.Adjective()+" "+gofakeit.Noun(), preferred_categories.Pick().(string), gofakeit.Street(), session_id, gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))) } writer.WriteMessages(context.Background(), kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"login\",\"details\":{\"user_id\":\"%s\",\"user_name\":\"%s\",\"user_age\":%d},\"credentials\":{\"session_id\":\"ubks_%s\",\"session_issuer\":\"mainsvc__vNeptune_1\",\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", gofakeit.Username(), gofakeit.Name(), gofakeit.IntRange(16, 65), session_id, gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }, kafka.Message{ Value: []byte("[" + strings.Join(click_events, ",") + "]"), }, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"select\",\"details\":{\"element\":\"#%s\",\"value\":\"%s\"},\"credentials\":{\"session_id\":\"ubks_%s\",\"session_issuer\":\"mainsvc__vNeptune_1\",\"sys_trace_id\":\"ubks__mainsvc__vNeptune_1_%s\"},\"event_at\":\"%s\"}", gofakeit.RandomString([]string{"categorySelector", "searchFilterSelector"}), preferred_categories.Pick(), session_id, gofakeit.UUID(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"move\",\"details\":{\"page\":\"/%s/%s\"},\"event_at\":\"%s\"}", gofakeit.Adjective(), gofakeit.Noun(), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }, kafka.Message{ Value: []byte(fmt.Sprintf("{\"type\":\"error\",\"details\":{\"page\":\"/%s/%s\",\"message\":\"%s\"},\"event_at\":\"%s\"}", gofakeit.Adjective(), gofakeit.Noun(), gofakeit.LoremIpsumSentence(100), time.Now().Format("2006-01-02T15:04:05.999999-07:00"))), }, ) } time.Sleep(10 * time.Millisecond) } } }() go func() { app.Post("/api/streams/log", func(c *fiber.Ctx) error { data := struct { GeneratedReferrer string `json:"generated_referrer"` GeneratedCount int `json:"generated_count"` SampledLog string `json:"sampled_log"` }{} data.GeneratedCount = 100000 data.GeneratedReferrer = gofakeit.Noun() + ".test.com" for i := 0; i < data.GeneratedCount; i++ { method := gofakeit.HTTPMethod() referrer := gofakeit.RandomString([]string{"http://", "https://"}) + data.GeneratedReferrer + ":" + gofakeit.RandomString([]string{"80", "443", "8080", "8443", "8000", "5000"}) + "/" + url.PathEscape(gofakeit.Adjective()) + "/" + url.PathEscape(gofakeit.Noun()) if method == "GET" || method == "HEAD" || method == "DELETE" { fmt.Printf("%s - [%s] %s %s %d %s %s %s \"%s\"\n", gofakeit.IPv4Address(), time.Now().Format("2006-01-02T15:04:05.999999 -07:00"), method, "/api/qry/"+gofakeit.Noun()+"?_qtt="+gofakeit.UUID()+"&_st="+url.QueryEscape(gofakeit.StreetName()), gofakeit.HTTPStatusCode(), "100MB", "100s", referrer, gofakeit.UserAgent()) } else { fmt.Printf("%s - [%s] %s %s %d %s %s %s %s \"%s\"\n", gofakeit.IPv4Address(), time.Now().Format("2006-01-02T15:04:05.999999 -07:00"), method, "/api/products?_qcw="+gofakeit.UUID()+"&_rf="+gofakeit.UUID(), gofakeit.HTTPStatusCode(), "100MB", "100MB", "100s", referrer, gofakeit.UserAgent()) } } data.SampledLog = fmt.Sprintf("%s - [%s] %s %s %d %s %s %s %s \"%s\"\n", gofakeit.IPv4Address(), time.Now().Format("2006-01-02T15:04:05.999999 -07:00"), "POST", "/test", 201, "1MB", "1B", "1s", "referrer", "test/"+gofakeit.UUID()) fmt.Print(data.SampledLog) return c.JSON(data) }) for { rxSeed := gofakeit.IntN(50) + 1 txSeed := gofakeit.IntN(100) + 1 lateSeed := gofakeit.IntN(12) + 1 referrers, _ := weightedrand.NewChooser( weightedrand.Choice{Item: "search.example.com", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "ad.unicorncorp.net", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "wiki.spreadinghoax.org", Weight: gofakeit.UintN(10) + 1}, weightedrand.Choice{Item: "books.unicorn.com", Weight: 15}, ) for i := 1; i <= 20*60; i++ { method := gofakeit.HTTPMethod() duration, _ := time.ParseDuration(strconv.Itoa(gofakeit.IntN(100*lateSeed)) + "ms") referrer := gofakeit.RandomString([]string{"http://", "https://"}) + referrers.Pick().(string) + ":" + gofakeit.RandomString([]string{"80", "443", "8080", "8443", "8000", "5000"}) + "/" + url.PathEscape(gofakeit.Adjective()) + "/" + url.PathEscape(gofakeit.Noun()) if method == "GET" || method == "HEAD" || method == "DELETE" { fmt.Printf("%s - [%s] %s %s %d %s %s %s \"%s\"\n", gofakeit.IPv4Address(), time.Now().Format("2006-01-02T15:04:05.999999 -07:00"), method, "/api/qry/"+gofakeit.Noun()+"?_qtt="+gofakeit.UUID()+"&_st="+url.QueryEscape(gofakeit.StreetName()), gofakeit.HTTPStatusCode(), ByteCountSI(gofakeit.IntN(1000*10*txSeed)), duration, referrer, gofakeit.UserAgent()) } else { fmt.Printf("%s - [%s] %s %s %d %s %s %s %s \"%s\"\n", gofakeit.IPv4Address(), time.Now().Format("2006-01-02T15:04:05.999999 -07:00"), method, "/api/products?_qcw="+gofakeit.UUID()+"&_rf="+gofakeit.UUID(), gofakeit.HTTPStatusCode(), ByteCountSI(gofakeit.IntN(100*rxSeed)), ByteCountSI(gofakeit.IntN(1000*txSeed)), duration, referrer, gofakeit.UserAgent()) } time.Sleep(50 * time.Millisecond) } } }() app.Listen(":8080") }