1 /++
2 	Standalone Mongo driver implementation.
3 
4 	Copyright: 2020 Symmetry Investments with portion (c) 2012-2016 Nicolas Gurrola
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 +/
7 module kaleidic.mongo_standalone;
8 
9 import std.array;
10 import std.bitmanip;
11 import std.socket;
12 
13 static immutable string mongoDriverName = "mongo-standalone";
14 static immutable string mongoDriverVersion = "0.0.9";
15 
// Small usage demo; only compiled when the `Demo` version identifier is set.
version(Demo)
void main() {

	/+
	Example of building a larger document by hand:

	string omg;
	foreach(a; 0 .. 8000)
		omg ~= "a";


		document doc1 = document([
			bson_value("foo", 12),
			bson_value("bar", 12.4),
			bson_value("baz", 12L),
			bson_value("faz", omg),
			bson_value("eaz", RegEx("asdasdsa", "i")),
			bson_value("asds", null),
			bson_value("sub", document([
				bson_value("sub1", 12)
			])),
		]);
		+/

	// credentials may alternatively be embedded in the connection string:
	//auto conn = new MongoConnection("mongodb://testuser:testpassword@localhost/test?slaveOk=true");
	auto conn = new MongoConnection("mongodb://localhost/test?slaveOk=true", "testuser", "testpassword");

	// upsert (but not multi-update) the document with a fixed _id
	auto selector = document([bson_value("_id", ObjectId("5ef17aea55c0abb51fbb53bc"))]);
	auto replacement = document([
		//bson_value("_id", ObjectId("5ef17aea55c0abb51fbb53bc")),
		bson_value("replaced","true")
	]);
	conn.update("test.world", true, false, selector, replacement);

	//conn.insert(false, "test.world", [doc1]);

	import std.stdio;
	//writeln(conn.query(0, "world", 0, 1, document.init));

	// skip 35 documents, fetch one per batch, and print whatever comes back
	auto found = conn.query("test.world", 35, 1, document.init, document.init, 1);
	writeln(found);
}
56 
/// Bit flags that can be OR-ed together and passed as the `flags` argument of
/// `MongoConnection.query`. `SlaveOk` is also what the `slaveOk` connection
/// string option stores in `MongoConnection.defaultQueryFlags`.
enum QueryFlags : int {
	TailableCursor  = 0x02, /// bit 1
	SlaveOk         = 0x04, /// bit 2
	NoCursorTimeout = 0x10, /// bit 4
	AwaitData       = 0x20, /// bit 5
	Exhaust         = 0x40, /// bit 6
	Partial         = 0x80  /// bit 7
}
65 
/// Wire protocol version reported by the server in the handshake reply; used
/// by the driver to decide which features (e.g. authentication mechanisms)
/// can be assumed.
/// $(LINK https://github.com/mongodb/specifications/blob/master/source/wireversion-featurelist.rst)
enum WireVersion : int {
	/// Wire version 0: servers before 2.6 (up to 2.4)
	old = 0,
	/// Wire version 1: server version 2.6
	v26 = 1,
	/// Wire version 2: server version 2.6
	v26_2 = 2,
	/// Wire version 3: server version 3.0
	v30 = 3,
	/// Wire version 4: server version 3.2
	v32 = 4,
	/// Wire version 5: server version 3.4
	v34 = 5,
	/// Wire version 6: server version 3.6
	v36 = 6,
	/// Wire version 7: server version 4.0
	v40 = 7,
	/// Wire version 8: server version 4.2
	v42 = 8,
}
88 
89 class MongoConnection {
90 
91 	Socket socket;
92 	IReceiveStream stream;
93 
94 	uint defaultQueryFlags;
95 
96 	/// Reported minimum wire version by the server
97 	WireVersion minWireVersion;
98 	/// Reported maximum wire version by the server
99 	WireVersion maxWireVersion;
100 
101 	private document handshake(string authDatabase, string username,
102 			document application) {
103 		import std.compiler : compilerName = name, version_major, version_minor;
104 		import std.conv : text, to;
105 		import std.system : os;
106 
107 		auto dbcmd = authDatabase ~ ".$cmd";
108 
109 		static immutable osType = os.to!string;
110 
111 		version (X86_64)
112 			static immutable osArchitecture = "x86_64";
113 		else version (X86)
114 			static immutable osArchitecture = "x86";
115 		else version (ARM)
116 			static immutable osArchitecture = "arm";
117 		else version (PPC64)
118 			static immutable osArchitecture = "ppc64";
119 		else version (PPC)
120 			static immutable osArchitecture = "ppc";
121 		else
122 			static assert(false, "no name for this architecture");
123 
124 		static immutable platform = text(compilerName, " v", version_major, ".", version_minor);
125 
126 		bson_value[] client;
127 		if (application.length)
128 		{
129 			auto name = application["name"];
130 			if (name == bson_value.init)
131 				throw new Exception("Given application without name");
132 			// https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#limitations
133 			if (name.toString().length > 128)
134 				throw new Exception("Application name must not exceed 128 bytes");
135 
136 			client ~= bson_value("application", application);
137 		}
138 
139 		client ~= [
140 			bson_value("driver", document([
141 				bson_value("name", mongoDriverName),
142 				bson_value("version", mongoDriverVersion)
143 			])),
144 			bson_value("os", document([
145 				bson_value("type", osType),
146 				bson_value("architecture", osArchitecture)
147 			])),
148 			bson_value("platform", platform)
149 		];
150 
151 		auto cmd = [
152 			bson_value("isMaster", 1),
153 			bson_value("client", document(client))
154 		];
155 		if (username.length)
156 			cmd ~= bson_value("saslSupportedMechs", authDatabase ~ "." ~ username);
157 
158 		auto reply = query(dbcmd, 0, -1, document(cmd));
159 		if (reply.documents.length != 1 || reply.documents[0]["ok"].get!double != 1)
160 			throw new Exception("MongoDB Handshake failed");
161 
162 		return reply.documents[0];
163 	}
164 
165 	private void authenticateScramSha1(string authDatabase, string username,
166 			string password) {
167 		auto dbcmd = authDatabase ~ ".$cmd";
168 
169 		bson_value conversationId;
170 
171 		ScramState state;
172 
173 		ubyte[] payload = cast(ubyte[]) state.createInitialRequest(username);
174 
175 		auto cmd = document([
176 			bson_value("saslStart", 1),
177 			bson_value("mechanism", "SCRAM-SHA-1"),
178 			bson_value("payload", payload),
179 			bson_value("options", document([
180 				bson_value("skipEmptyExchange", true)
181 			]))
182 		]);
183 
184 
185 		auto firstReply = query(dbcmd, 0, -1, cmd);
186 		if(firstReply.documents.length != 1 || firstReply.documents[0]["ok"].get!double != 1)
187 			throw new Exception("Auth failed at first step (username)");
188 
189 		conversationId = cast(bson_value) firstReply.documents[0]["conversationId"];
190 
191 		auto response = firstReply.documents[0]["payload"].get!(const(ubyte[]));
192 		
193 		const digest = makeDigest(username, password);
194 
195 		payload = cast(typeof(payload)) state.update(digest, cast(string) response);
196 
197 		cmd = document([
198 			bson_value("saslContinue", 1),
199 			bson_value("conversationId", conversationId),
200 			bson_value("payload", payload)
201 		]);
202 
203 		auto secondReply = query(dbcmd, 0, -1, cmd);
204 		if(secondReply.documents.length != 1)
205 			throw new Exception("Auth error at second step");
206 
207 		if(secondReply.documents[0]["ok"].get!double != 1)
208 			throw new Exception("Auth failed at second step (password)");
209 
210 		auto response2 = secondReply.documents[0]["payload"].get!(const(ubyte[]));
211 
212 		payload = cast(typeof(payload)) state.finalize(cast(string) response2);
213 
214 		// newer servers can save a roundtrip (they know the password here already)
215 		if(secondReply.documents[0]["done"].get!bool)
216 			return;
217 
218 		cmd = document([
219 			bson_value("saslContinue", 1),
220 			bson_value("conversationId", conversationId),
221 			bson_value("payload", payload)
222 		]);
223 
224 		auto finalReply = query(dbcmd, 0, -1, cmd);
225 		if(finalReply.documents.length != 1)
226 			throw new Exception("Auth error at final step");
227 
228 		if(finalReply.documents[0]["ok"].get!double != 1)
229 			throw new Exception("Auth failed at final step");
230 
231 		if(!finalReply.documents[0]["done"].get!bool)
232 			throw new Exception("Authentication didn't respond 'done'");
233 	}
234 
235 	/++
236 		Connects to a Mongo database. You can give a username and password in the
237 		connection string if URL encoded, OR you can pass it as arguments to this
238 		constructor.
239 
240 		Please note that username/password in the connectionString will OVERRIDE
241 		ones given as separate arguments.
242 	+/
243 	this(string connectionString, string defaultUsername = null, string defaultPassword = null) {
244 		import std.uri : decodeComponent;
245 		import std..string;
246 
247 		auto uri = Uri(connectionString);
248 
249 		if(uri.hosts.length == 0)
250 			throw new Exception("no host given in connection string");
251 
252 		foreach(ref p; uri.hosts) {
253 			if(p.port == 0)
254 				p.port = 27017;
255 		}
256 
257 		string host = decodeComponent(uri.hosts[0].host);
258 		if(host.length == 0)
259 			host = "localhost";
260 
261 		// FIXME: break up the host into the list and at least
262 		// pretend to care about replication sets
263 
264 		string authDb = "admin";
265 		string appName;
266 
267 		string username = defaultUsername;
268 		string password = defaultPassword;
269 
270 		auto split = uri.userinfo.indexOf(":");
271 		if(split != -1) {
272 			username = decodeComponent(uri.userinfo[0 .. split]);
273 			password = decodeComponent(uri.userinfo[split + 1 .. $]);
274 		}
275 		if(uri.path.length > 1)
276 			authDb = uri.path[1 .. $];
277 
278 		bool ssl;
279 
280 		foreach(part; uri.query.splitter("&")) {
281 			split = part.indexOf("=");
282 			auto name = decodeComponent(part[0 .. split]);
283 			auto value = decodeComponent(part[split + 1 .. $]);
284 
285 			// https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options
286 			switch(name) {
287 				case "slaveOk": defaultQueryFlags |= QueryFlags.SlaveOk; break;
288 				case "appName": appName = value; break;
289 				case "authSource": authDb = value; break;
290 				case "tls", "ssl": ssl = value == "true"; break;
291 				default: throw new Exception("Unsupported mongo db connect option: " ~ name);
292 			}
293 		}
294 
295 		if(host[0] == '/') {
296 			version(Posix) {
297 				socket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
298 				socket.connect(new UnixAddress(host));
299 			} else throw new Exception("Cannot use unix socket on Windows at this time");
300 		} else {
301 			if(ssl)
302 				throw new Exception("ssl/tls connections not supported by this driver");
303 			socket = new TcpSocket(new InternetAddress(host, cast(ushort) uri.hosts[0].port));
304 		}
305 
306 		stream = new SocketReceiveStream(socket);
307 
308 		document appArgs;
309 		if (appName.length)
310 			appArgs = document([bson_value("name", appName)]);
311 		auto handshakeResponse = handshake(authDb, username, appArgs);
312 
313 		minWireVersion = cast(WireVersion)handshakeResponse["minWireVersion"].get!int;
314 		maxWireVersion = cast(WireVersion)handshakeResponse["maxWireVersion"].get!int;
315 
316 		bool supportsScramSha1 = maxWireVersion >= WireVersion.v30;
317 		bool supportsScramSha256 = maxWireVersion >= WireVersion.v40;
318 
319 		auto saslSupportedMechs = handshakeResponse["saslSupportedMechs"];
320 		if (saslSupportedMechs != bson_value.init) {
321 			auto arr = saslSupportedMechs.get!(const(bson_value)[]);
322 			supportsScramSha1 = false;
323 			supportsScramSha256 = false;
324 			foreach (v; arr) {
325 				switch (v.toString) {
326 				case "SCRAM-SHA-1":
327 					supportsScramSha1 = true;
328 					break;
329 				case "SCRAM-SHA-256":
330 					supportsScramSha256 = true;
331 					break;
332 				default:
333 					// unsupported mechanism
334 					break;
335 				}
336 			}
337 		}
338 
339 		// https://github.com/mongodb/specifications/blob/master/source/auth/auth.rst#supported-authentication-methods
340 		if (username.length) {
341 			// TODO: support other (certificate based) authentication mechanisms
342 
343 			// TODO: SCRAM-SHA-256 support
344 			// if (supportsScramSha256) {
345 			// } else
346 			if (supportsScramSha1) {
347 				authenticateScramSha1(authDb, username, password);
348 			} else {
349 				if (maxWireVersion < WireVersion.v30)
350 					throw new Exception("legacy MONGODB-CR authentication not implemented");
351 				else
352 					throw new Exception(
353 						"Cannot authenticate because no common authentication mechanism could be found.");
354 			}
355 		}
356 	}
357 
358 	void update(const(char)[] fullCollectionName, bool upsert, bool multiupdate, document selector, document update) {
359 		MsgHeader header;
360 
361 		header.requestID = ++nextRequestId;
362 		header.opCode = OP.UPDATE;
363 
364 		SendBuffer sb;
365 
366 		sb.add(header);
367 
368 		int zero;
369 		sb.add(zero);
370 
371 		sb.add(fullCollectionName);
372 		int flags;
373 		if(upsert) flags |= 1;
374 		if(multiupdate) flags |= 2;
375 		sb.add(flags);
376 		sb.add(selector);
377 		sb.add(update);
378 
379 		send(sb.data);
380 	}
381 
382 	void insert(bool continueOnError, const(char)[] fullCollectionName, document[] documents) {
383 		MsgHeader header;
384 
385 		header.requestID = ++nextRequestId;
386 		header.opCode = OP.INSERT;
387 
388 		SendBuffer sb;
389 
390 		sb.add(header);
391 
392 		int flags = continueOnError ? 1 : 0;
393 		sb.add(flags);
394 		sb.add(fullCollectionName);
395 		foreach(doc; documents)
396 			sb.add(doc);
397 
398 		send(sb.data);
399 	}
400 
401 	OP_REPLY getMore(const(char)[] fullCollectionName, int numberToReturn, long cursorId, int limitTotalEntries = int.max) {
402 		MsgHeader header;
403 
404 		header.requestID = ++nextRequestId;
405 		header.opCode = OP.GET_MORE;
406 
407 		SendBuffer sb;
408 
409 		sb.add(header);
410 
411 		int zero;
412 		sb.add(zero);
413 		sb.add(fullCollectionName);
414 		sb.add(numberToReturn);
415 		sb.add(cursorId);
416 
417 		send(sb.data);
418 		
419 		auto reply = stream.readReply();
420 
421 		reply.numberToReturn = numberToReturn;
422 		reply.fullCollectionName = fullCollectionName;
423 		reply.current = 0;
424 		reply.connection = this;
425 
426 		return reply;
427 	}
428 
429 	void delete_(const(char)[] fullCollectionName, bool singleRemove, document selector) {
430 		MsgHeader header;
431 
432 		header.requestID = ++nextRequestId;
433 		header.opCode = OP.DELETE;
434 
435 		SendBuffer sb;
436 
437 		sb.add(header);
438 
439 		int zero;
440 		sb.add(0);
441 		sb.add(fullCollectionName);
442 		int flags = singleRemove ? 1 : 0;
443 		sb.add(flags);
444 		sb.add(selector);
445 
446 		send(sb.data);
447 	}
448 
449 	void killCursors(const(long)[] cursorIds) {
450 		MsgHeader header;
451 
452 		header.requestID = ++nextRequestId;
453 		header.opCode = OP.KILL_CURSORS;
454 
455 		SendBuffer sb;
456 
457 		sb.add(header);
458 		int zero = 0;
459 		sb.add(zero);
460 
461 		sb.add(cast(int) cursorIds.length);
462 		foreach(id; cursorIds)
463 			sb.add(id);
464 
465 		send(sb.data);
466 	}
467 
468 	OP_REPLY query(const(char)[] fullCollectionName, int numberToSkip, int numberToReturn, document query, document returnFieldsSelector = document.init, int flags = 1, int limitTotalEntries = int.max) {
469 		if(flags == 1)
470 			flags = defaultQueryFlags;
471 		MsgHeader header;
472 
473 		header.requestID = ++nextRequestId;
474 		header.opCode = OP.QUERY;
475 
476 		SendBuffer sb;
477 
478 		sb.add(header);
479 
480 		sb.add(flags);
481 		sb.add(fullCollectionName);
482 		sb.add(numberToSkip);
483 		sb.add(numberToReturn);
484 		sb.add(query);
485 
486 		if(returnFieldsSelector != document.init)
487 			sb.add(returnFieldsSelector);
488 
489 		send(sb.data);
490 		
491 		auto reply = stream.readReply();
492 
493 		reply.numberToReturn = numberToReturn;
494 		reply.limitRemaining = limitTotalEntries;
495 		reply.fullCollectionName = fullCollectionName;
496 		reply.current = 0;
497 		reply.connection = this;
498 
499 		return reply;
500 	}
501 
502 	private int nextRequestId;
503 
504 	private void send(const(ubyte)[] data) {
505 		while(data.length) {
506 			auto ret = socket.send(data);
507 			if(ret <= 0)
508 				throw new Exception("wtf");
509 			data = data[ret .. $];
510 		}
511 	}
512 }
513 
unittest {
	// a connection string may list several comma separated hosts with ports
	auto parsed = Uri("mongo://test:12,foo:14/");

	UriHost[] expected = [UriHost("test", 12), UriHost("foo", 14)];

	assert(parsed.hosts.length == 2);
	foreach(idx, wanted; expected)
		assert(parsed.hosts[idx] == wanted);
}
520 
521 interface IReceiveStream {
522 	final OP_REPLY readReply() {
523 		OP_REPLY reply;
524 
525 
526 		reply.header.messageLength = readInt();
527 		reply.header.requestID = readInt();
528 		reply.header.responseTo = readInt();
529 		reply.header.opCode = readInt();
530 
531 		// flag 0: cursor not found. 1: query failure. $err set in a thing.
532 		reply.responseFlags = readInt();
533 		reply.cursorID = readLong();
534 		reply.startingFrom = readInt();
535 		reply.numberReturned = readInt();
536 
537 		reply.documents.length = reply.numberReturned;
538 
539 		foreach(ref doc; reply.documents) {
540 			doc = readBson();
541 		}
542 
543 		return reply;
544 	}
545 
546 	final document readBson() {
547 		document d;
548 		d.bytesCount = readInt();
549 
550 		int remaining = d.bytesCount;
551 		remaining -= 4; // length
552 		remaining -= 1; // terminating zero
553 
554 		while(remaining > 0) {
555 			d.values_ ~= readBsonValue(remaining);
556 		}
557 
558 		d.terminatingZero = readByte();
559 		if(d.terminatingZero != 0)
560 			throw new Exception("something wrong reading bson");
561 
562 		return d;
563 	}
564 
565 	final bson_value readBsonValue(ref int remaining) {
566 		bson_value v;
567 
568 		v.tag = readByte();
569 		remaining--;
570 
571 		v.e_name = readZeroTerminatedChars();
572 		remaining -= v.e_name.length + 1; // include the zero term
573 
574 		switch(v.tag) {
575 			case 0x00: // not supposed to exist!
576 				throw new Exception("invalid bson");
577 			case 0x01:
578 				auto d = readDouble();
579 				remaining -= 8;
580 				v.x01 = d;
581 			break;
582 			case 0x02:
583 				v.x02 = readCharsWithLengthPrefix();
584 				remaining -= 5 + v.x02.length; // length and zero
585 			break;
586 			case 0x03:
587 				v.x03 = readBson();
588 				remaining -= v.x03.bytesCount;
589 			break;
590 			case 0x04:
591 				v.x04 = readBson();
592 				remaining -= v.x04.bytesCount;
593 			break;
594 			case 0x05:
595 				auto length = readInt();
596 				v.x05_tag = readByte();
597 				v.x05_data = readBytes(length);
598 				remaining -= v.x05_data.length + 5;
599 			break;
600 			case 0x06: // undefined
601 				// intentionally blank, no additional data
602 			break;
603 			case 0x07:
604 				foreach(ref b; v.x07) {
605 					b = readByte();
606 					remaining--;
607 				}
608 			break;
609 			case 0x08:
610 				v.x08 = readByte() ? true : false;
611 				remaining--;
612 			break;
613 			case 0x09:
614 				v.x09 = readLong();
615 				remaining -= 8;
616 			break;
617 			case 0x0a: // null
618 				// intentionally blank, no additional data
619 			break;
620 			case 0x0b:
621 				v.x0b_regex = readZeroTerminatedChars();
622 				remaining -= v.x0b_regex.length + 1;
623 				v.x0b_flags = readZeroTerminatedChars();
624 				remaining -= v.x0b_flags.length + 1;
625 			break;
626 			case 0x0d:
627 				v.x0d = readCharsWithLengthPrefix();
628 				remaining -= 5 + v.x0d.length; // length and zero
629 			break;
630 			case 0x10:
631 				v.x10 = readInt();
632 				remaining -= 4;
633 			break;
634 			case 0x11:
635 				v.x11 = readLong();
636 				remaining -= 8;
637 			break;
638 			case 0x12:
639 				v.x12 = readLong();
640 				remaining -= 8;
641 			break;
642 			case 0x13:
643 				foreach(ref b; v.x13) {
644 					b = readByte();
645 					remaining--;
646 				}
647 			break;
648 			default:
649 				import std.conv;
650 				assert(0, "unsupported tag in bson: " ~ to!string(v.tag));
651 		}
652 
653 		return v;
654 	}
655 
656 	final ubyte readByte() {
657 		return readSmall!1[0];
658 	}
659 
660 	final int readInt() {
661 		return readSmall!4.littleEndianToNative!int;
662 	}
663 
664 	final long readLong() {
665 		return readSmall!8.littleEndianToNative!long;
666 	}
667 
668 	final double readDouble() {
669 		return readSmall!8.littleEndianToNative!double;
670 	}
671 	
672 	final string readZeroTerminatedChars() {
673 		return cast(string) readUntilNull();
674 	}
675 
676 	final string readCharsWithLengthPrefix() {
677 		auto length = readInt();
678 		// length prefixed string allows 0 characters
679 		auto bytes = readBytes(length);
680 		if (bytes[$ - 1] != 0)
681 			throw new Exception("Malformed length prefixed string");
682 		return cast(string)bytes[0 .. $ - 1];
683 	}
684 
685 	/// Reads exactly the amount of bytes specified and returns a newly
686 	/// allocated buffer containing the data.
687 	final immutable(ubyte)[] readBytes(size_t length) {
688 		ubyte[] ret = new ubyte[length];
689 		readExact(ret);
690 		return (() @trusted => cast(immutable)ret)();
691 	}
692 
693 	/// Reads a small number of bytes into a stack allocated array. Convenience
694 	/// function calling $(LREF readExact).
695 	ubyte[n] readSmall(size_t n)() {
696 		ubyte[n] buf;
697 		readExact(buf[]);
698 		return buf;
699 	}
700 
	/// Reads exactly the expected length into the given buffer argument.
	/// To be provided by the implementing class; every other read helper of
	/// this interface is built on top of this function and
	/// $(LREF readUntilNull).
	void readExact(scope ubyte[] buffer);

	/// Reads until a 0 byte and returns all data before it as newly allocated
	/// buffer containing the data.
	/// To be provided by the implementing class. Callers in this interface
	/// account for one extra consumed byte for the terminator.
	immutable(ubyte)[] readUntilNull();
707 }
708 
709 class SocketReceiveStream : IReceiveStream {
	/// Creates a receive stream reading from the given socket.
	this(Socket socket) {
		this.socket = socket;
	}

	/// Socket the data is received from.
	Socket socket;
	/// Fixed size receive buffer; `bufferPos` and `bufferLength` index into it.
	ubyte[4096] buffer;
	// End of the received data inside `buffer`:
	// if bufferLength > bufferPos then following data is linear after bufferPos
	// if bufferLength < bufferPos then data is following bufferPos and also from 0..bufferLength
	// if bufferLength == bufferPos then all data is read
	int bufferLength;
	// Position of the next unread byte inside `buffer`:
	// may only increment until at the end of buffer.length, where it wraps back to 0
	int bufferPos;
722 
723 	protected ptrdiff_t receiveSocket(ubyte[] buffer) {
724 		const length = socket.receive(buffer);
725 
726 		if (length == 0)
727 			throw new Exception("Remote side has closed the connection");
728 		else if (length == Socket.ERROR)
729 			throw new Exception(lastSocketError());
730 		else
731 			return length;
732 	}
733 
734 	/// Loads more data after bufferPos if there is space, otherwise load before
735 	/// bufferPos, indicating data is wrapping around.
736 	void loadMore() {
737 		if (bufferPos == bufferLength) {
738 			// we read up all the bytes, start back from 0 for bigger recv buffer
739 			bufferPos = 0;
740 			bufferLength = 0;
741 		}
742 
743 		ptrdiff_t length;
744 		if (bufferLength < bufferPos) {
745 			// length wrapped around before pos
746 			// try to read up to the byte before bufferPos to not trigger the
747 			// bufferPos == bufferLength case
748 			length = receiveSocket(buffer[bufferLength .. bufferPos - 1]);
749 		} else {
750 			length = receiveSocket(buffer[bufferLength .. $ - 1]);
751 		}
752 
753 		bufferLength += length;
754 		if (bufferLength == buffer.length)
755 			bufferLength = 0;
756 	}
757 
	/// Fills `dst` completely with the next bytes of the stream, receiving
	/// from the socket (via `loadMore`) as often as needed. Does nothing for
	/// an empty `dst`.
	void readExact(scope ubyte[] dst) {
		if (!dst.length)
			return;

		// nothing buffered at all: fetch something first (this also rewinds
		// bufferPos/bufferLength to 0, see loadMore)
		if (bufferPos == bufferLength)
			loadMore();

		// index one past the last wanted byte if the data was laid out
		// linearly starting at bufferPos
		auto end = bufferPos + dst.length;
		if (end < buffer.length) {
			// easy linear copy
			// receive until bufferLength wrapped around or we have enough bytes
			while (bufferLength >= bufferPos && end > bufferLength)
				loadMore();
			// copies buffer[bufferPos .. end]; the upper slice bound also
			// advances bufferPos to end as part of the same expression
			dst[] = buffer[bufferPos .. bufferPos = cast(typeof(bufferPos))end];
		} else {
			// copy from wrapping buffer
			// i counts the bytes of dst filled so far
			size_t i;
			while (i < dst.length) {
				auto remaining = dst.length - i;
				if (bufferPos < bufferLength) {
					// linear data available: take as much as is there or needed
					auto d = min(remaining, bufferLength - bufferPos);
					dst[i .. i += d] = buffer[bufferPos .. bufferPos += d];
				} else if (bufferPos > bufferLength) {
					// wrapped data: d bytes up to the end of the buffer, then
					// bufferLength bytes from the start of the buffer
					auto d = buffer.length - bufferPos;
					if (remaining >= d) {
						dst[i .. i += d] = buffer[bufferPos .. $];
						// NOTE(review): this copies all bufferLength bytes from
						// the buffer start without comparing against what is
						// still missing in dst — confirm this cannot overrun
						dst[i .. i += bufferLength] = buffer[0 .. bufferPos = bufferLength];
					} else {
						dst[i .. i += remaining] = buffer[bufferPos .. bufferPos += remaining];
					}
				} else {
					// buffer drained, receive the next chunk
					loadMore();
				}
			}
		}
	}
794 
795 	immutable(ubyte)[] readUntilNull() {
796 		auto ret = appender!(immutable(ubyte)[]);
797 		while (true) {
798 			ubyte[] chunk;
799 			if (bufferPos < bufferLength) {
800 				chunk = buffer[bufferPos .. bufferLength];
801 			} else if (bufferPos > bufferLength) {
802 				chunk = buffer[bufferPos .. $];
803 			} else {
804 				loadMore();
805 				continue;
806 			}
807 
808 			auto zero = chunk.countUntil(0);
809 			if (zero == -1) {
810 				ret ~= chunk;
811 				bufferPos += chunk.length;
812 				if (bufferPos == buffer.length)
813 					bufferPos = 0;
814 			} else {
815 				ret ~= chunk[0 .. zero];
816 				bufferPos += zero + 1;
817 				return ret.data;
818 			}
819 		}
820 	}
821 }
822 
// Exercises the buffering of SocketReceiveStream with a fake socket.
unittest {
	// Stream whose "socket" delivers at most maxChunk bytes per receive call;
	// every call fills its chunk with the next value of a counter, which
	// makes it visible which receive call a byte came from.
	class MockedSocketReceiveStream : SocketReceiveStream {
		int maxChunk = 64;
		ubyte generator = 0;

		this() {
			super(null);
		}

		protected override ptrdiff_t receiveSocket(ubyte[] buffer) {
			if (buffer.length >= maxChunk) {
				buffer[0 .. maxChunk] = ++generator;
				return maxChunk;
			} else {
				// less room than a full chunk: fill whatever fits
				buffer[] = ++generator;
				return buffer.length;
			}
		}
	}

	auto stream = new MockedSocketReceiveStream();
	stream.buffer[] = 0;
	// the first read triggers one receive of 64 bytes with value 1
	auto b = stream.readBytes(3);
	assert(b == [1, 1, 1]);
	assert(stream.buffer[0 .. 64].all!(a => a == 1));
	assert(stream.buffer[64 .. $].all!(a => a == 0));
	// served from the buffer, no further receive
	b = stream.readBytes(3);
	assert(b == [1, 1, 1]);
	assert(stream.buffer[64 .. $].all!(a => a == 0));
	assert(stream.bufferPos == 6);
	assert(stream.bufferLength == 64);
	// needs 6 bytes more than buffered, so a second chunk (value 2) is received
	b = stream.readBytes(64);
	assert(b[0 .. $ - 6].all!(a => a == 1));
	assert(b[$ - 6 .. $].all!(a => a == 2));
	assert(stream.bufferPos == 70);
	assert(stream.bufferLength == 128);
	b = stream.readBytes(57);
	assert(b.all!(a => a == 2));
	assert(stream.bufferPos == 127);
	assert(stream.bufferLength == 128);
	// a read larger than the whole buffer: one leftover byte of chunk 2,
	// then full chunks received one after another
	b = stream.readBytes(8000);
	assert(b[0] == 2);
	assert(b[1 .. 65].all!(a => a == 3));
	assert(b[65 .. 129].all!(a => a == 4));

	// start over; the ubyte counter wraps to 0 on the 256th receive, so the
	// zero terminator shows up after 255 chunks of 64 non-zero bytes
	stream.buffer[] = 0;
	stream.bufferPos = stream.bufferLength = 0;
	stream.generator = 0;
	assert(stream.readUntilNull().length == 64 * 255);
}
873 
/++
	Byte buffer used to assemble one outgoing message or one BSON document.

	Data is first collected in a fixed size array embedded in the struct; once
	that is too small the buffer moves to a dynamically sized array. Integers
	are written in little endian byte order. The first four bytes are expected
	to be a length field: `data` overwrites them with the final size.
