Blob ingest works. Needs tests and general cleanup

This commit is contained in:
Netscape Navigator 2020-11-07 19:29:36 -06:00
parent 6face73482
commit f38ed31279
8 changed files with 127 additions and 26 deletions

View File

@ -42,12 +42,23 @@ func createBlobDirectory(mhash string) string {
return path.Join(dirPath, fileName) return path.Join(dirPath, fileName)
} }
func addBlob(data []byte) string { // func fileExists(filename string) bool {
// info, err := os.Stat(filename)
// if os.IsNotExist(err) {
// return false
// }
// return !info.IsDir()
// }
// func blobExists(mhash string) bool {
// return fileExists(createBlobDirectory(mhash))
// }
func addBlob(mhash string, data []byte) string {
size := len(data) size := len(data)
if size > blobByteLimit { if size > blobByteLimit {
panicf("Expected blob smaller than %d. Got: %d", blobByteLimit, size) panicf("Expected blob smaller than %d. Got: %d", blobByteLimit, size)
} }
mhash := encodeBlobMhash(sha256.Sum256(data))
blobPath := createBlobDirectory(mhash) blobPath := createBlobDirectory(mhash)
write(blobPath, data) write(blobPath, data)
return mhash return mhash
@ -65,15 +76,7 @@ func addBlobFromPipe() string {
output = append(output, input) output = append(output, input)
} }
return addBlob(output) return addBlob(getMhashForBytes(output), output)
}
func addBlobFromPath(path string) string {
dat, err := ioutil.ReadFile(path)
if err != nil {
panicf("Unable to read %s: %s", path, err)
}
return addBlob(dat)
} }
func write(path string, data []byte) { func write(path string, data []byte) {
@ -94,8 +97,19 @@ func write(path string, data []byte) {
} }
} }
// def get_blob(mhash) func getSha256(data []byte) []byte {
// path1 = File.join(createBlobDirectory(mhash) h := sha256.New()
// path2 = File.join(DEFAULT_BLOB_DIR, path1) h.Write(data)
// File.read(path2) if File.file?(path2) return h.Sum(nil)
// end }
func getMhashForBytes(data []byte) string {
return encodeBlobMhash(getSha256(data))
}
// getMhashForFile reads the file at path and returns its mHash together
// with the raw bytes that were read. Panics (via check) if the file
// cannot be read.
func getMhashForFile(path string) (string, []byte) {
	data, err := ioutil.ReadFile(path)
	check(err, "Can't open %s", path)
	mhash := getMhashForBytes(data)
	return mhash, data
}

View File

@ -126,7 +126,8 @@ var blobAddCommand = &cobra.Command{
if len(args) == 0 { if len(args) == 0 {
output = addBlobFromPipe() output = addBlobFromPipe()
} else { } else {
output = addBlobFromPath(args[0]) mhash, data := getMhashForFile(args[0])
output = addBlob(mhash, data)
} }
fmt.Printf(tpl, output) fmt.Printf(tpl, output)
}, },

View File

