// neptune_project/main.go
//
// 453 lines
// 18 KiB
// Go

package main
import (
"context"
"database/sql"
"fmt"
"log"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/brianvoe/gofakeit/v7"
"github.com/go-sql-driver/mysql"
"github.com/gofiber/fiber/v2"
"github.com/mroth/weightedrand"
"github.com/segmentio/kafka-go"
)
// ByteCountSI renders the byte count b as a short human-readable string
// using decimal (power-of-1000) units.
//
// Values below 1000 (including negative values) are printed as a plain
// integer followed by "B", e.g. "999B". Larger values are divided by the
// largest power of 1000 found by repeatedly dividing b by 1000, printed with
// one decimal place, and suffixed with the matching prefix from "kMGTPE",
// e.g. "1.5kB" or "1.2GB".
func ByteCountSI(b int) string {
	const (
		base     = 1000
		prefixes = "kMGTPE"
	)

	if b < base {
		return fmt.Sprintf("%dB", b)
	}

	// Grow the divisor by one power of base for every additional factor of
	// base contained in b; idx tracks which prefix letter that power maps to.
	divisor := base
	idx := 0
	remaining := b / base
	for remaining >= base {
		divisor *= base
		idx++
		remaining /= base
	}

	scaled := float64(b) / float64(divisor)
	return fmt.Sprintf("%.1f%cB", scaled, prefixes[idx])
}
// main starts the Neptune demo data generator.
//
// It reads the DATABASE_* settings from the environment (exiting with
// "<NAME>_NOT_FOUND" when one is missing), connects to MySQL, opens a large
// number of additional connections, and then serves a Fiber application on
// :8080 while background goroutines keep producing fake data:
//
//   - POST /api/streams/persist inserts one user row and ten purchase rows.
//   - POST /api/streams/telemetry publishes a fixed batch of telemetry events
//     to Kafka. The route and its background producer are only set up when
//     KAFKA_BROKERS is present in the environment; KAFKA_TOPIC is then
//     required.
//   - POST /api/streams/log prints 100000 fake access-log lines to stdout.
//
// Failure handling: a failed sql.Open or db.Prepare is logged and skipped
// instead of being followed by a call on a nil handle; failed inserts and
// Kafka writes are logged, and the HTTP handlers answer 500 when their write
// fails; a failed Listen terminates the process with a log line.
func main() {
	const (
		// Timestamp layouts used by telemetry events and access-log lines.
		eventTimeLayout     = "2006-01-02T15:04:05.999999-07:00"
		accessLogTimeLayout = "2006-01-02T15:04:05.999999 -07:00"

		insertUserSQL     = "INSERT INTO user (user_id, preferred_category, created_at) VALUES (?, ?, ?)"
		insertPurchaseSQL = "INSERT INTO purchase (user_id, purchase_id, item_id, item_category, created_at) VALUES (?, ?, ?, ?, ?)"

		// Telemetry event templates. The "anon" variants carry null session
		// credentials.
		loginEventFmt      = `{"type":"login","details":{"user_id":"%s","user_name":"%s","user_age":%d},"credentials":{"session_id":"ubks_%s","session_issuer":"mainsvc__vNeptune_1","sys_trace_id":"ubks__mainsvc__vNeptune_1_%s"},"event_at":"%s"}`
		selectEventFmt     = `{"type":"select","details":{"element":"#%s","value":"%s"},"credentials":{"session_id":"ubks_%s","session_issuer":"mainsvc__vNeptune_1","sys_trace_id":"ubks__mainsvc__vNeptune_1_%s"},"event_at":"%s"}`
		anonSelectEventFmt = `{"type":"select","details":{"element":"#%s","value":"%s"},"credentials":{"session_id":null,"session_issuer":null,"sys_trace_id":"ubks__mainsvc__vNeptune_1_%s"},"event_at":"%s"}`
		clickEventFmt      = `{"type":"click","details":{"element":".%s","content":{"item_name":"%s","item_category":"%s","item_location":"%s"}},"credentials":{"session_id":"ubks_%s","session_issuer":"mainsvc__vNeptune_1","sys_trace_id":"ubks__mainsvc__vNeptune_1_%s"},"event_at":"%s"}`
		anonClickEventFmt  = `{"type":"click","details":{"element":".%s","content":{"item_name":"%s","item_category":"%s","item_location":"%s"}},"credentials":{"session_id":null,"session_issuer":null,"sys_trace_id":"ubks__mainsvc__vNeptune_1_%s"},"event_at":"%s"}`
		moveEventFmt       = `{"type":"move","details":{"page":"/%s/%s"},"event_at":"%s"}`
		errorEventFmt      = `{"type":"error","details":{"page":"/%s/%s","message":"%s"},"event_at":"%s"}`

		// Access-log line templates (without the trailing newline). The
		// product variant has one extra size column.
		queryLogFmt   = "%s - [%s] %s %s %d %s %s %s \"%s\""
		productLogFmt = "%s - [%s] %s %s %d %s %s %s %s \"%s\""
	)

	log.Println("Neptune Project v0.1")

	// mustEnv returns the value of a required environment variable or
	// terminates the process with "<key>_NOT_FOUND".
	mustEnv := func(key string) string {
		value, ok := os.LookupEnv(key)
		if !ok {
			log.Fatalln(key + "_NOT_FOUND")
		}
		return value
	}

	dbHost := mustEnv("DATABASE_HOST")
	dbPort := mustEnv("DATABASE_PORT")
	dbUser := mustEnv("DATABASE_USER")
	dbPass := mustEnv("DATABASE_PASS")
	dbSchema := mustEnv("DATABASE_SCHEMA")

	cfg := mysql.Config{
		User:                 dbUser,
		Passwd:               dbPass,
		Net:                  "tcp",
		Addr:                 dbHost + ":" + dbPort,
		DBName:               dbSchema,
		AllowNativePasswords: true,
	}
	dsn := cfg.FormatDSN()

	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Fatalln("DATABASE_OPEN_FAILED", err)
	}
	if err = db.Ping(); err != nil {
		log.Fatalln("DATABASE_CONNECTION_FAILED", err)
	}

	// Open many independent handles and run one query on each.
	// NOTE(review): the returned rows are never closed, which presumably is
	// what keeps each connection checked out for the lifetime of the
	// process; confirm this is the intended way to hold the pool open.
	stopConn := false
	log.Println("Creating large DB connection pool... Please wait...")
	for i := 0; i < 7000; i++ {
		extra, err := sql.Open("mysql", dsn)
		if err != nil {
			log.Println("DATABASE_OPEN_FAILED", err)
			// extra is unusable here; querying it would dereference nil.
			continue
		}
		if _, err = extra.Query("SELECT 1"); err != nil {
			stopConn = true
		}
	}
	if stopConn {
		log.Println("DATABASE_CONNECTION_FAILED")
		log.Println("Ctrl+C to terminate DB connection pool")
		// Block forever so the connections opened so far stay alive.
		<-make(chan int)
	}

	// execInsert prepares query, executes it once with args and closes the
	// statement. Failures are logged here, so callers only need to decide
	// whether to carry on.
	execInsert := func(query string, args ...any) error {
		stmt, err := db.Prepare(query)
		if err != nil {
			log.Println("DATABASE_PREPARE_FAILED", err)
			return err
		}
		defer stmt.Close()
		if _, err := stmt.Exec(args...); err != nil {
			log.Println("DATABASE_EXEC_FAILED", err)
			return err
		}
		return nil
	}

	// categoryChoices builds the category list with fresh random weights.
	categoryChoices := func() []weightedrand.Choice {
		return []weightedrand.Choice{
			{Item: "Development", Weight: gofakeit.UintN(10) + 1},
			{Item: "Novel", Weight: gofakeit.UintN(10) + 1},
			{Item: "Cookbook", Weight: gofakeit.UintN(10) + 1},
			{Item: "Journal", Weight: gofakeit.UintN(10) + 1},
			{Item: "Art", Weight: gofakeit.UintN(10) + 1},
			{Item: "History", Weight: gofakeit.UintN(10) + 1},
			{Item: "Science", Weight: gofakeit.UintN(10) + 1},
		}
	}

	app := fiber.New(fiber.Config{
		DisableStartupMessage: true,
	})

	// Inserts one generated user plus ten purchases and echoes the user id.
	app.Post("/api/streams/persist", func(c *fiber.Ctx) error {
		data := struct {
			GeneratedUserId string `json:"generated_user_id"`
		}{}
		data.GeneratedUserId = gofakeit.UUID()

		if err := execInsert(insertUserSQL, data.GeneratedUserId, "Test Category", time.Now()); err != nil {
			return fiber.ErrInternalServerError
		}
		for j := 0; j < 10; j++ {
			if err := execInsert(insertPurchaseSQL, data.GeneratedUserId, gofakeit.UUID(), gofakeit.UUID(), "Test Category", time.Now()); err != nil {
				return fiber.ErrInternalServerError
			}
		}
		return c.JSON(data)
	})

	// Background user/purchase generator: one user roughly every 50ms, with
	// the category weights re-drawn after every 20*60 users.
	go func() {
		for {
			// Every weight is at least 1, so the chooser's validation is not
			// expected to fail here.
			categories, _ := weightedrand.NewChooser(categoryChoices()...)
			for i := 1; i <= 20*60; i++ {
				userID := gofakeit.UUID()
				category := categories.Pick().(string)
				// Best effort: failures are logged by execInsert and the
				// generator keeps running.
				_ = execInsert(insertUserSQL, userID, category, time.Now())

				// NOTE(review): the bound is re-rolled on every iteration, so
				// the purchase count is not uniform in [0, 5]; hoist the call
				// if a uniform count was intended.
				for j := 0; j < gofakeit.IntN(6); j++ {
					purchaseCategory := categories.Pick().(string)
					_ = execInsert(insertPurchaseSQL, userID, gofakeit.UUID(), gofakeit.UUID(), purchaseCategory, time.Now())
				}
				time.Sleep(50 * time.Millisecond)
			}
		}
	}()

	// Kafka telemetry. Wrapped in a function literal so that a missing
	// KAFKA_BROKERS can skip the whole section with a plain return.
	func() {
		brokers, ok := os.LookupEnv("KAFKA_BROKERS")
		if !ok {
			return
		}
		topic := mustEnv("KAFKA_TOPIC")

		writer := &kafka.Writer{
			Addr:     kafka.TCP(strings.Split(brokers, ",")...),
			Topic:    topic,
			Balancer: &kafka.LeastBytes{},
		}

		// publish writes msgs to Kafka and logs a failed write.
		publish := func(msgs ...kafka.Message) error {
			err := writer.WriteMessages(context.Background(), msgs...)
			if err != nil {
				log.Println("KAFKA_WRITE_FAILED", err)
			}
			return err
		}
		// eventNow returns the current time in the telemetry layout.
		eventNow := func() string {
			return time.Now().Format(eventTimeLayout)
		}
		// eventMessage formats a single event into a Kafka message.
		eventMessage := func(format string, args ...any) kafka.Message {
			return kafka.Message{Value: []byte(fmt.Sprintf(format, args...))}
		}
		// batchMessage wraps already-formatted events in a JSON array.
		batchMessage := func(events []string) kafka.Message {
			return kafka.Message{Value: []byte("[" + strings.Join(events, ",") + "]")}
		}

		itemElements := []string{"bookItem", "audioItem"}
		selectorElements := []string{"categorySelector", "searchFilterSelector"}

		// Publishes a fixed batch of 28 messages and echoes the generated ids.
		app.Post("/api/streams/telemetry", func(c *fiber.Ctx) error {
			data := struct {
				GeneratedUserId    string `json:"generated_user_id"`
				GeneratedCategory  string `json:"generated_category"`
				GeneratedSessionId string `json:"generated_session_id"`
			}{}
			data.GeneratedUserId = gofakeit.UUID()
			// NOTE(review): the "category" is a UUID rather than one of the
			// category names used elsewhere; presumably so it is unique per
			// request. Confirm.
			data.GeneratedCategory = gofakeit.UUID()

			messages := make([]kafka.Message, 0, 28)
			for i := 0; i < 10; i++ {
				messages = append(messages, eventMessage(loginEventFmt,
					data.GeneratedUserId, gofakeit.Name(), gofakeit.IntRange(16, 65), "manual_generated", gofakeit.UUID(), eventNow()))
			}
			for i := 0; i < 5; i++ {
				messages = append(messages, eventMessage(selectEventFmt,
					"searchFilterSelector", data.GeneratedCategory, "HIDE_ME", gofakeit.UUID(), eventNow()))
			}
			for i := 0; i < 5; i++ {
				messages = append(messages, eventMessage(selectEventFmt,
					"categorySelector", data.GeneratedCategory, "HIDE_ME", gofakeit.UUID(), eventNow()))
			}
			for i := 0; i < 5; i++ {
				messages = append(messages, eventMessage(anonSelectEventFmt,
					"categorySelector", data.GeneratedCategory, gofakeit.UUID(), eventNow()))
			}

			data.GeneratedSessionId = gofakeit.UUID()
			firstClicks := make([]string, 0, 5)
			secondClicks := make([]string, 0, 5)
			for j := 0; j < 5; j++ {
				firstClicks = append(firstClicks, fmt.Sprintf(clickEventFmt,
					"bookItem", gofakeit.Adjective()+" "+gofakeit.Noun(), data.GeneratedCategory, gofakeit.Street(), data.GeneratedSessionId, gofakeit.UUID(), eventNow()))
			}
			for j := 0; j < 5; j++ {
				secondClicks = append(secondClicks, fmt.Sprintf(clickEventFmt,
					"bookItem", gofakeit.Adjective()+" "+gofakeit.Noun(), data.GeneratedCategory, gofakeit.Street(), data.GeneratedSessionId, gofakeit.UUID(), eventNow()))
			}
			messages = append(messages,
				eventMessage(loginEventFmt,
					gofakeit.Username(), gofakeit.Name(), gofakeit.IntRange(16, 65), data.GeneratedSessionId, gofakeit.UUID(), eventNow()),
				batchMessage(firstClicks),
				batchMessage(secondClicks),
			)

			if err := publish(messages...); err != nil {
				return fiber.ErrInternalServerError
			}
			return c.JSON(data)
		})

		// Background telemetry producer: one burst roughly every 10ms, with
		// the category weights re-drawn after every 100*60*5 bursts. Each
		// burst is written from its own goroutine so a slow write does not
		// delay the next one.
		go func() {
			for {
				// Every weight is at least 1, so the chooser's validation is
				// not expected to fail here.
				categories, _ := weightedrand.NewChooser(categoryChoices()...)
				for i := 1; i <= 100*60*5; i++ {
					clickCount := gofakeit.IntRange(1, 5)
					clickEvents := make([]string, 0, clickCount)

					if gofakeit.IntN(10) > 3 {
						// Anonymous visitor: no session credentials.
						for j := 0; j < clickCount; j++ {
							clickEvents = append(clickEvents, fmt.Sprintf(anonClickEventFmt,
								gofakeit.RandomString(itemElements), gofakeit.Adjective()+" "+gofakeit.Noun(), categories.Pick().(string), gofakeit.Street(), gofakeit.UUID(), eventNow()))
						}
						go func() {
							// Failure is logged by publish; the burst is dropped.
							_ = publish(
								batchMessage(clickEvents),
								eventMessage(anonSelectEventFmt,
									gofakeit.RandomString(selectorElements), categories.Pick(), gofakeit.UUID(), eventNow()),
								eventMessage(moveEventFmt,
									gofakeit.Adjective(), gofakeit.Noun(), eventNow()),
								eventMessage(errorEventFmt,
									gofakeit.Adjective(), gofakeit.Noun(), gofakeit.LoremIpsumSentence(100), eventNow()),
							)
						}()
					} else {
						// Logged-in visitor: a login event precedes the burst
						// and every event carries the same session id.
						sessionID := gofakeit.UUID()
						for j := 0; j < clickCount; j++ {
							clickEvents = append(clickEvents, fmt.Sprintf(clickEventFmt,
								gofakeit.RandomString(itemElements), gofakeit.Adjective()+" "+gofakeit.Noun(), categories.Pick().(string), gofakeit.Street(), sessionID, gofakeit.UUID(), eventNow()))
						}
						go func() {
							// Failure is logged by publish; the burst is dropped.
							_ = publish(
								eventMessage(loginEventFmt,
									gofakeit.Username(), gofakeit.Name(), gofakeit.IntRange(16, 65), sessionID, gofakeit.UUID(), eventNow()),
								batchMessage(clickEvents),
								eventMessage(selectEventFmt,
									gofakeit.RandomString(selectorElements), categories.Pick(), sessionID, gofakeit.UUID(), eventNow()),
								eventMessage(moveEventFmt,
									gofakeit.Adjective(), gofakeit.Noun(), eventNow()),
								eventMessage(errorEventFmt,
									gofakeit.Adjective(), gofakeit.Noun(), gofakeit.LoremIpsumSentence(100), eventNow()),
							)
						}()
					}
					time.Sleep(10 * time.Millisecond)
				}
			}
		}()
	}()

	schemes := []string{"http://", "https://"}
	ports := []string{"80", "443", "8080", "8443", "8000", "5000"}

	// randomReferrer builds a fake referrer URL on host with a random scheme,
	// port and two-segment path.
	randomReferrer := func(host string) string {
		return gofakeit.RandomString(schemes) + host + ":" + gofakeit.RandomString(ports) + "/" + url.PathEscape(gofakeit.Adjective()) + "/" + url.PathEscape(gofakeit.Noun())
	}
	// usesQueryLog reports whether method is logged in the query-style
	// format (one size column) rather than the product-style format (two).
	usesQueryLog := func(method string) bool {
		return method == "GET" || method == "HEAD" || method == "DELETE"
	}
	// printQueryLog prints one query-style access-log line to stdout.
	printQueryLog := func(method, size, took, referrer string) {
		fmt.Printf(queryLogFmt+"\n",
			gofakeit.IPv4Address(),
			time.Now().Format(accessLogTimeLayout),
			method,
			"/api/qry/"+url.PathEscape(gofakeit.Noun())+"?_qtt="+gofakeit.UUID()+"&_st="+url.QueryEscape(gofakeit.StreetName()),
			gofakeit.HTTPStatusCode(),
			size,
			took,
			referrer,
			gofakeit.UserAgent())
	}
	// printProductLog prints one product-style access-log line to stdout.
	printProductLog := func(method, rx, tx, took, referrer string) {
		fmt.Printf(productLogFmt+"\n",
			gofakeit.IPv4Address(),
			time.Now().Format(accessLogTimeLayout),
			method,
			"/api/products?_qcw="+gofakeit.UUID()+"&_rf="+gofakeit.UUID(),
			gofakeit.HTTPStatusCode(),
			rx,
			tx,
			took,
			referrer,
			gofakeit.UserAgent())
	}

	// Prints 100000 access-log lines that share one generated referrer host,
	// followed by one fixed sample line that is also returned to the caller.
	app.Post("/api/streams/log", func(c *fiber.Ctx) error {
		data := struct {
			GeneratedReferrer string `json:"generated_referrer"`
			GeneratedCount    int    `json:"generated_count"`
			SampledLog        string `json:"sampled_log"`
		}{}
		data.GeneratedCount = 100000
		data.GeneratedReferrer = gofakeit.Noun() + ".test.com"

		for i := 0; i < data.GeneratedCount; i++ {
			method := gofakeit.HTTPMethod()
			referrer := randomReferrer(data.GeneratedReferrer)
			if usesQueryLog(method) {
				printQueryLog(method, "100MB", "100s", referrer)
			} else {
				printProductLog(method, "100MB", "100MB", "100s", referrer)
			}
		}

		data.SampledLog = fmt.Sprintf(productLogFmt,
			gofakeit.IPv4Address(),
			time.Now().Format(accessLogTimeLayout),
			"POST",
			"/test",
			201,
			"1MB",
			"1B",
			"1s",
			"http://referrer",
			"test/"+gofakeit.UUID())
		fmt.Println(data.SampledLog)
		return c.JSON(data)
	})

	// Background access-log generator: one line roughly every 50ms, with the
	// size/latency seeds and referrer weights re-drawn after every 20*60
	// lines.
	go func() {
		for {
			rxSeed := gofakeit.IntN(50) + 1
			txSeed := gofakeit.IntN(100) + 1
			lateSeed := gofakeit.IntN(12) + 1
			// Every weight is at least 1, so the chooser's validation is not
			// expected to fail here.
			referrers, _ := weightedrand.NewChooser(
				weightedrand.Choice{Item: "search.example.com", Weight: gofakeit.UintN(10) + 1},
				weightedrand.Choice{Item: "ad.unicorncorp.net", Weight: gofakeit.UintN(10) + 1},
				weightedrand.Choice{Item: "wiki.spreadinghoax.org", Weight: gofakeit.UintN(10) + 1},
				weightedrand.Choice{Item: "books.unicorn.com", Weight: 15},
			)
			for i := 1; i <= 20*60; i++ {
				method := gofakeit.HTTPMethod()
				// The input is always "<integer>ms", which ParseDuration
				// accepts, so the error is ignored.
				duration, _ := time.ParseDuration(strconv.Itoa(gofakeit.IntN(100*lateSeed)) + "ms")
				referrer := randomReferrer(referrers.Pick().(string))
				if usesQueryLog(method) {
					printQueryLog(method,
						ByteCountSI(gofakeit.IntN(1000*10*txSeed)),
						duration.String(),
						referrer)
				} else {
					printProductLog(method,
						ByteCountSI(gofakeit.IntN(100*rxSeed)),
						ByteCountSI(gofakeit.IntN(1000*txSeed)),
						duration.String(),
						referrer)
				}
				time.Sleep(50 * time.Millisecond)
			}
		}
	}()

	// Listen blocks until the server stops; report why instead of exiting
	// silently.
	if err := app.Listen(":8080"); err != nil {
		log.Fatalln("SERVER_LISTEN_FAILED", err)
	}
}