+/
struct SendBuffer {
	private ubyte[4096] backing;

	private ubyte[] buffer;
	private size_t position;

	/// Returns everything written so far, after patching the leading length
	/// field with the number of bytes written.
	ubyte[] data() {
		assert(position > 4);

		resetSize();
		return buffer[0 .. position];
	}

	/// Makes sure `addition` more bytes fit, doubling the capacity as often
	/// as required.
	private void size(size_t addition) {
		if(buffer is null)
			buffer = backing[];

		const required = position + addition;
		while (buffer.length < required)
			buffer.length = buffer.length * 2;
	}

	/// Appends a complete BSON document: length field, all values and the
	/// terminating zero. The document is assembled in a separate buffer so
	/// that its length field can be patched before it is copied over.
	void add(const document d) {
		SendBuffer nested;

		nested.add(d.bytesCount);
		foreach(element; d.values)
			nested.add(element);
		nested.addByte(0);

		//import std.stdio; writefln("%(%x %)", nested.data);

		this.add(nested.data);
	}

	/// Appends the four header fields in wire order.
	void add(const MsgHeader header) {
		add(header.messageLength);
		add(header.requestID);
		add(header.responseTo);
		add(header.opCode);
	}

	/// Appends one BSON element: tag byte, zero terminated name and the
	/// payload, whose encoding depends on the stored type.
	void add(const bson_value v) {
		addByte(v.tag);
		add(v.name);

		// one overload per payload type; bson_value.visit calls the matching one
		static struct visitor {
			this(SendBuffer* buf) { this.buf = buf; }
			SendBuffer* buf;

			// fixed size payloads
			void visit(long v) { buf.add(v); }
			void visit(int v) { buf.add(v); }
			void visit(bool v) { buf.addByte(v ? 1 : 0); }
			void visit(double v) { buf.add(v); }
			void visit(const Timestamp v) { buf.add(v.v); }
			void visit(const UtcTimestamp v) { buf.add(v.v); }
			void visit(const ObjectId v) { buf.add(v.v[]); }
			void visit(const Decimal128 v) { buf.add(v.v[]); }

			// payloads without any data
			void visit(const typeof(null)) {  }
			void visit(const Undefined v) {  }

			// length prefixed payloads (string lengths count the zero terminator)
			void visit(const(char)[] v) { buf.add(cast(int) v.length + 1); buf.add(v); }
			void visit(const Javascript v) { buf.add(cast(int) v.v.length + 1); buf.add(v.v); }
			void visit(ubyte tag, const(ubyte)[] v) { buf.add(cast(int) v.length); buf.addByte(tag); buf.add(v); }

			// two zero terminated strings
			void visit(const RegEx v) { buf.add(v.regex); buf.add(v.flags); }

			// nested structures
			void visit(const document v) { buf.add(v); }
			void visit(const(bson_value)[] v) { buf.add(document(v));  }
		}

		visitor writer = visitor(&this);
		v.visit(writer);
	}


