From f38ed31279a6f16547d0fe86bd91af470c271aa1 Mon Sep 17 00:00:00 2001 From: Rick Carlino Date: Sat, 7 Nov 2020 19:29:36 -0600 Subject: [PATCH] Blob ingest works. Needs tests and general cleanup --- project/blob.go | 46 ++++++++++++++++++++++++------------- project/cli.go | 3 ++- project/decoders.go | 8 +++++++ project/encoders.go | 2 +- project/import_bundle.go | 49 +++++++++++++++++++++++++++++++++++----- project/migrations.go | 24 ++++++++++++++++++++ project/parser.go | 13 +++++++++-- project/util.go | 8 +++++++ 8 files changed, 127 insertions(+), 26 deletions(-) diff --git a/project/blob.go b/project/blob.go index 139d410..be8e906 100644 --- a/project/blob.go +++ b/project/blob.go @@ -42,12 +42,23 @@ func createBlobDirectory(mhash string) string { return path.Join(dirPath, fileName) } -func addBlob(data []byte) string { +// func fileExists(filename string) bool { +// info, err := os.Stat(filename) +// if os.IsNotExist(err) { +// return false +// } +// return !info.IsDir() +// } + +// func blobExists(mhash string) bool { +// return fileExists(createBlobDirectory(mhash)) +// } + +func addBlob(mhash string, data []byte) string { size := len(data) if size > blobByteLimit { panicf("Expected blob smaller than %d. 
Got: %d", blobByteLimit, size) } - mhash := encodeBlobMhash(sha256.Sum256(data)) blobPath := createBlobDirectory(mhash) write(blobPath, data) return mhash @@ -65,15 +76,7 @@ func addBlobFromPipe() string { output = append(output, input) } - return addBlob(output) -} - -func addBlobFromPath(path string) string { - dat, err := ioutil.ReadFile(path) - if err != nil { - panicf("Unable to read %s: %s", path, err) - } - return addBlob(dat) + return addBlob(getMhashForBytes(output), output) } func write(path string, data []byte) { @@ -94,8 +97,19 @@ func write(path string, data []byte) { } } -// def get_blob(mhash) -// path1 = File.join(createBlobDirectory(mhash) -// path2 = File.join(DEFAULT_BLOB_DIR, path1) -// File.read(path2) if File.file?(path2) -// end +func getSha256(data []byte) []byte { + h := sha256.New() + h.Write(data) + return h.Sum(nil) +} + +func getMhashForBytes(data []byte) string { + return encodeBlobMhash(getSha256(data)) +} + +/* getMhashForFile Returns the mHash and data for a path. 
*/ +func getMhashForFile(path string) (string, []byte) { + data, err := ioutil.ReadFile(path) + check(err, "Can't open %s", path) + return getMhashForBytes(data), data +} diff --git a/project/cli.go b/project/cli.go index 259c666..a214111 100644 --- a/project/cli.go +++ b/project/cli.go @@ -126,7 +126,8 @@ var blobAddCommand = &cobra.Command{ if len(args) == 0 { output = addBlobFromPipe() } else { - output = addBlobFromPath(args[0]) + mhash, data := getMhashForFile(args[0]) + output = addBlob(mhash, data) } fmt.Printf(tpl, output) }, diff --git a/project/decoders.go b/project/decoders.go index d8e7c1a..8035069 100644 --- a/project/decoders.go +++ b/project/decoders.go @@ -21,6 +21,14 @@ func decodeMhash(input string) []byte { return []byte(B32Decode(input[5:])) } +func isBlob(input string) bool { + if len(input) < 5 { + return false + } else { + return input[0:5] == BlobSigil + } +} + func validateMhash(input string) string { arry := strings.Split(input, ".") if len(arry) != 2 { diff --git a/project/encoders.go b/project/encoders.go index f91ffd1..43f5a5a 100644 --- a/project/encoders.go +++ b/project/encoders.go @@ -16,6 +16,6 @@ func encodePeerMhash(pubKey []byte) string { return PeerSigil + B32Encode(pubKey) } -func encodeBlobMhash(sha256 [32]byte) string { +func encodeBlobMhash(sha256 []byte) string { return BlobSigil + B32Encode(sha256[:]) } diff --git a/project/import_bundle.go b/project/import_bundle.go index b652fa9..ad1a38f 100644 --- a/project/import_bundle.go +++ b/project/import_bundle.go @@ -1,26 +1,64 @@ package main import ( - "errors" "fmt" "io/ioutil" + "path" + "path/filepath" ) +const insertMessageQuery = "INSERT INTO messages(author, depth, kind, lipmaa, prev, signature) VALUES(?1, ?2, ?3, ?4, ?5, ?6)" +const insertBodyItemQuery = "INSERT INTO body_items(parent, key, value, rank) VALUES(?1, ?2, ?3, ?4)" + func ingestOneMessage(msg pigeonMessage, blobIndex map[string]bool) { if getPeerStatus(msg.author) == following { - fmt.Println("TODO: Ingest 
this message") + tx, err1 := getDB().Begin() + check(err1, "ingestOneMessage: Can't open DB: %s", err1) + results, err2 := tx.Exec(insertMessageQuery, msg.author, msg.depth, msg.kind, msg.lipmaa, msg.prev, msg.signature) + rollbackCheck(tx, err2, "Failed to save message %s", msg.signature) + parent, err3 := results.LastInsertId() + rollbackCheck(tx, err3, "Failed to get last ID for message %s", msg.signature) + + for rank, pair := range msg.body { + _, err4 := tx.Exec(insertBodyItemQuery, parent, pair.key, pair.value, rank) + if err4 != nil { + fmt.Printf("%s", err4) + } + rollbackCheck(tx, err4, "Failed to insert body item %d of %s", rank, msg.signature) + } + err5 := tx.Commit() + check(err5, "Failed to commit message %s", msg.signature) } } /** ingestManyMessages takes an array of Pigeon messages and adds them to the local database, assuming that they are -messages of interest. */ +messages of interest and assuming that they do not already +exist in the database. */ func ingestManyMessages(outp parserOutput) { for _, message := range outp.messages { ingestOneMessage(message, outp.blobIndex) } } +func ingestBlobs(p string, blobIndex map[string]bool) { + dir, _ := path.Split(p) + wildcard := path.Join(dir, "*.blb") + blobPaths, err1 := filepath.Glob(wildcard) + check(err1, "Blob wildcard failure %s", wildcard) + for _, blobPath := range blobPaths { + mhash, data := getMhashForFile(blobPath) + + if blobIndex[mhash] { + fmt.Printf("Ingesting %s\n", mhash) + addBlob(mhash, data) + blobIndex[mhash] = false + } else { + fmt.Printf("Don't need %s\n", mhash) + } + } +} + func importBundle(path string) error { // Get messages.pgn file dat, err1 := ioutil.ReadFile(path) @@ -28,7 +66,6 @@ func importBundle(path string) error { outp, err2 := parseMessage(string(dat)) check(err2, "Failed to parse %s. 
Error: %s", path, err2) ingestManyMessages(outp) - // Parse messages - // Map over messages - return errors.New("Not done yet") + ingestBlobs(path, outp.blobIndex) + return nil } diff --git a/project/migrations.go b/project/migrations.go index 5858bcb..6dda62d 100644 --- a/project/migrations.go +++ b/project/migrations.go @@ -28,6 +28,30 @@ var migrations = []migration{ `, down: `DROP TABLE IF EXISTS peers`, }, + migration{ + up: `CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + author string NOT NULL, + depth int NOT NULL, + kind string NOT NULL, + lipmaa string NOT NULL, + prev string NOT NULL, + signature string NOT NULL + ); + `, + down: `DROP TABLE IF EXISTS messages`, + }, + migration{ + up: `CREATE TABLE IF NOT EXISTS body_items ( + parent int NOT NULL, + key string NOT NULL, + value string NOT NULL, + rank int NOT NULL, + FOREIGN KEY(parent) REFERENCES messages(id) + ); + `, + down: `DROP TABLE IF EXISTS body_items`, + }, } func migrateUp(db *sql.DB) { diff --git a/project/parser.go b/project/parser.go index 7161079..f778538 100644 --- a/project/parser.go +++ b/project/parser.go @@ -57,6 +57,12 @@ func newState(message string) parserState { } } +func maybeIndexBlob(index map[string]bool, input string) { + if isBlob(input) { + index[input] = true + } +} + func parseMessage(message string) (parserOutput, error) { empty := parserOutput{ messages: []pigeonMessage{}, @@ -87,8 +93,11 @@ func parseMessage(message string) (parserOutput, error) { } blobIndex := map[string]bool{} for _, msg := range state.results { - for _, pair := range msg.body { - panicf("YOU NEED TO FINISH CREATING A BLOB INDEX FOR IMPORTED BUNDLES: %s", pair.key) + if getPeerStatus(msg.author) == following { + for _, pair := range msg.body { + maybeIndexBlob(blobIndex, pair.key) + maybeIndexBlob(blobIndex, pair.value) + } } } output := parserOutput{messages: state.results, blobIndex: blobIndex} diff --git a/project/util.go b/project/util.go index 9583223..c451218 
100644 --- a/project/util.go +++ b/project/util.go @@ -2,6 +2,7 @@ package main import ( "crypto/ed25519" + "database/sql" "fmt" ) @@ -42,6 +43,13 @@ func panicf(tpl string, args ...interface{}) { panic(fmt.Sprintf(tpl, args...)) } +func rollbackCheck(tx *sql.Tx, e error, tpl string, args ...interface{}) { + if e != nil { + tx.Rollback() + panicf(tpl, args...) + } +} + func check(e error, tpl string, args ...interface{}) { if e != nil { panicf(tpl, args...)