@ -21,6 +21,14 @@ func decodeMhash(input string) []byte {
return []byte(B32Decode(input[5:])) return []byte(B32Decode(input[5:]))
} }
// isBlob reports whether input looks like a blob mHash, i.e. whether it
// begins with the 5-character BlobSigil prefix. Inputs shorter than the
// sigil can never match.
func isBlob(input string) bool {
	if len(input) < 5 {
		return false
	}
	// Early return above keeps the happy path unindented; no else needed.
	return input[0:5] == BlobSigil
}
func validateMhash(input string) string { func validateMhash(input string) string {
arry := strings.Split(input, ".") arry := strings.Split(input, ".")
if len(arry) != 2 { if len(arry) != 2 {

View File

@ -16,6 +16,6 @@ func encodePeerMhash(pubKey []byte) string {
return PeerSigil + B32Encode(pubKey) return PeerSigil + B32Encode(pubKey)
} }
func encodeBlobMhash(sha256 [32]byte) string { func encodeBlobMhash(sha256 []byte) string {
return BlobSigil + B32Encode(sha256[:]) return BlobSigil + B32Encode(sha256[:])
} }

View File

@ -1,26 +1,64 @@
package main package main
import ( import (
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"path"
"path/filepath"
) )
const insertMessageQuery = "INSERT INTO messages(author, depth, kind, lipmaa, prev, signature) VALUES(?1, ?2, ?3, ?4, ?5, ?6)"
const insertBodyItemQuery = "INSERT INTO body_items(parent, key, value, rank) VALUES(?1, ?2, ?3, ?4)"
func ingestOneMessage(msg pigeonMessage, blobIndex map[string]bool) { func ingestOneMessage(msg pigeonMessage, blobIndex map[string]bool) {
if getPeerStatus(msg.author) == following { if getPeerStatus(msg.author) == following {
fmt.Println("TODO: Ingest this message") tx, err1 := getDB().Begin()
check(err1, "ingestOneMessage: Can't open DB: %s", err1)
results, err2 := tx.Exec(insertMessageQuery, msg.author, msg.depth, msg.kind, msg.lipmaa, msg.prev, msg.signature)
rollbackCheck(tx, err2, "Failed to save message %s", msg.signature)
parent, err3 := results.LastInsertId()
rollbackCheck(tx, err3, "Failed to get last ID for message %s", msg.signature)
for rank, pair := range msg.body {
_, err4 := tx.Exec(insertBodyItemQuery, parent, pair.key, pair.value, rank)
if err4 != nil {
fmt.Printf("%s", err4)
}
rollbackCheck(tx, err4, "Failed to insert body item %d of %s", rank, msg.signature)
}
err5 := tx.Commit()
check(err5, "Failed to commit message %s", msg.signature)
} }
} }
/** ingestManyMessages takes an array of Pigeon messages /** ingestManyMessages takes an array of Pigeon messages
and adds them to the local database, assuming that they are and adds them to the local database, assuming that they are
messages of interest. */ messages of interest and assuming that they do not already
exist in the database. */
func ingestManyMessages(outp parserOutput) { func ingestManyMessages(outp parserOutput) {
for _, message := range outp.messages { for _, message := range outp.messages {
ingestOneMessage(message, outp.blobIndex) ingestOneMessage(message, outp.blobIndex)
} }
} }
func ingestBlobs(p string, blobIndex map[string]bool) {
dir, _ := path.Split(p)
wildcard := path.Join(dir, "*.blb")
blobPaths, err1 := filepath.Glob(wildcard)
check(err1, "Blob wildcard failure %s", wildcard)
for _, blobPath := range blobPaths {
mhash, data := getMhashForFile(blobPath)
if blobIndex[mhash] {
fmt.Printf("Ingesting %s\n", mhash)
addBlob(mhash, data)
blobIndex[mhash] = false
} else {
fmt.Printf("Don't need %s\n", mhash)
}
}
}
func importBundle(path string) error { func importBundle(path string) error {
// Get messages.pgn file // Get messages.pgn file
dat, err1 := ioutil.ReadFile(path) dat, err1 := ioutil.ReadFile(path)
@ -28,7 +66,6 @@ func importBundle(path string) error {
outp, err2 := parseMessage(string(dat)) outp, err2 := parseMessage(string(dat))
check(err2, "Failed to parse %s. Error: %s", path, err2) check(err2, "Failed to parse %s. Error: %s", path, err2)
ingestManyMessages(outp) ingestManyMessages(outp)
// Parse messages ingestBlobs(path, outp.blobIndex)
// Map over messages return nil
return errors.New("Not done yet")
} }

View File

@ -28,6 +28,30 @@ var migrations = []migration{
`, `,
down: `DROP TABLE IF EXISTS peers`, down: `DROP TABLE IF EXISTS peers`,
}, },
migration{
up: `CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
author string NOT NULL,
depth int NOT NULL,
kind string NOT NULL,
lipmaa string NOT NULL,
prev string NOT NULL,
signature string NOT NULL
);
`,
down: `DROP TABLE IF EXISTS messages`,
},
migration{
up: `CREATE TABLE IF NOT EXISTS body_items (
parent int NOT NULL,
key string NOT NULL,
value string NOT NULL,
rank int NOT NULL,
FOREIGN KEY(parent) REFERENCES messages(id)
);
`,
down: `DROP TABLE IF EXISTS body_items`,
},
} }
func migrateUp(db *sql.DB) { func migrateUp(db *sql.DB) {

View File

@ -57,6 +57,12 @@ func newState(message string) parserState {
} }
} }
// maybeIndexBlob records input in index when it is a blob mHash;
// non-blob strings are ignored.
func maybeIndexBlob(index map[string]bool, input string) {
	if !isBlob(input) {
		return
	}
	index[input] = true
}
func parseMessage(message string) (parserOutput, error) { func parseMessage(message string) (parserOutput, error) {
empty := parserOutput{ empty := parserOutput{
messages: []pigeonMessage{}, messages: []pigeonMessage{},
@ -87,8 +93,11 @@ func parseMessage(message string) (parserOutput, error) {
} }
blobIndex := map[string]bool{} blobIndex := map[string]bool{}
for _, msg := range state.results { for _, msg := range state.results {
for _, pair := range msg.body { if getPeerStatus(msg.author) == following {
panicf("YOU NEED TO FINISH CREATING A BLOB INDEX FOR IMPORTED BUNDLES: %s", pair.key) for _, pair := range msg.body {
maybeIndexBlob(blobIndex, pair.key)
maybeIndexBlob(blobIndex, pair.value)
}
} }
} }
output := parserOutput{messages: state.results, blobIndex: blobIndex} output := parserOutput{messages: state.results, blobIndex: blobIndex}

View File

@ -2,6 +2,7 @@ package main
import ( import (
"crypto/ed25519" "crypto/ed25519"
"database/sql"
"fmt" "fmt"
) )
@ -42,6 +43,13 @@ func panicf(tpl string, args ...interface{}) {
panic(fmt.Sprintf(tpl, args...)) panic(fmt.Sprintf(tpl, args...))
} }
// rollbackCheck rolls the transaction back and panics with the
// formatted message when e is non-nil. It is a no-op when e is nil.
func rollbackCheck(tx *sql.Tx, e error, tpl string, args ...interface{}) {
	if e != nil {
		// Best-effort rollback: we are about to panic, so a rollback
		// failure is not separately actionable. Discard its error
		// explicitly rather than silently.
		_ = tx.Rollback()
		panicf(tpl, args...)
	}
}
func check(e error, tpl string, args ...interface{}) { func check(e error, tpl string, args ...interface{}) {
if e != nil { if e != nil {
panicf(tpl, args...) panicf(tpl, args...)