	/// Appends a single byte.
	void addByte(ubyte a) {
		size(1);
		buffer[position] = a;
		position++;
	}

	/// Appends the 8 byte bit pattern of a double.
	void add(double i) {
		add(*(cast(long*) &i));
	}

	/// Appends a 32 bit integer, least significant byte first.
	void add(int a) {
		size(4);
		const ubyte[4] le = nativeToLittleEndian(a);
		buffer[position .. position + 4] = le[];
		position += 4;
	}

	/// Appends a 64 bit integer, least significant byte first.
	void add(long a) {
		size(8);
		const ubyte[8] le = nativeToLittleEndian(a);
		buffer[position .. position + 8] = le[];
		position += 8;
	}

	/// Appends the characters followed by a zero byte.
	// does NOT write the length out first!
	void add(const(char)[] a) {
		size(a.length + 1);
		const end = position + a.length;
		buffer[position .. end] = cast(const(ubyte)[]) a;
		buffer[end] = 0;
		position = end + 1;
	}

	/// Appends the raw bytes.
	// does NOT write the length out first!
	void add(const(ubyte)[] a) {
		size(a.length);
		const end = position + a.length;
		buffer[position .. end] = a;
		position = end;
	}

	/// Overwrites the first four bytes with the current size (little endian).
	private void resetSize() {
		const ubyte[4] le = nativeToLittleEndian(cast(int) position);
		buffer[0] = le[0];
		buffer[1] = le[1];
		buffer[2] = le[2];
		buffer[3] = le[3];
	}
}
1000 
unittest {
	// Adding a document far larger than the embedded 4096 byte storage must
	// grow the buffer instead of failing.
	import std.algorithm;
	import std.range;
	import std.conv;

	// a large BSON array: element names are the decimal indices "0", "1", ...
	bson_value[] elements;
	foreach(int i; 0 .. 100000)
		elements ~= bson_value(i.to!string, i);

	auto wrapper = document([bson_value("array", elements)]);

	SendBuffer target;
	target.add(wrapper);
}
1017 
/// Header that starts every message. `SendBuffer.add` writes and
/// `IReceiveStream.readReply` reads the four fields in exactly this order.
struct MsgHeader {
	/// Total message size; SendBuffer.data overwrites the first four bytes
	/// of an outgoing message with the number of bytes written.
	int messageLength;
	/// Identifier of this message; MongoConnection uses an incrementing counter.
	int requestID;
	/// Read from replies; left at 0 for outgoing messages.
	int responseTo;
	/// Message type, see `OP`.
	int opCode;
}
1024 
/// Message type codes stored in `MsgHeader.opCode`.
enum OP {
	REPLY = 1,
	UPDATE = 2001,
	INSERT = 2002,
	QUERY = 2004, // sends a reply
	GET_MORE = 2005, // sends a reply
	DELETE = 2006,
	KILL_CURSORS = 2007,
	MSG = 2013
}
1035 
/// Layout of an update message; the field order matches what
/// `MongoConnection.update` writes.
struct OP_UPDATE {
	MsgHeader header;
	int zero; // always written as 0
	const(char)[] fullCollectionName;
	int flags; // bit 0 == upsert, bit 1 == multiupdate
	document selector;
	document update;
}
1044 
/// Layout of an insert message; the field order matches what
/// `MongoConnection.insert` writes.
struct OP_INSERT {
	MsgHeader header;
	int flags; // bit 0 == ContinueOnError
	const(char)[] fullCollectionName;
	document[] documents; // written one after another
}
1051 
/// Layout of a query message; the field order matches what
/// `MongoConnection.query` writes.
struct OP_QUERY {
	MsgHeader header;
	int flags; // SEE: https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#op-query
	const(char)[] fullCollectionName;
	int numberToSkip;
	int numberToReturn;
	document query;
	document returnFieldsSelector; // optional; query() only writes it when it is not document.init
}
1061 
/// Layout of a get-more message; the field order matches what
/// `MongoConnection.getMore` writes.
struct OP_GET_MORE {
	MsgHeader header;
	int zero; // always written as 0
	const(char)[] fullCollectionName;
	int numberToReturn;
	long cursorID;
}
1069 
/// Layout of a delete message; the field order matches what
/// `MongoConnection.delete_` writes.
struct OP_DELETE {
	MsgHeader header;
	int zero; // always written as 0
	const(char)[] fullCollectionName;
	int flags; // bit 0 is single remove
	document selector;
}
1077 
/// Layout of a kill-cursors message; the field order matches what
/// `MongoConnection.killCursors` writes.
// If a cursor is read until exhausted (read until OP_QUERY or OP_GET_MORE returns zero for the cursor id), there is no need to kill the cursor.
struct OP_KILL_CURSORS {
	MsgHeader header;
	int zero; // always written as 0
	int numberOfCursorIds; // written as cursorIDs.length
	const(long)[] cursorIDs;
}
1085 
1086 /+
1087 // in mongo 3.6
1088 struct OP_MSG {
1089 	MsgHeader header,
1090 	uint flagBits;
1091 	Section[]
1092 }
1093 +/
1094 
/++
	A reply read from the server. Besides the decoded message it carries the
	state needed to iterate over the whole result as an input range
	(`empty` / `front` / `popFront`): when the current batch is used up,
	`popFront` fetches the next one through the connection.
