1 /++ 2 Standalone Mongo driver implementation. 3 4 Copyright: 2020 Symmetry Investments with portion (c) 2012-2016 Nicolas Gurrola 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 +/ 7 module kaleidic.mongo_standalone; 8 9 import std.array; 10 import std.bitmanip; 11 import std.socket; 12 13 static immutable string mongoDriverName = "mongo-standalone"; 14 static immutable string mongoDriverVersion = "0.0.9"; 15 16 // just a demo of how it can be used 17 version(Demo) 18 void main() { 19 20 /+ 21 string omg; 22 foreach(a; 0 .. 8000) 23 omg ~= "a"; 24 25 26 document doc1 = document([ 27 bson_value("foo", 12), 28 bson_value("bar", 12.4), 29 bson_value("baz", 12L), 30 bson_value("faz", omg), 31 bson_value("eaz", RegEx("asdasdsa", "i")), 32 bson_value("asds", null), 33 bson_value("sub", document([ 34 bson_value("sub1", 12) 35 ])), 36 ]); 37 +/ 38 39 //auto connection = new MongoConnection("mongodb://testuser:testpassword@localhost/test?slaveOk=true"); 40 auto connection = new MongoConnection("mongodb://localhost/test?slaveOk=true", "testuser", "testpassword"); 41 42 connection.update("test.world", true, false, document([bson_value("_id", ObjectId("5ef17aea55c0abb51fbb53bc"))]), 43 document([ 44 //bson_value("_id", ObjectId("5ef17aea55c0abb51fbb53bc")), 45 bson_value("replaced","true") 46 ]) 47 ); 48 49 //connection.insert(false, "test.world", [doc1]); 50 51 import std.stdio; 52 //writeln(connection.query(0, "world", 0, 1, document.init)); 53 auto answer = connection.query("test.world", 35, 1, document.init, document.init, 1); 54 writeln(answer); 55 } 56 57 enum QueryFlags { 58 TailableCursor = (1 << 1), 59 SlaveOk = (1 << 2), 60 NoCursorTimeout = (1 << 4), 61 AwaitData = (1 << 5), 62 Exhaust = (1 << 6), 63 Partial = (1 << 7) 64 } 65 66 /// Server Wire version indicating supported features 67 /// $(LINK https://github.com/mongodb/specifications/blob/master/source/wireversion-featurelist.rst) 68 enum WireVersion { 69 /// Pre 2.6 (-2.4) 70 old = 0, 71 /// Server version 2.6 72 v26 = 1, 73 /// Server version 2.6 74 v26_2 = 2, 75 /// Server version 3.0 76 v30 = 3, 77 /// Server version 3.2 78 v32 = 4, 79 /// Server version 3.4 80 v34 = 5, 81 /// Server version 3.6 82 v36 = 6, 83 /// Server version 4.0 84 v40 = 7, 85 /// Server version 4.2 86 v42 = 8, 87 } 88 89 class MongoConnection { 90 91 Socket socket; 92 IReceiveStream stream; 93 94 uint defaultQueryFlags; 95 96 /// Reported minimum wire version by the server 97 WireVersion minWireVersion; 98 /// Reported maximum wire version by the server 99 WireVersion maxWireVersion; 100 101 private document handshake(string authDatabase, string username, 102 document application) { 103 import std.compiler : compilerName = name, version_major, version_minor; 104 import std.conv : text, to; 105 import std.system : os; 106 107 auto dbcmd = authDatabase ~ ".$cmd"; 108 109 static immutable osType = os.to!string; 110 111 version (X86_64) 112 static immutable osArchitecture = "x86_64"; 113 else version (X86) 114 static immutable osArchitecture = "x86"; 115 else version (ARM) 116 static immutable osArchitecture = "arm"; 117 else version (PPC64) 118 static immutable osArchitecture = "ppc64"; 119 else version (PPC) 120 static immutable osArchitecture = "ppc"; 121 else 122 static assert(false, "no name for this architecture"); 123 124 static immutable platform = text(compilerName, " v", version_major, ".", version_minor); 125 126 bson_value[] client; 127 if (application.length) 128 { 129 auto name = application["name"]; 130 if (name == bson_value.init) 131 throw new Exception("Given application without name"); 132 // https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#limitations 133 if (name.toString().length > 128) 134 throw new Exception("Application name must not exceed 128 bytes"); 135 136 client ~= bson_value("application", application); 137 } 138 139 client ~= [ 140 bson_value("driver", document([ 141 bson_value("name", mongoDriverName), 142 bson_value("version", mongoDriverVersion) 143 ])), 144 bson_value("os", document([ 145 bson_value("type", osType), 146 bson_value("architecture", osArchitecture) 147 ])), 148 bson_value("platform", platform) 149 ]; 150 151 auto cmd = [ 152 bson_value("isMaster", 1), 153 bson_value("client", document(client)) 154 ]; 155 if (username.length) 156 cmd ~= bson_value("saslSupportedMechs", authDatabase ~ "." ~ username); 157 158 auto reply = query(dbcmd, 0, -1, document(cmd)); 159 if (reply.documents.length != 1 || reply.documents[0]["ok"].get!double != 1) 160 throw new Exception("MongoDB Handshake failed"); 161 162 return reply.documents[0]; 163 } 164 165 private void authenticateScramSha1(string authDatabase, string username, 166 string password) { 167 auto dbcmd = authDatabase ~ ".$cmd"; 168 169 bson_value conversationId; 170 171 ScramState state; 172 173 ubyte[] payload = cast(ubyte[]) state.createInitialRequest(username); 174 175 auto cmd = document([ 176 bson_value("saslStart", 1), 177 bson_value("mechanism", "SCRAM-SHA-1"), 178 bson_value("payload", payload), 179 bson_value("options", document([ 180 bson_value("skipEmptyExchange", true) 181 ])) 182 ]); 183 184 185 auto firstReply = query(dbcmd, 0, -1, cmd); 186 if(firstReply.documents.length != 1 || firstReply.documents[0]["ok"].get!double != 1) 187 throw new Exception("Auth failed at first step (username)"); 188 189 conversationId = cast(bson_value) firstReply.documents[0]["conversationId"]; 190 191 auto response = firstReply.documents[0]["payload"].get!(const(ubyte[])); 192 193 const digest = makeDigest(username, password); 194 195 payload = cast(typeof(payload)) state.update(digest, cast(string) response); 196 197 cmd = document([ 198 bson_value("saslContinue", 1), 199 bson_value("conversationId", conversationId), 200 bson_value("payload", payload) 201 ]); 202 203 auto secondReply = query(dbcmd, 0, -1, cmd); 204 if(secondReply.documents.length != 1) 205 throw new Exception("Auth error at second step"); 206 207 if(secondReply.documents[0]["ok"].get!double != 1) 208 throw new Exception("Auth failed at second step (password)"); 209 210 auto response2 = secondReply.documents[0]["payload"].get!(const(ubyte[])); 211 212 payload = cast(typeof(payload)) state.finalize(cast(string) response2); 213 214 // newer servers can save a roundtrip (they know the password here already) 215 if(secondReply.documents[0]["done"].get!bool) 216 return; 217 218 cmd = document([ 219 bson_value("saslContinue", 1), 220 bson_value("conversationId", conversationId), 221 bson_value("payload", payload) 222 ]); 223 224 auto finalReply = query(dbcmd, 0, -1, cmd); 225 if(finalReply.documents.length != 1) 226 throw new Exception("Auth error at final step"); 227 228 if(finalReply.documents[0]["ok"].get!double != 1) 229 throw new Exception("Auth failed at final step"); 230 231 if(!finalReply.documents[0]["done"].get!bool) 232 throw new Exception("Authentication didn't respond 'done'"); 233 } 234 235 /++ 236 Connects to a Mongo database. You can give a username and password in the 237 connection string if URL encoded, OR you can pass it as arguments to this 238 constructor. 239 240 Please note that username/password in the connectionString will OVERRIDE 241 ones given as separate arguments. 242 +/ 243 this(string connectionString, string defaultUsername = null, string defaultPassword = null) { 244 import std.uri : decodeComponent; 245 import std..string; 246 247 auto uri = Uri(connectionString); 248 249 if(uri.hosts.length == 0) 250 throw new Exception("no host given in connection string"); 251 252 foreach(ref p; uri.hosts) { 253 if(p.port == 0) 254 p.port = 27017; 255 } 256 257 string host = decodeComponent(uri.hosts[0].host); 258 if(host.length == 0) 259 host = "localhost"; 260 261 // FIXME: break up the host into the list and at least 262 // pretend to care about replication sets 263 264 string authDb = "admin"; 265 string appName; 266 267 string username = defaultUsername; 268 string password = defaultPassword; 269 270 auto split = uri.userinfo.indexOf(":"); 271 if(split != -1) { 272 username = decodeComponent(uri.userinfo[0 .. split]); 273 password = decodeComponent(uri.userinfo[split + 1 .. $]); 274 } 275 if(uri.path.length > 1) 276 authDb = uri.path[1 .. $]; 277 278 bool ssl; 279 280 foreach(part; uri.query.splitter("&")) { 281 split = part.indexOf("="); 282 auto name = decodeComponent(part[0 .. split]); 283 auto value = decodeComponent(part[split + 1 .. $]); 284 285 // https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options 286 switch(name) { 287 case "slaveOk": defaultQueryFlags |= QueryFlags.SlaveOk; break; 288 case "appName": appName = value; break; 289 case "authSource": authDb = value; break; 290 case "tls", "ssl": ssl = value == "true"; break; 291 default: throw new Exception("Unsupported mongo db connect option: " ~ name); 292 } 293 } 294 295 if(host[0] == '/') { 296 version(Posix) { 297 socket = new Socket(AddressFamily.UNIX, SocketType.STREAM); 298 socket.connect(new UnixAddress(host)); 299 } else throw new Exception("Cannot use unix socket on Windows at this time"); 300 } else { 301 if(ssl) 302 throw new Exception("ssl/tls connections not supported by this driver"); 303 socket = new TcpSocket(new InternetAddress(host, cast(ushort) uri.hosts[0].port)); 304 } 305 306 stream = new SocketReceiveStream(socket); 307 308 document appArgs; 309 if (appName.length) 310 appArgs = document([bson_value("name", appName)]); 311 auto handshakeResponse = handshake(authDb, username, appArgs); 312 313 minWireVersion = cast(WireVersion)handshakeResponse["minWireVersion"].get!int; 314 maxWireVersion = cast(WireVersion)handshakeResponse["maxWireVersion"].get!int; 315 316 bool supportsScramSha1 = maxWireVersion >= WireVersion.v30; 317 bool supportsScramSha256 = maxWireVersion >= WireVersion.v40; 318 319 auto saslSupportedMechs = handshakeResponse["saslSupportedMechs"]; 320 if (saslSupportedMechs != bson_value.init) { 321 auto arr = saslSupportedMechs.get!(const(bson_value)[]); 322 supportsScramSha1 = false; 323 supportsScramSha256 = false; 324 foreach (v; arr) { 325 switch (v.toString) { 326 case "SCRAM-SHA-1": 327 supportsScramSha1 = true; 328 break; 329 case "SCRAM-SHA-256": 330 supportsScramSha256 = true; 331 break; 332 default: 333 // unsupported mechanism 334 break; 335 } 336 } 337 } 338 339 // https://github.com/mongodb/specifications/blob/master/source/auth/auth.rst#supported-authentication-methods 340 if (username.length) { 341 // TODO: support other (certificate based) authentication mechanisms 342 343 // TODO: SCRAM-SHA-256 support 344 // if (supportsScramSha256) { 345 // } else 346 if (supportsScramSha1) { 347 authenticateScramSha1(authDb, username, password); 348 } else { 349 if (maxWireVersion < WireVersion.v30) 350 throw new Exception("legacy MONGODB-CR authentication not implemented"); 351 else 352 throw new Exception( 353 "Cannot authenticate because no common authentication mechanism could be found."); 354 } 355 } 356 } 357 358 void update(const(char)[] fullCollectionName, bool upsert, bool multiupdate, document selector, document update) { 359 MsgHeader header; 360 361 header.requestID = ++nextRequestId; 362 header.opCode = OP.UPDATE; 363 364 SendBuffer sb; 365 366 sb.add(header); 367 368 int zero; 369 sb.add(zero); 370 371 sb.add(fullCollectionName); 372 int flags; 373 if(upsert) flags |= 1; 374 if(multiupdate) flags |= 2; 375 sb.add(flags); 376 sb.add(selector); 377 sb.add(update); 378 379 send(sb.data); 380 } 381 382 void insert(bool continueOnError, const(char)[] fullCollectionName, document[] documents) { 383 MsgHeader header; 384 385 header.requestID = ++nextRequestId; 386 header.opCode = OP.INSERT; 387 388 SendBuffer sb; 389 390 sb.add(header); 391 392 int flags = continueOnError ? 1 : 0; 393 sb.add(flags); 394 sb.add(fullCollectionName); 395 foreach(doc; documents) 396 sb.add(doc); 397 398 send(sb.data); 399 } 400 401 OP_REPLY getMore(const(char)[] fullCollectionName, int numberToReturn, long cursorId, int limitTotalEntries = int.max) { 402 MsgHeader header; 403 404 header.requestID = ++nextRequestId; 405 header.opCode = OP.GET_MORE; 406 407 SendBuffer sb; 408 409 sb.add(header); 410 411 int zero; 412 sb.add(zero); 413 sb.add(fullCollectionName); 414 sb.add(numberToReturn); 415 sb.add(cursorId); 416 417 send(sb.data); 418 419 auto reply = stream.readReply(); 420 421 reply.numberToReturn = numberToReturn; 422 reply.fullCollectionName = fullCollectionName; 423 reply.current = 0; 424 reply.connection = this; 425 426 return reply; 427 } 428 429 void delete_(const(char)[] fullCollectionName, bool singleRemove, document selector) { 430 MsgHeader header; 431 432 header.requestID = ++nextRequestId; 433 header.opCode = OP.DELETE; 434 435 SendBuffer sb; 436 437 sb.add(header); 438 439 int zero; 440 sb.add(0); 441 sb.add(fullCollectionName); 442 int flags = singleRemove ? 1 : 0; 443 sb.add(flags); 444 sb.add(selector); 445 446 send(sb.data); 447 } 448 449 void killCursors(const(long)[] cursorIds) { 450 MsgHeader header; 451 452 header.requestID = ++nextRequestId; 453 header.opCode = OP.KILL_CURSORS; 454 455 SendBuffer sb; 456 457 sb.add(header); 458 int zero = 0; 459 sb.add(zero); 460 461 sb.add(cast(int) cursorIds.length); 462 foreach(id; cursorIds) 463 sb.add(id); 464 465 send(sb.data); 466 } 467 468 OP_REPLY query(const(char)[] fullCollectionName, int numberToSkip, int numberToReturn, document query, document returnFieldsSelector = document.init, int flags = 1, int limitTotalEntries = int.max) { 469 if(flags == 1) 470 flags = defaultQueryFlags; 471 MsgHeader header; 472 473 header.requestID = ++nextRequestId; 474 header.opCode = OP.QUERY; 475 476 SendBuffer sb; 477 478 sb.add(header); 479 480 sb.add(flags); 481 sb.add(fullCollectionName); 482 sb.add(numberToSkip); 483 sb.add(numberToReturn); 484 sb.add(query); 485 486 if(returnFieldsSelector != document.init) 487 sb.add(returnFieldsSelector); 488 489 send(sb.data); 490 491 auto reply = stream.readReply(); 492 493 reply.numberToReturn = numberToReturn; 494 reply.limitRemaining = limitTotalEntries; 495 reply.fullCollectionName = fullCollectionName; 496 reply.current = 0; 497 reply.connection = this; 498 499 return reply; 500 } 501 502 private int nextRequestId; 503 504 private void send(const(ubyte)[] data) { 505 while(data.length) { 506 auto ret = socket.send(data); 507 if(ret <= 0) 508 throw new Exception("wtf"); 509 data = data[ret .. $]; 510 } 511 } 512 } 513 514 unittest { 515 auto uri = Uri("mongo://test:12,foo:14/"); 516 assert(uri.hosts.length == 2); 517 assert(uri.hosts[0] == UriHost("test", 12)); 518 assert(uri.hosts[1] == UriHost("foo", 14)); 519 } 520 521 interface IReceiveStream { 522 final OP_REPLY readReply() { 523 OP_REPLY reply; 524 525 526 reply.header.messageLength = readInt(); 527 reply.header.requestID = readInt(); 528 reply.header.responseTo = readInt(); 529 reply.header.opCode = readInt(); 530 531 // flag 0: cursor not found. 1: query failure. $err set in a thing. 532 reply.responseFlags = readInt(); 533 reply.cursorID = readLong(); 534 reply.startingFrom = readInt(); 535 reply.numberReturned = readInt(); 536 537 reply.documents.length = reply.numberReturned; 538 539 foreach(ref doc; reply.documents) { 540 doc = readBson(); 541 } 542 543 return reply; 544 } 545 546 final document readBson() { 547 document d; 548 d.bytesCount = readInt(); 549 550 int remaining = d.bytesCount; 551 remaining -= 4; // length 552 remaining -= 1; // terminating zero 553 554 while(remaining > 0) { 555 d.values_ ~= readBsonValue(remaining); 556 } 557 558 d.terminatingZero = readByte(); 559 if(d.terminatingZero != 0) 560 throw new Exception("something wrong reading bson"); 561 562 return d; 563 } 564 565 final bson_value readBsonValue(ref int remaining) { 566 bson_value v; 567 568 v.tag = readByte(); 569 remaining--; 570 571 v.e_name = readZeroTerminatedChars(); 572 remaining -= v.e_name.length + 1; // include the zero term 573 574 switch(v.tag) { 575 case 0x00: // not supposed to exist! 576 throw new Exception("invalid bson"); 577 case 0x01: 578 auto d = readDouble(); 579 remaining -= 8; 580 v.x01 = d; 581 break; 582 case 0x02: 583 v.x02 = readCharsWithLengthPrefix(); 584 remaining -= 5 + v.x02.length; // length and zero 585 break; 586 case 0x03: 587 v.x03 = readBson(); 588 remaining -= v.x03.bytesCount; 589 break; 590 case 0x04: 591 v.x04 = readBson(); 592 remaining -= v.x04.bytesCount; 593 break; 594 case 0x05: 595 auto length = readInt(); 596 v.x05_tag = readByte(); 597 v.x05_data = readBytes(length); 598 remaining -= v.x05_data.length + 5; 599 break; 600 case 0x06: // undefined 601 // intentionally blank, no additional data 602 break; 603 case 0x07: 604 foreach(ref b; v.x07) { 605 b = readByte(); 606 remaining--; 607 } 608 break; 609 case 0x08: 610 v.x08 = readByte() ? true : false; 611 remaining--; 612 break; 613 case 0x09: 614 v.x09 = readLong(); 615 remaining -= 8; 616 break; 617 case 0x0a: // null 618 // intentionally blank, no additional data 619 break; 620 case 0x0b: 621 v.x0b_regex = readZeroTerminatedChars(); 622 remaining -= v.x0b_regex.length + 1; 623 v.x0b_flags = readZeroTerminatedChars(); 624 remaining -= v.x0b_flags.length + 1; 625 break; 626 case 0x0d: 627 v.x0d = readCharsWithLengthPrefix(); 628 remaining -= 5 + v.x0d.length; // length and zero 629 break; 630 case 0x10: 631 v.x10 = readInt(); 632 remaining -= 4; 633 break; 634 case 0x11: 635 v.x11 = readLong(); 636 remaining -= 8; 637 break; 638 case 0x12: 639 v.x12 = readLong(); 640 remaining -= 8; 641 break; 642 case 0x13: 643 foreach(ref b; v.x13) { 644 b = readByte(); 645 remaining--; 646 } 647 break; 648 default: 649 import std.conv; 650 assert(0, "unsupported tag in bson: " ~ to!string(v.tag)); 651 } 652 653 return v; 654 } 655 656 final ubyte readByte() { 657 return readSmall!1[0]; 658 } 659 660 final int readInt() { 661 return readSmall!4.littleEndianToNative!int; 662 } 663 664 final long readLong() { 665 return readSmall!8.littleEndianToNative!long; 666 } 667 668 final double readDouble() { 669 return readSmall!8.littleEndianToNative!double; 670 } 671 672 final string readZeroTerminatedChars() { 673 return cast(string) readUntilNull(); 674 } 675 676 final string readCharsWithLengthPrefix() { 677 auto length = readInt(); 678 // length prefixed string allows 0 characters 679 auto bytes = readBytes(length); 680 if (bytes[$ - 1] != 0) 681 throw new Exception("Malformed length prefixed string"); 682 return cast(string)bytes[0 .. $ - 1]; 683 } 684 685 /// Reads exactly the amount of bytes specified and returns a newly 686 /// allocated buffer containing the data. 687 final immutable(ubyte)[] readBytes(size_t length) { 688 ubyte[] ret = new ubyte[length]; 689 readExact(ret); 690 return (() @trusted => cast(immutable)ret)(); 691 } 692 693 /// Reads a small number of bytes into a stack allocated array. Convenience 694 /// function calling $(LREF readExact). 695 ubyte[n] readSmall(size_t n)() { 696 ubyte[n] buf; 697 readExact(buf[]); 698 return buf; 699 } 700 701 /// Reads exactly the expected length into the given buffer argument. 702 void readExact(scope ubyte[] buffer); 703 704 /// Reads until a 0 byte and returns all data before it as newly allocated 705 /// buffer containing the data. 706 immutable(ubyte)[] readUntilNull(); 707 } 708 709 class SocketReceiveStream : IReceiveStream { 710 this(Socket socket) { 711 this.socket = socket; 712 } 713 714 Socket socket; 715 ubyte[4096] buffer; 716 // if bufferLength > bufferPos then following data is linear after bufferPos 717 // if bufferLength < bufferPos then data is following bufferPos and also from 0..bufferLength 718 // if bufferLength == bufferPos then all data is read 719 int bufferLength; 720 // may only increment until at the end of buffer.length, where it wraps back to 0 721 int bufferPos; 722 723 protected ptrdiff_t receiveSocket(ubyte[] buffer) { 724 const length = socket.receive(buffer); 725 726 if (length == 0) 727 throw new Exception("Remote side has closed the connection"); 728 else if (length == Socket.ERROR) 729 throw new Exception(lastSocketError()); 730 else 731 return length; 732 } 733 734 /// Loads more data after bufferPos if there is space, otherwise load before 735 /// bufferPos, indicating data is wrapping around. 736 void loadMore() { 737 if (bufferPos == bufferLength) { 738 // we read up all the bytes, start back from 0 for bigger recv buffer 739 bufferPos = 0; 740 bufferLength = 0; 741 } 742 743 ptrdiff_t length; 744 if (bufferLength < bufferPos) { 745 // length wrapped around before pos 746 // try to read up to the byte before bufferPos to not trigger the 747 // bufferPos == bufferLength case 748 length = receiveSocket(buffer[bufferLength .. bufferPos - 1]); 749 } else { 750 length = receiveSocket(buffer[bufferLength .. $ - 1]); 751 } 752 753 bufferLength += length; 754 if (bufferLength == buffer.length) 755 bufferLength = 0; 756 } 757 758 void readExact(scope ubyte[] dst) { 759 if (!dst.length) 760 return; 761 762 if (bufferPos == bufferLength) 763 loadMore(); 764 765 auto end = bufferPos + dst.length; 766 if (end < buffer.length) { 767 // easy linear copy 768 // receive until bufferLength wrapped around or we have enough bytes 769 while (bufferLength >= bufferPos && end > bufferLength) 770 loadMore(); 771 dst[] = buffer[bufferPos .. bufferPos = cast(typeof(bufferPos))end]; 772 } else { 773 // copy from wrapping buffer 774 size_t i; 775 while (i < dst.length) { 776 auto remaining = dst.length - i; 777 if (bufferPos < bufferLength) { 778 auto d = min(remaining, bufferLength - bufferPos); 779 dst[i .. i += d] = buffer[bufferPos .. bufferPos += d]; 780 } else if (bufferPos > bufferLength) { 781 auto d = buffer.length - bufferPos; 782 if (remaining >= d) { 783 dst[i .. i += d] = buffer[bufferPos .. $]; 784 dst[i .. i += bufferLength] = buffer[0 .. bufferPos = bufferLength]; 785 } else { 786 dst[i .. i += remaining] = buffer[bufferPos .. bufferPos += remaining]; 787 } 788 } else { 789 loadMore(); 790 } 791 } 792 } 793 } 794 795 immutable(ubyte)[] readUntilNull() { 796 auto ret = appender!(immutable(ubyte)[]); 797 while (true) { 798 ubyte[] chunk; 799 if (bufferPos < bufferLength) { 800 chunk = buffer[bufferPos .. bufferLength]; 801 } else if (bufferPos > bufferLength) { 802 chunk = buffer[bufferPos .. $]; 803 } else { 804 loadMore(); 805 continue; 806 } 807 808 auto zero = chunk.countUntil(0); 809 if (zero == -1) { 810 ret ~= chunk; 811 bufferPos += chunk.length; 812 if (bufferPos == buffer.length) 813 bufferPos = 0; 814 } else { 815 ret ~= chunk[0 .. zero]; 816 bufferPos += zero + 1; 817 return ret.data; 818 } 819 } 820 } 821 } 822 823 unittest { 824 class MockedSocketReceiveStream : SocketReceiveStream { 825 int maxChunk = 64; 826 ubyte generator = 0; 827 828 this() { 829 super(null); 830 } 831 832 protected override ptrdiff_t receiveSocket(ubyte[] buffer) { 833 if (buffer.length >= maxChunk) { 834 buffer[0 .. maxChunk] = ++generator; 835 return maxChunk; 836 } else { 837 buffer[] = ++generator; 838 return buffer.length; 839 } 840 } 841 } 842 843 auto stream = new MockedSocketReceiveStream(); 844 stream.buffer[] = 0; 845 auto b = stream.readBytes(3); 846 assert(b == [1, 1, 1]); 847 assert(stream.buffer[0 .. 64].all!(a => a == 1)); 848 assert(stream.buffer[64 .. $].all!(a => a == 0)); 849 b = stream.readBytes(3); 850 assert(b == [1, 1, 1]); 851 assert(stream.buffer[64 .. $].all!(a => a == 0)); 852 assert(stream.bufferPos == 6); 853 assert(stream.bufferLength == 64); 854 b = stream.readBytes(64); 855 assert(b[0 .. $ - 6].all!(a => a == 1)); 856 assert(b[$ - 6 .. $].all!(a => a == 2)); 857 assert(stream.bufferPos == 70); 858 assert(stream.bufferLength == 128); 859 b = stream.readBytes(57); 860 assert(b.all!(a => a == 2)); 861 assert(stream.bufferPos == 127); 862 assert(stream.bufferLength == 128); 863 b = stream.readBytes(8000); 864 assert(b[0] == 2); 865 assert(b[1 .. 65].all!(a => a == 3)); 866 assert(b[65 .. 129].all!(a => a == 4)); 867 868 stream.buffer[] = 0; 869 stream.bufferPos = stream.bufferLength = 0; 870 stream.generator = 0; 871 assert(stream.readUntilNull().length == 64 * 255); 872 } 873 874 struct SendBuffer { 875 private ubyte[4096] backing; 876 877 private ubyte[] buffer; 878 private size_t position; 879 880 ubyte[] data() { 881 assert(position > 4); 882 883 resetSize(); 884 return buffer[0 .. position]; 885 } 886 887 private void size(size_t addition) { 888 if(buffer is null) 889 buffer = backing[]; 890 while (position + addition > buffer.length) { 891 buffer.length = buffer.length * 2; 892 } 893 } 894 895 void add(const document d) { 896 SendBuffer sb; 897 898 sb.add(d.bytesCount); 899 foreach(v; d.values) 900 sb.add(v); 901 sb.addByte(0); 902 903 auto data = sb.data; 904 //import std.stdio; writefln("%(%x %)",data); 905 906 this.add(data); 907 } 908 909 void add(const MsgHeader header) { 910 add(header.messageLength); 911 add(header.requestID); 912 add(header.responseTo); 913 add(header.opCode); 914 } 915 916 void add(const bson_value v) { 917 addByte(v.tag); 918 add(v.name); 919 920 static struct visitor { 921 this(SendBuffer* buf) { this.buf = buf; } 922 SendBuffer* buf; 923 void visit(long v) { buf.add(v); } 924 void visit(int v) { buf.add(v); } 925 void visit(bool v) { buf.addByte(v ? 1 : 0); } 926 void visit(double v) { buf.add(v); } 927 void visit(const document v) { buf.add(v); } 928 void visit(const(char)[] v) { buf.add(cast(int) v.length + 1); buf.add(v); } 929 void visit(const typeof(null)) { } 930 void visit(ubyte tag, const(ubyte)[] v) { buf.add(cast(int) v.length); buf.addByte(tag); buf.add(v); } 931 void visit(const Undefined v) { } 932 void visit(const ObjectId v) { buf.add(v.v[]); } 933 void visit(const Javascript v) { buf.add(cast(int) v.v.length + 1); buf.add(v.v); } 934 void visit(const Timestamp v) { buf.add(v.v); } 935 void visit(const UtcTimestamp v) { buf.add(v.v); } 936 void visit(const Decimal128 v) { buf.add(v.v[]); } 937 void visit(const RegEx v) { buf.add(v.regex); buf.add(v.flags); } 938 939 void visit(const(bson_value)[] v) { buf.add(document(v)); } 940 } 941 942 visitor mv = visitor(&this); 943 v.visit(mv); 944 } 945 946 947 void addByte(ubyte a) { 948 size(1); 949 buffer[position++] = (a >> 0) & 0xff; 950 } 951 952 void add(double i) { 953 long a = *(cast(long*) &i); 954 add(a); 955 } 956 957 void add(int a) { 958 size(4); 959 buffer[position++] = (a >> 0) & 0xff; 960 buffer[position++] = (a >> 8) & 0xff; 961 buffer[position++] = (a >> 16) & 0xff; 962 buffer[position++] = (a >> 24) & 0xff; 963 } 964 965 void add(long a) { 966 size(8); 967 buffer[position++] = (a >> 0) & 0xff; 968 buffer[position++] = (a >> 8) & 0xff; 969 buffer[position++] = (a >> 16) & 0xff; 970 buffer[position++] = (a >> 24) & 0xff; 971 buffer[position++] = (a >> 32) & 0xff; 972 buffer[position++] = (a >> 40) & 0xff; 973 buffer[position++] = (a >> 48) & 0xff; 974 buffer[position++] = (a >> 56) & 0xff; 975 } 976 977 // does NOT write the length out first! 978 void add(const(char)[] a) { 979 size(a.length + 1); 980 buffer[position .. position + a.length] = cast(ubyte[]) a[]; 981 position += a.length; 982 buffer[position++] = 0; 983 } 984 985 // does NOT write the length out first! 986 void add(const(ubyte)[] a) { 987 size(a.length); 988 buffer[position .. position + a.length] = cast(ubyte[]) a[]; 989 position += a.length; 990 } 991 992 private void resetSize() { 993 auto sz = cast(int) position; 994 buffer[0] = (sz >> 0) & 0xff; 995 buffer[1] = (sz >> 8) & 0xff; 996 buffer[2] = (sz >> 16) & 0xff; 997 buffer[3] = (sz >> 24) & 0xff; 998 } 999 } 1000 1001 unittest { 1002 // A test for correct buffer allocation 1003 import std.algorithm; 1004 import std.range; 1005 import std.conv; 1006 1007 // a large BSON array 1008 auto anArray = iota(100000) 1009 .enumerate() 1010 .map!(tup => bson_value(tup.index.to!string, tup.value)) 1011 .array(); 1012 1013 auto doc = document([bson_value("array", anArray)]); 1014 SendBuffer buf; 1015 buf.add(doc); 1016 } 1017 1018 struct MsgHeader { 1019 int messageLength; 1020 int requestID; 1021 int responseTo; 1022 int opCode; 1023 } 1024 1025 enum OP { 1026 REPLY = 1, 1027 UPDATE = 2001, 1028 INSERT = 2002, 1029 QUERY = 2004, // sends a reply 1030 GET_MORE = 2005, // sends a reply 1031 DELETE = 2006, 1032 KILL_CURSORS = 2007, 1033 MSG = 2013 1034 } 1035 1036 struct OP_UPDATE { 1037 MsgHeader header; 1038 int zero; 1039 const(char)[] fullCollectionName; 1040 int flags; // bit 0 == upsert, bit 1 == multiupdate if 1041 document selector; 1042 document update; 1043 } 1044 1045 struct OP_INSERT { 1046 MsgHeader header; 1047 int flags; // bit 0 == ContinueOnError 1048 const(char)[] fullCollectionName; 1049 document[] documents; 1050 } 1051 1052 struct OP_QUERY { 1053 MsgHeader header; 1054 int flags; // SEE: https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#op-query 1055 const(char)[] fullCollectionName; 1056 int numberToSkip; 1057 int numberToReturn; 1058 document query; 1059 document returnFieldsSelector; // optional..... 1060 } 1061 1062 struct OP_GET_MORE { 1063 MsgHeader header; 1064 int zero; 1065 const(char)[] fullCollectionName; 1066 int numberToReturn; 1067 long cursorID; 1068 } 1069 1070 struct OP_DELETE { 1071 MsgHeader header; 1072 int zero; 1073 const(char)[] fullCollectionName; 1074 int flags; // bit 0 is single remove 1075 document selector; 1076 } 1077 1078 // If a cursor is read until exhausted (read until OP_QUERY or OP_GET_MORE returns zero for the cursor id), there is no need to kill the cursor. 1079 struct OP_KILL_CURSORS { 1080 MsgHeader header; 1081 int zero; 1082 int numberOfCursorIds; 1083 const(long)[] cursorIDs; 1084 } 1085 1086 /+ 1087 // in mongo 3.6 1088 struct OP_MSG { 1089 MsgHeader header, 1090 uint flagBits; 1091 Section[] 1092 } 1093 +/ 1094 1095 struct OP_REPLY { 1096 MsgHeader header; 1097 int responseFlags; // flag 0: cursor not found. 1: query failure. $err set in a thing. 1098 long cursorID; 1099 int startingFrom; 1100 int numberReturned; 1101 document[] documents; 1102 1103 /* range elements */ 1104 int numberToReturn; 1105 const(char)[] fullCollectionName; 1106 size_t current; 1107 MongoConnection connection; 1108 int limitRemaining = int.max; 1109 1110 string errorMessage; 1111 int errorCode; 1112 1113 @property bool empty() { 1114 return errorCode != 0 || numberReturned == 0 || limitRemaining <= 0; 1115 } 1116 1117 void popFront() { 1118 limitRemaining--; 1119 current++; 1120 if(current == numberReturned) { 1121 this = connection.getMore(fullCollectionName, numberToReturn, cursorID, limitRemaining); 1122 if(numberReturned == 1) { 1123 auto err = documents[0]["$err"]; 1124 auto ecode = documents[0]["code"]; 1125 if(err !is bson_value.init) { 1126 errorMessage = err.get!string; 1127 errorCode = ecode.get!int; 1128 } 1129 } 1130 } 1131 } 1132 1133 @property document front() { 1134 return documents[current]; 1135 } 1136 } 1137 1138 /* bson */ 1139 1140 struct document { 1141 private int bytesCount; 1142 private const(bson_value)[] values_; 1143 private ubyte terminatingZero; 1144 1145 const(bson_value) opIndex(string name) { 1146 foreach(value; values) 1147 if(value.name == name) 1148 return value; 1149 return bson_value.init; 1150 } 1151 1152 this(const(bson_value)[] values) { 1153 values_ = values; 1154 terminatingZero = 0; 1155 foreach(v; values) 1156 bytesCount += v.size; 1157 } 1158 1159 size_t length() const { 1160 return values_.length; 1161 } 1162 1163 const(bson_value)[] values() const { 1164 return values_; 1165 } 1166 1167 string toString(int indent = 0) const { 1168 string s; 1169 1170 s ~= "{\n"; 1171 1172 foreach(value; values) { 1173 foreach(i; 0 .. indent + 1) s ~= "\t"; 1174 s ~= value.name; 1175 s ~= ": "; 1176 s ~= value.toString(); 1177 s ~= "\n"; 1178 } 1179 1180 foreach(i; 0 .. indent) s ~= "\t"; 1181 s ~= "}\n"; 1182 1183 return s; 1184 } 1185 } 1186 1187 struct ObjectId { 1188 ubyte[12] v; 1189 string toString() const { 1190 import std.format; 1191 return format("ObjectId(%(%02x%))", v[]); 1192 } 1193 1194 this(const ubyte[12] v) { 1195 this.v = v; 1196 } 1197 1198 this(string s) { 1199 import std.algorithm; 1200 if(s.startsWith("ObjectId(")) 1201 s = s["ObjectId(".length .. $-1]; 1202 if(s.length != 24) 1203 throw new Exception("invalid object id: " ~ s); 1204 static int hexToDec(char c) { 1205 if(c >= '0' && c <= '9') 1206 return c - '0'; 1207 if(c >= 'A' && c <= 'F') 1208 return c - 'A' + 10; 1209 if(c >= 'a' && c <= 'f') 1210 return c - 'a' + 10; 1211 throw new Exception("Invalid hex char " ~ c); 1212 } 1213 foreach(ref b; v) { 1214 b = cast(ubyte) ((hexToDec(s[0]) << 4) | hexToDec(s[1])); 1215 s = s[2 .. $]; 1216 } 1217 } 1218 1219 int opCmp(ObjectId id) { 1220 import std.algorithm; 1221 return cmp(cast(ubyte[])this.v, cast(ubyte[])id.v); 1222 } 1223 } 1224 struct UtcTimestamp { 1225 long v; 1226 } 1227 struct RegEx { 1228 const(char)[] regex; 1229 const(char)[] flags; 1230 } 1231 struct Javascript { 1232 string v; 1233 } 1234 struct Timestamp { 1235 ulong v; 1236 } 1237 struct Decimal128 { 1238 ubyte[16] v; 1239 } 1240 struct Undefined {} 1241 1242 1243 struct ToStringVisitor(T) if (isSomeString!T) { 1244 import std.conv; 1245 1246 // of course this could have been a template but I wrote it out long-form for copy/paste purposes 1247 T visit(const double v) { return to!T(v); } 1248 T visit(const string v) { return to!T(v); } 1249 T visit(const document v) { return to!T(v); } 1250 T visit(const const(bson_value)[] v) { return to!T(v); } 1251 T visit(const ubyte tag, const ubyte[] v) { return to!T(v); } 1252 T visit(const ObjectId v) { return to!T(v); } 1253 T visit(const bool v) { return to!T(v); } 1254 T visit(const UtcTimestamp v) { return to!T(v); } 1255 T visit(const typeof(null)) { return to!T(null); } 1256 T visit(const RegEx v) { return to!T(v); } 1257 T visit(const Javascript v) { return to!T(v); } 1258 T visit(const int v) { return to!T(v); } 1259 T visit(const Undefined) { return "undefined".to!T; } 1260 T visit(const Timestamp v) { return to!T(v); } 1261 T visit(const long v) { return to!T(v); } 1262 T visit(const Decimal128 v) { return to!T(v); } 1263 } 1264 1265 struct GetVisitor(T) { 1266 T visit(V)(const V t) { 1267 static if (isIntegral!V) { 1268 static if(isIntegral!T || isFloatingPoint!T) 1269 return cast(T)t; 1270 else throw new Exception("incompatible type"); 1271 } else static if (isFloatingPoint!V) { 1272 static if(isFloatingPoint!T) 1273 return cast(T)t; 1274 else throw new Exception("incompatible type"); 1275 } else { 1276 static if(is(V : T)) 1277 return t; 1278 else throw new Exception("incompatible type"); 1279 } 1280 } 1281 1282 T visit(ubyte tag, const(ubyte)[] v) { 1283 static if(is(typeof(v) : T)) 1284 return v; 1285 else throw new Exception("incompatible type " ~ T.stringof); 1286 } 1287 } 1288 1289 struct bson_value { 1290 private ubyte tag; 1291 private const(char)[] e_name; 1292 1293 // It only allows integer types or const getting in order to work right in the visitor... 1294 /// Tries to get the value matching exactly this type. The type will convert 1295 /// between different floating point types and integral types as well as 1296 /// perform a conversion from integral types to floating point types. 1297 T get(T)() const if (__traits(compiles, GetVisitor!T)) { 1298 GetVisitor!T v; 1299 return visit(v); 1300 } 1301 1302 const(char)[] name() const { 1303 return e_name; 1304 } 1305 1306 this(const(char)[] name, bson_value v) { 1307 this = v; 1308 e_name = name; 1309 } 1310 1311 this(const(char)[] name, double v) { 1312 e_name = name; 1313 tag = 0x01; 1314 x01 = v; 1315 } 1316 this(const(char)[] name, string v) { 1317 e_name = name; 1318 tag = 0x02; 1319 x02 = v; 1320 } 1321 this(const(char)[] name, document v) { 1322 e_name = name; 1323 tag = 0x03; 1324 x03 = v; 1325 } 1326 this(const(char)[] name, bson_value[] v) { 1327 e_name = name; 1328 tag = 0x04; 1329 const(bson_value)[] n; 1330 n.reserve(v.length); 1331 import std.conv; 1332 foreach(idx, i; v) 1333 n ~= bson_value(to!string(idx), i); 1334 x04 = document(n); 1335 } 1336 this(const(char)[] name, const(ubyte)[] v, ubyte tag = 0x00) { 1337 e_name = name; 1338 this.tag = 0x05; 1339 x05_tag = tag; 1340 x05_data = v; 1341 } 1342 this(const(char)[] name, ObjectId v) { 1343 e_name = name; 1344 tag = 0x07; 1345 x07 = v.v; 1346 } 1347 this(const(char)[] name, bool v) { 1348 e_name = name; 1349 tag = 0x08; 1350 x08 = v; 1351 } 1352 this(const(char)[] name, UtcTimestamp v) { 1353 e_name = name; 1354 tag = 0x09; 1355 x09 = v.v; 1356 } 1357 this(const(char)[] name, typeof(null)) { 1358 e_name = name; 1359 tag = 0x0a; 1360 } 1361 this(const(char)[] name, RegEx v) { 1362 e_name = name; 1363 tag = 0x0b; 1364 x0b_regex = v.regex; 1365 x0b_flags = v.flags; 1366 } 1367 this(const(char)[] name, Javascript v) { 1368 e_name = name; 1369 tag = 0x0d; 1370 x0d = v.v; 1371 } 1372 this(const(char)[] name, int v) { 1373 e_name = name; 1374 tag = 0x10; 1375 x10 = v; 1376 } 1377 this(const(char)[] name, Timestamp v) { 1378 e_name = name; 1379 tag = 0x11; 1380 x11 = v.v; 1381 } 1382 this(const(char)[] name, long v) { 1383 e_name = name; 1384 tag = 0x12; 1385 x12 = v; 1386 } 1387 this(const(char)[] name, Decimal128 v) { 1388 e_name = name; 1389 tag = 0x13; 1390 x13 = v.v; 1391 } 1392 1393 auto visit(T)(T t) const { 1394 switch(tag) { 1395 case 0x00: throw new Exception("invalid bson"); 1396 case 0x01: return t.visit(x01); 1397 case 0x02: return t.visit(x02); 1398 case 0x03: return t.visit(x03); 1399 case 0x04: return t.visit(x04.values); 1400 case 0x05: return t.visit(x05_tag, x05_data); 1401 case 0x06: return t.visit(Undefined()); 1402 case 0x07: return t.visit(ObjectId(x07)); 1403 case 0x08: return t.visit(x08); 1404 case 0x09: return t.visit(UtcTimestamp(x09)); 1405 case 0x0a: return t.visit(null); 1406 case 0x0b: return t.visit(RegEx(x0b_regex, x0b_flags)); 1407 case 0x0d: return t.visit(Javascript(x0d)); 1408 case 0x10: return t.visit(x10); 1409 case 0x11: return t.visit(Timestamp(x11)); 1410 case 0x12: return t.visit(x12); 1411 case 0x13: return t.visit(Decimal128(x13)); 1412 default: 1413 import std.conv; 1414 assert(0, "unsupported tag in bson: " ~ to!string(tag)); 1415 1416 } 1417 } 1418 1419 string toString() const { 1420 ToStringVisitor!string v; 1421 return visit(v); 1422 } 1423 1424 private union { 1425 void* zero; 1426 double x01; 1427 string x02; // given with an int length preceding and 0 terminator 1428 document x03; // child object 1429 document x04; // array with indexes as key values 1430 struct { 1431 ubyte x05_tag; 1432 const(ubyte)[] x05_data; // binary data 1433 } 1434 void* undefined; 1435 ubyte[12] x07; // a guid/ ObjectId 1436 bool x08; 1437 long x09; // utc datetime stamp 1438 void* x0a; // null 1439 struct { 1440 const(char)[] x0b_regex; 1441 const(char)[] x0b_flags; // alphabetized 1442 } 1443 string x0d; // javascript 1444 int x10; // integer!!! omg 1445 ulong x11; // timestamp 1446 long x12; // integer!!! 1447 ubyte[16] x13; // decimal 128 1448 } 1449 1450 size_t size() const { 1451 auto count = 2 + e_name.length; 1452 1453 switch(this.tag) { 1454 case 0x00: // not supposed to exist! 1455 throw new Exception("invalid bson"); 1456 case 0x01: 1457 count += 8; 1458 break; 1459 case 0x02: 1460 count += 5 + x02.length; // length and zero 1461 break; 1462 case 0x03: 1463 count += x03.bytesCount; 1464 break; 1465 case 0x04: 1466 count += x04.bytesCount; 1467 break; 1468 case 0x05: 1469 count += x05_data.length; 1470 count += 5; // length and tag 1471 break; 1472 case 0x06: // undefined 1473 // intentionally blank, no additional data 1474 break; 1475 case 0x07: 1476 count += x07.length; 1477 break; 1478 case 0x08: 1479 count++; 1480 break; 1481 case 0x09: 1482 count += 8; 1483 break; 1484 case 0x0a: // null 1485 // intentionally blank, no additional data 1486 break; 1487 case 0x0b: 1488 count += x0b_regex.length + 1; 1489 count += x0b_flags.length + 1; 1490 break; 1491 case 0x0d: 1492 count += 5 + x0d.length; // length and zero 1493 break; 1494 case 0x10: 1495 count += 4; 1496 break; 1497 case 0x11: 1498 count += 8; 1499 break; 1500 case 0x12: 1501 count += 8; 1502 break; 1503 case 0x13: 1504 count += x13.length; 1505 break; 1506 default: 1507 import std.conv; 1508 assert(0, "unsupported tag in bson: " ~ to!string(tag)); 1509 } 1510 1511 return count; 1512 } 1513 } 1514 1515 struct UriHost { 1516 string host; 1517 int port; 1518 } 1519 1520 /* copy/pasted from arsd.cgi, modified for mongo-specific comma behavior. used with permission */ 1521 struct Uri { 1522 // scheme//userinfo@host:port/path?query#fragment 1523 1524 string scheme; /// e.g. "http" in "http://example.com/" 1525 string userinfo; /// the username (and possibly a password) in the uri 1526 UriHost[] hosts; 1527 string path; /// e.g. "/folder/file.html" in "http://example.com/folder/file.html" 1528 string query; /// the stuff after the ? in a uri 1529 string fragment; /// the stuff after the # in a uri. 1530 1531 /// Breaks down a uri string to its components 1532 this(string uri) { 1533 reparse(uri); 1534 } 1535 1536 private void reparse(string uri) { 1537 // from RFC 3986 1538 // the ctRegex triples the compile time and makes ugly errors for no real benefit 1539 // it was a nice experiment but just not worth it. 1540 // enum ctr = ctRegex!r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?"; 1541 /* 1542 Captures: 1543 0 = whole url 1544 1 = scheme, with : 1545 2 = scheme, no : 1546 3 = authority, with // 1547 4 = authority, no // 1548 5 = path 1549 6 = query string, with ? 1550 7 = query string, no ? 1551 8 = anchor, with # 1552 9 = anchor, no # 1553 */ 1554 // Yikes, even regular, non-CT regex is also unacceptably slow to compile. 1.9s on my computer! 1555 // instead, I will DIY and cut that down to 0.6s on the same computer. 1556 /* 1557 1558 Note that authority is 1559 user:password@domain:port 1560 where the user:password@ part is optional, and the :port is optional. 1561 1562 Regex translation: 1563 1564 Scheme cannot have :, /, ?, or # in it, and must have one or more chars and end in a :. It is optional, but must be first. 1565 Authority must start with //, but cannot have any other /, ?, or # in it. It is optional. 1566 Path cannot have any ? or # in it. It is optional. 1567 Query must start with ? and must not have # in it. It is optional. 1568 Anchor must start with # and can have anything else in it to end of string. It is optional. 1569 */ 1570 1571 this = Uri.init; // reset all state 1572 1573 // empty uri = nothing special 1574 if(uri.length == 0) { 1575 return; 1576 } 1577 1578 size_t idx; 1579 1580 scheme_loop: foreach(char c; uri[idx .. $]) { 1581 switch(c) { 1582 case ':': 1583 case '/': 1584 case '?': 1585 case '#': 1586 break scheme_loop; 1587 default: 1588 } 1589 idx++; 1590 } 1591 1592 if(idx == 0 && uri[idx] == ':') { 1593 // this is actually a path! we skip way ahead 1594 goto path_loop; 1595 } 1596 1597 if(idx == uri.length) { 1598 // the whole thing is a path, apparently 1599 path = uri; 1600 return; 1601 } 1602 1603 if(idx > 0 && uri[idx] == ':') { 1604 scheme = uri[0 .. idx]; 1605 idx++; 1606 } else { 1607 // we need to rewind; it found a / but no :, so the whole thing is prolly a path... 1608 idx = 0; 1609 } 1610 1611 if(idx + 2 < uri.length && uri[idx .. idx + 2] == "//") { 1612 // we have an authority.... 1613 idx += 2; 1614 1615 auto authority_start = idx; 1616 authority_loop: foreach(char c; uri[idx .. $]) { 1617 switch(c) { 1618 case '/': 1619 case '?': 1620 case '#': 1621 break authority_loop; 1622 default: 1623 } 1624 idx++; 1625 } 1626 1627 auto authority = uri[authority_start .. idx]; 1628 1629 auto idx2 = authority.indexOf("@"); 1630 if(idx2 != -1) { 1631 userinfo = authority[0 .. idx2]; 1632 authority = authority[idx2 + 1 .. $]; 1633 } 1634 1635 auto remaining = authority; 1636 1637 while(remaining.length) { 1638 auto idx3 = remaining.indexOf(","); 1639 if(idx3 != -1) { 1640 authority = remaining[0 .. idx3]; 1641 remaining = remaining[idx3 + 1 .. $]; 1642 } else { 1643 authority = remaining; 1644 remaining = null; 1645 } 1646 1647 string host; 1648 int port; 1649 1650 idx2 = authority.indexOf(":"); 1651 if(idx2 == -1) { 1652 port = 0; // 0 means not specified; we should use the default for the scheme 1653 host = authority; 1654 } else { 1655 host = authority[0 .. idx2]; 1656 port = to!int(authority[idx2 + 1 .. $]); 1657 } 1658 1659 hosts ~= UriHost(host, port); 1660 } 1661 } 1662 1663 path_loop: 1664 auto path_start = idx; 1665 1666 foreach(char c; uri[idx .. $]) { 1667 if(c == '?' || c == '#') 1668 break; 1669 idx++; 1670 } 1671 1672 path = uri[path_start .. idx]; 1673 1674 if(idx == uri.length) 1675 return; // nothing more to examine... 1676 1677 if(uri[idx] == '?') { 1678 idx++; 1679 auto query_start = idx; 1680 foreach(char c; uri[idx .. $]) { 1681 if(c == '#') 1682 break; 1683 idx++; 1684 } 1685 query = uri[query_start .. idx]; 1686 } 1687 1688 if(idx < uri.length && uri[idx] == '#') { 1689 idx++; 1690 fragment = uri[idx .. $]; 1691 } 1692 1693 // uriInvalidated = false; 1694 } 1695 } 1696 /* end */ 1697 1698 1699 version(linux) 1700 @trusted 1701 extern(C) 1702 int getentropy(scope void*, size_t); 1703 else version(Windows) { 1704 import core.sys.windows.windows; 1705 import core.sys.windows.ntdef; 1706 enum STATUS_SUCCESS = 0; 1707 pragma(lib, "bcrypt"); 1708 @trusted 1709 extern(Windows) 1710 NTSTATUS BCryptGenRandom(scope void*, PUCHAR, ULONG, ULONG); 1711 } else static assert(0); 1712 1713 1714 1715 1716 // copy pasted from vibe.d; https://raw.githubusercontent.com/vibe-d/vibe.d/master/mongodb/vibe/db/mongo/sasl.d 1717 1718 /* 1719 SASL authentication functions 1720 1721 Copyright: © 2012-2016 Nicolas Gurrola 1722 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 1723 Authors: Nicolas Gurrola 1724 */ 1725 1726 import std.algorithm; 1727 import std.base64; 1728 import std.conv; 1729 import std.digest.hmac; 1730 import std.digest.sha; 1731 import std.exception; 1732 import std.format; 1733 import std..string; 1734 import std.traits; 1735 import std.utf; 1736 1737 @safe: 1738 1739 package struct ScramState 1740 { 1741 @safe: 1742 1743 private string m_firstMessageBare; 1744 private string m_nonce; 1745 private DigestType!SHA1 m_saltedPassword; 1746 private string m_authMessage; 1747 1748 string createInitialRequest(string user) 1749 { 1750 ubyte[18] randomBytes; 1751 version(linux) { 1752 if(getentropy(&(randomBytes[0]), randomBytes.length) != 0) 1753 throw new Exception("get random failure"); 1754 } else version(Windows) { 1755 if(BCryptGenRandom(null, &(randomBytes[0]), randomBytes.length, 2 /*BCRYPT_USE_SYSTEM_PREFERRED_RNG */) != STATUS_SUCCESS) 1756 throw new Exception("get random failure"); 1757 } else static assert(0); 1758 1759 m_nonce = Base64.encode(randomBytes); 1760 1761 m_firstMessageBare = format("n=%s,r=%s", escapeUsername(user), m_nonce); 1762 return format("n,,%s", m_firstMessageBare); 1763 } 1764 1765 version (unittest) private string createInitialRequestWithFixedNonce(string user, string nonce) 1766 { 1767 m_nonce = nonce; 1768 1769 m_firstMessageBare = format("n=%s,r=%s", escapeUsername(user), m_nonce); 1770 return format("n,,%s", m_firstMessageBare); 1771 } 1772 1773 // MongoDB drivers require 4096 min iterations https://github.com/mongodb/specifications/blob/59390a7ab2d5c8f9c29b8af1775ff25915c44036/source/auth/auth.rst#scram-sha-1 1774 string update(string password, string challenge, int minIterations = 4096) 1775 { 1776 string serverFirstMessage = challenge; 1777 1778 string next = challenge.find(','); 1779 if (challenge.length < 2 || challenge[0 .. 2] != "r=" || next.length < 3 || next[1 .. 3] != "s=") 1780 throw new Exception("Invalid server challenge format: " ~ challenge); 1781 string serverNonce = challenge[2 .. $ - next.length]; 1782 challenge = next[3 .. $]; 1783 next = challenge.find(','); 1784 ubyte[] salt = Base64.decode(challenge[0 .. $ - next.length]); 1785 1786 if (next.length < 3 || next[1 .. 3] != "i=") 1787 throw new Exception("Invalid server challenge format"); 1788 int iterations = next[3 .. $].to!int(); 1789 1790 if (iterations < minIterations) 1791 throw new Exception("Server must request at least " ~ minIterations.to!string ~ " iterations"); 1792 1793 if (serverNonce[0 .. m_nonce.length] != m_nonce) 1794 throw new Exception("Invalid server nonce received"); 1795 string finalMessage = format("c=biws,r=%s", serverNonce); 1796 1797 m_saltedPassword = pbkdf2(password.representation, salt, iterations); 1798 m_authMessage = format("%s,%s,%s", m_firstMessageBare, serverFirstMessage, finalMessage); 1799 1800 auto proof = getClientProof(m_saltedPassword, m_authMessage); 1801 return format("%s,p=%s", finalMessage, Base64.encode(proof)); 1802 } 1803 1804 string finalize(string challenge) 1805 { 1806 if (challenge.length < 2 || challenge[0 .. 2] != "v=") 1807 { 1808 throw new Exception("Invalid server signature format"); 1809 } 1810 if (!verifyServerSignature(Base64.decode(challenge[2 .. $]), m_saltedPassword, m_authMessage)) 1811 { 1812 throw new Exception("Invalid server signature"); 1813 } 1814 return null; 1815 } 1816 1817 private static string escapeUsername(string user) 1818 { 1819 char[] buffer; 1820 foreach (i, dchar ch; user) 1821 { 1822 if (ch == ',' || ch == '=') { 1823 if (!buffer) { 1824 buffer.reserve(user.length + 2); 1825 buffer ~= user[0 .. i]; 1826 } 1827 if (ch == ',') 1828 buffer ~= "=2C"; 1829 else 1830 buffer ~= "=3D"; 1831 } else if (buffer) 1832 encode(buffer, ch); 1833 } 1834 return buffer ? () @trusted { return assumeUnique(buffer); } () : user; 1835 } 1836 1837 unittest 1838 { 1839 string user = "user"; 1840 assert(escapeUsername(user) == user); 1841 assert(escapeUsername(user) is user); 1842 assert(escapeUsername("user,1") == "user=2C1"); 1843 assert(escapeUsername("user=1") == "user=3D1"); 1844 assert(escapeUsername("u,=ser1") == "u=2C=3Dser1"); 1845 assert(escapeUsername("u=se=r1") == "u=3Dse=3Dr1"); 1846 } 1847 1848 private static auto getClientProof(DigestType!SHA1 saltedPassword, string authMessage) 1849 { 1850 auto clientKey = () @trusted { return hmac!SHA1("Client Key".representation, saltedPassword); } (); 1851 auto storedKey = sha1Of(clientKey); 1852 auto clientSignature = () @trusted { return hmac!SHA1(authMessage.representation, storedKey); } (); 1853 1854 foreach (i; 0 .. clientKey.length) 1855 { 1856 clientKey[i] = clientKey[i] ^ clientSignature[i]; 1857 } 1858 return clientKey; 1859 } 1860 1861 private static bool verifyServerSignature(ubyte[] signature, DigestType!SHA1 saltedPassword, string authMessage) 1862 @trusted { 1863 auto serverKey = hmac!SHA1("Server Key".representation, saltedPassword); 1864 auto serverSignature = hmac!SHA1(authMessage.representation, serverKey); 1865 return serverSignature == signature; 1866 } 1867 } 1868 1869 private DigestType!SHA1 pbkdf2(const ubyte[] password, const ubyte[] salt, int iterations) 1870 { 1871 import std.bitmanip; 1872 1873 ubyte[4] intBytes = [0, 0, 0, 1]; 1874 auto last = () @trusted { return hmac!SHA1(salt, intBytes[], password); } (); 1875 static assert(isStaticArray!(typeof(last)), 1876 "Code is written so that the hash array is expected to be placed on the stack"); 1877 auto current = last; 1878 foreach (i; 1 .. iterations) 1879 { 1880 last = () @trusted { return hmac!SHA1(last[], password); } (); 1881 foreach (j; 0 .. current.length) 1882 { 1883 current[j] = current[j] ^ last[j]; 1884 } 1885 } 1886 return current; 1887 } 1888 1889 unittest { 1890 ScramState state; 1891 assert(state.createInitialRequestWithFixedNonce("user", "fyko+d2lbbFgONRv9qkxdawL") 1892 == "n,,n=user,r=fyko+d2lbbFgONRv9qkxdawL"); 1893 auto last = state.update(makeDigest("user", "pencil"), 1894 "r=fyko+d2lbbFgONRv9qkxdawLHo+Vgk7qvUOKUwuWLIWg4l/9SraGMHEE,s=rQ9ZY3MntBeuP3E1TDVC4w==,i=10000"); 1895 assert(last == "c=biws,r=fyko+d2lbbFgONRv9qkxdawLHo+Vgk7qvUOKUwuWLIWg4l/9SraGMHEE,p=MC2T8BvbmWRckDw8oWl5IVghwCY=", 1896 last); 1897 last = state.finalize("v=UMWeI25JD1yNYZRMpZ4VHvhZ9e0="); 1898 assert(last == "", last); 1899 } 1900 1901 string makeDigest(string username, string password) { 1902 import std.digest.md; 1903 return md5Of(username ~ ":mongo:" ~ password).toHexString().idup.toLower(); 1904 }