+/
struct OP_REPLY {
	MsgHeader header;
	int responseFlags; // flag 0: cursor not found. 1: query failure. $err set in a thing.
	long cursorID;
	int startingFrom;
	int numberReturned;
	document[] documents;

	/* range elements */
	int numberToReturn; /// batch size used for follow-up requests
	const(char)[] fullCollectionName; /// collection used for follow-up requests
	size_t current; /// index of `front` inside `documents`
	MongoConnection connection; /// connection used for follow-up requests
	int limitRemaining = int.max; /// entries the consumer still wants in total

	string errorMessage; /// `$err` of a failed follow-up request
	int errorCode; /// `code` of a failed follow-up request

	/// True when iteration is over: an error was recorded, the current batch
	/// holds no documents, or the requested total has been consumed.
	@property bool empty() {
		return errorCode != 0 || numberReturned == 0 || limitRemaining <= 0;
	}

	/// Advances to the next document, requesting the next batch from the
	/// server when the current one is used up.
	void popFront() {
		limitRemaining--;
		current++;
		if(current == numberReturned) {
			if(cursorID == 0) {
				// The server reports cursor id 0 once a cursor is exhausted;
				// asking for more could only produce an empty reply, so the
				// round trip is skipped and the range is marked as finished.
				documents = null;
				numberReturned = 0;
				current = 0;
				return;
			}

			// `this` is replaced by the new reply below; keep the limit so it
			// survives regardless of what the new reply was initialised with
			const remaining = limitRemaining;
			this = connection.getMore(fullCollectionName, numberToReturn, cursorID, remaining);
			limitRemaining = remaining;

			// a failed request is answered with a single document carrying $err
			if(numberReturned == 1) {
				auto err = documents[0]["$err"];
				auto ecode = documents[0]["code"];
				if(err !is bson_value.init) {
					errorMessage = err.get!string;
					errorCode = ecode.get!int;
				}
			}
		}
	}

	/// The current document. Must not be called when `empty` is true.
	@property document front() {
		return documents[current];
	}
}
1137 
1138 /* bson */
1139 
/// An ordered list of named bson values.
struct document {
	// Sum of `size` over the values handed to the constructor; stays 0 for document.init.
	private int bytesCount;
	private const(bson_value)[] values_;
	// NOTE(review): presumably mirrors the trailing 0 byte of a serialized document - confirm against the serializer
	private ubyte terminatingZero;

	/// Returns the first value whose name equals `name`, or `bson_value.init`
	/// when there is none. It is `const` because it only reads, so documents
	/// reached through a const reference can be indexed as well.
	const(bson_value) opIndex(string name) const {
		foreach(value; values)
			if(value.name == name)
				return value;
		return bson_value.init;
	}

	/// Wraps `values` (the slice is stored, not copied) and adds up their sizes.
	/// `bson_value.size` throws for a value whose tag is 0.
	this(const(bson_value)[] values) {
		values_ = values;
		terminatingZero = 0;
		foreach(v; values)
			bytesCount += v.size;
	}

	/// Number of values in the document.
	size_t length() const {
		return values_.length;
	}

	/// The values in their original order.
	const(bson_value)[] values() const {
		return values_;
	}

	/// Renders the document as "{", one "name: value" line per value and "}".
	/// Value lines are prefixed with `indent + 1` tabs and the closing brace
	/// with `indent` tabs. Nested values are rendered through their own
	/// `toString`, so `indent` is not passed down to them.
	string toString(int indent = 0) const {
		string s;

		s ~= "{\n";

		foreach(value; values) {
			foreach(i; 0 .. indent + 1) s ~= "\t";
			s ~= value.name;
			s ~= ": ";
			s ~= value.toString();
			s ~= "\n";
		}

		foreach(i; 0 .. indent) s ~= "\t";
		s ~= "}\n";

		return s;
	}
}
1186 
/// A 12-byte object id.
struct ObjectId {
	ubyte[12] v;

	/// Formats the id as `ObjectId(<24 lower-case hex digits>)`.
	string toString() const {
		import std.format;
		return format("ObjectId(%(%02x%))", v[]);
	}

	/// Wraps the raw 12 bytes.
	this(const ubyte[12] v) {
		this.v = v;
	}

	/// Parses 24 hex digits (either case), optionally wrapped the way
	/// `toString` prints them: the `ObjectId(` prefix plus one closing character.
	/// Throws: Exception when the text is not exactly 24 hex digits.
	this(string s) {
		import std.algorithm;
		enum prefix = "ObjectId(";
		if(s.startsWith(prefix)) {
			// With nothing after the prefix the slice below would be [9 .. 8]
			// and raise a RangeError; report it as an ordinary parse failure.
			if(s.length == prefix.length)
				throw new Exception("invalid object id: " ~ s);
			s = s[prefix.length .. $-1];
		}
		if(s.length != 24)
			throw new Exception("invalid object id: " ~ s);
		static int hexToDec(char c) {
			if(c >= '0' && c <= '9')
				return c - '0';
			if(c >= 'A' && c <= 'F')
				return c - 'A' + 10;
			if(c >= 'a' && c <= 'f')
				return c - 'a' + 10;
			throw new Exception("Invalid hex char " ~ c);
		}
		// two hex digits per byte, most significant nibble first
		foreach(ref b; v) {
			b = cast(ubyte) ((hexToDec(s[0]) << 4) | hexToDec(s[1]));
			s = s[2 .. $];
		}
	}

	/// Orders ids by comparing their bytes left to right. It is `const` so that
	/// const and immutable ids can be compared too.
	int opCmp(const ObjectId id) const {
		import std.algorithm;
		return cmp(this.v[], id.v[]);
	}
}
// Thin wrapper types that let bson_value's constructors and visitors tell
// apart values that would otherwise share a D type.

/// Stored by bson_value under tag 0x09 (commented there as "utc datetime stamp").
struct UtcTimestamp {
	long v;
}
/// Regular expression source plus flags; stored by bson_value under tag 0x0b.
struct RegEx {
	const(char)[] regex;
	const(char)[] flags; // bson_value's union notes these as "alphabetized"
}
/// Javascript source text; stored by bson_value under tag 0x0d.
struct Javascript {
	string v;
}
/// Stored by bson_value under tag 0x11.
struct Timestamp {
	ulong v;
}
/// Raw 16 bytes; stored by bson_value under tag 0x13 ("decimal 128").
struct Decimal128 {
	ubyte[16] v;
}
/// Marker handed to visitors for tag 0x06; carries no data.
struct Undefined {}
1241 
1242 
/// Visitor for `bson_value.visit` that turns whatever payload it is handed into
/// the string type `T` via `std.conv.to`.
struct ToStringVisitor(T) if (isSomeString!T) {
	import std.conv;

	// of course this could have been a template but I wrote it out long-form for copy/paste purposes

	// scalar payloads
	T visit(const double v) { return v.to!T; }
	T visit(const int v) { return v.to!T; }
	T visit(const long v) { return v.to!T; }
	T visit(const bool v) { return v.to!T; }
	T visit(const string v) { return v.to!T; }

	// payloads without data
	T visit(const typeof(null)) { return to!T(null); }
	T visit(const Undefined) { return to!T("undefined"); }

	// documents, arrays and binary data (the binary subtype tag is not printed)
	T visit(const document v) { return v.to!T; }
	T visit(const const(bson_value)[] v) { return v.to!T; }
	T visit(const ubyte tag, const ubyte[] v) { return v.to!T; }

	// wrapper structs
	T visit(const ObjectId v) { return v.to!T; }
	T visit(const UtcTimestamp v) { return v.to!T; }
	T visit(const RegEx v) { return v.to!T; }
	T visit(const Javascript v) { return v.to!T; }
	T visit(const Timestamp v) { return v.to!T; }
	T visit(const Decimal128 v) { return v.to!T; }
}
1264 
/// Visitor for `bson_value.visit` that hands the stored payload back as `T`.
/// Integral payloads are cast to any integral or floating point `T`, floating
/// point payloads are cast to any floating point `T`, and every other payload
/// is returned only when its type converts implicitly to `T`. Any other
/// combination throws an Exception.
struct GetVisitor(T) {
	T visit(V)(const V t) {
		static if (isIntegral!V && (isIntegral!T || isFloatingPoint!T))
			return cast(T)t;
		else static if (isFloatingPoint!V && isFloatingPoint!T)
			return cast(T)t;
		else static if (!isIntegral!V && !isFloatingPoint!V && is(V : T))
			return t;
		else
			throw new Exception("incompatible type");
	}

	/// Binary payload: the data is returned when `T` accepts a
	/// `const(ubyte)[]`; the subtype `tag` is not part of the result.
	T visit(ubyte tag, const(ubyte)[] v) {
		static if(!is(typeof(v) : T))
			throw new Exception("incompatible type " ~ T.stringof);
		else
			return v;
	}
}
1288 
/// One named bson element: a type `tag`, the element name and a payload kept
/// in an anonymous union. Each constructor sets the tag that matches the union
/// member it fills; `visit` and `size` dispatch on that tag. A default
/// constructed value has tag 0, which `visit` and `size` reject as
/// "invalid bson".
struct bson_value {
	private ubyte tag; // selects the active union member, see the constructors below
	private const(char)[] e_name;

	// It only allows integer types or const getting in order to work right in the visitor...
	/// Tries to get the value matching exactly this type. The type will convert
	/// between different floating point types and integral types as well as
	/// perform a conversion from integral types to floating point types.
	T get(T)() const if (__traits(compiles, GetVisitor!T)) {
		GetVisitor!T v;
		return visit(v);
	}

	/// The element name.
	const(char)[] name() const {
		return e_name;
	}

	/// Copies `v` (tag and payload) and gives the copy the name `name`.
	this(const(char)[] name, bson_value v) {
		this = v;
		e_name = name;
	}

	/// Tag 0x01: double.
	this(const(char)[] name, double v) {
		e_name = name;
		tag = 0x01;
		x01 = v;
	}
	/// Tag 0x02: string.
	this(const(char)[] name, string v) {
		e_name = name;
		tag = 0x02;
		x02 = v;
	}
	/// Tag 0x03: child document.
	this(const(char)[] name, document v) {
		e_name = name;
		tag = 0x03;
		x03 = v;
	}
	/// Tag 0x04: array, stored as a document whose element names are the
	/// decimal indexes "0", "1", ...
	this(const(char)[] name, bson_value[] v) {
		e_name = name;
		tag = 0x04;
		const(bson_value)[] n;
		n.reserve(v.length);
		import std.conv;
		foreach(idx, i; v)
			n ~= bson_value(to!string(idx), i);
		x04 = document(n);
	}
	/// Tag 0x05: binary data. Note that the `tag` parameter here is the binary
	/// subtype stored in `x05_tag`, not the element tag.
	this(const(char)[] name, const(ubyte)[] v, ubyte tag = 0x00) {
		e_name = name;
		this.tag = 0x05;
		x05_tag = tag;
		x05_data = v;
	}
	/// Tag 0x07: ObjectId (its 12 bytes are copied).
	this(const(char)[] name, ObjectId v) {
		e_name = name;
		tag = 0x07;
		x07 = v.v;
	}
	/// Tag 0x08: bool.
	this(const(char)[] name, bool v) {
		e_name = name;
		tag = 0x08;
		x08 = v;
	}
	/// Tag 0x09: UtcTimestamp.
	this(const(char)[] name, UtcTimestamp v) {
		e_name = name;
		tag = 0x09;
		x09 = v.v;
	}
	/// Tag 0x0a: null; no payload is stored.
	this(const(char)[] name, typeof(null)) {
		e_name = name;
		tag = 0x0a;
	}
	/// Tag 0x0b: regular expression and its flags.
	this(const(char)[] name, RegEx v) {
		e_name = name;
		tag = 0x0b;
		x0b_regex = v.regex;
		x0b_flags = v.flags;
	}
	/// Tag 0x0d: javascript source.
	this(const(char)[] name, Javascript v) {
		e_name = name;
		tag = 0x0d;
		x0d = v.v;
	}
	/// Tag 0x10: int.
	this(const(char)[] name, int v) {
		e_name = name;
		tag = 0x10;
		x10 = v;
	}
	/// Tag 0x11: Timestamp.
	this(const(char)[] name, Timestamp v) {
		e_name = name;
		tag = 0x11;
		x11 = v.v;
	}
	/// Tag 0x12: long.
	this(const(char)[] name, long v) {
		e_name = name;
		tag = 0x12;
		x12 = v;
	}
	/// Tag 0x13: Decimal128 (its 16 bytes are copied).
	this(const(char)[] name, Decimal128 v) {
		e_name = name;
		tag = 0x13;
		x13 = v.v;
	}

	/// Calls the overload of `t.visit` that matches the stored payload and
	/// returns its result. Arrays are passed as the value slice of their
	/// backing document, binary data as (subtype, bytes), and tags 0x06/0x0a as
	/// `Undefined()` / `null`.
	/// Throws: Exception for tag 0 ("invalid bson"). Any tag not listed in the
	/// switch hits `assert(0)`.
	auto visit(T)(T t) const {
		switch(tag) {
			case 0x00: throw new Exception("invalid bson");
			case 0x01: return t.visit(x01);
			case 0x02: return t.visit(x02);
			case 0x03: return t.visit(x03);
			case 0x04: return t.visit(x04.values);
			case 0x05: return t.visit(x05_tag, x05_data);
			case 0x06: return t.visit(Undefined());
			case 0x07: return t.visit(ObjectId(x07));
			case 0x08: return t.visit(x08);
			case 0x09: return t.visit(UtcTimestamp(x09));
			case 0x0a: return t.visit(null);
			case 0x0b: return t.visit(RegEx(x0b_regex, x0b_flags));
			case 0x0d: return t.visit(Javascript(x0d));
			case 0x10: return t.visit(x10);
			case 0x11: return t.visit(Timestamp(x11));
			case 0x12: return t.visit(x12);
			case 0x13: return t.visit(Decimal128(x13));
			default:
				import std.conv;
				assert(0, "unsupported tag in bson: " ~ to!string(tag));

		}
	}

	/// String form of the payload only (the name is not included), produced by
	/// ToStringVisitor.
	string toString() const {
		ToStringVisitor!string v;
		return visit(v);
	}

	// Payload storage; members are named after the tag that selects them.
	// There is no constructor in this struct for tag 0x06, but `visit` and
	// `size` both handle it.
	private union {
		void* zero;
		double x01;
		string x02; // given with an int length preceding and 0 terminator
		document x03; // child object
		document x04; // array with indexes as key values
		struct {
			ubyte x05_tag;
			const(ubyte)[] x05_data; // binary data
		}
		void* undefined;
		ubyte[12] x07; // a guid/ ObjectId
		bool x08;
		long x09; // utc datetime stamp
		void* x0a; // null
		struct {
			const(char)[] x0b_regex;
			const(char)[] x0b_flags; // alphabetized
		}
		string x0d; // javascript
		int x10; // integer!!! omg
		ulong x11; // timestamp
		long x12; // integer!!!
		ubyte[16] x13; // decimal 128
	}

	/// Byte count attributed to this element: 2 + the name length plus a
	/// payload-dependent amount (see the cases below).
	/// NOTE(review): the fixed 2 presumably covers the tag byte and the name's
	/// 0 terminator - confirm against the serializer. For tags 0x03/0x04 the
	/// payload part is the child's `bytesCount`, which the document constructor
	/// computes as the sum of its values' sizes.
	/// Throws: Exception for tag 0; unsupported tags hit `assert(0)`.
	size_t size() const {
		auto count = 2 + e_name.length;

		switch(this.tag) {
			case 0x00: // not supposed to exist!
				throw new Exception("invalid bson");
			case 0x01:
				count += 8;
			break;
			case 0x02:
				count += 5 + x02.length; // length and zero
			break;
			case 0x03:
				count += x03.bytesCount;
			break;
			case 0x04:
				count += x04.bytesCount;
			break;
			case 0x05:
				count += x05_data.length;
				count += 5; // length and tag
			break;
			case 0x06: // undefined
				// intentionally blank, no additional data
			break;
			case 0x07:
				count += x07.length;
			break;
			case 0x08:
				count++;
			break;
			case 0x09:
				count += 8;
			break;
			case 0x0a: // null
				// intentionally blank, no additional data
			break;
			case 0x0b:
				count += x0b_regex.length + 1;
				count += x0b_flags.length + 1;
			break;
			case 0x0d:
				count += 5 + x0d.length; // length and zero
			break;
			case 0x10:
				count += 4;
			break;
			case 0x11:
				count += 8;
			break;
			case 0x12:
				count += 8;
			break;
			case 0x13:
				count += x13.length;
			break;
			default:
				import std.conv;
				assert(0, "unsupported tag in bson: " ~ to!string(tag));
		}

		return count;
	}
}
1514 
/// One `host[:port]` entry taken from the authority part of a uri.
struct UriHost {
	string host; /// host name or address; a bracketed IPv6 literal is stored without its brackets
	int port; /// 0 means the uri did not specify a port
}

/* copy/pasted from arsd.cgi, modified for mongo-specific comma behavior. used with permission */
struct Uri {
	// scheme//userinfo@host:port/path?query#fragment

	string scheme; /// e.g. "http" in "http://example.com/"
	string userinfo; /// the username (and possibly a password) in the uri
	UriHost[] hosts; /// every comma-separated host[:port] entry of the authority, in order
	string path; /// e.g. "/folder/file.html" in "http://example.com/folder/file.html"
	string query; /// the stuff after the ? in a uri
	string fragment; /// the stuff after the # in a uri.

	/// Breaks down a uri string to its components
	this(string uri) {
		reparse(uri);
	}

	/// Splits a single authority entry into host and port.
	///
	/// A bracketed IPv6 literal (`[::1]` or `[::1]:27017`) is recognised first,
	/// because the colons inside the brackets belong to the address and must not
	/// be mistaken for the port separator. The brackets are dropped from the
	/// stored host. Every other entry is split at its first colon.
	///
	/// Throws: Exception when a bracketed literal is not closed or is followed by
	/// anything other than `:port`; `to!int` throws when the port is not a number.
	private static UriHost parseHost(string entry) {
		import std.conv : to;
		import std.string : indexOf;

		if(entry.length && entry[0] == '[') {
			auto close = entry.indexOf("]");
			if(close == -1)
				throw new Exception("invalid host in uri: " ~ entry);

			auto host = entry[1 .. close];
			auto rest = entry[close + 1 .. $];
			if(rest.length == 0)
				return UriHost(host, 0); // 0 means not specified
			if(rest[0] != ':')
				throw new Exception("invalid host in uri: " ~ entry);
			return UriHost(host, to!int(rest[1 .. $]));
		}

		auto colon = entry.indexOf(":");
		if(colon == -1)
			return UriHost(entry, 0); // 0 means not specified; we should use the default for the scheme

		return UriHost(entry[0 .. colon], to!int(entry[colon + 1 .. $]));
	}

	/// Resets every field and re-parses `uri` into scheme, userinfo, hosts,
	/// path, query and fragment. An empty string leaves all fields empty.
	private void reparse(string uri) {
		import std.string : indexOf;

		// from RFC 3986
		// the ctRegex triples the compile time and makes ugly errors for no real benefit
		// it was a nice experiment but just not worth it.
		// enum ctr = ctRegex!r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?";
		/*
			Captures:
				0 = whole url
				1 = scheme, with :
				2 = scheme, no :
				3 = authority, with //
				4 = authority, no //
				5 = path
				6 = query string, with ?
				7 = query string, no ?
				8 = anchor, with #
				9 = anchor, no #
		*/
		// Yikes, even regular, non-CT regex is also unacceptably slow to compile. 1.9s on my computer!
		// instead, I will DIY and cut that down to 0.6s on the same computer.
		/*

				Note that authority is
					user:password@domain:port
				where the user:password@ part is optional, and the :port is optional.
				For mongo, the domain:port part may be a comma-separated list.

				Regex translation:

				Scheme cannot have :, /, ?, or # in it, and must have one or more chars and end in a :. It is optional, but must be first.
				Authority must start with //, but cannot have any other /, ?, or # in it. It is optional.
				Path cannot have any ? or # in it. It is optional.
				Query must start with ? and must not have # in it. It is optional.
				Anchor must start with # and can have anything else in it to end of string. It is optional.
		*/

		this = Uri.init; // reset all state

		// empty uri = nothing special
		if(uri.length == 0) {
			return;
		}

		size_t idx;

		scheme_loop: foreach(char c; uri[idx .. $]) {
			switch(c) {
				case ':':
				case '/':
				case '?':
				case '#':
					break scheme_loop;
				default:
			}
			idx++;
		}

		if(idx == 0 && uri[idx] == ':') {
			// this is actually a path! we skip way ahead
			goto path_loop;
		}

		if(idx == uri.length) {
			// the whole thing is a path, apparently
			path = uri;
			return;
		}

		if(idx > 0 && uri[idx] == ':') {
			scheme = uri[0 .. idx];
			idx++;
		} else {
			// we need to rewind; it found a / but no :, so the whole thing is prolly a path...
			idx = 0;
		}

		// `<=` rather than `<`: a uri that ends right after "//" (for example
		// "mongodb://") still has an authority, it is just empty.
		if(idx + 2 <= uri.length && uri[idx .. idx + 2] == "//") {
			// we have an authority....
			idx += 2;

			auto authority_start = idx;
			authority_loop: foreach(char c; uri[idx .. $]) {
				switch(c) {
					case '/':
					case '?':
					case '#':
						break authority_loop;
					default:
				}
				idx++;
			}

			auto authority = uri[authority_start .. idx];

			auto at = authority.indexOf("@");
			if(at != -1) {
				userinfo = authority[0 .. at];
				authority = authority[at + 1 .. $];
			}

			// mongo-specific: the rest is a comma-separated list of host[:port]
			auto remaining = authority;

			while(remaining.length) {
				string entry;
				auto comma = remaining.indexOf(",");
				if(comma != -1) {
					entry = remaining[0 .. comma];
					remaining = remaining[comma + 1 .. $];
				} else {
					entry = remaining;
					remaining = null;
				}

				hosts ~= parseHost(entry);
			}
		}

		path_loop:
		auto path_start = idx;

		foreach(char c; uri[idx .. $]) {
			if(c == '?' || c == '#')
				break;
			idx++;
		}

		path = uri[path_start .. idx];

		if(idx == uri.length)
			return; // nothing more to examine...

		if(uri[idx] == '?') {
			idx++;
			auto query_start = idx;
			foreach(char c; uri[idx .. $]) {
				if(c == '#')
					break;
				idx++;
			}
			query = uri[query_start .. idx];
		}

		if(idx < uri.length && uri[idx] == '#') {
			idx++;
			fragment = uri[idx .. $];
		}

		// uriInvalidated = false;
	}
}
1696 /* end */
1697 
1698 
// Operating-system random byte source; ScramState.createInitialRequest calls
// these to fill the bytes of the client nonce. Only linux and Windows are
// covered: any other target stops at the static assert below.
version(linux)
	@trusted
	extern(C)
	int getentropy(scope void*, size_t);
else version(Windows) {
	import core.sys.windows.windows;
	import core.sys.windows.ntdef;
	enum STATUS_SUCCESS = 0;
	pragma(lib, "bcrypt");
	@trusted
	extern(Windows)
	NTSTATUS BCryptGenRandom(scope void*, PUCHAR, ULONG, ULONG);
} else static assert(0);
1712 
1713 
1714 
1715 
1716 // copy pasted from vibe.d; https://raw.githubusercontent.com/vibe-d/vibe.d/master/mongodb/vibe/db/mongo/sasl.d
1717 
1718 /*
1719 	SASL authentication functions
1720 
1721 	Copyright: © 2012-2016 Nicolas Gurrola
1722 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
1723 	Authors: Nicolas Gurrola
1724 */
1725 
1726 import std.algorithm;
1727 import std.base64;
1728 import std.conv;
1729 import std.digest.hmac;
1730 import std.digest.sha;
1731 import std.exception;
1732 import std.format;
1733 import std..string;
1734 import std.traits;
1735 import std.utf;
1736 
1737 @safe:
1738 
/// Client side of a SCRAM-SHA-1 conversation. The calls are made in order:
/// `createInitialRequest`, then `update` with the server's first message, then
/// `finalize` with the server's final message. State needed between the steps
/// is kept in the private fields.
package struct ScramState
{
	@safe:

	private string m_firstMessageBare; // "n=<user>,r=<nonce>", reused when building the auth message
	private string m_nonce; // client nonce sent in the first message
	private DigestType!SHA1 m_saltedPassword; // result of pbkdf2, set by update()
	private string m_authMessage; // text signed by both sides, set by update()

	/// Picks a fresh nonce (18 bytes from the operating system, Base64 encoded)
	/// and returns the client-first message "n,,n=<escaped user>,r=<nonce>".
	/// Throws: Exception when the operating system cannot supply random bytes.
	string createInitialRequest(string user)
	{
		ubyte[18] randomBytes;
		version(linux) {
			if(getentropy(&(randomBytes[0]), randomBytes.length) != 0)
				throw new Exception("get random failure");
		} else version(Windows) {
			if(BCryptGenRandom(null, &(randomBytes[0]), randomBytes.length, 2 /*BCRYPT_USE_SYSTEM_PREFERRED_RNG */) != STATUS_SUCCESS)
				throw new Exception("get random failure");
		} else static assert(0);

		m_nonce = Base64.encode(randomBytes);

		m_firstMessageBare = format("n=%s,r=%s", escapeUsername(user), m_nonce);
		return format("n,,%s", m_firstMessageBare);
	}

	/// Same as createInitialRequest but with a caller-supplied nonce, so tests
	/// can replay a known conversation.
	version (unittest) private string createInitialRequestWithFixedNonce(string user, string nonce)
	{
		m_nonce = nonce;

		m_firstMessageBare = format("n=%s,r=%s", escapeUsername(user), m_nonce);
		return format("n,,%s", m_firstMessageBare);
	}

	// MongoDB drivers require 4096 min iterations https://github.com/mongodb/specifications/blob/59390a7ab2d5c8f9c29b8af1775ff25915c44036/source/auth/auth.rst#scram-sha-1
	/// Handles the server-first message "r=<nonce>,s=<base64 salt>,i=<iterations>"
	/// and returns the client-final message "c=biws,r=<nonce>,p=<base64 proof>".
	///
	/// Params:
	///   password = secret fed to pbkdf2 (the unittest after this struct passes
	///              the result of makeDigest)
	///   challenge = the server-first message
	///   minIterations = smallest iteration count accepted from the server
	/// Throws: Exception when the message is malformed, the iteration count is
	///   below `minIterations`, or the server nonce does not start with the
	///   nonce this client sent.
	string update(string password, string challenge, int minIterations = 4096)
	{
		string serverFirstMessage = challenge;

		string next = challenge.find(',');
		if (challenge.length < 2 || challenge[0 .. 2] != "r=" || next.length < 3 || next[1 .. 3] != "s=")
			throw new Exception("Invalid server challenge format: " ~ challenge);
		string serverNonce = challenge[2 .. $ - next.length];
		challenge = next[3 .. $];
		next = challenge.find(',');
		ubyte[] salt = Base64.decode(challenge[0 .. $ - next.length]);

		if (next.length < 3 || next[1 .. 3] != "i=")
			throw new Exception("Invalid server challenge format");
		int iterations = next[3 .. $].to!int();

		if (iterations < minIterations)
			throw new Exception("Server must request at least " ~ minIterations.to!string ~ " iterations");

		// The length is checked first: a server nonce shorter than ours would
		// otherwise make the slice go out of bounds and raise a RangeError
		// instead of the exception below.
		if (serverNonce.length < m_nonce.length || serverNonce[0 .. m_nonce.length] != m_nonce)
			throw new Exception("Invalid server nonce received");
		string finalMessage = format("c=biws,r=%s", serverNonce);

		m_saltedPassword = pbkdf2(password.representation, salt, iterations);
		m_authMessage = format("%s,%s,%s", m_firstMessageBare, serverFirstMessage, finalMessage);

		auto proof = getClientProof(m_saltedPassword, m_authMessage);
		return format("%s,p=%s", finalMessage, Base64.encode(proof));
	}

	/// Checks the server-final message "v=<base64 signature>" against the
	/// signature expected for this conversation. Returns null on success.
	/// Throws: Exception when the message is malformed or the signature differs.
	string finalize(string challenge)
	{
		if (challenge.length < 2 || challenge[0 .. 2] != "v=")
		{
			throw new Exception("Invalid server signature format");
		}
		if (!verifyServerSignature(Base64.decode(challenge[2 .. $]), m_saltedPassword, m_authMessage))
		{
			throw new Exception("Invalid server signature");
		}
		return null;
	}

	/// Replaces ',' with "=2C" and '=' with "=3D". When nothing needs escaping
	/// the very same string is returned without allocating.
	private static string escapeUsername(string user)
	{
		char[] buffer;
		foreach (i, dchar ch; user)
		{
			if (ch == ',' || ch == '=') {
				if (!buffer) {
					// first character that needs escaping: copy everything seen so far
					buffer.reserve(user.length + 2);
					buffer ~= user[0 .. i];
				}
				if (ch == ',')
					buffer ~= "=2C";
				else
					buffer ~= "=3D";
			} else if (buffer)
				encode(buffer, ch);
		}
		return buffer ? () @trusted { return assumeUnique(buffer); } () : user;
	}

	unittest
	{
		string user = "user";
		assert(escapeUsername(user) == user);
		assert(escapeUsername(user) is user);
		assert(escapeUsername("user,1") == "user=2C1");
		assert(escapeUsername("user=1") == "user=3D1");
		assert(escapeUsername("u,=ser1") == "u=2C=3Dser1");
		assert(escapeUsername("u=se=r1") == "u=3Dse=3Dr1");
	}

	/// Client proof: HMAC("Client Key") keyed with the salted password, XORed
	/// with the HMAC of the auth message keyed with the SHA-1 of that client key.
	private static auto getClientProof(DigestType!SHA1 saltedPassword, string authMessage)
	{
		auto clientKey = () @trusted { return hmac!SHA1("Client Key".representation, saltedPassword); } ();
		auto storedKey = sha1Of(clientKey);
		auto clientSignature = () @trusted { return hmac!SHA1(authMessage.representation, storedKey); } ();

		foreach (i; 0 .. clientKey.length)
		{
			clientKey[i] = clientKey[i] ^ clientSignature[i];
		}
		return clientKey;
	}

	/// True when `signature` equals the HMAC of the auth message keyed with
	/// HMAC("Server Key") of the salted password.
	private static bool verifyServerSignature(ubyte[] signature, DigestType!SHA1 saltedPassword, string authMessage)
	@trusted {
		auto serverKey = hmac!SHA1("Server Key".representation, saltedPassword);
		auto serverSignature = hmac!SHA1(authMessage.representation, serverKey);
		return serverSignature == signature;
	}
}
1868 
/// Derives one SHA-1 sized block with PBKDF2-HMAC-SHA1: the first HMAC covers
/// `salt` followed by the block index 1 and is keyed with `password`; each
/// further round HMACs the previous result and XORs it into the output.
private DigestType!SHA1 pbkdf2(const ubyte[] password, const ubyte[] salt, int iterations)
{
	// block index 1, most significant byte first
	ubyte[4] blockIndex = [0, 0, 0, 1];
	auto round = () @trusted { return hmac!SHA1(salt, blockIndex[], password); } ();
	static assert(isStaticArray!(typeof(round)),
		"Code is written so that the hash array is expected to be placed on the stack");

	auto derived = round;
	for (int n = 1; n < iterations; ++n)
	{
		round = () @trusted { return hmac!SHA1(round[], password); } ();
		derived[] ^= round[];
	}
	return derived;
}
1888 
// Replays one complete SCRAM-SHA-1 conversation with a fixed client nonce and
// checks every message this client produces.
unittest {
	ScramState scram;

	const firstMessage = scram.createInitialRequestWithFixedNonce("user", "fyko+d2lbbFgONRv9qkxdawL");
	assert(firstMessage == "n,,n=user,r=fyko+d2lbbFgONRv9qkxdawL");

	const secret = makeDigest("user", "pencil");
	string answer = scram.update(secret,
		"r=fyko+d2lbbFgONRv9qkxdawLHo+Vgk7qvUOKUwuWLIWg4l/9SraGMHEE,s=rQ9ZY3MntBeuP3E1TDVC4w==,i=10000");
	assert(answer == "c=biws,r=fyko+d2lbbFgONRv9qkxdawLHo+Vgk7qvUOKUwuWLIWg4l/9SraGMHEE,p=MC2T8BvbmWRckDw8oWl5IVghwCY=",
		answer);

	// a correct server signature makes finalize return an empty result
	answer = scram.finalize("v=UMWeI25JD1yNYZRMpZ4VHvhZ9e0=");
	assert(answer == "", answer);
}
1900 
/// Lower-case hex MD5 of "<username>:mongo:<password>".
string makeDigest(string username, string password) {
	import std.digest.md;
	auto hash = md5Of(username ~ ":mongo:" ~ password);
	string hex = hash.toHexString().idup;
	return hex.toLower();
}